This is an automated email from the ASF dual-hosted git repository.

tpalfy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2edf551  NIFI-8344: If configured to tail a file for some period of 
time post-rollover, ensure that we only consume lines that are fully written 
(i.e., end with a newline). Once we stop tailing that file post-rollover, 
consume any data from that file that has not yet been consumed, up to the end 
of the file, even if there is no newline.
2edf551 is described below

commit 2edf5514b72691fb77a3fe391a48808155d3b29b
Author: Mark Payne <[email protected]>
AuthorDate: Thu Mar 25 14:28:47 2021 -0400

    NIFI-8344: If configured to tail a file for some period of time 
post-rollover, ensure that we only consume lines that are fully written (i.e., 
end with a newline). Once we stop tailing that file post-rollover, consume any 
data from that file that has not yet been consumed, up to the end of the file, 
even if there is no newline.
    
    This closes #4937.
    
    Signed-off-by: Tamas Palfy <[email protected]>
---
 .../apache/nifi/processors/standard/TailFile.java  | 271 +++++++++++++--------
 .../nifi/processors/standard/TestTailFile.java     |  10 +-
 2 files changed, 178 insertions(+), 103 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 6ca3f23..01cf285 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -46,6 +46,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.TailFile.TailFileState.StateKeys;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 
@@ -176,7 +177,9 @@ public class TailFile extends AbstractProcessor {
         .name("Post-Rollover Tail Period")
         .description("When a file is rolled over, the processor will continue 
tailing the rolled over file until it has not been modified for this amount of 
time. " +
             "This allows for another process to rollover a file, and then 
flush out any buffered data. Note that when this value is set, and the tailed 
file rolls over, " +
-            "the new file will not be tailed until the old file has not been 
modified for the configured amount of time.")
+            "the new file will not be tailed until the old file has not been 
modified for the configured amount of time. Additionally, when using this 
capability, in order to avoid data " +
+            "duplication, this period must be set longer than the Processor's 
Run Schedule, and the Processor must not be stopped after the file being tailed 
has been " +
+            "rolled over and before the data has been fully consumed. 
Otherwise, the data may be duplicated, as the entire file may be written out as 
the contents of a single FlowFile.")
         .required(false)
         .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
         .expressionLanguageSupported(NONE)
@@ -253,7 +256,7 @@ public class TailFile extends AbstractProcessor {
             .description("All FlowFiles are routed to this Relationship.")
             .build();
 
-    private volatile Map<String, TailFileObject> states = new HashMap<String, 
TailFileObject>();
+    private volatile Map<String, TailFileObject> states = new HashMap<>();
     private volatile AtomicLong lastLookup = new AtomicLong(0L);
     private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
     private volatile boolean requireStateLookup = true;
@@ -283,7 +286,7 @@ public class TailFile extends AbstractProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (isConfigurationRestored() && FILENAME.equals(descriptor)) {
-            states = new HashMap<String, TailFileObject>();
+            states = new HashMap<>();
         }
     }
 
@@ -324,7 +327,7 @@ public class TailFile extends AbstractProcessor {
         long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? 
Long.MAX_VALUE : 
context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
         // get list of files to tail
-        List<String> filesToTail = new ArrayList<String>();
+        List<String> filesToTail = new ArrayList<>();
 
         
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
             
filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).evaluateAttributeExpressions().getValue(),
@@ -394,7 +397,7 @@ public class TailFile extends AbstractProcessor {
             if( states.isEmpty() && !statesMap.isEmpty()) {
                 for (String key : statesMap.keySet()) {
                     if (key.endsWith(TailFileState.StateKeys.FILENAME)) {
-                        int index = Integer.valueOf(key.split("\\.")[1]);
+                        int index = Integer.parseInt(key.split("\\.")[1]);
                         states.put(statesMap.get(key), new 
TailFileObject(index, statesMap));
                     }
                 }
@@ -423,7 +426,7 @@ public class TailFile extends AbstractProcessor {
         for (String filename : filesToTail) {
             if (isCleared || !states.containsKey(filename)) {
                 final TailFileState tailFileState = new 
TailFileState(filename, null, null, 0L, 0L, 0L, null, 
ByteBuffer.allocate(65536));
-                states.put(filename, new TailFileObject(fileIndex, 
tailFileState, true));
+                states.put(filename, new TailFileObject(fileIndex, 
tailFileState));
 
                 fileIndex++;
             }
@@ -446,7 +449,7 @@ public class TailFile extends AbstractProcessor {
      */
     private List<String> getFilesToTail(final String baseDir, String 
fileRegex, boolean isRecursive, long maxAge) {
         final Collection<File> files = FileUtils.listFiles(new File(baseDir), 
null, isRecursive);
-        final List<String> result = new ArrayList<String>();
+        final List<String> result = new ArrayList<>();
 
         final String baseDirNoTrailingSeparator = 
baseDir.endsWith(File.separator) ? baseDir.substring(0, baseDir.length() - 1) : 
baseDir;
         final String fullRegex;
@@ -1042,7 +1045,7 @@ public class TailFile extends AbstractProcessor {
         // Sort files based on last modified timestamp. If same timestamp, use 
filename as a secondary sort, as often
         // files that are rolled over are given a naming scheme that is 
lexicographically sort in the same order as the
         // timestamp, such as yyyy-MM-dd-HH-mm-ss
-        Collections.sort(rolledOffFiles, new Comparator<File>() {
+        rolledOffFiles.sort(new Comparator<File>() {
             @Override
             public int compare(final File o1, final File o2) {
                 final int lastModifiedComp = Long.compare(o1.lastModified(), 
o2.lastModified());
@@ -1073,7 +1076,7 @@ public class TailFile extends AbstractProcessor {
     private void persistState(final Map<String, String> state, final 
ProcessSession session, final ProcessContext context) {
         try {
             final StateMap oldState = session.getState(getStateScope(context));
-            Map<String, String> updatedState = new HashMap<String, String>();
+            Map<String, String> updatedState = new HashMap<>();
 
             for(String key : oldState.toMap().keySet()) {
                 // These states are stored by older version of NiFi, and won't 
be used anymore.
@@ -1191,88 +1194,71 @@ public class TailFile extends AbstractProcessor {
             getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[]{rolledOffFiles.size()});
             TailFileObject tfo = states.get(tailFile);
 
+            final long postRolloverTailMillis = 
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+            final boolean tailingPostRollover = 
tfo.getState().isTailingPostRollover();
+            final boolean shouldTailPostRollover = postRolloverTailMillis > 0;
+
             // For first file that we find, it may or may not be the file that 
we were last reading from.
             // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
             // then we know we've already processed this file. If the 
checksums do not match, then we have not
             // processed this file and we need to seek back to position 0 and 
ingest the entire file.
             // For all other files that have been rolled over, we need to just 
ingest the entire file.
             boolean rolloverOccurred = !rolledOffFiles.isEmpty();
-            if (rolloverOccurred && expectedChecksum != null && 
rolledOffFiles.get(0).length() >= position) {
+
+            final boolean tailFirstFile;
+            if (rolloverOccurred) {
                 final File firstFile = rolledOffFiles.get(0);
+                final long millisSinceModified = System.currentTimeMillis() - 
firstFile.lastModified();
+                final boolean fileGrew = firstFile.length() >= position && 
position > 0;
+                final boolean tailRolledFile = postRolloverTailMillis == 0 || 
millisSinceModified < postRolloverTailMillis;
+                tailFirstFile = fileGrew && tailRolledFile && expectedChecksum 
!= null;
+            } else {
+                tailFirstFile = false;
+            }
 
-                final long startNanos = System.nanoTime();
-                final Boolean reReadOnNul = 
context.getProperty(REREAD_ON_NUL).asBoolean();
-                if (position > 0) {
-                    try (final FileInputStream fis = new 
FileInputStream(firstFile);
-                            final CheckedInputStream in = new 
CheckedInputStream(fis, new CRC32())) {
-                        StreamUtils.copy(in, new NullOutputStream(), position);
+            if (tailFirstFile) {
+                final File firstFile = rolledOffFiles.get(0);
 
-                        final long checksumResult = 
in.getChecksum().getValue();
-                        if (checksumResult == expectedChecksum) {
-                            getLogger().debug("Checksum for {} matched 
expected checksum. Will skip first {} bytes", new Object[]{firstFile, 
position});
-
-                            // This is the same file that we were reading when 
we shutdown. Start reading from this point on.
-                            rolledOffFiles.remove(0);
-                            FlowFile flowFile = session.create();
-
-                            final TailFileState currentState = tfo.getState();
-                            final Checksum checksum = 
currentState.getChecksum() == null ? new CRC32() : currentState.getChecksum();
-                            final ByteBuffer buffer = currentState.getBuffer() 
== null ? ByteBuffer.allocate(65536) : currentState.getBuffer();
-                            final FileChannel channel = fis.getChannel();
-                            final long timestamp = firstFile.lastModified();
-
-                            try {
-                                flowFile = session.write(flowFile, out -> 
readLines(channel, buffer, out, checksum, reReadOnNul, true));
-                            } catch (NulCharacterEncounteredException ncee) {
-                                rolledOffFiles.add(0, firstFile);
-                                session.remove(flowFile);
-                                throw ncee;
-                            }
+                final boolean consumed;
+                if (shouldTailPostRollover) {
+                    // User has configured to continue tailing file after it 
has been rolled over, until it's no longer being modified.
+                    // Consume any newly added lines from the rolled over 
file, but do not consume the last line, if it doesn't have a newline.
+                    // Keep the state indicating that we are currently tailing 
a file post-rollover.
+                    consumed = tailRolledFile(context, session, tailFile, 
expectedChecksum, position, tfo, firstFile, false, true);
+                } else {
+                    // User has not configured to continue tailing file after 
it has been rolled over. If any data was written to the rolled file before
+                    // rolling it over, consume that data, up to the end of 
the file, including the last line, even if it doesn't have a newline.
+                    consumed = tailRolledFile(context, session, tailFile, 
expectedChecksum, position, tfo, firstFile, true, false);
+                }
 
-                            if (flowFile.getSize() == 0L) {
-                                session.remove(flowFile);
-                            } else {
-                                final Map<String, String> attributes = new 
HashMap<>(3);
-                                attributes.put(CoreAttributes.FILENAME.key(), 
firstFile.getName());
-                                attributes.put(CoreAttributes.MIME_TYPE.key(), 
"text/plain");
-                                attributes.put("tailfile.original.path", 
tailFile);
-                                flowFile = session.putAllAttributes(flowFile, 
attributes);
-
-                                
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), 
"FlowFile contains bytes 0 through " + position + " of source file",
-                                        
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                                session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
-                            }
+                if (consumed) {
+                    rolledOffFiles.remove(0);
+                }
+            } else if (tailingPostRollover && shouldTailPostRollover) {
+                // This condition is encountered when we are tailing a file 
post-rollover, and we've now reached the point where the rolled file
+                // has not changed.
+                final List<File> allRolledFiles = getRolledOffFiles(context, 
0L, tailFile);
+                
allRolledFiles.sort(Comparator.comparing(File::lastModified).reversed());
+                final File newestFile = allRolledFiles.get(0);
+
+                // If we don't notice that the file has been modified, per the 
checks above, then we want to keep checking until the last modified
+                // date has eclipsed the configured value for the 
Post-Rollover Tail Period. Until then, return false. Once that occurs, we will
+                // consume the rest of the data, including the last line, even 
if it doesn't have a line ending.
+                final long millisSinceModified = System.currentTimeMillis() - 
newestFile.lastModified();
+                if (millisSinceModified < postRolloverTailMillis) {
+                    getLogger().debug("Rolled over file {} (size={}, 
lastModified={}) was modified {} millis ago, which isn't long enough to consume 
file fully without taking line endings into " +
+                        "account. Will do nothing will file for now.", 
newestFile, newestFile.length(), newestFile.lastModified(), 
millisSinceModified);
+                    return true;
+                }
 
-                            // We need to update the state to account for the 
fact that we just brought data in.
-                            // If we are going to tail a rolled over file for 
some amount of time, then we need to keep the state pointing to the
-                            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
-                            // updated values.
-                            // But if we are not going to tail the rolled over 
file for any period of time, we can essentially reset the state.
-                            final long postRolloverTailMillis = 
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
-                            final long millisSinceUpdate = 
System.currentTimeMillis() - timestamp;
-                            if (postRolloverTailMillis > 0 && 
millisSinceUpdate < postRolloverTailMillis) {
-                                getLogger().debug("File {} has been rolled 
over, but it was updated {} millis ago, which is less than the configured {} 
({} ms), so will continue tailing",
-                                    firstFile, millisSinceUpdate, 
POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
-
-                                final long length = currentState.getLength() + 
flowFile.getSize();
-                                final long updatedPosition = position + 
flowFile.getSize();
-                                final TailFileState updatedState = new 
TailFileState(currentState.getFilename(), currentState.getFile(), channel, 
updatedPosition, timestamp, length, checksum,
-                                    buffer);
-
-                                tfo.setState(updatedState);
-                                persistState(tfo, session, context);
-                            } else {
-                                // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
-                                cleanup();
-                                tfo.setState(new TailFileState(tailFile, null, 
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, 
tfo.getState().getBuffer()));
-                                persistState(tfo, session, context);
-                            }
-                        } else {
-                            getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
-                                    new Object[]{firstFile, checksumResult, 
expectedChecksum});
-                        }
-                    }
+                // The file has been rolled over and is no longer being 
written to. Consume all the way to the end of the file, including the last line,
+                // even if it does not have a newline after it.
+                final boolean consumed = tailRolledFile(context, session, 
tailFile, expectedChecksum, position, tfo, newestFile, true, false);
+                if (consumed) {
+                    getLogger().debug("Consumed the final data from {}", 
newestFile);
+                    rolledOffFiles.remove(newestFile);
+                } else {
+                    getLogger().debug("No more data to consume from {} 
(size={}, lastModified={})", newestFile, newestFile.length(), 
newestFile.lastModified());
                 }
             }
 
@@ -1291,6 +1277,87 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
+    private boolean tailRolledFile(final ProcessContext context, final 
ProcessSession session, final String tailFile, final Long expectedChecksum,
+                                final long position, final TailFileObject tfo, 
final File fileToTail, final boolean readFully, final boolean 
tailingPostRollover) throws IOException {
+
+        final Boolean reReadOnNul = 
context.getProperty(REREAD_ON_NUL).asBoolean();
+        final long startNanos = System.nanoTime();
+
+        try (final FileInputStream fis = new FileInputStream(fileToTail);
+             final CheckedInputStream in = new CheckedInputStream(fis, new 
CRC32())) {
+            StreamUtils.copy(in, new NullOutputStream(), position);
+
+            final long checksumResult = in.getChecksum().getValue();
+            if (checksumResult != expectedChecksum) {
+                getLogger().debug("Checksum for {} did not match expected 
checksum. Checksum for file was {} but expected {}. Will consume entire file",
+                    new Object[]{fileToTail, checksumResult, 
expectedChecksum});
+
+                return false;
+            }
+
+            getLogger().debug("Checksum for {} matched expected checksum. Will 
skip first {} bytes", new Object[]{fileToTail, position});
+
+            // This is the same file that we were reading when we shutdown. 
Start reading from this point on.
+            FlowFile flowFile = session.create();
+
+            final TailFileState currentState = tfo.getState();
+            final Checksum checksum = currentState.getChecksum() == null ? new 
CRC32() : currentState.getChecksum();
+            final ByteBuffer buffer = currentState.getBuffer() == null ? 
ByteBuffer.allocate(65536) : currentState.getBuffer();
+            final FileChannel channel = fis.getChannel();
+            final long timestamp = fileToTail.lastModified();
+
+            try {
+                flowFile = session.write(flowFile, out -> readLines(channel, 
buffer, out, checksum, reReadOnNul, readFully));
+            } catch (NulCharacterEncounteredException ncee) {
+                session.remove(flowFile);
+                throw ncee;
+            }
+
+            if (flowFile.getSize() == 0L) {
+                session.remove(flowFile);
+            } else {
+                final Map<String, String> attributes = new HashMap<>(3);
+                attributes.put(CoreAttributes.FILENAME.key(), 
fileToTail.getName());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+                attributes.put("tailfile.original.path", tailFile);
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.getProvenanceReporter().receive(flowFile, 
fileToTail.toURI().toString(), "FlowFile contains bytes 0 through " + position 
+ " of source file",
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().debug("Created {} from rolled over file {} and 
routed to success", new Object[]{flowFile, fileToTail});
+            }
+
+            // We need to update the state to account for the fact that we 
just brought data in.
+            // If we are going to tail a rolled over file for some amount of 
time, then we need to keep the state pointing to the
+            // same file, just using an updated 
position/timestamp/checksum/length. This way, the next iteration will compare 
against these
+            // updated values.
+            // But if we are not going to tail the rolled over file for any 
period of time, we can essentially reset the state.
+            final long postRolloverTailMillis = 
context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS);
+            final long millisSinceUpdate = System.currentTimeMillis() - 
timestamp;
+            if (tailingPostRollover && postRolloverTailMillis > 0) {
+                getLogger().debug("File {} has been rolled over, but it was 
updated {} millis ago, which is less than the configured {} ({} ms), so will 
continue tailing",
+                    fileToTail, millisSinceUpdate, 
POST_ROLLOVER_TAIL_PERIOD.getDisplayName(), postRolloverTailMillis);
+
+                final long length = currentState.getLength() + 
flowFile.getSize();
+                final long updatedPosition = position + flowFile.getSize();
+                final TailFileState updatedState = new 
TailFileState(currentState.getFilename(), currentState.getFile(), channel, 
updatedPosition, timestamp, length, checksum,
+                    buffer, tailingPostRollover);
+
+                tfo.setState(updatedState);
+            } else {
+                // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
+                getLogger().debug("Completed tailing of file {}; will cleanup 
state", tailFile);
+                cleanup();
+                tfo.setState(new TailFileState(tailFile, null, null, 0L, 
fileToTail.lastModified() + 1L, fileToTail.length(), null, 
tfo.getState().getBuffer(), tailingPostRollover));
+            }
+
+            persistState(tfo, session, context);
+            return true;
+        }
+    }
+
+
     /**
      * Creates a new FlowFile that contains the entire contents of the given
      * file and transfers that FlowFile to success. This method will commit the
@@ -1332,18 +1399,13 @@ public class TailFile extends AbstractProcessor {
 
     static class TailFileObject {
 
-        private TailFileState state = new TailFileState(null, null, null, 0L, 
0L, 0L, null, ByteBuffer.allocate(65536));
+        private TailFileState state;
         private Long expectedRecoveryChecksum;
         private int filenameIndex;
         private boolean tailFileChanged = true;
 
-        public TailFileObject(int index) {
-            this.filenameIndex = index;
-        }
-
-        public TailFileObject(final int index, final TailFileState fileState, 
final boolean tailFileChanged) {
+        public TailFileObject(final int index, final TailFileState fileState) {
             this.filenameIndex = index;
-            this.tailFileChanged = true;
             this.state = fileState;
         }
 
@@ -1352,20 +1414,17 @@ public class TailFile extends AbstractProcessor {
             this.tailFileChanged = false;
             final String prefix = MAP_PREFIX + index + '.';
             final String filename = statesMap.get(prefix + 
TailFileState.StateKeys.FILENAME);
-            final long position = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.POSITION));
-            final long timestamp = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.TIMESTAMP));
-            final long length = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.LENGTH));
-            this.state = new TailFileState(filename, new File(filename), null, 
position, timestamp, length, null, ByteBuffer.allocate(65536));
+            final long position = Long.parseLong(statesMap.get(prefix + 
TailFileState.StateKeys.POSITION));
+            final long timestamp = Long.parseLong(statesMap.get(prefix + 
TailFileState.StateKeys.TIMESTAMP));
+            final long length = Long.parseLong(statesMap.get(prefix + 
TailFileState.StateKeys.LENGTH));
+            final boolean tailingPostRollover = Boolean.parseBoolean(statesMap.get(prefix + 
StateKeys.TAILING_POST_ROLLOVER)); // parse the stored value, not the key name; a missing key (older state) yields false
+            this.state = new TailFileState(filename, new File(filename), null, 
position, timestamp, length, null, ByteBuffer.allocate(65536), 
tailingPostRollover);
         }
 
         public int getFilenameIndex() {
             return filenameIndex;
         }
 
-        public void setFilenameIndex(int filenameIndex) {
-            this.filenameIndex = filenameIndex;
-        }
-
         public TailFileState getState() {
             return state;
         }
@@ -1406,17 +1465,24 @@ public class TailFile extends AbstractProcessor {
         private final long length;
         private final Checksum checksum;
         private final ByteBuffer buffer;
+        private final boolean tailingPostRollover;
 
-        private static class StateKeys {
+        static class StateKeys {
             public static final String FILENAME = "filename";
             public static final String POSITION = "position";
             public static final String TIMESTAMP = "timestamp";
             public static final String CHECKSUM = "checksum";
             public static final String LENGTH = "length";
+            public static final String TAILING_POST_ROLLOVER = 
"tailingPostRollover";
+        }
+
+        public TailFileState(final String filename, final File file, final 
FileChannel reader, final long position, final long timestamp,
+                             final long length, final Checksum checksum, final 
ByteBuffer buffer) {
+            this(filename, file, reader, position, timestamp, length, 
checksum, buffer, false);
         }
 
-        public TailFileState(final String filename, final File file, final 
FileChannel reader,
-                final long position, final long timestamp, final long length, 
final Checksum checksum, final ByteBuffer buffer) {
+        public TailFileState(final String filename, final File file, final 
FileChannel reader, final long position, final long timestamp,
+                             final long length, final Checksum checksum, final 
ByteBuffer buffer, final boolean tailingPostRollover) {
             this.filename = filename;
             this.file = file;
             this.reader = reader;
@@ -1425,6 +1491,7 @@ public class TailFile extends AbstractProcessor {
             this.timestamp = timestamp; // many operating systems will use 
only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
             this.buffer = buffer;
+            this.tailingPostRollover = tailingPostRollover;
         }
 
         public String getFilename() {
@@ -1459,9 +1526,14 @@ public class TailFile extends AbstractProcessor {
             return buffer;
         }
 
+        public boolean isTailingPostRollover() {
+            return tailingPostRollover;
+        }
+
         @Override
         public String toString() {
-            return "TailFileState[filename=" + filename + ", position=" + 
position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? 
"null" : checksum.getValue()) + "]";
+            return "TailFileState[filename=" + filename + ", position=" + 
position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? 
"null" : checksum.getValue()) +
+                ", tailingPostRollover=" + tailingPostRollover + "]";
         }
 
         public Map<String, String> toStateMap(int index) {
@@ -1472,6 +1544,7 @@ public class TailFile extends AbstractProcessor {
             map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
             map.put(prefix + StateKeys.TIMESTAMP, String.valueOf(timestamp));
             map.put(prefix + StateKeys.CHECKSUM, checksum == null ? null : 
String.valueOf(checksum.getValue()));
+            map.put(prefix + StateKeys.TAILING_POST_ROLLOVER, 
String.valueOf(tailingPostRollover));
             return map;
         }
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index a62fbea..c279c3f 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -347,6 +347,7 @@ public class TestTailFile {
         // Create the new file
         final RandomAccessFile newFile = new RandomAccessFile(new 
File("target/log.txt"), "rw");
         newFile.write("new file\n".getBytes()); // This should not get 
consumed until the old file's last modified date indicates it's complete
+        newFile.close();
 
         // Trigger processor and verify data is consumed properly
         runner.run(1, false, false);
@@ -355,8 +356,8 @@ public class TestTailFile {
         runner.clearTransferState();
 
         // Write to the file and trigger again.
-        raf.write("e\n".getBytes());
-        System.out.println("Wrote e\\n");
+        raf.write("e\nf".getBytes());
+        System.out.println("Wrote e\\nf");
         runner.run(1, false, false);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
@@ -376,8 +377,9 @@ public class TestTailFile {
         runner.run(1, false, false);
 
         // Verify results
-        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
-        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new
 file\n");
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("f");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new
 file\n");
         runner.clearTransferState();
 
         raf.close();

Reply via email to