http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
new file mode 100644
index 0000000..232af26
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.provenance.ProvenanceReporter;
+import org.apache.nifi.processors.kafka.pubsub.ConsumerPool.PoolStats;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class ConsumerPoolTest {
+
+    private Consumer<byte[], byte[]> consumer = null;
+    private ProcessSession mockSession = null;
+    private ProcessContext mockContext = Mockito.mock(ProcessContext.class);
+    private ProvenanceReporter mockReporter = null;
+    private ConsumerPool testPool = null;
+    private ConsumerPool testDemarcatedPool = null;
+    private ComponentLog logger = null;
+
+    @Before
+    @SuppressWarnings("unchecked")
+    public void setup() {
+        consumer = mock(Consumer.class);
+        logger = mock(ComponentLog.class);
+        mockSession = mock(ProcessSession.class);
+        mockReporter = mock(ProvenanceReporter.class);
+        when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
+        testPool = new ConsumerPool(
+                1,
+                null,
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger,
+                true,
+                StandardCharsets.UTF_8,
+                null) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+        testDemarcatedPool = new ConsumerPool(
+                1,
+                "--demarcator--".getBytes(StandardCharsets.UTF_8),
+                Collections.emptyMap(),
+                Collections.singletonList("nifi"),
+                100L,
+                "utf-8",
+                "ssl",
+                "localhost",
+                logger,
+                true,
+                StandardCharsets.UTF_8,
+                Pattern.compile(".*")) {
+            @Override
+            protected Consumer<byte[], byte[]> createKafkaConsumer() {
+                return consumer;
+            }
+        };
+    }
+
+    @Test
+    public void validatePoolSimpleCreateClose() throws Exception {
+
+        
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, 
new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            lease.poll();
+        }
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            lease.poll();
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(4, stats.leasesObtainedCount);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void validatePoolSimpleCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, 
createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            lease.poll();
+            lease.commit();
+        }
+        testPool.close();
+        verify(mockSession, times(3)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolSimpleBatchCreateClose() throws Exception {
+        
when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, 
new byte[][]{}));
+        for (int i = 0; i < 100; i++) {
+            try (final ConsumerLease lease = 
testPool.obtainConsumer(mockSession, mockContext)) {
+                for (int j = 0; j < 100; j++) {
+                    lease.poll();
+                }
+            }
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(100, stats.leasesObtainedCount);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void validatePoolBatchCreatePollClose() throws Exception {
+        final byte[][] firstPassValues = new byte[][]{
+            "Hello-1".getBytes(StandardCharsets.UTF_8),
+            "Hello-2".getBytes(StandardCharsets.UTF_8),
+            "Hello-3".getBytes(StandardCharsets.UTF_8)
+        };
+        final ConsumerRecords<byte[], byte[]> firstRecs = 
createConsumerRecords("foo", 1, 1L, firstPassValues);
+
+        when(consumer.poll(anyLong())).thenReturn(firstRecs, 
createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
+        try (final ConsumerLease lease = 
testDemarcatedPool.obtainConsumer(mockSession, mockContext)) {
+            lease.poll();
+            lease.commit();
+        }
+        testDemarcatedPool.close();
+        verify(mockSession, times(1)).create();
+        verify(mockSession, times(1)).commit();
+        final PoolStats stats = testDemarcatedPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @Test
+    public void validatePoolConsumerFails() throws Exception {
+
+        when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
+        try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, 
mockContext)) {
+            try {
+                lease.poll();
+                fail();
+            } catch (final KafkaException ke) {
+
+            }
+        }
+        testPool.close();
+        verify(mockSession, times(0)).create();
+        verify(mockSession, times(0)).commit();
+        final PoolStats stats = testPool.getPoolStats();
+        assertEquals(1, stats.consumerCreatedCount);
+        assertEquals(1, stats.consumerClosedCount);
+        assertEquals(1, stats.leasesObtainedCount);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    static ConsumerRecords<byte[], byte[]> createConsumerRecords(final String 
topic, final int partition, final long startingOffset, final byte[][] 
rawRecords) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> map = 
new HashMap<>();
+        final TopicPartition tPart = new TopicPartition(topic, partition);
+        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+        long offset = startingOffset;
+        for (final byte[] rawRecord : rawRecords) {
+            final ConsumerRecord<byte[], byte[]> rec = new 
ConsumerRecord(topic, partition, offset++, 
UUID.randomUUID().toString().getBytes(), rawRecord);
+            records.add(rec);
+        }
+        map.put(tPart, records);
+        return new ConsumerRecords(map);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
new file mode 100644
index 0000000..d370fec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestConsumeKafkaRecord_1_0 {
+
+    private ConsumerLease mockLease = null;
+    private ConsumerPool mockConsumerPool = null;
+    private TestRunner runner;
+
+    @Before
+    public void setup() throws InitializationException {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+
+        ConsumeKafkaRecord_1_0 proc = new ConsumeKafkaRecord_1_0() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+
+        runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"okeydokey:1234");
+
+        final String readerId = "record-reader";
+        final MockRecordParser readerService = new MockRecordParser();
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+        runner.addControllerService(readerId, readerService);
+        runner.enableControllerService(readerService);
+
+        final String writerId = "record-writer";
+        final RecordSetWriterFactory writerService = new 
MockRecordWriter("name, age");
+        runner.addControllerService(writerId, writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.RECORD_READER, readerId);
+        runner.setProperty(ConsumeKafkaRecord_1_0.RECORD_WRITER, writerId);
+    }
+
+    @Test
+    public void validateCustomValidatorSettings() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
"Foo");
+        runner.assertNotValid();
+        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class.getName());
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        runner.assertValid();
+        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        runner.assertNotValid();
+    }
+
+    @Test
+    public void validatePropertiesValidation() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+
+        runner.removeProperty(ConsumeKafkaRecord_1_0.GROUP_ID);
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("invalid because Group ID is 
required"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "  ");
+        try {
+            runner.assertValid();
+            fail();
+        } catch (AssertionError e) {
+            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
+        }
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void testJaasConfiguration() throws Exception {
+        runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
+        runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, 
ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
+
+        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, 
KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.KERBEROS_PRINCIPLE, "kafka");
+        runner.assertValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, 
"[email protected]");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.assertNotValid();
+
+        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, 
"src/test/resources/server.properties");
+        runner.assertValid();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
new file mode 100644
index 0000000..78c03ec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestInFlightMessageTracker.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestInFlightMessageTracker {
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhenComplete() throws InterruptedException, 
TimeoutException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new 
MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        verifyNotComplete(tracker);
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.awaitCompletion(1L);
+    }
+
+    @Test(timeout = 5000L)
+    public void testAwaitCompletionWhileWaiting() throws InterruptedException, 
ExecutionException {
+        final MockFlowFile flowFile = new MockFlowFile(1L);
+
+        final InFlightMessageTracker tracker = new InFlightMessageTracker(new 
MockComponentLog("1", "unit-test"));
+        tracker.incrementSentCount(flowFile);
+
+        verifyNotComplete(tracker);
+
+        tracker.incrementSentCount(flowFile);
+        verifyNotComplete(tracker);
+
+        final ExecutorService exec = Executors.newFixedThreadPool(1);
+        final Future<?> future = exec.submit(() -> {
+            try {
+                tracker.awaitCompletion(10000L);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        });
+
+        tracker.incrementAcknowledgedCount(flowFile);
+        tracker.incrementAcknowledgedCount(flowFile);
+
+        future.get();
+    }
+
+    private void verifyNotComplete(final InFlightMessageTracker tracker) 
throws InterruptedException {
+        try {
+            tracker.awaitCompletion(10L);
+            Assert.fail("Expected timeout");
+        } catch (final TimeoutException te) {
+            // expected
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
new file mode 100644
index 0000000..6a0c7ce
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestPublishKafka {
+    private static final String TOPIC_NAME = "unit-test";
+
+    private PublisherPool mockPool;
+    private PublisherLease mockLease;
+    private TestRunner runner;
+
+    @Before
+    public void setup() {
+        mockPool = mock(PublisherPool.class);
+        mockLease = mock(PublisherLease.class);
+
+        when(mockPool.obtainPublisher()).thenReturn(mockLease);
+
+        runner = TestRunners.newTestRunner(new PublishKafka_1_0() {
+            @Override
+            protected PublisherPool createPublisherPool(final ProcessContext 
context) {
+                return mockPool;
+            }
+        });
+
+        runner.setProperty(PublishKafka_1_0.TOPIC, TOPIC_NAME);
+        runner.setProperty(PublishKafka_1_0.DELIVERY_GUARANTEE, 
PublishKafka_1_0.DELIVERY_REPLICATED);
+    }
+
+    @Test
+    public void testSingleSuccess() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 
1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), 
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleSuccess() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+
+        
when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 
1));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), 
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testSingleFailure() throws IOException {
+        final MockFlowFile flowFile = runner.enqueue("hello world");
+
+        
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), 
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
+    @Test
+    public void testMultipleFailures() throws IOException {
+        final Set<FlowFile> flowFiles = new HashSet<>();
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+        flowFiles.add(runner.enqueue("hello world"));
+
+        
when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_FAILURE, 3);
+
+        verify(mockLease, times(3)).publish(any(FlowFile.class), 
any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(1)).close();
+    }
+
    @Test
    public void testMultipleMessagesPerFlowFile() throws IOException {
        final List<FlowFile> flowFiles = new ArrayList<>();
        flowFiles.add(runner.enqueue("hello world"));
        flowFiles.add(runner.enqueue("hello world"));

        // Report a different message count for each FlowFile so we can verify
        // that the per-FlowFile "msg.count" attribute is populated correctly.
        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        msgCounts.put(flowFiles.get(0), 10);
        msgCounts.put(flowFiles.get(1), 20);

        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());

        when(mockLease.complete()).thenReturn(result);

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafka_1_0.REL_SUCCESS, 2);

        verify(mockLease, times(2)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(0)).poison();
        verify(mockLease, times(1)).close();

        // Each successful FlowFile carries its own message count: one with 10, one with 20.
        runner.assertAllFlowFilesContainAttribute("msg.count");
        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_SUCCESS).stream()
            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
            .count());
        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_SUCCESS).stream()
            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
            .count());
    }
