NIFI-1171: Ensure that we pick up changes when files roll over and ensure that 
we don't pick up the rolled-over file multiple times


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2516b1da
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2516b1da
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2516b1da

Branch: refs/heads/NIFI-655
Commit: 2516b1dad24e39efaeac49a244eea42847602e06
Parents: 8c2323d
Author: Mark Payne <[email protected]>
Authored: Mon Nov 23 11:59:23 2015 -0500
Committer: Mark Payne <[email protected]>
Committed: Mon Nov 23 14:11:14 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 682 ++++++++++---------
 .../nifi/processors/standard/TestTailFile.java  | 151 +++-
 2 files changed, 487 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2516b1da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 1c32c00..3d6d3a0 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -47,8 +47,8 @@ import java.util.zip.Checksum;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
@@ -71,12 +71,14 @@ import org.apache.nifi.util.LongHolder;
 
 // note: it is important that this Processor is not marked as 
@SupportsBatching because the session commits must complete before persisting 
state locally; otherwise, data loss may occur
 @TriggerSerially
+@InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"tail", "file", "log", "text", "source"})
 @CapabilityDescription("\"Tails\" a file, ingesting data from the file as it 
is written to the file. The file is expected to be textual. Data is ingested 
only when a "
     + "new line is encountered (carriage return or new-line character or 
combination). If the file to tail is periodically \"rolled over\", as is 
generally the case "
     + "with log files, an optional Rolling Filename Pattern can be used to 
