This is an automated email from the ASF dual-hosted git repository.
leet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 5e9cf70 METRON-2005 Batch Writer writes 0-byte files to HDFS on
rotation (justinleet) closes apache/metron#1338
5e9cf70 is described below
commit 5e9cf705ef5feb1d723f584c1b6134bcc1eda9cf
Author: justinleet <[email protected]>
AuthorDate: Tue Feb 19 14:09:02 2019 -0500
METRON-2005 Batch Writer writes 0-byte files to HDFS on rotation
(justinleet) closes apache/metron#1338
---
metron-platform/metron-writer/README.md | 3 +
metron-platform/metron-writer/pom.xml | 5 ++
.../writer/hdfs/ClonedSyncPolicyCreator.java | 3 +
.../org/apache/metron/writer/hdfs/HdfsWriter.java | 74 +++++++++++++------
.../apache/metron/writer/hdfs/SourceHandler.java | 25 +++++--
.../metron/writer/hdfs/SourceHandlerCallback.java | 23 +++++-
.../metron/writer/hdfs/SourceHandlerKey.java | 8 ++
.../apache/metron/writer/hdfs/HdfsWriterTest.java | 38 ----------
.../metron/writer/hdfs/SourceHandlerTest.java | 86 ++++++++++++++++++++++
9 files changed, 194 insertions(+), 71 deletions(-)
diff --git a/metron-platform/metron-writer/README.md
b/metron-platform/metron-writer/README.md
index bbec39b..ed4f053 100644
--- a/metron-platform/metron-writer/README.md
+++ b/metron-platform/metron-writer/README.md
@@ -65,6 +65,9 @@ To manage the output path, a base path argument is provided
by the Flux file, wi
This means that all output will land in `/apps/metron/`. With no further
adjustment, it will be `/apps/metron/<sensor>/`.
However, by modifying the sensor's JSON config, it is possible to provide
additional pathing based on the message itself.
+The name of each output file will follow the format
`{prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}`. Notably,
because of the way
+file rotations are handled by the HdfsWriter, `rotationNum` will always be 0,
but RotationActions still get executed normally.
+
E.g.
```
{
diff --git a/metron-platform/metron-writer/pom.xml
b/metron-platform/metron-writer/pom.xml
index 0002f7a..a11ce6e 100644
--- a/metron-platform/metron-writer/pom.xml
+++ b/metron-platform/metron-writer/pom.xml
@@ -213,6 +213,11 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>stellar-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
index 4d32fc9..1f908a9 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/ClonedSyncPolicyCreator.java
@@ -37,6 +37,9 @@ public class ClonedSyncPolicyCreator implements
SyncPolicyCreator {
// SyncPolicy object does not implement Cloneable, so we'll need to
clone it via serialization
//to get a fresh policy object. Note: this would be expensive if it was
in the critical path,
// but should be called infrequently (once per sync).
+
+ // Reset the SyncPolicy before cloning so that the copy starts from a fresh count.
+ syncPolicy.reset();
byte[] serializedForm = SerDeUtils.toBytes(syncPolicy);
return SerDeUtils.fromBytes(serializedForm, SyncPolicy.class);
}
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
index 1ba9a6b..aaa58fa 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -17,29 +17,37 @@
*/
package org.apache.metron.writer.hdfs;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.metron.common.configuration.IndexingConfigurations;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.writer.BulkMessageWriter;
+import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.stellar.common.StellarProcessor;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.MapVariableResolver;
import org.apache.metron.stellar.dsl.StellarFunctions;
import org.apache.metron.stellar.dsl.VariableResolver;
-import org.apache.metron.stellar.common.StellarProcessor;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.writer.BulkMessageWriter;
-import org.apache.metron.common.writer.BulkWriterResponse;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
-
-import java.io.*;
-import java.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable
{
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
List<RotationAction> rotationActions = new ArrayList<>();
FileRotationPolicy rotationPolicy = new NoRotationPolicy();
SyncPolicy syncPolicy;
@@ -82,38 +90,48 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
this.fileNameFormat.prepare(stormConfig,topologyContext);
if(syncPolicy != null) {
//if the user has specified the sync policy, we don't want to override
their wishes.
+ LOG.debug("Using user specified sync policy {}",
syncPolicy.getClass().getSimpleName());
syncPolicyCreator = new ClonedSyncPolicyCreator(syncPolicy);
}
else {
//if the user has not, then we want to have the sync policy depend on
the batch size.
+ LOG.debug("No user specified sync policy, using CountSyncPolicy based on
batch size");
syncPolicyCreator = (source, config) -> new CountSyncPolicy(config ==
null?1:config.getBatchSize(source));
}
}
-
@Override
public BulkWriterResponse write(String sourceType
, WriterConfiguration configurations
, Iterable<Tuple> tuples
, List<JSONObject> messages
- ) throws Exception
- {
+ ) throws Exception {
BulkWriterResponse response = new BulkWriterResponse();
// Currently treating all the messages in a group for pass/failure.
- try {
- // Messages can all result in different HDFS paths, because of Stellar
Expressions, so we'll need to iterate through
- for(JSONObject message : messages) {
- String path = getHdfsPathExtension(
- sourceType,
-
(String)configurations.getSensorConfig(sourceType).getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF,
""),
- message
- );
+ // Messages can all result in different HDFS paths, because of Stellar
Expressions, so we'll need to iterate through
+ for (JSONObject message : messages) {
+ String path = getHdfsPathExtension(
+ sourceType,
+ (String) configurations.getSensorConfig(sourceType)
+ .getOrDefault(IndexingConfigurations.OUTPUT_PATH_FUNCTION_CONF,
""),
+ message
+ );
+
+ try {
+ LOG.trace("Writing message {} to path: {}", message.toJSONString(),
path);
SourceHandler handler = getSourceHandler(sourceType, path,
configurations);
handler.handle(message, sourceType, configurations, syncPolicyCreator);
+ } catch (Exception e) {
+ LOG.error(
+ "HdfsWriter encountered error writing. Source type: {}. #
messages: {}. Output path: {}.",
+ sourceType,
+ messages.size(),
+ path,
+ e
+ );
+ response.addAllErrors(e, tuples);
}
- } catch (Exception e) {
- response.addAllErrors(e, tuples);
}
response.addAllSuccesses(tuples);
@@ -123,6 +141,7 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
public String getHdfsPathExtension(String sourceType, String
stellarFunction, JSONObject message) {
// If no function is provided, just use the sourceType directly
if(stellarFunction == null || stellarFunction.trim().isEmpty()) {
+ LOG.debug("No HDFS path extension provided; using source type {}
directly", sourceType);
return sourceType;
}
@@ -130,7 +149,9 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
VariableResolver resolver = new MapVariableResolver(message);
Object objResult = stellarProcessor.parse(stellarFunction, resolver,
StellarFunctions.FUNCTION_RESOLVER(), Context.EMPTY_CONTEXT());
if(objResult != null && !(objResult instanceof String)) {
- throw new IllegalArgumentException("Stellar Function <" +
stellarFunction + "> did not return a String value. Returned: " + objResult);
+ String errorMsg = "Stellar Function <" + stellarFunction + "> did not
return a String value. Returned: " + objResult;
+ LOG.error(errorMsg);
+ throw new IllegalArgumentException(errorMsg);
}
return objResult == null ? "" : (String)objResult;
}
@@ -143,6 +164,7 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
@Override
public void close() {
for(SourceHandler handler : sourceHandlerMap.values()) {
+ LOG.debug("Closing SourceHandler {}", handler.toString());
handler.close();
}
// Everything is closed, so just clear it
@@ -154,13 +176,17 @@ public class HdfsWriter implements
BulkMessageWriter<JSONObject>, Serializable {
SourceHandler ret = sourceHandlerMap.get(key);
if(ret == null) {
if(sourceHandlerMap.size() >= maxOpenFiles) {
- throw new IllegalStateException("Too many HDFS files open!");
+ String errorMsg = "Too many HDFS files open! Maximum number of open
files is: " + maxOpenFiles +
+ ". Current number of open files is: " + sourceHandlerMap.size();
+ LOG.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
}
ret = new SourceHandler(rotationActions,
rotationPolicy,
syncPolicyCreator.create(sourceType, config),
new
PathExtensionFileNameFormat(key.getStellarResult(), fileNameFormat),
new SourceHandlerCallback(sourceHandlerMap,
key));
+ LOG.debug("Placing key in sourceHandlerMap: {}", key);
sourceHandlerMap.put(key, ret);
}
return ret;
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
index b841249..d94b7cf 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -49,7 +49,6 @@ public class SourceHandler {
FileNameFormat fileNameFormat;
SourceHandlerCallback cleanupCallback;
private long offset = 0;
- private int rotation = 0;
private transient FSDataOutputStream out;
private transient final Object writeLock = new Object();
protected transient Timer rotationTimer; // only used for TimedRotationPolicy
@@ -89,6 +88,7 @@ public class SourceHandler {
this.offset += bytes.length;
if (this.syncPolicy.mark(null, this.offset)) {
+ LOG.debug("Calling hsync per Sync Policy");
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out)
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
@@ -97,11 +97,13 @@ public class SourceHandler {
}
//recreate the sync policy for the next batch just in case something
changed in the config
//and the sync policy depends on the config.
+ LOG.debug("Recreating sync policy");
this.syncPolicy = syncPolicyCreator.create(sensor, config);
}
}
if (this.rotationPolicy.mark(null, this.offset)) {
+ LOG.debug("Rotating due to rotationPolicy");
rotateOutputFile(); // synchronized
this.offset = 0;
this.rotationPolicy.reset();
@@ -109,8 +111,10 @@ public class SourceHandler {
}
private void initialize() throws IOException {
+ LOG.debug("Initializing Source Handler");
this.fs = FileSystem.get(new Configuration());
this.currentFile = createOutputFile();
+ LOG.debug("Source Handler initialized with starting file: {}",
currentFile);
if(this.rotationPolicy instanceof TimedRotationPolicy){
long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
this.rotationTimer = new Timer(true);
@@ -118,6 +122,7 @@ public class SourceHandler {
@Override
public void run() {
try {
+ LOG.debug("Rotating output file from TimerTask");
rotateOutputFile();
} catch(IOException e){
LOG.warn("IOException during scheduled file rotation.", e);
@@ -128,28 +133,30 @@ public class SourceHandler {
}
}
+ // Closes the output file, but ensures any RotationActions are performed.
protected void rotateOutputFile() throws IOException {
- LOG.info("Rotating output file...");
+ LOG.debug("Rotating output file...");
long start = System.currentTimeMillis();
synchronized (this.writeLock) {
closeOutputFile();
// Want to use the callback to make sure we have an accurate count of
open files.
cleanupCallback();
- this.rotation++;
- Path newFile = createOutputFile();
- LOG.info("Performing {} file rotation actions.",
this.rotationActions.size());
+ LOG.debug("Performing {} file rotation actions.",
this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms", time);
}
private Path createOutputFile() throws IOException {
- Path path = new Path(this.fileNameFormat.getPath(),
this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+ // The rotation is set to 0. With the way open files are tracked and
managed with the callback, there will
+ // never be data that would go into a rotation > 0. Instead a new
SourceHandler, and by extension file, will
+ // be created.
+ Path path = new Path(this.fileNameFormat.getPath(),
this.fileNameFormat.getName(0, System.currentTimeMillis()));
+ LOG.debug("Creating new output file: {}", path.getName());
if(fs.getScheme().equals("file")) {
//in the situation where we're running this in a local filesystem,
flushing doesn't work.
fs.mkdirs(path.getParent());
@@ -172,6 +179,9 @@ public class SourceHandler {
public void close() {
try {
closeOutputFile();
+ if(rotationTimer != null) {
+ rotationTimer.cancel();
+ }
// Don't call cleanup, to avoid HashMap's
ConcurrentModificationException while iterating
} catch (IOException e) {
throw new RuntimeException("Unable to close output file.", e);
@@ -186,7 +196,6 @@ public class SourceHandler {
", syncPolicy=" + syncPolicy +
", fileNameFormat=" + fileNameFormat +
", offset=" + offset +
- ", rotation=" + rotation +
", out=" + out +
", writeLock=" + writeLock +
", rotationTimer=" + rotationTimer +
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
index bfd1daf..89089f9 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerCallback.java
@@ -18,9 +18,18 @@
package org.apache.metron.writer.hdfs;
+import java.lang.invoke.MethodHandles;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Callback intended to be able to manage open files in {@link HdfsWriter}.
This callback will close
+ * the associated {@link SourceHandler} and remove it from the map of open
files.
+ */
public class SourceHandlerCallback {
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
Map<SourceHandlerKey, SourceHandler> sourceHandlerMap;
SourceHandlerKey key;
SourceHandlerCallback(Map<SourceHandlerKey, SourceHandler> sourceHandlerMap,
SourceHandlerKey key) {
@@ -28,8 +37,20 @@ public class SourceHandlerCallback {
this.key = key;
}
+ /**
+ * Removes the {@link SourceHandler} from the map of open files and closes it
so that resources such as
+ * its {@link java.util.Timer} are released.
+ */
public void removeKey() {
- sourceHandlerMap.remove(key);
+ SourceHandler removed = sourceHandlerMap.remove(key);
+ if(removed != null) {
+ removed.close();
+ }
+ LOG.debug("Removed {} -> {}. Current state of sourceHandlerMap: {}",
+ key,
+ removed,
+ sourceHandlerMap
+ );
}
}
diff --git
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
index 6bf0917..ce5f33a 100644
---
a/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
+++
b/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/hdfs/SourceHandlerKey.java
@@ -58,5 +58,13 @@ class SourceHandlerKey {
result = 31 * result + (stellarResult != null ? stellarResult.hashCode() :
0);
return result;
}
+
+ @Override
+ public String toString() {
+ return "SourceHandlerKey{" +
+ "sourceType='" + sourceType + '\'' +
+ ", stellarResult='" + stellarResult + '\'' +
+ '}';
+ }
}
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
index 832f8bf..09ecafc 100644
---
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/HdfsWriterTest.java
@@ -453,44 +453,6 @@ public class HdfsWriterTest {
}
}
- @Test
- @SuppressWarnings("unchecked")
- public void testHandleAttemptsRotateIfStreamClosed() throws Exception {
- String function = "FORMAT('test-%s/%s', test.key, test.key)";
- WriterConfiguration config = buildWriterConfiguration(function);
- HdfsWriter writer = new HdfsWriter().withFileNameFormat(testFormat);
- writer.init(new HashMap<String, String>(), createTopologyContext(),
config);
-
- JSONObject message = new JSONObject();
- message.put("test.key", "test.value");
- ArrayList<JSONObject> messages = new ArrayList<>();
- messages.add(message);
- ArrayList<Tuple> tuples = new ArrayList<>();
-
- CountSyncPolicy basePolicy = new CountSyncPolicy(5);
- ClonedSyncPolicyCreator creator = new ClonedSyncPolicyCreator(basePolicy);
-
- writer.write(SENSOR_NAME, config, tuples, messages);
- writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value",
config).closeOutputFile();
- writer.getSourceHandler(SENSOR_NAME, "test-test.value/test.value",
config).handle(message, SENSOR_NAME, config, creator);
- writer.close();
-
- File outputFolder = new File(folder.getAbsolutePath() +
"/test-test.value/test.value/");
-
- // The message should show up twice, once in each file
- ArrayList<String> expected = new ArrayList<>();
- expected.add(message.toJSONString());
-
- // Assert this went into a new file because it actually rotated
- Assert.assertEquals(2, outputFolder.listFiles().length);
- for (File file : outputFolder.listFiles()) {
- List<String> lines = Files.readAllLines(file.toPath());
- // One line per file
- Assert.assertEquals(1, lines.size());
- Assert.assertEquals(expected, lines);
- }
- }
-
protected WriterConfiguration buildWriterConfiguration(String function) {
IndexingConfigurations indexingConfig = new IndexingConfigurations();
Map<String, Object> sensorIndexingConfig = new HashMap<>();
diff --git
a/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
new file mode 100644
index 0000000..b4f3d46
--- /dev/null
+++
b/metron-platform/metron-writer/src/test/java/org/apache/metron/writer/hdfs/SourceHandlerTest.java
@@ -0,0 +1,86 @@
+package org.apache.metron.writer.hdfs;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.*;
+
+public class SourceHandlerTest {
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final String SENSOR_NAME = "sensor";
+ private static final String WRITER_NAME = "writerName";
+
+ private File folder;
+ private FileNameFormat testFormat;
+
+ RotationAction rotAction1 = mock(RotationAction.class);
+ RotationAction rotAction2 = mock(RotationAction.class);
+ List<RotationAction> rotActions;
+
+ SourceHandlerCallback callback = mock(SourceHandlerCallback.class);
+
+ @Before
+ public void setup() throws IOException {
+ // Ensure each test has a unique folder to work with.
+ folder = tempFolder.newFolder();
+ testFormat = new DefaultFileNameFormat()
+ .withPath(folder.toString())
+ .withExtension(".json")
+ .withPrefix("prefix-");
+
+ rotActions = new ArrayList<>();
+ rotActions.add(rotAction1);
+ rotActions.add(rotAction2);
+ }
+
+ @Test
+ public void testRotateOutputFile() throws IOException {
+ SourceHandler handler = new SourceHandler(
+ rotActions,
+ new FileSizeRotationPolicy(10000, Units.MB), // Don't actually care
about the rotation
+ new CountSyncPolicy(1),
+ testFormat,
+ callback
+ );
+
+ handler.rotateOutputFile();
+
+ // Function should ensure rotation actions and callback are called.
+ verify(rotAction1).execute(any(), any());
+ verify(rotAction2).execute(any(), any());
+ verify(callback).removeKey();
+ }
+}