This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bf52973  NIFI-8773: Implemented Line Start Pattern in TailFile
bf52973 is described below

commit bf52973d628b092902f389e638448a6cd458d6a2
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jul 9 17:16:21 2021 -0400

    NIFI-8773: Implemented Line Start Pattern in TailFile
    
    Each message encountered in the tailed file will be buffered (up to some 
configurable max) until the subsequent message arrives. At that point, the 
previous message will be flushed.
    
    This closes #5251
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../apache/nifi/processors/standard/TailFile.java  | 169 +++++++++++++++++----
 .../additionalDetails.html                         |  94 ++++++++++--
 .../standard/AbstractTestTailFileScenario.java     |   3 +-
 .../nifi/processors/standard/TestTailFile.java     |  73 ++++++++-
 4 files changed, 292 insertions(+), 47 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 2b1bfcd..19e5724 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.RequiredPermission;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
@@ -60,6 +62,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -84,6 +87,8 @@ import java.util.zip.Checksum;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static 
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static 
org.apache.nifi.processor.util.StandardValidators.REGULAR_EXPRESSION_VALIDATOR;
 
 // note: it is important that this Processor is not marked as 
@SupportsBatching because the session commits must complete before persisting 
state locally; otherwise, data loss may occur
 @TriggerSerially
