NIFI-4664, NIFI-4662, NIFI-4660, NIFI-4659 Moved tests which are 
timing/threading/network dependent and brittle to integration tests, and 
un-ignored tests that are now ITs. Updated Travis to reduce impact on infra; 
AppVeyor now skips test runs, so it only proves the build works on Windows. This 
closes #2319

squash


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cdc1facf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cdc1facf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cdc1facf

Branch: refs/heads/master
Commit: cdc1facf398e6d99cc746b70350ff754e2a975ad
Parents: c730f80
Author: joewitt <[email protected]>
Authored: Tue Dec 5 14:44:07 2017 -0500
Committer: Matt Gilman <[email protected]>
Committed: Wed Dec 6 10:53:09 2017 -0500

----------------------------------------------------------------------
 .travis.yml                                     |   4 +-
 appveyor.yml                                    |   4 +-
 .../util/list/ITAbstractListProcessor.java      | 426 +++++++++++++++++++
 .../util/list/TestAbstractListProcessor.java    | 353 +--------------
 .../kafka/pubsub/ConsumeKafkaTest.java          |  98 -----
 .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++
 .../kafka/pubsub/ConsumeKafkaTest.java          |  98 -----
 .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++
 .../kafka/pubsub/ConsumeKafkaTest.java          | 110 -----
 .../processors/kafka/pubsub/ITConsumeKafka.java | 150 +++++++
 .../kafka/pubsub/ConsumeKafkaTest.java          |  98 -----
 .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++
 .../ITLumberjackSocketChannelHandler.java       | 207 +++++++++
 .../TestLumberjackSocketChannelHandler.java     | 207 ---------
 .../standard/ITListenSyslogGroovy.groovy        | 111 +++++
 .../standard/ListenSyslogGroovyTest.groovy      | 111 -----
 .../processors/standard/ITListenSyslog.java     | 402 +++++++++++++++++
 .../processors/standard/TestListenSyslog.java   | 356 +---------------
 pom.xml                                         |   4 +-
 19 files changed, 1717 insertions(+), 1427 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index d7f0f03..e5b0ccb 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -19,8 +19,6 @@ env:
   - USER_LANGUAGE=en USER_REGION=US'
   - USER_LANGUAGE=fr USER_REGION=FR'
   - USER_LANGUAGE=ja USER_REGION=JP'
-  - USER_LANGUAGE=pt USER_REGION=BR'
-  - USER_LANGUAGE=default USER_REGION=default
 
 os:
   - linux
@@ -54,4 +52,4 @@ install:
   # Note: The reason the sed is done as part of script is to ensure the pom 
hack 
   # won't affect the 'clean install' above
   - bash .travis.sh
-  - mvn -T 2C -Pcontrib-check -Ddir-only clean install
\ No newline at end of file
+  - mvn -T 2C -Pcontrib-check -Ddir-only clean install

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/appveyor.yml
----------------------------------------------------------------------
diff --git a/appveyor.yml b/appveyor.yml
index c7aa2f2..3e31c19 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -32,9 +32,7 @@ install:
   - cmd: SET MAVEN_OPTS=-XX:MaxPermSize=2g -Xmx4g
   - cmd: SET JAVA_OPTS=-XX:MaxPermSize=2g -Xmx4g
 build_script:
-  - mvn -q clean package --batch-mode -DskipTests
-test_script:
-  - mvn -q clean install --batch-mode -Pcontrib-check
+  - mvn clean package --batch-mode -DskipTests -Ddir-only
 cache:
   - C:\maven\
   - C:\Users\appveyor\.m2

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
new file mode 100644
index 0000000..dcf47c6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.list;
+
+import org.apache.nifi.components.state.Scope;
+import org.apache.nifi.components.state.StateMap;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestWatcher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
+import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
+import 
org.apache.nifi.processor.util.list.TestAbstractListProcessor.ConcreteListProcessor;
+import 
org.apache.nifi.processor.util.list.TestAbstractListProcessor.DistributedCache;
+import static org.junit.Assert.assertEquals;
+
+public class ITAbstractListProcessor {
+
+    /**
+     * @return current timestamp in milliseconds, but truncated at specified
+     * target precision (e.g. SECONDS or MINUTES).
+     */
+    private static long getCurrentTimestampMillis(final TimeUnit 
targetPrecision) {
+        final long timestampInTargetPrecision = 
targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+        return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, 
targetPrecision);
+    }
+
+    private static long getSleepMillis(final TimeUnit targetPrecision) {
+        return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 
2;
+    }
+
+    private static final long DEFAULT_SLEEP_MILLIS = 
getSleepMillis(TimeUnit.MILLISECONDS);
+
+    private ConcreteListProcessor proc;
+    private TestRunner runner;
+
+    @Rule
+    public TestWatcher dumpState = new ListProcessorTestWatcher(
+            () -> {
+                try {
+                    return 
runner.getStateManager().getState(Scope.LOCAL).toMap();
+                } catch (IOException e) {
+                    throw new RuntimeException("Failed to retrieve state", e);
+                }
+            },
+            () -> proc.entities,
+            () -> 
runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m
 -> (FlowFile) m).collect(Collectors.toList())