retrieve data from files that have rolled over, even if the rollover occurred 
while NiFi "
-    + "was not running (provided that the data still exists upon restart of 
NiFi).")
-@InputRequirement(Requirement.INPUT_FORBIDDEN)
+    + "was not running (provided that the data still exists upon restart of 
NiFi). It is generally advisable to set the Run Schedule to a few seconds, 
rather than running "
+    + "with the default value of 0 secs, as this Processor will consume a lot 
of resources if scheduled very aggressively. At this time, this Processor does 
not support "
+    + "ingesting files that have been compressed when 'rolled over'.")
 public class TailFile extends AbstractProcessor {
 
     static final AllowableValue START_BEGINNING_OF_TIME = new 
AllowableValue("Beginning of Time", "Beginning of Time",
@@ -124,7 +126,6 @@ public class TailFile extends AbstractProcessor {
         .build();
 
     private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, ByteBuffer.allocate(65536));
-    private volatile boolean recoveredRolledFiles = false;
     private volatile Long expectedRecoveryChecksum;
     private volatile boolean tailFileChanged = false;
 
@@ -147,17 +148,12 @@ public class TailFile extends AbstractProcessor {
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (FILENAME.equals(descriptor)) {
             state = new TailFileState(newValue, null, null, 0L, 0L, null, 
ByteBuffer.allocate(65536));
-            recoveredRolledFiles = false;
             tailFileChanged = true;
-        } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
-            recoveredRolledFiles = false;
         }
     }
 
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
-        recoveredRolledFiles = false;
-
         final String tailFilename = context.getProperty(FILENAME).getValue();
         final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
 
@@ -172,7 +168,7 @@ public class TailFile extends AbstractProcessor {
 
             if (encodingVersion == 0) {
                 final String filename = dis.readUTF();
-                final long position = dis.readLong();
+                long position = dis.readLong();
                 final long timestamp = dis.readLong();
                 final boolean checksumPresent = dis.readBoolean();
 
@@ -202,18 +198,25 @@ public class TailFile extends AbstractProcessor {
                                 getLogger().debug("When recovering state, 
checksum of tailed file matches the stored checksum. Will resume where left 
off.");
                                 tailFile = existingTailFile;
                                 reader = FileChannel.open(tailFile.toPath(), 
StandardOpenOption.READ);
-                                getLogger().debug("Created RandomAccessFile {} 
for {} in recoverState", new Object[] {reader, tailFile});
+                                getLogger().debug("Created FileChannel {} for 
{} in recoverState", new Object[] {reader, tailFile});
 
                                 reader.position(position);
                             } else {
+                                // we don't seek the reader to the position, 
so our reader will start at beginning of file.
                                 getLogger().debug("When recovering state, 
checksum of tailed file does not match the stored checksum. Will begin tailing 
current file from beginning.");
                             }
                         }
+                    } else {
+                        // fewer bytes than our position, so we know we 
weren't already reading from this file. Keep reader at a position of 0.
+                        getLogger().debug("When recovering state, existing 
file to tail is only {} bytes but position flag is {}; "
+                            + "this indicates that the file has rotated. Will 
begin tailing current file from beginning.", new Object[] 
{existingTailFile.length(), position});
                     }
 
                     state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, checksum, ByteBuffer.allocate(65536));
                 } else {
+                    // If filename changed or there is no checksum present, 
then we have no expected checksum to use for recovery.
                     expectedRecoveryChecksum = null;
+
                     // tailing a new file since the state file was written 
out. We will reset state.
                     state = new TailFileState(tailFilename, null, null, 0L, 
0L, null, ByteBuffer.allocate(65536));
                 }
@@ -234,224 +237,79 @@ public class TailFile extends AbstractProcessor {
             return;
         }
 
-        final FileChannel channel = state.getReader();
-        if (channel == null) {
+        final FileChannel reader = state.getReader();
+        if (reader == null) {
             return;
         }
 
         try {
-            channel.close();
+            reader.close();
         } catch (final IOException ioe) {
             getLogger().warn("Failed to close file handle during cleanup");
         }
 
-        getLogger().debug("Closed FileChannel {}", new Object[] {channel});
+        getLogger().debug("Closed FileChannel {}", new Object[] {reader});
 
         this.state = new TailFileState(state.getFilename(), state.getFile(), 
null, state.getPosition(), state.getTimestamp(), state.getChecksum(), 
state.getBuffer());
     }
 
 
-    public void persistState(final TailFileState state, final String 
stateFilename) throws IOException {
-        getLogger().debug("Persisting state {} to {}", new Object[] {state, 
stateFilename});
 
-        final File stateFile = new File(stateFilename);
-        File directory = stateFile.getParentFile();
-        if (directory != null && !directory.exists() && !directory.mkdirs()) {
-            getLogger().warn("Failed to persist state to {} because the parent 
directory does not exist and could not be created. This may result in data 
being duplicated upon restart of NiFi");
-            return;
-        }
-        try (final FileOutputStream fos = new FileOutputStream(stateFile);
-            final DataOutputStream dos = new DataOutputStream(fos)) {
-
-            dos.writeInt(0); // version
-            dos.writeUTF(state.getFilename());
-            dos.writeLong(state.getPosition());
-            dos.writeLong(state.getTimestamp());
-            if (state.getChecksum() == null) {
-                dos.writeBoolean(false);
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        // If user changes the file that is being tailed, we need to consume 
the already-rolled-over data according
+        // to the Initial Start Position property
+        boolean rolloverOccurred;
+        if (tailFileChanged) {
+            rolloverOccurred = false;
+            final String recoverPosition = 
context.getProperty(START_POSITION).getValue();
+
+            if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
+                recoverRolledFiles(context, session, 
this.expectedRecoveryChecksum, state.getTimestamp(), state.getPosition());
+            } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
+                cleanup();
+                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, 
null, state.getBuffer());
             } else {
-                dos.writeBoolean(true);
-                dos.writeLong(state.getChecksum().getValue());
-            }
-        }
-    }
-
-    private FileChannel createReader(final File file, final long position) {
-        final FileChannel reader;
-
-        try {
-            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
-        } catch (final IOException ioe) {
-            getLogger().warn("Could not open {} due to {}; will attempt to 
access file again after the configured Yield Duration has elapsed", new 
Object[] {file, ioe});
-            return null;
-        }
+                final String filename = 
context.getProperty(FILENAME).getValue();
+                final File file = new File(filename);
 
-        getLogger().debug("Created RandomAccessFile {} for {}", new Object[] 
{reader, file});
+                try {
+                    final FileChannel fileChannel = 
FileChannel.open(file.toPath(), StandardOpenOption.READ);
+                    getLogger().debug("Created FileChannel {} for {}", new 
Object[] {fileChannel, file});
 
-        try {
-            reader.position(position);
-        } catch (final IOException ioe) {
-            getLogger().error("Failed to read from {} due to {}", new Object[] 
{file, ioe});
-
-            try {
-                reader.close();
-                getLogger().debug("Closed RandomAccessFile {}", new Object[] 
{reader});
-            } catch (final IOException ioe2) {
-            }
-
-            return null;
-        }
-
-        return reader;
-    }
-
-    // for testing purposes
-    TailFileState getState() {
-        return state;
-    }
-
-
-
-    /**
-     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
-     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
-     * single FlowFile but the data that already has been ingested will not be 
ingested again.
-     *
-     * @param context the ProcessContext to use in order to obtain Processor 
configuration
-     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
-     */
-    private void recoverRolledFiles(final ProcessContext context, final 
ProcessSession session) {
-        try {
-            // Find all files that match our rollover pattern, if any, and 
order them based on their timestamp and filename.
-            // Ignore any file that has a timestamp earlier than the state 
that we have persisted. If we were reading from
-            // a file when we stopped running, then that file that we were 
reading from should be the first file in this list,
-            // assuming that the file still exists on the file system.
-            final List<File> rolledOffFiles = getRolledOffFiles(context, 
state.getTimestamp());
-            getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[] {rolledOffFiles.size()});
-
-            // For first file that we find, it may or may not be the file that 
we were last reading from.
-            // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
-            // then we know we've already processed this file. If the 
checksums do not match, then we have not
-            // processed this file and we need to seek back to position 0 and 
ingest the entire file.
-            // For all other files that have been rolled over, we need to just 
ingest the entire file.
-            if (!rolledOffFiles.isEmpty() && expectedRecoveryChecksum != null 
&& rolledOffFiles.get(0).length() >= state.getPosition()) {
-                final File firstFile = rolledOffFiles.get(0);
-
-                final long startNanos = System.nanoTime();
-                try (final InputStream fis = new FileInputStream(firstFile);
-                    final CheckedInputStream in = new CheckedInputStream(fis, 
new CRC32())) {
-                    StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
-
-                    final long checksumResult = in.getChecksum().getValue();
-                    if (checksumResult == expectedRecoveryChecksum) {
-                        getLogger().debug("Checksum for {} matched expected 
checksum. Will skip first {} bytes", new Object[] {firstFile, 
state.getPosition()});
-
-                        // This is the same file that we were reading when we 
shutdown. Start reading from this point on.
-                        rolledOffFiles.remove(0);
-                        FlowFile flowFile = session.create();
-                        flowFile = session.importFrom(in, flowFile);
-                        if (flowFile.getSize() == 0L) {
-                            session.remove(flowFile);
-                            // use a timestamp of lastModified() + 1 so that 
we do not ingest this file again.
-                            cleanup();
-                            state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
-                        } else {
-                            flowFile = session.putAttribute(flowFile, 
"filename", firstFile.getName());
-
-                            session.getProvenanceReporter().receive(flowFile, 
firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + 
state.getPosition() + " of source file",
-                                
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                            session.transfer(flowFile, REL_SUCCESS);
-
-                            // use a timestamp of lastModified() + 1 so that 
we do not ingest this file again.
-                            cleanup();
-                            state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+                    final Checksum checksum = new CRC32();
+                    final long position = file.length();
+                    final long timestamp = file.lastModified();
 
-                            // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
-                            session.commit();
-                            persistState(state, 
context.getProperty(STATE_FILE).getValue());
-                        }
-                    } else {
-                        getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
-                            new Object[] {firstFile, checksumResult, 
expectedRecoveryChecksum});
+                    try (final InputStream fis = new FileInputStream(file);
+                        final CheckedInputStream in = new 
CheckedInputStream(fis, checksum)) {
+                        StreamUtils.copy(in, new NullOutputStream(), position);
                     }
-                }
-            }
-
-            // For each file that we found that matches our Rollover Pattern, 
and has a last modified date later than the timestamp
-            // that we recovered from the state file, we need to consume the 
entire file. The only exception to this is the file that
-            // we were reading when we last stopped, as it may already have 
been partially consumed. That is taken care of in the
-            // above block of code.
-            for (final File file : rolledOffFiles) {
-                FlowFile flowFile = session.create();
-                flowFile = session.importFrom(file.toPath(), true, flowFile);
-                if (flowFile.getSize() == 0L) {
-                    session.remove(flowFile);
-                } else {
-                    flowFile = session.putAttribute(flowFile, "filename", 
file.getName());
-                    session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString());
-                    session.transfer(flowFile, REL_SUCCESS);
 
-                    // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
+                    fileChannel.position(position);
                     cleanup();
-                    state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, null, state.getBuffer());
-
-                    // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
-                    session.commit();
-                    persistState(state, 
context.getProperty(STATE_FILE).getValue());
+                    state = new TailFileState(filename, file, fileChannel, 
position, timestamp, checksum, state.getBuffer());
+                } catch (final IOException ioe) {
+                    getLogger().error("Attempted to position Reader at current 
position in file {} but failed to do so due to {}", new Object[] {file, 
ioe.toString()}, ioe);
+                    context.yield();
+                    return;
                 }
             }
-        } catch (final IOException e) {
-            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
-        }
-    }
-
-
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
-        // If this is the first time the processor has run since it was 
started, we need to check for any files that may have rolled over
-        // while the processor was stopped. If we find any, we need to import 
them into the flow.
-        if (!recoveredRolledFiles) {
-            if (tailFileChanged) {
-                final String recoverPosition = 
context.getProperty(START_POSITION).getValue();
-
-                if 
(START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
-                    recoverRolledFiles(context, session);
-                } else if 
(START_CURRENT_FILE.getValue().equals(recoverPosition)) {
-                    cleanup();
-                    state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, 
null, state.getBuffer());
-                } else {
-                    final String filename = 
context.getProperty(FILENAME).getValue();
-                    final File file = new File(filename);
-
-                    try {
-                        final FileChannel channel = 
FileChannel.open(file.toPath(), StandardOpenOption.READ);
-                        getLogger().debug("Created FileChannel {} for {}", new 
Object[] {channel, file});
 
-                        final Checksum checksum = new CRC32();
-                        final long position = file.length();
-                        final long timestamp = file.lastModified();
-
-                        try (final InputStream fis = new FileInputStream(file);
-                            final CheckedInputStream in = new 
CheckedInputStream(fis, checksum)) {
-                            StreamUtils.copy(in, new NullOutputStream(), 
position);
-                        }
-
-                        channel.position(position);
-                        cleanup();
-                        state = new TailFileState(filename, file, channel, 
position, timestamp, checksum, state.getBuffer());
-                    } catch (final IOException ioe) {
-                        getLogger().error("Attempted to position Reader at 
current position in file {} but failed to do so due to {}", new Object[] {file, 
ioe.toString()}, ioe);
-                        context.yield();
-                        return;
-                    }
-                }
-
-                tailFileChanged = false;
-            } else {
-                recoverRolledFiles(context, session);
+            tailFileChanged = false;
+        } else {
+            // Recover any data that may have rolled over since the last time 
that this processor ran.
+            // If expectedRecoveryChecksum != null, that indicates that this 
is the first iteration since processor was started, so use whatever checksum 
value
+            // was present when the state was last persisted. In this case, we 
must then null out the value so that the next iteration won't keep using the 
"recovered"
+            // value. If the value is null, then we know that either the 
processor has already recovered that data, or there was no state persisted. In 
either case,
+            // use whatever checksum value is currently in the state.
+            Long expectedChecksumValue = expectedRecoveryChecksum;
+            if (expectedChecksumValue == null) {
+                expectedChecksumValue = state.getChecksum() == null ? null : 
state.getChecksum().getValue();
             }
 
-            recoveredRolledFiles = true;
+            rolloverOccurred = recoverRolledFiles(context, session, 
expectedChecksumValue, state.getTimestamp(), state.getPosition());
+            expectedRecoveryChecksum = null;
         }
 
         // initialize local variables from state object; this is done so that 
