Repository: nifi Updated Branches: refs/heads/0.x db189e3b3 -> 5c0c1d3bb
NIFI-2874: Ensure that when reading more data from an InputStream the StreamDemarcator appropriately updates the max index that can be read from the buffer Signed-off-by: Mike Moser <[email protected]> This closes #1114. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/5c0c1d3b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/5c0c1d3b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/5c0c1d3b Branch: refs/heads/0.x Commit: 5c0c1d3bb41fa75622a3a7502ccf86ec49d2fe4d Parents: db189e3 Author: Mark Payne <[email protected]> Authored: Fri Oct 7 09:52:03 2016 -0400 Committer: Mike Moser <[email protected]> Committed: Fri Oct 7 14:24:54 2016 -0400 ---------------------------------------------------------------------- .../nifi/stream/io/util/StreamDemarcator.java | 1 + .../nifi/stream/io/util/StreamDemarcatorTest.java | 16 ++++++++++++++++ 2 files changed, 17 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/5c0c1d3b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java index 3064f1c..96dc619 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java @@ -138,6 +138,7 @@ public class StreamDemarcator { System.arraycopy(this.buffer, this.mark, this.buffer, 0, length); this.index = length; this.mark = 0; + this.readAheadLength = length; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/5c0c1d3b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java index 93082a2..355cbb6 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java @@ -25,6 +25,7 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -218,6 +219,21 @@ public class StreamDemarcatorTest { } @Test + public void testOnBufferSplitNoTrailingDelimiter() throws IOException { + final byte[] inputData = "Yes\nNo".getBytes(StandardCharsets.UTF_8); + ByteArrayInputStream is = new ByteArrayInputStream(inputData); + StreamDemarcator scanner = new StreamDemarcator(is, "\n".getBytes(), 1000, 3); + + final byte[] first = scanner.nextToken(); + final byte[] second = scanner.nextToken(); + assertNotNull(first); + assertNotNull(second); + + assertArrayEquals(first, new byte[] {'Y', 'e', 's'}); + assertArrayEquals(second, new byte[] {'N', 'o'}); + } + + @Test public void verifyScannerHandlesNegativeOneByteDelimiter() { ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }); StreamDemarcator scanner = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