+    );
+
+    @Before
+    public void setup() {
+        proc = new ConcreteListProcessor();
+        runner = TestRunners.newTestRunner(proc);
+    }
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder();
+
+    /**
+     * <p>
+     * Ensures that files are listed when those are old enough:
+     * <li>Files with last modified timestamp those are old enough to determine
+     * that those are completely written and no further files are expected to 
be
+     * added with the same timestamp.</li>
+     * <li>This behavior is expected when a processor is scheduled less
+     * frequently, such as hourly or daily.</li>
+     * </p>
+     */
+    @Test
+    public void testAllExistingEntriesEmittedOnFirstIteration() throws 
Exception {
+        final long oldTimestamp = System.currentTimeMillis() - 
getSleepMillis(TimeUnit.MILLISECONDS);
+
+        // These entries have existed before the processor runs at the first 
time.
+        proc.addEntity("name", "id", oldTimestamp);
+        proc.addEntity("name", "id2", oldTimestamp);
+
+        // First run, the above listed entries should be emitted since it has 
existed.
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Run again without introducing any new entries
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+    }
+
+    private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final 
TimeUnit targetPrecision) throws InterruptedException {
+        runner.run();
+
+        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
+
+        setTargetSystemTimestampPrecision(targetPrecision);
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // First run, the above listed entries would be skipped to avoid write 
synchronization issues
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(getSleepMillis(targetPrecision));
+
+        // Run again without introducing any new entries
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+    }
+
+    /**
+     * <p>
+     * Ensures that newly created files should wait to confirm there is no more
+     * files created with the same timestamp:
+     * <li>If files have the latest modified timestamp at an iteration, then
+     * those should be postponed to be listed</li>
+     * <li>If those files still are the latest files at the next iteration, 
then
+     * those should be listed</li>
+     * </p>
+     */
+    @Test
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws 
Exception {
+        
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Same as
+     * {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()}
+     * but simulates that the target filesystem only provide timestamp 
precision
+     * in Seconds.
+     */
+    @Test
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws 
Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
+    }
+
+    /**
+     * Same as
+     * {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()}
+     * but simulates that the target filesystem only provide timestamp 
precision
+     * in Minutes.
+     */
+    @Test
+    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws 
Exception {
+        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
+    }
+
+    private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) 
throws InterruptedException {
+
+        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
+
+        setTargetSystemTimestampPrecision(targetPrecision);
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+
+        // First run, the above listed entries would be skipped to avoid write 
synchronization issues
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(getSleepMillis(targetPrecision));
+
+        // Running again, our two previously seen files are now cleared to be 
released
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify no new old files show up
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // An entry that is older than already processed entry should not be 
listed.
+        proc.addEntity("name", "id3", initialTimestamp - 
targetPrecision.toMillis(1));
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // If an entry whose timestamp is the same with the last processed 
timestamp should not be listed.
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 
targetPrecision.toMillis(1));
+
+        // It should show up
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
+        switch (targetPrecision) {
+            case MINUTES:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MINUTES);
+                break;
+            case SECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_SECONDS);
+                break;
+            case MILLISECONDS:
+                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MILLIS);
+                break;
+        }
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
+    }
+
+    @Test
+    public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
+        testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
+    }
+
+    @Test
+    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() 
throws Exception {
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        // Emulate having state but not having had the processor run such as 
in a restart
+        final Map<String, String> preexistingState = new HashMap<>();
+        
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
+        
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 Long.toString(initialTimestamp));
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
+        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id2");
+        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
+
+        // run for the first time
+        runner.run();
+
+        // First run, the above listed entries would be skipped
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        // Running again, these files should be eligible for transfer and 
again skipped
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Verify no new old files show up
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id3", initialTimestamp - 1);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        proc.addEntity("name", "id2", initialTimestamp);
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Now a new file beyond the current time enters
+        proc.addEntity("name", "id2", initialTimestamp + 1);
+
+        // It should now show up
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testStateStoredInClusterStateManagement() throws Exception {
+
+        final DistributedCache cache = new DistributedCache();
+        runner.addControllerService("cache", cache);
+        runner.enableControllerService(cache);
+        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, 
"cache");
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        proc.addEntity("name", "id", initialTimestamp);
+        runner.run();
+
+        final Map<String, String> expectedState = new HashMap<>();
+        // Ensure only timestamp is migrated
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "0");
+        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
+
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        // Ensure only timestamp is migrated
+        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
+        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 String.valueOf(initialTimestamp));
+        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
+        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
+    }
+
+    @Test
+    public void testResumeListingAfterClearingState() throws Exception {
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        final long initialEventTimestamp = System.currentTimeMillis();
+        proc.addEntity("name", "id", initialEventTimestamp);
+        proc.addEntity("name", "id2", initialEventTimestamp);
+
+        // Add entities but these should not be transferred as they are the 
latest values
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+
+        // after providing a pause in listings, the files should now  transfer
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        // Verify entities are not transferred again for the given state
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        // Clear state for this processor, eradicating timestamp
+        runner.getStateManager().clear(Scope.CLUSTER);
+        Assert.assertEquals("State is not empty for this component after 
clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
+
+        // Ensure the original files are now transferred again.
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testOnlyNewStateStored() throws Exception {
+
+        runner.run();
+
+        final long initialTimestamp = System.currentTimeMillis();
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        proc.addEntity("name", "id", initialTimestamp);
+        proc.addEntity("name", "id2", initialTimestamp);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
+        runner.clearTransferState();
+
+        Thread.sleep(DEFAULT_SLEEP_MILLIS);
+
+        runner.run();
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
+        runner.clearTransferState();
+
+        final StateMap stateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(2, stateMap.getVersion());
+
+        final Map<String, String> map = stateMap.toMap();
+        // Ensure timestamp and identifiers are migrated
+        assertEquals(4, map.size());
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".0"));
+        assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".1"));
+
+        proc.addEntity("new name", "new id", initialTimestamp + 1);
+        runner.run();
+
+        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
+        assertEquals(3, updatedStateMap.getVersion());
+
+        assertEquals(3, updatedStateMap.toMap().size());
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
+        // Processed timestamp is now caught up
+        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
+        assertEquals("new id", 
updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
index 4a376e2..f5eae46 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
@@ -19,7 +19,6 @@ package org.apache.nifi.processor.util.list;
 
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.Scope;
-import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.distributed.cache.client.Deserializer;
 import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
@@ -33,7 +32,6 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -54,10 +52,6 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
-import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS;
-import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES;
-import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS;
-import static 
org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION;
 import static org.junit.Assert.assertEquals;
 
 public class TestAbstractListProcessor {
@@ -101,257 +95,6 @@ public class TestAbstractListProcessor {
     @Rule
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
-    /**
-     * <p>Ensures that files are listed when those are old enough:
-     *   <li>Files with last modified timestamp those are old enough to 
determine that those are completely written
-     *     and no further files are expected to be added with the same 
timestamp.</li>
-     *   <li>This behavior is expected when a processor is scheduled less 
frequently, such as hourly or daily.</li>
-     * </p>
-     */
-    @Test
-    public void testAllExistingEntriesEmittedOnFirstIteration() throws 
Exception {
-        final long oldTimestamp = System.currentTimeMillis() - 
getSleepMillis(TimeUnit.MILLISECONDS);
-
-        // These entries have existed before the processor runs at the first 
time.
-        proc.addEntity("name", "id", oldTimestamp);
-        proc.addEntity("name", "id2", oldTimestamp);
-
-        // First run, the above listed entries should be emitted since it has 
existed.
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Run again without introducing any new entries
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-    }
-
-    private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final 
TimeUnit targetPrecision) throws InterruptedException {
-        runner.run();
-
-        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
-
-        setTargetSystemTimestampPrecision(targetPrecision);
-
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-
-        // First run, the above listed entries would be skipped to avoid write 
synchronization issues
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(getSleepMillis(targetPrecision));
-
-        // Run again without introducing any new entries
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-    }
-
-    /**
-     * <p>Ensures that newly created files should wait to confirm there is no 
more files created with the same timestamp:
-     *   <li>If files have the latest modified timestamp at an iteration, then 
those should be postponed to be listed</li>
-     *   <li>If those files still are the latest files at the next iteration, 
then those should be listed</li>
-     * </p>
-     */
-    @Test
-    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws 
Exception {
-        
testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS);
-    }
-
-    /**
-     * Same as {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but 
simulates that the target
-     * filesystem only provide timestamp precision in Seconds.
-     */
-    @Test
-    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws 
Exception {
-        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS);
-    }
-
-    /**
-     * Same as {@link 
#testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but 
simulates that the target
-     * filesystem only provide timestamp precision in Minutes.
-     * This test is ignored because it needs to wait two minutes. Not good for 
automated unit testing, but still valuable when executed manually.
-     */
-    @Ignore
-    @Test
-    public void 
testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws 
Exception {
-        testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES);
-    }
-
-    private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) 
throws InterruptedException {
-
-        final long initialTimestamp = 
getCurrentTimestampMillis(targetPrecision);
-
-        setTargetSystemTimestampPrecision(targetPrecision);
-
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-
-        // First run, the above listed entries would be skipped to avoid write 
synchronization issues
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(getSleepMillis(targetPrecision));
-
-        // Running again, our two previously seen files are now cleared to be 
released
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Verify no new old files show up
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // An entry that is older than already processed entry should not be 
listed.
-        proc.addEntity("name", "id3", initialTimestamp - 
targetPrecision.toMillis(1));
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // If an entry whose timestamp is the same with the last processed 
timestamp should not be listed.
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Now a new file beyond the current time enters
-        proc.addEntity("name", "id2", initialTimestamp + 
targetPrecision.toMillis(1));
-
-        // It should show up
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-    }
-
-    private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) {
-        switch (targetPrecision) {
-            case MINUTES:
-                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MINUTES);
-                break;
-            case SECONDS:
-                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_SECONDS);
-                break;
-            case MILLISECONDS:
-                runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, 
PRECISION_MILLIS);
-                break;
-        }
-    }
-
-    @Test
-    public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception {
-        testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS);
-    }
-
-    @Test
-    public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception {
-        testOnlyNewEntriesEmitted(TimeUnit.SECONDS);
-    }
-
-    /**
-     * This test is ignored because it needs to wait two minutes. Not good for 
automated unit testing, but still valuable when executed manually.
-     */
-    @Ignore
-    @Test
-    public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception {
-        testOnlyNewEntriesEmitted(TimeUnit.MINUTES);
-    }
-
-    @Test
-    public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() 
throws Exception {
-
-        final long initialTimestamp = System.currentTimeMillis();
-
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-
-        // Emulate having state but not having had the processor run such as 
in a restart
-        final Map<String, String> preexistingState = new HashMap<>();
-        
preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
Long.toString(initialTimestamp));
-        
preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 Long.toString(initialTimestamp));
-        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
-        preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", 
"id2");
-        runner.getStateManager().setState(preexistingState, Scope.CLUSTER);
-
-        // run for the first time
-        runner.run();
-
-        // First run, the above listed entries would be skipped
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Ensure we have covered the necessary lag period to avoid issues 
where the processor was immediately scheduled to run again
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        // Running again, these files should be eligible for transfer and 
again skipped
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Verify no new old files show up
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        proc.addEntity("name", "id3", initialTimestamp - 1);
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        proc.addEntity("name", "id2", initialTimestamp);
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Now a new file beyond the current time enters
-        proc.addEntity("name", "id2", initialTimestamp + 1);
-
-        // It should now show up
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-    }
-
-    @Test
-    public void testStateStoredInClusterStateManagement() throws Exception {
-
-        final DistributedCache cache = new DistributedCache();
-        runner.addControllerService("cache", cache);
-        runner.enableControllerService(cache);
-        runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, 
"cache");
-
-        final long initialTimestamp = System.currentTimeMillis();
-
-        proc.addEntity("name", "id", initialTimestamp);
-        runner.run();
-
-        final Map<String, String> expectedState = new HashMap<>();
-        // Ensure only timestamp is migrated
-        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
-        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 "0");
-        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        // Ensure only timestamp is migrated
-        
expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, 
String.valueOf(initialTimestamp));
-        
expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY,
 String.valueOf(initialTimestamp));
