NIFI-1296, NIFI-1680, NIFI-1764, NIFI-1837, NIFI-1827, NIFI-1699 implemented
new Kafka processors that leverage Kafka 0.9 API
- Improved StreamScanner for better performance
- Renamed StreamScanner to StreamDemarcator as suggested by Joe
- Added failure handling logic to ensure both processors can be reset to their 
initial state (as if they were just started)
- Provided comprehensive test suite to validate various aspects of both Publish 
and Consume from Kafka
- Added relevant javadocs
- Added initial additionalDetails docs
- Addressed NPE reported by NIFI-1764
- Life-cycle refactoring for the existing PutKafka to ensure producer restart 
after errors
- Incorporated code changes contributed by Ralph Perko (see NIFI-1837)
- Addressed partition issue in RoundRobinPartitioner discussed in NIFI-1827
- Updated PropertyDescriptor descriptions to reflect their purpose

NIFI-1296 added @Ignore on some Kafka tests to improve test time

NIFI-1296 reworked tests to avoid dependency on embedded Kafka

NIFI-1296 fixed spelling error

NIFI-1296 fixed trailing whitespaces in non-java files

This closes #366

NIFI-1296 fixed pom files for 0.x branch

NIFI-1296 removed trailing white spaces in non-Java files


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/640b7021
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/640b7021
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/640b7021

Branch: refs/heads/0.x
Commit: 640b70214f9e40706bd880a877c9ba82770c1b83
Parents: c9340dc
Author: Oleg Zhurakousky <[email protected]>
Authored: Thu Apr 7 07:15:25 2016 -0400
Committer: Oleg Zhurakousky <[email protected]>
Committed: Thu May 19 09:07:50 2016 -0400

----------------------------------------------------------------------
 nifi-assembly/pom.xml                           |   5 +
 .../nifi/stream/io/util/StreamDemarcator.java   | 191 ++++++++
 .../nifi/stream/io/util/StreamScanner.java      | 164 -------
 .../stream/io/util/StreamDemarcatorTest.java    | 227 +++++++++
 .../nifi/stream/io/util/StreamScannerTests.java | 130 ------
 .../kafka/AbstractKafkaProcessor.java           | 145 ++++++
 .../nifi/processors/kafka/KafkaPublisher.java   | 274 ++++++-----
 .../nifi/processors/kafka/Partitioners.java     |   9 +-
 .../processors/kafka/PublishingContext.java     | 151 ++++++
 .../apache/nifi/processors/kafka/PutKafka.java  | 257 ++++++-----
 .../kafka/SplittableMessageContext.java         | 123 -----
 .../kafka/GetKafkaIntegrationTests.java         |   4 +
 .../processors/kafka/KafkaPublisherTest.java    | 126 +++--
 .../nifi/processors/kafka/PutKafkaTest.java     | 228 ++++++++++
 .../kafka/SplittableMessageContextTest.java     |  66 ---
 .../nifi/processors/kafka/TestPutKafka.java     | 270 -----------
 .../nifi-kafka-pubsub-nar/pom.xml               |  35 ++
 .../src/main/resources/META-INF/LICENSE         | 299 ++++++++++++
 .../src/main/resources/META-INF/NOTICE          |  72 +++
 .../nifi-kafka-pubsub-processors/pom.xml        |  79 ++++
 .../kafka/pubsub/AbstractKafkaProcessor.java    | 334 ++++++++++++++
 .../processors/kafka/pubsub/ConsumeKafka.java   | 242 ++++++++++
 .../processors/kafka/pubsub/KafkaPublisher.java | 232 ++++++++++
 .../processors/kafka/pubsub/Partitioners.java   |  61 +++
 .../processors/kafka/pubsub/PublishKafka.java   | 359 +++++++++++++++
 .../kafka/pubsub/PublishingContext.java         | 139 ++++++
 .../org.apache.nifi.processor.Processor         |  16 +
 .../additionalDetails.html                      |  33 ++
 .../additionalDetails.html                      |  47 ++
 .../AbstractKafkaProcessorLifecycelTest.java    | 456 +++++++++++++++++++
 .../kafka/pubsub/ConsumeKafkaTest.java          | 167 +++++++
 .../kafka/pubsub/KafkaPublisherTest.java        | 302 ++++++++++++
 .../kafka/pubsub/PublishKafkaTest.java          | 254 +++++++++++
 .../kafka/pubsub/PublishingContextTest.java     | 106 +++++
 .../kafka/pubsub/StubConsumeKafka.java          |  71 +++
 .../kafka/pubsub/StubPublishKafka.java          |  96 ++++
 .../nifi/processors/kafka/pubsub/TestUtils.java |  46 ++
 .../processors/kafka/test/EmbeddedKafka.java    | 226 +++++++++
 .../kafka/test/EmbeddedKafkaProducerHelper.java | 110 +++++
 .../src/test/resources/log4j.properties         |  21 +
 .../src/test/resources/server.properties        | 121 +++++
 .../src/test/resources/zookeeper.properties     |  20 +
 nifi-nar-bundles/nifi-kafka-bundle/pom.xml      |   7 +
 pom.xml                                         |   6 +
 44 files changed, 5307 insertions(+), 1020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 05ebff8..a95220d 100644
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -159,6 +159,11 @@ language governing permissions and limitations under the 
License. -->
         </dependency>
         <dependency>
             <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kafka-pubsub-nar</artifactId>
+            <type>nar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-http-context-map-nar</artifactId>
             <type>nar</type>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
new file mode 100644
index 0000000..3064f1c
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamDemarcator.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The <code>StreamDemarcator</code> class takes an input stream and demarcates
+ * it so it could be read (see {@link #nextToken()}) as individual byte[]
+ * demarcated by the provided delimiter. If delimiter is not provided the
+ * entire stream will be read into a single token which may result in
+ * {@link OutOfMemoryError} if stream is too large.
+ * <p>
+ * Instances hold unsynchronized mutable read state (buffer, index, mark), so a
+ * single instance must not be shared between threads without external
+ * synchronization.
+ */
+public class StreamDemarcator {
+
+    private final static int INIT_BUFFER_SIZE = 8192;
+
+    private final InputStream is;
+
+    private final byte[] delimiterBytes;
+
+    private final int maxDataSize;
+
+    private final int initialBufferSize;
+
+    // Holds the bytes read so far; set to null once the end of the stream has
+    // been reached and the last token was handed out.
+    private byte[] buffer;
+
+    // Position of the next byte to examine in 'buffer'.
+    private int index;
+
+    // Position in 'buffer' where the token currently being assembled starts.
+    private int mark;
+
+    // Number of valid (already read) bytes in 'buffer'.
+    private int readAheadLength;
+
+    /**
+     * Constructs a new instance using the default initial buffer size.
+     *
+     * @param is
+     *            instance of {@link InputStream} representing the data
+     * @param delimiterBytes
+     *            byte array representing delimiter bytes used to split the
+     *            input stream. Can be null
+     * @param maxDataSize
+     *            maximum size of data derived from the input stream. This means
+     *            that neither {@link InputStream} nor its individual chunks (if
+     *            delimiter is used) can ever be greater than this size.
+     * @throws IllegalArgumentException
+     *             if 'is' is null, 'maxDataSize' is not positive or
+     *             'delimiterBytes' is an empty array
+     */
+    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
+        this(is, delimiterBytes, maxDataSize, INIT_BUFFER_SIZE);
+    }
+
+    /**
+     * Constructs a new instance
+     *
+     * @param is
+     *            instance of {@link InputStream} representing the data
+     * @param delimiterBytes
+     *            byte array representing delimiter bytes used to split the
+     *            input stream. Can be null
+     * @param maxDataSize
+     *            maximum size of data derived from the input stream. This means
+     *            that neither {@link InputStream} nor its individual chunks (if
+     *            delimiter is used) can ever be greater than this size. The
+     *            limit is checked against the number of bytes currently
+     *            buffered, so it can trigger before a token boundary is found.
+     * @param initialBufferSize
+     *            initial size of the buffer used to buffer {@link InputStream}
+     *            or its parts (if delimiter is used) to create its byte[]
+     *            representation. Must be positive integer. The buffer grows
+     *            by this amount each time it fills up while no token has been
+     *            extracted from it.
+     * @throws IllegalArgumentException
+     *             if 'is' is null, 'maxDataSize' or 'initialBufferSize' is not
+     *             positive, or 'delimiterBytes' is an empty array
+     */
+    public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
+        this.validateInput(is, delimiterBytes, maxDataSize, initialBufferSize);
+        this.is = is;
+        this.delimiterBytes = delimiterBytes;
+        this.initialBufferSize = initialBufferSize;
+        this.buffer = new byte[initialBufferSize];
+        this.maxDataSize = maxDataSize;
+    }
+
+    /**
+     * Will read the next data token from the {@link InputStream} returning null
+     * when it reaches the end of the stream. Empty tokens (for example those
+     * produced by consecutive delimiters) are skipped. Once null has been
+     * returned every subsequent call returns null as well.
+     *
+     * @return the next non-empty token without its trailing delimiter, or null
+     *         if there is no more data
+     * @throws IllegalStateException
+     *             if reading the stream fails or the amount of buffered data
+     *             exceeds the configured maximum
+     */
+    public byte[] nextToken() {
+        byte[] data = null;
+        int j = 0; // number of delimiter bytes matched so far
+
+        while (data == null && this.buffer != null) {
+            if (this.index >= this.readAheadLength) {
+                this.fill();
+            }
+            if (this.index >= this.readAheadLength) {
+                // nothing more could be read: hand out whatever is left
+                data = this.extractDataToken(0);
+                this.buffer = null;
+            } else {
+                byte byteVal = this.buffer[this.index++];
+                if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
+                    if (++j == this.delimiterBytes.length) {
+                        data = this.extractDataToken(this.delimiterBytes.length);
+                        this.mark = this.index;
+                        j = 0;
+                    }
+                } else {
+                    // A partial delimiter match failed. Resume scanning at the
+                    // byte right after the one that started the partial match;
+                    // otherwise a delimiter overlapping the failed match (e.g.
+                    // "daa" inside "ddaa") would be missed. When j == 0 this
+                    // leaves 'index' untouched.
+                    this.index -= j;
+                    j = 0;
+                }
+            }
+        }
+        return data;
+    }
+
+    /**
+     * Will fill the current buffer from current 'index' position, expanding it
+     * and or shuffling it if necessary
+     *
+     * @throws IllegalStateException
+     *             if the read fails or more than 'maxDataSize' bytes end up
+     *             buffered
+     */
+    private void fill() {
+        if (this.index >= this.buffer.length) {
+            if (this.mark == 0) { // expand
+                byte[] newBuff = new byte[this.buffer.length + this.initialBufferSize];
+                System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
+                this.buffer = newBuff;
+            } else { // shuffle
+                int length = this.index - this.mark;
+                System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
+                this.index = length;
+                this.mark = 0;
+                // Only 'length' bytes are valid after the move. Without this,
+                // reaching the end of the stream right after a shuffle leaves
+                // the old limit in place and stale bytes get re-read.
+                this.readAheadLength = length;
+            }
+        }
+
+        try {
+            int bytesRead;
+            do {
+                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
+            } while (bytesRead == 0);
+
+            if (bytesRead != -1) {
+                this.readAheadLength = this.index + bytesRead;
+                if (this.readAheadLength > this.maxDataSize) {
+                    throw new IllegalStateException("Maximum allowed data size of " + this.maxDataSize + " exceeded.");
+                }
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed while reading InputStream", e);
+        }
+    }
+
+    /**
+     * Will extract data token from the current buffer. The length of the data
+     * token is between the current 'mark' and 'index' minus 'lengthSubtract'
+     * which signifies the length of the delimiter (if any). If the above
+     * subtraction results in length 0, null is returned.
+     */
+    private byte[] extractDataToken(int lengthSubtract) {
+        byte[] data = null;
+        int length = this.index - this.mark - lengthSubtract;
+        if (length > 0) {
+            data = new byte[length];
+            System.arraycopy(this.buffer, this.mark, data, 0, data.length);
+        }
+        return data;
+    }
+
+    /**
+     * Validates constructor arguments.
+     *
+     * @throws IllegalArgumentException
+     *             if 'is' is null, 'maxDataSize' or 'initialBufferSize' is not
+     *             positive, or 'delimiterBytes' is provided but empty
+     */
+    private void validateInput(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
+        if (is == null) {
+            throw new IllegalArgumentException("'is' must not be null");
+        } else if (maxDataSize <= 0) {
+            throw new IllegalArgumentException("'maxDataSize' must be > 0");
+        } else if (initialBufferSize <= 0) {
+            throw new IllegalArgumentException("'initialBufferSize' must be > 0");
+        } else if (delimiterBytes != null && delimiterBytes.length == 0){
+            throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
deleted file mode 100644
index 901f31a..0000000
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/util/StreamScanner.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.util;
-
-import java.io.BufferedInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-
-/**
- *
- */
-public class StreamScanner {
-
-    private final static int EOF = -1;
-
-    private final InputStream is;
-
-    private final byte[] delimiterBytes;
-
-    private final int maxDataSize;
-
-    private ByteBuffer buffer;
-
-    private byte[] data;
-
-    /**
-     * Constructs a new instance
-     *
-     * @param is
-     *            instance of {@link InputStream} representing the data
-     * @param delimiterBytes
-     *            byte array representing delimiter bytes used to split the
-     *            input stream. Can be null
-     * @param maxDataSize
-     *            maximum size of data derived from the input stream. This 
means
-     *            that neither {@link InputStream} nor its individual chunks 
(if
-     *            delimiter is used) can ever be greater then this size.
-     */
-    public StreamScanner(InputStream is, byte[] delimiterBytes, int 
maxDataSize) {
-        this(is, delimiterBytes, maxDataSize, 8192);
-    }
-
-    /**
-     * Constructs a new instance
-     *
-     * @param is
-     *            instance of {@link InputStream} representing the data
-     * @param delimiterBytes
-     *            byte array representing delimiter bytes used to split the
-     *            input stream. Can be null
-     * @param maxDataSize
-     *            maximum size of data derived from the input stream. This 
means
-     *            that neither {@link InputStream} nor its individual chunks 
(if
-     *            delimiter is used) can ever be greater then this size.
-     * @param initialBufferSize
-     *            initial size of the buffer used to buffer {@link InputStream}
-     *            or its parts (if delimiter is used) to create its byte[]
-     *            representation. Must be positive integer. The buffer will 
grow
-     *            automatically as needed up to the Integer.MAX_VALUE;
-     *
-     */
-    public StreamScanner(InputStream is, byte[] delimiterBytes, int 
maxDataSize, int initialBufferSize) {
-        this.is = new BufferedInputStream(is);
-        this.delimiterBytes = delimiterBytes;
-        this.buffer = ByteBuffer.allocate(initialBufferSize);
-        this.maxDataSize = maxDataSize;
-    }
-
-    /**
-     * Checks if there are more elements in the stream. This operation is
-     * idempotent.
-     *
-     * @return <i>true</i> if there are more elements in the stream or
-     *         <i>false</i> when it reaches the end of the stream after the 
last
-     *         element was retrieved via {@link #next()} operation.
-     */
-    public boolean hasNext() {
-        int j = 0;
-        int readVal = 0;
-        while (this.data == null && readVal != EOF) {
-            this.expandBufferIfNecessary();
-            try {
-                readVal = this.is.read();
-            } catch (IOException e) {
-                throw new IllegalStateException("Failed while reading 
InputStream", e);
-            }
-            if (readVal == EOF) {
-                this.extractDataToken(0);
-            } else {
-                byte byteVal = (byte)readVal;
-                this.buffer.put(byteVal);
-                if (this.buffer.position() > this.maxDataSize) {
-                    throw new IllegalStateException("Maximum allowed data size 
of " + this.maxDataSize + " exceeded.");
-                }
-                if (this.delimiterBytes != null && this.delimiterBytes[j] == 
byteVal) {
-                    if (++j == this.delimiterBytes.length) {
-                        this.extractDataToken(this.delimiterBytes.length);
-                        j = 0;
-                    }
-                } else {
-                    j = 0;
-                }
-            }
-        }
-        return this.data != null;
-    }
-
-    /**
-     * @return byte array representing the next segment in the stream or the
-     *         whole stream if no delimiter is used
-     */
-    public byte[] next() {
-        try {
-            return this.data;
-        } finally {
-            this.data = null;
-        }
-    }
-
-    /**
-     *
-     */
-    private void expandBufferIfNecessary() {
-        if (this.buffer.position() == Integer.MAX_VALUE ){
-            throw new IllegalStateException("Internal buffer has reached the 
capacity and can not be expended any further");
-        }
-        if (this.buffer.remaining() == 0) {
-            this.buffer.flip();
-            int pos = this.buffer.capacity();
-            int newSize = this.buffer.capacity() * 2 > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : this.buffer.capacity() * 2;
-            ByteBuffer bb = ByteBuffer.allocate(newSize);
-            bb.put(this.buffer);
-            this.buffer = bb;
-            this.buffer.position(pos);
-        }
-    }
-
-    /**
-     *
-     */
-    private void extractDataToken(int lengthSubtract) {
-        this.buffer.flip();
-        if (this.buffer.limit() > 0){ // something must be in the buffer; at 
least delimiter (e.g., \n)
-            this.data = new byte[this.buffer.limit() - lengthSubtract];
-            this.buffer.get(this.data);
-        }
-        this.buffer.clear();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
new file mode 100644
index 0000000..996d5ab
--- /dev/null
+++ 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamDemarcatorTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.stream.io.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link StreamDemarcator} covering argument validation,
+ * single and multi byte delimiters, small initial buffers that force the
+ * internal buffer to grow or shuffle, and the maximum data size limit.
+ */
+public class StreamDemarcatorTest {
+
+    // Every invalid argument combination must be rejected by the constructor.
+    @Test
+    public void validateInitializationFailure() {
+        try {
+            new StreamDemarcator(null, null, -1);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new StreamDemarcator(mock(InputStream.class), null, -1);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new StreamDemarcator(mock(InputStream.class), null, 10, -1);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new StreamDemarcator(mock(InputStream.class), new byte[0], 10, 1);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+    }
+
+    @Test
+    public void validateNoDelimiter() {
+        String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, null, 1000);
+        assertTrue(Arrays.equals(data.getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        // validate that subsequent invocations of nextToken() do not result in exception
+        assertNull(demarcator.nextToken());
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateNoDelimiterSmallInitialBuffer() {
+        String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, null, 1000, 1);
+        assertTrue(Arrays.equals(data.getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+    }
+
+    @Test
+    public void validateSingleByteDelimiter() {
+        String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    // A leading delimiter yields an empty token, which must be skipped.
+    @Test
+    public void validateDelimiterAtTheBeginning() {
+        String data = ",Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    // Runs of consecutive delimiters must not produce empty tokens.
+    @Test
+    public void validateEmptyDelimiterSegments() {
+        String data = ",,,,,Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateSingleByteDelimiterSmallInitialBuffer() {
+        String data = "Learn from yesterday, live for today, hope for tomorrow. The important thing is not to stop questioning.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, ",".getBytes(StandardCharsets.UTF_8), 1000, 2);
+        assertTrue(Arrays.equals("Learn from yesterday".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" live for today".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals(" hope for tomorrow. The important thing is not to stop questioning.".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithMultiByteDelimiter() {
+        String data = "foodaabardaabazzz";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("bar".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("bazzz".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithMultiByteDelimiterAtTheBeginning() {
+        String data = "daafoodaabardaabazzz";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000);
+        assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("bar".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("bazzz".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithMultiByteDelimiterSmallInitialBuffer() {
+        String data = "foodaabarffdaabazz";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, "daa".getBytes(StandardCharsets.UTF_8), 1000, 1);
+        assertTrue(Arrays.equals("foo".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("barff".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertTrue(Arrays.equals("bazz".getBytes(StandardCharsets.UTF_8), demarcator.nextToken()));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithMultiByteCharsNoDelimiter() {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, null, 1000);
+        byte[] next = demarcator.nextToken();
+        assertNotNull(next);
+        assertEquals(data, new String(next, StandardCharsets.UTF_8));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithMultiByteCharsNoDelimiterSmallInitialBuffer() {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, null, 1000, 2);
+        byte[] next = demarcator.nextToken();
+        assertNotNull(next);
+        assertEquals(data, new String(next, StandardCharsets.UTF_8));
+        assertNull(demarcator.nextToken());
+    }
+
+    @Test
+    public void validateWithComplexDelimiter() {
+        String data = "THIS IS MY TEXT<MYDEIMITER>THIS IS MY NEW TEXT<MYDEIMITER>THIS IS MY NEWEST TEXT";
+        // encode with the same charset as the delimiter instead of the platform default
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, "<MYDEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
+        assertEquals("THIS IS MY TEXT", new String(demarcator.nextToken(), StandardCharsets.UTF_8));
+        assertEquals("THIS IS MY NEW TEXT", new String(demarcator.nextToken(), StandardCharsets.UTF_8));
+        assertEquals("THIS IS MY NEWEST TEXT", new String(demarcator.nextToken(), StandardCharsets.UTF_8));
+        assertNull(demarcator.nextToken());
+    }
+
+    // The input is longer than the allowed 20 bytes, so reading must fail.
+    @Test(expected = IllegalStateException.class)
+    public void validateMaxBufferSize() {
+        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY DEIMITER>THIS IS MY NEWEST TEXT";
+        // encode with the same charset as the delimiter instead of the platform default
+        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        StreamDemarcator demarcator = new StreamDemarcator(is, "<MY DEIMITER>".getBytes(StandardCharsets.UTF_8), 20);
+        demarcator.nextToken();
+    }
+
+    // A data byte of -1 (0xFF) must not be mistaken for the end of the stream.
+    @Test
+    public void validateScannerHandlesNegativeOneByteInputsNoDelimiter() {
+        ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
+        StreamDemarcator demarcator = new StreamDemarcator(is, null, 20);
+        byte[] b = demarcator.nextToken();
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }, b);
+    }
+
+    @Test
+    public void validateScannerHandlesNegativeOneByteInputs() {
+        ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
+        StreamDemarcator demarcator = new StreamDemarcator(is, "water".getBytes(StandardCharsets.UTF_8), 20, 1024);
+        byte[] b = demarcator.nextToken();
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 }, b);
+    }
+
+    @Test
+    public void verifyScannerHandlesNegativeOneByteDelimiter() {
+        ByteArrayInputStream is = new ByteArrayInputStream(new byte[] { 0, 0, 0, 0, -1, 0, 0, 0 });
+        StreamDemarcator demarcator = new StreamDemarcator(is, new byte[] { -1 }, 20, 1024);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0 }, demarcator.nextToken());
+        assertArrayEquals(new byte[] { 0, 0, 0 }, demarcator.nextToken());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
 
b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
deleted file mode 100644
index 2dc8f0b..0000000
--- 
a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/stream/io/util/StreamScannerTests.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.stream.io.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.ByteArrayInputStream;
-import java.lang.reflect.Field;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class StreamScannerTests {
-
-    @Test
-    public void validateWithMultiByteCharsNoDelimiter() {
-        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, null, 1000);
-        assertTrue(scanner.hasNext());
-        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
-        assertFalse(scanner.hasNext());
-    }
-
-    @Test
-    public void validateWithComplexDelimiter() {
-        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY 
DEIMITER>THIS IS MY NEWEST TEXT";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, "<MY 
DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertFalse(scanner.hasNext());
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void validateMaxBufferSize() {
-        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY 
DEIMITER>THIS IS MY NEWEST TEXT";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, "<MY 
DEIMITER>".getBytes(StandardCharsets.UTF_8), 20);
-        assertTrue(scanner.hasNext());
-    }
-
-    @Test
-    public void verifyScannerHandlesNegativeOneByteInputs() {
-        ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 
0, -1, 0, 0, 0});
-        StreamScanner scanner = new StreamScanner(is, 
"water".getBytes(StandardCharsets.UTF_8), 20, 1024);
-        assertTrue(scanner.hasNext());
-        Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0, -1, 0, 
0, 0});
-    }
-
-    @Test
-    public void verifyScannerHandlesNegativeOneByteDelimiter() {
-        ByteArrayInputStream is = new ByteArrayInputStream(new byte[]{0, 0, 0, 
0, -1, 0, 0, 0});
-        StreamScanner scanner = new StreamScanner(is, new byte[] { -1 }, 20, 
1024);
-        assertTrue(scanner.hasNext());
-        Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0, 0});
-        assertTrue(scanner.hasNext());
-        Assert.assertArrayEquals(scanner.next(), new byte[]{0, 0, 0});
-    }
-
-    @Test
-    public void validateHasNextIdempotencyWithDelimiter() {
-        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY 
DEIMITER>THIS IS MY NEWEST TEXT";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, "<MY 
DEIMITER>".getBytes(StandardCharsets.UTF_8), 1000);
-        for (int i = 0; i < 5; i++) { // we only have 3 segments so unless 
idempotent hasNext would return false after 3 tries
-            assertTrue(scanner.hasNext());
-        }
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY NEW TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertTrue(scanner.hasNext());
-        assertEquals("THIS IS MY NEWEST TEXT", new String(scanner.next(), 
StandardCharsets.UTF_8));
-        assertFalse(scanner.hasNext());
-    }
-
-    @Test
-    public void validateHasNextIdempotencyWithoutDelimiter() {
-        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY 
DEIMITER>THIS IS MY NEWEST TEXT";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, null, 1000);
-        for (int i = 0; i < 5; i++) { // we only have 3 segments so unless 
idempotent hasNext would return false after 3 tries
-            assertTrue(scanner.hasNext());
-        }
-        assertTrue(scanner.hasNext());
-        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
-        assertFalse(scanner.hasNext());
-    }
-
-    @Test
-    public void validateInternalBufferCanExpend() throws Exception {
-        String data = "THIS IS MY TEXT<MY DEIMITER>THIS IS MY NEW TEXT<MY 
DEIMITER>THIS IS MY NEWEST TEXT";
-        ByteArrayInputStream is = new ByteArrayInputStream(data.getBytes());
-        StreamScanner scanner = new StreamScanner(is, null, 1000, 2);
-        Field bufferField = StreamScanner.class.getDeclaredField("buffer");
-        bufferField.setAccessible(true);
-        ByteBuffer buffer = (ByteBuffer) bufferField.get(scanner);
-        assertEquals(2, buffer.capacity());
-
-        assertTrue(scanner.hasNext());
-        assertEquals(data, new String(scanner.next(), StandardCharsets.UTF_8));
-        assertFalse(scanner.hasNext());
-
-        buffer = (ByteBuffer) bufferField.get(scanner);
-        assertEquals(128, buffer.capacity());
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java
new file mode 100644
index 0000000..5a470b3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/AbstractKafkaProcessor.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+
+/**
+ * Base class for {@link Processor}s to publish and consume messages from Kafka.
+ * Holds one {@link Closeable} Kafka resource that is built on demand and is closed and reset after a task fails and all running tasks have finished.
+ * @see PutKafka
+ */
+abstract class AbstractKafkaProcessor<T extends Closeable> extends 
AbstractSessionFactoryProcessor {
+
+    // Gate for new tasks: cleared when a task fails, set again once the resource has been closed by the last finishing task.
+    private volatile boolean acceptTask = true;
+    // Number of onTrigger calls that have passed the 'acceptTask' gate and have not yet finished.
+    private final AtomicInteger taskCounter = new AtomicInteger();
+
+
+    /**
+     * Kafka resource shared by tasks; built in onTrigger when null and set to null by {@link #close()}. See {@link KafkaPublisher}.
+     */
+    volatile T kafkaResource;
+
+    /**
+     * Runs one task: builds the Kafka resource if absent, delegates to rendezvousWithKafka and commits the session. On any Throwable the session is rolled back and no new tasks are accepted until the last running task has closed the resource.
+     */
+    @Override
+    public final void onTrigger(final ProcessContext context, final 
ProcessSessionFactory sessionFactory) throws ProcessException {
+        if (this.acceptTask) { // acts as a circuit breaker to allow existing 
tasks to wind down so 'kafkaResource' can be reset before new tasks are 
accepted.
+            this.taskCounter.incrementAndGet();
+            final ProcessSession session = sessionFactory.createSession();
+            try {
+                /*
+                 * We can't be doing double null check here since as a pattern
+                 * it only works for lazy init but not reset, which is what we
+                 * are doing here. In fact the first null check is dangerous
+                 * since 'kafkaResource' can become null right after its null
+                 * check passed causing subsequent NPE.
+                 */
+                synchronized (this) {
+                    if (this.kafkaResource == null) {
+                        this.kafkaResource = this.buildKafkaResource(context, 
session);
+                    }
+                }
+
+                /*
+                 * The 'processed' boolean flag does not imply any failure or success. It simply states that:
+                 * - ConsumeKafka - some messages were received from Kafka and 1+ FlowFiles were generated
+                 * - PublishKafka - some messages were sent to Kafka based on existence of the input FlowFile
+                 */
+                boolean processed = this.rendezvousWithKafka(context, session);
+                session.commit();
+                if (processed) {
+                    this.postCommit(context);
+                } else {
+                    context.yield();
+                }
+            } catch (Throwable e) {
+                this.acceptTask = false;
+                session.rollback(true);
+                this.getLogger().error("{} failed to process due to {}; 
rolling back session", new Object[] { this, e });
+            } finally {
+                synchronized (this) {
+                    if (this.taskCounter.decrementAndGet() == 0 && 
!this.acceptTask) {
+                        this.close();
+                        this.acceptTask = true;
+                    }
+                }
+            }
+        } else {
+            context.yield();
+        }
+    }
+
+    /**
+     * Will call {@link Closeable#close()} on the target resource after which
+     * the target resource will be set to null. Does nothing unless the task counter is zero.
+     * Should only be called when there are no more threads being executed on this
+     * processor or when it has been verified that only a single thread remains.
+     *
+     * @see KafkaPublisher
+     */
+    @OnStopped
+    public void close() {
+        if (this.taskCounter.get() == 0) {
+            try {
+                if (this.kafkaResource != null) {
+                    try {
+                        this.kafkaResource.close();
+                    } catch (Exception e) {
+                        this.getLogger().warn("Failed while closing " + 
this.kafkaResource, e);
+                    }
+                }
+            } finally {
+                this.kafkaResource = null;
+            }
+        }
+    }
+
+    /**
+     * This operation will be executed after {@link ProcessSession#commit()} has
+     * been called, and only when rendezvousWithKafka returned true. The default implementation does nothing.
+     */
+    protected void postCommit(ProcessContext context) {
+
+    }
+
+    /**
+     * This operation is called from
+     * {@link #onTrigger(ProcessContext, ProcessSessionFactory)} method and
+     * contains main processing logic for this Processor. A false return makes onTrigger yield instead of calling postCommit.
+     */
+    protected abstract boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session);
+
+    /**
+     * Builds target resource for interacting with Kafka. The target resource
+     * could be one of {@link KafkaPublisher} or {@link KafkaConsumer}. Called from onTrigger, inside its synchronized block, when 'kafkaResource' is null.
+     */
+    protected abstract T buildKafkaResource(ProcessContext context, 
ProcessSession session) throws ProcessException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index afb2cc6..dac5804 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -16,9 +16,9 @@
  */
 package org.apache.nifi.processors.kafka;
 
+import java.io.Closeable;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
@@ -27,47 +27,50 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.stream.io.util.StreamScanner;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import kafka.producer.KeyedMessage;
 import kafka.producer.Partitioner;
 
 /**
- * Wrapper over {@link KafkaProducer} to assist {@link PutKafka} processor with
- * sending content of {@link FlowFile}s to Kafka.
+ * Wrapper over {@link KafkaProducer} to assist {@link PublishKafka} processor
+ * with sending contents of the {@link FlowFile}s to Kafka.
  */
-class KafkaPublisher implements AutoCloseable {
+class KafkaPublisher implements Closeable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(KafkaPublisher.class);
 
-    private final KafkaProducer<byte[], byte[]> producer;
+    private final Producer<byte[], byte[]> kafkaProducer;
 
-    private final Partitioner partitioner;
-
-    private final long ackWaitTime;
+    private long ackWaitTime = 30000;
 
     private ProcessorLog processLog;
 
+    private final Partitioner partitioner;
+
     /**
      * Creates an instance of this class as well as the instance of the
      * corresponding Kafka {@link KafkaProducer} using provided Kafka
      * configuration properties.
+     *
+     * @param kafkaProperties
+     *            instance of {@link Properties} used to bootstrap
+     *            {@link KafkaProducer}
      */
     KafkaPublisher(Properties kafkaProperties) {
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        this.producer = new KafkaProducer<>(kafkaProperties);
-        this.ackWaitTime = 
Long.parseLong(kafkaProperties.getProperty(ProducerConfig.TIMEOUT_CONFIG)) * 2;
+        this.kafkaProducer = new KafkaProducer<>(kafkaProperties);
         try {
-            if (kafkaProperties.containsKey("partitioner.class")){
+            if (kafkaProperties.containsKey("partitioner.class")) {
                 this.partitioner = (Partitioner) 
Class.forName(kafkaProperties.getProperty("partitioner.class")).newInstance();
             } else {
                 this.partitioner = null;
@@ -78,152 +81,181 @@ class KafkaPublisher implements AutoCloseable {
     }
 
     /**
-     *
-     */
-    void setProcessLog(ProcessorLog processLog) {
-        this.processLog = processLog;
-    }
-
-    /**
-     * Publishes messages to Kafka topic. It supports three publishing
-     * mechanisms.
+     * Publishes messages to Kafka topic. It uses {@link StreamDemarcator} to
+     * determine how many messages to Kafka will be sent from a provided
+     * {@link InputStream} (see {@link PublishingContext#getContentStream()}).
+     * It supports two publishing modes:
      * <ul>
-     * <li>Sending the entire content stream as a single Kafka message.</li>
-     * <li>Splitting the incoming content stream into chunks and sending
-     * individual chunks as separate Kafka messages.</li>
-     * <li>Splitting the incoming content stream into chunks and sending only
-     * the chunks that have failed previously @see
-     * {@link SplittableMessageContext#getFailedSegments()}.</li>
+     * <li>Sending all messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
+     * <li>Sending only unacknowledged messages constructed from
+     * {@link StreamDemarcator#nextToken()} operation.</li>
      * </ul>
+     * The unacknowledged messages are determined from the value of
+     * {@link PublishingContext#getLastAckedMessageIndex()}.
+     * <br>
      * This method assumes content stream affinity where it is expected that 
the
      * content stream that represents the same Kafka message(s) will remain the
      * same across possible retries. This is required specifically for cases
      * where delimiter is used and a single content stream may represent
-     * multiple Kafka messages. The failed segment list will keep the index of
-     * of each content stream segment that had failed to be sent to Kafka, so
-     * upon retry only the failed segments are sent.
+     * multiple Kafka messages. The
+     * {@link PublishingContext#getLastAckedMessageIndex()} will provide the
+     * index of the last ACKed message, so upon retry only messages with the
+     * higher index are sent.
      *
-     * @param messageContext
-     *            instance of {@link SplittableMessageContext} which hold
-     *            context information about the message to be sent
-     * @param contentStream
-     *            instance of open {@link InputStream} carrying the content of
-     *            the message(s) to be send to Kafka
-     * @param partitionKey
-     *            the value of the partition key. Only relevant is user wishes
-     *            to provide a custom partition key instead of relying on
-     *            variety of provided {@link Partitioner}(s)
-     * @param maxBufferSize maximum message size
-     * @return The set containing the failed segment indexes for messages that
-     *         failed to be sent to Kafka.
+     * @param publishingContext
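+     *            instance of {@link PublishingContext} which holds context
+     *            information about the message(s) to be sent.
+     * @return instance of {@link KafkaPublisherResult} carrying the number of messages (tokens) found in the content stream and the index of the last ACKed message.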
      */
-    BitSet publish(SplittableMessageContext messageContext, InputStream 
contentStream, Integer partitionKey,
-            int maxBufferSize) {
-        List<Future<RecordMetadata>> sendFutures = this.split(messageContext, 
contentStream, partitionKey, maxBufferSize);
-        return this.publish(sendFutures);
+    KafkaPublisherResult publish(PublishingContext publishingContext) {
+        StreamDemarcator streamTokenizer = new StreamDemarcator(publishingContext.getContentStream(),
+                publishingContext.getDelimiterBytes(), publishingContext.getMaxRequestSize());
+
+        int prevLastAckedMessageIndex = publishingContext.getLastAckedMessageIndex();
+        List<Future<RecordMetadata>> resultFutures = new ArrayList<>();
+
+        byte[] messageBytes;
+        int tokenCounter = 0; // counts every token in the stream, including those skipped below
+        for (; (messageBytes = streamTokenizer.nextToken()) != null; tokenCounter++) {
+            if (prevLastAckedMessageIndex < tokenCounter) { // only send tokens beyond the last ACKed index
+                Integer partitionId = publishingContext.getPartitionId();
+                if (partitionId == null && this.partitioner != null && publishingContext.getKeyBytes() != null) { // consult the partitioner only when one is configured
+                    partitionId = this.getPartition(publishingContext.getKeyBytes(), publishingContext.getTopic());
+                }
+                ProducerRecord<byte[], byte[]> message = // pass the resolved partitionId (previously computed but ignored); null leaves partition selection to the producer
+                        new ProducerRecord<>(publishingContext.getTopic(), partitionId, publishingContext.getKeyBytes(), messageBytes);
+                resultFutures.add(this.kafkaProducer.send(message));
+            }
+        }
+
+        int lastAckedMessageIndex = this.processAcks(resultFutures, prevLastAckedMessageIndex);
+        return new KafkaPublisherResult(tokenCounter, lastAckedMessageIndex);
     }
 
     /**
-     * This method splits (if required) the incoming content stream into
-     * messages to publish to Kafka topic. See publish method for more
-     * details
+     * Sets the time this publisher will wait for the {@link Future#get()}
+     * operation (the Future returned by
+     * {@link KafkaProducer#send(ProducerRecord)}) to complete before timing
+     * out.
      *
-     * @param messageContext
-     *            instance of {@link SplittableMessageContext} which hold
-     *            context information about the message to be sent
-     * @param contentStream
-     *            instance of open {@link InputStream} carrying the content of
-     *            the message(s) to be send to Kafka
-     * @param partitionKey
-     *            the value of the partition key. Only relevant is user wishes
-     *            to provide a custom partition key instead of relying on
-     *            variety of provided {@link Partitioner}(s)
-     * @param maxBufferSize maximum message size
-     * @return The list of messages to publish
+     * This value will also be used as a timeout when closing the underlying
+     * {@link KafkaProducer}. See {@link #close()}.
      */
-    List<Future<RecordMetadata>> split(SplittableMessageContext 
messageContext, InputStream contentStream, Integer partitionKey,
-            int maxBufferSize) {
-        List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
-        BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
-        int segmentCounter = 0;
-        StreamScanner scanner = new StreamScanner(contentStream, 
messageContext.getDelimiterBytes(), maxBufferSize);
-
-        while (scanner.hasNext()) {
-            byte[] content = scanner.next();
-            if (content.length > 0){
-                byte[] key = messageContext.getKeyBytes();
-                String topicName = messageContext.getTopicName();
-                if (partitionKey == null && key != null) {
-                    partitionKey = this.getPartition(key, topicName);
-                }
-                if (prevFailedSegmentIndexes == null || 
prevFailedSegmentIndexes.get(segmentCounter)) {
-                    ProducerRecord<byte[], byte[]> message = new 
ProducerRecord<>(topicName, partitionKey, key, content);
-                    sendFutures.add(this.toKafka(message));
-                }
-                segmentCounter++;
-            }
-        }
-        return sendFutures;
+    void setAckWaitTime(long ackWaitTime) {
+        this.ackWaitTime = ackWaitTime;
     }
 
     /**
+     * This operation will process ACKs from Kafka in the order in which
+     * {@link KafkaProducer#send(ProducerRecord)} invocations were made, returning
+     * the index of the last ACKed message. Within this operation, processing an ACK
+     * simply means a successful invocation of the 'get()' operation on the
+     * {@link Future} returned by the {@link KafkaProducer#send(ProducerRecord)}
+     * operation. Upon encountering any type of error while interrogating such a
+     * {@link Future} the ACK loop will end. Messages that were not ACKed are
+     * considered non-delivered and therefore could be resent at a later
+     * time.
+     *
+     * @param sendFutures
+     *            list of {@link Future}s representing results of publishing to
+     *            Kafka
      *
+     * @param lastAckMessageIndex
+     *            the index of the last ACKed message. It is important to
+     *            provide the last ACKed message especially while re-trying so
+     *            the proper index is maintained.
      */
-    BitSet publish(List<Future<RecordMetadata>> sendFutures) {
-        int segmentCounter = 0;
-        BitSet failedSegments = new BitSet();
-        for (Future<RecordMetadata> future : sendFutures) {
+    private int processAcks(List<Future<RecordMetadata>> sendFutures, int 
lastAckMessageIndex) {
+        boolean exceptionThrown = false;
+        for (int segmentCounter = 0; segmentCounter < sendFutures.size() && 
!exceptionThrown; segmentCounter++) {
+            Future<RecordMetadata> future = sendFutures.get(segmentCounter);
             try {
                 future.get(this.ackWaitTime, TimeUnit.MILLISECONDS);
+                lastAckMessageIndex++;
             } catch (InterruptedException e) {
-                failedSegments.set(segmentCounter);
+                exceptionThrown = true;
                 Thread.currentThread().interrupt();
-                logger.warn("Interrupted while waiting for acks from Kafka");
-                if (this.processLog != null) {
-                    this.processLog.warn("Interrupted while waiting for acks 
from Kafka");
-                }
+                this.warnOrError("Interrupted while waiting for acks from 
Kafka", null);
             } catch (ExecutionException e) {
-                failedSegments.set(segmentCounter);
-                logger.error("Failed while waiting for acks from Kafka", e);
-                if (this.processLog != null) {
-                    this.processLog.error("Failed while waiting for acks from 
Kafka", e);
-                }
+                exceptionThrown = true;
+                this.warnOrError("Failed while waiting for acks from Kafka", 
e);
             } catch (TimeoutException e) {
-                failedSegments.set(segmentCounter);
-                logger.warn("Timed out while waiting for acks from Kafka");
-                if (this.processLog != null) {
-                    this.processLog.warn("Timed out while waiting for acks 
from Kafka");
-                }
+                exceptionThrown = true;
+                this.warnOrError("Timed out while waiting for acks from 
Kafka", null);
             }
-            segmentCounter++;
         }
-        return failedSegments;
+
+        return lastAckMessageIndex;
     }
 
     /**
-     *
+     * Will close the underlying {@link KafkaProducer}
      */
-    private int getPartition(Object key, String topicName) {
-        int partSize = this.producer.partitionsFor(topicName).size();
-        return this.partitioner.partition(key, partSize);
+    @Override
+    public void close() {
+        this.kafkaProducer.close();
     }
 
     /**
-     * Closes {@link KafkaProducer}
+     * Will set {@link ProcessorLog} as an additional logger to forward log
+     * messages to NiFi bulletin
      */
-    @Override
-    public void close() throws Exception {
-        this.producer.close();
+    void setProcessLog(ProcessorLog processLog) {
+        this.processLog = processLog;
     }
 
     /**
-     * Sends the provided {@link KeyedMessage} to Kafka async returning
-     * {@link Future}
+     *
      */
-    private Future<RecordMetadata> toKafka(ProducerRecord<byte[], byte[]> 
message) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("Publishing message to '" + message.topic() + "' 
topic.");
+    private void warnOrError(String message, Exception e) {
+        if (e == null) {
+            logger.warn(message);
+            if (this.processLog != null) {
+                this.processLog.warn(message);
+            }
+        } else {
+            logger.error(message, e);
+            if (this.processLog != null) {
+                this.processLog.error(message, e);
+            }
+        }
+    }
+
+    static class KafkaPublisherResult { // immutable pair of counters returned by publish(..)
+        private final int messagesSent;
+        private final int lastMessageAcked;
+        KafkaPublisherResult(int messagesSent, int lastMessageAcked) {
+            this.lastMessageAcked = lastMessageAcked;
+            this.messagesSent = messagesSent;
+        }
+        // value supplied as 'messagesSent' at construction
+        public int getMessagesSent() {
+            return messagesSent;
+        }
+        // value supplied as 'lastMessageAcked' at construction
+        public int getLastMessageAcked() {
+            return lastMessageAcked;
+        }
+        // true when lastMessageAcked equals messagesSent - 1
+        public boolean isAllAcked() {
+            return lastMessageAcked == messagesSent - 1;
+        }
+        // renders as "Sent:<messagesSent>; Last ACK:<lastMessageAcked>"
+        @Override
+        public String toString() {
+            return new StringBuilder("Sent:").append(messagesSent).append("; Last ACK:").append(lastMessageAcked).toString();
+        }
+    }
+
+    /**
+     *
+     */
+    private int getPartition(Object key, String topicName) {
+        if (this.partitioner != null) {
+            int partSize = this.kafkaProducer.partitionsFor(topicName).size();
+            return this.partitioner.partition(key, partSize);
         }
-        return this.producer.send(message);
+        return 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
index 2a851a4..32d3606 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/Partitioners.java
@@ -40,12 +40,11 @@ final public class Partitioners {
             return partitionIndex;
         }
 
-        private int next(int numberOfPartitions) {
-            if (index == numberOfPartitions) {
-                index = 0;
+        private synchronized int next(int numberOfPartitions) {
+            if (this.index >= numberOfPartitions) {
+                this.index = 0;
             }
-            int indexToReturn = index++;
-            return indexToReturn;
+            return index++;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
new file mode 100644
index 0000000..914ac1a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PublishingContext.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Holder of context information used by {@link KafkaPublisher} required to
+ * publish messages to Kafka.
+ */
+class PublishingContext {
+
+    private final InputStream contentStream;
+
+    private final String topic;
+
+    private final int lastAckedMessageIndex;
+
+    private volatile Integer partitionId;
+
+    /*
+     * We're using the default value from Kafka. We use it to check the
+     * message size before it goes to Kafka, thus limiting the possibility of
+     * late failures in the Kafka client.
+     */
+    private volatile int maxRequestSize = 1048576; // kafka default
+
+    private volatile boolean maxRequestSizeSet;
+
+    private volatile byte[] keyBytes;
+
+    private volatile byte[] delimiterBytes;
+
+
+
+    /**
+     * Creates a context for the given content stream and topic, delegating to
+     * the three-argument constructor with a 'lastAckedMessageIndex' of -1.
+     *
+     * @param contentStream stream holding the content to publish
+     * @param topic name of the target topic
+     */
+    PublishingContext(InputStream contentStream, String topic) {
+        this(contentStream, topic, -1);
+    }
+
+    /**
+     * Creates a context after validating all arguments.
+     *
+     * @param contentStream stream holding the content to publish; must not be
+     *            null
+     * @param topic name of the target topic; must not be null or blank
+     * @param lastAckedMessageIndex must be &gt;= -1
+     * @throws IllegalArgumentException if any argument fails validation
+     */
+    PublishingContext(InputStream contentStream, String topic, int 
lastAckedMessageIndex) {
+        this.validateInput(contentStream, topic, lastAckedMessageIndex);
+        this.contentStream = contentStream;
+        this.topic = topic;
+        this.lastAckedMessageIndex = lastAckedMessageIndex;
+    }
+
+    /**
+     * Returns a short description of this context consisting of the topic and
+     * the delimiter decoded as UTF-8. The delimiter is optional and may never
+     * have been set; in that case it is rendered as an empty string rather
+     * than failing with a NullPointerException.
+     */
+    @Override
+    public String toString() {
+        // Read the volatile field once so the null check and the decode see the same value.
+        final byte[] delimiter = this.delimiterBytes;
+        final String delimiterText = delimiter == null ? "" : new String(delimiter, StandardCharsets.UTF_8);
+        return "topic: '" + this.topic + "'; delimiter: '" + delimiterText + "'";
+    }
+
+    /**
+     * Returns the 'lastAckedMessageIndex' supplied at construction time (-1
+     * when the two-argument constructor was used).
+     */
+    int getLastAckedMessageIndex() {
+        return this.lastAckedMessageIndex;
+    }
+
+    /**
+     * Returns the maximum request size; 1048576 unless changed via
+     * {@link #setMaxRequestSize(int)}.
+     */
+    int getMaxRequestSize() {
+        return this.maxRequestSize;
+    }
+
+    /**
+     * Returns the key bytes, or null if none were set.
+     */
+    byte[] getKeyBytes() {
+        return this.keyBytes;
+    }
+
+    /**
+     * Returns the partition id, or null if none was set.
+     */
+    Integer getPartitionId() {
+        return partitionId;
+    }
+
+    /**
+     * Sets the partition id. Unlike the other setters of this class, the value
+     * is not validated and may be replaced any number of times.
+     */
+    // NOTE(review): this is the only 'public' member besides toString() on a
+    // package-private class - confirm whether the wider visibility is intended.
+    public void setPartitionId(Integer partitionId) {
+        this.partitionId = partitionId;
+    }
+
+    /**
+     * Returns the delimiter bytes, or null if none were set.
+     */
+    byte[] getDelimiterBytes() {
+        return this.delimiterBytes;
+    }
+
+    /**
+     * Returns the content stream supplied at construction time.
+     */
+    InputStream getContentStream() {
+        return this.contentStream;
+    }
+
+    /**
+     * Returns the topic supplied at construction time.
+     */
+    String getTopic() {
+        return this.topic;
+    }
+
+    /**
+     * Assigns the key bytes. A null argument is ignored; a non-null argument
+     * must not be empty. Once a key has been assigned, every further call
+     * fails, regardless of the argument.
+     *
+     * @param keyBytes key bytes to assign, may be null
+     * @throws IllegalArgumentException if a key was already assigned or the
+     *             given array is empty
+     */
+    void setKeyBytes(byte[] keyBytes) {
+        if (this.keyBytes != null) {
+            throw new IllegalArgumentException("'keyBytes' can only be set once per instance");
+        }
+        if (keyBytes == null) {
+            return;
+        }
+        this.assertBytesValid(keyBytes);
+        this.keyBytes = keyBytes;
+    }
+
+    /**
+     * Assigns the delimiter bytes. A null argument is ignored; a non-null
+     * argument must not be empty. Once a delimiter has been assigned, every
+     * further call fails, regardless of the argument.
+     *
+     * @param delimiterBytes delimiter bytes to assign, may be null
+     * @throws IllegalArgumentException if a delimiter was already assigned or
+     *             the given array is empty
+     */
+    void setDelimiterBytes(byte[] delimiterBytes) {
+        if (this.delimiterBytes != null) {
+            throw new IllegalArgumentException("'delimiterBytes' can only be set once per instance");
+        }
+        if (delimiterBytes == null) {
+            return;
+        }
+        this.assertBytesValid(delimiterBytes);
+        this.delimiterBytes = delimiterBytes;
+    }
+
+    /**
+     * Overrides the default maximum request size. May succeed only once per
+     * instance; a rejected (non-positive) value does not count as an
+     * assignment.
+     *
+     * @param maxRequestSize new maximum request size, must be &gt; 0
+     * @throws IllegalArgumentException if the size was already assigned or the
+     *             given value is not positive
+     */
+    void setMaxRequestSize(int maxRequestSize) {
+        if (this.maxRequestSizeSet) {
+            throw new IllegalArgumentException("'maxRequestSize' can only be set once per instance");
+        }
+        if (maxRequestSize <= 0) {
+            throw new IllegalArgumentException("'maxRequestSize' must be > 0");
+        }
+        this.maxRequestSize = maxRequestSize;
+        this.maxRequestSizeSet = true;
+    }
+
+    /**
+     * Rejects empty arrays; null is accepted.
+     *
+     * @throws IllegalArgumentException if the array is non-null and empty
+     */
+    private void assertBytesValid(byte[] bytes) {
+        final boolean empty = bytes != null && bytes.length == 0;
+        if (empty) {
+            throw new IllegalArgumentException("'bytes' must not be empty");
+        }
+    }
+
+    /**
+     * Validates the constructor arguments, reporting the first violation
+     * found in the order: content stream, topic, last acked message index.
+     *
+     * @throws IllegalArgumentException if 'contentStream' is null, 'topic' is
+     *             null or blank, or 'lastAckedMessageIndex' is less than -1
+     */
+    private void validateInput(InputStream contentStream, String topic, int lastAckedMessageIndex) {
+        if (contentStream == null) {
+            throw new IllegalArgumentException("'contentStream' must not be null");
+        }
+        final boolean topicMissing = topic == null || topic.trim().isEmpty();
+        if (topicMissing) {
+            throw new IllegalArgumentException("'topic' must not be null or empty");
+        }
+        if (lastAckedMessageIndex < -1) {
+            throw new IllegalArgumentException("'lastAckedMessageIndex' must be >= -1");
+        }
+    }
+}

Reply via email to