+
+
    @Test
    public void testSomeSuccessSomeFailure() throws IOException {
        final List<FlowFile> flowFiles = new ArrayList<>();
        flowFiles.add(runner.enqueue("hello world"));
        flowFiles.add(runner.enqueue("hello world"));
        flowFiles.add(runner.enqueue("hello world"));
        flowFiles.add(runner.enqueue("hello world"));

        // First two FlowFiles succeed with message counts; last two fail.
        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        msgCounts.put(flowFiles.get(0), 10);
        msgCounts.put(flowFiles.get(1), 20);

        final Map<FlowFile, Exception> failureMap = new HashMap<>();
        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));

        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);

        when(mockLease.complete()).thenReturn(result);

        runner.run();
        // When any FlowFile in the batch fails, ALL FlowFiles are routed to
        // failure -- nothing is transferred to success.
        runner.assertTransferCount(PublishKafka_1_0.REL_SUCCESS, 0);
        runner.assertTransferCount(PublishKafka_1_0.REL_FAILURE, 4);

        verify(mockLease, times(4)).publish(any(FlowFile.class), any(InputStream.class), eq(null), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(1)).close();

        // FlowFiles routed to failure must not carry a msg.count attribute.
        assertTrue(runner.getFlowFilesForRelationship(PublishKafka_1_0.REL_FAILURE).stream()
            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
    }
