This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new bf52973 NIFI-8773: Implemented Line Start Pattern in TailFile
bf52973 is described below
commit bf52973d628b092902f389e638448a6cd458d6a2
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jul 9 17:16:21 2021 -0400
NIFI-8773: Implemented Line Start Pattern in TailFile
Each message encountered in the tailed file will be buffered (up to some
configurable max) until the subsequent message arrives. At that point, the
previous message will be flushed.
This closes #5251
Signed-off-by: David Handermann <[email protected]>
---
.../apache/nifi/processors/standard/TailFile.java | 169 +++++++++++++++++----
.../additionalDetails.html | 94 ++++++++++--
.../standard/AbstractTestTailFileScenario.java | 3 +-
.../nifi/processors/standard/TestTailFile.java | 73 ++++++++-
4 files changed, 292 insertions(+), 47 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 2b1bfcd..19e5724 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -32,6 +32,7 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.RequiredPermission;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -60,6 +62,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -84,6 +87,8 @@ import java.util.zip.Checksum;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
+import static
org.apache.nifi.processor.util.StandardValidators.DATA_SIZE_VALIDATOR;
+import static
org.apache.nifi.processor.util.StandardValidators.REGULAR_EXPRESSION_VALIDATOR;
// note: it is important that this Processor is not marked as
@SupportsBatching because the session commits must complete before persisting
state locally; otherwise, data loss may occur
@TriggerSerially
@@ -108,6 +113,7 @@ import static
org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGIST
public class TailFile extends AbstractProcessor {
static final String MAP_PREFIX = "file.";
+ private static final byte[] NEW_LINE_BYTES =
"\n".getBytes(StandardCharsets.UTF_8);
static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local",
"Local",
"State is stored locally. Each node in a cluster will tail a
different file.");
@@ -130,7 +136,7 @@ public class TailFile extends AbstractProcessor {
"Start with the data at the end of the File to Tail. Do not ingest
any data thas has already been rolled over or any "
+ "data in the File to Tail that has already been written.");
- static final PropertyDescriptor BASE_DIRECTORY = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor BASE_DIRECTORY = new Builder()
.name("tail-base-directory")
.displayName("Base directory")
.description("Base directory used to look for files to tail. This
property is required when using Multifile mode.")
@@ -139,7 +145,7 @@ public class TailFile extends AbstractProcessor {
.required(false)
.build();
- static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor MODE = new Builder()
.name("tail-mode")
.displayName("Tailing mode")
.description("Mode to use: single file will tail only one file,
multiple file will look for a list of file. In Multiple mode"
@@ -150,7 +156,7 @@ public class TailFile extends AbstractProcessor {
.defaultValue(MODE_SINGLEFILE.getValue())
.build();
- static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+ static final PropertyDescriptor FILENAME = new Builder()
.displayName("File(s) to Tail")
.name("File to Tail")
.description("Path of the file to tail in case of single file
mode. If using multifile mode, regular expression to find files "
@@ -161,7 +167,7 @@ public class TailFile extends AbstractProcessor {
.required(true)
.build();
- static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new Builder()
.name("Rolling Filename Pattern")
.description("If the file to tail \"rolls over\" as would be the
case with log files, this filename pattern will be used to "
+ "identify files that have rolled over so that if NiFi is
restarted, and the file has rolled over, it will be able to pick up where it
left off. "
@@ -173,7 +179,7 @@ public class TailFile extends AbstractProcessor {
.required(false)
.build();
- static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor POST_ROLLOVER_TAIL_PERIOD = new Builder()
.name("Post-Rollover Tail Period")
.description("When a file is rolled over, the processor will continue
tailing the rolled over file until it has not been modified for this amount of
time. " +
"This allows for another process to rollover a file, and then
flush out any buffered data. Note that when this value is set, and the tailed
file rolls over, " +
@@ -186,7 +192,7 @@ public class TailFile extends AbstractProcessor {
.defaultValue("0 sec")
.build();
- static final PropertyDescriptor STATE_LOCATION = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor STATE_LOCATION = new Builder()
.displayName("State Location")
.name("File Location") //retained name of property for backward
compatibility of configs
.description("Specifies where the state is located either local or
cluster so that state can be stored "
@@ -196,7 +202,7 @@ public class TailFile extends AbstractProcessor {
.defaultValue(LOCATION_LOCAL.getValue())
.build();
- static final PropertyDescriptor START_POSITION = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor START_POSITION = new Builder()
.name("Initial Start Position")
.description("When the Processor first begins to tail data, this
property specifies where the Processor should begin reading data. Once data has
been ingested from a file, "
+ "the Processor will continue from the last point from
which it has received data.")
@@ -205,7 +211,7 @@ public class TailFile extends AbstractProcessor {
.required(true)
.build();
- static final PropertyDescriptor RECURSIVE = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor RECURSIVE = new Builder()
.name("tailfile-recursive-lookup")
.displayName("Recursive lookup")
.description("When using Multiple files mode, this property
defines if files must be listed recursively or not"
@@ -215,7 +221,7 @@ public class TailFile extends AbstractProcessor {
.required(true)
.build();
- static final PropertyDescriptor LOOKUP_FREQUENCY = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor LOOKUP_FREQUENCY = new Builder()
.name("tailfile-lookup-frequency")
.displayName("Lookup frequency")
.description("Only used in Multiple files mode. It specifies the
minimum "
@@ -225,7 +231,7 @@ public class TailFile extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
- static final PropertyDescriptor MAXIMUM_AGE = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor MAXIMUM_AGE = new Builder()
.name("tailfile-maximum-age")
.displayName("Maximum age")
.description("Only used in Multiple files mode. It specifies the
necessary "
@@ -237,7 +243,7 @@ public class TailFile extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
- static final PropertyDescriptor REREAD_ON_NUL = new
PropertyDescriptor.Builder()
+ static final PropertyDescriptor REREAD_ON_NUL = new Builder()
.name("reread-on-nul")
.displayName("Reread when NUL encountered")
.description("If this option is set to 'true', when a NUL
character is read, the processor will yield and try to read the same part again
later. "
@@ -251,6 +257,30 @@ public class TailFile extends AbstractProcessor {
.defaultValue("false")
.build();
+ static final PropertyDescriptor LINE_START_PATTERN = new Builder()
+ .name("Line Start Pattern")
+ .displayName("Line Start Pattern")
+ .description("A Regular Expression to match against the start of a log
line. If specified, any line that matches the expression, and any following
lines, will be buffered until another line" +
+ " matches the Expression. In doing this, we can avoid splitting
apart multi-line messages in the file. This assumes that the data is in UTF-8
format.")
+ .required(false)
+ .addValidator(REGULAR_EXPRESSION_VALIDATOR)
+ .expressionLanguageSupported(NONE)
+ .dependsOn(MODE, MODE_SINGLEFILE)
+ .build();
+
+ static final PropertyDescriptor MAX_BUFFER_LENGTH = new Builder()
+ .name("Max Buffer Size")
+ .displayName("Max Buffer Size")
+ .description("When using the Line Start Pattern, there may be
situations in which the data in the file being tailed never matches the Regular
Expression. This would result in the processor " +
+ "buffering all data from the tailed file, which can quickly
exhaust the heap. To avoid this, the Processor will buffer only up to this
amount of data before flushing the buffer, even if" +
+ " it means ingesting partial data from the file.")
+ .required(true)
+ .addValidator(DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(NONE)
+ .defaultValue("64 KB")
+ .dependsOn(LINE_START_PATTERN)
+ .build();
+
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All FlowFiles are routed to this Relationship.")
@@ -261,6 +291,10 @@ public class TailFile extends AbstractProcessor {
private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
private volatile boolean requireStateLookup = true;
+ private volatile ByteArrayOutputStream linesBuffer = new
ByteArrayOutputStream();
+ private volatile Pattern lineStartPattern;
+ private volatile long maxBufferBytes;
+
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>();
@@ -275,6 +309,8 @@ public class TailFile extends AbstractProcessor {
properties.add(LOOKUP_FREQUENCY);
properties.add(MAXIMUM_AGE);
properties.add(REREAD_ON_NUL);
+ properties.add(LINE_START_PATTERN);
+ properties.add(MAX_BUFFER_LENGTH);
return properties;
}
@@ -341,6 +377,14 @@ public class TailFile extends AbstractProcessor {
}
@OnScheduled
+ public void compileRegex(final ProcessContext context) {
+ final String regex =
context.getProperty(LINE_START_PATTERN).getValue();
+ lineStartPattern = (regex == null) ? null : Pattern.compile(regex);
+
+ this.maxBufferBytes =
context.getProperty(MAX_BUFFER_LENGTH).asDataSize(DataUnit.B).longValue();
+ }
+
+ @OnScheduled
public void recoverState(final ProcessContext context) throws IOException {
// set isMultiChanging
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue()));
@@ -581,13 +625,16 @@ public class TailFile extends AbstractProcessor {
}
@OnStopped
- public void cleanup() {
+ public void cleanup(final ProcessContext context) {
for (TailFileObject tfo : states.values()) {
cleanReader(tfo);
final TailFileState state = tfo.getState();
- tfo.setState(new TailFileState(state.getFilename(),
state.getFile(), null, state.getPosition(),
+ tfo.setState(new TailFileState(state.getFilename(),
state.getFile(), null, state.getPosition() - linesBuffer.size(),
state.getTimestamp(), state.getLength(), state.getChecksum(),
state.getBuffer(), state.isTailingPostRollover()));
+ persistState(tfo, null, context);
}
+
+ linesBuffer.reset();
}
private void cleanReader(TailFileObject tfo) {
@@ -643,7 +690,7 @@ public class TailFile extends AbstractProcessor {
return;
}
- for (String tailFile : states.keySet()) {
+ for (final String tailFile : states.keySet()) {
try {
processTailFile(context, session, tailFile);
} catch (NulCharacterEncounteredException e) {
@@ -652,6 +699,12 @@ public class TailFile extends AbstractProcessor {
return;
}
}
+
+ // If a Line Start Pattern is being used and data is buffered, the
Position that has been stored in the state will
+ // not be accurate. To address this, we call cleanup(), which will
handle updating the state to the correct values for us.
+ if (lineStartPattern != null && linesBuffer.size() > 0) {
+ cleanup(context);
+ }
}
private void processTailFile(final ProcessContext context, final
ProcessSession session, final String tailFile) {
@@ -667,7 +720,7 @@ public class TailFile extends AbstractProcessor {
if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
recoverRolledFiles(context, session, tailFile,
tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(),
tfo.getState().getPosition());
} else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
- cleanup();
+ cleanup(context);
tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L,
0L, null, tfo.getState().getBuffer()));
} else {
final String filename = tailFile;
@@ -687,7 +740,7 @@ public class TailFile extends AbstractProcessor {
}
fileChannel.position(position);
- cleanup();
+ cleanup(context);
tfo.setState(new TailFileState(filename, file,
fileChannel, position, timestamp, file.length(), checksum,
tfo.getState().getBuffer()));
} catch (final IOException ioe) {
getLogger().error("Attempted to position Reader at current
position in file {} but failed to do so due to {}", new Object[]{file,
ioe.toString()}, ioe);
@@ -847,8 +900,12 @@ public class TailFile extends AbstractProcessor {
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile,
file.toURI().toString(), "FlowFile contains bytes " + position + " through " +
positionHolder.get() + " of source file",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
startNanos));
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
+ getLogger().debug("Created {} and routed to success", flowFile);
+ }
+
+ if (flowFile.getSize() > 0 || linesBuffer.size() > 0) {
position = positionHolder.get();
// Set timestamp to the latest of when the file was modified and
the current timestamp stored in the state.
@@ -858,7 +915,6 @@ public class TailFile extends AbstractProcessor {
// rotated file a second time.
timestamp = Math.max(state.getTimestamp(), file.lastModified());
length = file.length();
- getLogger().debug("Created {} and routed to success", new
Object[]{flowFile});
}
// Create a new state object to represent our current position,
timestamp, etc.
@@ -930,7 +986,7 @@ public class TailFile extends AbstractProcessor {
case '\n': {
baos.write(ch);
seenCR = false;
- flushByteArrayOutputStream(baos, out, checksum);
+ flushByteArrayOutputStream(baos, out, checksum,
false);
rePos = pos + i + 1;
linesRead++;
break;
@@ -948,7 +1004,7 @@ public class TailFile extends AbstractProcessor {
default: {
if (seenCR) {
seenCR = false;
- flushByteArrayOutputStream(baos, out,
checksum);
+ flushByteArrayOutputStream(baos, out,
checksum, false);
linesRead++;
baos.write(ch);
rePos = pos + i;
@@ -963,7 +1019,7 @@ public class TailFile extends AbstractProcessor {
}
if (readFully) {
- flushByteArrayOutputStream(baos, out, checksum);
+ flushByteArrayOutputStream(baos, out, checksum, true);
rePos = reader.position();
}
@@ -976,14 +1032,53 @@ public class TailFile extends AbstractProcessor {
}
}
- private void flushByteArrayOutputStream(final ByteArrayOutputStream baos,
final OutputStream out, final Checksum checksum) throws IOException {
- baos.writeTo(out);
+ private void flushByteArrayOutputStream(final ByteArrayOutputStream baos,
final OutputStream out, final Checksum checksum, final boolean ignoreRegex)
throws IOException {
final byte[] baosBuffer = baos.toByteArray();
- checksum.update(baosBuffer, 0, baos.size());
+ baos.reset();
+
+ // If the regular expression is being ignored, we need to flush
anything that is buffered.
+ // This happens, for example, when a file has been rolled over. At
that point, we want to flush whatever we have,
+ // even if the regex hasn't been matched.
+ if (ignoreRegex) {
+ flushLinesBuffer(out, checksum);
+ }
+
+ if (lineStartPattern == null) {
+ out.write(baosBuffer);
+
+ checksum.update(baosBuffer, 0, baosBuffer.length);
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("Checksum updated to {}",
checksum.getValue());
+ }
+
+ return;
+ }
+
+ final String bufferAsString = new String(baosBuffer,
StandardCharsets.UTF_8);
+ final String[] lines = bufferAsString.split("\n");
+ for (final String line : lines) {
+ final boolean startsWithRegex =
lineStartPattern.matcher(line).lookingAt();
+
+ if (startsWithRegex || linesBuffer.size() >= maxBufferBytes) {
+ // We found a line that matches our start regex. Flush what we
have buffered and reset our buffer.
+ flushLinesBuffer(out, checksum);
+ }
+
+ // This line does not match our start regex. Buffer this line
until we encounter a line that does.
+ linesBuffer.write(line.getBytes(StandardCharsets.UTF_8));
+ linesBuffer.write(NEW_LINE_BYTES);
+ }
+ }
+
+ private void flushLinesBuffer(final OutputStream out, final Checksum
checksum) throws IOException {
+ linesBuffer.writeTo(out);
+
+ checksum.update(linesBuffer.toByteArray(), 0, linesBuffer.size());
if (getLogger().isTraceEnabled()) {
getLogger().trace("Checksum updated to {}", checksum.getValue());
}
- baos.reset();
+
+ linesBuffer.reset();
}
/**
@@ -1074,7 +1169,8 @@ public class TailFile extends AbstractProcessor {
private void persistState(final Map<String, String> state, final
ProcessSession session, final ProcessContext context) {
try {
- final StateMap oldState = session.getState(getStateScope(context));
+ final Scope scope = getStateScope(context);
+ final StateMap oldState = session == null ?
context.getStateManager().getState(scope) : session.getState(scope);
Map<String, String> updatedState = new HashMap<>();
for(String key : oldState.toMap().keySet()) {
@@ -1092,7 +1188,11 @@ public class TailFile extends AbstractProcessor {
updatedState.putAll(state);
- session.setState(updatedState, getStateScope(context));
+ if (session == null) {
+ context.getStateManager().setState(updatedState, scope);
+ } else {
+ session.setState(updatedState, scope);
+ }
} catch (final IOException e) {
getLogger().warn("Failed to store state due to {}; some data may
be duplicated on restart of NiFi", new Object[]{e});
}
@@ -1361,7 +1461,7 @@ public class TailFile extends AbstractProcessor {
// use a timestamp of lastModified() + 1 so that we do not
ingest this file again.
getLogger().debug("Completed tailing of file {}; will cleanup
state", tailFile);
- cleanup();
+ cleanup(context);
tfo.setState(new TailFileState(tailFile, null, null, 0L,
fileToTail.lastModified() + 1L, fileToTail.length(), null,
tfo.getState().getBuffer(), tailingPostRollover));
}
@@ -1383,9 +1483,16 @@ public class TailFile extends AbstractProcessor {
* @return the new, updated state that reflects that the given file has
been
* ingested.
*/
- private TailFileState consumeFileFully(final File file, final
ProcessContext context, final ProcessSession session, TailFileObject tfo) {
+ private TailFileState consumeFileFully(final File file, final
ProcessContext context, final ProcessSession session, TailFileObject tfo)
throws IOException {
FlowFile flowFile = session.create();
- flowFile = session.importFrom(file.toPath(), true, flowFile);
+
+ try (final InputStream fis = new FileInputStream(file)) {
+ flowFile = session.write(flowFile, out -> {
+ flushLinesBuffer(out, new CRC32());
+ StreamUtils.copy(fis, out);
+ });
+ }
+
if (flowFile.getSize() == 0L) {
session.remove(flowFile);
} else {
@@ -1399,7 +1506,7 @@ public class TailFile extends AbstractProcessor {
getLogger().debug("Created {} from {} and routed to success", new
Object[]{flowFile, file});
// use a timestamp of lastModified() + 1 so that we do not ingest
this file again.
- cleanup();
+ cleanup(context);
tfo.setState(new
TailFileState(context.getProperty(FILENAME).evaluateAttributeExpressions().getValue(),
null, null, 0L, file.lastModified() + 1L, file.length(), null,
tfo.getState().getBuffer()));
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
index 1585e71..e5013b2 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -22,27 +22,48 @@
</head>
<body>
- <h3>Modes</h3>
+
+ <h3>Introduction</h3>
+ <p>
+ This processor offers a powerful capability, allowing
the user to periodically look at a file that is actively being written to by
another process.
+ When the file changes, the new lines are ingested. This
Processor assumes that data in the file is textual.
+ </p>
+ <p>
+ Tailing a file from a filesystem is a seemingly simple
but notoriously difficult task. This is because we are periodically checking
the contents
+ of a file that is being written to. The file may be
constantly changing, or it may rarely change. The file may be "rolled over"
(i.e., renamed)
+ and it's important that even after restarting the
application (NiFi, in this case), we are able to pick up where we left off.
Other additional complexities
+ also come into play. For example, NFS mounted drives
may indicate that data is readable but then return NUL bytes (Unicode 0) when
attempting to read, as
+ the actual bytes are not yet known (see the &lt;Reread
when NUL encountered&gt; property), and file systems have different timestamp
granularities.
+ </p>
+ <p>
+ This Processor is designed to handle all of these
different cases. This can lead to slightly more complex configuration, but this
document should provide
+ you with all you need to get started!
+ </p>
+
+
+ <h3>Modes</h3>
<p>
- This processor is used to tail a file or multiple files according
to multiple modes. The
+ This processor is used to tail a file or multiple files, depending
on the configured mode. The
mode to choose depends of the logging pattern followed by the
file(s) to tail. In any case, if there
- is a rolling pattern, the rolling files must be plain text files
(compression is not supported at
+ is a rolling pattern, the rolling files must be plain text files
(compression is not supported at
the moment).
</p>
+
<ul>
<li><b>Single file</b>: the processor will tail the file with
the path given in 'File(s) to tail' property.</li>
<li><b>Multiple files</b>: the processor will look for files
into the 'Base directory'. It will look for file recursively
according to the 'Recursive lookup' property and will tail all
the files matching the regular expression
provided in the 'File(s) to tail' property.</li>
</ul>
+
<h3>Rolling filename pattern</h3>
<p>
In case the 'Rolling filename pattern' property is used, when
the processor detects that the file to tail has rolled over, the
- processor will look for possible missing messages in the rolled
file. To do so, the processor will use the pattern to find the
+ processor will look for possible missing messages in the rolled
file. To do so, the processor will use the pattern to find the
rolling files in the same directory as the file to tail.
</p>
<p>
- In order to keep this property available in the 'Multiple
files' mode when multiples files to tail are in the same directory,
+ In order to keep this property available in the 'Multiple
files' mode when multiple files to tail are in the same directory,
it is possible to use the ${filename} tag to reference the name
(without extension) of the file to tail. For example, if we have:
</p>
<p>
@@ -72,7 +93,7 @@
and new log messages are always appended in my-app.log file.
</p>
<p>
- In case recursivity is set to 'true'. The regular expression
for the files to tail must embrace the possible intermediate directories
+ If recursivity is set to 'true', the regular expression
for the files to tail must account for the possible intermediate directories
between the base directory and the files to tail. Example:
</p>
<p>
@@ -89,26 +110,71 @@
Recursivity: true
</code>
</p>
- <p>
+
+ <p>
If the processor is configured with '<b>Multiple files</b>'
mode, two additional properties are relevant:
</p>
+
<ul>
<li><b>Lookup frequency</b>: specifies the minimum duration the
processor will wait before listing again the files to tail.</li>
- <li><b>Maximum age</b>: specifies the necessary minimum
duration to consider that no new messages will be appended in a file
+ <li><b>Maximum age</b>: specifies the necessary minimum
duration to consider that no new messages will be appended in a file
regarding its last modification date. If the amount of time
that has elapsed since the file was modified is larger than this
period of time, the file will not be tailed. For example, if a
file was modified 24 hours ago and this property is set to 12 hours,
the file will not be tailed. But if this property is set to 36
hours, then the file will continue to be tailed.</li>
</ul>
- <p>
- It is necessary to pay attention to 'Lookup frequency' and
'Maximum age' properties, as well as the frequency at which the processor is
- triggered, in order to achieve high performance. It is
recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
- frequency to avoid missing data. It also recommended not to set
'Maximum Age' too low because if messages are appended in a file
+
+ <p>
+ It is necessary to pay attention to 'Lookup frequency' and
'Maximum age' properties, as well as the frequency at which the processor is
+ triggered, in order to achieve high performance. It is
recommended to keep 'Maximum age' > 'Lookup frequency' > processor scheduling
+ frequency to avoid missing data. It is also recommended not to set
'Maximum age' too low because if messages are appended in a file
after this file has been considered "too old", all the messages
in the file may be read again, leading to data duplication.
</p>
- <p>
- If the processor is configured with '<b>Multiple files</b>'
mode, the 'Rolling
+
+ <p>
+ If the processor is configured with '<b>Multiple files</b>'
mode, the 'Rolling
filename pattern' property must be specific enough to ensure
that only the rolling files will be listed and not other currently tailed
files in the same directory (this can be achieved using
${filename} tag).
</p>
+
+
+ <h3>Handling Multi-Line Messages</h3>
+ <p>
+ Most of the time, when we tail a file, we are happy to
receive data periodically, regardless of how it was written to the file. There are
scenarios, though,
+ where we may have data written in such a way that
multiple lines need to be retained together. Take, for example, the following
lines of text that
+ might be found in a log file:
+ </p>
+ <code>
+ <pre>
+2021-07-09 14:12:19,731 INFO [main] org.apache.nifi.NiFi Launching NiFi...
+2021-07-09 14:12:19,915 INFO [main] o.a.n.p.AbstractBootstrapPropertiesLoader
Determined default application properties path to be
'/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties'
+2021-07-09 14:12:19,919 INFO [main] o.a.nifi.properties.NiFiPropertiesLoader
Loaded 199 properties from
/Users/mpayne/devel/nifi/nifi-assembly/target/nifi-1.14.0-SNAPSHOT-bin/nifi-1.14.0-SNAPSHOT/./conf/nifi.properties
+2021-07-09 14:12:19,925 WARN Line 1 of Log Message
+ Line 2: This is an important warning.
+ Line 3: Please do not ignore this warning.
+ Line 4: These lines of text make sense only in the
context of the original message.
+2021-07-09 14:12:19,941 INFO [main] Final message in log file
+ </pre>
+ </code>
+
+ <p>
+ In this case, we may want to ensure that the log lines
are not ingested in such a way that our multi-line log message is broken up
into Lines 1 and 2 in one FlowFile
+ and Lines 3 and 4 in another. To accomplish this, the
Processor exposes the &lt;Line Start Pattern&gt; property. If we set this
property to a value of
+ <code>\d{4}-\d{2}-\d{2}</code>, then we are telling the
Processor that each message should begin with 4 digits, followed by a dash,
followed by 2 digits, a dash, and 2 digits.
+ I.e., we are telling it that each message begins with a
timestamp in yyyy-MM-dd format. Because of this, even if the Processor runs and
sees only Lines 1 and 2 of our
+ multiline log message, it will not ingest the data yet.
It will wait until it sees the next message, which starts with a timestamp.
+ </p>
+ <p>
+ Note that, because of this, the last message that the
Processor will encounter in the above situation is the "Final message in log
file" line. At this point, the Processor does
+ not know whether the next line of text it encounters
will be part of this line or a new message. As such, it will not ingest this
data. It will wait until either another message
+ is encountered (that matches our regex) or until the
file is rolled over (renamed). Because of this, there may be some delay in
ingesting the last message in the file, if the process
+ that writes to the file just stops writing at this
point.
+ </p>
+
+ <p>
+ Additionally, we run the risk of the Regular
Expression not matching the data in the file. This could result in buffering
all of the file's content, which could cause NiFi
+ to run out of memory. To avoid this, the &lt;Max Buffer
Size&gt; property limits the amount of data that can be buffered. If this
amount of data is buffered, it will be flushed
+ to the FlowFile, even if another message hasn't been
encountered.
+ </p>
+
</body>
</html>
\ No newline at end of file
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
index 2c98a6f..8d890be 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/AbstractTestTailFileScenario.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import org.apache.commons.lang3.SystemUtils;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
@@ -110,7 +111,7 @@ public class AbstractTestTailFileScenario {
randomAccessFile.close();
}
- processor.cleanup();
+ processor.cleanup(new MockProcessContext(processor));
}
public void testScenario(List<Action> actions) throws Exception {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index e7d6177..a9ac7e5 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processors.standard.TailFile.TailFileState;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.After;
@@ -29,6 +30,8 @@ import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
@@ -58,6 +61,7 @@ import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
public class TestTailFile {
+ private static final Logger logger =
LoggerFactory.getLogger(TestTailFile.class);
private File file;
private File existingFile;
@@ -118,7 +122,7 @@ public class TestTailFile {
otherRaf.close();
}
- processor.cleanup();
+ processor.cleanup(new MockProcessContext(processor));
final File[] files = file.getParentFile().listFiles();
if (files != null) {
@@ -834,6 +838,73 @@ public class TestTailFile {
}
+ /**
+ * Verifies multi-line message buffering when the processor is stopped after each run
+ * and re-initialized before the next one.
+ */
@Test
+ public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws IOException {
+ final boolean shutdownBetweenReads = true;
+ testMultiLineWaitsForRegexMatch(shutdownBetweenReads);
+ }
+
+ /**
+ * Verifies multi-line message buffering when the processor is neither stopped nor
+ * re-initialized between runs.
+ */
+ @Test
+ public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() throws IOException {
+ final boolean shutdownBetweenReads = false;
+ testMultiLineWaitsForRegexMatch(shutdownBetweenReads);
+ }
+
+ /**
+ * Tails a file whose messages start with {@code <digit>} and verifies that a message
+ * spanning several lines is held back until the next line-start match is written, or
+ * until the file is rolled over.
+ *
+ * @param shutdownBetweenReads whether to stop the processor after each run and
+ * re-initialize it before each subsequent run
+ */
+ private void testMultiLineWaitsForRegexMatch(final boolean shutdownBetweenReads) throws IOException {
+ runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+ final String line1 = "<1>Hello, World\n";
+ final String line2 = "<2>Good-bye, World\n";
+ final String line3 = "<3>Start of multi-line\n";
+ final String line4 = "<4>Last One\n";
+
+ raf.write(line1.getBytes());
+ raf.write(line2.getBytes());
+
+ // line2 could still be continued by later lines, so only one message is emitted here
+ runner.run(1, shutdownBetweenReads, true);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ // line3 matches the regex, which completes the previously buffered message
+ raf.write(line3.getBytes());
+ runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ // Lines that do not match the regex continue line3's message, so nothing is emitted.
+ // Track the characters written so the state assertions below need no magic number.
+ int continuationLength = 0;
+ for (int i = 0; i < 10; i++) {
+ logger.info("i = {}", i);
+ final String continuationLine = i + "\n";
+ raf.write(continuationLine.getBytes());
+ continuationLength += continuationLine.length();
+
+ runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+ }
+
+ // The length covers everything written, but the position covers only the first 2 lines,
+ // because that is all that has been emitted. All test data is ASCII, so String.length()
+ // equals the number of bytes written.
+ final Map<String, String> stateMap = runner.getStateManager().getState(Scope.LOCAL).toMap();
+ assertEquals(String.valueOf(line1.length() + line2.length() + line3.length() + continuationLength),
+ stateMap.get("file.0.length"));
+ assertEquals(String.valueOf(line1.length() + line2.length()), stateMap.get("file.0.position"));
+
+ // line4 starts a new message, which releases the buffered multi-line message
+ raf.write(line4.getBytes());
+ runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+
+ final MockFlowFile multiLineOutputFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+ multiLineOutputFile.assertContentEquals("<3>Start of multi-line\n0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n");
+ runner.clearTransferState();
+
+ // Roll the file next to the tailed file so ROLLING_FILENAME_PATTERN can find it.
+ // Fail fast if the rename does not happen; otherwise the assertions below would be misleading.
+ raf.close();
+ final File rolledFile = new File(file.getParentFile(), "log.1");
+ Assert.assertTrue("Failed to roll over " + file + " to " + rolledFile, file.renameTo(rolledFile));
+ raf = new RandomAccessFile(file, "rw");
+
+ // After the rollover, the still-buffered line4 is expected to be emitted
+ runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ final MockFlowFile finalOutputFile = runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0);
+ finalOutputFile.assertContentEquals("<4>Last One\n");
+ }
+
+
+ @Test
public void testRolloverAndUpdateAtSameTime() throws IOException {
runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");