Repository: nifi
Updated Branches:
  refs/heads/master 765df6781 -> 830f7aa84


NIFI-5718: Implemented LineDemarcator and removed NLKBufferedReader in order to 
improve performance


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/564ad0cd
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/564ad0cd
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/564ad0cd

Branch: refs/heads/master
Commit: 564ad0cd71e56b426e02757c7a0a70e28ccbea12
Parents: 765df67
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Oct 18 12:05:16 2018 -0400
Committer: Peter Wicks <patric...@gmail.com>
Committed: Fri Nov 9 14:26:20 2018 -0700

----------------------------------------------------------------------
 .../nifi/stream/io/RepeatingInputStream.java    | 103 +++++++++++++
 .../stream/io/util/AbstractTextDemarcator.java  | 147 +++++++++++++++++++
 .../nifi/stream/io/util/LineDemarcator.java     | 116 +++++++++++++++
 .../nifi/stream/io/util/TestLineDemarcator.java | 120 +++++++++++++++
 .../nifi/processors/standard/ReplaceText.java   |  71 +++++----
 .../nifi/processors/standard/RouteText.java     |  49 +++----
 .../standard/util/NLKBufferedReader.java        |  76 ----------
 7 files changed, 549 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
new file mode 100644
index 0000000..f542741
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/RepeatingInputStream.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Objects;
+
/**
 * An {@link InputStream} that emits the contents of a byte array a fixed number of times in
 * succession. For example, repeating the bytes of {@code "ab"} three times produces the bytes of
 * {@code "ababab"}, after which the stream reports end-of-stream.
 * <p>
 * The array is not copied: it is re-wrapped in a {@link ByteArrayInputStream} for every
 * repetition, so changes made to it while the stream is being read affect later repetitions.
 * </p>
 */
public class RepeatingInputStream extends InputStream {
    private final byte[] toRepeat;
    private final int maxIterations;

    // Stream over the repetition currently being served; replaced each time it is exhausted.
    private InputStream bais;
    // Number of repetitions started so far, including the one 'bais' is currently serving.
    private int repeatCount;

    /**
     * Creates a stream that emits {@code toRepeat} exactly {@code iterations} times.
     *
     * @param toRepeat the bytes to repeat; must be non-null and non-empty
     * @param iterations how many times the bytes are emitted; must be at least 1
     * @throws IllegalArgumentException if {@code iterations} is less than 1 or {@code toRepeat} is empty
     * @throws NullPointerException if {@code toRepeat} is null (checked after {@code iterations})
     */
    public RepeatingInputStream(final byte[] toRepeat, final int iterations) {
        if (iterations < 1) {
            throw new IllegalArgumentException("Number of iterations must be at least 1 but was " + iterations);
        }
        if (Objects.requireNonNull(toRepeat, "Bytes to repeat cannot be null").length == 0) {
            throw new IllegalArgumentException("Bytes to repeat cannot be empty");
        }

        this.toRepeat = toRepeat;
        this.maxIterations = iterations;

        // Begin the first repetition.
        this.bais = new ByteArrayInputStream(toRepeat);
        this.repeatCount = 1;
    }

    @Override
    public int read() throws IOException {
        final int value = bais.read();
        if (value > -1) {
            return value;
        }

        // Current repetition exhausted: move to the next one, if any. Because toRepeat is never
        // empty, a freshly started repetition always has at least one byte available.
        if (repeat()) {
            return bais.read();
        }

        return -1;
    }

    @Override
    public int read(final byte[] b, final int off, final int len) throws IOException {
        final int value = bais.read(b, off, len);
        if (value > -1) {
            return value;
        }

        if (repeat()) {
            return bais.read(b, off, len);
        }

        return -1;
    }