@@ -108,6 +113,7 @@ import static 
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
 public class TailFile extends AbstractProcessor {
 
     static final String MAP_PREFIX = "file.";
+    private static final byte[] NEW_LINE_BYTES = 
"\n".getBytes(StandardCharsets.UTF_8);
 
     static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local",
             "State is stored locally. Each node in a cluster will tail a 
different file.");
@@ -130,7 +136,7 @@ public class TailFile extends AbstractProcessor {
             "Start with the data at the end of the File to Tail. Do not ingest 
any data thas has already been rolled over or any "
             + "data in the File to Tail that has already been written.");
 
-    static final PropertyDescriptor BASE_DIRECTORY = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor BASE_DIRECTORY = new Builder()
             .name("tail-base-directory")
             .displayName("Base directory")
             .description("Base directory used to look for files to tail. This 
property is required when using Multifile mode.")
@@ -139,7 +145,7 @@ public class TailFile extends AbstractProcessor {
             .required(false)
             .build();
 
-    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor MODE = new Builder()
             .name("tail-mode")
             .displayName("Tailing mode")
             .description("Mode to use: single file will tail only one file, 
multiple file will look for a list of file. In Multiple mode"
@@ -150,7 +156,7 @@ public class TailFile extends AbstractProcessor {
             .defaultValue(MODE_SINGLEFILE.getValue())
             .build();
 
-    static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor FILENAME = new Builder()
             .displayName("File(s) to Tail")
             .name("File to Tail")
             .description("Path of the file to tail in case of single file 
mode. If using multifile mode, regular expression to find files "
@@ -161,7 +167,7 @@ public class TailFile extends AbstractProcessor {
             .required(true)
             .build();
 
-    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new Builder()
             .name("Rolling Filename Pattern")
             .description("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
                     + "identify files that have rolled over so that if NiFi is 
restarted, and the file has rolled over, it will be able to pick up where it 
left off. "
@@ -173,7 +179,7 @@ public class TailFile extends AbstractProcessor {
             .required(false)
             .build();
 
-    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new Builder()
         .name("Post-Rollover Tail Period")
         .description("When a file is rolled over, the processor will continue 
tailing the rolled over file until it has not been modified for this amount of 
time. " +
             "This allows for another process to rollover a file, and then 
flush out any buffered data. Note that when this value is set, and the tailed 
file rolls over, " +
@@ -186,7 +192,7 @@ public class TailFile extends AbstractProcessor {
         .defaultValue("0 sec")
         .build();
 
-    static final PropertyDescriptor STATE_LOCATION = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor STATE_LOCATION = new Builder()
             .displayName("State Location")
             .name("File Location") //retained name of property for backward 
compatibility of configs
             .description("Specifies where the state is located either local or 
cluster so that state can be stored "
@@ -196,7 +202,7 @@ public class TailFile extends AbstractProcessor {
             .defaultValue(LOCATION_LOCAL.getValue())
             .build();
 
-    static final PropertyDescriptor START_POSITION = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor START_POSITION = new Builder()
             .name("Initial Start Position")
             .description("When the Processor first begins to tail data, this 
property specifies where the Processor should begin reading data. Once data has 
been ingested from a file, "
                     + "the Processor will continue from the last point from 
which it has received data.")
@@ -205,7 +211,7 @@ public class TailFile extends AbstractProcessor {
             .required(true)
             .build();
 
-    static final PropertyDescriptor RECURSIVE = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor RECURSIVE = new Builder()
             .name("tailfile-recursive-lookup")
             .displayName("Recursive lookup")
             .description("When using Multiple files mode, this property 
defines if files must be listed recursively or not"
@@ -215,7 +221,7 @@ public class TailFile extends AbstractProcessor {
             .required(true)
             .build();
 
-    static final PropertyDescriptor LOOKUP_FREQUENCY = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor LOOKUP_FREQUENCY = new Builder()
             .name("tailfile-lookup-frequency")
             .displayName("Lookup frequency")
             .description("Only used in Multiple files mode. It specifies the 
minimum "
@@ -225,7 +231,7 @@ public class TailFile extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor MAXIMUM_AGE = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor MAXIMUM_AGE = new Builder()
             .name("tailfile-maximum-age")
             .displayName("Maximum age")
             .description("Only used in Multiple files mode. It specifies the 
necessary "
@@ -237,7 +243,7 @@ public class TailFile extends AbstractProcessor {
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
             .build();
 
-    static final PropertyDescriptor REREAD_ON_NUL = new 
PropertyDescriptor.Builder()
+    static final PropertyDescriptor REREAD_ON_NUL = new Builder()
             .name("reread-on-nul")
             .displayName("Reread when NUL encountered")
             .description("If this option is set to 'true', when a NUL 
character is read, the processor will yield and try to read the same part again 
later. "
@@ -251,6 +257,30 @@ public class TailFile extends AbstractProcessor {
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor LINE_START_PATTERN = new Builder()
+        .name("Line Start Pattern")
+        .displayName("Line Start Pattern")
+        .description("A Regular Expression to match against the start of a log 
line. If specified, any line that matches the expression, and any following 
lines, will be buffered until another line" +
+            " matches the Expression. In doing this, we can avoid splitting 
apart multi-line messages in the file. This assumes that the data is in UTF-8 
format.")
+        .required(false)
+        .addValidator(REGULAR_EXPRESSION_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .dependsOn(MODE, MODE_SINGLEFILE)
+        .build();
+
+    static final PropertyDescriptor MAX_BUFFER_LENGTH = new Builder()
+        .name("Max Buffer Size")
+        .displayName("Max Buffer Size")
+        .description("When using the Line Start Pattern, there may be 
situations in which the data in the file being tailed never matches the Regular 
Expression. This would result in the processor " +
+            "buffering all data from the tailed file, which can quickly 
exhaust the heap. To avoid this, the Processor will buffer only up to this 
amount of data before flushing the buffer, even if" +
+            " it means ingesting partial data from the file.")
+        .required(true)
+        .addValidator(DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(NONE)
+        .defaultValue("64 KB")
+        .dependsOn(LINE_START_PATTERN)
+        .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles are routed to this Relationship.")
@@ -261,6 +291,10 @@ public class TailFile extends AbstractProcessor {
     private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
     private volatile boolean requireStateLookup = true;
 
+    private volatile ByteArrayOutputStream linesBuffer = new 
ByteArrayOutputStream();
+    private volatile Pattern lineStartPattern;
+    private volatile long maxBufferBytes;
+
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -275,6 +309,8 @@ public class TailFile extends AbstractProcessor {
         properties.add(LOOKUP_FREQUENCY);
         properties.add(MAXIMUM_AGE);
         properties.add(REREAD_ON_NUL);
+        properties.add(LINE_START_PATTERN);
+        properties.add(MAX_BUFFER_LENGTH);
         return properties;
     }
 
@@ -341,6 +377,14 @@ public class TailFile extends AbstractProcessor {
     }
 
     @OnScheduled
+    public void compileRegex(final ProcessContext context) {
+        final String regex = 
context.getProperty(LINE_START_PATTERN).getValue();
+        lineStartPattern = (regex == null) ? null : Pattern.compile(regex);
+
+        this.maxBufferBytes = 
context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
+    }
+
+    @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
         // set isMultiChanging
         
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
@@ -581,13 +625,16 @@ public class TailFile extends AbstractProcessor {
     }
 
     @OnStopped
-    public void cleanup() {
+    public void cleanup(final ProcessContext context) {
         for (TailFileObject tfo : states.values()) {
             cleanReader(tfo);
             final TailFileState state = tfo.getState();
-            tfo.setState(new TailFileState(state.getFilename(), 
state.getFile(), null, state.getPosition(),
+            tfo.setState(new TailFileState(state.getFilename(), 
state.getFile(), null, state.getPosition() - linesBuffer.size(),
                 state.getTimestamp(), state.getLength(), state.getChecksum(), 
state.getBuffer(), state.isTailingPostRollover()));
+            persistState(tfo, null, context);
         }
+
+        linesBuffer.reset();
     }
 
     private void cleanReader(TailFileObject tfo) {
@@ -643,7 +690,7 @@ public class TailFile extends AbstractProcessor {
             return;
         }
 
-        for (String tailFile : states.keySet()) {
+        for (final String tailFile : states.keySet()) {
             try {
                 processTailFile(context, session, tailFile);
             } catch (NulCharacterEncounteredException e) {
@@ -652,6 +699,12 @@ public class TailFile extends AbstractProcessor {
                 return;
             }
         }
+
+        // If a Line Start Pattern is being used and data is buffered, the 
Position that has been stored in the state will
+        // not be accurate. To address this, we call cleanup(), which will 
handle updating the state to the correct values for us.
+        if (lineStartPattern != null && linesBuffer.size() > 0) {
+            cleanup(context);
+        }
     }
 
     private void processTailFile(final ProcessContext context, final 
ProcessSession session, final String tailFile) {
@@ -667,7 +720,7 @@ public class TailFile extends AbstractProcessor {
             if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                 recoverRolledFiles(context, session, tailFile, 
tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), 
tfo.getState().getPosition());
             } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
-                cleanup();
+                cleanup(context);
                 tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 
0L, null, tfo.getState().getBuffer()));
             } else {
                 final String filename = tailFile;
@@ -687,7 +740,7 @@ public class TailFile extends AbstractProcessor {
                     }
 
                     fileChannel.position(position);
-                    cleanup();
+                    cleanup(context);
                     tfo.setState(new TailFileState(filename, file, 
fileChannel, position, timestamp, file.length(), checksum, 
tfo.getState().getBuffer()));
                 } catch (final IOException ioe) {
                     getLogger().error("Attempted to position Reader at current 
position in file {} but failed to do so due to {}", new Object[]{file, 
ioe.toString()}, ioe);
@@ -847,8 +900,12 @@ public class TailFile extends AbstractProcessor {
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
             session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} and routed to success", flowFile);
+        }
+
+        if (flowFile.getSize() > 0 || linesBuffer.size() > 0) {
             position = positionHolder.get();
 
             // Set timestamp to the latest of when the file was modified and 
the current timestamp stored in the state.
@@ -858,7 +915,6 @@ public class TailFile extends AbstractProcessor {
             // rotated file a second time.
             timestamp = Math.max(state.getTimestamp(), file.lastModified());
             length = file.length();
-            getLogger().debug("Created {} and routed to success", new 
Object[]{flowFile});
         }
 
         // Create a new state object to represent our current position, 
timestamp, etc.
@@ -930,7 +986,7 @@ public class TailFile extends AbstractProcessor {
                         case '\n': {
                             baos.write(ch);
                             seenCR = false;
-                            flushByteArrayOutputStream(baos, out, checksum);
+                            flushByteArrayOutputStream(baos, out, checksum, 
false);
                             rePos = pos + i + 1;
                             linesRead++;
                             break;
@@ -948,7 +1004,7 @@ public class TailFile extends AbstractProcessor {
                         default: {
                             if (seenCR) {
                                 seenCR = false;
-                                flushByteArrayOutputStream(baos, out, 
checksum);
+                                flushByteArrayOutputStream(baos, out, 
checksum, false);
                                 linesRead++;
                                 baos.write(ch);
                                 rePos = pos + i;
@@ -963,7 +1019,7 @@ public class TailFile extends AbstractProcessor {
             }
 
             if (readFully) {
-                flushByteArrayOutputStream(baos, out, checksum);
+                flushByteArrayOutputStream(baos, out, checksum, true);
                 rePos = reader.position();
             }
 
@@ -976,14 +1032,53 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, 
final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, 
final OutputStream out, final Checksum checksum, final boolean ignoreRegex) 
throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush 
anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At 
that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartPattern == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", 
checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, 
StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");
+        for (final String line : lines) {
+            final boolean startsWithRegex = 
lineStartPattern.matcher(line).lookingAt();
+
+            if (startsWithRegex || linesBuffer.size() >= maxBufferBytes) {
+                // Either this line matches our start regex or the buffer has reached its 
limit. Flush what we have buffered and reset our buffer.
+                flushLinesBuffer(out, checksum);
+            }
+
+            // Buffer this line (it either starts a new message or continues the current one) 
until a later line matching our start regex causes the buffer to be flushed.
+            linesBuffer.write(line.getBytes(StandardCharsets.UTF_8));
+            linesBuffer.write(NEW_LINE_BYTES);
+        }
+    }
+
+    private void flushLinesBuffer(final OutputStream out, final Checksum 
checksum) throws IOException {
+        linesBuffer.writeTo(out);
+
+        checksum.update(linesBuffer.toByteArray(), 0, linesBuffer.size());
         if (getLogger().isTraceEnabled()) {
             getLogger().trace("Checksum updated to {}", checksum.getValue());
         }
-        baos.reset();
+
+        linesBuffer.reset();
     }
 
     /**
@@ -1074,7 +1169,8 @@ public class TailFile extends AbstractProcessor {
 
     private void persistState(final Map<String, String> state, final 
ProcessSession session, final ProcessContext context) {
         try {
-            final StateMap oldState = session.getState(getStateScope(context));
+            final Scope scope = getStateScope(context);
+            final StateMap oldState = session == null ? 
context.getStateManager().getState(scope) : session.getState(scope);
             Map<String, String> updatedState = new HashMap<>();
 
             for(String key : oldState.toMap().keySet()) {
@@ -1092,7 +1188,11 @@ public class TailFile extends AbstractProcessor {
 
             updatedState.putAll(state);
 
-            session.setState(updatedState, getStateScope(context));
+            if (session == null) {
+                context.getStateManager().setState(updatedState, scope);
+            } else {
+                session.setState(updatedState, scope);
+            }
         } catch (final IOException e) {
             getLogger().warn("Failed to store state due to {}; some data may 
be duplicated on restart of NiFi", new Object[]{e});
         }
@@ -1361,7 +1461,7 @@ public class TailFile extends AbstractProcessor {
 
                 // use a timestamp of lastModified() + 1 so that we do not 
ingest this file again.
                 getLogger().debug("Completed tailing of file {}; will cleanup 
state", tailFile);
-                cleanup();
+                cleanup(context);
                 tfo.setState(new TailFileState(tailFile, null, null, 0L, 
fileToTail.lastModified() + 1L, fileToTail.length(), null, 
tfo.getState().getBuffer(), tailingPostRollover));
             }
 
@@ -1383,9 +1483,16 @@ public class TailFile extends AbstractProcessor {
      * @return the new, updated state that reflects that the given file has 
been
      * ingested.
      */
-    private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileObject tfo) {
+    private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileObject tfo) 
throws IOException {
         FlowFile flowFile = session.create();
-        flowFile = session.importFrom(file.toPath(), true, flowFile);
+
+        try (final InputStream fis = new FileInputStream(file)) {
+            flowFile = session.write(flowFile, out -> {
+                flushLinesBuffer(out, new CRC32());
+                StreamUtils.copy(fis, out);
+            });
+        }
+
         if (flowFile.getSize() == 0L) {
             session.remove(flowFile);
         } else {
@@ -1399,7 +1506,7 @@ public class TailFile extends AbstractProcessor {
             getLogger().debug("Created {} from {} and routed to success", new 
Object[]{flowFile, file});
 
             // use a timestamp of lastModified() + 1 so that we do not ingest 
this file again.
-            cleanup();
+            cleanup(context);
             tfo.setState(new 
TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
 null, null, 0L, file.lastModified() + 1L, file.length(), null,
                     tfo.getState().getBuffer()));
 
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
index 1585e71..e5013b2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -22,27 +22,48 @@
     </head>
 
     <body>
-        <h3>Modes</h3>
+
+               <h3>Introduction</h3>
+               <p>
+                       This processor offers a powerful capability, allowing 
the user to periodically look at a file that is actively being written to by 
another process.
+                       When the file changes, the new lines are ingested. This 
Processor assumes that data in the file is textual.
+               </p>
+               <p>
+                       Tailing a file from a filesystem is a seemingly simple 
but notoriously difficult task. This is because we are periodically checking 
the contents
+                       of a file that is being written to. The file may be 
constantly changing, or it may rarely change. The file may be "rolled over" 
(i.e., renamed)
+                       and it's important that even after restarting the 
application (NiFi, in this case), we are able to pick up where we left off. 
Additional complexities
+                       also come into play. For example, NFS mounted drives 
may indicate that data is readable but then return NUL bytes (Unicode 0) when 
attempting to read, as
+                       the actual bytes are not yet known (see the &lt;Reread 
when NUL encountered&gt; property), and file systems have different timestamp 
granularities.
+               </p>
+               <p>
+                       This Processor is designed to handle all of these 
different cases. This can lead to slightly more complex configuration, but this 
document should provide
+                       you with all you need to get started!
+               </p>
+
+
+               <h3>Modes</h3>
         <p>
-            This processor is used to tail a file or multiple files according 
to multiple modes. The 
+            This processor is used to tail a file or multiple files, depending 
on the configured mode. The
             mode to choose depends of the logging pattern followed by the 
file(s) to tail. In any case, if there
-            is a rolling pattern, the rolling files must be plain text files 
(compression is not supported at 
+            is a rolling pattern, the rolling files must be plain text files 
(compression is not supported at
             the moment).
         </p>
+
         <ul>
                <li><b>Single file</b>: the processor will tail the file with 
the path given in 'File(s) to tail' property.</li>
                <li><b>Multiple files</b>: the processor will look for files 
into the 'Base directory'. It will look for file recursively
                according to the 'Recursive lookup' property and will tail all 
the files matching the regular expression
                provided in the 'File(s) to tail' property.</li>
         </ul>
+
         <h3>Rolling filename pattern</h3>
         <p>
                In case the 'Rolling filename pattern' property is used, when 
the processor detects that the file to tail has rolled over, the
-               processor will look for possible missing messages in the rolled 
file. To do so, the processor will use the pattern to find the 
+               processor will look for possible missing messages in the rolled 
file. To do so, the processor will use the pattern to find the
                rolling files in the same directory as the file to tail.
         </p>
         <p>
-               In order to keep this property available in the 'Multiple 
files' mode when multiples files to tail are in the same directory, 
+               In order to keep this property available in the 'Multiple 
files' mode when multiple files to tail are in the same directory,
                it is possible to use the ${filename} tag to reference the name 
(without extension) of the file to tail. For example, if we have:
         </p>
                <p>
@@ -72,7 +93,7 @@
                and new log messages are always appended in my-app.log file.
         </p>
         <p>
-               In case recursivity is set to 'true'. The regular expression 
for the files to tail must embrace the possible intermediate directories 
+               If recursivity is set to 'true', the regular expression 
for the files to tail must include the possible intermediate directories
                between the base directory and the files to tail. Example:
         </p>
         <p>
@@ -89,26 +110,71 @@
                        Recursivity: true
                </code>
                </p>
-        <p>
+
+               <p>
                If the processor is configured with '<b>Multiple files</b>' 
mode, two additional properties are relevant:
         </p>
+
         <ul>
                <li><b>Lookup frequency</b>: specifies the minimum duration the 
processor will wait before listing again the files to tail.</li>
-               <li><b>Maximum age</b>: specifies the necessary minimum 
duration to consider that no new messages will be appended in a file 
+               <li><b>Maximum age</b>: specifies the necessary minimum 
duration to consider that no new messages will be appended in a file
                regarding its last modification date. If the amount of time 
that has elapsed since the file was modified is larger than this
                period of time, the file will not be tailed. For example, if a 
file was modified 24 hours ago and this property is set to 12 hours,
                the file will not be tailed. But if this property is set to 36 
hours, then the file will continue to be tailed.</li>
         </ul>
-        <p>
-               It is necessary to pay attention to 'Lookup frequency' and 
'Maximum age' properties, as well as the frequency at which the processor is 
-               triggered, in order to achieve high performance. It is 
recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling 
-               frequency to avoid missing data. It also recommended not to set 
'Maximum Age' too low because if messages are appended in a file 
+
+               <p>
+               It is necessary to pay attention to 'Lookup frequency' and 
'Maximum age' properties, as well as the frequency at which the processor is
+               triggered, in order to achieve high performance. It is 
recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
+               frequency to avoid missing data. It is also recommended not to set 
'Maximum age' too low because if messages are appended in a file
                after this file has been considered "too old", all the messages 
in the file may be read again, leading to data duplication.
         </p>
-        <p>
-               If the processor is configured with '<b>Multiple files</b>' 
mode, the 'Rolling 
+
+               <p>
+               If the processor is configured with '<b>Multiple files</b>' 
mode, the 'Rolling
                filename pattern' property must be specific enough to ensure 
that only the rolling files will be listed and not other currently tailed
                files in the same directory (this can be achieved using 
${filename} tag).
         </p>
+
+
+               <h3>Handling Multi-Line Messages</h3>
+               <p>
+                       Most of the time, when we tail a file, we are happy to 
receive data periodically, regardless of how it was written to the file. There are 
scenarios, though,
+                       where we may have data written in such a way that 
multiple lines need to be retained together. Take, for example, the following 
lines of text that
+                       might be found in a log file:
+               </p>
+       <code>
+               <pre>
+2021-07-09 14:12:19,731 INFO [main] org.apache.nifi.NiFi Launching NiFi...
+2021-07-09 14:12:19,915 INFO [main] o.a.n.p.AbstractBootstrapPropertiesLoader 
Determined default application properties path to be 
'/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties'
+2021-07-09 14:12:19,919 INFO [main] o.a.nifi.properties.NiFiPropertiesLoader 
Loaded 199 properties from 
/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties
+2021-07-09 14:12:19,925 WARN Line 1 of Log Message
+                       Line 2: This is an important warning.
+                       Line 3: Please do not ignore this warning.
+                       Line 4: These lines of text make sense only in the 
context of the original message.
+2021-07-09 14:12:19,941 INFO [main] Final message in log file
+               </pre>
+       </code>
+
+               <p>
+                       In this case, we may want to ensure that the log lines 
are not ingested in such a way that our multi-line log message is broken up 
into Lines 1 and 2 in one FlowFile
+                       and Lines 3 and 4 in another. To accomplish this, the 
Processor exposes the &lt;Line Start Pattern&gt; property. If we set this 
property to a value of
+                       <code>\d{4}-\d{2}-\d{2}</code>, then we are telling the 
Processor that each message should begin with 4 digits, followed by a dash, 
followed by 2 digits, a dash, and 2 digits.
+                       I.e., we are telling it that each message begins with a 
timestamp in yyyy-MM-dd format. Because of this, even if the Processor runs and 
sees only Lines 1 and 2 of our
+                       multiline log message, it will not ingest the data yet. 
It will wait until it sees the next message, which starts with a timestamp.
+               </p>
+               <p>
+                       Note that, because of this, the last message that the 
Processor will encounter in the above situation is the "Final message in log 
file" line. At this point, the Processor does
+                       not know whether the next line of text it encounters 
will be part of this message or the start of a new one. As such, it will not ingest this 
data. It will wait until either another message
+                       is encountered (that matches our regex) or until the 
file is rolled over (renamed). Because of this, there may be some delay in 
ingesting the last message in the file, if the process
+                       that writes to the file just stops writing at this 
point.
+               </p>
+
+               <p>
+                       Additionally, we run the risk of the Regular 
Expression not matching the data in the file. This could result in buffering 
all of the file's content, which could cause NiFi
+                       to run out of memory. To avoid this, the &lt;Max Buffer 
Size&gt; property limits the amount of data that can be buffered. If this 
amount of data is buffered, it will be flushed
+                       to the FlowFile, even if another message hasn't been 
encountered.
+               </p>
+
     </body>
 </html>
\ No newline at end of file
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
index 2c98a6f..8d890be 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
 
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -110,7 +111,7 @@ public class AbstractTestTailFileScenario {
             randomAccessFile.close();
         }
 
-        processor.cleanup();
+        processor.cleanup(new MockProcessContext(processor));
     }
 
     public void testScenario(List<Action> actions) throws Exception {
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index e7d6177..a9ac7e5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.processors.standard.TailFile.TailFileState;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.After;
@@ -29,6 +30,8 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
@@ -58,6 +61,7 @@ import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeFalse;
 
 public class TestTailFile {
+    private static final Logger logger = 
LoggerFactory.getLogger(TestTailFile.class);
 
     private File file;
     private File existingFile;
@@ -118,7 +122,7 @@ public class TestTailFile {
             otherRaf.close();
         }
 
-        processor.cleanup();
+        processor.cleanup(new MockProcessContext(processor));
 
         final File[] files = file.getParentFile().listFiles();
         if (files != null) {
@@ -834,6 +838,73 @@ public class TestTailFile {
     }
 
     @Test
+    public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws 
IOException { // runs the shared scenario with shutdownBetweenReads = true
+        testMultiLineWaitsForRegexMatch(true);
+    }
+
+    @Test
+    public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() 
throws IOException { // runs the shared scenario with shutdownBetweenReads = false
+        testMultiLineWaitsForRegexMatch(false);
+    }
+
+    private void testMultiLineWaitsForRegexMatch(final boolean 
shutdownBetweenReads) throws IOException { // verifies messages are held back until the next Line Start Pattern match or a rollover
+        runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>"); // lines starting with "<digit>" begin a new message; other lines continue the previous one
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        final String line1 = "<1>Hello, World\n";
+        final String line2 = "<2>Good-bye, World\n";
+        final String line3 = "<3>Start of multi-line\n";
+        final String line4 = "<4>Last One\n";
+
+        raf.write(line1.getBytes());
+        raf.write(line2.getBytes());
+
+        runner.run(1, shutdownBetweenReads, true); // only one message is emitted; the most recent one stays buffered
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        raf.write(line3.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads); // line3 starts a new message, so the buffered line2 is flushed
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        for (int i=0; i < 10; i++) {
+            logger.info("i = " + i); // NOTE(review): looks like leftover debug logging; consider removing
+            raf.write(String.valueOf(i).getBytes());
+            raf.write("\n".getBytes());
+
+            runner.run(1, shutdownBetweenReads, shutdownBetweenReads); // "0".."9" do not match "<\\d>", so they only extend the buffered line3 message
+            runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+        }
+
+        // Only lines 1 and 2 have been emitted; line 3 and its ten continuation lines are still buffered, so the stored position lags the file length (line1 + line2 + line3 + 10 digits + 10 newlines).
+        final Map<String, String> stateMap = 
runner.getStateManager().getState(Scope.LOCAL).toMap();
+        assertEquals(String.valueOf(line1.length() + line2.length() + 
line3.length() + 20), stateMap.get("file.0.length"));
+        assertEquals(String.valueOf(line1.length() + line2.length()), 
stateMap.get("file.0.position"));
+
+        raf.write(line4.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads); // line4 starts a new message, flushing line3 together with its continuation lines
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+        final MockFlowFile multiLineOutputFile = 
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+        multiLineOutputFile.assertContentEquals("<3>Start of 
multi-line\n0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n");
+        runner.clearTransferState();
+
+        // Roll the file: after rollover the still-buffered line4 must be emitted even though no later message start was seen.
+        raf.close();
+        file.renameTo(new File("target/log.1")); // NOTE(review): the boolean result is ignored; a failed rename would only show up as a confusing assertion failure below
+        raf = new RandomAccessFile(file, "rw");
+        raf.write(new byte[0]);
+
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        final MockFlowFile finalOutputFile = 
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+        finalOutputFile.assertContentEquals("<4>Last One\n");
+    }
+
+
+    @Test
     public void testRolloverAndUpdateAtSameTime() throws IOException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
 

Reply via email to