+
+
+    private PublishResult createAllSuccessPublishResult(final FlowFile 
successfulFlowFile, final int msgCount) {
+        return 
createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), 
msgCount);
+    }
+
+    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> 
successfulFlowFiles, final int msgCountPerFlowFile) {
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        for (final FlowFile ff : successfulFlowFiles) {
+            msgCounts.put(ff, msgCountPerFlowFile);
+        }
+        return createPublishResult(msgCounts, successfulFlowFiles, 
Collections.emptyMap());
+    }
+
+    private PublishResult createFailurePublishResult(final FlowFile failure) {
+        return createFailurePublishResult(Collections.singleton(failure));
+    }
+
+    private PublishResult createFailurePublishResult(final Set<FlowFile> 
failures) {
+        final Map<FlowFile, Exception> failureMap = 
failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new 
RuntimeException("Intentional Unit Test Exception")));
+        return createPublishResult(Collections.emptyMap(), 
Collections.emptySet(), failureMap);
+    }
+
+    private PublishResult createPublishResult(final Map<FlowFile, Integer> 
msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> 
failures) {
+        // sanity check.
+        for (final FlowFile success : successFlowFiles) {
+            if (failures.containsKey(success)) {
+                throw new IllegalArgumentException("Found same FlowFile in 
both 'success' and 'failures' collections: " + success);
+            }
+        }
+
+        return new PublishResult() {
+            @Override
+            public boolean isFailure() {
+                return !failures.isEmpty();
+            }
+
+            @Override
+            public int getSuccessfulMessageCount(FlowFile flowFile) {
+                Integer count = msgCounts.get(flowFile);
+                return count == null ? 0 : count.intValue();
+            }
+
+            @Override
+            public Exception getReasonForFailure(FlowFile flowFile) {
+                return failures.get(flowFile);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
new file mode 100644
index 0000000..45439cc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_1_0.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
+import org.apache.nifi.processors.kafka.pubsub.util.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
/**
 * Unit tests for PublishKafkaRecord_1_0 routing behavior. The processor's
 * PublisherPool is replaced with a mock so no Kafka broker is required; each
 * test stubs the lease's complete() to simulate success/failure outcomes.
 */
public class TestPublishKafkaRecord_1_0 {

    private static final String TOPIC_NAME = "unit-test";

    private PublisherPool mockPool;
    private PublisherLease mockLease;
    private TestRunner runner;

    @Before
    public void setup() throws InitializationException, IOException {
        mockPool = mock(PublisherPool.class);
        mockLease = mock(PublisherLease.class);
        // Let the record-based publish() run its real implementation so that it
        // delegates to the lower-level publish for each individual record.
        Mockito.doCallRealMethod().when(mockLease).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class),
            any(RecordSchema.class), any(String.class), any(String.class));

        when(mockPool.obtainPublisher()).thenReturn(mockLease);

        // Subclass the processor to inject the mock pool instead of a real one.
        runner = TestRunners.newTestRunner(new PublishKafkaRecord_1_0() {
            @Override
            protected PublisherPool createPublisherPool(final ProcessContext context) {
                return mockPool;
            }
        });

        runner.setProperty(PublishKafkaRecord_1_0.TOPIC, TOPIC_NAME);

        // Reader service parses "name, age" CSV-like input into records.
        final String readerId = "record-reader";
        final MockRecordParser readerService = new MockRecordParser();
        readerService.addSchemaField("name", RecordFieldType.STRING);
        readerService.addSchemaField("age", RecordFieldType.INT);
        runner.addControllerService(readerId, readerService);
        runner.enableControllerService(readerService);

        // Writer service serializes records back out with a simple header.
        final String writerId = "record-writer";
        final RecordSetWriterFactory writerService = new MockRecordWriter("name, age");
        runner.addControllerService(writerId, writerService);
        runner.enableControllerService(writerService);

        runner.setProperty(PublishKafkaRecord_1_0.RECORD_READER, readerId);
        runner.setProperty(PublishKafkaRecord_1_0.RECORD_WRITER, writerId);
        runner.setProperty(PublishKafka_1_0.DELIVERY_GUARANTEE, PublishKafka_1_0.DELIVERY_REPLICATED);
    }

    @Test
    public void testSingleSuccess() throws IOException {
        // One FlowFile, published successfully as one message.
        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");

        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFile, 1));

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);

        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(0)).poison();
        verify(mockLease, times(1)).close();
    }

    @Test
    public void testMultipleSuccess() throws IOException {
        // Three FlowFiles, all published successfully.
        final Set<FlowFile> flowFiles = new HashSet<>();
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));

        when(mockLease.complete()).thenReturn(createAllSuccessPublishResult(flowFiles, 1));

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 3);

        // One publish per FlowFile; the lease is completed/closed once for the batch.
        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(0)).poison();
        verify(mockLease, times(1)).close();
    }

    @Test
    public void testSingleFailure() throws IOException {
        // One FlowFile whose publish fails; it must be routed to failure.
        final MockFlowFile flowFile = runner.enqueue("John Doe, 48");

        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 1);

        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(1)).close();
    }

    @Test
    public void testMultipleFailures() throws IOException {
        // Three FlowFiles, all failing; all must be routed to failure.
        final Set<FlowFile> flowFiles = new HashSet<>();
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));

        when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFiles));

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_FAILURE, 3);

        verify(mockLease, times(3)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(1)).close();
    }

    @Test
    public void testMultipleMessagesPerFlowFile() throws IOException {
        // Each FlowFile contains two records.
        final List<FlowFile> flowFiles = new ArrayList<>();
        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 47"));
        flowFiles.add(runner.enqueue("John Doe, 48\nJane Doe, 29"));

        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        msgCounts.put(flowFiles.get(0), 10);
        msgCounts.put(flowFiles.get(1), 20);

        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());

        when(mockLease.complete()).thenReturn(result);

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 2);

        // The record-based publish is called once per FlowFile; because it runs
        // its real implementation (see setup), it delegates one byte[]-based
        // publish call per record: 2 FlowFiles x 2 records = 4 calls.
        verify(mockLease, times(2)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(4)).publish(any(FlowFile.class), eq(null), any(byte[].class), eq(TOPIC_NAME), any(InFlightMessageTracker.class));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(0)).poison();
        verify(mockLease, times(1)).close();

        // msg.count reflects the per-FlowFile counts reported by the PublishResult.
        runner.assertAllFlowFilesContainAttribute("msg.count");
        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).stream()
            .filter(ff -> ff.getAttribute("msg.count").equals("10"))
            .count());
        assertEquals(1, runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).stream()
            .filter(ff -> ff.getAttribute("msg.count").equals("20"))
            .count());
    }

    @Test
    public void testNoRecordsInFlowFile() throws IOException {
        // An empty FlowFile yields zero records but still counts as a success
        // with msg.count of 0.
        final List<FlowFile> flowFiles = new ArrayList<>();
        flowFiles.add(runner.enqueue(new byte[0]));

        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        msgCounts.put(flowFiles.get(0), 0);

        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());

        when(mockLease.complete()).thenReturn(result);

        runner.run();
        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_1_0.REL_SUCCESS, 1);

        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(0)).poison();
        verify(mockLease, times(1)).close();

        final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_SUCCESS).get(0);
        mff.assertAttributeEquals("msg.count", "0");
    }


    @Test
    public void testSomeSuccessSomeFailure() throws IOException {
        final List<FlowFile> flowFiles = new ArrayList<>();
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));
        flowFiles.add(runner.enqueue("John Doe, 48"));

        // First two FlowFiles succeed, last two fail.
        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        msgCounts.put(flowFiles.get(0), 10);
        msgCounts.put(flowFiles.get(1), 20);

        final Map<FlowFile, Exception> failureMap = new HashMap<>();
        failureMap.put(flowFiles.get(2), new RuntimeException("Intentional Unit Test Exception"));
        failureMap.put(flowFiles.get(3), new RuntimeException("Intentional Unit Test Exception"));

        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles.subList(0, 2)), failureMap);

        when(mockLease.complete()).thenReturn(result);

        runner.run();
        // When any FlowFile in the batch fails, ALL are routed to failure.
        runner.assertTransferCount(PublishKafkaRecord_1_0.REL_SUCCESS, 0);
        runner.assertTransferCount(PublishKafkaRecord_1_0.REL_FAILURE, 4);

        verify(mockLease, times(4)).publish(any(FlowFile.class), any(RecordSet.class), any(RecordSetWriterFactory.class), any(RecordSchema.class), eq(null), eq(TOPIC_NAME));
        verify(mockLease, times(1)).complete();
        verify(mockLease, times(1)).close();

        // Failed FlowFiles must not carry a msg.count attribute.
        assertTrue(runner.getFlowFilesForRelationship(PublishKafkaRecord_1_0.REL_FAILURE).stream()
            .noneMatch(ff -> ff.getAttribute("msg.count") != null));
    }


    // Convenience overload: all-success result for a single FlowFile.
    private PublishResult createAllSuccessPublishResult(final FlowFile successfulFlowFile, final int msgCount) {
        return createAllSuccessPublishResult(Collections.singleton(successfulFlowFile), msgCount);
    }

    // All given FlowFiles succeed with the same per-FlowFile message count.
    private PublishResult createAllSuccessPublishResult(final Set<FlowFile> successfulFlowFiles, final int msgCountPerFlowFile) {
        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
        for (final FlowFile ff : successfulFlowFiles) {
            msgCounts.put(ff, msgCountPerFlowFile);
        }
        return createPublishResult(msgCounts, successfulFlowFiles, Collections.emptyMap());
    }

    // Convenience overload: all-failure result for a single FlowFile.
    private PublishResult createFailurePublishResult(final FlowFile failure) {
        return createFailurePublishResult(Collections.singleton(failure));
    }

    // All given FlowFiles fail with a synthetic RuntimeException.
    private PublishResult createFailurePublishResult(final Set<FlowFile> failures) {
        final Map<FlowFile, Exception> failureMap = failures.stream().collect(Collectors.toMap(ff -> ff, ff -> new RuntimeException("Intentional Unit Test Exception")));
        return createPublishResult(Collections.emptyMap(), Collections.emptySet(), failureMap);
    }

    /**
     * Builds a PublishResult over the given counts, successes, and failures.
     * The success and failure collections must be disjoint.
     */
    private PublishResult createPublishResult(final Map<FlowFile, Integer> msgCounts, final Set<FlowFile> successFlowFiles, final Map<FlowFile, Exception> failures) {
        // sanity check.
        for (final FlowFile success : successFlowFiles) {
            if (failures.containsKey(success)) {
                throw new IllegalArgumentException("Found same FlowFile in both 'success' and 'failures' collections: " + success);
            }
        }

        return new PublishResult() {

            @Override
            public int getSuccessfulMessageCount(FlowFile flowFile) {
                // FlowFiles with no recorded count are reported as zero messages.
                Integer count = msgCounts.get(flowFile);
                return count == null ? 0 : count.intValue();
            }

            @Override
            public Exception getReasonForFailure(FlowFile flowFile) {
                return failures.get(flowFile);
            }

            @Override
            public boolean isFailure() {
                return !failures.isEmpty();
            }
        };
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
new file mode 100644
index 0000000..54c1222
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+
/**
 * Unit tests for PublisherLease using a mocked Kafka Producer: poisoning on
 * stream read errors, poisoning on asynchronous send failures, and demarcated
 * (delimiter-split) message publishing.
 */
public class TestPublisherLease {
    private ComponentLog logger;
    private Producer<byte[], byte[]> producer;

    @Before
    @SuppressWarnings("unchecked")
    public void setup() {
        logger = Mockito.mock(ComponentLog.class);
        producer = Mockito.mock(Producer.class);
    }

    @Test
    public void testPoisonOnException() throws IOException {
        final AtomicInteger poisonCount = new AtomicInteger(0);

        // Subclass the lease only to count poison() invocations.
        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
            @Override
            public void poison() {
                poisonCount.incrementAndGet();
                super.poison();
            }
        };

        final FlowFile flowFile = new MockFlowFile(1L);
        final String topic = "unit-test";
        final byte[] messageKey = null;
        final byte[] demarcatorBytes = null;

        // Any read from this stream throws, simulating FlowFile content I/O failure.
        final InputStream failureInputStream = new InputStream() {
            @Override
            public int read() throws IOException {
                throw new IOException("Intentional Unit Test Exception");
            }
        };

        try {
            lease.publish(flowFile, failureInputStream, messageKey, demarcatorBytes, topic);
            Assert.fail("Expected IOException");
        } catch (final IOException ioe) {
            // expected
        }

        // A read failure must poison the lease exactly once and yield a failed result.
        assertEquals(1, poisonCount.get());

        final PublishResult result = lease.complete();
        assertTrue(result.isFailure());
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testPoisonOnFailure() throws IOException {
        final AtomicInteger poisonCount = new AtomicInteger(0);

        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 1000L, logger, true, null, StandardCharsets.UTF_8) {
            @Override
            public void poison() {
                poisonCount.incrementAndGet();
                super.poison();
            }
        };

        final FlowFile flowFile = new MockFlowFile(1L);
        final String topic = "unit-test";
        final byte[] messageKey = null;
        final byte[] demarcatorBytes = null;

        // Have the mocked producer invoke the send Callback with an exception,
        // simulating an asynchronous delivery failure from Kafka.
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(final InvocationOnMock invocation) throws Throwable {
                final Callback callback = invocation.getArgumentAt(1, Callback.class);
                callback.onCompletion(null, new RuntimeException("Unit Test Intentional Exception"));
                return null;
            }
        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));

        lease.publish(flowFile, new ByteArrayInputStream(new byte[1]), messageKey, demarcatorBytes, topic);

        // The callback failure must poison the lease and produce a failed result.
        assertEquals(1, poisonCount.get());

        final PublishResult result = lease.complete();
        assertTrue(result.isFailure());
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testAllDelimitedMessagesSent() throws IOException {
        final AtomicInteger poisonCount = new AtomicInteger(0);

        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
            @Override
            protected void poison() {
                poisonCount.incrementAndGet();
                super.poison();
            }
        };

        // Count how many produced records carry exactly the expected payload.
        final AtomicInteger correctMessages = new AtomicInteger(0);
        final AtomicInteger incorrectMessages = new AtomicInteger(0);
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
                final byte[] value = record.value();
                final String valueString = new String(value, StandardCharsets.UTF_8);
                if ("1234567890".equals(valueString)) {
                    correctMessages.incrementAndGet();
                } else {
                    incorrectMessages.incrementAndGet();
                }

                return null;
            }
        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));

        final FlowFile flowFile = new MockFlowFile(1L);
        final String topic = "unit-test";
        final byte[] messageKey = null;
        final byte[] demarcatorBytes = "\n".getBytes(StandardCharsets.UTF_8);

        // 5 non-empty segments; runs of consecutive demarcators must be skipped.
        final byte[] flowFileContent = "1234567890\n1234567890\n1234567890\n\n\n\n1234567890\n\n\n1234567890\n\n\n\n".getBytes(StandardCharsets.UTF_8);
        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);

        // Empty content yields no messages.
        final byte[] flowFileContent2 = new byte[0];
        lease.publish(new MockFlowFile(2L), new ByteArrayInputStream(flowFileContent2), messageKey, demarcatorBytes, topic);

        // Content without a trailing demarcator still yields its final segment.
        final byte[] flowFileContent3 = "1234567890\n1234567890".getBytes(StandardCharsets.UTF_8); // no trailing new line
        lease.publish(new MockFlowFile(3L), new ByteArrayInputStream(flowFileContent3), messageKey, demarcatorBytes, topic);

        // Only demarcators: no messages at all.
        final byte[] flowFileContent4 = "\n\n\n".getBytes(StandardCharsets.UTF_8);
        lease.publish(new MockFlowFile(4L), new ByteArrayInputStream(flowFileContent4), messageKey, demarcatorBytes, topic);

        assertEquals(0, poisonCount.get());

        // flush() must not happen until complete() is called.
        verify(producer, times(0)).flush();

        final PublishResult result = lease.complete();
        // NOTE(review): isFailure() is true here presumably because the mocked
        // producer never invokes the send callbacks, so no message is ever
        // acknowledged -- confirm against PublisherLease.complete() semantics.
        assertTrue(result.isFailure());

        // 5 segments from the first FlowFile + 2 from the third = 7 total.
        assertEquals(7, correctMessages.get());
        assertEquals(0, incorrectMessages.get());

        verify(producer, times(1)).flush();
    }
}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
new file mode 100644
index 0000000..afaf841
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherPool.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestPublisherPool {
+
+    @Test
+    public void testLeaseCloseReturnsToPool() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        kafkaProperties.put("bootstrap.servers", "localhost:1111");
+        kafkaProperties.put("key.serializer", 
ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", 
ByteArraySerializer.class.getName());
+
+        final PublisherPool pool = new PublisherPool(kafkaProperties, 
Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, 
StandardCharsets.UTF_8);
+        assertEquals(0, pool.available());
+
+        final PublisherLease lease = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        lease.close();
+        assertEquals(1, pool.available());
+    }
+
+    @Test
+    public void testPoisonedLeaseNotReturnedToPool() {
+        final Map<String, Object> kafkaProperties = new HashMap<>();
+        kafkaProperties.put("bootstrap.servers", "localhost:1111");
+        kafkaProperties.put("key.serializer", 
ByteArraySerializer.class.getName());
+        kafkaProperties.put("value.serializer", 
ByteArraySerializer.class.getName());
+
+        final PublisherPool pool = new PublisherPool(kafkaProperties, 
Mockito.mock(ComponentLog.class), 1024 * 1024, 1000L, false, null, 
StandardCharsets.UTF_8);
+        assertEquals(0, pool.available());
+
+        final PublisherLease lease = pool.obtainPublisher();
+        assertEquals(0, pool.available());
+
+        lease.poison();
+        lease.close();
+        assertEquals(0, pool.available());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
new file mode 100644
index 0000000..819e3b7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+
+import sun.misc.Unsafe;
+
/**
 * Reflection helpers shared by the Kafka processor unit tests.
 */
class TestUtils {

    /**
     * Overwrites the value of a {@code final} field on the given instance by
     * clearing the FINAL bit in the {@link Field}'s internal "modifiers" flag
     * before setting the new value.
     *
     * NOTE(review): the "modifiers" field hack is filtered from reflection on
     * Java 12+ and will throw NoSuchFieldException there — confirm the test
     * JDK before relying on this helper.
     *
     * @param field    the (possibly final) field to overwrite
     * @param instance the object whose field is set, or null for a static field
     * @param newValue the value to store in the field
     * @throws Exception if reflective access fails
     */
    public static void setFinalField(Field field, Object instance, Object 
newValue) throws Exception {
        field.setAccessible(true);
        // Expose Field's own private "modifiers" int so FINAL can be cleared.
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

        field.set(instance, newValue);
    }

    /**
     * Returns the JVM's singleton {@link Unsafe} instance via its private
     * static "theUnsafe" field. Depends on the internal sun.misc API, which
     * is not guaranteed to exist on all JVMs.
     */
    static Unsafe getUnsafe() {
        try {
            Field f = Unsafe.class.getDeclaredField("theUnsafe");
            f.setAccessible(true);
            return (Unsafe) f.get(null);
        } catch (Exception e) {
            // Tests cannot proceed without Unsafe; surface as unchecked.
            throw new IllegalStateException(e);
        }
    }

}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
new file mode 100644
index 0000000..21527be
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class MockRecordParser extends AbstractControllerService implements 
RecordReaderFactory {
+    private final List<Object[]> records = new ArrayList<>();
+    private final List<RecordField> fields = new ArrayList<>();
+    private final int failAfterN;
+
+    public MockRecordParser() {
+        this(-1);
+    }
+
+    public MockRecordParser(final int failAfterN) {
+        this.failAfterN = failAfterN;
+    }
+
+
+    public void addSchemaField(final String fieldName, final RecordFieldType 
type) {
+        fields.add(new RecordField(fieldName, type.getDataType()));
+    }
+
+    public void addRecord(Object... values) {
+        records.add(values);
+    }
+
+    @Override
+    public RecordReader createRecordReader(Map<String, String> variables, 
InputStream in, ComponentLog logger) throws IOException, 
SchemaNotFoundException {
+        final BufferedReader reader = new BufferedReader(new 
InputStreamReader(in));
+
+        return new RecordReader() {
+            private int recordCount = 0;
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public Record nextRecord(boolean coerceTypes, boolean dropUnknown) 
throws IOException, MalformedRecordException, SchemaValidationException {
+                if (failAfterN >= recordCount) {
+                    throw new MalformedRecordException("Intentional Unit Test 
Exception because " + recordCount + " records have been read");
+                }
+                final String line = reader.readLine();
+                if (line == null) {
+                    return null;
+                }
+
+                recordCount++;
+
+                final String[] values = line.split(",");
+                final Map<String, Object> valueMap = new HashMap<>();
+                int i = 0;
+                for (final RecordField field : fields) {
+                    final String fieldName = field.getFieldName();
+                    valueMap.put(fieldName, values[i++].trim());
+                }
+
+                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
+            }
+
+            @Override
+            public RecordSchema getSchema() {
+                return new SimpleRecordSchema(fields);
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
new file mode 100644
index 0000000..90a909d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public class MockRecordWriter extends AbstractControllerService implements 
RecordSetWriterFactory {
+    private final String header;
+    private final int failAfterN;
+    private final boolean quoteValues;
+
+    public MockRecordWriter(final String header) {
+        this(header, true, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues) {
+        this(header, quoteValues, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, 
final int failAfterN) {
+        this.header = header;
+        this.quoteValues = quoteValues;
+        this.failAfterN = failAfterN;
+    }
+
+    @Override
+    public RecordSchema getSchema(Map<String, String> variables, RecordSchema 
readSchema) throws SchemaNotFoundException, IOException {
+        return null;
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger, final 
RecordSchema schema, final OutputStream out) {
+        return new RecordSetWriter() {
+
+            @Override
+            public void flush() throws IOException {
+                out.flush();
+            }
+
+            @Override
+            public WriteResult write(final RecordSet rs) throws IOException {
+                out.write(header.getBytes());
+                out.write("\n".getBytes());
+
+                int recordCount = 0;
+                final int numCols = rs.getSchema().getFieldCount();
+                Record record = null;
+                while ((record = rs.next()) != null) {
+                    if (++recordCount > failAfterN && failAfterN > -1) {
+                        throw new IOException("Unit Test intentionally 
throwing IOException after " + failAfterN + " records were written");
+                    }
+
+                    int i = 0;
+                    for (final String fieldName : 
record.getSchema().getFieldNames()) {
+                        final String val = record.getAsString(fieldName);
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            if (val != null) {
+                                out.write(val.getBytes());
+                            }
+                            out.write("\"".getBytes());
+                        } else if (val != null) {
+                            out.write(val.getBytes());
+                        }
+
+                        if (i++ < numCols - 1) {
+                            out.write(",".getBytes());
+                        }
+                    }
+                    out.write("\n".getBytes());
+                }
+
+                return WriteResult.of(recordCount, Collections.emptyMap());
+            }
+
+            @Override
+            public String getMimeType() {
+                return "text/plain";
+            }
+
+            @Override
+            public WriteResult write(Record record) throws IOException {
+                return null;
+            }
+
+            @Override
+            public void close() throws IOException {
+            }
+
+            @Override
+            public void beginRecordSet() throws IOException {
+            }
+
+            @Override
+            public WriteResult finishRecordSet() throws IOException {
+                return null;
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
new file mode 100644
index 0000000..a720b11
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/test/EmbeddedKafka.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+
+/**
+ * Embedded Kafka server, primarily to be used for testing.
+ */
+public class EmbeddedKafka {
+
+    private final KafkaServerStartable kafkaServer;
+
+    private final Properties zookeeperConfig;
+
+    private final Properties kafkaConfig;
+
+    private final ZooKeeperServer zkServer;
+
+    private final Logger logger = LoggerFactory.getLogger(EmbeddedKafka.class);
+
+    private final int kafkaPort;
+
+    private final int zookeeperPort;
+
+    private boolean started;
+
+    /**
+     * Will create instance of the embedded Kafka server. Kafka and Zookeeper
+     * configuration properties will be loaded from 'server.properties' and
+     * 'zookeeper.properties' located at the root of the classpath.
+     */
+    public EmbeddedKafka() {
+        this(loadPropertiesFromClasspath("/server.properties"), 
loadPropertiesFromClasspath("/zookeeper.properties"));
+    }
+
+    /**
+     * Will create instance of the embedded Kafka server.
+     *
+     * @param kafkaConfig
+     *            Kafka configuration properties
+     * @param zookeeperConfig
+     *            Zookeeper configuration properties
+     */
+    public EmbeddedKafka(Properties kafkaConfig, Properties zookeeperConfig) {
+        this.cleanupKafkaWorkDir();
+        this.zookeeperConfig = zookeeperConfig;
+        this.kafkaConfig = kafkaConfig;
+        this.kafkaPort = this.availablePort();
+        this.zookeeperPort = this.availablePort();
+
+        this.kafkaConfig.setProperty("port", String.valueOf(this.kafkaPort));
+        this.kafkaConfig.setProperty("zookeeper.connect", "localhost:" + 
this.zookeeperPort);
+        this.zookeeperConfig.setProperty("clientPort", 
String.valueOf(this.zookeeperPort));
+        this.zkServer = new ZooKeeperServer();
+        this.kafkaServer = new KafkaServerStartable(new 
KafkaConfig(kafkaConfig));
+    }
+
+    /**
+     *
+     * @return port for Kafka server
+     */
+    public int getKafkaPort() {
+        if (!this.started) {
+            throw new IllegalStateException("Kafka server is not started. 
Kafka port can't be determined.");
+        }
+        return this.kafkaPort;
+    }
+
+    /**
+     *
+     * @return port for Zookeeper server
+     */
+    public int getZookeeperPort() {
+        if (!this.started) {
+            throw new IllegalStateException("Kafka server is not started. 
Zookeeper port can't be determined.");
+        }
+        return this.zookeeperPort;
+    }
+
+    /**
+     * Will start embedded Kafka server. Its data directories will be created
+     * at 'kafka-tmp' directory relative to the working directory of the 
current
+     * runtime. The data directories will be deleted upon JVM exit.
+     *
+     */
+    public void start() {
+        if (!this.started) {
+            logger.info("Starting Zookeeper server");
+            this.startZookeeper();
+
+            logger.info("Starting Kafka server");
+            this.kafkaServer.startup();
+
+            logger.info("Embedded Kafka is started at localhost:" + 
this.kafkaServer.serverConfig().port()
+                    + ". Zookeeper connection string: " + 
this.kafkaConfig.getProperty("zookeeper.connect"));
+            this.started = true;
+        }
+    }
+
+    /**
+     * Will stop embedded Kafka server, cleaning up all working directories.
+     */
+    public void stop() {
+        if (this.started) {
+            logger.info("Shutting down Kafka server");
+            this.kafkaServer.shutdown();
+            this.kafkaServer.awaitShutdown();
+            logger.info("Shutting down Zookeeper server");
+            this.shutdownZookeeper();
+            logger.info("Embedded Kafka is shut down.");
+            this.cleanupKafkaWorkDir();
+            this.started = false;
+        }
+    }
+
+    /**
+     *
+     */
+    private void cleanupKafkaWorkDir() {
+        File kafkaTmp = new File("target/kafka-tmp");
+        try {
+            FileUtils.deleteDirectory(kafkaTmp);
+        } catch (Exception e) {
+            logger.warn("Failed to delete " + kafkaTmp.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Will start Zookeeper server via {@link ServerCnxnFactory}
+     */
+    private void startZookeeper() {
+        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+        try {
+            quorumConfiguration.parseProperties(this.zookeeperConfig);
+
+            ServerConfig configuration = new ServerConfig();
+            configuration.readFrom(quorumConfiguration);
+
+            FileTxnSnapLog txnLog = new FileTxnSnapLog(new 
File(configuration.getDataLogDir()), new File(configuration.getDataDir()));
+
+            zkServer.setTxnLogFactory(txnLog);
+            zkServer.setTickTime(configuration.getTickTime());
+            
zkServer.setMinSessionTimeout(configuration.getMinSessionTimeout());
+            
zkServer.setMaxSessionTimeout(configuration.getMaxSessionTimeout());
+            ServerCnxnFactory zookeeperConnectionFactory = 
ServerCnxnFactory.createFactory();
+            
zookeeperConnectionFactory.configure(configuration.getClientPortAddress(),
+                    configuration.getMaxClientCnxns());
+            zookeeperConnectionFactory.startup(zkServer);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to start Zookeeper 
server", e);
+        }
+    }
+
+    /**
+     * Will shut down Zookeeper server.
+     */
+    private void shutdownZookeeper() {
+        zkServer.shutdown();
+    }
+
+    /**
+     * Will load {@link Properties} from properties file discovered at the
+     * provided path relative to the root of the classpath.
+     */
+    private static Properties loadPropertiesFromClasspath(String path) {
+        try {
+            Properties kafkaProperties = new Properties();
+            kafkaProperties.load(Class.class.getResourceAsStream(path));
+            return kafkaProperties;
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Will determine the available port used by Kafka/Zookeeper servers.
+     */
+    private int availablePort() {
+        ServerSocket s = null;
+        try {
+            s = new ServerSocket(0);
+            s.setReuseAddress(true);
+            return s.getLocalPort();
+        } catch (Exception e) {
+            throw new IllegalStateException("Failed to discover available 
port.", e);
+        } finally {
+            try {
+                s.close();
+            } catch (IOException e) {
+                // ignore
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/00b11e82/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
new file mode 100644
index 0000000..57cd63f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootCategory=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - 
%m%n
+
+#log4j.category.org.apache.nifi.processors.kafka=DEBUG

Reply via email to