    @Override
    public int read(final byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Starts the next repetition if fewer than {@code maxIterations} have been started.
     *
     * @return {@code true} if a new repetition was started, {@code false} if all are used up
     */
    private boolean repeat() {
        if (repeatCount >= maxIterations) {
            return false;
        }

        repeatCount++;
        bais = new ByteArrayInputStream(toRepeat);

        return true;
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
new file mode 100644
index 0000000..f10f66d
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/AbstractTextDemarcator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.nio.BufferOverflowException;
+
/**
 * Base class for demarcators that split character data read from a {@link Reader} into tokens.
 * An {@link InputStream} must first be wrapped in a {@link Reader} (for example an
 * {@link java.io.InputStreamReader}) before it can be demarcated.
 * <p>
 * Subclasses scan {@link #buffer} from {@link #index} up to {@link #availableBytesLength}, treat
 * {@link #mark} as the start of the token being assembled, and call {@link #fill()} when they run
 * out of characters. Closing the demarcator closes the underlying reader.
 * </p>
 */
public abstract class AbstractTextDemarcator implements Closeable {

    private static final int INIT_BUFFER_SIZE = 8192;

    /*
     * Largest number of characters by which the buffer grows in a single expansion.
     */
    private static final int MAX_EXPANSION_STEP = 1_048_576;

    private final Reader reader;

    /*
     * The maximum number of characters the buffer is allowed to hold. If a token cannot be
     * buffered within this limit, fill() throws a BufferOverflowException.
     */
    private final int maxDataSize;

    /*
     * Buffer into which characters are read from the provided reader. Its initial size is the
     * 'initialBufferSize' given to the constructor (INIT_BUFFER_SIZE by default); fill() may grow
     * it, but never beyond 'maxDataSize'.
     */
    char[] buffer;

    /*
     * Offset within 'buffer' of the next character to be examined. fill() reads new characters
     * into 'buffer' starting at this offset.
     */
    int index;

    /*
     * Offset within 'buffer' at which the token currently being assembled starts. When the buffer
     * is full and 'mark' is 0, the token occupies the whole buffer, so fill() must expand the
     * buffer. Otherwise fill() shifts the partial token to the start of the buffer.
     */
    int mark;

    /*
     * The end (exclusive) of the valid characters in 'buffer', which may be smaller than the
     * buffer length, or -1 once the reader has reported end of stream. Despite its name this
     * counts chars, not bytes. It is set by every call to fill().
     */
    int availableBytesLength;

    /**
     * Constructs a demarcator over the provided {@link Reader} with the given maximum buffer size
     * and a default initial buffer size of 8192 characters. Each demarcated token must fit within
     * the maximum buffer size, otherwise a {@link BufferOverflowException} is raised.
     *
     * @param reader the source of characters; must not be null
     * @param maxDataSize the maximum number of characters the buffer may hold; must be &gt; 0
     * @throws IllegalArgumentException if an argument is invalid
     */
    AbstractTextDemarcator(Reader reader, int maxDataSize) {
        this(reader, maxDataSize, INIT_BUFFER_SIZE);
    }

    /**
     * Constructs a demarcator over the provided {@link Reader} with the given maximum and initial
     * buffer sizes. Each demarcated token must fit within the maximum buffer size, otherwise a
     * {@link BufferOverflowException} is raised.
     *
     * @param reader the source of characters; must not be null
     * @param maxDataSize the maximum number of characters the buffer may hold; must be &gt; 0
     * @param initialBufferSize the initial buffer size in characters; must be &gt; 0
     * @throws IllegalArgumentException if an argument is invalid
     */
    AbstractTextDemarcator(Reader reader, int maxDataSize, int initialBufferSize) {
        this.validate(reader, maxDataSize, initialBufferSize);
        this.reader = reader;
        this.buffer = new char[initialBufferSize];
        this.maxDataSize = maxDataSize;
    }

    /**
     * Closes the underlying reader.
     */
    @Override
    public void close() throws IOException {
        reader.close();
    }

    /**
     * Reads more characters into the buffer starting at {@link #index}. The buffer may already be
     * full. If the current token starts at offset 0, the buffer is first expanded, up to
     * {@code maxDataSize} characters. Otherwise, the partial token between {@link #mark} and
     * {@link #index} is shifted to the start of the buffer. On return,
     * {@link #availableBytesLength} is the new end of valid data, or -1 if the reader is
     * exhausted.
     * <p>
     * A token too large for the buffer is reported with a {@link BufferOverflowException}, not a
     * {@link TokenTooLargeException}.
     * </p>
     *
     * @throws BufferOverflowException if the buffer must grow to hold the current token but has
     *             already reached {@code maxDataSize} characters
     * @throws IOException if unable to read from the reader
     */
    void fill() throws IOException {
        if (this.index >= this.buffer.length) {
            if (this.mark == 0) { // expand
                if (this.buffer.length >= maxDataSize) {
                    throw new BufferOverflowException();
                }

                // Grow geometrically, but by at most MAX_EXPANSION_STEP and never past maxDataSize,
                // so that any token of up to maxDataSize characters can be accommodated. Long
                // arithmetic avoids int overflow when doubling a very large buffer.
                final long expandedSize = Math.min(maxDataSize,
                    Math.min(this.buffer.length * 2L, (long) this.buffer.length + MAX_EXPANSION_STEP));

                final char[] newBuff = new char[(int) expandedSize];
                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                this.buffer = newBuff;
            } else { // shift the partial token to the start of the buffer
                final int length = this.index - this.mark;
                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                this.index = length;
                this.mark = 0;
            }
        }

        int charsRead;
        /*
         * Keep reading while the reader reports 0 characters. This mirrors BufferedReader, so a
         * read that returns nothing is not mistaken for end of stream; only -1 signals the end.
         */
        do {
            charsRead = reader.read(this.buffer, this.index, this.buffer.length - this.index);
        } while (charsRead == 0);
        this.availableBytesLength = charsRead != -1 ? this.index + charsRead : -1;
    }


    /**
     * Validates prerequisites for constructor arguments.
     */
    private void validate(Reader reader, int maxDataSize, int initialBufferSize) {
        if (reader == null) {
            throw new IllegalArgumentException("'reader' must not be null");
        } else if (maxDataSize <= 0) {
            throw new IllegalArgumentException("'maxDataSize' must be > 0");
        } else if (initialBufferSize <= 0) {
            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
        }
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
new file mode 100644
index 0000000..c08b1a4
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/LineDemarcator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.Charset;
+
+/**
+ * A demarcator that scans an InputStream for line endings (carriage returns 
and new lines) and returns
+ * lines of text one-at-a-time. This is similar to BufferedReader but with a 
very important distinction: while
+ * BufferedReader returns the lines of text after stripping off any line 
endings, this class returns text including the
+ * line endings. So, for example, if the following text is provided:
+ *
+ * <code>ABC\rXYZ\nABCXYZ\r\nhello</code>
+ *
+ * Then calls to {@link #nextLine()} will result in 4 String values being 
returned:
+ *
+ * <ul>
+ *     <li><code>ABC\r</code></li>
+ *     <li><code>XYZ\n</code></li>
+ *     <li><code>ABCXYZ\r\n</code></li>
+ *     <li><code>hello</code></li>
+ * </ul>
+ *
+ * All subsequent calls to {@link #nextLine()} will return <code>null</code>.
+ */
+public class LineDemarcator extends AbstractTextDemarcator {
+    private static final char CARRIAGE_RETURN = '\r';
+    private static final char NEW_LINE = '\n';
+
+    private char lastChar;
+
+    public LineDemarcator(final InputStream in, final Charset charset, final 
int maxDataSize, final int initialBufferSize) {
+        this(new InputStreamReader(in, charset), maxDataSize, 
initialBufferSize);
+    }
+
+    public LineDemarcator(final Reader reader, final int maxDataSize, final 
int initialBufferSize) {
+        super(reader, maxDataSize, initialBufferSize);
+    }
+
+    /**
+     * Will read the next line of text from the {@link InputStream} returning 
null
+     * when it reaches the end of the stream.
+     *
+     * @throws IOException if unable to read from the stream
+     */
+    public String nextLine() throws IOException {
+        while (this.availableBytesLength != -1) {
+            if (this.index >= this.availableBytesLength) {
+                this.fill();
+            }
+
+            if (this.availableBytesLength != -1) {
+                char charVal;
+                int i;
+                for (i = this.index; i < this.availableBytesLength; i++) {
+                    charVal = this.buffer[i];
+
+                    try {
+                        if (charVal == NEW_LINE) {
+                            this.index = i + 1;
+
+                            final int size = this.index - this.mark;
+                            final String line =  new String(this.buffer, mark, 
size);
+
+                            this.mark = this.index;
+                            return line;
+                        } else if (lastChar == CARRIAGE_RETURN) {
+                            // Point this.index to i+1 because that's the next 
byte that we want to consume.
+                            this.index = i + 1;
+
+                            // Size is equal to where the line began, up to 
index-1 because we don't want to consume the last byte encountered.
+                            final int size = this.index - 1 - this.mark;
+                            final String line = new String(this.buffer, mark, 
size);
+
+                            // set 'mark' to index - 1 because we don't want 
to consume the last byte that we've encountered, since we're basing our
+                            // line on the previous byte.
+                            this.mark = this.index - 1;
+                            return line;
+                        }
+                    } finally {
+                        lastChar = charVal;
+                    }
+                }
+
+                this.index = i;
+            } else {
+                final int size = this.index - this.mark;
+                if (size == 0) {
+                    return null;
+                }
+
+                return new String(this.buffer, mark, size);
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
new file mode 100644
index 0000000..768a60a
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/TestLineDemarcator.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestLineDemarcator {
+
+    @Test
+    public void testSingleCharacterLines() throws IOException {
+        final String input = "A\nB\nC\rD\r\nE\r\nF\r\rG";
+
+        final List<String> lines = getLines(input);
+        assertEquals(Arrays.asList("A\n", "B\n", "C\r", "D\r\n", "E\r\n", 
"F\r", "\r", "G"), lines);
+    }
+
+
+    @Test
+    public void testEmptyStream() throws IOException {
+        final List<String> lines = getLines("");
+        assertEquals(Collections.emptyList(), lines);
+    }
+
+    @Test
+    public void testOnlyEmptyLines() throws IOException {
+        final String input = "\r\r\r\n\n\n\r\n";
+
+        final List<String> lines = getLines(input);
+        assertEquals(Arrays.asList("\r", "\r", "\r\n", "\n", "\n", "\r\n"), 
lines);
+    }
+
+    @Test
+    public void testOnBufferSplit() throws IOException {
+        final String input = "ABC\r\nXYZ";
+        final List<String> lines = getLines(input, 10, 4);
+
+        assertEquals(Arrays.asList("ABC\r\n", "XYZ"), lines);
+    }
+
+    @Test
+    public void testEndsWithCarriageReturn() throws IOException {
+        final List<String> lines = getLines("ABC\r");
+        assertEquals(Arrays.asList("ABC\r"), lines);
+    }
+
+    @Test
+    public void testEndsWithNewLine() throws IOException {
+        final List<String> lines = getLines("ABC\n");
+        assertEquals(Arrays.asList("ABC\n"), lines);
+    }
+
+    @Test
+    public void testEndsWithCarriageReturnNewLine() throws IOException {
+        final List<String> lines = getLines("ABC\r\n");
+        assertEquals(Arrays.asList("ABC\r\n"), lines);
+    }
+
+    @Test
+    public void testReadAheadInIsEol() throws IOException {
+        final String input = "he\ra-to-a\rb-to-b\rc-to-c\r\nd-to-d";
+        final List<String> lines = getLines(input, 10, 10);
+
+        assertEquals(Arrays.asList("he\r", "a-to-a\r", "b-to-b\r", 
"c-to-c\r\n", "d-to-d"), lines);
+    }
+
+    @Test
+    public void testFirstCharMatchOnly() throws IOException {
+        final List<String> lines = getLines("\nThe quick brown fox jumped over 
the lazy dog.");
+        assertEquals(Arrays.asList("\n", "The quick brown fox jumped over the 
lazy dog."), lines);
+    }
+
+    private List<String> getLines(final String text) throws IOException {
+        return getLines(text, 8192, 8192);
+    }
+
+    private List<String> getLines(final String text, final int maxDataSize, 
final int bufferSize) throws IOException {
+        final byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
+
+        final List<String> lines = new ArrayList<>();
+
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+             final Reader reader = new InputStreamReader(bais, 
StandardCharsets.UTF_8);
+             final LineDemarcator demarcator = new LineDemarcator(reader, 
maxDataSize, bufferSize)) {
+
+            String line;
+            while ((line = demarcator.nextLine()) != null) {
+                lines.add(line);
+            }
+        }
+
+        return lines;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
index 3108a6c..c6aec0c 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ReplaceText.java
@@ -49,14 +49,13 @@ import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.NLKBufferedReader;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.LineDemarcator;
 import org.apache.nifi.util.StopWatch;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.Charset;
@@ -342,7 +341,7 @@ public class ReplaceText extends AbstractProcessor {
                 final StringBuilder sb = new StringBuilder(value.length() + 1);
                 final int groupStart = backRefMatcher.start(1);
 
-                sb.append(value.substring(0, groupStart - 1));
+                sb.append(value, 0, groupStart - 1);
                 sb.append("\\");
                 sb.append(value.substring(groupStart - 1));
                 value = sb.toString();
@@ -370,11 +369,11 @@ public class ReplaceText extends AbstractProcessor {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
-                        try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
-                            BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset));) {
+                        try (final LineDemarcator demarcator = new 
LineDemarcator(in, charset, maxBufferSize, 8192);
+                            final BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
 
                             String line;
-                            while ((line = br.readLine()) != null) {
+                            while ((line = demarcator.nextLine()) != null) {
                                 // We need to determine what line ending was 
used and use that after our replacement value.
                                 lineEndingBuilder.setLength(0);
                                 for (int i = line.length() - 1; i >= 0; i--) {
@@ -423,10 +422,11 @@ public class ReplaceText extends AbstractProcessor {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
-                        try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
-                            BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset));) {
+                        try (final LineDemarcator demarcator = new 
LineDemarcator(in, charset, maxBufferSize, 8192);
+                            final BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+
                             String oneLine;
-                            while (null != (oneLine = br.readLine())) {
+                            while (null != (oneLine = demarcator.nextLine())) {
                                 final String updatedValue = 
replacementValue.concat(oneLine);
                                 bw.write(updatedValue);
                             }
@@ -461,10 +461,11 @@ public class ReplaceText extends AbstractProcessor {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
-                        try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
-                            BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset));) {
+                        try (final LineDemarcator demarcator = new 
LineDemarcator(in, charset, maxBufferSize, 8192);
+                            final BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+
                             String oneLine;
-                            while (null != (oneLine = br.readLine())) {
+                            while (null != (oneLine = demarcator.nextLine())) {
                                 // we need to find the first carriage return 
or new-line so that we can append the new value
                                 // before the line separate. However, we don't 
want to do this using a regular expression due
                                 // to performance concerns. So we will find 
the first occurrence of either \r or \n and use
@@ -582,14 +583,15 @@ public class ReplaceText extends AbstractProcessor {
                 updatedFlowFile = session.write(flowFile, new StreamCallback() 
{
                     @Override
                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
-                        try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
-                            BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+                        try (final LineDemarcator demarcator = new 
LineDemarcator(in, charset, maxBufferSize, 8192);
+                            final BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+
                             String oneLine;
 
                             final StringBuffer sb = new StringBuffer();
                             Matcher matcher = null;
 
-                            while (null != (oneLine = br.readLine())) {
+                            while (null != (oneLine = demarcator.nextLine())) {
                                 additionalAttrs.clear();
                                 if (matcher == null) {
                                     matcher = searchPattern.matcher(oneLine);
@@ -649,14 +651,7 @@ public class ReplaceText extends AbstractProcessor {
         public FlowFile replace(FlowFile flowFile, final ProcessSession 
session, final ProcessContext context, final String evaluateMode, final Charset 
charset, final int maxBufferSize) {
 
             final String replacementValue = 
context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
-
-            final AttributeValueDecorator quotedAttributeDecorator = new 
AttributeValueDecorator() {
-                @Override
-                public String decorate(final String attributeValue) {
-                    return Pattern.quote(attributeValue);
-                }
-            };
-
+            final AttributeValueDecorator quotedAttributeDecorator = 
Pattern::quote;
             final String searchValue = 
context.getProperty(SEARCH_VALUE).evaluateAttributeExpressions(flowFile, 
quotedAttributeDecorator).getValue();
 
             final int flowFileSize = (int) flowFile.getSize();
@@ -672,16 +667,34 @@ public class ReplaceText extends AbstractProcessor {
                     }
                 });
             } else {
+                final int initialBufferSize = (int) 
Math.min(flowFile.getSize(), 8192);
+                final Pattern searchPattern = Pattern.compile(searchValue, 
Pattern.LITERAL);
+
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream in, final 
OutputStream out) throws IOException {
-                        try (NLKBufferedReader br = new NLKBufferedReader(new 
InputStreamReader(in, charset), maxBufferSize);
-                            BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+                        try (final LineDemarcator demarcator = new 
LineDemarcator(in, charset, maxBufferSize, initialBufferSize);
+                            final BufferedWriter bw = new BufferedWriter(new 
OutputStreamWriter(out, charset))) {
+
                             String oneLine;
-                            while (null != (oneLine = br.readLine())) {
-                                // Interpreting the search and replacement 
values as char sequences
-                                final String updatedValue = 
oneLine.replace(searchValue, replacementValue);
-                                bw.write(updatedValue);
+                            while (null != (oneLine = demarcator.nextLine())) {
+                                int matches = 0;
+                                int lastEnd = 0;
+
+                                final Matcher matcher = 
searchPattern.matcher(oneLine);
+                                while (matcher.find()) {
+                                    bw.write(oneLine, lastEnd, matcher.start() 
- lastEnd);
+                                    bw.write(replacementValue);
+                                    matches++;
+
+                                    lastEnd = matcher.end();
+                                }
+
+                                if (matches > 0) {
+                                    bw.write(oneLine, lastEnd, 
oneLine.length() - lastEnd);
+                                } else {
+                                    bw.write(oneLine);
+                                }
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
index a87434e..4fbbce6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/RouteText.java
@@ -19,24 +19,6 @@ package org.apache.nifi.processors.standard;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.CacheBuilder;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.DynamicRelationship;
@@ -68,8 +50,24 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.standard.util.NLKBufferedReader;
+import org.apache.nifi.stream.io.util.LineDemarcator;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @EventDriven
 @SideEffectFree
@@ -291,7 +289,7 @@ public class RouteText extends AbstractProcessor {
         final Set<String> allDynamicProps = this.dynamicPropertyNames;
         final Set<Relationship> newRelationships = new HashSet<>();
         final String routeStrategy = configuredRouteStrategy;
-        if (ROUTE_TO_MATCHING_PROPERTY_NAME.equals(routeStrategy)) {
+        if (ROUTE_TO_MATCHING_PROPERTY_NAME.getValue().equals(routeStrategy)) {
             for (final String propName : allDynamicProps) {
                 newRelationships.add(new 
Relationship.Builder().name(propName).build());
             }
@@ -419,14 +417,13 @@ public class RouteText extends AbstractProcessor {
         session.read(originalFlowFile, new InputStreamCallback() {
             @Override
             public void process(final InputStream in) throws IOException {
-                try (final Reader inReader = new InputStreamReader(in, 
charset);
-                    final NLKBufferedReader reader = new 
NLKBufferedReader(inReader)) {
+                try (final LineDemarcator demarcator = new LineDemarcator(in, 
charset, Integer.MAX_VALUE, 8192)) {
 
                     final Map<String, String> variables = new HashMap<>(2);
 
                     int lineCount = 0;
                     String line;
-                    while ((line = reader.readLine()) != null) {
+                    while ((line = demarcator.nextLine()) != null) {
 
                         final String matchLine;
                         if (trim) {
@@ -550,11 +547,7 @@ public class RouteText extends AbstractProcessor {
     private void appendLine(final ProcessSession session, final 
Map<Relationship, Map<Group, FlowFile>> flowFileMap, final Relationship 
relationship,
         final FlowFile original, final String line, final Charset charset, 
final Group group) {
 
-        Map<Group, FlowFile> groupToFlowFileMap = 
flowFileMap.get(relationship);
-        if (groupToFlowFileMap == null) {
-            groupToFlowFileMap = new HashMap<>();
-            flowFileMap.put(relationship, groupToFlowFileMap);
-        }
+        final Map<Group, FlowFile> groupToFlowFileMap = 
flowFileMap.computeIfAbsent(relationship, k -> new HashMap<>());
 
         FlowFile flowFile = groupToFlowFileMap.get(group);
         if (flowFile == null) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/564ad0cd/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
deleted file mode 100644
index df8847f..0000000
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/NLKBufferedReader.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.util;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.Reader;
-
-//NLKBufferedReader = New Line Keeper Buffered Reader
-public class NLKBufferedReader extends BufferedReader {
-    public NLKBufferedReader(Reader in, int sz) {
-        super(in, sz);
-    }
-
-    public NLKBufferedReader(Reader in) {
-        super(in);
-    }
-
-    /**
-     * Reads a line of text in the same manner as {@link BufferedReader} 
except that any line-termination characters (\r and \n) are preserved in the 
String
-     * that is returned from this reader, whereas {@link BufferedReader} will 
strip those out.
-     *
-     * @return A String containing the next line of text (including any 
line-termination characters) from the underlying Reader, or null if no more 
data is available
-     *
-     * @throws IOException If unable to read from teh underlying Reader
-     */
-    @Override
-    public String readLine() throws IOException {
-        final StringBuilder stringBuilder = new StringBuilder();
-
-        int intchar = read();
-        while (intchar != -1) {
-            final char c = (char) intchar;
-            stringBuilder.append(c);
-
-            if (c == '\n') {
-                break;
-            } else if (c == '\r') {
-                // Peek at next character, check if it's \n
-                int charPeek = peek();
-                if (charPeek == '\n') {
-                    stringBuilder.append((char) read());
-                }
-
-                break;
-            }
-
-            intchar = read();
-        }
-
-        final String result = stringBuilder.toString();
-        return (result.length() == 0) ? null : result;
-    }
-
-    public int peek() throws IOException {
-        mark(1);
-        int readByte = read();
-        reset();
-
-        return readByte;
-    }
-}

Reply via email to