Repository: nifi
Updated Branches:
  refs/heads/master 854d20398 -> 7a165b62c


NIFI-994: Initial import of TailFile


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/31f0909b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/31f0909b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/31f0909b

Branch: refs/heads/master
Commit: 31f0909bd315af43936b844327454ba2c48611e4
Parents: 49ee06b
Author: Mark Payne <[email protected]>
Authored: Sun Oct 18 15:16:56 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Sun Oct 18 19:00:24 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 661 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../nifi/processors/standard/TestTailFile.java  | 205 ++++++
 .../src/test/resources/logback-test.xml         |  18 +
 4 files changed, 885 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
new file mode 100644
index 0000000..2fe9431
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.Checksum;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.NullOutputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.LongHolder;
+
+// note: it is important that this Processor is not marked as 
@SupportsBatching because the session commits must complete before persisting 
state locally; otherwise, data loss may occur
+@TriggerSerially
+@Tags({"tail", "file", "log", "text", "source"})
+@CapabilityDescription("\"Tails\" a file, ingesting data from the file as it 
is written to the file. The file is expected to be textual. Data is ingested 
only when a "
+    + "new line is encountered (carriage return or new-line character or 
combination). If the file to tail is periodically \"rolled over\", as is 
generally the case "
+    + "with log files, an optional Rolling Filename Pattern can be used to 
retrieve data from files that have rolled over, even if the rollover occurred 
while NiFi "
+    + "was not running (provided that the data still exists upon restart of 
NiFi).")
+public class TailFile extends AbstractProcessor {
+
+    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+        .name("File to Tail")
+        .description("Fully-qualified filename of the file that should be 
tailed")
+        .expressionLanguageSupported(false)
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .required(true)
+        .build();
+    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new 
PropertyDescriptor.Builder()
+        .name("Rolling Filename Pattern")
+        .description("If the file to tail \"rolls over\" as would be the case 
with log files, this filename pattern will be used to "
+            + "identify files that have rolled over so that if NiFi is 
restarted, and the file has rolled over, it will be able to pick up where it 
left off. "
+            + "This pattern supports wildcard characters * and ? and will 
assume that the files that have rolled over live in the same directory as the 
file being tailed.")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(false)
+        .build();
+    static final PropertyDescriptor STATE_FILE = new 
PropertyDescriptor.Builder()
+        .name("State File")
+        .description("Specifies the file that should be used for storing state 
about what data has been ingested so that upon restart NiFi can resume from 
where it left off")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("All FlowFiles are routed to this Relationship.")
+        .build();
+
+    private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, new byte[65536]);
+    private volatile boolean recoveredRolledFiles = false;
+    private volatile Long expectedRecoveryChecksum;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FILENAME);
+        properties.add(ROLLING_FILENAME_PATTERN);
+        properties.add(new 
PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/"
 + getIdentifier()).build());
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    @Override
+    public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
+        if (FILENAME.equals(descriptor)) {
+            state = new TailFileState(newValue, null, null, 0L, 0L, null, new 
byte[65536]);
+            recoveredRolledFiles = false;
+        } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
+            recoveredRolledFiles = false;
+        }
+    }
+
+    @OnScheduled
+    public void recoverState(final ProcessContext context) throws IOException {
+        recoveredRolledFiles = false;
+
+        final String tailFilename = context.getProperty(FILENAME).getValue();
+        final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
+
+        final File stateFile = new File(stateFilename);
+        try (final FileInputStream fis = new FileInputStream(stateFile);
+            final DataInputStream dis = new DataInputStream(fis)) {
+
+            final int encodingVersion = dis.readInt();
+            if (encodingVersion > 0) {
+                throw new IOException("Unable to recover state because State 
File was encoded in a more recent version than Version 1");
+            }
+
+            if (encodingVersion == 0) {
+                final String filename = dis.readUTF();
+                final long position = dis.readLong();
+                final long timestamp = dis.readLong();
+                final boolean checksumPresent = dis.readBoolean();
+
+                RandomAccessFile reader = null;
+                File tailFile = null;
+
+                if (checksumPresent && tailFilename.equals(filename)) {
+                    expectedRecoveryChecksum = dis.readLong();
+
+                    // We have an expected checksum and the currently 
configured filename is the same as the state file.
+                    // We need to check if the existing file is the same as 
the one referred to in the state file based on
+                    // the checksum.
+                    final File existingTailFile = new File(filename);
+                    if (existingTailFile.length() >= position) {
+                        try (final InputStream tailFileIs = new 
FileInputStream(existingTailFile);
+                            final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, new CRC32())) {
+                            StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
+
+                            final long checksumResult = 
in.getChecksum().getValue();
+                            if (checksumResult == expectedRecoveryChecksum) {
+                                tailFile = existingTailFile;
+                                reader = new RandomAccessFile(tailFile, "r");
+                                reader.seek(position);
+                            }
+                        }
+                    }
+
+                    state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, null, new byte[65536]);
+                } else {
+                    expectedRecoveryChecksum = null;
+                    // tailing a new file since the state file was written 
out. We will reset state.
+                    state = new TailFileState(tailFilename, null, null, 0L, 
0L, null, new byte[65536]);
+                }
+            } else {
+                // encoding Version == -1... no data in file. Just move on.
+            }
+        } catch (final FileNotFoundException fnfe) {
+        }
+    }
+
+
+    public void persistState(final TailFileState state, final String 
stateFilename) throws IOException {
+        final File stateFile = new File(stateFilename);
+        File directory = stateFile.getParentFile();
+        if (directory != null && !directory.exists() && !directory.mkdirs()) {
+            getLogger().warn("Failed to persist state to {} because the parent 
directory does not exist and could not be created. This may result in data 
being duplicated upon restart of NiFi");
+            return;
+        }
+        try (final FileOutputStream fos = new FileOutputStream(stateFile);
+            final DataOutputStream dos = new DataOutputStream(fos)) {
+
+            dos.writeInt(0); // version
+            dos.writeUTF(state.getFilename());
+            dos.writeLong(state.getPosition());
+            dos.writeLong(state.getTimestamp());
+            if (state.getChecksum() == null) {
+                dos.writeBoolean(false);
+            } else {
+                dos.writeBoolean(true);
+                dos.writeLong(state.getChecksum().getValue());
+            }
+        }
+    }
+
+    private RandomAccessFile createReader(final File file, final long 
position) {
+        final RandomAccessFile reader;
+
+        try {
+            reader = new RandomAccessFile(file, "r");
+        } catch (final FileNotFoundException fnfe) {
+            getLogger().debug("File {} does not exist; yielding and 
returning", new Object[] {file});
+            return null;
+        }
+
+        try {
+            reader.seek(position);
+        } catch (final IOException ioe) {
+            getLogger().error("Failed to read from {} due to {}", new Object[] 
{file, ioe});
+
+            try {
+                reader.close();
+            } catch (final IOException ioe2) {
+            }
+
+            return null;
+        }
+
+        return reader;
+    }
+
+    // for testing purposes
+    TailFileState getState() {
+        return state;
+    }
+
+
+
+    /**
+     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
+     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
+     * single FlowFile but the data that already has been ingested will not be 
ingested again.
+     *
+     * @param context the ProcessContext to use in order to obtain Processor 
configuration
+     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
+     */
+    private void recoverRolledFiles(final ProcessContext context, final 
ProcessSession session) {
+        try {
+            // Find all files that match our rollover pattern, if any, and 
order them based on their timestamp and filename.
+            // Ignore any file that has a timestamp earlier than the state 
that we have persisted. If we were reading from
+            // a file when we stopped running, then that file that we were 
reading from should be the first file in this list,
+            // assuming that the file still exists on the file system.
+            final List<File> rolledOffFiles = getRolledOffFiles(context, 
state.getTimestamp());
+
+            // For first file that we find, it may or may not be the file that 
we were last reading from.
+            // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
+            // then we know we've already processed this file. If the 
checksums do not match, then we have not
+            // processed this file and we need to seek back to position 0 and 
ingest the entire file.
+            // For all other files that have been rolled over, we need to just 
ingest the entire file.
+            if (!rolledOffFiles.isEmpty() && expectedRecoveryChecksum != null 
&& rolledOffFiles.get(0).length() >= state.getPosition()) {
+                final File firstFile = rolledOffFiles.get(0);
+
+                final long startNanos = System.nanoTime();
+                try (final InputStream fis = new FileInputStream(firstFile);
+                    final CheckedInputStream in = new CheckedInputStream(fis, 
new CRC32())) {
+                    StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
+
+                    final long checksumResult = in.getChecksum().getValue();
+                    if (checksumResult == expectedRecoveryChecksum) {
+                        // This is the same file that we were reading when we 
shutdown. Start reading from this point on.
+                        rolledOffFiles.remove(0);
+                        FlowFile flowFile = session.create();
+                        flowFile = session.importFrom(in, flowFile);
+                        flowFile = session.putAttribute(flowFile, "filename", 
firstFile.getName());
+
+                        session.getProvenanceReporter().receive(flowFile, 
firstFile.toURI().toString(), "FlowFile contains bytes 0 through " + 
state.getPosition() + " of source file",
+                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
+                        session.transfer(flowFile, REL_SUCCESS);
+
+                        // use a timestamp of lastModified() + 1 so that we do 
not ingest this file again.
+                        state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+
+                        // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
+                        session.commit();
+                        persistState(state, 
context.getProperty(STATE_FILE).getValue());
+                    }
+                }
+            }
+
+            // For each file that we found that matches our Rollover Pattern, 
and has a last modified date later than the timestamp
+            // that we recovered from the state file, we need to consume the 
entire file. The only exception to this is the file that
+            // we were reading when we last stopped, as it may already have 
been partially consumed. That is taken care of in the
+            // above block of code.
+            for (final File file : rolledOffFiles) {
+                FlowFile flowFile = session.create();
+                flowFile = session.importFrom(file.toPath(), true, flowFile);
+                flowFile = session.putAttribute(flowFile, "filename", 
file.getName());
+                session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString());
+                session.transfer(flowFile, REL_SUCCESS);
+
+                // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
+                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, null, state.getBuffer());
+
+                // must ensure that we do session.commit() before persisting 
state in order to avoid data loss.
+                session.commit();
+                persistState(state, 
context.getProperty(STATE_FILE).getValue());
+            }
+        } catch (final IOException e) {
+            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
+        }
+    }
+
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        // If this is the first time the processor has run since it was 
started, we need to check for any files that may have rolled over
+        // while the processor was stopped. If we find any, we need to import 
them into the flow.
+        if (!recoveredRolledFiles) {
+            recoverRolledFiles(context, session);
+            recoveredRolledFiles = true;
+        }
+
+        // initialize local variables from state object; this is done so that 
we can easily change the values throughout
+        // the onTrigger method and then create a new state object after we 
finish processing the files.
+        TailFileState state = this.state;
+        File file = state.getFile();
+        RandomAccessFile reader = state.getReader();
+        Checksum checksum = state.getChecksum();
+        if (checksum == null) {
+            checksum = new CRC32();
+        }
+        long position = state.getPosition();
+        long timestamp = state.getTimestamp();
+
+        // Create a reader if necessary.
+        if (file == null || reader == null) {
+            file = new File(context.getProperty(FILENAME).getValue());
+            reader = createReader(file, position);
+            if (reader == null) {
+                context.yield();
+                return;
+            }
+        }
+
+        final long startNanos = System.nanoTime();
+
+        // Check if file has rotated
+        long fileLength = file.length();
+        if (fileLength < position) {
+            // File has rotated. It's possible that it rotated before we 
finished reading all of the data. As a result, we need
+            // to check the last rolled-over file and see if it is longer than 
our position. If so, consume the data past our
+            // marked position.
+            try {
+                final List<File> updatedRolledOverFiles = 
getRolledOffFiles(context, timestamp);
+                if (!updatedRolledOverFiles.isEmpty()) {
+                    final File lastRolledOver = 
updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
+
+                    // there is more data in the file that has not yet been 
consumed.
+                    if (lastRolledOver.length() > state.getPosition()) {
+                        try (final FileInputStream fis = new 
FileInputStream(lastRolledOver)) {
+                            StreamUtils.skip(fis, state.getPosition());
+
+                            FlowFile flowFile = session.create();
+                            flowFile = session.importFrom(fis, flowFile);
+                            flowFile = session.putAttribute(flowFile, 
"filename", lastRolledOver.getName());
+
+                            session.getProvenanceReporter().receive(flowFile, 
lastRolledOver.toURI().toString(), "FlowFile contains bytes " + 
state.getPosition() + " through " +
+                                lastRolledOver.length() + " of source file", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                            session.transfer(flowFile, REL_SUCCESS);
+                            this.state = state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
lastRolledOver.lastModified() + 1L, null, state.getBuffer());
+
+                            // must ensure that we do session.commit() before 
persisting state in order to avoid data loss.
+                            session.commit();
+                            persistState(state, 
context.getProperty(STATE_FILE).getValue());
+                        }
+                    }
+                }
+            } catch (final IOException ioe) {
+                getLogger().error("File being tailed was rolled over. However, 
was unable to determine which \"Rollover Files\" exist or read the last one due 
to {}. "
+                    + "It is possible that data at the end of the last file 
will be skipped as a result.", new Object[] {ioe});
+            }
+
+
+            // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
+            try {
+                reader.close();
+            } catch (final IOException ioe) {
+                getLogger().warn("Failed to close reader for {} due to {}", 
new Object[] {file, ioe});
+            }
+
+            reader = createReader(file, 0L);
+            position = 0L;
+            checksum.reset();
+            fileLength = file.length();
+        }
+
+        // check if there is any data to consume by checking if file has grown 
or last modified timestamp has changed.
+        boolean consumeData = false;
+        if (fileLength > position) {
+            consumeData = true;
+        } else if (file.lastModified() > timestamp) {
+            // This can happen if file is truncated, or is replaced with the 
same amount of data as the old file.
+            position = 0;
+
+            try {
+                reader.seek(0L);
+            } catch (final IOException ioe) {
+                getLogger().error("Failed to seek to beginning of file due to 
{}", new Object[] {ioe});
+                context.yield();
+                return;
+            }
+
+            consumeData = true;
+        }
+
+        // If there is data to consume, read as much as we can.
+        final TailFileState currentState = state;
+        final Checksum chksum = checksum;
+        if (consumeData) {
+            // data has been written to file. Stream it to a new FlowFile.
+            FlowFile flowFile = session.create();
+            final RandomAccessFile fileReader = reader;
+            final LongHolder positionHolder = new LongHolder(position);
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream rawOut) throws 
IOException {
+                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
+                        positionHolder.set(readLines(fileReader, 
currentState.getBuffer(), out, chksum));
+                    }
+                }
+            });
+
+            // If there ended up being no data, just remove the FlowFile
+            if (flowFile.getSize() == 0) {
+                session.remove(flowFile);
+            } else {
+                // determine filename for FlowFile by using <base filename of 
log file>.<initial offset>-<final offset>.<extension>
+                final String tailFilename = file.getName();
+                final String baseName = 
StringUtils.substringBeforeLast(tailFilename, ".");
+                final String flowFileName;
+                if (baseName.length() < tailFilename.length()) {
+                    flowFileName = baseName + "." + position + "-" + 
positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
+                } else {
+                    flowFileName = baseName + "." + position + "-" + 
positionHolder.get();
+                }
+
+                final Map<String, String> attributes = new HashMap<>(2);
+                attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
+                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+                flowFile = session.putAllAttributes(flowFile, attributes);
+
+                session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
+                session.transfer(flowFile, REL_SUCCESS);
+                position = positionHolder.get();
+            }
+        }
+
+        // Create a new state object to represent our current position, 
timestamp, etc.
+        timestamp = System.currentTimeMillis();
+        final TailFileState updatedState = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
+        this.state = updatedState;
+
+        if (!consumeData) {
+            // no data to consume so rather than continually running, yield to 
allow other processors to use the thread.
+            // In this case, the state should not have changed, and we will 
have created no FlowFiles, so we don't have to
+            // persist the state or commit the session; instead, just return 
here.
+            context.yield();
+            return;
+        }
+
+        // We must commit session before persisting state in order to avoid 
data loss on restart
+        session.commit();
+        final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
+        try {
+            persistState(updatedState, stateFilename);
+        } catch (final IOException e) {
+            getLogger().warn("Failed to update state file {} due to {}; some 
data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
+        }
+    }
+
+
+    /**
+     * Read new lines from the given RandomAccessFile, copying it to the given 
Output Stream. The Checksum is used in order to later determine whether or not
+     * data has been consumed.
+     *
+     * @param reader The RandomAccessFile to read data from
+     * @param buffer the buffer to use for copying data
+     * @param out the OutputStream to copy the data to
+     * @param checksum the Checksum object to use in order to calculate 
checksum for recovery purposes
+     *
+     * @return The new position after the lines have been read
+     * @throws java.io.IOException if an I/O error occurs.
+     */
+    private long readLines(final RandomAccessFile reader, final byte[] buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        long pos = reader.getFilePointer();
+        long rePos = pos; // position to re-read
+
+        int num;
+        boolean seenCR = false;
+        while (((num = reader.read(buffer)) != -1)) {
+            for (int i = 0; i < num; i++) {
+                byte ch = buffer[i];
+
+                switch (ch) {
+                    case '\n':
+                        baos.write(ch);
+                        seenCR = false;
+                        baos.writeTo(out);
+                        baos.reset();
+                        rePos = pos + i + 1;
+                        break;
+                    case '\r':
+                        baos.write(ch);
+                        seenCR = true;
+                        break;
+                    default:
+                        if (seenCR) {
+                            seenCR = false;
+                            baos.writeTo(out);
+                            baos.reset();
+                            baos.write(ch);
+                            rePos = pos + i;
+                        } else {
+                            baos.write(ch);
+                        }
+                }
+            }
+
+            checksum.update(buffer, 0, num);
+            pos = reader.getFilePointer();
+        }
+
+        reader.seek(rePos); // Ensure we can re-read if necessary
+        return rePos;
+    }
+
+
+    /**
+     * Returns a list of all Files that match the following criteria:
+     *
+     * <ul>
+     * <li>Filename matches the Rolling Filename Pattern</li>
+     * <li>Filename does not match the actual file being tailed</li>
+     * <li>The Last Modified Time on the file is equal to or later than the 
given minimum timestamp</li>
+     * </ul>
+     *
+     * <p>
+     * The List that is returned will be ordered by file timestamp, providing 
the oldest file first.
+     * </p>
+     *
+     * @param context the ProcessContext to use in order to determine 
Processor configuration
+     * @param minTimestamp any file with a Last Modified Time before this 
timestamp will not be returned
+     * @return a list of all Files that have rolled over
+     * @throws IOException if unable to perform the listing of files
+     */
+    private List<File> getRolledOffFiles(final ProcessContext context, final 
long minTimestamp) throws IOException {
+        final String tailFilename = context.getProperty(FILENAME).getValue();
+        final File tailFile = new File(tailFilename);
+        File directory = tailFile.getParentFile();
+        if (directory == null) {
+            directory = new File(".");
+        }
+
+        final String rollingPattern = 
context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
+        if (rollingPattern == null) {
+            return Collections.emptyList();
+        }
+
+        final List<File> rolledOffFiles = new ArrayList<>();
+        final DirectoryStream<Path> dirStream = 
Files.newDirectoryStream(directory.toPath(), rollingPattern);
+        for (final Path path : dirStream) {
+            final File file = path.toFile();
+            if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
+                continue;
+            }
+
+            rolledOffFiles.add(file);
+        }
+
+        // Sort files based on last modified timestamp. If same timestamp, use 
filename as a secondary sort, as often
+        // files that are rolled over are given a naming scheme that is 
lexicographically sort in the same order as the
+        // timestamp, such as yyyy-MM-dd-HH-mm-ss
+        Collections.sort(rolledOffFiles, new Comparator<File>() {
+            @Override
+            public int compare(final File o1, final File o2) {
+                final int lastModifiedComp = Long.compare(o1.lastModified(), 
o2.lastModified());
+                if (lastModifiedComp != 0) {
+                    return lastModifiedComp;
+                }
+
+                return o1.getName().compareTo(o2.getName());
+            }
+        });
+
+        return rolledOffFiles;
+    }
+
+
+    /**
+     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor
+     */
+    static class TailFileState {
+        private final String filename; // hold onto filename and not just File 
because we want to match that against the user-defined filename to recover from
+        private final File file;
+        private final RandomAccessFile raf;
+        private final long position;
+        private final long timestamp;
+        private final Checksum checksum;
+        private final byte[] buffer;
+
+        public TailFileState(final String filename, final File file, final 
RandomAccessFile raf, final long position, final long timestamp, final Checksum 
checksum, final byte[] buffer) {
+            this.filename = filename;
+            this.file = file;
+            this.raf = raf;
+            this.position = position;
+            this.timestamp = (timestamp / 1000) * 1000; // many operating 
systems will use only second-level precision for last-modified times so cut off 
milliseconds
+            this.checksum = checksum;
+            this.buffer = buffer;
+        }
+
+        public String getFilename() {
+            return filename;
+        }
+
+        public File getFile() {
+            return file;
+        }
+
+        public RandomAccessFile getReader() {
+            return raf;
+        }
+
+        public long getPosition() {
+            return position;
+        }
+
+        public long getTimestamp() {
+            return timestamp;
+        }
+
+        public Checksum getChecksum() {
+            return checksum;
+        }
+
+        public byte[] getBuffer() {
+            return buffer;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 0ce1456..5bd36db 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.SplitContent
 org.apache.nifi.processors.standard.SplitJson
 org.apache.nifi.processors.standard.SplitText
 org.apache.nifi.processors.standard.SplitXml
+org.apache.nifi.processors.standard.TailFile
 org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
new file mode 100644
index 0000000..d282a9d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTailFile {
+    private File file;
+    private RandomAccessFile raf;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws IOException {
+        final File targetDir = new File("target");
+        final File[] files = targetDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return name.startsWith("log");
+            }
+        });
+
+        for (final File file : files) {
+            file.delete();
+        }
+
+        file = new File("target/log.txt");
+        file.delete();
+        assertTrue(file.createNewFile());
+
+        final File stateFile = new File("target/tail-file.state");
+        stateFile.delete();
+        Assert.assertFalse(stateFile.exists());
+
+        runner = TestRunners.newTestRunner(new TailFile());
+        runner.setProperty(TailFile.FILENAME, "target/log.txt");
+        runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
+        runner.assertValid();
+
+        raf = new RandomAccessFile(file, "rw");
+    }
+
+    @After
+    public void cleanup() throws IOException {
+        if (raf != null) {
+            raf.close();
+        }
+    }
+
+
+    @Test
+    public void testConsumeAfterTruncation() throws IOException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        // truncate and then write same number of bytes
+        raf.setLength(0L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+        raf.write("HELLO\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
+    }
+
+
+    @Test
+    public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log1.txt"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("new file\n".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new
 file\n");
+    }
+
+
+    @Test
+    public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws 
IOException {
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log1.txt"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testConsumeWhenNewLineFound() throws IOException, 
InterruptedException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        final long start = System.currentTimeMillis();
+        Thread.sleep(1100L);
+
+        raf.write("Hello, World".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        final TailFileState state = ((TailFile) 
runner.getProcessor()).getState();
+        assertNotNull(state);
+        assertEquals("target/log.txt", state.getFilename());
+        assertTrue(state.getTimestamp() <= System.currentTimeMillis());
+        assertTrue(state.getTimestamp() >= start);
+        assertEquals(14, state.getPosition());
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello,
 World\r\n");
+
+        runner.clearTransferState();
+
+        raf.write("12345".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("12345\n");
+
+        runner.clearTransferState();
+        raf.write("carriage\rreturn\r".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("carriage\r");
+
+        runner.clearTransferState();
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
index 139b232..fad019a 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml
@@ -22,6 +22,21 @@
         </encoder>
     </appender>
     
+    <appender name="TARGET_FILE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>target/log.txt</file>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            
<fileNamePattern>target/log_%d{yyyy-MM-dd_HH}.%i.txt</fileNamePattern>
+            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>10KB</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+            <!-- keep 30 log files worth of history -->
+            <maxHistory>30</maxHistory>
+        </rollingPolicy>
+        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+            <pattern>%date %level [%thread] %logger{40} %msg%n</pattern>
+            <immediateFlush>true</immediateFlush>
+        </encoder>
+    </appender>    
     <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR -->
     <logger name="org.apache.nifi" level="INFO"/>
     
@@ -41,6 +56,9 @@
     <logger name="com.sun.jersey.spi.container.servlet.WebComponent" 
level="ERROR"/>
 
     <logger name="org.apache.nifi.processors.standard" level="DEBUG"/>
+    <logger name="target.file" level="DEBUG" additivity="true">
+        <appender-ref ref="TARGET_FILE" />
+    </logger>
 
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>

Reply via email to