http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
deleted file mode 100644
index 20ce03c..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafka/additionalDetails.html
+++ /dev/null
@@ -1,47 +0,0 @@
-<!DOCTYPE html>
-<html lang="en">
-    <!--
-      Licensed to the Apache Software Foundation (ASF) under one or more
-      contributor license agreements.  See the NOTICE file distributed with
-      this work for additional information regarding copyright ownership.
-      The ASF licenses this file to You under the Apache License, Version 2.0
-      (the "License"); you may not use this file except in compliance with
-      the License.  You may obtain a copy of the License at
-          http://www.apache.org/licenses/LICENSE-2.0
-      Unless required by applicable law or agreed to in writing, software
-      distributed under the License is distributed on an "AS IS" BASIS,
-      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-      See the License for the specific language governing permissions and
-      limitations under the License.
-    -->
-    <head>
-        <meta charset="utf-8" />
-        <title>PublishKafka</title>
-        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
-    </head>
-
-    <body>
-        <!-- Processor Documentation 
================================================== -->
-        <h2>Description:</h2>
-        <p>
-            This Processors puts the contents of a FlowFile to a Topic in
-            <a href="http://kafka.apache.org/";>Apache Kafka</a> using 
KafkaProducer API available
-            with Kafka 0.9+ API. The content of a FlowFile becomes the 
contents of a Kafka message.
-            This message is optionally assigned a key by using the &lt;Kafka 
Key&gt; Property.
-        </p>
-
-        <p>
-            The Processor allows the user to configure an optional Message 
Demarcator that
-            can be used to send many messages per FlowFile. For example, a 
<i>\n</i> could be used
-            to indicate that the contents of the FlowFile should be used to 
send one message
-            per line of text. It also supports multi-char demarcators (e.g., 
'my custom demarcator').
-            If the property is not set, the entire contents of the FlowFile
-            will be sent as a single message. When using the demarcator, if 
some messages are
-            successfully sent but other messages fail to send, the resulting 
FlowFile will be
-            considered a failed FlowFuile and will have additional attributes 
to that effect.
-            One of such attributes is 'failed.last.idx' which indicates the 
index of the last message
-            that was successfully ACKed by Kafka. (if no demarcator is used 
the value of this index will be -1).
-            This will allow PublishKafka to only re-send un-ACKed messages on 
the next re-try.
-        </p>
-    </body>
-</html>

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
deleted file mode 100644
index d09be60..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.Closeable;
-import java.lang.reflect.Field;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class AbstractKafkaProcessorLifecycleTest {
-
-    private final static Random random = new Random();
-
-    @Test
-    public void validateBaseProperties() throws Exception {
-        TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
-        runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
-        runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("'bootstrap.servers' validated 
against 'foo' is invalid"));
-        }
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
-
-        runner.removeProperty(ConsumeKafka.TOPIC);
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("'topic' is invalid because 
topic is required"));
-        }
-
-        runner.setProperty(ConsumeKafka.TOPIC, "");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-
-        runner.setProperty(ConsumeKafka.TOPIC, "  ");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-        runner.setProperty(ConsumeKafka.TOPIC, "blah");
-
-        runner.removeProperty(ConsumeKafka.CLIENT_ID);
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("invalid because client.id is 
required"));
-        }
-
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "   ");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
-
-        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-        runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "  ");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-    }
-
-    @Test
-    @Ignore // just for extra sanity check
-    public void validateConcurrencyWithRandomFailuresMultiple() throws 
Exception {
-        for (int i = 0; i < 100; i++) {
-            validateConcurrencyWithRandomFailures();
-        }
-    }
-
-    @Test
-    public void validateConcurrencyWithRandomFailures() throws Exception {
-        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
-        final AtomicInteger commitCounter = new AtomicInteger();
-        final AtomicInteger rollbackCounter = new AtomicInteger();
-        final AtomicInteger yieldCounter = new AtomicInteger();
-
-        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
-        final ProcessSession session = mock(ProcessSession.class);
-        when(sessionFactory.createSession()).thenReturn(session);
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                commitCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).commit();
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                rollbackCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).rollback(true);
-
-        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
-
-        int testCount = 1000;
-        final CountDownLatch latch = new CountDownLatch(testCount);
-        for (int i = 0; i < testCount; i++) {
-            processingExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    ProcessContext context = mock(ProcessContext.class);
-                    doAnswer(new Answer<Void>() {
-                        @Override
-                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                            yieldCounter.incrementAndGet();
-                            return null;
-                        }
-                    }).when(context).yield();
-                    if (random.nextInt(10) == 5) {
-                        when(context.getName()).thenReturn("fail");
-                    }
-                    try {
-                        processor.onTrigger(context, sessionFactory);
-                    } catch (Exception e) {
-                        fail();
-                    } finally {
-                        latch.countDown();
-                    }
-                }
-            });
-        }
-
-        assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
-        processingExecutor.shutdown();
-
-        System.out.println("SUCCESS: " + processor.successfulTriggers);
-        System.out.println("FAILURE: " + processor.failedTriggers);
-        System.out.println("INIT: " + processor.resourceReinitialized);
-        System.out.println("YIELD CALLS: " + yieldCounter.get());
-        System.out.println("COMMIT CALLS: " + commitCounter.get());
-        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
-        System.out.println("Close CALLS: " + processor.closeCounter.get());
-
-        /*
-         * this has to be <= 1 since the last thread may come to finally block
-         * after acceptTask flag has been reset at which point the close will
-         * not be called (which is correct behavior since it will be invoked
-         * explicitly by the life-cycle operations of a running processor).
-         *
-         * You can actually observe the =1 behavior in the next test where it 
is
-         * always 0 close calls
-         */
-        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
-        assertTrue(closeVsInitDiff <= 1);
-
-        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
-        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
-        assertEquals(testCount,
-                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
-    }
-
-    @Test
-    public void validateConcurrencyWithAllSuccesses() throws Exception {
-        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
-        final AtomicInteger commitCounter = new AtomicInteger();
-        final AtomicInteger rollbackCounter = new AtomicInteger();
-        final AtomicInteger yieldCounter = new AtomicInteger();
-
-        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
-        final ProcessSession session = mock(ProcessSession.class);
-        when(sessionFactory.createSession()).thenReturn(session);
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                commitCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).commit();
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                rollbackCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).rollback(true);
-
-        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
-
-        int testCount = 1000;
-        final CountDownLatch latch = new CountDownLatch(testCount);
-        for (int i = 0; i < testCount; i++) {
-            processingExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    ProcessContext context = mock(ProcessContext.class);
-                    doAnswer(new Answer<Void>() {
-                        @Override
-                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                            yieldCounter.incrementAndGet();
-                            return null;
-                        }
-                    }).when(context).yield();
-                    try {
-                        processor.onTrigger(context, sessionFactory);
-                    } catch (Exception e) {
-                        fail();
-                    } finally {
-                        latch.countDown();
-                    }
-                }
-            });
-        }
-
-        assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
-        processingExecutor.shutdown();
-
-        System.out.println("SUCCESS: " + processor.successfulTriggers);
-        System.out.println("FAILURE: " + processor.failedTriggers);
-        System.out.println("INIT: " + processor.resourceReinitialized);
-        System.out.println("YIELD CALLS: " + yieldCounter.get());
-        System.out.println("COMMIT CALLS: " + commitCounter.get());
-        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
-        System.out.println("Close CALLS: " + processor.closeCounter.get());
-
-        /*
-         * unlike previous test this one will always be 1 since there are no
-         * failures
-         */
-        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
-        assertEquals(1, closeVsInitDiff);
-
-        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
-        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
-        assertEquals(testCount,
-                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
-    }
-
-    @Test
-    public void validateConcurrencyWithAllFailures() throws Exception {
-        ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
-        final AtomicInteger commitCounter = new AtomicInteger();
-        final AtomicInteger rollbackCounter = new AtomicInteger();
-        final AtomicInteger yieldCounter = new AtomicInteger();
-
-        final ProcessSessionFactory sessionFactory = 
mock(ProcessSessionFactory.class);
-        final ProcessSession session = mock(ProcessSession.class);
-        when(sessionFactory.createSession()).thenReturn(session);
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                commitCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).commit();
-        doAnswer(new Answer<Void>() {
-            @Override
-            public Void answer(InvocationOnMock invocation) throws Throwable {
-                rollbackCounter.incrementAndGet();
-                return null;
-            }
-        }).when(session).rollback(true);
-
-        final ConcurrencyValidatingProcessor processor = spy(new 
ConcurrencyValidatingProcessor());
-
-        int testCount = 1000;
-        final CountDownLatch latch = new CountDownLatch(testCount);
-        for (int i = 0; i < testCount; i++) {
-            processingExecutor.execute(new Runnable() {
-                @Override
-                public void run() {
-                    ProcessContext context = mock(ProcessContext.class);
-                    doAnswer(new Answer<Void>() {
-                        @Override
-                        public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                            yieldCounter.incrementAndGet();
-                            return null;
-                        }
-                    }).when(context).yield();
-                    when(context.getName()).thenReturn("fail");
-                    try {
-                        processor.onTrigger(context, sessionFactory);
-                    } catch (Exception e) {
-                        fail();
-                    } finally {
-                        latch.countDown();
-                    }
-                }
-            });
-        }
-
-        assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
-        processingExecutor.shutdown();
-
-        System.out.println("SUCCESS: " + processor.successfulTriggers);
-        System.out.println("FAILURE: " + processor.failedTriggers);
-        System.out.println("INIT: " + processor.resourceReinitialized);
-        System.out.println("YIELD CALLS: " + yieldCounter.get());
-        System.out.println("COMMIT CALLS: " + commitCounter.get());
-        System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
-        System.out.println("Close CALLS: " + processor.closeCounter.get());
-
-        /*
-         * unlike previous test this one will always be 0 since all triggers 
are
-         * failures
-         */
-        int closeVsInitDiff = processor.resourceReinitialized.get() - 
processor.closeCounter.get();
-        assertEquals(0, closeVsInitDiff);
-
-        assertEquals(commitCounter.get(), processor.successfulTriggers.get());
-        assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
-        assertEquals(testCount,
-                processor.successfulTriggers.get() + 
processor.failedTriggers.get() + yieldCounter.get());
-    }
-
-    /**
-     *
-     */
-    public static class DummyProcessor extends 
AbstractKafkaProcessor<Closeable> {
-        @Override
-        protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) throws ProcessException {
-            return true;
-        }
-
-        @Override
-        protected Closeable buildKafkaResource(ProcessContext context, 
ProcessSession session) throws ProcessException {
-            return mock(Closeable.class);
-        }
-
-        @Override
-        protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-            return SHARED_DESCRIPTORS;
-        }
-    }
-
-
-    public static class ConcurrencyValidatingProcessor extends 
AbstractKafkaProcessor<Closeable> {
-        final AtomicInteger failedTriggers = new AtomicInteger();
-        final AtomicInteger successfulTriggers = new AtomicInteger();
-        final AtomicInteger resourceReinitialized = new AtomicInteger();
-        final AtomicInteger closeCounter = new AtomicInteger();
-
-        ConcurrencyValidatingProcessor() {
-            try {
-                Field loggerField = 
AbstractSessionFactoryProcessor.class.getDeclaredField("logger");
-                loggerField.setAccessible(true);
-                loggerField.set(this, mock(ComponentLog.class));
-            } catch (Exception e) {
-                throw new IllegalStateException(e);
-            }
-        }
-
-        @Override
-        @OnStopped
-        public void close() {
-            super.close();
-            assertTrue(this.kafkaResource == null);
-            closeCounter.incrementAndGet();
-        }
-
-        @Override
-        protected boolean rendezvousWithKafka(ProcessContext context, 
ProcessSession session) {
-            assertNotNull(this.kafkaResource);
-            if ("fail".equals(context.getName())) {
-                failedTriggers.incrementAndGet();
-                throw new RuntimeException("Intentional");
-            }
-            this.successfulTriggers.incrementAndGet();
-            return true;
-        }
-
-        @Override
-        protected Closeable buildKafkaResource(ProcessContext context, 
ProcessSession session) throws ProcessException {
-            this.resourceReinitialized.incrementAndGet();
-            return mock(Closeable.class);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
deleted file mode 100644
index 8e17a21..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-
-// The test is valid and should be ran when working on this module. @Ignore is
-// to speed up the overall build
-public class ConsumeKafkaTest {
-
-    @Test
-    public void validateCustomSerilaizerDeserializerSettings() throws 
Exception {
-        ConsumeKafka consumeKafka = new ConsumeKafka();
-        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(ConsumeKafka.TOPIC, "foo");
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty("key.deserializer", 
ByteArrayDeserializer.class.getName());
-        runner.assertValid();
-        runner.setProperty("key.deserializer", "Foo");
-        runner.assertNotValid();
-    }
-
-    @Test
-    public void validatePropertiesValidation() throws Exception {
-        ConsumeKafka consumeKafka = new ConsumeKafka();
-        TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(ConsumeKafka.TOPIC, "foo");
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-
-        runner.removeProperty(ConsumeKafka.GROUP_ID);
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("invalid because group.id is 
required"));
-        }
-
-        runner.setProperty(ConsumeKafka.GROUP_ID, "");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-
-        runner.setProperty(ConsumeKafka.GROUP_ID, "  ");
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
-        }
-    }
-
-    /**
-     * Will set auto-offset to 'smallest' to ensure that all events (the once
-     * that were sent before and after consumer startup) are received.
-     */
-    @Test
-    public void validateGetAllMessages() throws Exception {
-        String topicName = "validateGetAllMessages";
-
-        StubConsumeKafka consumeKafka = new StubConsumeKafka();
-
-        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPIC, topicName);
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty("check.connection", "false");
-
-        byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
-                "Hello-2".getBytes(StandardCharsets.UTF_8), 
"Hello-3".getBytes(StandardCharsets.UTF_8) };
-        consumeKafka.setValues(values);
-
-        runner.run(1, false);
-
-        values = new byte[][] { "Hello-4".getBytes(StandardCharsets.UTF_8), 
"Hello-5".getBytes(StandardCharsets.UTF_8),
-                "Hello-6".getBytes(StandardCharsets.UTF_8) };
-        consumeKafka.setValues(values);
-
-        runner.run(1, false);
-
-        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(6, flowFiles.size());
-        // spot check
-        MockFlowFile flowFile = flowFiles.get(0);
-        String event = new String(flowFile.toByteArray());
-        assertEquals("Hello-1", event);
-
-        flowFile = flowFiles.get(1);
-        event = new String(flowFile.toByteArray());
-        assertEquals("Hello-2", event);
-
-        flowFile = flowFiles.get(5);
-        event = new String(flowFile.toByteArray());
-        assertEquals("Hello-6", event);
-
-        consumeKafka.close();
-    }
-
-    @Test
-    public void validateGetAllMessagesWithProvidedDemarcator() throws 
Exception {
-        String topicName = "validateGetAllMessagesWithProvidedDemarcator";
-
-        StubConsumeKafka consumeKafka = new StubConsumeKafka();
-
-        final TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setValidateExpressionUsage(false);
-        runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPIC, topicName);
-        runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-        runner.setProperty(ConsumeKafka.GROUP_ID, "foo");
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.setProperty(ConsumeKafka.MESSAGE_DEMARCATOR, "blah");
-        runner.setProperty("check.connection", "false");
-
-        byte[][] values = new byte[][] { 
"Hello-1".getBytes(StandardCharsets.UTF_8),
-                "Hi-2".getBytes(StandardCharsets.UTF_8) };
-        consumeKafka.setValues(values);
-
-        runner.run(1, false);
-        List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-        assertEquals(1, flowFiles.size());
-
-        values = new byte[][] { 
"Здравствуйте-3".getBytes(StandardCharsets.UTF_8),
-                "こんにちは-4".getBytes(StandardCharsets.UTF_8), 
"Hello-5".getBytes(StandardCharsets.UTF_8) };
-        consumeKafka.setValues(values);
-
-        runner.run(1, false);
-
-        flowFiles = 
runner.getFlowFilesForRelationship(ConsumeKafka.REL_SUCCESS);
-
-        assertEquals(2, flowFiles.size());
-        MockFlowFile flowFile = flowFiles.get(0);
-        String[] events = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8).split("blah");
-        assertEquals("0", flowFile.getAttribute("kafka.partition"));
-        assertEquals("0", flowFile.getAttribute("kafka.offset"));
-        assertEquals("validateGetAllMessagesWithProvidedDemarcator", 
flowFile.getAttribute("kafka.topic"));
-
-        assertEquals(2, events.length);
-
-        flowFile = flowFiles.get(1);
-        events = new String(flowFile.toByteArray(), 
StandardCharsets.UTF_8).split("blah");
-
-        assertEquals(3, events.length);
-        // spot check
-        assertEquals("Здравствуйте-3", events[0]);
-        assertEquals("こんにちは-4", events[1]);
-        assertEquals("Hello-5", events[2]);
-
-        consumeKafka.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
deleted file mode 100644
index 6b8b042..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisherTest.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import 
org.apache.nifi.processors.kafka.pubsub.KafkaPublisher.KafkaPublisherResult;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafka;
-import org.apache.nifi.processors.kafka.test.EmbeddedKafkaProducerHelper;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.ConsumerTimeoutException;
-import kafka.consumer.KafkaStream;
-import kafka.javaapi.consumer.ConsumerConnector;
-
-public class KafkaPublisherTest {
-
-    private static EmbeddedKafka kafkaLocal;
-
-    private static EmbeddedKafkaProducerHelper producerHelper;
-
-    @BeforeClass
-    public static void beforeClass() {
-        kafkaLocal = new EmbeddedKafka();
-        kafkaLocal.start();
-        producerHelper = new EmbeddedKafkaProducerHelper(kafkaLocal);
-    }
-
-    @AfterClass
-    public static void afterClass() throws Exception {
-        producerHelper.close();
-        kafkaLocal.stop();
-    }
-
-    @Test
-    public void validateSuccessfulSendAsWhole() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream("Hello 
Kafka".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsWhole";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(0, result.getLastMessageAcked());
-        assertEquals(1, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    @Test
-    public void validateSuccessfulSendAsDelimited() throws Exception {
-        InputStream contentStream = new ByteArrayInputStream(
-                "Hello Kafka\nHello Kafka\nHello Kafka\nHello 
Kafka\n".getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateSuccessfulSendAsDelimited";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
-        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        KafkaPublisherResult result = publisher.publish(publishingContext);
-
-        assertEquals(3, result.getLastMessageAcked());
-        assertEquals(4, result.getMessagesSent());
-        contentStream.close();
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        assertNotNull(iter.next());
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-    }
-
-    /*
-     * This test simulates the condition where not all messages were ACKed by
-     * Kafka
-     */
-    @Test
-    public void validateRetries() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello 
Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateSuccessfulReSendOfFailedSegments";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 1;
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String m1 = new String(iter.next().message());
-        String m2 = new String(iter.next().message());
-        assertEquals("Hello Kafka3", m1);
-        assertEquals("Hello Kafka4", m2);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 2;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, 
lastAckedMessageIndex);
-        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-
-        m1 = new String(iter.next().message());
-        assertEquals("Hello Kafka4", m1);
-
-        publisher.close();
-    }
-
-    /*
-     * Similar to the above test, but it sets the first retry index to the last
-     * possible message index and second index to an out of bound index. The
-     * expectation is that no messages will be sent to Kafka
-     */
-    @Test
-    public void validateRetriesWithWrongIndex() throws Exception {
-        byte[] testValue = "Hello Kafka1\nHello Kafka2\nHello Kafka3\nHello 
Kafka4\n".getBytes(StandardCharsets.UTF_8);
-        InputStream contentStream = new ByteArrayInputStream(testValue);
-        String topicName = "validateRetriesWithWrongIndex";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-
-        // simulates the first re-try
-        int lastAckedMessageIndex = 3;
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName, lastAckedMessageIndex);
-        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-
-        publisher.publish(publishingContext);
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        // simulates the second re-try
-        lastAckedMessageIndex = 6;
-        contentStream = new ByteArrayInputStream(testValue);
-        publishingContext = new PublishingContext(contentStream, topicName, 
lastAckedMessageIndex);
-        
publishingContext.setDelimiterBytes("\n".getBytes(StandardCharsets.UTF_8));
-        publisher.publish(publishingContext);
-        try {
-            iter.next();
-            fail();
-        } catch (ConsumerTimeoutException e) {
-            // that's OK since this is the Kafka mechanism to unblock
-        }
-
-        publisher.close();
-    }
-
-    @Test
-    public void validateWithMultiByteCharactersNoDelimiter() throws Exception {
-        String data = "僠THIS IS MY NEW TEXT.僠IT HAS A NEWLINE.";
-        InputStream contentStream = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithMultiByteCharacters";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
-
-        publisher.publish(publishingContext);
-        publisher.close();
-
-        ConsumerIterator<byte[], byte[]> iter = this.buildConsumer(topicName);
-        String r = new String(iter.next().message(), StandardCharsets.UTF_8);
-        assertEquals(data, r);
-    }
-
-    @Test
-    public void validateWithNonDefaultPartitioner() throws Exception {
-        String data = "fooandbarandbaz";
-        InputStream contentStream = new 
ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8));
-        String topicName = "validateWithNonDefaultPartitioner";
-
-        Properties kafkaProperties = this.buildProducerProperties();
-        kafkaProperties.setProperty("partitioner.class", 
TestPartitioner.class.getName());
-        KafkaPublisher publisher = new KafkaPublisher(kafkaProperties, 
mock(ComponentLog.class));
-        PublishingContext publishingContext = new 
PublishingContext(contentStream, topicName);
-        
publishingContext.setDelimiterBytes("and".getBytes(StandardCharsets.UTF_8));
-
-        try {
-            publisher.publish(publishingContext);
-            // partitioner should be invoked 3 times
-            assertTrue(TestPartitioner.counter == 3);
-            publisher.close();
-        } finally {
-            TestPartitioner.counter = 0;
-        }
-    }
-
-    private Properties buildProducerProperties() {
-        Properties kafkaProperties = new Properties();
-        kafkaProperties.put("key.serializer", 
ByteArraySerializer.class.getName());
-        kafkaProperties.put("value.serializer", 
ByteArraySerializer.class.getName());
-        kafkaProperties.setProperty("bootstrap.servers", "localhost:" + 
kafkaLocal.getKafkaPort());
-        kafkaProperties.put("auto.create.topics.enable", "true");
-        return kafkaProperties;
-    }
-
-    private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
-        Properties props = new Properties();
-        props.put("zookeeper.connect", "localhost:" + 
kafkaLocal.getZookeeperPort());
-        props.put("group.id", "test");
-        props.put("consumer.timeout.ms", "500");
-        props.put("auto.offset.reset", "smallest");
-        ConsumerConfig consumerConfig = new ConsumerConfig(props);
-        ConsumerConnector consumer = 
Consumer.createJavaConsumerConnector(consumerConfig);
-        Map<String, Integer> topicCountMap = new HashMap<>(1);
-        topicCountMap.put(topic, 1);
-        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap);
-        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
-        ConsumerIterator<byte[], byte[]> iter = streams.get(0).iterator();
-        return iter;
-    }
-
-    public static class TestPartitioner implements Partitioner {
-        static int counter;
-
-        @Override
-        public void configure(Map<String, ?> configs) {
-            // nothing to do, test
-        }
-
-        @Override
-        public int partition(String topic, Object key, byte[] keyBytes, Object 
value, byte[] valueBytes,
-                Cluster cluster) {
-            counter++;
-            return 0;
-        }
-
-        @Override
-        public void close() {
-            counter = 0;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
deleted file mode 100644
index 01c5fdd..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaTest.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-// The test is valid and should be ran when working on this module. @Ignore is
-// to speed up the overall build
-public class PublishKafkaTest {
-
-    @Test
-    public void validateCustomSerilaizerDeserializerSettings() throws 
Exception {
-        PublishKafka publishKafka = new PublishKafka();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka.TOPIC, "foo");
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "3 sec");
-        runner.setProperty("key.serializer", 
ByteArraySerializer.class.getName());
-        runner.assertValid();
-        runner.setProperty("key.serializer", "Foo");
-        runner.assertNotValid();
-    }
-
-    @Test
-    public void validatePropertiesValidation() throws Exception {
-        PublishKafka publishKafka = new PublishKafka();
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "okeydokey:1234");
-        runner.setProperty(PublishKafka.TOPIC, "foo");
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "foo");
-
-        try {
-            runner.assertValid();
-            fail();
-        } catch (AssertionError e) {
-            assertTrue(e.getMessage().contains("'max.block.ms' validated 
against 'foo' is invalid"));
-        }
-    }
-
-    @Test
-    public void validateCustomValidation() {
-        String topicName = "validateCustomValidation";
-        PublishKafka publishKafka = new PublishKafka();
-
-        /*
-         * Validates that Kerberos principle is required if one of SASL set for
-         * secirity protocol
-         */
-        TestRunner runner = TestRunners.newTestRunner(publishKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.SECURITY_PROTOCOL, 
PublishKafka.SEC_SASL_PLAINTEXT);
-        try {
-            runner.run();
-            fail();
-        } catch (Throwable e) {
-            assertTrue(e.getMessage().contains("'Kerberos Service Name' is 
invalid because"));
-        }
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateSingleCharacterDemarcatedMessages() {
-        String topicName = "validateSingleCharacterDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
-        runner.enqueue("Hello 
World\nGoodbye\n1\n2\n3\n4\n5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
validateMultiCharacterDemarcatedMessagesAndCustomPartitionerA() {
-        String topicName = 
"validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.PARTITION_CLASS, 
Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello 
WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
validateMultiCharacterDemarcatedMessagesAndCustomPartitionerB() {
-        String topicName = 
"validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.PARTITION_CLASS, 
Partitioners.RoundRobinPartitioner.class.getName());
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "foo");
-
-        runner.enqueue("Hello 
WorldfooGoodbyefoo1foo2foo3foo4foo5".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(7)).send(Mockito.any(ProducerRecord.class));
-
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessA() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "3000 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to 
failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-        putKafka.destroy();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnSendFailureAndThenResendSuccessB() throws Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(1);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        assertEquals(1, runner.getQueueSize().getObjectCount()); // due to 
failure
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void 
validateOnFutureGetFailureAndThenResendSuccessFirstMessageFail() throws 
Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "futurefail\nHello World\nGoodbye\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = 
runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 6 sends due to duplication
-        verify(producer, times(5)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateOnFutureGetFailureAndThenResendSuccess() throws 
Exception {
-        String topicName = "validateSendFailureAndThenResendSuccess";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-        runner.setProperty(PublishKafka.META_WAIT_TIME, "500 millis");
-
-        final String text = "Hello World\nGoodbye\nfuturefail\n2";
-        runner.enqueue(text.getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-        MockFlowFile ff = 
runner.getFlowFilesForRelationship(PublishKafka.REL_FAILURE).get(0);
-        assertNotNull(ff);
-        runner.enqueue(ff);
-
-        runner.run(1, false);
-        assertEquals(0, runner.getQueueSize().getObjectCount());
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        // 6 sends due to duplication
-        verify(producer, times(6)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateDemarcationIntoEmptyMessages() {
-        String topicName = "validateDemarcationIntoEmptyMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        final TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.KEY, "key1");
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "\n");
-
-        final byte[] bytes = 
"\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes(StandardCharsets.UTF_8);
-        runner.enqueue(bytes);
-        runner.run(1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexRightPartialDemarcatedMessages() {
-        String topicName = "validateComplexRightPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠
>".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(3)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexLeftPartialDemarcatedMessages() {
-        String topicName = "validateComplexLeftPartialDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDSTUFF僠>僠I Mean IT!僠<僠WILDSTUFF僠>僠<僠WILDSTUFF僠>僠
".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(4)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void validateComplexPartialMatchDemarcatedMessages() {
-        String topicName = "validateComplexPartialMatchDemarcatedMessages";
-        StubPublishKafka putKafka = new StubPublishKafka(100);
-        TestRunner runner = TestRunners.newTestRunner(putKafka);
-        runner.setProperty(PublishKafka.TOPIC, topicName);
-        runner.setProperty(PublishKafka.CLIENT_ID, "foo");
-        runner.setProperty(PublishKafka.BOOTSTRAP_SERVERS, "localhost:1234");
-        runner.setProperty(PublishKafka.MESSAGE_DEMARCATOR, "僠<僠
WILDSTUFF僠>僠");
-
-        runner.enqueue("Hello World僠<僠WILDSTUFF僠>僠Goodbye僠<僠
WILDBOOMSTUFF僠>僠".getBytes(StandardCharsets.UTF_8));
-        runner.run(1, false);
-
-        runner.assertAllFlowFilesTransferred(PublishKafka.REL_SUCCESS, 1);
-        Producer<byte[], byte[]> producer = putKafka.getProducer();
-        verify(producer, times(2)).send(Mockito.any(ProducerRecord.class));
-        runner.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
deleted file mode 100644
index 5cee6bc..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/PublishingContextTest.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
-
-import org.junit.Test;
-
-public class PublishingContextTest {
-
-    @Test
-    public void failInvalidConstructorArgs() {
-        try {
-            new PublishingContext(null, null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-        try {
-            new PublishingContext(mock(InputStream.class), null);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "");
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            new PublishingContext(mock(InputStream.class), "mytopic", -3);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-
-    @Test
-    public void validateFullSetting() {
-        PublishingContext publishingContext = new 
PublishingContext(mock(InputStream.class), "topic", 3);
-        
publishingContext.setDelimiterBytes("delimiter".getBytes(StandardCharsets.UTF_8));
-        publishingContext.setKeyBytes("key".getBytes(StandardCharsets.UTF_8));
-
-        assertEquals("delimiter", new 
String(publishingContext.getDelimiterBytes(), StandardCharsets.UTF_8));
-        assertEquals("key", new String(publishingContext.getKeyBytes(), 
StandardCharsets.UTF_8));
-        assertEquals("topic", publishingContext.getTopic());
-        assertEquals("topic: 'topic'; delimiter: 'delimiter'", 
publishingContext.toString());
-    }
-
-    @Test
-    public void validateOnlyOnceSetPerInstance() {
-        PublishingContext publishingContext = new 
PublishingContext(mock(InputStream.class), "topic");
-        publishingContext.setKeyBytes(new byte[] { 0 });
-        try {
-            publishingContext.setKeyBytes(new byte[] { 0 });
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        publishingContext.setDelimiterBytes(new byte[] { 0 });
-        try {
-            publishingContext.setDelimiterBytes(new byte[] { 0 });
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        publishingContext.setMaxRequestSize(1024);
-        try {
-            publishingContext.setMaxRequestSize(1024);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-
-        try {
-            publishingContext.setMaxRequestSize(-10);
-            fail();
-        } catch (IllegalArgumentException e) {
-            // success
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
deleted file mode 100644
index 0f0b23f..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubConsumeKafka.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class StubConsumeKafka extends ConsumeKafka {
-
-    private byte[][] values;
-
-    public void setValues(byte[][] values) {
-        this.values = values;
-    }
-
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Consumer<byte[], byte[]> buildKafkaResource(ProcessContext 
context, ProcessSession session) {
-        Consumer<byte[], byte[]> consumer = super.buildKafkaResource(context, 
session);
-        consumer = mock(Consumer.class);
-        String topicName = 
context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
-
-        when(consumer.poll(Mockito.anyLong())).thenAnswer(new 
Answer<ConsumerRecords<byte[], byte[]>>() {
-            @Override
-            public ConsumerRecords<byte[], byte[]> answer(InvocationOnMock 
invocation) throws Throwable {
-                List<ConsumerRecord<byte[], byte[]>> records = new 
ArrayList<>();
-                for (int i = 0; i < StubConsumeKafka.this.values.length; i++) {
-                    byte[] value = StubConsumeKafka.this.values[i];
-                    ConsumerRecord<byte[], byte[]> record = new 
ConsumerRecord<>(topicName, 0, 0, null, value);
-                    records.add(record);
-                }
-                TopicPartition partition = new TopicPartition(topicName, 0);
-                Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> m = 
new LinkedHashMap<>();
-                m.put(partition, records);
-                return new ConsumerRecords<>(m);
-            }
-        });
-
-        return consumer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
deleted file mode 100644
index 3189356..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/StubPublishKafka.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.lang.reflect.Field;
-import java.nio.charset.StandardCharsets;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class StubPublishKafka extends PublishKafka {
-
-    private volatile Producer<byte[], byte[]> producer;
-
-    private volatile boolean failed;
-
-    private final int ackCheckSize;
-
-    private final ExecutorService executor = Executors.newCachedThreadPool();
-
-    StubPublishKafka(int ackCheckSize) {
-        this.ackCheckSize = ackCheckSize;
-    }
-
-    public Producer<byte[], byte[]> getProducer() {
-        return producer;
-    }
-
-    public void destroy() {
-        this.executor.shutdownNow();
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected KafkaPublisher buildKafkaResource(ProcessContext context, 
ProcessSession session)
-            throws ProcessException {
-        Properties kafkaProperties = this.buildKafkaProperties(context);
-        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
-        KafkaPublisher publisher;
-        try {
-            Field f = PublishKafka.class.getDeclaredField("brokers");
-            f.setAccessible(true);
-            f.set(this, 
context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue());
-            publisher = (KafkaPublisher) 
TestUtils.getUnsafe().allocateInstance(KafkaPublisher.class);
-            publisher.setAckWaitTime(15000);
-            producer = mock(Producer.class);
-            this.instrumentProducer(producer, false);
-            Field kf = KafkaPublisher.class.getDeclaredField("kafkaProducer");
-            kf.setAccessible(true);
-            kf.set(publisher, producer);
-
-            Field componentLogF = 
KafkaPublisher.class.getDeclaredField("componentLog");
-            componentLogF.setAccessible(true);
-            componentLogF.set(publisher, mock(ComponentLog.class));
-
-            Field ackCheckSizeField = 
KafkaPublisher.class.getDeclaredField("ackCheckSize");
-            ackCheckSizeField.setAccessible(true);
-            ackCheckSizeField.set(publisher, this.ackCheckSize);
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new IllegalStateException(e);
-        }
-        return publisher;
-    }
-
-    @SuppressWarnings("unchecked")
-    private void instrumentProducer(Producer<byte[], byte[]> producer, boolean 
failRandomly) {
-
-        when(producer.send(Mockito.any(ProducerRecord.class))).then(new 
Answer<Future<RecordMetadata>>() {
-            @Override
-            public Future<RecordMetadata> answer(InvocationOnMock invocation) 
throws Throwable {
-                ProducerRecord<byte[], byte[]> record = 
(ProducerRecord<byte[], byte[]>) invocation.getArguments()[0];
-                String value = new String(record.value(), 
StandardCharsets.UTF_8);
-                if ("fail".equals(value) && !StubPublishKafka.this.failed) {
-                    StubPublishKafka.this.failed = true;
-                    throw new RuntimeException("intentional");
-                }
-                Future<RecordMetadata> future = executor.submit(new 
Callable<RecordMetadata>() {
-                    @Override
-                    public RecordMetadata call() throws Exception {
-                        if ("futurefail".equals(value) && 
!StubPublishKafka.this.failed) {
-                            StubPublishKafka.this.failed = true;
-                            throw new 
TopicAuthorizationException("Unauthorized");
-                        } else {
-                            TopicPartition partition = new 
TopicPartition("foo", 0);
-                            RecordMetadata meta = new 
RecordMetadata(partition, 0, 0);
-                            return meta;
-                        }
-                    }
-                });
-                return future;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/1745c127/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
deleted file mode 100644
index b056a08..0000000
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestUtils.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-
-import sun.misc.Unsafe;
-
-class TestUtils {
-
-    public static void setFinalField(Field field, Object instance, Object 
newValue) throws Exception {
-        field.setAccessible(true);
-        Field modifiersField = Field.class.getDeclaredField("modifiers");
-        modifiersField.setAccessible(true);
-        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
-
-        field.set(instance, newValue);
-    }
-
-
-    static Unsafe getUnsafe() {
-        try {
-            Field f = Unsafe.class.getDeclaredField("theUnsafe");
-            f.setAccessible(true);
-            return (Unsafe) f.get(null);
-        } catch (Exception e) {
-            throw new IllegalStateException(e);
-        }
-    }
-
-}

Reply via email to