Repository: nifi Updated Branches: refs/heads/master 854d20398 -> 7a165b62c
NIFI-994: Initial import of TailFile Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/31f0909b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/31f0909b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/31f0909b Branch: refs/heads/master Commit: 31f0909bd315af43936b844327454ba2c48611e4 Parents: 49ee06b Author: Mark Payne <[email protected]> Authored: Sun Oct 18 15:16:56 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Sun Oct 18 19:00:24 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/standard/TailFile.java | 661 +++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../nifi/processors/standard/TestTailFile.java | 205 ++++++ .../src/test/resources/logback-test.xml | 18 + 4 files changed, 885 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java new file mode 100644 index 0000000..2fe9431 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java @@ -0,0 +1,661 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.RandomAccessFile; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.Checksum; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import 
org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.NullOutputStream; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.LongHolder;

// note: it is important that this Processor is not marked as @SupportsBatching because the session commits must complete before persisting state locally; otherwise, data loss may occur
@TriggerSerially
@Tags({"tail", "file", "log", "text", "source"})
@CapabilityDescription("\"Tails\" a file, ingesting data from the file as it is written to the file. The file is expected to be textual. Data is ingested only when a "
    + "new line is encountered (carriage return or new-line character or combination). If the file to tail is periodically \"rolled over\", as is generally the case "
    + "with log files, an optional Rolling Filename Pattern can be used to retrieve data from files that have rolled over, even if the rollover occurred while NiFi "
    + "was not running (provided that the data still exists upon restart of NiFi).")
public class TailFile extends AbstractProcessor {

    /** Size of the reusable copy buffer that is carried along in {@link TailFileState}. */
    private static final int BUFFER_SIZE = 65536;

    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
        .name("File to Tail")
        .description("Fully-qualified filename of the file that should be tailed")
        .expressionLanguageSupported(false)
        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
        .required(true)
        .build();
    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new PropertyDescriptor.Builder()
        .name("Rolling Filename Pattern")
        .description("If the file to tail \"rolls over\" as would be the case with log files, this filename pattern will be used to "
            + "identify files that have rolled over so that if NiFi is restarted, and the file has rolled over, it will be able to pick up where it left off. "
            + "This pattern supports wildcard characters * and ? and will assume that the files that have rolled over live in the same directory as the file being tailed.")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(false)
        .required(false)
        .build();
    static final PropertyDescriptor STATE_FILE = new PropertyDescriptor.Builder()
        .name("State File")
        .description("Specifies the file that should be used for storing state about what data has been ingested so that upon restart NiFi can resume from where it left off")
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .expressionLanguageSupported(false)
        .required(true)
        .build();

    static final Relationship REL_SUCCESS = new Relationship.Builder()
        .name("success")
        .description("All FlowFiles are routed to this Relationship.")
        .build();