-        expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", 
"id");
-        runner.getStateManager().assertStateEquals(expectedState, 
Scope.CLUSTER);
-    }
-
     @Test
     public void testStateMigratedFromCacheService() throws 
InitializationException {
 
@@ -416,41 +159,6 @@ public class TestAbstractListProcessor {
     }
 
     @Test
-    public void testResumeListingAfterClearingState() throws Exception {
-
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-
-        final long initialEventTimestamp = System.currentTimeMillis();
-        proc.addEntity("name", "id", initialEventTimestamp);
-        proc.addEntity("name", "id2", initialEventTimestamp);
-
-        // Add entities but these should not be transferred as they are the 
latest values
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-
-        // after providing a pause in listings, the files should now  transfer
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        // Verify entities are not transferred again for the given state
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        // Clear state for this processor, eradicating timestamp
-        runner.getStateManager().clear(Scope.CLUSTER);
-        Assert.assertEquals("State is not empty for this component after 
clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size());
-
-        // Ensure the original files are now transferred again.
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-    }
-
-    @Test
     public void testFetchOnStart() throws InitializationException {
 
         final DistributedCache cache = new DistributedCache();
@@ -463,55 +171,7 @@ public class TestAbstractListProcessor {
         assertEquals(1, cache.fetchCount);
     }
 
-    @Test
-    public void testOnlyNewStateStored() throws Exception {
-
-        runner.run();
-
-        final long initialTimestamp = System.currentTimeMillis();
-
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        proc.addEntity("name", "id", initialTimestamp);
-        proc.addEntity("name", "id2", initialTimestamp);
-
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0);
-        runner.clearTransferState();
-
-        Thread.sleep(DEFAULT_SLEEP_MILLIS);
-
-        runner.run();
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2);
-        runner.clearTransferState();
-
-        final StateMap stateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(2, stateMap.getVersion());
-
-        final Map<String, String> map = stateMap.toMap();
-        // Ensure timestamp and identifiers are migrated
-        assertEquals(4, map.size());
-        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
-        assertEquals(Long.toString(initialTimestamp), 
map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
-        assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".0"));
-        assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + 
".1"));
-
-        proc.addEntity("new name", "new id", initialTimestamp + 1);
-        runner.run();
-
-        
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1);
-        runner.clearTransferState();
-
-        StateMap updatedStateMap = 
runner.getStateManager().getState(Scope.CLUSTER);
-        assertEquals(3, updatedStateMap.getVersion());
-
-        assertEquals(3, updatedStateMap.toMap().size());
-        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY));
-        // Processed timestamp is now caught up
-        assertEquals(Long.toString(initialTimestamp + 1), 
updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY));
-        assertEquals("new id", 
updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0"));
-    }
-
-    private static class DistributedCache extends AbstractControllerService 
implements DistributedMapCacheClient {
+    static class DistributedCache extends AbstractControllerService implements 
DistributedMapCacheClient {
         private final Map<Object, Object> stored = new HashMap<>();
         private int fetchCount = 0;
 
@@ -569,13 +229,12 @@ public class TestAbstractListProcessor {
         }
     }
 
+    static class ConcreteListProcessor extends 
AbstractListProcessor<ListableEntity> {
+        final List<ListableEntity> entities = new ArrayList<>();
 
-    private static class ConcreteListProcessor extends 
AbstractListProcessor<ListableEntity> {
-        private final List<ListableEntity> entities = new ArrayList<>();
-
-        public final String persistenceFilename = "ListProcessor-local-state-" 
+ UUID.randomUUID().toString() + ".json";
-        public String persistenceFolder = "target/";
-        public File persistenceFile = new File(persistenceFolder + 
persistenceFilename);
+        final String persistenceFilename = "ListProcessor-local-state-" + 
UUID.randomUUID().toString() + ".json";
+        String persistenceFolder = "target/";
+        File persistenceFile = new File(persistenceFolder + 
persistenceFilename);
 
         @Override
         public File getPersistenceFile() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index bd93e3b..3496ea0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
@@ -100,97 +93,6 @@ public class ConsumeKafkaTest {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
-        String groupName = "validateGetAllMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetAllMessagesPattern() throws Exception {
-        String groupName = "validateGetAllMessagesPattern";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
-        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetErrorMessages() throws Exception {
-        String groupName = "validateGetErrorMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(true, false);
-        when(mockLease.commit()).thenReturn(Boolean.FALSE);
-
-        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(2)).continuePolling();
-        verify(mockLease, times(1)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
     public void testJaasConfiguration() throws Exception {
         ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
new file mode 100644
index 0000000..b0c6eba
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITConsumeKafka {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, 
ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index e460879..b1edd1f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
@@ -100,97 +93,6 @@ public class ConsumeKafkaTest {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
-        String groupName = "validateGetAllMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetAllMessagesPattern() throws Exception {
-        String groupName = "validateGetAllMessagesPattern";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)");
-        runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern");
-        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetErrorMessages() throws Exception {
-        String groupName = "validateGetErrorMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(true, false);
-        when(mockLease.commit()).thenReturn(Boolean.FALSE);
-
-        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(2)).continuePolling();
-        verify(mockLease, times(1)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
     public void testJaasConfiguration() throws Exception {
         ConsumeKafka_0_11 consumeKafka = new ConsumeKafka_0_11();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
new file mode 100644
index 0000000..9a3b3d9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ITConsumeKafka {
+
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, 
ConsumeKafka_0_11.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index c4b0140..9062e1e 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -20,24 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.components.PropertyValue;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.junit.Assume.assumeFalse;
 import org.junit.Before;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 public class ConsumeKafkaTest {
 
@@ -105,103 +94,4 @@ public class ConsumeKafkaTest {
             assertTrue(e.getMessage().contains("must contain at least one 
character that is not white space"));
         }
     }
-
-    @Test
-    public void validateGetAllMessages() throws Exception {
-        String groupName = "validateGetAllMessages";
-
-        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetErrorMessages() throws Exception {
-        String groupName = "validateGetErrorMessages";
-
-        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(true, false);
-        when(mockLease.commit()).thenReturn(Boolean.FALSE);
-
-        ConsumeKafka proc = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
-        verify(mockLease, times(2)).continuePolling();
-        verify(mockLease, times(1)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    private boolean isWindowsEnvironment() {
-        return 
System.getProperty("os.name").toLowerCase().startsWith("windows");
-    }
-
-    @Test
-    public void validateConsumerRetainer() throws Exception {
-        assumeFalse(isWindowsEnvironment());//skip if on windows
-        final ConsumerPool consumerPool = mock(ConsumerPool.class);
-
-        final ConsumeKafka processor = new ConsumeKafka() {
-            @Override
-            protected ConsumerPool createConsumerPool(ProcessContext context, 
ComponentLog log) {
-                return consumerPool;
-            }
-        };
-
-        final ComponentLog logger = mock(ComponentLog.class);
-        final ProcessorInitializationContext initializationContext = 
mock(ProcessorInitializationContext.class);
-        when(initializationContext.getLogger()).thenReturn(logger);
-        processor.initialize(initializationContext);
-
-        final ProcessContext processContext = mock(ProcessContext.class);
-        final PropertyValue heartbeatInternalMsConfig = 
mock(PropertyValue.class);
-        when(heartbeatInternalMsConfig.isSet()).thenReturn(true);
-        when(heartbeatInternalMsConfig.asInteger()).thenReturn(100);
-        
when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig);
-        processor.onScheduled(processContext);
-
-        // retainConsumers should be called at least 1 time if it passed 
longer than heartbeat interval milliseconds.
-        Thread.sleep(200);
-        verify(consumerPool, atLeast(1)).retainConsumers();
-
-        processor.stopConnectionRetainer();
-
-        // After stopping connection retainer, it shouldn't interact with 
consumerPool.
-        Thread.sleep(200);
-        verifyNoMoreInteractions(consumerPool);
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
new file mode 100644
index 0000000..084280a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+import static org.junit.Assume.assumeFalse;
+import org.junit.Before;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class ITConsumeKafka {
+
+    Consumer<byte[], byte[]> mockConsumer = null;
+    ConsumerLease mockLease = null;
+    ConsumerPool mockConsumerPool = null;
+
+    @Before
+    public void setup() {
+        mockConsumer = mock(Consumer.class);
+        mockLease = mock(ConsumerLease.class);
+        mockConsumerPool = mock(ConsumerPool.class);
+    }
+
+    @Test
+    public void validateGetAllMessages() throws Exception {
+        String groupName = "validateGetAllMessages";
+
+        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
+    public void validateGetErrorMessages() throws Exception {
+        String groupName = "validateGetErrorMessages";
+
+        
when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(true, false);
+        when(mockLease.commit()).thenReturn(Boolean.FALSE);
+
+        ConsumeKafka proc = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
+        runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, 
ConsumeKafka.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(2)).continuePolling();
+        verify(mockLease, times(1)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    private boolean isWindowsEnvironment() {
+        return 
System.getProperty("os.name").toLowerCase().startsWith("windows");
+    }
+
+    @Test
+    public void validateConsumerRetainer() throws Exception {
+        assumeFalse(isWindowsEnvironment());//skip if on windows
+        final ConsumerPool consumerPool = mock(ConsumerPool.class);
+
+        final ConsumeKafka processor = new ConsumeKafka() {
+            @Override
+            protected ConsumerPool createConsumerPool(ProcessContext context, 
ComponentLog log) {
+                return consumerPool;
+            }
+        };
+
+        final ComponentLog logger = mock(ComponentLog.class);
+        final ProcessorInitializationContext initializationContext = 
mock(ProcessorInitializationContext.class);
+        when(initializationContext.getLogger()).thenReturn(logger);
+        processor.initialize(initializationContext);
+
+        final ProcessContext processContext = mock(ProcessContext.class);
+        final PropertyValue heartbeatInternalMsConfig = 
mock(PropertyValue.class);
+        when(heartbeatInternalMsConfig.isSet()).thenReturn(true);
+        when(heartbeatInternalMsConfig.asInteger()).thenReturn(100);
+        
when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig);
+        processor.onScheduled(processContext);
+
+        // retainConsumers should have been called at least once after 
waiting longer than the heartbeat interval (in milliseconds).
+        Thread.sleep(200);
+        verify(consumerPool, atLeast(1)).retainConsumers();
+
+        processor.stopConnectionRetainer();
+
+        // After stopping connection retainer, it shouldn't interact with 
consumerPool.
+        Thread.sleep(200);
+        verifyNoMoreInteractions(consumerPool);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 7b5a8fc..10ac398 100644
--- 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Before;
@@ -100,97 +93,6 @@ public class ConsumeKafkaTest {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
-        String groupName = "validateGetAllMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, 
ConsumeKafka_1_0.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetAllMessagesPattern() throws Exception {
-        String groupName = "validateGetAllMessagesPattern";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, 
Boolean.TRUE, Boolean.FALSE);
-        when(mockLease.commit()).thenReturn(Boolean.TRUE);
-
-        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)");
-        runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern");
-        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, 
ConsumeKafka_1_0.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(3)).continuePolling();
-        verify(mockLease, times(2)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
-    public void validateGetErrorMessages() throws Exception {
-        String groupName = "validateGetErrorMessages";
-
-        when(mockConsumerPool.obtainConsumer(anyObject(), 
anyObject())).thenReturn(mockLease);
-        when(mockLease.continuePolling()).thenReturn(true, false);
-        when(mockLease.commit()).thenReturn(Boolean.FALSE);
-
-        ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() {
-            @Override
-            protected ConsumerPool createConsumerPool(final ProcessContext 
context, final ComponentLog log) {
-                return mockConsumerPool;
-            }
-        };
-        final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, 
"0.0.0.0:1234");
-        runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
-        runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
-        runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, 
ConsumeKafka_1_0.OFFSET_EARLIEST);
-        runner.run(1, false);
-
-        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), 
anyObject());
-        verify(mockLease, times(2)).continuePolling();
-        verify(mockLease, times(1)).poll();
-        verify(mockLease, times(1)).commit();
-        verify(mockLease, times(1)).close();
-        verifyNoMoreInteractions(mockConsumerPool);
-        verifyNoMoreInteractions(mockLease);
-    }
-
-    @Test
     public void testJaasConfiguration() throws Exception {
         ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);

Reply via email to