Repository: kafka Updated Branches: refs/heads/trunk 670193f22 -> ebc7f7caa
http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 7abe4dd..e5b96ff 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.producer.MockProducer; @@ -26,6 +27,7 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; @@ -34,13 +36,13 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockInternalTopicManager; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Assert; import org.junit.Before; @@ -79,8 +81,14 @@ public class StreamThreadTest { private final String clientId = "clientId"; private final String applicationId = "stream-thread-test"; - private final MockTime time = new MockTime(); + private final MockTime mockTime = new MockTime(); + private final Metrics metrics = new Metrics(); + private final MockClientSupplier clientSupplier = new MockClientSupplier(); private UUID processId = UUID.randomUUID(); + final KStreamBuilder builder = new KStreamBuilder(); + private final StreamsConfig config = new StreamsConfig(configProps(false)); + + @Before public void setUp() throws Exception { @@ -106,8 +114,12 @@ public class StreamThreadTest { new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]) ); - private final Cluster metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), infos, Collections.<String>emptySet(), - Collections.<String>emptySet()); + private final Cluster metadata = new Cluster( + "cluster", + Collections.singleton(Node.noNode()), + infos, + Collections.<String>emptySet(), + Collections.<String>emptySet()); private final PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData()); @@ -135,7 +147,7 @@ public class StreamThreadTest { private final TaskId task4 = new TaskId(1, 1); private final TaskId task5 = new TaskId(1, 2); - private Properties configProps() { + private Properties configProps(final boolean enableEos) { return new Properties() { { setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); @@ -143,6 +155,9 @@ public class StreamThreadTest { setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); + if (enableEos) { + setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + } } }; } @@ -162,13 +177,23 @@ public class StreamThreadTest { final StreamsConfig config, final StreamsMetrics metrics, final StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), config, metrics, - stateDirectory, null, new MockTime(), new RecordCollectorImpl(producer, id.toString())); + super(id, + applicationId, + partitions, + topology, + consumer, + new StoreChangelogReader(restoreConsumer, Time.SYSTEM, 5000), + config, + metrics, + stateDirectory, + null, + new MockTime(), + producer); } @Override - public void commit() { - super.commit(); + void commitImpl(final boolean startNewTransaction) { + super.commitImpl(startNewTransaction); committed = true; } @@ -176,9 +201,9 @@ public class StreamThreadTest { protected void updateOffsetLimits() {} @Override - public void close() { + public void close(final boolean clean) { + super.close(clean); closed = true; - super.close(); } @Override @@ -192,32 +217,44 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test public void testPartitionAssignmentChange() throws Exception { - final StreamsConfig config = new StreamsConfig(configProps()); - final StateListenerStub stateListener = new StateListenerStub(); - - - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); builder.addSource("source2", "topic2"); builder.addSource("source3", "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3"); - - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { - final ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + return new TestStreamTask( + id, + applicationId, + partitionsForTask, + topology, + consumer, + clientSupplier.getProducer(new HashMap()), + restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory); } }; + final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); assertEquals(thread.state(), StreamThread.State.RUNNING); - initPartitionGrouper(config, thread, mockClientSupplier); + initPartitionGrouper(config, thread, clientSupplier); final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; @@ -336,7 +373,6 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test public void testHandingOverTaskFromOneToAnotherThread() throws Exception { - final TopologyBuilder builder = new TopologyBuilder(); builder.addStateStore( Stores .create("store") @@ -346,12 +382,31 @@ public class StreamThreadTest { .build() ); builder.addSource("source", TOPIC); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - mockClientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1))); - final StreamThread thread1 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 1, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); - final StreamThread thread2 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId + 2, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1))); + + final StreamThread thread1 = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId + 1, + processId, + metrics, + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + final StreamThread thread2 = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId + 2, + processId, + metrics, + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment); final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment); @@ -417,17 +472,24 @@ public class StreamThreadTest { Map<TaskId, Set<TopicPartition>> activeTasks() { return activeTaskAssignment; } + + @Override + public void close() {} } @Test public void testMetrics() throws Exception { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("MetricsApp"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - - final Metrics metrics = new Metrics(); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, metrics, new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); final String defaultGroupName = "stream-metrics"; final String defaultPrefix = "thread." + thread.threadClientId(); final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId()); @@ -462,31 +524,34 @@ public class StreamThreadTest { final File baseDir = Files.createTempDirectory("test").toFile(); try { final long cleanupDelay = 1000L; - final Properties props = configProps(); + final Properties props = configProps(false); props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay)); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - final StreamsConfig config = new StreamsConfig(props); - final File applicationDir = new File(baseDir, applicationId); applicationDir.mkdir(); final File stateDir1 = new File(applicationDir, task1.toString()); final File stateDir2 = new File(applicationDir, task2.toString()); final File stateDir3 = new File(applicationDir, task3.toString()); - final File extraDir = new File(applicationDir, "X"); + final File extraDir = new File(applicationDir, applicationId); stateDir1.mkdir(); stateDir2.mkdir(); stateDir3.mkdir(); extraDir.mkdir(); - final MockTime mockTime = new MockTime(); - - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { @Override public void maybeClean(final long now) { @@ -496,15 +561,21 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { final ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + return new TestStreamTask( + id, + applicationId, + partitionsForTask, + topology, + consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory); } }; - initPartitionGrouper(config, thread, mockClientSupplier); - - final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; - + initPartitionGrouper(config, thread, clientSupplier); assertTrue(thread.tasks().isEmpty()); mockTime.sleep(cleanupDelay); @@ -530,6 +601,7 @@ public class StreamThreadTest { assignedPartitions = Arrays.asList(t1p1, t1p2); prevTasks = new HashMap<>(thread.tasks()); + final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; rebalanceListener.onPartitionsRevoked(revokedPartitions); rebalanceListener.onPartitionsAssigned(assignedPartitions); @@ -592,7 +664,6 @@ public class StreamThreadTest { assertFalse(stateDir2.exists()); assertFalse(stateDir3.exists()); assertTrue(extraDir.exists()); - } finally { Utils.delete(baseDir); } @@ -603,20 +674,25 @@ public class StreamThreadTest { final File baseDir = Files.createTempDirectory("test").toFile(); try { final long commitInterval = 1000L; - final Properties props = configProps(); + final Properties props = configProps(false); props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); final StreamsConfig config = new StreamsConfig(props); - final MockTime mockTime = new MockTime(); - - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); builder.addSource("source1", "topic1"); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { @Override public void maybeCommit(final long now) { @@ -626,12 +702,21 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitionsForTask) { final ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, - mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + return new TestStreamTask( + id, + applicationId, + partitionsForTask, + topology, + consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory); } }; - initPartitionGrouper(config, thread, mockClientSupplier); + initPartitionGrouper(config, thread, clientSupplier); final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; @@ -684,10 +769,9 @@ public class StreamThreadTest { } @Test - public void shouldInjectSharedProducerForAllTasksUsingClientSupplierWhenEosDisabled() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); + public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() { + builder.addSource("source1", "someTopic"); + final StreamThread thread = new StreamThread( builder, config, @@ -695,8 +779,8 @@ public class StreamThreadTest { applicationId, clientId, processId, - new Metrics(), - new MockTime(), + metrics, + mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); @@ -718,21 +802,19 @@ public class StreamThreadTest { } @Test - public void shouldInjectProducerPerTaskUsingClientSupplierForEoS() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); - final Properties properties = configProps(); - properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - final StreamsConfig config = new StreamsConfig(properties); - final MockClientSupplier clientSupplier = new MockClientSupplier(); + public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() { + builder.addSource("source1", "someTopic"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( builder, - config, + new StreamsConfig(configProps(true)), clientSupplier, applicationId, clientId, processId, - new Metrics(), - new MockTime(), + metrics, + mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); @@ -757,21 +839,19 @@ public class StreamThreadTest { } @Test - public void shouldCloseAllTaskProducers() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); - final Properties properties = configProps(); - properties.setProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - final StreamsConfig config = new StreamsConfig(properties); - final MockClientSupplier clientSupplier = new MockClientSupplier(); + public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() { + builder.addSource("source1", "someTopic"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( builder, - config, + new StreamsConfig(configProps(true)), clientSupplier, applicationId, clientId, processId, - new Metrics(), - new MockTime(), + metrics, + mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); @@ -791,10 +871,9 @@ public class StreamThreadTest { } @Test - public void shouldCloseThreadProducer() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X").addSource("source1", "someTopic"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); + public void shouldCloseThreadProducerOnCloseIfEosDisabled() { + builder.addSource("source1", "someTopic"); + final StreamThread thread = new StreamThread( builder, config, @@ -802,8 +881,8 @@ public class StreamThreadTest { applicationId, clientId, processId, - new Metrics(), - new MockTime(), + metrics, + mockTime, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); @@ -822,15 +901,19 @@ public class StreamThreadTest { @Test public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception { - final TopologyBuilder builder = new TopologyBuilder(); - builder.setApplicationId(applicationId) - .addSource("name", "topic") - .addSink("out", "output"); - + builder.addSource("name", "topic").addSink("out", "output"); - final StreamsConfig config = new StreamsConfig(configProps()); - final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, - clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); thread.setPartitionAssignor(new StreamPartitionAssignor() { @Override @@ -849,11 +932,18 @@ public class StreamThreadTest { builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", @@ -900,11 +990,18 @@ public class StreamThreadTest { builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", @@ -964,18 +1061,35 @@ public class StreamThreadTest { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream(Pattern.compile("t.*")).to("out"); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier mockClientSupplier = new MockClientSupplier(); final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>(); - final StreamThread thread = new StreamThread(builder, config, mockClientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { final ProcessorTopology topology = builder.build(id.topicGroupId); - final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer, - mockClientSupplier.getProducer(new HashMap()), restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); + final TestStreamTask task = new TestStreamTask( + id, + applicationId, + partitions, + topology, + consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + stateDirectory); createdTasks.put(partitions, task); return task; } @@ -1024,43 +1138,177 @@ public class StreamThreadTest { } @Test + public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception { + builder.addSource("source", TOPIC).addSink("sink", "dummyTopic", "source"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); + final StreamThread thread = new StreamThread( + builder, + new StreamsConfig(configProps(true)), + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final MockConsumer consumer = clientSupplier.consumer; + consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null))); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(task1, task0Assignment); + + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(null); + thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + assertThat(thread.tasks().size(), equalTo(1)); + final MockProducer producer = clientSupplier.producers.get(0); + + thread.start(); + + TestUtils.waitForCondition( + new TestCondition() { + @Override + public boolean conditionMet() { + return !consumer.subscription().isEmpty(); + } + }, + "StreamsThread's internal consumer did not subscribe to input topic."); + + // change consumer subscription from "pattern" to "manual" to be able to call .addRecords() + consumer.updateBeginningOffsets(new HashMap<TopicPartition, Long>() { + { + put(task0Assignment.iterator().next(), 0L); + } + }); + consumer.unsubscribe(); + consumer.assign(task0Assignment); + + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0])); + mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); + TestUtils.waitForCondition( + new TestCondition() { + @Override + public boolean conditionMet() { + return producer.history().size() == 1; + } + }, + "StreamsThread did not produce output record."); + + assertFalse(producer.transactionCommitted()); + mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); + TestUtils.waitForCondition( + new TestCondition() { + @Override + public boolean conditionMet() { + return producer.commitCount() == 1; + } + }, + "StreamsThread did not commit transaction."); + + producer.fenceProducer(); + mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); + + consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0])); + TestUtils.waitForCondition( + new TestCondition() { + @Override + public boolean conditionMet() { + return thread.tasks().isEmpty(); + } + }, + "StreamsThread did not remove fenced zombie task."); + + thread.close(); + thread.join(); + + assertThat(producer.commitCount(), equalTo(1L)); + } + + @Test + public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception { + builder.addSource("name", "topic").addSink("out", "output"); + + final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); + final StreamThread thread = new StreamThread( + builder, + new StreamsConfig(configProps(true)), + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + activeTasks.put(task1, task0Assignment); + + thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); + + thread.rebalanceListener.onPartitionsRevoked(null); + thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + assertThat(thread.tasks().size(), equalTo(1)); + + thread.rebalanceListener.onPartitionsRevoked(null); + clientSupplier.producers.get(0).fenceProducer(); + try { + thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + fail("should have thrown " + ProducerFencedException.class.getSimpleName()); + } catch (final ProducerFencedException e) { } + + assertTrue(thread.tasks().isEmpty()); + } + + @Test public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), - applicationId, - Utils.mkSet(new TopicPartition("t1", 0)), - builder.build(0), - clientSupplier.consumer, - clientSupplier.getProducer(new HashMap()), - clientSupplier.restoreConsumer, - config, - new MockStreamsMetrics(new Metrics()), - new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) { + + final TestStreamTask testStreamTask = new TestStreamTask( + new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) { + @Override - public void close() { + public void close(final boolean clean) { throw new RuntimeException("KABOOM!"); } }; - final StreamsConfig config1 = new StreamsConfig(configProps()); - final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; } }; - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); activeTasks.put(testStreamTask.id(), testStreamTask.partitions); - thread.setPartitionAssignor(new MockStreamsPartitionAssignor(activeTasks)); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); @@ -1070,37 +1318,42 @@ public class StreamThreadTest { thread.close(); thread.join(); assertFalse("task shouldn't have been committed as there was an exception during shutdown", testStreamTask.committed); - - } @Test public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); final MockStateStoreSupplier.MockStateStore stateStore = new MockStateStoreSupplier.MockStateStore("foo", false); builder.stream("t1").groupByKey().count(new MockStateStoreSupplier(stateStore)); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), - applicationId, - Utils.mkSet(new TopicPartition("t1", 0)), - builder.build(0), - clientSupplier.consumer, - clientSupplier.getProducer(new HashMap()), - clientSupplier.restoreConsumer, - config, - new MockStreamsMetrics(new Metrics()), - new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) { + final TestStreamTask testStreamTask = new TestStreamTask( + new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(metrics), + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) { + @Override public void flushState() { throw new RuntimeException("KABOOM!"); } }; - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; @@ -1132,28 +1385,37 @@ public class StreamThreadTest { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), - applicationId, - Utils.mkSet(new TopicPartition("t1", 0)), - builder.build(0), - clientSupplier.consumer, - clientSupplier.getProducer(new HashMap()), - clientSupplier.restoreConsumer, - config, - new MockStreamsMetrics(new Metrics()), - new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) { + + final TestStreamTask testStreamTask = new TestStreamTask( + new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) { + @Override public void suspend() { throw new RuntimeException("KABOOM!"); } }; - final StreamsConfig config1 = new StreamsConfig(configProps()); - final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; @@ -1183,28 +1445,36 @@ public class StreamThreadTest { final KStreamBuilder builder = new KStreamBuilder(); builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); - final StreamsConfig config = new StreamsConfig(configProps()); - final MockClientSupplier clientSupplier = new MockClientSupplier(); - final TestStreamTask testStreamTask = new TestStreamTask(new TaskId(0, 0), - applicationId, - Utils.mkSet(new TopicPartition("t1", 0)), - builder.build(0), - clientSupplier.consumer, - clientSupplier.getProducer(new HashMap()), - clientSupplier.restoreConsumer, - config, - new MockStreamsMetrics(new Metrics()), - new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), time)) { + + final TestStreamTask testStreamTask = new TestStreamTask( + new TaskId(0, 0), + applicationId, + Utils.mkSet(new TopicPartition("t1", 0)), + builder.build(0), + clientSupplier.consumer, + clientSupplier.getProducer(new HashMap<String, Object>()), + clientSupplier.restoreConsumer, + config, + new MockStreamsMetrics(new Metrics()), + new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)) { + @Override protected void flushState() { throw new RuntimeException("KABOOM!"); } }; - final StreamsConfig config1 = new StreamsConfig(configProps()); - final StreamThread thread = new StreamThread(builder, config1, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder, + config, + clientSupplier, + applicationId, + clientId, + processId, + metrics, + mockTime, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; @@ -1227,11 +1497,12 @@ public class StreamThreadTest { // expected } assertFalse(testStreamTask.committed); - } + private void initPartitionGrouper(final StreamsConfig config, + final StreamThread thread, + final MockClientSupplier clientSupplier) { - private void initPartitionGrouper(final StreamsConfig config, final StreamThread thread, final MockClientSupplier clientSupplier) { final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId)); http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index f8b17b2..bf55b47 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -27,20 +27,19 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlyWindowStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; import org.apache.kafka.test.MockProcessorSupplier; -import org.apache.kafka.test.NoOpRecordCollector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -61,7 +60,6 @@ import static org.junit.Assert.assertEquals; public class StreamThreadStateStoreProviderTest { - private StreamThread thread; private StreamTask taskOne; private StreamTask taskTwo; private StreamThreadStateStoreProvider provider; @@ -113,23 +111,29 @@ public class StreamThreadStateStoreProviderTest { taskTwo); storesAvailable = true; - thread = new StreamThread(builder, streamsConfig, clientSupplier, - applicationId, - "clientId", UUID.randomUUID(), new Metrics(), - Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { - @Override - public Map<TaskId, StreamTask> tasks() { - return tasks; - } - - @Override - public boolean isInitialized() { - return storesAvailable; - } - }; - provider = new StreamThreadStateStoreProvider(thread); - + provider = new StreamThreadStateStoreProvider( + new StreamThread( + builder, + streamsConfig, + clientSupplier, + applicationId, + "clientId", + UUID.randomUUID(), + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + + @Override + public Map<TaskId, StreamTask> tasks() { + return tasks; + } + + @Override + public boolean isInitialized() { + return storesAvailable; + } + }); } @After @@ -139,7 +143,7 @@ public class StreamThreadStateStoreProviderTest { @Test public void shouldFindKeyValueStores() throws Exception { - List<ReadOnlyKeyValueStore<String, String>> kvStores = + final List<ReadOnlyKeyValueStore<String, String>> kvStores = provider.stores("kv-store", QueryableStoreTypes.<String, String>keyValueStore()); assertEquals(2, kvStores.size()); } @@ -188,15 +192,22 @@ public class StreamThreadStateStoreProviderTest { final MockClientSupplier clientSupplier, final ProcessorTopology topology, final TaskId taskId) { - return new StreamTask(taskId, applicationId, Collections - .singletonList(new TopicPartition(topicName, taskId.partition)), topology, - clientSupplier.consumer, - new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000), - streamsConfig, new MockStreamsMetrics(new Metrics()), stateDirectory, null, new MockTime(), new NoOpRecordCollector()) { - @Override - protected void updateOffsetLimits() { + return new StreamTask( + taskId, + applicationId, + Collections.singletonList(new TopicPartition(topicName, taskId.partition)), + topology, + clientSupplier.consumer, + new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000), + streamsConfig, + new MockStreamsMetrics(new Metrics()), + stateDirectory, + null, + new MockTime(), + clientSupplier.getProducer(new HashMap<String, Object>())) { - } + @Override + protected void updateOffsetLimits() {} }; } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index 531fdb6..da5ab3b 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.streams.KafkaClientSupplier; @@ -28,17 +29,35 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; + public class MockClientSupplier implements KafkaClientSupplier { + private final String applicationId; private static final ByteArraySerializer BYTE_ARRAY_SERIALIZER = new ByteArraySerializer(); public final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); public final MockConsumer<byte[], byte[]> restoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - public final List<Producer> producers = new LinkedList<>(); + public final List<MockProducer> producers = new LinkedList<>(); + + public MockClientSupplier() { + this(null); + } + + public MockClientSupplier(final String applicationId) { + this.applicationId = applicationId; + } @Override public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) { - final Producer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); + if (applicationId != null) { + assertThat((String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), startsWith(applicationId + "-")); + } else { + assertFalse(config.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + } + final MockProducer<byte[], byte[]> producer = new MockProducer<>(true, BYTE_ARRAY_SERIALIZER, BYTE_ARRAY_SERIALIZER); producers.add(producer); return producer; } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java index 05175f9..66271a0 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpRecordCollector.java @@ -45,17 +45,14 @@ public class NoOpRecordCollector implements RecordCollector { final StreamPartitioner<? super K, ? super V> partitioner) {} @Override - public void flush() { - //no-op - } + public void flush() {} @Override - public void close() { - //no-op - } + public void close() {} @Override public Map<TopicPartition, Long> offsets() { return Collections.emptyMap(); } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/ebc7f7ca/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 5d46ce0..a9f020b 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -16,17 +16,7 @@ */ package org.apache.kafka.test; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -50,19 +40,28 @@ import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorTopology; -import org.apache.kafka.streams.processor.internals.RecordCollectorImpl; import org.apache.kafka.streams.processor.internals.StateDirectory; import org.apache.kafka.streams.processor.internals.StoreChangelogReader; import org.apache.kafka.streams.processor.internals.StreamTask; -import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.internals.ThreadCache; - import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; /** * This class makes it easier to write tests to verify the behavior of topologies created with a {@link TopologyBuilder}. @@ -140,21 +139,16 @@ import java.io.IOException; */ public class ProcessorTopologyTestDriver { - private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); - private final static String APPLICATION_ID = "test-driver-application"; private final static int PARTITION_ID = 0; private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID); private final ProcessorTopology topology; - private final MockConsumer<byte[], byte[]> consumer; private final MockProducer<byte[], byte[]> producer; - private final MockConsumer<byte[], byte[]> restoreStateConsumer; private final Map<String, TopicPartition> partitionsByTopic = new HashMap<>(); private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>(); private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>(); private final Set<String> internalTopics = new HashSet<>(); - private final ProcessorTopology globalTopology; private final Map<String, TopicPartition> globalPartitionsByTopic = new HashMap<>(); private StreamTask task; private GlobalStateUpdateTask globalStateTask; @@ -164,28 +158,29 @@ public class ProcessorTopologyTestDriver { * @param config the stream configuration for the topology * @param builder the topology builder that will be used to create the topology instance */ - public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder) { + public ProcessorTopologyTestDriver(final StreamsConfig config, + final TopologyBuilder builder) { topology = builder.setApplicationId(APPLICATION_ID).build(null); - globalTopology = builder.buildGlobalStateTopology(); + final ProcessorTopology globalTopology = builder.buildGlobalStateTopology(); // Set up the consumer and producer ... - consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final Consumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + final Serializer<byte[]> bytesSerializer = new ByteArraySerializer(); producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) { @Override - public List<PartitionInfo> partitionsFor(String topic) { + public List<PartitionInfo> partitionsFor(final String topic) { return Collections.singletonList(new PartitionInfo(topic, PARTITION_ID, null, null, null)); } }; - restoreStateConsumer = createRestoreConsumer(TASK_ID, topology.storeToChangelogTopic()); // Identify internal topics for forwarding in process ... - for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { + for (final TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet()); } // Set up all of the topic+partition information and subscribe the consumer to each ... - for (String topic : topology.sourceTopics()) { - TopicPartition tp = new TopicPartition(topic, PARTITION_ID); + for (final String topic : topology.sourceTopics()) { + final TopicPartition tp = new TopicPartition(topic, PARTITION_ID); partitionsByTopic.put(topic, tp); offsetsByTopicPartition.put(tp, new AtomicLong()); } @@ -199,7 +194,7 @@ public class ProcessorTopologyTestDriver { if (globalTopology != null) { final MockConsumer<byte[], byte[]> globalConsumer = createGlobalConsumer(); for (final String topicName : globalTopology.sourceTopics()) { - List<PartitionInfo> partitionInfos = new ArrayList<>(); + final List<PartitionInfo> partitionInfos = new ArrayList<>(); partitionInfos.add(new PartitionInfo(topicName, 1, null, null, null)); globalConsumer.updatePartitions(topicName, partitionInfos); final TopicPartition partition = new TopicPartition(topicName, 1); @@ -221,12 +216,15 @@ public class ProcessorTopologyTestDriver { partitionsByTopic.values(), topology, consumer, - new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000), + new StoreChangelogReader( + createRestoreConsumer(topology.storeToChangelogTopic()), + Time.SYSTEM, + 5000), config, streamsMetrics, stateDirectory, cache, new MockTime(), - new RecordCollectorImpl(producer, "id")); + producer); } } @@ -238,12 +236,15 @@ public class ProcessorTopologyTestDriver { * @param value the raw message value * @param timestamp the raw message timestamp */ - private void process(String topicName, byte[] key, byte[] value, long timestamp) { + private void process(final String topicName, + final byte[] key, + final byte[] value, + final long timestamp) { - TopicPartition tp = partitionsByTopic.get(topicName); + final TopicPartition tp = partitionsByTopic.get(topicName); if (tp != null) { // Add the record ... - long offset = offsetsByTopicPartition.get(tp).incrementAndGet(); + final long offset = offsetsByTopicPartition.get(tp).incrementAndGet(); task.addRecords(tp, records(new ConsumerRecord<>(tp.topic(), tp.partition(), offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value))); producer.clear(); @@ -253,7 +254,7 @@ public class ProcessorTopologyTestDriver { task.commit(); // Capture all the records sent to the producer ... - for (ProducerRecord<byte[], byte[]> record : producer.history()) { + for (final ProducerRecord<byte[], byte[]> record : producer.history()) { Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(record.topic()); if (outputRecords == null) { outputRecords = new LinkedList<>(); @@ -284,7 +285,9 @@ public class ProcessorTopologyTestDriver { * @param key the raw message key * @param value the raw message value */ - public void process(String topicName, byte[] key, byte[] value) { + public void process(final String topicName, + final byte[] key, + final byte[] value) { process(topicName, key, value, 0L); } @@ -297,7 +300,11 @@ public class ProcessorTopologyTestDriver { * @param keySerializer the serializer for the key * @param valueSerializer the serializer for the value */ - public <K, V> void process(String topicName, K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) { + public <K, V> void process(final String topicName, + final K key, + final V value, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer) { process(topicName, keySerializer.serialize(topicName, key), valueSerializer.serialize(topicName, value)); } @@ -308,9 +315,11 @@ public class ProcessorTopologyTestDriver { * @param topic the name of the topic * @return the next record on that topic, or null if there is no record available */ - public ProducerRecord<byte[], byte[]> readOutput(String topic) { - Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic); - if (outputRecords == null) return null; + public ProducerRecord<byte[], byte[]> readOutput(final String topic) { + final Queue<ProducerRecord<byte[], byte[]>> outputRecords = outputRecordsByTopic.get(topic); + if (outputRecords == null) { + return null; + } return outputRecords.poll(); } @@ -323,15 +332,19 @@ public class ProcessorTopologyTestDriver { * @param valueDeserializer the deserializer for the value type * @return the next record on that topic, or null if there is no record available */ - public <K, V> ProducerRecord<K, V> readOutput(String topic, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) { - ProducerRecord<byte[], byte[]> record = readOutput(topic); - if (record == null) return null; - K key = keyDeserializer.deserialize(record.topic(), record.key()); - V value = valueDeserializer.deserialize(record.topic(), record.value()); - return new ProducerRecord<K, V>(record.topic(), record.partition(), record.timestamp(), key, value); + public <K, V> ProducerRecord<K, V> readOutput(final String topic, + final Deserializer<K> keyDeserializer, + final Deserializer<V> valueDeserializer) { + final ProducerRecord<byte[], byte[]> record = readOutput(topic); + if (record == null) { + return null; + } + final K key = keyDeserializer.deserialize(record.topic(), record.key()); + final V value = valueDeserializer.deserialize(record.topic(), record.value()); + return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value); } - private Iterable<ConsumerRecord<byte[], byte[]>> records(ConsumerRecord<byte[], byte[]> record) { + private Iterable<ConsumerRecord<byte[], byte[]>> records(final ConsumerRecord<byte[], byte[]> record) { return Collections.singleton(record); } @@ -347,7 +360,7 @@ public class ProcessorTopologyTestDriver { * @return the state store, or null if no store has been registered with the given name * @see #getKeyValueStore(String) */ - public StateStore getStateStore(String name) { + public StateStore getStateStore(final String name) { return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); } @@ -365,8 +378,8 @@ public class ProcessorTopologyTestDriver { * @see #getStateStore(String) */ @SuppressWarnings("unchecked") - public <K, V> KeyValueStore<K, V> getKeyValueStore(String name) { - StateStore store = getStateStore(name); + public <K, V> KeyValueStore<K, V> getKeyValueStore(final String name) { + final StateStore store = getStateStore(name); return store instanceof KeyValueStore ? (KeyValueStore<K, V>) getStateStore(name) : null; } @@ -375,12 +388,12 @@ public class ProcessorTopologyTestDriver { */ public void close() { if (task != null) { - task.close(); + task.close(true); } if (globalStateTask != null) { try { globalStateTask.close(); - } catch (IOException e) { + } catch (final IOException e) { // ignore } } @@ -390,35 +403,29 @@ public class ProcessorTopologyTestDriver { * Utility method that creates the {@link MockConsumer} used for restoring state, which should not be done by this * driver object unless this method is overwritten with a functional consumer. * - * @param id the ID of the stream task * @param storeToChangelogTopic the map of the names of the stores to the changelog topics * @return the mock consumer; never null */ - protected MockConsumer<byte[], byte[]> createRestoreConsumer(TaskId id, Map<String, String> storeToChangelogTopic) { - MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { + private MockConsumer<byte[], byte[]> createRestoreConsumer(final Map<String, String> storeToChangelogTopic) { + final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { @Override - public synchronized void seekToEnd(Collection<TopicPartition> partitions) { - // do nothing ... - } + public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {} @Override - public synchronized void seekToBeginning(Collection<TopicPartition> partitions) { - // do nothing ... - } + public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {} @Override - public synchronized long position(TopicPartition partition) { - // do nothing ... + public synchronized long position(final TopicPartition partition) { return 0L; } }; // For each store ... - for (Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) { - String topicName = storeAndTopic.getValue(); + for (final Map.Entry<String, String> storeAndTopic: storeToChangelogTopic.entrySet()) { + final String topicName = storeAndTopic.getValue(); // Set up the restore-state topic ... // consumer.subscribe(new TopicPartition(topicName, 1)); // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ... - List<PartitionInfo> partitionInfos = new ArrayList<>(); + final List<PartitionInfo> partitionInfos = new ArrayList<>(); partitionInfos.add(new PartitionInfo(topicName, PARTITION_ID, null, null, null)); consumer.updatePartitions(topicName, partitionInfos); consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, PARTITION_ID), 0L)); @@ -426,25 +433,19 @@ public class ProcessorTopologyTestDriver { return consumer; } - protected MockConsumer<byte[], byte[]> createGlobalConsumer() { - MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { + private MockConsumer<byte[], byte[]> createGlobalConsumer() { + return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.LATEST) { @Override - public synchronized void seekToEnd(Collection<TopicPartition> partitions) { - // do nothing ... - } + public synchronized void seekToEnd(final Collection<TopicPartition> partitions) {} @Override - public synchronized void seekToBeginning(Collection<TopicPartition> partitions) { - // do nothing ... - } + public synchronized void seekToBeginning(final Collection<TopicPartition> partitions) {} @Override - public synchronized long position(TopicPartition partition) { - // do nothing ... + public synchronized long position(final TopicPartition partition) { return 0L; } }; - - return consumer; } + }
