tpalfy commented on a change in pull request #4916:
URL: https://github.com/apache/nifi/pull/4916#discussion_r597880479
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -735,13 +757,15 @@ private void processTailFile(final ProcessContext context, final ProcessSession
if (!rotated) {
final long fileLength = file.length();
if (length > fileLength) {
+ getLogger().debug("Rotated = true because TailFile State Length = {}, File Length = {}", length, fileLength);
Review comment:
Minor (a bit confusing otherwise):
```suggestion
getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
```
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -166,10 +168,21 @@
+ "(without extension), and will assume that the files that have rolled over live in the same directory as the file being tailed. "
+ "The same glob pattern will be used for all files.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(false)
.build();
+ static final PropertyDescriptor ROLLOVER_TAIL_PERIOD = new PropertyDescriptor.Builder()
Review comment:
Minor: It may just be me, but "rollover" feels more like a verb to me than a noun or adjective, so "Rollover Tail Period" is a bit confusing.
"Post-Rollover Tail Period" feels more natural and highlights the idea that we do something we usually do not do, _after_ the rollover.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -691,6 +705,14 @@ private void processTailFile(final ProcessContext context, final ProcessSession
}
rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
+ if (rolloverOccurred) {
+ final boolean tailRollover = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0;
Review comment:
Minor: Similar to the previous comment.
We could use `tailAfterRollover` or even `tailRolledOver`, for example.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+ }
+ // We need to update the state to account for the fact that we just brought data in.
+ // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+ // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+ // updated values.
+ // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+ final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
Review comment:
Minor
```suggestion
final long postRolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
```
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
}
+ @Test
+ public void testFileWrittenToAfterRollover() throws IOException, InterruptedException {
+ Assume.assumeTrue("Test requires renaming a file while a file handle is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+ runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
+ runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+ runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+ // first line fully written, second partially
Review comment:
I'd rather remove these comments. The code is clear enough, and some of them already seem to be outdated.
Like this one: why do we consider the second line partial?
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+ }
+ // We need to update the state to account for the fact that we just brought data in.
+ // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+ // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+ // updated values.
+ // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+ final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long millisSinceUpdate = System.currentTimeMillis() - firstFile.lastModified();
Review comment:
Minor: Just to avoid any uncertainty about what `timestamp` refers to.
```suggestion
final long millisSinceUpdate = System.currentTimeMillis() - timestamp;
```
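For reference, a minimal self-contained sketch of the post-rollover check, assuming the suggested names `postRolloverTailMillis`, `millisSinceUpdate` and `timestamp` (taken here to be the rolled-over file's last-modified time in epoch millis). The class and method names (`PostRolloverTailCheck`, `shouldKeepTailing`) are hypothetical and not part of NiFi; the condition mirrors `rolloverTailMillis > 0 && millisSinceUpdate < rolloverTailMillis` from this PR.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch of the post-rollover tailing decision.
 * Names are illustrative only and do not exist in NiFi.
 */
class PostRolloverTailCheck {

    /**
     * Returns true if a rolled-over file should still be tailed.
     *
     * @param postRolloverTailMillis configured tail period in millis (0 disables post-rollover tailing)
     * @param timestamp              last-modified time of the rolled-over file, in epoch millis (assumption)
     * @param nowMillis              current time, in epoch millis
     */
    static boolean shouldKeepTailing(final long postRolloverTailMillis, final long timestamp, final long nowMillis) {
        final long millisSinceUpdate = nowMillis - timestamp;
        return postRolloverTailMillis > 0 && millisSinceUpdate < postRolloverTailMillis;
    }

    public static void main(final String[] args) {
        final long postRolloverTailMillis = TimeUnit.MINUTES.toMillis(10); // "10 mins", as in the test
        final long now = System.currentTimeMillis();

        // Updated 1 minute ago: still within the period, so keep the state pointing at the rolled-over file.
        System.out.println(shouldKeepTailing(postRolloverTailMillis, now - TimeUnit.MINUTES.toMillis(1), now));
        // Updated 11 minutes ago: the period has elapsed, so the state can be reset.
        System.out.println(shouldKeepTailing(postRolloverTailMillis, now - TimeUnit.MINUTES.toMillis(11), now));
        // A period of 0 disables post-rollover tailing entirely.
        System.out.println(shouldKeepTailing(0, now, now));
    }
}
```

Keeping the decision in one small method like this also makes the boundary (exactly equal to the period means "stop tailing") easy to unit test.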
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -311,6 +313,79 @@ private File rollover(final int index) throws IOException {
}
+ @Test
+ public void testFileWrittenToAfterRollover() throws IOException, InterruptedException {
+ Assume.assumeTrue("Test requires renaming a file while a file handle is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+ runner.setProperty(TailFile.START_POSITION, TailFile.START_BEGINNING_OF_TIME.getValue());
+ runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+ runner.setProperty(TailFile.ROLLOVER_TAIL_PERIOD, "10 mins");
+
+ // first line fully written, second partially
+ raf.write("a\nb\n".getBytes());
+ runner.run(1, false, true);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
+ runner.clearTransferState();
+
+ raf.write("c\n".getBytes());
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
+ runner.clearTransferState();
+
+ // Write additional data to file, then roll file over
+ raf.write("d\n".getBytes());
+
+ final File rolledFile = new File("target/log.1");
+ final boolean renamed = file.renameTo(rolledFile);
+ assertTrue(renamed);
+ raf.getChannel().force(true);
+
+ System.out.println("Wrote d\\n and rolled file");
+
+ // Create the new file
+ final RandomAccessFile newFile = new RandomAccessFile(new File("target/log.txt"), "rw");
+ newFile.write("new file\n".getBytes()); // This should not get consumed until the old file's last modified date indicates it's complete
+
+ // Trigger processor and verify data is consumed properly
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
+ runner.clearTransferState();
+
+ // Write to the file and trigger again.
+ raf.write("e\n".getBytes());
+ System.out.println("Wrote e\\n");
+ runner.run(1, false, false);
+
+ // There should be no data consumed because the last modified time is too recent.
Review comment:
Another outdated(?) comment. "e" _has_ been consumed (although from the rolled-over file).
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -1215,11 +1242,30 @@ private boolean recoverRolledFiles(final ProcessContext context, final ProcessSe
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over file {} and routed to success", new Object[]{flowFile, firstFile});
+ }
+ // We need to update the state to account for the fact that we just brought data in.
+ // If we are going to tail a rolled over file for some amount of time, then we need to keep the state pointing to the
+ // same file, just using an updated position/timestamp/checksum/length. This way, the next iteration will compare against these
+ // updated values.
+ // But if we are not going to tail the rolled over file for any period of time, we can essentially reset the state.
+ final long rolloverTailMillis = context.getProperty(ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long millisSinceUpdate = System.currentTimeMillis() - firstFile.lastModified();
+ if (rolloverTailMillis > 0 && millisSinceUpdate < rolloverTailMillis) {
+ getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured Rollover Tail Period, so will continue " +
+ "tailing", firstFile, millisSinceUpdate);
Review comment:
```suggestion
getLogger().debug("File {} has been rolled over, but it was updated {} millis ago, which is less than the configured " +
ROLLOVER_TAIL_PERIOD.getDisplayName() +
" ({} ms), so will continue tailing",
firstFile, millisSinceUpdate, rolloverTailMillis);
```
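To illustrate what the suggested message would look like, here is a hypothetical sketch. `LogMessageSketch.format` is a simplified stand-in for SLF4J-style `{}` substitution (it is not NiFi's `ComponentLog` and ignores escaping rules), and the display name "Post-Rollover Tail Period" is only an assumption based on the naming discussion. The point is that the display name is concatenated into the template and the configured period is passed as a third `{}` argument.

```java
/**
 * Hypothetical sketch: a simplified "{}" substitution used only to preview
 * the suggested log message. Not NiFi's ComponentLog; no escaping support.
 */
class LogMessageSketch {

    static String format(final String template, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int start = 0;
        int idx;
        // Replace each "{}" with the next argument, left to right; leftover "{}" stay as-is.
        while ((idx = template.indexOf("{}", start)) >= 0 && argIndex < args.length) {
            sb.append(template, start, idx).append(args[argIndex++]);
            start = idx + 2;
        }
        sb.append(template.substring(start));
        return sb.toString();
    }

    public static void main(final String[] args) {
        // Assumed display name; in the PR it would come from ROLLOVER_TAIL_PERIOD.getDisplayName().
        final String displayName = "Post-Rollover Tail Period";
        final String template = "File {} has been rolled over, but it was updated {} millis ago, which is less than the configured "
                + displayName + " ({} ms), so will continue tailing";
        System.out.println(format(template, "target/log.1", 1500L, 600000L));
    }
}
```

Running `main` prints the message with the file name, the elapsed millis and the configured period (600000 ms) filled in, which makes the log line self-explanatory without looking up the processor configuration.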