exceptionfactory commented on a change in pull request #5251:
URL: https://github.com/apache/nifi/pull/5251#discussion_r681108549



##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -976,14 +1032,53 @@ private long readLines(final FileChannel reader, final 
ByteBuffer buffer, final
         }
     }
 
-    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, 
final OutputStream out, final Checksum checksum) throws IOException {
-        baos.writeTo(out);
+    private void flushByteArrayOutputStream(final ByteArrayOutputStream baos, 
final OutputStream out, final Checksum checksum, final boolean ignoreRegex) 
throws IOException {
         final byte[] baosBuffer = baos.toByteArray();
-        checksum.update(baosBuffer, 0, baos.size());
+        baos.reset();
+
+        // If the regular expression is being ignored, we need to flush 
anything that is buffered.
+        // This happens, for example, when a file has been rolled over. At 
that point, we want to flush whatever we have,
+        // even if the regex hasn't been matched.
+        if (ignoreRegex) {
+            flushLinesBuffer(out, checksum);
+        }
+
+        if (lineStartRegex == null) {
+            out.write(baosBuffer);
+
+            checksum.update(baosBuffer, 0, baosBuffer.length);
+            if (getLogger().isTraceEnabled()) {
+                getLogger().trace("Checksum updated to {}", 
checksum.getValue());
+            }
+
+            return;
+        }
+
+        final String bufferAsString = new String(baosBuffer, 
StandardCharsets.UTF_8);
+        final String[] lines = bufferAsString.split("\n");

Review comment:
       Should this be replaced with `System.lineSeparator()` or perhaps changed 
to a configurable property to work across different platforms?
   ```suggestion
           final String[] lines = bufferAsString.split(System.lineSeparator());
   ```

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -251,6 +257,30 @@
             .defaultValue("false")
             .build();
 
+    static final PropertyDescriptor LINE_START_REGEX = new Builder()
+        .name("Line Start Regex")
+        .displayName("Line Start Regex")

Review comment:
       With the existing property named `Rolling Filename Pattern`, what do you 
think about naming this property `Line Start Pattern`? 

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -108,6 +113,7 @@
 public class TailFile extends AbstractProcessor {
 
     static final String MAP_PREFIX = "file.";
+    private static final byte[] NEW_LINE_BYTES = 
"\n".getBytes(StandardCharsets.UTF_8);

Review comment:
       Should this use `System.lineSeparator()`?

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
##########
@@ -833,6 +834,73 @@ public void testConsumeWhenNewLineFound() throws 
IOException, InterruptedExcepti
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("return\r\r\n");
     }
 
+    @Test
+    public void testMultiLineWaitsForRegexMatchShutdownBetweenReads() throws 
IOException {
+        testMultiLineWaitsForRegexMatch(true);
+    }
+
+    @Test
+    public void testMultiLineWaitsForRegexMatchWithoutShutdownBetweenReads() 
throws IOException {
+        testMultiLineWaitsForRegexMatch(false);
+    }
+
+    private void testMultiLineWaitsForRegexMatch(final boolean 
shutdownBetweenReads) throws IOException {
+        runner.setProperty(TailFile.LINE_START_REGEX, "<\\d>");
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+
+        final String line1 = "<1>Hello, World\n";
+        final String line2 = "<2>Good-bye, World\n";
+        final String line3 = "<3>Start of multi-line\n";
+        final String line4 = "<4>Last One\n";
+
+        raf.write(line1.getBytes());
+        raf.write(line2.getBytes());
+
+        runner.run(1, shutdownBetweenReads, true);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        raf.write(line3.getBytes());
+        runner.run(1, shutdownBetweenReads, shutdownBetweenReads);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        for (int i=0; i < 10; i++) {
+            System.out.println("i = " + i);

Review comment:
       Although this is helpful for diagnosing the test, it seems like it would 
be better to change the output to use a logger as opposed to `System.out`.

##########
File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
##########
@@ -847,8 +900,12 @@ public void process(final OutputStream rawOut) throws 
IOException {
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
-                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
             session.transfer(flowFile, REL_SUCCESS);
+            getLogger().debug("Created {} and routed to success", new 
Object[]{flowFile});

Review comment:
       The `Object[]` wrapper is no longer necessary, so this could be 
streamlined:
   ```suggestion
               getLogger().debug("Created {} and routed to success", flowFile);
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to