This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 1ef1905 NIFI-8344: Addressed corner case in which we didn't handle a
case where we tailed a file after it was rolled over and the processor
then encountered a newline followed by NUL characters
1ef1905 is described below
commit 1ef1905461a793ce690534ec96247614fa1f548a
Author: Mark Payne <[email protected]>
AuthorDate: Mon Apr 19 16:49:14 2021 -0400
NIFI-8344: Addressed corner case in which we didn't handle a case where we
tailed a file after it was rolled over and the processor then encountered a
newline followed by NUL characters
This closes #5009.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../apache/nifi/processors/standard/TailFile.java | 27 ++++++++++++++++------
.../nifi/processors/standard/TestTailFile.java | 24 ++++++++++++++++++-
2 files changed, 43 insertions(+), 8 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 01cf285..5cb502c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -1306,12 +1306,21 @@ public class TailFile extends AbstractProcessor {
final FileChannel channel = fis.getChannel();
final long timestamp = fileToTail.lastModified();
- try {
- flowFile = session.write(flowFile, out -> readLines(channel,
buffer, out, checksum, reReadOnNul, readFully));
- } catch (NulCharacterEncounteredException ncee) {
- session.remove(flowFile);
- throw ncee;
- }
+ final AtomicReference<NulCharacterEncounteredException> abort =
new AtomicReference<>();
+
+ flowFile = session.write(flowFile, out -> {
+ try {
+ readLines(channel, buffer, out, checksum, reReadOnNul,
readFully);
+ } catch (final NulCharacterEncounteredException ncee) {
+ abort.set(ncee);
+
+ // Log the fact that we encountered a NUL character and
yield. But we don't re-throw the Exception because
+ // we want to continue on with the same logic of
transferring non-zero flowfiles, removing 0-byte flowfiles,
+ // and maintaining our state.
+ getLogger().info("Encountered NUL character when tailing
file {}; will yield", tailFile);
+ context.yield();
+ }
+ });
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
@@ -1346,6 +1355,11 @@ public class TailFile extends AbstractProcessor {
tfo.setState(updatedState);
} else {
+ final NulCharacterEncounteredException ncee = abort.get();
+ if (ncee != null) {
+ throw ncee;
+ }
+
// use a timestamp of lastModified() + 1 so that we do not
ingest this file again.
getLogger().debug("Completed tailing of file {}; will cleanup
state", tailFile);
cleanup();
@@ -1357,7 +1371,6 @@ public class TailFile extends AbstractProcessor {
}
}
-
/**
* Creates a new FlowFile that contains the entire contents of the given
* file and transfers that FlowFile to success. This method will commit the
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index c279c3f..e7d6177 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -364,6 +364,28 @@ public class TestTailFile {
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
runner.clearTransferState();
+ // Write out some more characters and then write NUL characters. This
should result in the processor not consuming the data.
+ raf.write("\n".getBytes());
+ raf.write(0);
+ raf.write(0);
+ raf.write(0);
+ System.out.println("Wrote \\n\\0\\0\\0");
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f\n");
+ runner.clearTransferState();
+
+ // Truncate the NUL bytes and replace with additional data, ending
with a new line. This should ingest the entire line of text.
+ raf.setLength(raf.length() - 3);
+ raf.write("g\nh".getBytes());
+ System.out.println("Truncated the NUL bytes and replaced with g\\nh");
+
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("g\n");
+ runner.clearTransferState();
+
// Ensure that no data comes in for a bit, since the last modified
date on the rolled over file isn't old enough.
for (int i=0; i < 100; i++) {
runner.run(1, false, false);
@@ -378,7 +400,7 @@ public class TestTailFile {
// Verify results
runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
-
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f");
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("h");
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new
file\n");
runner.clearTransferState();