Repository: nifi
Updated Branches:
  refs/heads/master 0054a9e35 -> 7df5c2dc8


NIFI-3607 Allow multi files mode with fixed names

This closes: #1608

Signed-off-by: Andre F de Miranda <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7df5c2dc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7df5c2dc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7df5c2dc

Branch: refs/heads/master
Commit: 7df5c2dc891a57f382075d868244f843de6fd960
Parents: 0054a9e
Author: Pierre Villard <[email protected]>
Authored: Mon Mar 20 15:28:27 2017 +0100
Committer: Andre F de Miranda <[email protected]>
Committed: Mon May 1 09:31:24 2017 +1000

----------------------------------------------------------------------
 .../nifi/util/StandardProcessorTestRunner.java  | 18 +++++--
 .../java/org/apache/nifi/util/TestRunner.java   |  8 ++++
 .../nifi/processors/standard/TailFile.java      |  8 ++--
 .../nifi/processors/standard/TestTailFile.java  | 50 +++++++++++++++++++-
 4 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index 6c35643..6fe5195 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -36,9 +36,9 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +87,7 @@ public class StandardProcessorTestRunner implements TestRunner {
 
     private int numThreads = 1;
     private MockSessionFactory sessionFactory;
+    private long runSchedule = 0;
     private final AtomicInteger invocations = new AtomicInteger(0);
 
     private final Map<String, MockComponentLog> controllerServiceLoggers = new HashMap<>();
@@ -177,11 +178,11 @@ public class StandardProcessorTestRunner implements TestRunner {
                 }
             }
 
-            final ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+            final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(numThreads);
             @SuppressWarnings("unchecked")
             final Future<Throwable>[] futures = new Future[iterations];
             for (int i = 0; i < iterations; i++) {
-                final Future<Throwable> future = executorService.submit(new RunProcessor());
+                final Future<Throwable> future = executorService.schedule(new RunProcessor(), i * runSchedule, TimeUnit.MILLISECONDS);
                 futures[i] = future;
             }
 
@@ -907,4 +908,15 @@ public class StandardProcessorTestRunner implements TestRunner {
             }
         }
     }
+
+    /**
+     * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration
+     * between two calls of the onTrigger method.
+     *
+     * @param runSchedule Run schedule duration in milliseconds.
+     */
+    @Override
+    public void setRunSchedule(long runSchedule) {
+        this.runSchedule = runSchedule;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
index 9a1a10d..85ef72c 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/TestRunner.java
@@ -965,4 +965,12 @@ public interface TestRunner {
      */
     void enforceReadStreamsClosed(boolean enforce);
 
+    /**
+     * Set the Run Schedule parameter (in milliseconds). If set, this will be the duration
+     * between two calls of the onTrigger method.
+     *
+     * @param runSchedule Run schedule duration in milliseconds.
+     */
+     void setRunSchedule(long runSchedule);
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 88b83b4..e2f75bc 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -44,11 +44,11 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -108,8 +108,7 @@ public class TailFile extends AbstractProcessor {
             + " In this mode, the file may not exist when starting the processor.");
     static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple files", "Multiple files",
             "In this mode, the 'Files to tail' property accepts a regular expression and the processor will look"
-            + " for files in 'Base directory' to list the files to tail by the processor. In this mode, only the files existing"
-            + " when starting the processor will be used.");
+            + " for files in 'Base directory' to list the files to tail by the processor.");
 
     static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", "Fixed name", "With this rolling strategy, the files "
             + "where the log messages are appended have always the same name.");
@@ -311,8 +310,7 @@ public class TailFile extends AbstractProcessor {
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
         // set isMultiChanging
-        isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())
-                && context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()));
+        isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
 
         // set last lookup to now
         lastLookup.set(new Date().getTime());

http://git-wip-us.apache.org/repos/asf/nifi/blob/7df5c2dc/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 227c727..485b8e3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
@@ -31,13 +33,15 @@ import java.io.RandomAccessFile;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
 import org.apache.nifi.state.MockStateManager;
-import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -660,6 +664,38 @@ public class TestTailFile {
     }
 
     @Test
+    public void testDetectNewFile() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+        runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+        runner.setProperty(TailFile.LOOKUP_FREQUENCY, "1 sec");
+        runner.setProperty(TailFile.FILENAME, "log_[0-9]*\\.txt");
+        runner.setProperty(TailFile.RECURSIVE, "false");
+        runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.FIXED_NAME);
+
+        initializeFile("target/log_1.txt", "firstLine\n");
+
+        Runnable task = () -> {
+            try {
+                initializeFile("target/log_2.txt", "newFile\n");
+            } catch (Exception e) {
+                fail();
+            }
+        };
+
+        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+        executor.schedule(task, 2, TimeUnit.SECONDS);
+
+        runner.setRunSchedule(2000);
+        runner.run(3);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("firstLine\n")));
+        assertTrue(runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).stream().anyMatch(mockFlowFile -> mockFlowFile.isContentEqual("newFile\n")));
+
+        runner.shutdown();
+    }
+
+    @Test
     public void testMultipleFilesWithBasedirAndFilenameEL() throws IOException, InterruptedException {
         runner.setVariable("vrBaseDirectory", "target");
         runner.setProperty(TailFile.BASE_DIRECTORY, "${vrBaseDirectory}");
@@ -933,4 +969,16 @@ public class TestTailFile {
         cleanFiles("target/testDir");
     }
 
+    private RandomAccessFile initializeFile(String path, String data) throws IOException {
+        File file = new File(path);
+        if(file.exists()) {
+            file.delete();
+        }
+        assertTrue(file.createNewFile());
+        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+        randomAccessFile.write(data.getBytes());
+        randomAccessFile.close();
+        return randomAccessFile;
+    }
+
 }

Reply via email to