This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new e1b9548  NIFI-8344: Introduced new Rollover Tail Period property
e1b9548 is described below

commit e1b9548ab68ece3a9904f50cd42898e0d41ce166
Author: Mark Payne <[email protected]>
AuthorDate: Thu Mar 18 17:21:31 2021 -0400

    NIFI-8344: Introduced new Rollover Tail Period property
    
    This closes #4916.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../apache/nifi/processors/standard/TailFile.java  | 68 ++++++++++++++++----
 .../nifi/processors/standard/TestTailFile.java     | 73 ++++++++++++++++++++++
 2 files changed, 130 insertions(+), 11 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 1e6727d..6ca3f23 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -37,7 +37,6 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
@@ -82,6 +81,9 @@ import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+
 // note: it is important that this Processor is not marked as 
@SupportsBatching because the session commits must complete before persisting 
state locally; otherwise, data loss may occur
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
@@ -131,7 +133,7 @@ public class TailFile extends AbstractProcessor {
             .name("tail-base-directory")
             .displayName("Base directory")
             .description("Base directory used to look for files to tail. This 
property is required when using Multifile mode.")
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
             .required(false)
             .build();
@@ -141,7 +143,7 @@ public class TailFile extends AbstractProcessor {
             .displayName("Tailing mode")
             .description("Mode to use: single file will tail only one file, 
multiple file will look for a list of file. In Multiple mode"
                     + " the Base directory is required.")
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(NONE)
             .required(true)
             .allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE)
             .defaultValue(MODE_SINGLEFILE.getValue())
@@ -153,7 +155,7 @@ public class TailFile extends AbstractProcessor {
             .description("Path of the file to tail in case of single file 
mode. If using multifile mode, regular expression to find files "
                     + "to tail in the base directory. In case recursivity is 
set to true, the regular expression will be used to match the "
                     + "path starting from the base directory (see additional 
details for examples).")
-            
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .expressionLanguageSupported(VARIABLE_REGISTRY)
             .addValidator(StandardValidators.createRegexValidator(0, 
Integer.MAX_VALUE, true))
             .required(true)
             .build();
@@ -166,10 +168,21 @@ public class TailFile extends AbstractProcessor {
                     + "(without extension), and will assume that the files 
that have rolled over live in the same directory as the file being tailed. "
                     + "The same glob pattern will be used for all files.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .expressionLanguageSupported(NONE)
             .required(false)
             .build();
 
+    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new 
PropertyDescriptor.Builder()
+        .name("Post-Rollover Tail Period")
+        .description("When a file is rolled over, the processor will continue 
tailing the rolled over file until it has not been modified for this amount of 
time. " +
+            "This allows for another process to rollover a file, and then 
flush out any buffered data. Note that when this value is set, and the tailed 
file rolls over, " +
+            "the new file will not be tailed until the old file has not been 
modified for the configured amount of time.")
+        .required(false)
+        .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .defaultValue("0 sec")
+        .build();
+
     static final PropertyDescriptor STATE_LOCATION = new 
PropertyDescriptor.Builder()
             .displayName("State Location")
             .name("File Location") //retained name of property for backward 
compatibility of configs
@@ -251,6 +264,7 @@ public class TailFile extends AbstractProcessor {
         properties.add(MODE);
         properties.add(FILENAME);
         properties.add(ROLLING_FILENAME_PATTERN);
+        properties.add(POST_ROLLOVER_TAIL_PERIOD);
         properties.add(BASE_DIRECTORY);
         properties.add(START_POSITION);
         properties.add(STATE_LOCATION);
@@ -691,6 +705,14 @@ public class TailFile extends AbstractProcessor {
             }
 
             rolloverOccurred = recoverRolledFiles(context, session, tailFile, 
expectedChecksumValue, tfo.getState().getTimestamp(), 
tfo.getState().getPosition());
+            if (rolloverOccurred) {
+                final boolean tailAfterRollover = 
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS)
 > 0;
+                if (tailAfterRollover) {
+                    getLogger().debug("File {} was rolled over and the 
Rollover Tail Period is set, so will not consume from new file during this 
iteration.", tailFile);
+                    return;
+                }
+            }
+
             tfo.setExpectedRecoveryChecksum(null);
         }
 
@@ -735,6 +757,7 @@ public class TailFile extends AbstractProcessor {
         if (!rotated) {
             final long fileLength = file.length();
             if (length > fileLength) {
+                getLogger().debug("Rotated = true because TailFileState Length 
= {}, File Length = {}", length, fileLength);
                 rotated = true;
             } else {
                 try {
@@ -742,6 +765,7 @@ public class TailFile extends AbstractProcessor {
                     final long readerPosition = reader.position();
 
                     if (readerSize == readerPosition && readerSize != 
fileLength) {
+                        getLogger().debug("Rotated = true because 
readerSize={}, readerPosition={}, fileLength={}", readerSize, readerPosition, 
fileLength);
                         rotated = true;
                     }
                 } catch (final IOException e) {
@@ -1191,19 +1215,22 @@ public class TailFile extends AbstractProcessor {
                             rolledOffFiles.remove(0);
                             FlowFile flowFile = session.create();
 
+                            final TailFileState currentState = tfo.getState();
+                            final Checksum checksum = 
currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
+                            final ByteBuffer buffer = currentState.getBuffer() 
== null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
+                            final FileChannel channel = fis.getChannel();
+                            final long timestamp = firstFile.lastModified();
+
                             try {
-                                flowFile = session.write(flowFile,
-                                        out -> readLines(fis.getChannel(), 
ByteBuffer.allocate(65536), out, new CRC32(), reReadOnNul, true));
+                                flowFile = session.write(flowFile, out -> 
readLines(channel, buffer, out, checksum, reReadOnNul, true));
                             } catch (NulCharacterEncounteredException ncee) {
                                 rolledOffFiles.add(0, firstFile);
                                 session.remove(flowFile);
                                 throw ncee;
                             }
+
                             if (flowFile.getSize() == 0L) {
                                 session.remove(flowFile);
-                                // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
-                                cleanup();
-                                tfo.setState(new TailFileState(tailFile, null, 
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, 
tfo.getState().getBuffer()));
                             } else {
                                 final Map<String, String> attributes = new 
HashMap<>(3);
                                 attributes.put(CoreAttributes.FILENAME.key(), 
firstFile.getName());
@@ -1215,11 +1242,30 @@ public class TailFile extends AbstractProcessor {
                                         
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
                                 getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
+                            }
 
+                            // We need to update the state to account for the 
fact that we just brought data in.
+                            // If we are going to tail a rolled over file for 
some amount of time, then we need to keep the state pointing to the
+                            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
+                            // updated values.
+                            // But if we are not going to tail the rolled over 
file for any period of time, we can essentially reset the state.
+                            final long postRolloverTailMillis = 
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+                            final long millisSinceUpdate = 
System.currentTimeMillis() - timestamp;
+                            if (postRolloverTailMillis > 0 && 
millisSinceUpdate < postRolloverTailMillis) {
+                                getLogger().debug("File {} has been rolled 
over, but it was updated {} millis ago, which is less than the configured {} 
({} ms), so will continue tailing",
+                                    firstFile, millisSinceUpdate, 
POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
+
+                                final long length = currentState.getLength() + 
flowFile.getSize();
+                                final long updatedPosition = position + 
flowFile.getSize();
+                                final TailFileState updatedState = new 
TailFileState(currentState.getFilename(), currentState.getFile(), channel, 
updatedPosition, timestamp, length, checksum,
+                                    buffer);
+
+                                tfo.setState(updatedState);
+                                persistState(tfo, session, context);
+                            } else {
                                 // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
                                 cleanup();
                                 tfo.setState(new TailFileState(tailFile, null, 
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, 
tfo.getState().getBuffer()));
-
                                 persistState(tfo, session, context);
                             }
                         } else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index ebb752c..a62fbea 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
@@ -25,6 +26,7 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -312,6 +314,77 @@ public class TestTailFile {
 
 
     @Test
+    public void testFileWrittenToAfterRollover() throws IOException, 
InterruptedException {
+        Assume.assumeTrue("Test requires renaming a file while a file handle 
is still open to it, so it won't run on Windows", !SystemUtils.IS_OS_WINDOWS);
+
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_BEGINNING_OF_TIME.getValue());
+        runner.setProperty(TailFile.REREAD_ON_NUL, "true");
+        runner.setProperty(TailFile.POST_ROLLOVER_TAIL_PERIOD, "10 mins");
+
+        raf.write("a\nb\n".getBytes());
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("a\nb\n");
+        runner.clearTransferState();
+
+        raf.write("c\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("c\n");
+        runner.clearTransferState();
+
+        // Write additional data to file, then roll file over
+        raf.write("d\n".getBytes());
+
+        final File rolledFile = new File("target/log.1");
+        final boolean renamed = file.renameTo(rolledFile);
+        assertTrue(renamed);
+        raf.getChannel().force(true);
+
+        System.out.println("Wrote d\\n and rolled file");
+
+        // Create the new file
+        final RandomAccessFile newFile = new RandomAccessFile(new 
File("target/log.txt"), "rw");
+        newFile.write("new file\n".getBytes()); // This should not get 
consumed until the old file's last modified date indicates it's complete
+
+        // Trigger processor and verify data is consumed properly
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("d\n");
+        runner.clearTransferState();
+
+        // Write to the file and trigger again.
+        raf.write("e\n".getBytes());
+        System.out.println("Wrote e\\n");
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("e\n");
+        runner.clearTransferState();
+
+        // Ensure that no data comes in for a bit, since the last modified 
date on the rolled over file isn't old enough.
+        for (int i=0; i < 100; i++) {
+            runner.run(1, false, false);
+            runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+            Thread.sleep(1L);
+        }
+
+        // Set last modified time so that processor believes file to have not 
been modified in a very long time, then run again.
+        assertTrue(rolledFile.setLastModified(500L));
+        System.out.println("Set lastModified on " + rolledFile + " to 500");
+        runner.run(1, false, false);
+
+        // Verify results
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new
 file\n");
+        runner.clearTransferState();
+
+        raf.close();
+    }
+
+
+    @Test
     public void testConsumeAfterTruncationStartAtBeginningOfFile() throws 
IOException, InterruptedException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE.getValue());

Reply via email to