This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 1ef1905  NIFI-8344: Addressed a corner case in which we didn't handle tailing a file 
after it was rolled over and the processor then encountered a newline 
followed by NUL characters
1ef1905 is described below

commit 1ef1905461a793ce690534ec96247614fa1f548a
Author: Mark Payne <[email protected]>
AuthorDate: Mon Apr 19 16:49:14 2021 -0400

    NIFI-8344: Addressed a corner case in which we didn't handle tailing a file after it 
was rolled over and the processor then encountered a newline followed by NUL 
characters
    
    This closes #5009.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../apache/nifi/processors/standard/TailFile.java  | 27 ++++++++++++++++------
 .../nifi/processors/standard/TestTailFile.java     | 24 ++++++++++++++++++-
 2 files changed, 43 insertions(+), 8 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 01cf285..5cb502c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -1306,12 +1306,21 @@ public class TailFile extends AbstractProcessor {
             final FileChannel channel = fis.getChannel();
             final long timestamp = fileToTail.lastModified();
 
-            try {
-                flowFile = session.write(flowFile, out -> readLines(channel, 
buffer, out, checksum, reReadOnNul, readFully));
-            } catch (NulCharacterEncounteredException ncee) {
-                session.remove(flowFile);
-                throw ncee;
-            }
+            final AtomicReference<NulCharacterEncounteredException> abort = 
new AtomicReference<>();
+
+            flowFile = session.write(flowFile, out -> {
+                try {
+                    readLines(channel, buffer, out, checksum, reReadOnNul, 
readFully);
+                } catch (final NulCharacterEncounteredException ncee) {
+                    abort.set(ncee);
+
+                    // Log the fact that we encountered a NUL character and 
yield. But we don't re-throw the Exception because
+                    // we want to continue on with the same logic of 
transferring non-zero flowfiles, removing 0-byte flowfiles,
+                    // and maintaining our state.
+                    getLogger().info("Encountered NUL character when tailing 
file {}; will yield", tailFile);
+                    context.yield();
+                }
+            });
 
             if (flowFile.getSize() == 0L) {
                 session.remove(flowFile);
@@ -1346,6 +1355,11 @@ public class TailFile extends AbstractProcessor {
 
                 tfo.setState(updatedState);
             } else {
+                final NulCharacterEncounteredException ncee = abort.get();
+                if (ncee != null) {
+                    throw ncee;
+                }
+
                 // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
                 getLogger().debug("Completed tailing of file {}; will cleanup 
state", tailFile);
                 cleanup();
@@ -1357,7 +1371,6 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
-
     /**
      * Creates a new FlowFile that contains the entire contents of the given
      * file and transfers that FlowFile to success. This method will commit the
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index c279c3f..e7d6177 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -364,6 +364,28 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
         runner.clearTransferState();
 
+        // Write out some more characters and then write NUL characters. This 
should result in the processor not consuming the data.
+        raf.write("\n".getBytes());
+        raf.write(0);
+        raf.write(0);
+        raf.write(0);
+        System.out.println("Wrote \\n\\0\\0\\0");
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f\n");
+        runner.clearTransferState();
+
+        // Truncate the NUL bytes and replace with additional data, ending 
with a new line. This should ingest the entire line of text.
+        raf.setLength(raf.length() - 3);
+        raf.write("g\nh".getBytes());
+        System.out.println("Truncated the NUL bytes and replaced with g\\nh");
+
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("g\n");
+        runner.clearTransferState();
+
         // Ensure that no data comes in for a bit, since the last modified 
date on the rolled over file isn't old enough.
         for (int i=0; i < 100; i++) {
             runner.run(1, false, false);
@@ -378,7 +400,7 @@ public class TestTailFile {
 
         // Verify results
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
-        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("h");
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new
 file\n");
         runner.clearTransferState();
 

Reply via email to