we can easily change the values throughout
@@ -479,57 +337,11 @@ public class TailFile extends AbstractProcessor {
         final long startNanos = System.nanoTime();
 
         // Check if file has rotated
-        long fileLength = file.length();
-        final long lastModified = file.lastModified();
-        if (fileLength < position) {
-            // File has rotated. It's possible that it rotated before we 
finished reading all of the data. As a result, we need
-            // to check the last rolled-over file and see if it is longer than 
our position. If so, consume the data past our
-            // marked position.
-            try {
-                final List<File> updatedRolledOverFiles = 
getRolledOffFiles(context, timestamp);
-                getLogger().debug("Tailed file has rotated. Total number of 
rolled off files to check for un-consumed modifications: {}", new Object[] 
{updatedRolledOverFiles.size()});
-
-                if (!updatedRolledOverFiles.isEmpty()) {
-                    final File lastRolledOver = 
updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
-
-                    // there is more data in the file that has not yet been 
consumed.
-                    if (lastRolledOver.length() > state.getPosition()) {
-                        getLogger().debug("Last rolled over file {} is larger 
than our last position; will consume data from it after offset {}", new 
Object[] {lastRolledOver, state.getPosition()});
-
-                        try (final FileInputStream fis = new 
FileInputStream(lastRolledOver)) {
-                            StreamUtils.skip(fis, state.getPosition());
-
-                            FlowFile flowFile = session.create();
-                            flowFile = session.importFrom(fis, flowFile);
-                            if (flowFile.getSize() == 0) {
-                                session.remove(flowFile);
-                            } else {
-                                flowFile = session.putAttribute(flowFile, 
"filename", lastRolledOver.getName());
-
-                                
session.getProvenanceReporter().receive(flowFile, 
lastRolledOver.toURI().toString(), "FlowFile contains bytes " + 
state.getPosition() + " through " +
-                                    lastRolledOver.length() + " of source 
file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
-                                session.transfer(flowFile, REL_SUCCESS);
-                                this.state = state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
lastRolledOver.lastModified() + 1L, null, state.getBuffer());
-
-                                // must ensure that we do session.commit() 
before persisting state in order to avoid data loss.
-                                session.commit();
-                                persistState(state, 
context.getProperty(STATE_FILE).getValue());
-                            }
-                        }
-                    } else {
-                        getLogger().debug("Last rolled over file {} is not 
larger than our last position; will not consume data from it", new Object[] 
{lastRolledOver});
-                    }
-                }
-            } catch (final IOException ioe) {
-                getLogger().error("File being tailed was rolled over. However, 
was unable to determine which \"Rollover Files\" exist or read the last one due 
to {}. "
-                    + "It is possible that data at the end of the last file 
will be skipped as a result.", new Object[] {ioe});
-            }
-
-
+        if (rolloverOccurred) {
             // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
             try {
                 reader.close();
-                getLogger().debug("Closed RandomAccessFile {}", new Object[] 
{reader, reader});
+                getLogger().debug("Closed FileChannel {}", new Object[] 
{reader, reader});
             } catch (final IOException ioe) {
                 getLogger().warn("Failed to close reader for {} due to {}", 
new Object[] {file, ioe});
             }
@@ -537,104 +349,85 @@ public class TailFile extends AbstractProcessor {
             reader = createReader(file, 0L);
             position = 0L;
             checksum.reset();
-            fileLength = file.length();
         }
 
-        // check if there is any data to consume by checking if file has grown 
or last modified timestamp has changed.
-        boolean consumeData = false;
-        if (fileLength > position) {
-            consumeData = true;
-        } else if (lastModified > timestamp) {
-            // This can happen if file is truncated, or is replaced with the 
same amount of data as the old file.
-            position = 0;
-
-            try {
-                reader.position(0L);
-            } catch (final IOException ioe) {
-                getLogger().error("Failed to seek to beginning of file due to 
{}", new Object[] {ioe});
-                context.yield();
-                return;
-            }
-
-            consumeData = true;
+        if (file.length() == position) {
+            // no data to consume so rather than continually running, yield to 
allow other processors to use the thread.
+            // In this case, the state should not have changed, and we will 
have created no FlowFiles, so we don't have to
+            // persist the state or commit the session; instead, just return 
here.
+            getLogger().debug("No data to consume; created no FlowFiles");
+            state = this.state = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
+            persistState(state, context);
+            context.yield();
+            return;
         }
 
         // If there is data to consume, read as much as we can.
         final TailFileState currentState = state;
         final Checksum chksum = checksum;
-        if (consumeData) {
-            // data has been written to file. Stream it to a new FlowFile.
-            FlowFile flowFile = session.create();
-
-            final FileChannel fileReader = reader;
-            final LongHolder positionHolder = new LongHolder(position);
-            flowFile = session.write(flowFile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream rawOut) throws 
IOException {
-                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
-                        positionHolder.set(readLines(fileReader, 
currentState.getBuffer(), out, chksum));
-                    }
-                }
-            });
+        // data has been written to file. Stream it to a new FlowFile.
+        FlowFile flowFile = session.create();
 
-            // If there ended up being no data, just remove the FlowFile
-            if (flowFile.getSize() == 0) {
-                session.remove(flowFile);
-                getLogger().debug("No data to consume; removed created 
FlowFile");
-            } else {
-                // determine filename for FlowFile by using <base filename of 
log file>.<initial offset>-<final offset>.<extension>
-                final String tailFilename = file.getName();
-                final String baseName = 
StringUtils.substringBeforeLast(tailFilename, ".");
-                final String flowFileName;
-                if (baseName.length() < tailFilename.length()) {
-                    flowFileName = baseName + "." + position + "-" + 
positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
-                } else {
-                    flowFileName = baseName + "." + position + "-" + 
positionHolder.get();
+        final FileChannel fileReader = reader;
+        final LongHolder positionHolder = new LongHolder(position);
+        flowFile = session.write(flowFile, new OutputStreamCallback() {
+            @Override
+            public void process(final OutputStream rawOut) throws IOException {
+                try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
+                    positionHolder.set(readLines(fileReader, 
currentState.getBuffer(), out, chksum));
                 }
+            }
+        });
 
-                final Map<String, String> attributes = new HashMap<>(2);
-                attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
-                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
-                flowFile = session.putAllAttributes(flowFile, attributes);
-
-                session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
-                session.transfer(flowFile, REL_SUCCESS);
-                position = positionHolder.get();
-                timestamp = lastModified;
-                getLogger().debug("Created {} and routed to success", new 
Object[] {flowFile});
+        // If there ended up being no data, just remove the FlowFile
+        if (flowFile.getSize() == 0) {
+            session.remove(flowFile);
+            getLogger().debug("No data to consume; removed created FlowFile");
+        } else {
+            // determine filename for FlowFile by using <base filename of log 
file>.<initial offset>-<final offset>.<extension>
+            final String tailFilename = file.getName();
+            final String baseName = 
StringUtils.substringBeforeLast(tailFilename, ".");
+            final String flowFileName;
+            if (baseName.length() < tailFilename.length()) {
+                flowFileName = baseName + "." + position + "-" + 
positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
+            } else {
+                flowFileName = baseName + "." + position + "-" + 
positionHolder.get();
             }
+
+            final Map<String, String> attributes = new HashMap<>(2);
+            attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+            session.transfer(flowFile, REL_SUCCESS);
+            position = positionHolder.get();
+
+            // Set timestamp to the latest of when the file was modified and 
the current timestamp stored in the state.
+            // We do this because when we read a file that has been rolled 
over, we set the state to 1 millisecond later than the last mod date
+            // in order to avoid ingesting that file again. If we then read 
from this file during the same second (or millisecond, depending on the
+            // operating system file last mod precision), then we could set 
the timestamp to a smaller value, which could result in reading in the
+            // rotated file a second time.
+            timestamp = Math.max(state.getTimestamp(), file.lastModified());
+            getLogger().debug("Created {} and routed to success", new Object[] 
{flowFile});
         }
 
         // Create a new state object to represent our current position, 
timestamp, etc.
         final TailFileState updatedState = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
         this.state = updatedState;
 
-        if (!consumeData) {
-            // no data to consume so rather than continually running, yield to 
allow other processors to use the thread.
-            // In this case, the state should not have changed, and we will 
have created no FlowFiles, so we don't have to
-            // persist the state or commit the session; instead, just return 
here.
-            getLogger().debug("No data to consume; created no FlowFiles");
-            context.yield();
-            return;
-        }
-
         // We must commit session before persisting state in order to avoid 
data loss on restart
         session.commit();
-        final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
-        try {
-            persistState(updatedState, stateFilename);
-        } catch (final IOException e) {
-            getLogger().warn("Failed to update state file {} due to {}; some 
data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
-        }
+        persistState(updatedState, context);
     }
 
 
     /**
-     * Read new lines from the given RandomAccessFile, copying it to the given 
Output Stream. The Checksum is used in order to later determine whether or not
+     * Read new lines from the given FileChannel, copying it to the given 
Output Stream. The Checksum is used in order to later determine whether or not
      * data has been consumed.
      *
-     * @param reader The RandomAccessFile to read data from
+     * @param reader The FileChannel to read data from
      * @param buffer the buffer to use for copying data
      * @param out the OutputStream to copy the data to
      * @param checksum the Checksum object to use in order to calculate 
checksum for recovery purposes
@@ -643,6 +436,8 @@ public class TailFile extends AbstractProcessor {
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final FileChannel reader, final ByteBuffer buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
+        getLogger().debug("Reading lines starting at position {}", new 
Object[] {reader.position()});
+
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
             long pos = reader.position();
             long rePos = pos; // position to re-read
@@ -651,11 +446,12 @@ public class TailFile extends AbstractProcessor {
             int linesRead = 0;
             boolean seenCR = false;
             buffer.clear();
+
             while (((num = reader.read(buffer)) != -1)) {
                 buffer.flip();
 
                 for (int i = 0; i < num; i++) {
-                    byte ch = buffer.get();
+                    byte ch = buffer.get(i);
 
                     switch (ch) {
                         case '\n':
@@ -697,9 +493,11 @@ public class TailFile extends AbstractProcessor {
                 pos = reader.position();
             }
 
-            getLogger().debug("Read {} lines; repositioning reader from {} to 
{}", new Object[] {linesRead, pos, rePos});
-            reader.position(rePos);
-            buffer.clear();
+            if (rePos < reader.position()) {
+                getLogger().debug("Read {} lines; repositioning reader from {} 
to {}", new Object[] {linesRead, pos, rePos});
+                reader.position(rePos); // Ensure we can re-read if necessary
+            }
+
             return rePos;
         }
     }
@@ -773,25 +571,239 @@ public class TailFile extends AbstractProcessor {
     }
 
 
+
+    private void persistState(final TailFileState state, final ProcessContext 
context) {
+        final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
+        try {
+            persistState(state, stateFilename);
+        } catch (final IOException e) {
+            getLogger().warn("Failed to update state file {} due to {}; some 
data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
+        }
+    }
+
+    private void persistState(final TailFileState state, final String 
stateFilename) throws IOException {
+        getLogger().debug("Persisting state {} to {}", new Object[] {state, 
stateFilename});
+
+        final File stateFile = new File(stateFilename);
+        File directory = stateFile.getParentFile();
+        if (directory != null && !directory.exists() && !directory.mkdirs()) {
+            getLogger().warn("Failed to persist state to {} because the parent 
directory does not exist and could not be created. This may result in data 
being duplicated upon restart of NiFi");
+            return;
+        }
+        try (final FileOutputStream fos = new FileOutputStream(stateFile);
+            final DataOutputStream dos = new DataOutputStream(fos)) {
+
+            dos.writeInt(0); // version
+            dos.writeUTF(state.getFilename());
+            dos.writeLong(state.getPosition());
+            dos.writeLong(state.getTimestamp());
+            if (state.getChecksum() == null) {
+                dos.writeBoolean(false);
+            } else {
+                dos.writeBoolean(true);
+                dos.writeLong(state.getChecksum().getValue());
+            }
+        }
+    }
+
+    private FileChannel createReader(final File file, final long position) {
+        final FileChannel reader;
+
+        try {
+            reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
+        } catch (final IOException ioe) {
+            getLogger().warn("Unable to open file {}; will attempt to access 
file again after the configured Yield Duration has elapsed: {}", new Object[] 
{file, ioe});
+            return null;
+        }
+
+        getLogger().debug("Created FileChannel {} for {}", new Object[] 
{reader, file});
+
+        try {
+            reader.position(position);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to read from {} due to {}", new Object[] 
{file, ioe});
+
+            try {
+                reader.close();
+                getLogger().debug("Closed FileChannel {}", new Object[] 
{reader});
+            } catch (final IOException ioe2) {
+            }
+
+            return null;
+        }
+
+        return reader;
+    }
+
+    // for testing purposes
+    TailFileState getState() {
+        return state;
+    }
+
+
+    /**
+     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be 
ingested again.
+     *
+     * @param context the ProcessContext to use in order to obtain Processor 
configuration.
+     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the 
oldest file from offset 0 through &lt;position&gt;.
+     * @param timestamp the latest Last Modified Timestamp that has been 
consumed. Any data that was written before this timestamp will not be ingested.
+     * @param position the byte offset in the file being tailed, where tailing 
last left off.
+     *
+     * @return <code>true</code> if the file being tailed has rolled over, 
<code>false</code> otherwise
+     */
+    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final Long expectedChecksum, final long timestamp, 
final long position) {
+        try {
+            // Find all files that match our rollover pattern, if any, and 
order them based on their timestamp and filename.
+            // Ignore any file that has a timestamp earlier than the state 
that we have persisted. If we were reading from
+            // a file when we stopped running, then that file that we were 
reading from should be the first file in this list,
+            // assuming that the file still exists on the file system.
+            final List<File> rolledOffFiles = getRolledOffFiles(context, 
timestamp);
+            return recoverRolledFiles(context, session, rolledOffFiles, 
expectedChecksum, timestamp, position);
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
+            return false;
+        }
+    }
+
+    /**
+     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be 
ingested again.
+     *
+     * @param context the ProcessContext to use in order to obtain Processor 
configuration.
+     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the 
oldest file from offset 0 through &lt;position&gt;.
+     * @param timestamp the latest Last Modified Timestamp that has been 
consumed. Any data that was written before this timestamp will not be ingested.
+     * @param position the byte offset in the file being tailed, where tailing 
last left off.
+     *
+     * @return <code>true</code> if the file being tailed has rolled over, 
<code>false</code> otherwise
+     */
+    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final List<File> rolledOffFiles, final Long 
expectedChecksum,
+        final long timestamp, final long position) {
+        try {
+            getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[] {rolledOffFiles.size()});
+
+            // For first file that we find, it may or may not be the file that 
we were last reading from.
+            // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
+            // then we know we've already processed this file. If the 
checksums do not match, then we have not
+            // processed this file and we need to seek back to position 0 and 
ingest the entire file.
+            // For all other files that have been rolled over, we need to just 
ingest the entire file.
+            boolean rolloverOccurred = !rolledOffFiles.isEmpty();
+            if (rolloverOccurred && expectedChecksum != null && 
rolledOffFiles.get(0).length() >= position) {
+                final File firstFile = rolledOffFiles.get(0);
+
+                final long startNanos = System.nanoTime();
+                if (position > 0) {
+                    try (final InputStream fis = new 
FileInputStream(firstFile);
+                        final CheckedInputStream in = new 
CheckedInputStream(fis, new CRC32())) {
+                        StreamUtils.copy(in, new NullOutputStream(), position);
+
+                        final long checksumResult = 
in.getChecksum().getValue();
+                        if (checksumResult == expectedChecksum) {
+                            getLogger().debug("Checksum for {} matched 
expected checksum. Will skip first {} bytes", new Object[] {firstFile, 
position});
+
+                            // This is the same file that we were reading when 
we shutdown. Start reading from this point on.
+                            rolledOffFiles.remove(0);
+                            FlowFile flowFile = session.create();
+                            flowFile = session.importFrom(in, flowFile);
+                            if (flowFile.getSize() == 0L) {
+                                session.remove(flowFile);
+                                // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
+                                cleanup();
+                                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+                            } else {
+                                flowFile = session.putAttribute(flowFile, 
"filename", firstFile.getName());
+
+                                
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), 
"FlowFile contains bytes 0 through " + position + " of source file",
+                                    
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                                session.transfer(flowFile, REL_SUCCESS);
+                                getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[] {flowFile, firstFile});
+
+                                // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
+                                cleanup();
+                                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+
+                                // must ensure that we do session.commit() 
before persisting state in order to avoid data loss.
+                                session.commit();
+                                persistState(state, 
context.getProperty(STATE_FILE).getValue());
+                            }
+                        } else {
+                            getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
+                                new Object[] {firstFile, checksumResult, 
expectedChecksum});
+                        }
+                    }
+                }
+            }
+
+            // For each file that we found that matches our Rollover Pattern, 
and has a last modified date later than the timestamp
+            // that we recovered from the state file, we need to consume the 
entire file. The only exception to this is the file that
+            // we were reading when we last stopped, as it may already have 
been partially consumed. That is taken care of in the
+            // above block of code.
+            for (final File file : rolledOffFiles) {
+                state = consumeFileFully(file, context, session, state);
+            }
+
+            return rolloverOccurred;
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
+            return false;
+        }
+    }
+
+
     /**
-     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor.
+     * Creates a new FlowFile that contains the entire contents of the given 
file and transfers that FlowFile to success. This method
+     * will commit the given session and emit an appropriate Provenance Event.
+     *
+     * @param file the file to ingest
+     * @param context the ProcessContext
+     * @param session the ProcessSession
+     * @param state the current state
      *
-     * We use a FileChannel to read from the file, rather than a 
BufferedInputStream, etc. because we want to be able to read any amount of data
-     * and then reposition the reader if we need to, as a result of a line not 
being terminated (i.e., no new-line).
+     * @return the new, updated state that reflects that the given file has 
been ingested.
+     */
+    private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileState state) {
+        FlowFile flowFile = session.create();
+        flowFile = session.importFrom(file.toPath(), true, flowFile);
+        if (flowFile.getSize() == 0L) {
+            session.remove(flowFile);
+        } else {
+            flowFile = session.putAttribute(flowFile, "filename", 
file.getName());
+            session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString());
+            session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} from {} and routed to success", new 
Object[] {flowFile, file});
+
+            // use a timestamp of lastModified() + 1 so that we do not ingest 
this file again.
+            cleanup();
+            state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, null, state.getBuffer());
+
+            // must ensure that we do session.commit() before persisting state 
in order to avoid data loss.
+            session.commit();
+            persistState(state, context);
+        }
+
+        return state;
+    }
+
+    /**
+     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor
      */
     static class TailFileState {
         private final String filename; // hold onto filename and not just File 
because we want to match that against the user-defined filename to recover from
         private final File file;
-        private final FileChannel fileChannel;
+        private final FileChannel reader;
         private final long position;
         private final long timestamp;
         private final Checksum checksum;
         private final ByteBuffer buffer;
 
-        public TailFileState(final String filename, final File file, final 
FileChannel fileChannel, final long position, final long timestamp, final 
Checksum checksum, final ByteBuffer buffer) {
+        public TailFileState(final String filename, final File file, final 
FileChannel reader, final long position, final long timestamp, final Checksum 
checksum, final ByteBuffer buffer) {
             this.filename = filename;
             this.file = file;
-            this.fileChannel = fileChannel;
+            this.reader = reader;
             this.position = position;
             this.timestamp = timestamp; // many operating systems will use 
only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
@@ -807,7 +819,7 @@ public class TailFile extends AbstractProcessor {
         }
 
         public FileChannel getReader() {
-            return fileChannel;
+            return reader;
         }
 
         public long getPosition() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2516b1da/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 1445d88..e3d74c4 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -95,12 +95,14 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        System.out.println("Ingested 6 bytes");
         runner.clearTransferState();
 
         // roll over the file
         raf.close();
         file.renameTo(new File(file.getParentFile(), file.getName() + 
".previous"));
         raf = new RandomAccessFile(file, "rw");
+        System.out.println("Rolled over file to " + file.getName() + 
".previous");
 
         // truncate file
         raf.setLength(0L);
@@ -111,6 +113,7 @@ public class TestTailFile {
         Thread.sleep(1000L); // we need to wait at least one second because of 
the granularity of timestamps on many file systems.
         raf.write("HELLO\n".getBytes());
 
+        System.out.println("Wrote out 6 bytes to tailed file");
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
@@ -119,6 +122,7 @@ public class TestTailFile {
     @Test
     public void testConsumeAfterTruncationStartAtCurrentTime() throws 
IOException, InterruptedException {
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_TIME.getValue());
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
@@ -129,7 +133,10 @@ public class TestTailFile {
         runner.clearTransferState();
 
         // truncate and then write same number of bytes
-        raf.setLength(0L);
+        raf.close();
+        assertTrue(file.renameTo(new File("target/log.txt.1")));
+        raf = new RandomAccessFile(file, "rw");
+
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
@@ -157,7 +164,7 @@ public class TestTailFile {
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_TIME.getValue());
 
         raf.write("hello world\n".getBytes());
-        Thread.sleep(1000);
+        Thread.sleep(1000L);
         runner.run(100);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
     }
@@ -225,7 +232,7 @@ public class TestTailFile {
     public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws 
IOException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -236,13 +243,11 @@ public class TestTailFile {
 
         raf.write("world".getBytes());
         raf.close();
-
-        processor.cleanup(); // Need to do this for Windows because otherwise 
we cannot rename the file because we have the file open still in the same 
process.
-        assertTrue(file.renameTo(new File("target/log1.txt")));
+        file.renameTo(new File("target/log1.txt"));
 
         raf = new RandomAccessFile(new File("target/log.txt"), "rw");
         raf.write("1\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1, true, false);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
@@ -258,7 +263,7 @@ public class TestTailFile {
 
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -279,7 +284,7 @@ public class TestTailFile {
 
         raf = new RandomAccessFile(new File("target/log.txt"), "rw");
         raf.write("1\n".getBytes());
-        runner.run(1, false, false);
+        runner.run(1);
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
@@ -291,7 +296,7 @@ public class TestTailFile {
     public void testMultipleRolloversAfterHavingReadAllData() throws 
IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
-        runner.run(1, false, false);
+        runner.run(1, false, true);
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
         raf.write("hello\n".getBytes());
@@ -312,8 +317,52 @@ public class TestTailFile {
         // write to a new file.
         file = new File("target/log.txt");
         raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("1\n".getBytes());
+        raf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testMultipleRolloversAfterHavingReadAllDataWhileStillRunning() 
throws IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        runner.run(1, false, false); // ensure that we've read 'world' but not 
consumed it into a flowfile.
 
         Thread.sleep(1000L);
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
         raf.write("abc\n".getBytes());
 
         // rename file to log.1
@@ -326,7 +375,7 @@ public class TestTailFile {
         raf.write("1\n".getBytes());
         raf.close();
 
-        runner.run(1);
+        runner.run(1, true, false); // perform shutdown but do not perform 
initialization because last iteration didn't shutdown.
 
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
@@ -336,10 +385,58 @@ public class TestTailFile {
 
 
     @Test
+    public void testMultipleRolloversWithLongerFileLength() throws 
IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        Thread.sleep(1200L);
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        Thread.sleep(1200L);
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("This is a longer line than the other files 
had.\n".getBytes());
+        raf.close();
+
+        runner.run(1, true, false); // perform shutdown but do not perform 
initialization because last iteration didn't shutdown.
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("This
 is a longer line than the other files had.\n");
+    }
+
+
+    @Test
     public void testConsumeWhenNewLineFound() throws IOException, 
InterruptedException {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
 
+        Thread.sleep(1100L);
+
         raf.write("Hello, World".getBytes());
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
@@ -377,7 +474,39 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
         runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
+    }
+
+    @Test
+    public void testRolloverAndUpdateAtSameTime() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
 
+        // write out some data and ingest it.
+        raf.write("hello there\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        // roll the file over and write data to the new log.txt file.
+        raf.write("another".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("new file\n".getBytes());
+
+        // Run the processor. We should get 2 files because we should get the rest of what was
+        // written to log.txt before it rolled, and then we should get some data from the new log.txt.
+        runner.run(1, false, true);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("another");
+
+        // If we run again, we should get nothing.
+        // We did have an issue where we were recognizing the previously rolled over file again because the timestamps
+        // were still the same (second-level precision on many file systems). As a result, we verified the checksum of the
+        // already-rolled file against the checksum of the new file and they didn't match, so we ingested the entire rolled
+        // file as well as the new file again. Instead, we should ingest nothing!
+        runner.clearTransferState();
+        runner.run(1, true, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
     }
 
 }

Reply via email to