exceptionfactory commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681108549
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -976,14 +1032,53 @@ private long readLines(final FileChannel reader, final ByteBuffer buffer, final
}
}
- private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum) throws IOException {
- baos.writeTo(out);
+ private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, final OutputStream out, final Checksum checksum, final boolean ignoreRegex) throws IOException {
final byte[] baosBuffer = baos.toByteArray();
- checksum.update(baosBuffer, 0, baos.size());
+ baos.reset();
+
+ // If the regular expression is being ignored, we need to flush anything that is buffered.
+ // This happens, for example, when a file has been rolled over. At that point, we want to flush whatever we have,
+ // even if the regex hasn't been matched.
+ if (ignoreRegex) {
+ flushLinesBuffer(out, checksum);
+ }
+
+ if (lineStartRegex == null) {
+ out.write(baosBuffer);
+
+ checksum.update(baosBuffer, 0, baosBuffer.length);
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("Checksum updated to {}", checksum.getValue());
+ }
+
+ return;
+ }
+
+ final String bufferAsString = new String(baosBuffer, StandardCharsets.UTF_8);
+ final String[] lines = bufferAsString.split("\n");
Review comment:
Should this be replaced with `System.lineSeparator()`, or perhaps changed to a configurable property, so that it works across different platforms?
```suggestion
final String[] lines = bufferAsString.split(System.lineSeparator());
```
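To make the trade-off in this question concrete, here is a standalone sketch (not code from the PR) of how `String.split` treats a buffer with mixed line endings: with a fixed `\n`, with `\r\n` (what `System.lineSeparator()` returns on Windows), and with a pattern that accepts either. It also shows that `split` drops trailing empty strings unless a negative limit is passed. The class name and sample input are hypothetical.

```java
import java.util.Arrays;

public class SplitLineEndingsSketch {
    // Hypothetical buffer mixing a Windows (\r\n) and a Unix (\n) line ending
    static final String MIXED = "a\r\nb\nc\n";

    // Fixed "\n": both conventions split, but the "\r" stays on the Windows line
    static String[] splitOnNewline(final String s) {
        return s.split("\n");
    }

    // "\r\n" only, as System.lineSeparator() would return on Windows:
    // the Unix-terminated lines are not separated
    static String[] splitOnCrLf(final String s) {
        return s.split("\r\n");
    }

    // Optional "\r" followed by "\n": accepts either convention
    static String[] splitOnEither(final String s) {
        return s.split("\\r?\\n");
    }

    // Escapes control characters so the results print readably
    static String show(final String[] parts) {
        return Arrays.toString(parts).replace("\r", "\\r").replace("\n", "\\n");
    }

    public static void main(String[] args) {
        System.out.println(show(splitOnNewline(MIXED))); // [a\r, b, c]
        System.out.println(show(splitOnCrLf(MIXED)));    // [a, b\nc\n]
        System.out.println(show(splitOnEither(MIXED)));  // [a, b, c]
        // A negative limit keeps the trailing empty string after the final "\n"
        System.out.println(MIXED.split("\\r?\\n", -1).length); // 4
    }
}
```

Because the tailed file may have been written on a different platform than the one NiFi runs on, a platform-dependent separator only matches one convention; this is one input to deciding between `System.lineSeparator()`, a configurable property, or a pattern such as `\r?\n`.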
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -251,6 +257,30 @@
.defaultValue("false")
.build();
+ static final PropertyDescriptor LINE_START_REGEX = new Builder()
+ .name("Line Start Regex")
+ .displayName("Line Start Regex")
Review comment:
With the existing property named `Rolling Filename Pattern`, what do you think about naming this property `Line Start Pattern`?
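For context on what this property controls, here is a minimal, self-contained sketch of grouping lines into multi-line messages, where a new message begins whenever a line starts with a match of the pattern. It is not the implementation in this PR: the class and method names are hypothetical, and applying the pattern with `Matcher.lookingAt()` (a match anchored at the start of the line) is an assumption. `flushPending()` mirrors the `ignoreRegex` case in the diff, where buffered content is emitted regardless of the pattern, for example on rollover.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LineStartGroupingSketch {
    // Lines belonging to the message that is still being assembled
    private final StringBuilder pending = new StringBuilder();
    private final Pattern lineStart;

    LineStartGroupingSketch(final String lineStartPattern) {
        this.lineStart = Pattern.compile(lineStartPattern);
    }

    // Adds lines and returns every message they complete. A line whose beginning
    // matches the pattern closes the pending message and starts a new one;
    // any other line is appended to the pending message.
    List<String> addLines(final List<String> lines) {
        final List<String> completed = new ArrayList<>();
        for (final String line : lines) {
            if (lineStart.matcher(line).lookingAt() && pending.length() > 0) {
                completed.add(pending.toString());
                pending.setLength(0);
            }
            pending.append(line).append('\n');
        }
        return completed;
    }

    // Emits whatever is buffered, ignoring the pattern (e.g. when the file is rolled over)
    String flushPending() {
        final String message = pending.toString();
        pending.setLength(0);
        return message;
    }

    public static void main(String[] args) {
        final LineStartGroupingSketch grouper = new LineStartGroupingSketch("<\\d>");
        System.out.println(grouper.addLines(List.of("<1>Hello, World", "<2>Good-bye, World")).size()); // 1
        System.out.println(grouper.addLines(List.of("continued text", "<3>Next")).size());              // 1
        System.out.print(grouper.flushPending());                                                        // <3>Next
    }
}
```

The last message always stays buffered until a later line matches the pattern or a forced flush occurs, which is why the rollover case needs an explicit flush.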
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -108,6 +113,7 @@
public class TailFile extends AbstractProcessor {
static final String MAP_PREFIX = "file.";
+ private static final byte[] NEW_LINE_BYTES = "\n".getBytes(StandardCharsets.UTF_8);
Review comment:
Should this use `System.lineSeparator()`?
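One consideration for this question, assuming lines produced by `split("\n")` are written back out followed by this constant: splitting on `\n` leaves any `\r` on the line, so re-joining with `\n` reproduces the original bytes, whereas re-joining with `\r\n` (what `System.lineSeparator()` returns on Windows) would turn `\r\n` into `\r\r\n`. Since the diff feeds the written bytes to a `Checksum`, such a change would matter. The sketch below demonstrates only the byte-level effect; the class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RejoinLinesSketch {
    // Splits on "\n" and writes each line back followed by the given terminator.
    // Assumes every line in the buffer was newline-terminated.
    static byte[] splitAndRejoin(final String content, final String terminator) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        final byte[] terminatorBytes = terminator.getBytes(StandardCharsets.UTF_8);
        for (final String line : content.split("\n")) {
            final byte[] lineBytes = line.getBytes(StandardCharsets.UTF_8);
            out.write(lineBytes, 0, lineBytes.length);
            out.write(terminatorBytes, 0, terminatorBytes.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        final String windowsContent = "<1>Hello\r\n<2>World\r\n";
        final byte[] original = windowsContent.getBytes(StandardCharsets.UTF_8);
        // "\n" restores the "\r\n" endings because split("\n") kept each "\r"
        System.out.println(Arrays.equals(splitAndRejoin(windowsContent, "\n"), original));   // true
        // "\r\n" produces "\r\r\n", so the bytes (and any checksum over them) change
        System.out.println(Arrays.equals(splitAndRejoin(windowsContent, "\r\n"), original)); // false
    }
}
```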
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -833,6 +834,73 @@ public void testConsumeWhenNewLineFound() throws IOException, InterruptedExcepti
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
}
+ @Test
+ public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws IOException {
+ testMultiLineWaitsForRegexMatch(true);
+ }
+
+ @Test
+ public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() throws IOException {
+ testMultiLineWaitsForRegexMatch(false);
+ }
+
+ private void testMultiLineWaitsForRegexMatch(final boolean shutdownBetweenReads) throws IOException {
+ runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+ runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+ final String line1 = "<1>Hello, World\n";
+ final String line2 = "<2>Good-bye, World\n";
+ final String line3 = "<3>Start of multi-line\n";
+ final String line4 = "<4>Last One\n";
+
+ raf.write(line1.getBytes());
+ raf.write(line2.getBytes());
+
+ runner.run(1, shutdownBetweenReads, true);
+
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ raf.write(line3.getBytes());
+ runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+ runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+ runner.clearTransferState();
+
+ for (int i=0; i < 10; i++) {
+ System.out.println("i = " + i);
Review comment:
Although this is helpful for diagnosing the test, it seems like it would be better to change the output to use a logger as opposed to `System.out`.
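A minimal sketch of that change, using `java.util.logging` only so the example is self-contained; NiFi code generally logs through SLF4J, where the equivalent would be a `Logger` obtained from `LoggerFactory` with a `{}` placeholder. The class name, message text, and the in-memory handler (there only to show what gets logged) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;

public class LoopLoggingSketch {
    // Stand-in for the test class logger (SLF4J in NiFi; java.util.logging here)
    private static final Logger LOGGER = Logger.getLogger(LoopLoggingSketch.class.getName());

    // Runs the loop, logging each iteration at FINE (debug-like) level instead of printing
    static List<String> runIterations(final int count) {
        final List<String> captured = new ArrayList<>();
        final SimpleFormatter formatter = new SimpleFormatter();
        // In-memory handler so the example can show which messages were logged
        final Handler handler = new Handler() {
            @Override
            public void publish(final LogRecord record) {
                captured.add(formatter.formatMessage(record));
            }

            @Override
            public void flush() {
            }

            @Override
            public void close() {
            }
        };
        LOGGER.setLevel(Level.FINE);
        LOGGER.setUseParentHandlers(false);
        LOGGER.addHandler(handler);
        try {
            for (int i = 0; i < count; i++) {
                // The message is only formatted when a handler publishes the record
                LOGGER.log(Level.FINE, "Iteration {0}", i);
                // ... the per-iteration test steps would go here
            }
        } finally {
            LOGGER.removeHandler(handler);
        }
        return captured;
    }

    public static void main(String[] args) {
        System.out.println(runIterations(3)); // [Iteration 0, Iteration 1, Iteration 2]
    }
}
```

Logging at a debug-like level keeps the diagnostic available when needed without adding noise to every build's test output.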
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -847,8 +900,12 @@ public void process(final OutputStream rawOut) throws IOException {
flowFile = session.putAllAttributes(flowFile, attributes);
session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
session.transfer(flowFile, REL_SUCCESS);
+ getLogger().debug("Created {} and routed to success", new Object[]{flowFile});
Review comment:
The `Object[]` wrapper is no longer necessary, so this could be streamlined:
```suggestion
getLogger().debug("Created {} and routed to success", flowFile);
```
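To illustrate why the wrapper is redundant, the sketch below uses a hypothetical `debug(String, Object...)` method that fills `{}` placeholders in order. It assumes the logging method in use declares a varargs `Object...` parameter; in that case, passing a single argument and passing an explicit `Object[]` deliver the same array contents to the method. This is not NiFi's `ComponentLog` implementation, and the FlowFile string is a made-up stand-in.

```java
public class VarargsLoggingSketch {
    // Hypothetical logging method with a varargs parameter; substitutes "{}" placeholders in order
    static String debug(final String msg, final Object... args) {
        final StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int start = 0;
        int placeholder;
        while ((placeholder = msg.indexOf("{}", start)) >= 0 && argIndex < args.length) {
            sb.append(msg, start, placeholder).append(args[argIndex++]);
            start = placeholder + 2;
        }
        sb.append(msg.substring(start));
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a FlowFile's toString()
        final String flowFile = "FlowFile[uuid=1234]";
        // Explicit array: passed through as the varargs array itself
        System.out.println(debug("Created {} and routed to success", new Object[]{flowFile}));
        // Single argument: the compiler wraps it in an Object[] automatically
        System.out.println(debug("Created {} and routed to success", flowFile));
    }
}
```

Both calls print `Created FlowFile[uuid=1234] and routed to success`, so the explicit array only adds noise at the call site.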
--
This is an automated message from the Apache Git Service.