This is an automated email from the ASF dual-hosted git repository.
tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new e1b9548 NIFI-8344: Introduced new Rollover Tail Period property
e1b9548 is described below
commit e1b9548ab68ece3a9904f50cd42898e0d41ce166
Author: Mark Payne <[email protected]>
AuthorDate: Thu Mar 18 17:21:31 2021 -0400
NIFI-8344: Introduced new Rollover Tail Period property
This closes #4916.
Signed-off-by: Tamas Palfy <[email protected]>
---
.../apache/nifi/processors/standard/TailFile.java | 68 ++++++++++++++++----
.../nifi/processors/standard/TestTailFile.java | 73 ++++++++++++++++++++++
2 files changed, 130 insertions(+), 11 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 1e6727d..6ca3f23 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -37,7 +37,6 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
@@ -82,6 +81,9 @@ import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
// note: it is important that this Processor is not marked as
@SupportsBatching because the session commits must complete before persisting
state locally; otherwise, data loss may occur
@TriggerSerially
@InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -131,7 +133,7 @@ public class TailFile extends AbstractProcessor {
.name("tail-base-directory")
.displayName("Base directory")
.description("Base directory used to look for files to tail. This
property is required when using Multifile mode.")
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
.required(false)
.build();
@@ -141,7 +143,7 @@ public class TailFile extends AbstractProcessor {
.displayName("Tailing mode")
.description("Mode to use: single file will tail only one file,
multiple file will look for a list of file. In Multiple mode"
+ " the Base directory is required.")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(true)
.allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE)
.defaultValue(MODE_SINGLEFILE.getValue())
@@ -153,7 +155,7 @@ public class TailFile extends AbstractProcessor {
.description("Path of the file to tail in case of single file
mode. If using multifile mode, regular expression to find files "
+ "to tail in the base directory. In case recursivity is
set to true, the regular expression will be used to match the "
+ "path starting from the base directory (see additional
details for examples).")
-
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .expressionLanguageSupported(VARIABLE_REGISTRY)
.addValidator(StandardValidators.createRegexValidator(0,
Integer.MAX_VALUE, true))
.required(true)
.build();
@@ -166,10 +168,21 @@ public class TailFile extends AbstractProcessor {
+ "(without extension), and will assume that the files
that have rolled over live in the same directory as the file being tailed. "
+ "The same glob pattern will be used for all files.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .expressionLanguageSupported(NONE)
.required(false)
.build();
+ static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new
PropertyDescriptor.Builder()
+ .name("Post-Rollover Tail Period")
+ .description("When a file is rolled over, the processor will continue
tailing the rolled over file until it has not been modified for this amount of
time. " +
+ "This allows for another process to rollover a file, and then
flush out any buffered data. Note that when this value is set, and the tailed
file rolls over, " +
+ "the new file will not be tailed until the old file has not been
modified for the configured amount of time.")
+ .required(false)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(NONE)
+ .defaultValue("0 sec")
+ .build();
+
static final PropertyDescriptor STATE_LOCATION = new
PropertyDescriptor.Builder()
.displayName("State Location")
.name("File Location") //retained name of property for backward
compatibility of configs
@@ -251,6 +264,7 @@ public class TailFile extends AbstractProcessor {
properties.add(MODE);
properties.add(FILENAME);
properties.add(ROLLING_FILENAME_PATTERN);
+ properties.add(POST_ROLLOVER_TAIL_PERIOD);
properties.add(BASE_DIRECTORY);
properties.add(START_POSITION);
properties.add(STATE_LOCATION);
@@ -691,6 +705,14 @@ public class TailFile extends AbstractProcessor {
}
rolloverOccurred = recoverRolledFiles(context, session, tailFile,
expectedChecksumValue, tfo.getState().getTimestamp(),
tfo.getState().getPosition());
+ if (rolloverOccurred) {
+ final boolean tailAfterRollover =
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS)
> 0;
+ if (tailAfterRollover) {
+ getLogger().debug("File {} was rolled over and the
Rollover Tail Period is set, so will not consume from new file during this
iteration.", tailFile);
+ return;
+ }
+ }
+
tfo.setExpectedRecoveryChecksum(null);
}
@@ -735,6 +757,7 @@ public class TailFile extends AbstractProcessor {
if (!rotated) {
final long fileLength = file.length();
if (length > fileLength) {
+ getLogger().debug("Rotated = true because TailFileState Length
= {}, File Length = {}", length, fileLength);
rotated = true;
} else {
try {
@@ -742,6 +765,7 @@ public class TailFile extends AbstractProcessor {
final long readerPosition = reader.position();
if (readerSize == readerPosition && readerSize !=
fileLength) {
+ getLogger().debug("Rotated = true because
readerSize={}, readerPosition={}, fileLength={}", readerSize, readerPosition,
fileLength);
rotated = true;
}
} catch (final IOException e) {
@@ -1191,19 +1215,22 @@ public class TailFile extends AbstractProcessor {
rolledOffFiles.remove(0);
FlowFile flowFile = session.create();
+ final TailFileState currentState = tfo.getState();
+ final Checksum checksum =
currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
+ final ByteBuffer buffer = currentState.getBuffer()
== null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
+ final FileChannel channel = fis.getChannel();
+ final long timestamp = firstFile.lastModified();
+
try {
- flowFile = session.write(flowFile,
- out -> readLines(fis.getChannel(),
ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true));
+ flowFile = session.write(flowFile, out ->
readLines(channel, buffer, out, checksum, reReadOnNul, true));
} catch (NulCharacterEncounteredException ncee) {
rolledOffFiles.add(0, firstFile);
session.remove(flowFile);
throw ncee;
}
+
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
- // use a timestamp of lastModified() + 1 so
that we do not ingest this file again.
- cleanup();
- tfo.setState(new TailFileState(tailFile, null,
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null,
tfo.getState().getBuffer()));
} else {
final Map<String, String> attributes = new
HashMap<>(3);
attributes.put(CoreAttributes.FILENAME.key(),
firstFile.getName());
@@ -1215,11 +1242,30 @@ public class TailFile extends AbstractProcessor {
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
getLogger().debug("Created {} from rolled over
file {} and routed to success", new Object[]{flowFile, firstFile});
+ }
+ // We need to update the state to account for the
fact that we just brought data in.
+ // If we are going to tail a rolled over file for
some amount of time, then we need to keep the state pointing to the
+ // same file, just using an updated
position/timestamp/checksum/length. This way, the next iteration will compare
against these
+ // updated values.
+ // But if we are not going to tail the rolled over
file for any period of time, we can essentially reset the state.
+ final long postRolloverTailMillis =
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+ final long millisSinceUpdate =
System.currentTimeMillis() - timestamp;
+ if (postRolloverTailMillis > 0 &&
millisSinceUpdate < postRolloverTailMillis) {
+ getLogger().debug("File {} has been rolled
over, but it was updated {} millis ago, which is less than the configured {}
({} ms), so will continue tailing",
+ firstFile, millisSinceUpdate,
POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
+
+ final long length = currentState.getLength() +
flowFile.getSize();
+ final long updatedPosition = position +
flowFile.getSize();
+ final TailFileState updatedState = new
TailFileState(currentState.getFilename(), currentState.getFile(), channel,
updatedPosition, timestamp, length, checksum,
+ buffer);
+
+ tfo.setState(updatedState);
+ persistState(tfo, session, context);
+ } else {
// use a timestamp of lastModified() + 1 so
that we do not ingest this file again.
cleanup();
tfo.setState(new TailFileState(tailFile, null,
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null,
tfo.getState().getBuffer()));
-
persistState(tfo, session, context);
}
} else {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index ebb752c..a62fbea 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.standard;
+import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
@@ -25,6 +26,7 @@ import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -312,6 +314,77 @@ public class TestTailFile {
@Test
+ public void testFileWrittenToAfterRollover() throws IOException,
InterruptedException {
+ Assume.assumeTrue("Test requires renaming a file while a file handle
is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+ runner.setProperty(TailFile.START_POSITION,
TailFile.START_BEGINNING_OF_TIME.getValue());
+ runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+ runner.setProperty(TailFile.POST_ROLLOVER_TAIL_PERIOD, "10 mins");
+
+ raf.write("a\nb\n".getBytes());
+ runner.run(1, false, true);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
+ runner.clearTransferState();
+
+ raf.write("c\n".getBytes());
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
+ runner.clearTransferState();
+
+ // Write additional data to file, then roll file over
+ raf.write("d\n".getBytes());
+
+ final File rolledFile = new File("target/log.1");
+ final boolean renamed = file.renameTo(rolledFile);
+ assertTrue(renamed);
+ raf.getChannel().force(true);
+
+ System.out.println("Wrote d\\n and rolled file");
+
+ // Create the new file
+ final RandomAccessFile newFile = new RandomAccessFile(new
File("target/log.txt"), "rw");
+ newFile.write("new file\n".getBytes()); // This should not get
consumed until the old file's last modified date indicates it's complete
+
+ // Trigger processor and verify data is consumed properly
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
+ runner.clearTransferState();
+
+ // Write to the file and trigger again.
+ raf.write("e\n".getBytes());
+ System.out.println("Wrote e\\n");
+ runner.run(1, false, false);
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
+ runner.clearTransferState();
+
+ // Ensure that no data comes in for a bit, since the last modified
date on the rolled over file isn't old enough.
+ for (int i=0; i < 100; i++) {
+ runner.run(1, false, false);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+ Thread.sleep(1L);
+ }
+
+ // Set last modified time so that processor believes file to have not
been modified in a very long time, then run again.
+ assertTrue(rolledFile.setLastModified(500L));
+ System.out.println("Set lastModified on " + rolledFile + " to 500");
+ runner.run(1, false, false);
+
+ // Verify results
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new
file\n");
+ runner.clearTransferState();
+
+ raf.close();
+ }
+
+
+ @Test
public void testConsumeAfterTruncationStartAtBeginningOfFile() throws
IOException, InterruptedException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
runner.setProperty(TailFile.START_POSITION,
TailFile.START_CURRENT_FILE.getValue());