http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
new file mode 100644
index 0000000..6346ffd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AbstractKafkaProcessorLifecycelTest {
+
+    private final static Random random = new Random();
+
+    @Test
+    public void validateBaseProperties() throws Exception {
+        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
+        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
+        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'bootstrap.servers' validated 
against 'foo' is invalid"));
+        }
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
+
+        runner.removeProperty(ConsumeKafka.TOPIC);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'topic' is invalid because 
topic is required"));
+        }
+
+        runner.setProperty(ConsumeKafka.TOPIC, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.TOPIC, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+        runner.setProperty(ConsumeKafka.TOPIC, "blah");
+
+        runner.removeProperty(ConsumeKafka.CLIENT_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because client.id is 
required"));
+        }
+
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
+
+        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+    }
+
+    @Test
+    @Ignore // disabled by default; run manually as an extra sanity check
+    public void validateConcurrencyWithRandomFailuresMultiple() throws 
Exception {
+        for (int i = 0; i < 100; i++) {
+            validateConcurrencyWithRandomFailures();
+        }
+    }
+
+    @Test
+    public void validateConcurrencyWithRandomFailures() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    if (random.nextInt(10) == 5) {
+                        when(context.getName()).thenReturn("fail");
+                    }
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Exception e) {
+                        fail();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
+        processingExecutor.shutdown();
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * this has to be <= 1 since the last thread may reach the finally block
+         * after the acceptTask flag has been reset, at which point close will
+         * not be called (which is correct behavior since it will be invoked
+         * explicitly by the life-cycle operations of a running processor).
+         *
+         * You can actually observe the =1 behavior in the next test where it 
is
+         * always 0 close calls
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
+        assertTrue(closeVsInitDiff <= 1);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    @Test
+    public void validateConcurrencyWithAllSuccesses() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Exception e) {
+                        fail();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
+        processingExecutor.shutdown();
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * unlike previous test this one will always be 1 since there are no
+         * failures
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
+        assertEquals(1, closeVsInitDiff);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    @Test
+    public void validateConcurrencyWithAllFailures() throws Exception {
+        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+        final AtomicInteger commitCounter = new AtomicInteger();
+        final AtomicInteger rollbackCounter = new AtomicInteger();
+        final AtomicInteger yieldCounter = new AtomicInteger();
+
+        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
+        final ProcessSession session = mock(ProcessSession.class);
+        when(sessionFactory.createSession()).thenReturn(session);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                commitCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).commit();
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                rollbackCounter.incrementAndGet();
+                return null;
+            }
+        }).when(session).rollback(true);
+
+        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
+
+        int testCount = 1000;
+        final CountDownLatch latch = new CountDownLatch(testCount);
+        for (int i = 0; i < testCount; i++) {
+            processingExecutor.execute(new Runnable() {
+                @Override
+                public void run() {
+                    ProcessContext context = mock(ProcessContext.class);
+                    doAnswer(new Answer<Void>() {
+                        @Override
+                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                            yieldCounter.incrementAndGet();
+                            return null;
+                        }
+                    }).when(context).yield();
+                    when(context.getName()).thenReturn("fail");
+                    try {
+                        processor.onTrigger(context, sessionFactory);
+                    } catch (Exception e) {
+                        fail();
+                    } finally {
+                        latch.countDown();
+                    }
+                }
+            });
+        }
+
+        assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
+        processingExecutor.shutdown();
+
+        System.out.println("SUCCESS: " + processor.successfulTriggers);
+        System.out.println("FAILURE: " + processor.failedTriggers);
+        System.out.println("INIT: " + processor.resourceReinitialized);
+        System.out.println("YIELD CALLS: " + yieldCounter.get());
+        System.out.println("COMMIT CALLS: " + commitCounter.get());
+        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+        System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+        /*
+         * unlike previous test this one will always be 0 since all triggers 
are
+         * failures
+         */
+        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
+        assertEquals(0, closeVsInitDiff);
+
+        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+        assertEquals(testCount,
+                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
+    }
+
+    /**
+     *
+     */
+    public static class DummyProcessor extends 
AbstractKafkaProcessor<Closeable> {
+        @Override
+        protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) throws ProcessException {
+            return true;
+        }
+
+        @Override
+        protected Closeable buildKafkaResource(ProcessContext context, 
ProcessSession session) throws ProcessException {
+            return mock(Closeable.class);
+        }
+
+        @Override
+        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+            return SHARED_DESCRIPTORS;
+        }
+    }
+
+
+    public static class ConcurrencyValidatingProcessor extends 
AbstractKafkaProcessor<Closeable> {
+        final AtomicInteger failedTriggers = new AtomicInteger();
+        final AtomicInteger successfulTriggers = new AtomicInteger();
+        final AtomicInteger resourceReinitialized = new AtomicInteger();
+        final AtomicInteger closeCounter = new AtomicInteger();
+
+        ConcurrencyValidatingProcessor() {
+            try {
+                Field loggerField = 
AbstractSessionFactoryProcessor.class.getDeclaredField("logger");
+                loggerField.setAccessible(true);
+                loggerField.set(this, mock(ProcessorLog.class));
+            } catch (Exception e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        @Override
+        @OnStopped
+        public void close() {
+            super.close();
+            assertTrue(this.kafkaResource == null);
+            closeCounter.incrementAndGet();
+        }
+
+        @Override
+        protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) {
+            assertNotNull(this.kafkaResource);
+            if ("fail".equals(context.getName())) {
+                failedTriggers.incrementAndGet();
+                throw new RuntimeException("Intentional");
+            }
+            this.successfulTriggers.incrementAndGet();
+            return true;
+        }
+
+        @Override
+        protected Closeable buildKafkaResource(ProcessContext context, 
ProcessSession session) throws ProcessException {
+            this.resourceReinitialized.incrementAndGet();
+            return mock(Closeable.class);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
new file mode 100644
index 0000000..2031e76
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+// The test is valid and should be run when working on this module. @Ignore is
+// applied to speed up the overall build.
+public class ConsumeKafkaTest {
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        ConsumeKafka consumeKafka = new ConsumeKafka();
+        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, "foo");
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafka.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because group.id is 
required"));
+        }
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafka.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+    }
+
+    /**
+     * Will set auto-offset to 'earliest' to ensure that all events (the ones
+     * that were sent before and after consumer startup) are received.
+     */
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String topicName = "validateGetAllMessages";
+
+        StubConsumeKafka consumeKafka = new StubConsumeKafka();
+
+        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, topicName);
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+
+        byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
+                "Hello-2".getBytes(StandardCharsets.UTF_8), 
"Hello-3".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        values = new byte[][] { "Hello-4".getBytes(StandardCharsets.UTF_8), 
"Hello-5".getBytes(StandardCharsets.UTF_8),
+                "Hello-6".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(6, flowFiles.size());
+        // spot check
+        MockFlowFile flowFile = flowFiles.get(0);
+        String event = new String(flowFile.toByteArray());
+        assertEquals("Hello-1", event);
+
+        flowFile = flowFiles.get(1);
+        event = new String(flowFile.toByteArray());
+        assertEquals("Hello-2", event);
+
+        flowFile = flowFiles.get(5);
+        event = new String(flowFile.toByteArray());
+        assertEquals("Hello-6", event);
+
+        consumeKafka.close();
+    }
+
+    @Test
+    public void validateGetAllMessagesWithProvidedDemarcator() throws 
Exception {
+        String topicName = "validateGetAllMessagesWithProvidedDemarcator";
+
+        StubConsumeKafka consumeKafka = new StubConsumeKafka();
+
+        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPIC, topicName);
+        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
+
+        byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
+                "Hi-2".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+        assertEquals(1, flowFiles.size());
+
+        values = new byte[][] { 
"Здравствуйте-3".getBytes(StandardCharsets.UTF_8),
+                "こんにちは-4".getBytes(StandardCharsets.UTF_8), 
"Hello-5".getBytes(StandardCharsets.UTF_8) };
+        consumeKafka.setValues(values);
+
+        runner.run(1, false);
+
+        flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
+
+        assertEquals(2, flowFiles.size());
+        MockFlowFile flowFile = flowFiles.get(0);
+        String[] events = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8).split("blah");
+
+        assertEquals(2, events.length);
+
+        flowFile = flowFiles.get(1);
+        events = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8).split("blah");
+
+        assertEquals(3, events.length);
+        // spot check
+        assertEquals("Здравствуйте-3", events[0]);
+        assertEquals("こんにちは-4", events[1]);
+        assertEquals("Hello-5", events[2]);
+
+        consumeKafka.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
new file mode 100644
index 0000000..2c45d37
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import 
org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
+import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+
+public class KafkaPublisherTest {
+
+    private static EmbeddedKafka kafkaLocal;
+
+    private static EmbeddedKafkaProducerHelper producerHelper;
+
+    @BeforeClass
+    public static void beforeClass() {
+        kafkaLocal = new EmbeddedKafka();
+        kafkaLocal.start();
+        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
+    }
+
+    @AfterClass
+    public static void afterClass() throws Exception {
+        producerHelper.close();
+        kafkaLocal.stop();
+    }
+
+    @Test
+    public void validateSuccessfulSendAsWhole() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream("Hello 
Kafka".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsWhole";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(0, result.getLastMessageAcked());
+        assertEquals(1, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    @Test
+    public void validateSuccessfulSendAsDelimited() throws Exception {
+        InputStream contentStream = new ByteArrayInputStream(
+                "Hello Kafka\nHello Kafka\nHello Kafka\nHello 
Kafka\n".getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateSuccessfulSendAsDelimited";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
+        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        KafkaPublisherResult result = publisher.publish(publishingContext);
+
+        assertEquals(3, result.getLastMessageAcked());
+        assertEquals(4, result.getMessagesSent());
+        contentStream.close();
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        assertNotNull(iter.next());
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+    }
+
+    /*
+     * This test simulates the condition where not all messages were ACKed by
+     * Kafka
+     */
+    @Test
+    public void validateRetries() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello 
Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateSuccessfulReSendOfFailedSegments";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 1;
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String m1 = new String(iter.next().message());
+        String m2 = new String(iter.next().message());
+        assertEquals("Hello Kafka3", m1);
+        assertEquals("Hello Kafka4", m2);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 2;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, 
lastAckedMessageIndex);
+        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+
+        m1 = new String(iter.next().message());
+        assertEquals("Hello Kafka4", m1);
+
+        publisher.close();
+    }
+
+    /*
+     * Similar to the above test, but it sets the first retry index to the last
+     * possible message index and second index to an out of bound index. The
+     * expectation is that no messages will be sent to Kafka
+     */
+    @Test
+    public void validateRetriesWithWrongIndex() throws Exception {
+        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello 
Kafka4\n".getBytes(StandardCharsets.UTF_8);
+        InputStream contentStream = new ByteArrayInputStream(testValue);
+        String topicName = "validateRetriesWithWrongIndex";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+
+        // simulates the first re-try
+        int lastAckedMessageIndex = 3;
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
+        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+
+        publisher.publish(publishingContext);
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        // simulates the second re-try
+        lastAckedMessageIndex = 6;
+        contentStream = new ByteArrayInputStream(testValue);
+        publishingContext = new PublishingContext(contentStream, topicName, 
lastAckedMessageIndex);
+        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(publishingContext);
+        try {
+            iter.next();
+            fail();
+        } catch (ConsumerTimeoutException e) {
+            // that's OK since this is the Kafka mechanism to unblock
+        }
+
+        publisher.close();
+    }
+
+    @Test
+    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
+        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
+        InputStream contentStream = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithMultiByteCharacters";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
+
+        publisher.publish(publishingContext);
+        publisher.close();
+
+        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
+        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
+        assertEquals(data, r);
+    }
+
+    @Test
+    public void validateWithNonDefaultPartitioner() throws Exception {
+        String data = "fooandbarandbaz";
+        InputStream contentStream = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
+        String topicName = "validateWithNonDefaultPartitioner";
+
+        Properties kafkaProperties = this.buildProducerProperties();
+        kafkaProperties.setProperty("partitioner.class", 
TestPartitioner.class.getName());
+        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties);
+        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
+        
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
+
+        try {
+            publisher.publish(publishingContext);
+            // partitioner should be invoked 3 times
+            assertTrue(TestPartitioner.counter == 3);
+            publisher.close();
+        } finally {
+            TestPartitioner.counter = 0;
+        }
+    }
+
+    private Properties buildProducerProperties() {
+        Properties kafkaProperties = new Properties();
+        kafkaProperties.put("key.serializer", 
ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", 
ByteArraySerializer.class.getName());
+        kafkaProperties.setProperty("bootstrap.servers", "localhost:" + 
kafkaLocal.getKafkaPort());
+        kafkaProperties.put("auto.create.topics.enable", "true");
+        return kafkaProperties;
+    }
+
+    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
+        Properties props = new Properties();
+        props.put("zookeeper.connect", "localhost:" + 
kafkaLocal.getZookeeperPort());
+        props.put("group.id", "test");
+        props.put("consumer.timeout.ms", "500");
+        props.put("auto.offset.reset", "smallest");
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        ConsumerConnector consumer = 
Consumer.createJavaConsumerConnector(consumerConfig);
+        Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, 1);
+        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
+        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
+        return iter;
+    }
+
+    public static class TestPartitioner implements Partitioner {
+        static int counter;
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+            // nothing to do, test
+        }
+
+        @Override
+        public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes,
+                Cluster cluster) {
+            counter++;
+            return 0;
+        }
+
+        @Override
+        public void close() {
+            counter = 0;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
new file mode 100644
index 0000000..e669df7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+// The test is valid and should be run when working on this module. @Ignore is
+// to speed up the overall build
+public class PublishKafkaTest {
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        PublishKafka publishKafka = new PublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(PublishKafka.TOPIC, "foo");
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "foo");
+
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("'max.block.ms' validated 
against 'foo' is invalid"));
+        }
+    }
+
+    @Test
+    public void validateCustomValidation() {
+        String topicName = "validateCustomValidation";
+        PublishKafka publishKafka = new PublishKafka();
+
+        /*
+         * Validates that a Kerberos principal is required if one of the SASL
+         * variants is set for the security protocol
+         */
+        TestRunner runner = TestRunners.newTestRunner(publishKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.SECURITY_PROTOCOL, 
PublishKafka.SEC_SASL_PLAINTEXT);
+        try {
+            runner.run();
+            fail();
+        } catch (Throwable e) {
+            assertTrue(e.getMessage().contains("'Kerberos Service Name' is 
invalid because"));
+        }
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateSingleCharacterDemarcatedMessages() {
+        String topicName = "validateSingleCharacterDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+
+        runner.enqueue("Hello 
World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateMultiCharacterDemarcatedMessagesAndCustomPartitioner() 
{
+        String topicName = 
"validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.PARTITION_CLASS, 
Partitioners.RoundRobinPartitioner.class.getName());
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
+
+        runner.enqueue("Hello 
WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
+
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnSendFailureAndThenResendSuccess() throws Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka();
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to 
failure
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateOnFutureGetFailureAndThenResendSuccess() throws 
Exception {
+        String topicName = "validateSendFailureAndThenResendSuccess";
+        StubPublishKafka putKafka = new StubPublishKafka();
+
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
+
+        final String text = "Hello World\nGoodbye\nfuturefail\n2";
+        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+        MockFlowFile ff = 
runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
+        assertNotNull(ff);
+        runner.enqueue(ff);
+
+        runner.run(1, false);
+        assertEquals(0, runner.getQueueSize().getObjectCount());
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        // 6 sends due to duplication
+        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateDemarcationIntoEmptyMessages() {
+        String topicName = "validateDemarcationIntoEmptyMessages";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        final TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.KEY, "key1");
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
+
+        final byte[] bytes = 
"\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
+        runner.enqueue(bytes);
+        runner.run(1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexRightPartialDemarcatedMessages() {
+        String topicName = "validateComplexRightPartialDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
+
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠
>".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexLeftPartialDemarcatedMessages() {
+        String topicName = "validateComplexLeftPartialDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
+
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠
".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void validateComplexPartialMatchDemarcatedMessages() {
+        String topicName = "validateComplexPartialMatchDemarcatedMessages";
+        StubPublishKafka putKafka = new StubPublishKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PublishKafka.TOPIC, topicName);
+        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
+        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
+        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
+
+        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
+        Producer<byte[], byte[]> producer = putKafka.getProducer();
+        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
+        runner.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
new file mode 100644
index 0000000..5cee6bc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Test;
+
+public class PublishingContextTest {
+
+    @Test
+    public void failInvalidConstructorArgs() {
+        try {
+            new PublishingContext(null, null);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+        try {
+            new PublishingContext(mock(InputStream.class), null);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new PublishingContext(mock(InputStream.class), "");
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            new PublishingContext(mock(InputStream.class), "mytopic", -3);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+    }
+
+    @Test
+    public void validateFullSetting() {
+        PublishingContext publishingContext = new 
PublishingContext(mock(InputStream.class), "topic", 3);
+        
publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
+        publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
+
+        assertEquals("delimiter", new 
String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
+        assertEquals("key", new String(publishingContext.getKeyBytes(), 
StandardCharsets.UTF_8));
+        assertEquals("topic", publishingContext.getTopic());
+        assertEquals("topic: 'topic'; delimiter: 'delimiter'", 
publishingContext.toString());
+    }
+
+    @Test
+    public void validateOnlyOnceSetPerInstance() {
+        PublishingContext publishingContext = new 
PublishingContext(mock(InputStream.class), "topic");
+        publishingContext.setKeyBytes(new byte[] { 0 });
+        try {
+            publishingContext.setKeyBytes(new byte[] { 0 });
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        publishingContext.setDelimiterBytes(new byte[] { 0 });
+        try {
+            publishingContext.setDelimiterBytes(new byte[] { 0 });
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        publishingContext.setMaxRequestSize(1024);
+        try {
+            publishingContext.setMaxRequestSize(1024);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+
+        try {
+            publishingContext.setMaxRequestSize(-10);
+            fail();
+        } catch (IllegalArgumentException e) {
+            // success
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
new file mode 100644
index 0000000..017ad70
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class StubConsumeKafka extends ConsumeKafka {
+
+    private byte[][] values;
+
+    public void setValues(byte[][] values) {
+        this.values = values;
+    }
+
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext 
context, ProcessSession session) {
+        Consumer<byte[], byte[]> consumer = super.buildKafkaResource(context, 
session);
+        consumer = mock(Consumer.class);
+        final String topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
+
+        when(consumer.poll(Mockito.anyLong())).thenAnswer(new 
Answer<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock 
invocation) throws Throwable {
+                List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>();
+                for (int i = 0; i < StubConsumeKafka.this.values.length; i++) {
+                    byte[] value = StubConsumeKafka.this.values[i];
+                    ConsumerRecord<byte[], byte[]> record = new 
ConsumerRecord<>(topicName, 0, 0, null, value);
+                    records.add(record);
+                }
+                TopicPartition partition = new TopicPartition(topicName, 0);
+                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> m = 
new LinkedHashMap<>();
+                m.put(partition, records);
+                return new ConsumerRecords<>(m);
+            }
+        });
+
+        return consumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
new file mode 100644
index 0000000..cf258b8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class StubPublishKafka extends PublishKafka {
+
+    private volatile Producer<byte[], byte[]> producer;
+
+    private volatile boolean failed;
+
+    public Producer<byte[], byte[]> getProducer() {
+        return producer;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
+            throws ProcessException {
+        Properties kafkaProperties = this.buildKafkaProperties(context);
+        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
+        KafkaPublisher publisher;
+        try {
+            Field f = PublishKafka.class.getDeclaredField("brokers");
+            f.setAccessible(true);
+            f.set(this, 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
+            publisher = (KafkaPublisher) 
TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
+            producer = mock(Producer.class);
+            this.instrumentProducer(producer, false);
+            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
+            kf.setAccessible(true);
+            kf.set(publisher, producer);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+        return publisher;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean 
failRandomly) {
+        when(producer.send(Mockito.any(ProducerRecord.class))).then(new 
Answer<Future<RecordMetadata>>() {
+            @SuppressWarnings("rawtypes")
+            @Override
+            public Future<RecordMetadata> answer(InvocationOnMock invocation) 
throws Throwable {
+                ProducerRecord<byte[], byte[]> record = 
(ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
+                String value = new String(record.value(), 
StandardCharsets.UTF_8);
+                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
+                    StubPublishKafka.this.failed = true;
+                    throw new RuntimeException("intentional");
+                }
+                Future future = mock(Future.class);
+                if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
+                    StubPublishKafka.this.failed = true;
+                    when(future.get(Mockito.anyLong(), 
Mockito.any(TimeUnit.class))).thenThrow(ExecutionException.class);
+                }
+                return future;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
new file mode 100644
index 0000000..b056a08
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import sun.misc.Unsafe;
+
/**
 * Reflection-based helpers for the Kafka processor tests: forcing values into
 * final fields and obtaining {@link Unsafe} for constructor-bypassing allocation.
 */
class TestUtils {

    private TestUtils() {
        // utility class; not meant to be instantiated
    }

    /**
     * Sets {@code newValue} on a (possibly final) {@code field} of {@code instance}
     * by clearing the FINAL bit of the field's modifiers first.
     * <p>
     * NOTE(review): relies on the private {@code Field.modifiers} field, which is
     * no longer reflectively accessible on JDK 12+ — confirm the build's JDK.
     *
     * @param field    field to mutate
     * @param instance object owning the field
     * @param newValue value to assign
     * @throws Exception if reflective access fails
     */
    public static void setFinalField(Field field, Object instance, Object newValue) throws Exception {
        field.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

        field.set(instance, newValue);
    }

    /**
     * Returns the singleton {@link Unsafe} via its private static field.
     *
     * @throws IllegalStateException if the field cannot be read
     */
    static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            throw new IllegalStateException("Failed to obtain sun.misc.Unsafe", e);
        }
    }

}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
new file mode 100644
index 0000000..802f889
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * Embedded Kafka server, primarily to be used for testing.
+ */
+public class EmbeddedKafka {
+
+    private final KafkaServerStartable kafkaServer;
+
+    private final Properties zookeeperConfig;
+
+    private final Properties kafkaConfig;
+
+    private final ZooKeeperServer zkServer;
+
+    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
+
+    private final int kafkaPort;
+
+    private final int zookeeperPort;
+
+    private boolean started;
+
+    /**
+     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
+     * configuration properties will be loaded from 'server.properties' and
+     * 'zookeeper.properties' located at the root of the classpath.
+     */
+    public EmbeddedKafka() {
+        this(loadPropertiesFromClasspath("/server.properties"), 
loadPropertiesFromClasspath("/zookeeper.properties"));
+    }
+
+    /**
+     * Will create instance of the embedded Kafka server.
+     *
+     * @param kafkaConfig
+     *            Kafka configuration properties
+     * @param zookeeperConfig
+     *            Zookeeper configuration properties
+     */
+    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
+        this.cleanupKafkaWorkDir();
+        this.zookeeperConfig = zookeeperConfig;
+        this.kafkaConfig = kafkaConfig;
+        this.kafkaPort = this.availablePort();
+        this.zookeeperPort = this.availablePort();
+
+        this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
+        this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + 
this.zookeeperPort);
+        this.zookeeperConfig.setProperty("clientPort", 
String.valueOf(this.zookeeperPort));
+        this.zkServer = new ZooKeeperServer();
+        this.kafkaServer = new KafkaServerStartable(new 
KafkaConfig(kafkaConfig));
+    }
+
+    /**
+     *
+     * @return port for Kafka server
+     */
+    public int getKafkaPort() {
+        if (!this.started) {
+            throw new IllegalStateException("Kafka server is not started. 
Kafka port can't be determined.");
+        }
+        return this.kafkaPort;
+    }
+
+    /**
+     *
+     * @return port for Zookeeper server
+     */
+    public int getZookeeperPort() {
+        if (!this.started) {
+            throw new IllegalStateException("Kafka server is not started. 
Zookeeper port can't be determined.");
+        }
+        return this.zookeeperPort;
+    }
+
+    /**
+     * Will start embedded Kafka server. Its data directories will be created
+     * at 'kafka-tmp' directory relative to the working directory of the 
current
+     * runtime. The data directories will be deleted upon JVM exit.
+     *
+     */
+    public void start() {
+        if (!this.started) {
+            logger.info("Starting Zookeeper server");
+            this.startZookeeper();
+
+            logger.info("Starting Kafka server");
+            this.kafkaServer.startup();
+
+            logger.info("Embeded Kafka is started at localhost:" + 
this.kafkaServer.serverConfig().port()
+                    + ". Zookeeper connection string: " + 
this.kafkaConfig.getProperty("zookeeper.connect"));
+            this.started = true;
+        }
+    }
+
+    /**
+     * Will stop embedded Kafka server, cleaning up all working directories.
+     */
+    public void stop() {
+        if (this.started) {
+            logger.info("Shutting down Kafka server");
+            this.kafkaServer.shutdown();
+            this.kafkaServer.awaitShutdown();
+            logger.info("Shutting down Zookeeper server");
+            this.shutdownZookeeper();
+            logger.info("Embeded Kafka is shut down.");
+            this.cleanupKafkaWorkDir();
+            this.started = false;
+        }
+    }
+
+    /**
+     *
+     */
+    private void cleanupKafkaWorkDir() {
+        File kafkaTmp = new File("target/kafka-tmp");
+        try {
+            FileUtils.deleteDirectory(kafkaTmp);
+        } catch (Exception e) {
+            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Will start Zookeeper server via {@link ServerCnxnFactory}
+     */
+    private void startZookeeper() {
+        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+        try {
+            quorumConfiguration.parseProperties(this.zookeeperConfig);
+
+            ServerConfig configuration = new ServerConfig();
+            configuration.readFrom(quorumConfiguration);
+
+            FileTxnSnapLog txnLog = new FileTxnSnapLog(new 
File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
+
+            zkServer.setTxnLogFactory(txnLog);
+            zkServer.setTickTime(configuration.getTickTime());
+            
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
+            
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
+            ServerCnxnFactory zookeeperConnectionFactory = 
ServerCnxnFactory.createFactory();
+            
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
+                    configuration.getMaxClientCnxns());
+            zookeeperConnectionFactory.startup(zkServer);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to start Zookeeper 
server", e);
+        }
+    }
+
+    /**
+     * Will shut down Zookeeper server.
+     */
+    private void shutdownZookeeper() {
+        zkServer.shutdown();
+    }
+
+    /**
+     * Will load {@link Properties} from properties file discovered at the
+     * provided path relative to the root of the classpath.
+     */
+    private static Properties loadPropertiesFromClasspath(String path) {
+        try {
+            Properties kafkaProperties = new Properties();
+            kafkaProperties.load(Class.class.getResourceAsStream(path));
+            return kafkaProperties;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Will determine the available port used by Kafka/Zookeeper servers.
+     */
+    private int availablePort() {
+        ServerSocket s = null;
+        try {
+            s = new ServerSocket(0);
+            s.setReuseAddress(true);
+            return s.getLocalPort();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to discover available 
port.", e);
+        } finally {
+            try {
+                s.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
new file mode 100644
index 0000000..0ed00fb
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafkaProducerHelper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.test;
+
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.OldProducer;
+
+/**
+ * Helper class which helps to produce events targeting {@link EmbeddedKafka}
+ * server.
+ */
+public class EmbeddedKafkaProducerHelper implements Closeable {
+
+    private final EmbeddedKafka kafkaServer;
+
+    private final OldProducer producer;
+
+    /**
+     * Will create an instance of EmbeddedKafkaProducerHelper based on default
+     * configurations.<br>
+     * Default configuration includes:<br>
+     * <i>
+     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+     * serializer.class=kafka.serializer.DefaultEncoder<br>
+     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+     * auto.create.topics.enable=true
+     * </i><br>
+     * <br>
+     * If you wish to supply additional configuration properties or override
+     * existing use
+     * {@link 
EmbeddedKafkaProducerHelper#EmbeddedKafkaProducerHelper(EmbeddedKafka, 
Properties)}
+     * constructor.
+     *
+     * @param kafkaServer
+     *            instance of {@link EmbeddedKafka}
+     */
+    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer) {
+        this(kafkaServer, null);
+    }
+
+    /**
+     * Will create an instance of EmbeddedKafkaProducerHelper based on default
+     * configurations and additional configuration properties.<br>
+     * Default configuration includes:<br>
+     * metadata.broker.list=[determined from the instance of EmbeddedKafka]<br>
+     * serializer.class=kafka.serializer.DefaultEncoder<br>
+     * key.serializer.class=kafka.serializer.DefaultEncoder<br>
+     * auto.create.topics.enable=true<br>
+     * <br>
+     *
+     * @param kafkaServer
+     *            instance of {@link EmbeddedKafka}
+     * @param additionalProperties
+     *            instance of {@link Properties} specifying additional producer
+     *            configuration properties.
+     */
+    public EmbeddedKafkaProducerHelper(EmbeddedKafka kafkaServer, Properties 
additionalProperties) {
+        this.kafkaServer = kafkaServer;
+        Properties producerProperties = new Properties();
+        producerProperties.put("metadata.broker.list", "localhost:" + 
this.kafkaServer.getKafkaPort());
+        producerProperties.put("serializer.class", 
"kafka.serializer.DefaultEncoder");
+        producerProperties.put("key.serializer.class", 
"kafka.serializer.DefaultEncoder");
+        producerProperties.put("auto.create.topics.enable", "true");
+        if (additionalProperties != null) {
+            producerProperties.putAll(additionalProperties);
+        }
+        this.producer = new OldProducer(producerProperties);
+    }
+
+    /**
+     * Will send an event to a Kafka topic. If topic doesn't exist it will be
+     * auto-created.
+     *
+     * @param topicName
+     *            Kafka topic name.
+     * @param event
+     *            string representing an event(message) to be sent to Kafka.
+     */
+    public void sendEvent(String topicName, String event) {
+        KeyedMessage<byte[], byte[]> data = new KeyedMessage<byte[], 
byte[]>(topicName, event.getBytes());
+        this.producer.send(data.topic(), data.key(), data.message());
+    }
+
+    /**
+     * Will close the underlying Kafka producer.
+     */
+    @Override
+    public void close() throws IOException {
+        this.producer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/640b7021/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
new file mode 100644
index 0000000..57cd63f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootCategory=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - 
%m%n
+
+# Uncomment the following line to enable DEBUG logging for the Kafka processors:
+#log4j.category.org.apache.nifi.processors.kafka=DEBUG

Reply via email to