Repository: nifi Updated Branches: refs/heads/master 0054a9e35 -> 7df5c2dc8
NIFI-3607 Allow multi files mode with fixed names This closes: #1608 Signed-off-by: Andre F de Miranda <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7df5c2dc Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7df5c2dc Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7df5c2dc Branch: refs/heads/master Commit: 7df5c2dc891a57f382075d868244f843de6fd960 Parents: 0054a9e Author: Pierre Villard <[email protected]> Authored: Mon Mar 20 15:28:27 2017 +0100 Committer: Andre F de Miranda <[email protected]> Committed: Mon May 1 09:31:24 2017 +1000 ---------------------------------------------------------------------- .../nifi/util/StandardProcessorTestRunner.java | 18 +++++-- .../java/org/apache/nifi/util/TestRunner.java | 8 ++++ .../nifi/processors/standard/TailFile.java | 8 ++-- .../nifi/processors/standard/TestTailFile.java | 50 +++++++++++++++++++- 4 files changed, 75 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 6c35643..6fe5195 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -36,9 +36,9 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -87,6 +87,7 @@ public class StandardProcessorTestRunner implements TestRunner { private int numThreads = 1; private MockSessionFactory sessionFactory; + private long runSchedule = 0; private final AtomicInteger invocations = new AtomicInteger(0); private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>(); @@ -177,11 +178,11 @@ public class StandardProcessorTestRunner implements TestRunner { } } - final ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads); @SuppressWarnings("unchecked") final Future<Throwable>[] futures = new Future[iterations]; for (int i = 0; i < iterations; i++) { - final Future<Throwable> future = executorService.submit(new RunProcessor()); + final Future<Throwable> future = executorService.schedule(new RunProcessor(), i * runSchedule, TimeUnit.MILLISECONDS); futures[i] = future; } @@ -907,4 +908,15 @@ public class StandardProcessorTestRunner implements TestRunner { } } } + + /** + * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration + * between two calls of the onTrigger method. + * + * @param runSchedule Run schedule duration in milliseconds. + */ + @Override + public void setRunSchedule(long runSchedule) { + this.runSchedule = runSchedule; + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java ---------------------------------------------------------------------- diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java index 9a1a10d..85ef72c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java @@ -965,4 +965,12 @@ public interface TestRunner { */ void enforceReadStreamsClosed(boolean enforce); + /** + * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration + * between two calls of the onTrigger method. + * + * @param runSchedule Run schedule duration in milliseconds. + */ + void setRunSchedule(long runSchedule); + } http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java index 88b83b4..e2f75bc 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -44,11 +44,11 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.stream.io.NullOutputStream; import org.apache.nifi.stream.io.StreamUtils; import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -108,8 +108,7 @@ public class TailFile extends AbstractProcessor { + " In this mode, the file may not exist when starting the processor."); static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files", "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look" - + " for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing" - + " when starting the processor will be used."); + + " for files in 'Base directory' to list the files to tail by the processor."); static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files " + "where the log messages are appended have always the same name."); @@ -311,8 +310,7 @@ public class TailFile extends AbstractProcessor { @OnScheduled public void recoverState(final ProcessContext context) throws IOException { // set isMultiChanging - isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()) - && context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue())); + isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())); // set last lookup to now lastLookup.set(new Date().getTime()); http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java index 227c727..485b8e3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.BufferedWriter; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -31,13 +33,15 @@ import java.io.RandomAccessFile; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; import org.apache.nifi.processors.standard.TailFile.TailFileState; import org.apache.nifi.state.MockStateManager; -import org.apache.nifi.stream.io.ByteArrayOutputStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -660,6 +664,38 @@ public class TestTailFile { } @Test + public void testDetectNewFile() throws IOException, InterruptedException { + runner.setProperty(TailFile.BASE_DIRECTORY, "target"); + runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE); + runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec"); + runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt"); + runner.setProperty(TailFile.RECURSIVE, "false"); + runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME); + + initializeFile("target/log_1.txt", "firstLine\n"); + + Runnable task = () -> { + try { + initializeFile("target/log_2.txt", "newFile\n"); + } catch (Exception e) { + fail(); + } + }; + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + executor.schedule(task, 2, TimeUnit.SECONDS); + + runner.setRunSchedule(2000); + runner.run(3); + + runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2); + assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("firstLine\n"))); + assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("newFile\n"))); + + runner.shutdown(); + } + + @Test public void testMultipleFilesWithBasedirAndFilenameEL() throws IOException, InterruptedException { runner.setVariable("vrBaseDirectory", "target"); runner.setProperty(TailFile.BASE_DIRECTORY, "${vrBaseDirectory}"); @@ -933,4 +969,16 @@ public class TestTailFile { cleanFiles("target/testDir"); } + private RandomAccessFile initializeFile(String path, String data) throws IOException { + File file = new File(path); + if(file.exists()) { + file.delete(); + } + assertTrue(file.createNewFile()); + RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw"); + randomAccessFile.write(data.getBytes()); + randomAccessFile.close(); + return randomAccessFile; + } + }
