NIFI-4664, NIFI-4662, NIFI-4660, NIFI-4659 moved tests which are timing/threading/network dependent and brittle to integration tests, and un-ignored tests that are ITs. Updated Travis to reduce impact on infrastructure; AppVeyor now skips test runs, so it serves only to prove the build works on Windows. This closes #2319
squash Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/cdc1facf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/cdc1facf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/cdc1facf Branch: refs/heads/master Commit: cdc1facf398e6d99cc746b70350ff754e2a975ad Parents: c730f80 Author: joewitt <[email protected]> Authored: Tue Dec 5 14:44:07 2017 -0500 Committer: Matt Gilman <[email protected]> Committed: Wed Dec 6 10:53:09 2017 -0500 ---------------------------------------------------------------------- .travis.yml | 4 +- appveyor.yml | 4 +- .../util/list/ITAbstractListProcessor.java | 426 +++++++++++++++++++ .../util/list/TestAbstractListProcessor.java | 353 +-------------- .../kafka/pubsub/ConsumeKafkaTest.java | 98 ----- .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 98 ----- .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 110 ----- .../processors/kafka/pubsub/ITConsumeKafka.java | 150 +++++++ .../kafka/pubsub/ConsumeKafkaTest.java | 98 ----- .../processors/kafka/pubsub/ITConsumeKafka.java | 135 ++++++ .../ITLumberjackSocketChannelHandler.java | 207 +++++++++ .../TestLumberjackSocketChannelHandler.java | 207 --------- .../standard/ITListenSyslogGroovy.groovy | 111 +++++ .../standard/ListenSyslogGroovyTest.groovy | 111 ----- .../processors/standard/ITListenSyslog.java | 402 +++++++++++++++++ .../processors/standard/TestListenSyslog.java | 356 +--------------- pom.xml | 4 +- 19 files changed, 1717 insertions(+), 1427 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/.travis.yml ---------------------------------------------------------------------- diff --git a/.travis.yml b/.travis.yml index d7f0f03..e5b0ccb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,8 +19,6 @@ env: - USER_LANGUAGE=en 
USER_REGION=US' - USER_LANGUAGE=fr USER_REGION=FR' - USER_LANGUAGE=ja USER_REGION=JP' - - USER_LANGUAGE=pt USER_REGION=BR' - - USER_LANGUAGE=default USER_REGION=default os: - linux @@ -54,4 +52,4 @@ install: # Note: The reason the sed is done as part of script is to ensure the pom hack # won't affect the 'clean install' above - bash .travis.sh - - mvn -T 2C -Pcontrib-check -Ddir-only clean install \ No newline at end of file + - mvn -T 2C -Pcontrib-check -Ddir-only clean install http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/appveyor.yml ---------------------------------------------------------------------- diff --git a/appveyor.yml b/appveyor.yml index c7aa2f2..3e31c19 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -32,9 +32,7 @@ install: - cmd: SET MAVEN_OPTS=-XX:MaxPermSize=2g -Xmx4g - cmd: SET JAVA_OPTS=-XX:MaxPermSize=2g -Xmx4g build_script: - - mvn -q clean package --batch-mode -DskipTests -test_script: - - mvn -q clean install --batch-mode -Pcontrib-check + - mvn clean package --batch-mode -DskipTests -Ddir-only cache: - C:\maven\ - C:\Users\appveyor\.m2 http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java new file mode 100644 index 0000000..dcf47c6 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/ITAbstractListProcessor.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.list; + +import org.apache.nifi.components.state.Scope; +import org.apache.nifi.components.state.StateMap; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestWatcher; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS; +import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION; +import org.apache.nifi.processor.util.list.TestAbstractListProcessor.ConcreteListProcessor; +import org.apache.nifi.processor.util.list.TestAbstractListProcessor.DistributedCache; +import static org.junit.Assert.assertEquals; + +public class ITAbstractListProcessor { + + /** + * @return current 
timestamp in milliseconds, but truncated at specified + * target precision (e.g. SECONDS or MINUTES). + */ + private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) { + final long timestampInTargetPrecision = targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); + return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, targetPrecision); + } + + private static long getSleepMillis(final TimeUnit targetPrecision) { + return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2; + } + + private static final long DEFAULT_SLEEP_MILLIS = getSleepMillis(TimeUnit.MILLISECONDS); + + private ConcreteListProcessor proc; + private TestRunner runner; + + @Rule + public TestWatcher dumpState = new ListProcessorTestWatcher( + () -> { + try { + return runner.getStateManager().getState(Scope.LOCAL).toMap(); + } catch (IOException e) { + throw new RuntimeException("Failed to retrieve state", e); + } + }, + () -> proc.entities, + () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList()) + ); + + @Before + public void setup() { + proc = new ConcreteListProcessor(); + runner = TestRunners.newTestRunner(proc); + } + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); + + /** + * <p> + * Ensures that files are listed when those are old enough: + * <li>Files with last modified timestamp those are old enough to determine + * that those are completely written and no further files are expected to be + * added with the same timestamp.</li> + * <li>This behavior is expected when a processor is scheduled less + * frequently, such as hourly or daily.</li> + * </p> + */ + @Test + public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception { + final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS); + + // These entries have existed before the processor runs at the first time. 
+ proc.addEntity("name", "id", oldTimestamp); + proc.addEntity("name", "id2", oldTimestamp); + + // First run, the above listed entries should be emitted since it has existed. + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + // Run again without introducing any new entries + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + } + + private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException { + runner.run(); + + final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); + + setTargetSystemTimestampPrecision(targetPrecision); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + + // First run, the above listed entries would be skipped to avoid write synchronization issues + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(getSleepMillis(targetPrecision)); + + // Run again without introducing any new entries + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + } + + /** + * <p> + * Ensures that newly created files should wait to confirm there is no more + * files created with the same timestamp: + * <li>If files have the latest modified timestamp at an iteration, then + * those should be postponed to be listed</li> + * <li>If those files still are the latest files at the next iteration, then + * those should be 
listed</li> + * </p> + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS); + } + + /** + * Same as + * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} + * but simulates that the target filesystem only provide timestamp precision + * in Seconds. + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS); + } + + /** + * Same as + * {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} + * but simulates that the target filesystem only provide timestamp precision + * in Minutes. + */ + @Test + public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception { + testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES); + } + + private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException { + + final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); + + setTargetSystemTimestampPrecision(targetPrecision); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + + // First run, the above listed entries would be skipped to avoid write synchronization issues + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(getSleepMillis(targetPrecision)); + + // Running again, our two previously seen files are now cleared to be released + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + 
runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // An entry that is older than already processed entry should not be listed. + proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1)); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // If an entry whose timestamp is the same with the last processed timestamp should not be listed. + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1)); + + // It should show up + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) { + switch (targetPrecision) { + case MINUTES: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES); + break; + case SECONDS: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS); + break; + case MILLISECONDS: + runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS); + break; + } + } + + @Test + public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS); + } + + @Test + public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.SECONDS); + } + + @Test + public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception { + testOnlyNewEntriesEmitted(TimeUnit.MINUTES); + } + + @Test + public void 
testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception { + + final long initialTimestamp = System.currentTimeMillis(); + + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + // Emulate having state but not having had the processor run such as in a restart + final Map<String, String> preexistingState = new HashMap<>(); + preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); + preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); + preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2"); + runner.getStateManager().setState(preexistingState, Scope.CLUSTER); + + // run for the first time + runner.run(); + + // First run, the above listed entries would be skipped + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + // Running again, these files should be eligible for transfer and again skipped + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Verify no new old files show up + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id3", initialTimestamp - 1); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + proc.addEntity("name", "id2", initialTimestamp); + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + 
runner.clearTransferState(); + + // Now a new file beyond the current time enters + proc.addEntity("name", "id2", initialTimestamp + 1); + + // It should now show up + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + } + + @Test + public void testStateStoredInClusterStateManagement() throws Exception { + + final DistributedCache cache = new DistributedCache(); + runner.addControllerService("cache", cache); + runner.enableControllerService(cache); + runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); + + final long initialTimestamp = System.currentTimeMillis(); + + proc.addEntity("name", "id", initialTimestamp); + runner.run(); + + final Map<String, String> expectedState = new HashMap<>(); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + // Ensure only timestamp is migrated + expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); + expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); + runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); + } + + @Test + public void testResumeListingAfterClearingState() throws Exception { + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + final long initialEventTimestamp = System.currentTimeMillis(); + proc.addEntity("name", "id", initialEventTimestamp); + proc.addEntity("name", "id2", initialEventTimestamp); + + // Add entities but these should not be transferred as 
they are the latest values + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + + // after providing a pause in listings, the files should now transfer + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + // Verify entities are not transferred again for the given state + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + // Clear state for this processor, eradicating timestamp + runner.getStateManager().clear(Scope.CLUSTER); + Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); + + // Ensure the original files are now transferred again. + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + } + + @Test + public void testOnlyNewStateStored() throws Exception { + + runner.run(); + + final long initialTimestamp = System.currentTimeMillis(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + proc.addEntity("name", "id", initialTimestamp); + proc.addEntity("name", "id2", initialTimestamp); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); + runner.clearTransferState(); + + Thread.sleep(DEFAULT_SLEEP_MILLIS); + + runner.run(); + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); + runner.clearTransferState(); + + final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(2, stateMap.getVersion()); + + final Map<String, String> map = stateMap.toMap(); + // Ensure timestamp and identifiers are migrated + assertEquals(4, map.size()); + assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); + 
assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); + assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); + assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1")); + + proc.addEntity("new name", "new id", initialTimestamp + 1); + runner.run(); + + runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); + runner.clearTransferState(); + + StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); + assertEquals(3, updatedStateMap.getVersion()); + + assertEquals(3, updatedStateMap.toMap().size()); + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); + // Processed timestamp is now caught up + assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); + assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java index 4a376e2..f5eae46 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java @@ -19,7 +19,6 @@ package 
org.apache.nifi.processor.util.list; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.state.Scope; -import org.apache.nifi.components.state.StateMap; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; @@ -33,7 +32,6 @@ import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -54,10 +52,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MILLIS; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_MINUTES; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.PRECISION_SECONDS; -import static org.apache.nifi.processor.util.list.AbstractListProcessor.TARGET_SYSTEM_TIMESTAMP_PRECISION; import static org.junit.Assert.assertEquals; public class TestAbstractListProcessor { @@ -101,257 +95,6 @@ public class TestAbstractListProcessor { @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); - /** - * <p>Ensures that files are listed when those are old enough: - * <li>Files with last modified timestamp those are old enough to determine that those are completely written - * and no further files are expected to be added with the same timestamp.</li> - * <li>This behavior is expected when a processor is scheduled less frequently, such as hourly or daily.</li> - * </p> - */ - @Test - public void testAllExistingEntriesEmittedOnFirstIteration() throws Exception { - final long oldTimestamp = System.currentTimeMillis() - getSleepMillis(TimeUnit.MILLISECONDS); - - // These entries have existed before the processor runs 
at the first time. - proc.addEntity("name", "id", oldTimestamp); - proc.addEntity("name", "id2", oldTimestamp); - - // First run, the above listed entries should be emitted since it has existed. - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // Run again without introducing any new entries - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - } - - private void testPreviouslySkippedEntriesEmmitedOnNextIteration(final TimeUnit targetPrecision) throws InterruptedException { - runner.run(); - - final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); - - setTargetSystemTimestampPrecision(targetPrecision); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - - // First run, the above listed entries would be skipped to avoid write synchronization issues - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(getSleepMillis(targetPrecision)); - - // Run again without introducing any new entries - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - } - - /** - * <p>Ensures that newly created files should wait to confirm there is no more files created with the same timestamp: - * <li>If files have the latest modified timestamp at an iteration, then those should be postponed to be listed</li> - * <li>If those files still are the latest files at the next iteration, then those should be 
listed</li> - * </p> - */ - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MILLISECONDS); - } - - /** - * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target - * filesystem only provide timestamp precision in Seconds. - */ - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationSecondPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.SECONDS); - } - - /** - * Same as {@link #testPreviouslySkippedEntriesEmittedOnNextIterationMilliPrecision()} but simulates that the target - * filesystem only provide timestamp precision in Minutes. - * This test is ignored because it needs to wait two minutes. Not good for automated unit testing, but still valuable when executed manually. - */ - @Ignore - @Test - public void testPreviouslySkippedEntriesEmittedOnNextIterationMinutesPrecision() throws Exception { - testPreviouslySkippedEntriesEmmitedOnNextIteration(TimeUnit.MINUTES); - } - - private void testOnlyNewEntriesEmitted(final TimeUnit targetPrecision) throws InterruptedException { - - final long initialTimestamp = getCurrentTimestampMillis(targetPrecision); - - setTargetSystemTimestampPrecision(targetPrecision); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - - // First run, the above listed entries would be skipped to avoid write synchronization issues - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(getSleepMillis(targetPrecision)); - - // Running again, our two previously seen files are 
now cleared to be released - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Verify no new old files show up - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // An entry that is older than already processed entry should not be listed. - proc.addEntity("name", "id3", initialTimestamp - targetPrecision.toMillis(1)); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // If an entry whose timestamp is the same with the last processed timestamp should not be listed. - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Now a new file beyond the current time enters - proc.addEntity("name", "id2", initialTimestamp + targetPrecision.toMillis(1)); - - // It should show up - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - } - - private void setTargetSystemTimestampPrecision(TimeUnit targetPrecision) { - switch (targetPrecision) { - case MINUTES: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MINUTES); - break; - case SECONDS: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_SECONDS); - break; - case MILLISECONDS: - runner.setProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION, PRECISION_MILLIS); - break; - } - } - - @Test - public void testOnlyNewEntriesEmittedMillisPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.MILLISECONDS); - } - - @Test - public void testOnlyNewEntriesEmittedSecondPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.SECONDS); - } - - /** - * This test is ignored because it needs to wait two minutes. 
Not good for automated unit testing, but still valuable when executed manually. - */ - @Ignore - @Test - public void testOnlyNewEntriesEmittedMinutesPrecision() throws Exception { - testOnlyNewEntriesEmitted(TimeUnit.MINUTES); - } - - @Test - public void testHandleRestartWithEntriesAlreadyTransferredAndNoneNew() throws Exception { - - final long initialTimestamp = System.currentTimeMillis(); - - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - - // Emulate having state but not having had the processor run such as in a restart - final Map<String, String> preexistingState = new HashMap<>(); - preexistingState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); - preexistingState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, Long.toString(initialTimestamp)); - preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); - preexistingState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id2"); - runner.getStateManager().setState(preexistingState, Scope.CLUSTER); - - // run for the first time - runner.run(); - - // First run, the above listed entries would be skipped - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Ensure we have covered the necessary lag period to avoid issues where the processor was immediately scheduled to run again - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - // Running again, these files should be eligible for transfer and again skipped - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Verify no new old files show up - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - proc.addEntity("name", "id3", initialTimestamp - 1); - runner.run(); - 
runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - proc.addEntity("name", "id2", initialTimestamp); - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Now a new file beyond the current time enters - proc.addEntity("name", "id2", initialTimestamp + 1); - - // It should now show up - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - } - - @Test - public void testStateStoredInClusterStateManagement() throws Exception { - - final DistributedCache cache = new DistributedCache(); - runner.addControllerService("cache", cache); - runner.enableControllerService(cache); - runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); - - final long initialTimestamp = System.currentTimeMillis(); - - proc.addEntity("name", "id", initialTimestamp); - runner.run(); - - final Map<String, String> expectedState = new HashMap<>(); - // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "0"); - runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - // Ensure only timestamp is migrated - expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(initialTimestamp)); - expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); - runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); - } - @Test public void testStateMigratedFromCacheService() throws InitializationException { @@ -416,41 +159,6 @@ public class 
TestAbstractListProcessor { } @Test - public void testResumeListingAfterClearingState() throws Exception { - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - - final long initialEventTimestamp = System.currentTimeMillis(); - proc.addEntity("name", "id", initialEventTimestamp); - proc.addEntity("name", "id2", initialEventTimestamp); - - // Add entities but these should not be transferred as they are the latest values - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - - // after providing a pause in listings, the files should now transfer - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - // Verify entities are not transferred again for the given state - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - // Clear state for this processor, eradicating timestamp - runner.getStateManager().clear(Scope.CLUSTER); - Assert.assertEquals("State is not empty for this component after clearing", 0, runner.getStateManager().getState(Scope.CLUSTER).toMap().size()); - - // Ensure the original files are now transferred again. 
- runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - } - - @Test public void testFetchOnStart() throws InitializationException { final DistributedCache cache = new DistributedCache(); @@ -463,55 +171,7 @@ public class TestAbstractListProcessor { assertEquals(1, cache.fetchCount); } - @Test - public void testOnlyNewStateStored() throws Exception { - - runner.run(); - - final long initialTimestamp = System.currentTimeMillis(); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - proc.addEntity("name", "id", initialTimestamp); - proc.addEntity("name", "id2", initialTimestamp); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 0); - runner.clearTransferState(); - - Thread.sleep(DEFAULT_SLEEP_MILLIS); - - runner.run(); - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 2); - runner.clearTransferState(); - - final StateMap stateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(2, stateMap.getVersion()); - - final Map<String, String> map = stateMap.toMap(); - // Ensure timestamp and identifiers are migrated - assertEquals(4, map.size()); - assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); - assertEquals(Long.toString(initialTimestamp), map.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); - assertEquals("id", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); - assertEquals("id2", map.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".1")); - - proc.addEntity("new name", "new id", initialTimestamp + 1); - runner.run(); - - runner.assertAllFlowFilesTransferred(ConcreteListProcessor.REL_SUCCESS, 1); - runner.clearTransferState(); - - StateMap updatedStateMap = runner.getStateManager().getState(Scope.CLUSTER); - assertEquals(3, updatedStateMap.getVersion()); - - assertEquals(3, 
updatedStateMap.toMap().size()); - assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY)); - // Processed timestamp is now caught up - assertEquals(Long.toString(initialTimestamp + 1), updatedStateMap.get(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY)); - assertEquals("new id", updatedStateMap.get(AbstractListProcessor.IDENTIFIER_PREFIX + ".0")); - } - - private static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { + static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { private final Map<Object, Object> stored = new HashMap<>(); private int fetchCount = 0; @@ -569,13 +229,12 @@ public class TestAbstractListProcessor { } } + static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { + final List<ListableEntity> entities = new ArrayList<>(); - private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { - private final List<ListableEntity> entities = new ArrayList<>(); - - public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; - public String persistenceFolder = "target/"; - public File persistenceFile = new File(persistenceFolder + persistenceFilename); + final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; + String persistenceFolder = "target/"; + File persistenceFile = new File(persistenceFolder + persistenceFilename); @Override public File getPersistenceFile() { http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index bd93e3b..3496ea0 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -100,97 +93,6 @@ public class ConsumeKafkaTest { } @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = 
TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - 
verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_0_10 consumeKafka = new ConsumeKafka_0_10(); TestRunner runner = TestRunners.newTestRunner(consumeKafka); http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java 
b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 0000000..b0c6eba --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + 
verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = 
TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index e460879..b1edd1f 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -100,97 +93,6 @@ public class ConsumeKafkaTest { } @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); 
- - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, 
times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_0_11 consumeKafka = new ConsumeKafka_0_11(); TestRunner runner = TestRunners.newTestRunner(consumeKafka); http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 0000000..9a3b3d9 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class ITConsumeKafka { + + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + 
verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetAllMessagesPattern() throws Exception { + String groupName = "validateGetAllMessagesPattern"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "(fo.*)|(ba)"); + runner.setProperty(ConsumeKafka_0_11.TOPIC_TYPE, "pattern"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka_0_11 proc = new ConsumeKafka_0_11() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = 
TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka_0_11.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka_0_11.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka_0_11.AUTO_OFFSET_RESET, ConsumeKafka_0_11.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index c4b0140..9062e1e 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -20,24 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.components.PropertyValue; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessorInitializationContext; import 
org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.junit.Assume.assumeFalse; import org.junit.Before; -import static org.mockito.Matchers.anyObject; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; public class ConsumeKafkaTest { @@ -105,103 +94,4 @@ public class ConsumeKafkaTest { assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); } } - - @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka proc = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void 
validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka proc = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - private boolean isWindowsEnvironment() { - return System.getProperty("os.name").toLowerCase().startsWith("windows"); - } - - @Test - public void validateConsumerRetainer() throws Exception { - assumeFalse(isWindowsEnvironment());//skip if on windows - final ConsumerPool consumerPool = mock(ConsumerPool.class); - - final ConsumeKafka processor = new ConsumeKafka() { - @Override - protected ConsumerPool createConsumerPool(ProcessContext context, ComponentLog log) { - return consumerPool; - } - }; - - final ComponentLog logger = mock(ComponentLog.class); - final ProcessorInitializationContext initializationContext = mock(ProcessorInitializationContext.class); - when(initializationContext.getLogger()).thenReturn(logger); - processor.initialize(initializationContext); - - final ProcessContext processContext = 
mock(ProcessContext.class); - final PropertyValue heartbeatInternalMsConfig = mock(PropertyValue.class); - when(heartbeatInternalMsConfig.isSet()).thenReturn(true); - when(heartbeatInternalMsConfig.asInteger()).thenReturn(100); - when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig); - processor.onScheduled(processContext); - - // retainConsumers should be called at least 1 time if it passed longer than heartbeat interval milliseconds. - Thread.sleep(200); - verify(consumerPool, atLeast(1)).retainConsumers(); - - processor.stopConnectionRetainer(); - - // After stopping connection retainer, it shouldn't interact with consumerPool. - Thread.sleep(200); - verifyNoMoreInteractions(consumerPool); - } } http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java new file mode 100644 index 0000000..084280a --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; + +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; +import static org.junit.Assume.assumeFalse; +import org.junit.Before; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class ITConsumeKafka { + + Consumer<byte[], byte[]> mockConsumer = null; + ConsumerLease mockLease = null; + ConsumerPool mockConsumerPool = null; + + @Before + public void setup() { + mockConsumer = mock(Consumer.class); + mockLease = mock(ConsumerLease.class); + mockConsumerPool = mock(ConsumerPool.class); + } + + @Test + public void validateGetAllMessages() throws Exception { + String groupName = "validateGetAllMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); + when(mockLease.commit()).thenReturn(Boolean.TRUE); + + ConsumeKafka proc = new 
ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(3)).continuePolling(); + verify(mockLease, times(2)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + verifyNoMoreInteractions(mockLease); + } + + @Test + public void validateGetErrorMessages() throws Exception { + String groupName = "validateGetErrorMessages"; + + when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease); + when(mockLease.continuePolling()).thenReturn(true, false); + when(mockLease.commit()).thenReturn(Boolean.FALSE); + + ConsumeKafka proc = new ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { + return mockConsumerPool; + } + }; + final TestRunner runner = TestRunners.newTestRunner(proc); + runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); + runner.setProperty(ConsumeKafka.TOPICS, "foo,bar"); + runner.setProperty(ConsumeKafka.GROUP_ID, groupName); + runner.setProperty(ConsumeKafka.AUTO_OFFSET_RESET, ConsumeKafka.OFFSET_EARLIEST); + runner.run(1, false); + + verify(mockConsumerPool, times(1)).obtainConsumer(anyObject()); + verify(mockLease, times(2)).continuePolling(); + verify(mockLease, times(1)).poll(); + verify(mockLease, times(1)).commit(); + verify(mockLease, times(1)).close(); + verifyNoMoreInteractions(mockConsumerPool); + 
verifyNoMoreInteractions(mockLease); + } + + private boolean isWindowsEnvironment() { + return System.getProperty("os.name").toLowerCase().startsWith("windows"); + } + + @Test + public void validateConsumerRetainer() throws Exception { + assumeFalse(isWindowsEnvironment());//skip if on windows + final ConsumerPool consumerPool = mock(ConsumerPool.class); + + final ConsumeKafka processor = new ConsumeKafka() { + @Override + protected ConsumerPool createConsumerPool(ProcessContext context, ComponentLog log) { + return consumerPool; + } + }; + + final ComponentLog logger = mock(ComponentLog.class); + final ProcessorInitializationContext initializationContext = mock(ProcessorInitializationContext.class); + when(initializationContext.getLogger()).thenReturn(logger); + processor.initialize(initializationContext); + + final ProcessContext processContext = mock(ProcessContext.class); + final PropertyValue heartbeatInternalMsConfig = mock(PropertyValue.class); + when(heartbeatInternalMsConfig.isSet()).thenReturn(true); + when(heartbeatInternalMsConfig.asInteger()).thenReturn(100); + when(processContext.getProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).thenReturn(heartbeatInternalMsConfig); + processor.onScheduled(processContext); + + // retainConsumers should be called at least 1 time if it passed longer than heartbeat interval milliseconds. + Thread.sleep(200); + verify(consumerPool, atLeast(1)).retainConsumers(); + + processor.stopConnectionRetainer(); + + // After stopping connection retainer, it shouldn't interact with consumerPool. 
+ Thread.sleep(200); + verifyNoMoreInteractions(consumerPool); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/cdc1facf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java index 7b5a8fc..10ac398 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java @@ -18,17 +18,10 @@ package org.apache.nifi.processors.kafka.pubsub; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Before; @@ -100,97 +93,6 @@ public class ConsumeKafkaTest { } @Test - public void validateGetAllMessages() throws Exception { - String groupName = "validateGetAllMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - 
when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetAllMessagesPattern() throws Exception { - String groupName = "validateGetAllMessagesPattern"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE); - when(mockLease.commit()).thenReturn(Boolean.TRUE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)"); - runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, 
ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(3)).continuePolling(); - verify(mockLease, times(2)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test - public void validateGetErrorMessages() throws Exception { - String groupName = "validateGetErrorMessages"; - - when(mockConsumerPool.obtainConsumer(anyObject(), anyObject())).thenReturn(mockLease); - when(mockLease.continuePolling()).thenReturn(true, false); - when(mockLease.commit()).thenReturn(Boolean.FALSE); - - ConsumeKafka_1_0 proc = new ConsumeKafka_1_0() { - @Override - protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) { - return mockConsumerPool; - } - }; - final TestRunner runner = TestRunners.newTestRunner(proc); - runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234"); - runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar"); - runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName); - runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST); - runner.run(1, false); - - verify(mockConsumerPool, times(1)).obtainConsumer(anyObject(), anyObject()); - verify(mockLease, times(2)).continuePolling(); - verify(mockLease, times(1)).poll(); - verify(mockLease, times(1)).commit(); - verify(mockLease, times(1)).close(); - verifyNoMoreInteractions(mockConsumerPool); - verifyNoMoreInteractions(mockLease); - } - - @Test public void testJaasConfiguration() throws Exception { ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0(); TestRunner runner = TestRunners.newTestRunner(consumeKafka);