    // Current tailing state; replaced wholesale (never mutated) whenever it changes.
    private volatile TailFileState state = new TailFileState(null, null, null, 0L, 0L, null, new byte[BUFFER_SIZE]);
    // Whether rolled-over files have already been looked for since the processor was last scheduled / reconfigured.
    private volatile boolean recoveredRolledFiles = false;
    // Checksum read from the state file, or null if the state file held none for the configured file.
    private volatile Long expectedRecoveryChecksum;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>();
        properties.add(FILENAME);
        properties.add(ROLLING_FILENAME_PATTERN);
        // The default state file location embeds this processor's identifier.
        properties.add(new PropertyDescriptor.Builder().fromPropertyDescriptor(STATE_FILE).defaultValue("./conf/state/" + getIdentifier()).build());
        return properties;
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(REL_SUCCESS);
    }

    /**
     * Resets the in-memory state when the tailed file changes, and forces a new search for rolled-over files
     * when either the tailed file or the rolling pattern changes.
     */
    @Override
    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        if (FILENAME.equals(descriptor)) {
            state = new TailFileState(newValue, null, null, 0L, 0L, null, new byte[BUFFER_SIZE]);
            recoveredRolledFiles = false;
        } else if (ROLLING_FILENAME_PATTERN.equals(descriptor)) {
            recoveredRolledFiles = false;
        }
    }

    /**
     * Restores the tailing state from the configured State File, if that file exists.
     *
     * <p>
     * When the state file refers to the currently configured file and carries a checksum, the first
     * {@code position} bytes of the existing file are checksummed. If the result matches, tailing resumes at
     * {@code position} and the verified checksum is carried forward so that it keeps covering bytes
     * {@code [0, position)} of the file.
     * </p>
     *
     * @param context the ProcessContext used to obtain the processor configuration
     * @throws IOException if the state file or the tailed file cannot be read, or if the state file was written
     *             with an unsupported encoding version
     */
    @OnScheduled
    public void recoverState(final ProcessContext context) throws IOException {
        recoveredRolledFiles = false;

        final String tailFilename = context.getProperty(FILENAME).getValue();
        final String stateFilename = context.getProperty(STATE_FILE).getValue();

        final File stateFile = new File(stateFilename);
        try (final FileInputStream fis = new FileInputStream(stateFile);
            final DataInputStream dis = new DataInputStream(fis)) {

            final int encodingVersion = dis.readInt();
            if (encodingVersion > 0) {
                throw new IOException("Unable to recover state because State File was encoded in a more recent version than Version 1");
            }

            if (encodingVersion == 0) {
                final String filename = dis.readUTF();
                final long position = dis.readLong();
                final long timestamp = dis.readLong();
                final boolean checksumPresent = dis.readBoolean();

                if (checksumPresent && tailFilename.equals(filename)) {
                    final long expectedChecksum = dis.readLong();
                    expectedRecoveryChecksum = expectedChecksum;

                    File tailFile = null;
                    RandomAccessFile reader = null;
                    Checksum checksum = null;

                    // We have an expected checksum and the currently configured filename is the same as the state file.
                    // We need to check if the existing file is the same as the one referred to in the state file based on
                    // the checksum.
                    final File existingTailFile = new File(filename);
                    if (existingTailFile.length() >= position) {
                        try (final InputStream tailFileIs = new FileInputStream(existingTailFile);
                            final CheckedInputStream in = new CheckedInputStream(tailFileIs, new CRC32())) {
                            // Checksum exactly the bytes that had been consumed when the state was persisted.
                            StreamUtils.copy(in, new NullOutputStream(), position);

                            if (in.getChecksum().getValue() == expectedChecksum) {
                                final RandomAccessFile raf = new RandomAccessFile(existingTailFile, "r");
                                try {
                                    raf.seek(position);
                                } catch (final IOException ioe) {
                                    // Do not leak the file handle if we cannot position the reader.
                                    try {
                                        raf.close();
                                    } catch (final IOException closeFailure) {
                                        ioe.addSuppressed(closeFailure);
                                    }
                                    throw ioe;
                                }

                                tailFile = existingTailFile;
                                reader = raf;
                                // Keep the verified checksum so that it continues to cover bytes [0, position).
                                checksum = in.getChecksum();
                            }
                        }
                    }

                    state = new TailFileState(tailFilename, tailFile, reader, position, timestamp, checksum, new byte[BUFFER_SIZE]);
                } else {
                    expectedRecoveryChecksum = null;
                    // tailing a new file since the state file was written out. We will reset state.
                    state = new TailFileState(tailFilename, null, null, 0L, 0L, null, new byte[BUFFER_SIZE]);
                }
            } else {
                // encoding Version == -1... no data in file. Just move on.
            }
        } catch (final FileNotFoundException ignored) {
            // Nothing could be opened for recovery (e.g. no state file has been written yet); keep the current in-memory state.
        }
    }


    /**
     * Writes the given state to the given state file, creating the parent directory if necessary.
     * The layout is: version (int, currently 0), filename (UTF), position (long), timestamp (long),
     * checksum-present flag (boolean) and, if present, the checksum value (long).
     *
     * @param state the state to persist
     * @param stateFilename the name of the file to write the state to
     * @throws IOException if the state file cannot be written
     */
    public void persistState(final TailFileState state, final String stateFilename) throws IOException {
        final File stateFile = new File(stateFilename);
        File directory = stateFile.getParentFile();
        if (directory != null && !directory.exists() && !directory.mkdirs()) {
            getLogger().warn("Failed to persist state to {} because the parent directory does not exist and could not be created. This may result in data being duplicated upon restart of NiFi",
                new Object[] {stateFile});
            return;
        }
        try (final FileOutputStream fos = new FileOutputStream(stateFile);
            final DataOutputStream dos = new DataOutputStream(fos)) {

            dos.writeInt(0); // version
            dos.writeUTF(state.getFilename());
            dos.writeLong(state.getPosition());
            dos.writeLong(state.getTimestamp());
            if (state.getChecksum() == null) {
                dos.writeBoolean(false);
            } else {
                dos.writeBoolean(true);
                dos.writeLong(state.getChecksum().getValue());
            }
        }
    }

    /**
     * Opens the given file for reading and seeks to the given position.
     *
     * @param file the file to open
     * @param position the offset to seek to
     * @return the positioned reader, or <code>null</code> if the file does not exist or the seek failed
     */
    private RandomAccessFile createReader(final File file, final long position) {
        final RandomAccessFile reader;

        try {
            reader = new RandomAccessFile(file, "r");
        } catch (final FileNotFoundException fnfe) {
            getLogger().debug("File {} does not exist; yielding and returning", new Object[] {file});
            return null;
        }

        try {
            reader.seek(position);
        } catch (final IOException ioe) {
            getLogger().error("Failed to read from {} due to {}", new Object[] {file, ioe});

            try {
                reader.close();
            } catch (final IOException ignored) {
                // best effort: the seek failure has already been reported above
            }

            return null;
        }

        return reader;
    }

    // for testing purposes
    TailFileState getState() {
        return state;
    }



    /**
     * Finds any files that have rolled over and have not yet been ingested by this Processor. Each of these files that is found will be
     * ingested as its own FlowFile. If a file is found that has been partially ingested, the rest of the file will be ingested as a
     * single FlowFile but the data that already has been ingested will not be ingested again.
     *
     * @param context the ProcessContext to use in order to obtain Processor configuration
     * @param session the ProcessSession to use in order to interact with FlowFile creation and content.
     */
    private void recoverRolledFiles(final ProcessContext context, final ProcessSession session) {
        try {
            // Find all files that match our rollover pattern, if any, and order them based on their timestamp and filename.
            // Ignore any file that has a timestamp earlier than the state that we have persisted. If we were reading from
            // a file when we stopped running, then that file that we were reading from should be the first file in this list,
            // assuming that the file still exists on the file system.
            final List<File> rolledOffFiles = getRolledOffFiles(context, state.getTimestamp());

            // For first file that we find, it may or may not be the file that we were last reading from.
            // As a result, we have to read up to the position we stored, while calculating the checksum. If the checksums match,
            // then we know we've already processed this file. If the checksums do not match, then we have not
            // processed this file and we need to seek back to position 0 and ingest the entire file.
            // For all other files that have been rolled over, we need to just ingest the entire file.
            if (!rolledOffFiles.isEmpty() && expectedRecoveryChecksum != null && rolledOffFiles.get(0).length() >= state.getPosition()) {
                final File firstFile = rolledOffFiles.get(0);

                final long startNanos = System.nanoTime();
                try (final InputStream fis = new FileInputStream(firstFile);
                    final CheckedInputStream in = new CheckedInputStream(fis, new CRC32())) {
                    StreamUtils.copy(in, new NullOutputStream(), state.getPosition());

                    final long checksumResult = in.getChecksum().getValue();
                    if (checksumResult == expectedRecoveryChecksum) {
                        // This is the same file that we were reading when we shutdown. Start reading from this point on.
                        rolledOffFiles.remove(0);
                        FlowFile flowFile = session.create();
                        flowFile = session.importFrom(in, flowFile);
                        flowFile = session.putAttribute(flowFile, "filename", firstFile.getName());

                        // The stream was already advanced past the consumed prefix, so the FlowFile holds the remainder of the file.
                        session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(),
                            "FlowFile contains bytes " + state.getPosition() + " through " + firstFile.length() + " of source file",
                            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                        session.transfer(flowFile, REL_SUCCESS);

                        // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
                        state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, firstFile.lastModified() + 1L, null, state.getBuffer());

                        // must ensure that we do session.commit() before persisting state in order to avoid data loss.
                        session.commit();
                        persistState(state, context.getProperty(STATE_FILE).getValue());
                    }
                }
            }

            // For each file that we found that matches our Rollover Pattern, and has a last modified date later than the timestamp
            // that we recovered from the state file, we need to consume the entire file. The only exception to this is the file that
            // we were reading when we last stopped, as it may already have been partially consumed. That is taken care of in the
            // above block of code.
            for (final File file : rolledOffFiles) {
                FlowFile flowFile = session.create();
                flowFile = session.importFrom(file.toPath(), true, flowFile);
                flowFile = session.putAttribute(flowFile, "filename", file.getName());
                session.getProvenanceReporter().receive(flowFile, file.toURI().toString());
                session.transfer(flowFile, REL_SUCCESS);

                // use a timestamp of lastModified() + 1 so that we do not ingest this file again.
                state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, file.lastModified() + 1L, null, state.getBuffer());

                // must ensure that we do session.commit() before persisting state in order to avoid data loss.
                session.commit();
                persistState(state, context.getProperty(STATE_FILE).getValue());
            }
        } catch (final IOException e) {
            getLogger().error("Failed to recover files that have rolled over due to {}", new Object[] {e});
        }
    }



    /**
     * Ingests any complete lines that have been appended to the tailed file since the last invocation.
     * Handles rollover (file shorter than the stored position) and truncation/replacement (same length but
     * newer modification time), then commits the session and persists the new state.
     */
    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        // If this is the first time the processor has run since it was started, we need to check for any files that may have rolled over
        // while the processor was stopped. If we find any, we need to import them into the flow.
        if (!recoveredRolledFiles) {
            recoverRolledFiles(context, session);
            recoveredRolledFiles = true;
        }

        // initialize local variables from state object; this is done so that we can easily change the values throughout
        // the onTrigger method and then create a new state object after we finish processing the files.
        TailFileState state = this.state;
        File file = state.getFile();
        RandomAccessFile reader = state.getReader();
        Checksum checksum = state.getChecksum();
        if (checksum == null) {
            checksum = new CRC32();
        }
        long position = state.getPosition();
        long timestamp = state.getTimestamp();

        // Create a reader if necessary.
        if (file == null || reader == null) {
            file = new File(context.getProperty(FILENAME).getValue());
            reader = createReader(file, position);
            if (reader == null) {
                context.yield();
                return;
            }
        }

        final long startNanos = System.nanoTime();

        // Check if file has rotated
        long fileLength = file.length();
        if (fileLength < position) {
            // File has rotated. It's possible that it rotated before we finished reading all of the data. As a result, we need
            // to check the last rolled-over file and see if it is longer than our position. If so, consume the data past our
            // marked position.
            try {
                final List<File> updatedRolledOverFiles = getRolledOffFiles(context, timestamp);
                if (!updatedRolledOverFiles.isEmpty()) {
                    final File lastRolledOver = updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);

                    // there is more data in the file that has not yet been consumed.
                    if (lastRolledOver.length() > state.getPosition()) {
                        try (final FileInputStream fis = new FileInputStream(lastRolledOver)) {
                            StreamUtils.skip(fis, state.getPosition());

                            FlowFile flowFile = session.create();
                            flowFile = session.importFrom(fis, flowFile);
                            flowFile = session.putAttribute(flowFile, "filename", lastRolledOver.getName());

                            session.getProvenanceReporter().receive(flowFile, lastRolledOver.toURI().toString(), "FlowFile contains bytes " + state.getPosition() + " through "
                                + lastRolledOver.length() + " of source file", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                            session.transfer(flowFile, REL_SUCCESS);
                            this.state = state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, lastRolledOver.lastModified() + 1L, null, state.getBuffer());

                            // must ensure that we do session.commit() before persisting state in order to avoid data loss.
                            session.commit();
                            persistState(state, context.getProperty(STATE_FILE).getValue());
                        }
                    }
                }
            } catch (final IOException ioe) {
                getLogger().error("File being tailed was rolled over. However, was unable to determine which \"Rollover Files\" exist or read the last one due to {}. "
                    + "It is possible that data at the end of the last file will be skipped as a result.", new Object[] {ioe});
            }


            // Since file has rotated, we close the reader, create a new one, and then reset our state.
            try {
                reader.close();
            } catch (final IOException ioe) {
                getLogger().warn("Failed to close reader for {} due to {}", new Object[] {file, ioe});
            }

            reader = createReader(file, 0L);
            if (reader == null) {
                // The new file is not available (yet), e.g. we are in the middle of a rotation. Remember that we have to
                // start from the beginning of the new file and try again later instead of dereferencing a null reader.
                this.state = new TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, state.getTimestamp(), null, state.getBuffer());
                context.yield();
                return;
            }

            position = 0L;
            checksum.reset();
            fileLength = file.length();
        }

        // check if there is any data to consume by checking if file has grown or last modified timestamp has changed.
        boolean consumeData = false;
        if (fileLength > position) {
            consumeData = true;
        } else if (file.lastModified() > timestamp) {
            // This can happen if file is truncated, or is replaced with the same amount of data as the old file.
            position = 0;

            try {
                reader.seek(0L);
            } catch (final IOException ioe) {
                getLogger().error("Failed to seek to beginning of file due to {}", new Object[] {ioe});
                context.yield();
                return;
            }

            // We are re-reading from the start, so the checksum must start over as well.
            checksum.reset();
            consumeData = true;
        }

        // If there is data to consume, read as much as we can.
        final TailFileState currentState = state;
        final Checksum chksum = checksum;
        if (consumeData) {
            // data has been written to file. Stream it to a new FlowFile.
            FlowFile flowFile = session.create();
            final RandomAccessFile fileReader = reader;
            final LongHolder positionHolder = new LongHolder(position);
            flowFile = session.write(flowFile, new OutputStreamCallback() {
                @Override
                public void process(final OutputStream rawOut) throws IOException {
                    try (final OutputStream out = new BufferedOutputStream(rawOut)) {
                        positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum));
                    }
                }
            });

            // If there ended up being no data, just remove the FlowFile
            if (flowFile.getSize() == 0) {
                session.remove(flowFile);
            } else {
                // determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
                final String tailFilename = file.getName();
                final String baseName = StringUtils.substringBeforeLast(tailFilename, ".");
                final String flowFileName;
                if (baseName.length() < tailFilename.length()) {
                    flowFileName = baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
                } else {
                    flowFileName = baseName + "." + position + "-" + positionHolder.get();
                }

                final Map<String, String> attributes = new HashMap<>(2);
                attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
                attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
                flowFile = session.putAllAttributes(flowFile, attributes);

                session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                session.transfer(flowFile, REL_SUCCESS);
                position = positionHolder.get();
            }
        }

        // Create a new state object to represent our current position, timestamp, etc.
        timestamp = System.currentTimeMillis();
        final TailFileState updatedState = new TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, timestamp, checksum, state.getBuffer());
        this.state = updatedState;

        if (!consumeData) {
            // no data to consume so rather than continually running, yield to allow other processors to use the thread.
            // In this case, the state should not have changed, and we will have created no FlowFiles, so we don't have to
            // persist the state or commit the session; instead, just return here.
            context.yield();
            return;
        }

        // We must commit session before persisting state in order to avoid data loss on restart
        session.commit();
        final String stateFilename = context.getProperty(STATE_FILE).getValue();
        try {
            persistState(updatedState, stateFilename);
        } catch (final IOException e) {
            getLogger().warn("Failed to update state file {} due to {}; some data may be duplicated on restart of NiFi", new Object[] {stateFilename, e});
        }
    }


    /**
     * Read new lines from the given RandomAccessFile, copying it to the given Output Stream. The Checksum is used in order to later determine whether or not
     * data has been consumed. Only complete lines are copied; a trailing partial line is left in the file and the reader is repositioned at its start.
     * The checksum is updated with exactly the bytes that are copied, so that it always covers the bytes preceding the returned position.
     *
     * @param reader The RandomAccessFile to read data from
     * @param buffer the buffer to use for copying data
     * @param out the OutputStream to copy the data to
     * @param checksum the Checksum object to use in order to calculate checksum for recovery purposes
     *
     * @return The new position after the lines have been read
     * @throws java.io.IOException if an I/O error occurs.
     */
    private long readLines(final RandomAccessFile reader, final byte[] buffer, final OutputStream out, final Checksum checksum) throws IOException {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        long pos = reader.getFilePointer();
        long rePos = pos; // position to re-read

        int num;
        boolean seenCR = false;
        while (((num = reader.read(buffer)) != -1)) {
            for (int i = 0; i < num; i++) {
                byte ch = buffer[i];

                switch (ch) {
                    case '\n':
                        baos.write(ch);
                        seenCR = false;
                        emitPendingLine(baos, out, checksum);
                        rePos = pos + i + 1;
                        break;
                    case '\r':
                        baos.write(ch);
                        seenCR = true;
                        break;
                    default:
                        if (seenCR) {
                            // a lone carriage return terminated the previous line; the current byte starts the next one
                            seenCR = false;
                            emitPendingLine(baos, out, checksum);
                            baos.write(ch);
                            rePos = pos + i;
                        } else {
                            baos.write(ch);
                        }
                }
            }

            pos = reader.getFilePointer();
        }

        reader.seek(rePos); // Ensure we can re-read if necessary
        return rePos;
    }

    /**
     * Copies the buffered, complete line(s) to the output, adds the same bytes to the checksum and clears the buffer.
     */
    private static void emitPendingLine(final ByteArrayOutputStream pending, final OutputStream out, final Checksum checksum) throws IOException {
        final byte[] lineBytes = pending.toByteArray();
        out.write(lineBytes);
        checksum.update(lineBytes, 0, lineBytes.length);
        pending.reset();
    }


    /**
     * Returns a list of all Files that match the following criteria:
     *
     * <ul>
     * <li>Filename matches the Rolling Filename Pattern</li>
     * <li>Filename does not match the actual file being tailed</li>
     * <li>The Last Modified Time on the file is equal to or later than the given minimum timestamp</li>
     * </ul>
     *
     * <p>
     * The List that is returned will be ordered by file timestamp, providing the oldest file first.
     * </p>
     *
     * @param context the ProcessContext to use in order to determine Processor configuration
     * @param minTimestamp any file with a Last Modified Time before this timestamp will not be returned
     * @return a list of all Files that have rolled over
     * @throws IOException if unable to perform the listing of files
     */
    private List<File> getRolledOffFiles(final ProcessContext context, final long minTimestamp) throws IOException {
        final String tailFilename = context.getProperty(FILENAME).getValue();
        final File tailFile = new File(tailFilename);
        File directory = tailFile.getParentFile();
        if (directory == null) {
            directory = new File(".");
        }

        final String rollingPattern = context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
        if (rollingPattern == null) {
            return Collections.emptyList();
        }

        final List<File> rolledOffFiles = new ArrayList<>();
        // A DirectoryStream holds an open directory handle and must be closed once the listing is done.
        try (final DirectoryStream<Path> dirStream = Files.newDirectoryStream(directory.toPath(), rollingPattern)) {
            for (final Path path : dirStream) {
                final File file = path.toFile();
                if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
                    continue;
                }

                rolledOffFiles.add(file);
            }
        }

        // Sort files based on last modified timestamp. If same timestamp, use filename as a secondary sort, as often
        // files that are rolled over are given a naming scheme that sorts lexicographically in the same order as the
        // timestamp, such as yyyy-MM-dd-HH-mm-ss
        Collections.sort(rolledOffFiles, new Comparator<File>() {
            @Override
            public int compare(final File o1, final File o2) {
                final int lastModifiedComp = Long.compare(o1.lastModified(), o2.lastModified());
                if (lastModifiedComp != 0) {
                    return lastModifiedComp;
                }

                return o1.getName().compareTo(o2.getName());
            }
        });

        return rolledOffFiles;
    }


    /**
     * A simple Java class to hold information about our state so that we can maintain this state across multiple invocations of the Processor
     */
    static class TailFileState {
        private final String filename; // hold onto filename and not just File because we want to match that against the user-defined filename to recover from
        private final File file;
        private final RandomAccessFile raf;
        private final long position;
        private final long timestamp;
        private final Checksum checksum;
        private final byte[] buffer;

        public TailFileState(final String filename, final File file, final RandomAccessFile raf, final long position, final long timestamp, final Checksum checksum, final byte[] buffer) {
            this.filename = filename;
            this.file = file;
            this.raf = raf;
            this.position = position;
            this.timestamp = (timestamp / 1000) * 1000; // many operating systems will use only second-level precision for last-modified times so cut off milliseconds
            this.checksum = checksum;
            this.buffer = buffer;
        }

        public String getFilename() {
            return filename;
        }

        public File getFile() {
            return file;
        }

        public RandomAccessFile getReader() {
            return raf;
        }

        public long getPosition() {
            return position;
        }

        public long getTimestamp() {
            return timestamp;
        }

        public Checksum getChecksum() {
            return checksum;
        }

        public byte[] getBuffer() {
            return buffer;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 0ce1456..5bd36db 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -65,6 +65,7 @@ org.apache.nifi.processors.standard.SplitContent org.apache.nifi.processors.standard.SplitJson org.apache.nifi.processors.standard.SplitText org.apache.nifi.processors.standard.SplitXml +org.apache.nifi.processors.standard.TailFile org.apache.nifi.processors.standard.TransformXml org.apache.nifi.processors.standard.UnpackContent org.apache.nifi.processors.standard.ValidateXml http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java new file mode 100644 index 0000000..d282a9d --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java @@ 
-0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.nifi.processors.standard.TailFile.TailFileState;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link TailFile}: truncation of the tailed file, recovery of data
+ * left behind in a rolled-over file, and emission of content only once a line
+ * terminator has been written.
+ */
+public class TestTailFile {
+    private File file;
+    private RandomAccessFile raf;
+    private TestRunner runner;
+
+    /**
+     * Removes leftovers of earlier runs from {@code target}, creates an empty
+     * {@code target/log.txt} and points a fresh {@link TailFile} runner at it.
+     */
+    @Before
+    public void setup() throws IOException {
+        final File targetDir = new File("target");
+        final File[] staleLogs = targetDir.listFiles(new FilenameFilter() {
+            @Override
+            public boolean accept(final File dir, final String name) {
+                return name.startsWith("log");
+            }
+        });
+
+        // listFiles() returns null when the directory is missing or cannot be read
+        if (staleLogs != null) {
+            for (final File staleLog : staleLogs) {
+                staleLog.delete();
+            }
+        }
+
+        file = new File("target/log.txt");
+        file.delete();
+        assertTrue(file.createNewFile());
+
+        final File stateFile = new File("target/tail-file.state");
+        stateFile.delete();
+        Assert.assertFalse(stateFile.exists());
+
+        runner = TestRunners.newTestRunner(new TailFile());
+        runner.setProperty(TailFile.FILENAME, "target/log.txt");
+        runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
+        runner.assertValid();
+
+        raf = new RandomAccessFile(file, "rw");
+    }
+
+    /**
+     * Closes the handle the tests use to write to the tailed file.
+     */
+    @After
+    public void cleanup() throws IOException {
+        if (raf != null) {
+            raf.close();
+        }
+    }
+
+
+    /**
+     * After the file is truncated and the same number of bytes is written
+     * again, the new content must be emitted.
+     */
+    @Test
+    public void testConsumeAfterTruncation() throws IOException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        // truncate and then write same number of bytes
+        raf.setLength(0L);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+        raf.write("HELLO\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
+    }
+
+
+    /**
+     * Data appended just before the file is renamed must still be emitted,
+     * ahead of the content of the newly created file.
+     */
+    @Test
+    public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        // fail here rather than in a later content assertion if the rollover did not happen
+        assertTrue(file.renameTo(new File("target/log1.txt")));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("new file\n".getBytes());
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new file\n");
+    }
+
+
+    /**
+     * Rollover between consecutive {@code run(1, false, false)} invocations: the
+     * unread remainder of the renamed file is emitted before the new file's data.
+     */
+    @Test
+    public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws IOException {
+        // this mimics the case when we are reading a log file that rolls over while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        raf.close();
+        // fail here rather than in a later content assertion if the rollover did not happen
+        assertTrue(file.renameTo(new File("target/log1.txt")));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    /**
+     * Content is held back until a line terminator is written. A {@code \r}
+     * followed by further data ends a line, while a trailing {@code \r} stays
+     * pending until more data arrives. The processor state is also checked for
+     * the file name, timestamp and byte position.
+     */
+    @Test
+    public void testConsumeWhenNewLineFound() throws IOException, InterruptedException {
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        final long start = System.currentTimeMillis();
+        // NOTE(review): presumably keeps the state timestamp at or after 'start' on file systems with coarse timestamps - confirm
+        Thread.sleep(1100L);
+
+        raf.write("Hello, World".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        final TailFileState state = ((TailFile) runner.getProcessor()).getState();
+        assertNotNull(state);
+        assertEquals("target/log.txt", state.getFilename());
+        assertTrue(state.getTimestamp() <= System.currentTimeMillis());
+        assertTrue(state.getTimestamp() >= start);
+        assertEquals(14, state.getPosition());
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("Hello, World\r\n");
+
+        runner.clearTransferState();
+
+        raf.write("12345".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("12345\n");
+
+        runner.clearTransferState();
+        raf.write("carriage\rreturn\r".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("carriage\r");
+
+        runner.clearTransferState();
+        raf.write("\r\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/31f0909b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml index 139b232..fad019a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/logback-test.xml @@ -22,6 +22,21 @@ </encoder> </appender> + <appender name="TARGET_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> + <file>target/log.txt</file> + <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>target/log_%d{yyyy-MM-dd_HH}.%i.txt</fileNamePattern> + <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> + <maxFileSize>10KB</maxFileSize> + </timeBasedFileNamingAndTriggeringPolicy> + <!-- keep 30 hours worth of history (maxHistory counts rollover periods, which are hourly here) --> + <maxHistory>30</maxHistory> + </rollingPolicy> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%date %level [%thread] %logger{40} %msg%n</pattern> + <immediateFlush>true</immediateFlush> + </encoder> + </appender> <!-- valid logging levels: TRACE, DEBUG, INFO, WARN, ERROR --> <logger name="org.apache.nifi" level="INFO"/> @@ -41,6 +56,9 @@ <logger name="com.sun.jersey.spi.container.servlet.WebComponent" level="ERROR"/> <logger name="org.apache.nifi.processors.standard" level="DEBUG"/> + <logger name="target.file" level="DEBUG" additivity="true"> + <appender-ref ref="TARGET_FILE" /> + </logger> <root level="INFO"> <appender-ref ref="CONSOLE"/>
