Repository: kafka
Updated Branches:
  refs/heads/1.0 1d026269e -> b2bb2c6e8
KAFKA-5989: resume consumption of tasks that have state stores but no changelogging

Stores where logging is disabled were never consumed, because their partitions were paused but never resumed.

Author: Damian Guy <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #4025 from dguy/1.0

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b2bb2c6e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b2bb2c6e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b2bb2c6e

Branch: refs/heads/1.0
Commit: b2bb2c6e8f6d430bd84690f25c2c1a29bc1f3864
Parents: 1d02626
Author: Damian Guy <[email protected]>
Authored: Thu Oct 5 12:55:55 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Oct 5 12:55:55 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |  19 ++--
 .../processor/internals/TaskManager.java        |   4 +-
 .../integration/RestoreIntegrationTest.java     | 109 ++++++++++++++++++-
 .../processor/internals/AssignedTasksTest.java  |  14 ++-
 .../processor/internals/TaskManagerTest.java    |  25 ++++-
 5 files changed, 154 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
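
For readers unfamiliar with the pause/resume mechanics the fix relies on, the following is a minimal standalone sketch (not part of this commit) of the plain consumer contract involved: a paused partition is never fetched again until it is explicitly resumed. The broker address, group id, and topic name are illustrative assumptions.

    import java.util.Collections;
    import java.util.Properties;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.IntegerDeserializer;

    public class PauseResumeSketch {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-resume-sketch");     // illustrative
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());

            try (KafkaConsumer<Integer, Integer> consumer = new KafkaConsumer<>(config)) {
                consumer.subscribe(Collections.singleton("input-stream"));        // illustrative topic
                consumer.poll(0); // join the group and receive an assignment (may take several polls in practice)

                // Streams pauses the partitions of stateful tasks while their stores initialize.
                final Set<TopicPartition> paused = consumer.assignment();
                consumer.pause(paused);

                // A paused partition is fetched again only after an explicit resume().
                // Before this fix, tasks whose only stores had logging disabled were skipped
                // by the restoration path that performs the resume, so their partitions
                // stayed paused indefinitely and the tasks never received input.
                consumer.resume(paused);
                consumer.poll(100);
            }
        }
    }

This is why, in the patch below, initializeNewTasks() now reports the partitions of stateful tasks that went straight to running, so TaskManager can resume them alongside the partitions of tasks that finished changelog restoration.
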
http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 4448a78..12c3f79 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -109,10 +109,12 @@ class AssignedTasks implements RestoringTasks {
     }
 
     /**
+     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    void initializeNewTasks() {
+    Set<TopicPartition> initializeNewTasks() {
+        final Set<TopicPartition> readyPartitions = new HashSet<>();
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -123,7 +125,7 @@ class AssignedTasks implements RestoringTasks {
                     log.debug("transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue());
+                    transitionToRunning(entry.getValue(), readyPartitions);
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -131,6 +133,7 @@ class AssignedTasks implements RestoringTasks {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
+        return readyPartitions;
     }
 
     Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
@@ -144,8 +147,7 @@ class AssignedTasks implements RestoringTasks {
             final Map.Entry<TaskId, Task> entry = it.next();
             final Task task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task);
-                resume.addAll(task.partitions());
+                transitionToRunning(task, resume);
                 it.remove();
             } else {
                 if (log.isTraceEnabled()) {
@@ -262,11 +264,11 @@ class AssignedTasks implements RestoringTasks {
                 suspended.remove(taskId);
                 throw e;
             }
-            transitionToRunning(task);
+            transitionToRunning(task, new HashSet<TopicPartition>());
             log.trace("resuming suspended {} {}", taskTypeName, task.id());
             return true;
         } else {
-            log.trace("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
+            log.warn("couldn't resume task {} assigned partitions {}, task partitions {}", taskId, partitions, task.partitions());
             }
         }
         return false;
@@ -282,11 +284,14 @@ class AssignedTasks implements RestoringTasks {
         }
     }
 
-    private void transitionToRunning(final Task task) {
+    private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
+            if (task.hasStateStores()) {
+                readyPartitions.add(topicPartition);
+            }
         }
         for (TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 652f4e4..5387425 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -257,11 +257,11 @@ class TaskManager {
      * @throws TaskMigratedException if another thread wrote to the changelog topic that is currently restored
      */
     boolean updateNewAndRestoringTasks() {
-        active.initializeNewTasks();
+        final Set<TopicPartition> resumed = active.initializeNewTasks();
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
-        final Set<TopicPartition> resumed = active.updateRestored(restored);
+        resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
             log.trace("resuming partitions {}", resumed);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index ae36ad8..19ddedf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -30,15 +30,25 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -71,6 +81,7 @@ public class RestoreIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static final String INPUT_STREAM = "input-stream";
+    private static final String INPUT_STREAM_2 = "input-stream-2";
     private final int numberOfKeys = 10000;
     private KafkaStreams kafkaStreams;
     private String applicationId = "restore-test";
@@ -79,9 +90,10 @@ public class RestoreIntegrationTest {
     @BeforeClass
     public static void createTopics() throws InterruptedException {
         CLUSTER.createTopic(INPUT_STREAM, 2, 1);
+        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
     }
 
-    private Properties props() {
+    private Properties props(final String applicationId) {
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
@@ -119,7 +131,7 @@ public class RestoreIntegrationTest {
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
 
-        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -168,7 +180,7 @@ public class RestoreIntegrationTest {
             }, Materialized.<Integer, Integer, KeyValueStore<Bytes, byte[]>>as("reduce-store").withLoggingDisabled());
 
         final CountDownLatch startupLatch = new CountDownLatch(1);
-        kafkaStreams = new KafkaStreams(builder.build(), props());
+        kafkaStreams = new KafkaStreams(builder.build(), props(applicationId));
         kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
             @Override
             public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
@@ -182,6 +194,97 @@ public class RestoreIntegrationTest {
         assertTrue(startupLatch.await(30, TimeUnit.SECONDS));
     }
 
+
+    @Test
+    public void shouldProcessDataFromStoresWithLoggingDisabled() throws InterruptedException, ExecutionException {
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_STREAM_2,
+                                                           Arrays.asList(KeyValue.pair(1, 1),
+                                                                         KeyValue.pair(2, 2),
+                                                                         KeyValue.pair(3, 3)),
+                                                           TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                                                                                    IntegerSerializer.class,
+                                                                                    IntegerSerializer.class),
+                                                           CLUSTER.time);
+
+        final KeyValueBytesStoreSupplier lruMapSupplier = Stores.lruMap(INPUT_STREAM_2, 10);
+
+        final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder = new KeyValueStoreBuilder<>(lruMapSupplier,
+                                                                                                      Serdes.Integer(),
+                                                                                                      Serdes.Integer(),
+                                                                                                      CLUSTER.time)
+                .withLoggingDisabled();
+
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+
+        streamsBuilder.addStateStore(storeBuilder);
+
+        final KStream<Integer, Integer> stream = streamsBuilder.stream(INPUT_STREAM_2);
+        final CountDownLatch processorLatch = new CountDownLatch(3);
+        stream.process(new ProcessorSupplier<Integer, Integer>() {
+            @Override
+            public Processor<Integer, Integer> get() {
+                return new KeyValueStoreProcessor(INPUT_STREAM_2, processorLatch);
+            }
+        }, INPUT_STREAM_2);
+
+        final Topology topology = streamsBuilder.build();
+
+        kafkaStreams = new KafkaStreams(topology, props(applicationId + "-logging-disabled"));
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        kafkaStreams.setStateListener(new KafkaStreams.StateListener() {
+            @Override
+            public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) {
+                if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
+                    latch.countDown();
+                }
+            }
+        });
+        kafkaStreams.start();
+
+        latch.await(30, TimeUnit.SECONDS);
+
+        assertTrue(processorLatch.await(30, TimeUnit.SECONDS));
+
+    }
+
+
+    public static class KeyValueStoreProcessor implements Processor<Integer, Integer> {
+
+        private String topic;
+        private final CountDownLatch processorLatch;
+
+        private KeyValueStore<Integer, Integer> store;
+
+        public KeyValueStoreProcessor(final String topic, final CountDownLatch processorLatch) {
+            this.topic = topic;
+            this.processorLatch = processorLatch;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.store = (KeyValueStore<Integer, Integer>) context.getStateStore(topic);
+        }
+
+        @Override
+        public void process(final Integer key, final Integer value) {
+            if (key != null) {
+                store.put(key, value);
+                processorLatch.countDown();
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+
+        }
+
+        @Override
+        public void close() {
+
+        }
+    }
 
     private void createStateForRestoration() throws ExecutionException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index 9d6aea1..a721936 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -105,19 +105,22 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expect(t2.initialize()).andReturn(true);
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
+        final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
+        EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(true);
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
         assignedTasks.addNewTask(t2);
 
-        assignedTasks.initializeNewTasks();
+        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
 
         Collection<Task> restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
+        assertThat(readyPartitions, equalTo(t2partitions));
     }
 
     @Test
@@ -125,13 +128,15 @@ public class AssignedTasksTest {
         EasyMock.expect(t2.initialize()).andReturn(true);
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t2.hasStateStores()).andReturn(false);
         EasyMock.replay(t2);
 
         assignedTasks.addNewTask(t2);
 
-        assignedTasks.initializeNewTasks();
+        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
 
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
+        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
@@ -140,6 +145,7 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.initialize()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
+        EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -263,6 +269,7 @@ public class AssignedTasksTest {
         EasyMock.expect(t1.initialize()).andReturn(true);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
+        EasyMock.expect(t1.hasStateStores()).andReturn(false);
     }
 
     @Test
@@ -443,6 +450,7 @@ public class AssignedTasksTest {
 
     private void mockRunningTaskSuspension() {
         EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
         t1.suspend();

http://git-wip-us.apache.org/repos/asf/kafka/blob/b2bb2c6e/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 7ee8fae..67dd6c0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -32,6 +32,7 @@ import org.junit.runner.RunWith;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -293,7 +294,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldInitializeNewActiveTasks() {
-        active.initializeNewTasks();
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expectLastCall();
@@ -304,7 +305,8 @@ public class TaskManagerTest {
 
     @Test
     public void shouldInitializeNewStandbyTasks() {
-        standby.initializeNewTasks();
+        EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expectLastCall();
@@ -316,6 +318,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(Collections.<TopicPartition>emptySet());
@@ -327,6 +330,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldResumeRestoredPartitions() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
        EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
         EasyMock.expect(active.updateRestored(taskId0Partitions)).
                 andReturn(taskId0Partitions);
@@ -350,6 +354,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(false);
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
@@ -449,8 +454,24 @@ public class TaskManagerTest {
         verify(active);
     }
 
+    @Test
+    public void shouldResumeConsumptionOfInitializedPartitions() {
+        final Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
+        EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
+        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
+                andReturn(Collections.<TopicPartition>emptySet());
+        consumer.resume(resumed);
+        EasyMock.expectLastCall();
+
+        EasyMock.replay(active, consumer);
+
+        taskManager.updateNewAndRestoringTasks();
+        EasyMock.verify(consumer);
+    }
+
     private void mockAssignStandbyPartitions(final long offset) {
         final Task task = EasyMock.createNiceMock(Task.class);
+        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
         EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
                 andReturn(Collections.<TopicPartition>emptySet());
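
For context, an application shaped like the new integration test would have hit the bug before this fix: a processor backed by a store built with withLoggingDisabled() starts up but never processes a record, because its input partitions stay paused. The sketch below is illustrative, not code from the commit; the topic name "input", store name "latest-values", application id, and bootstrap servers are assumptions.

    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public class LoggingDisabledStoreApp {

        public static void main(final String[] args) {
            final StoreBuilder<KeyValueStore<Integer, Integer>> storeBuilder =
                    Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("latest-values"),
                                                Serdes.Integer(),
                                                Serdes.Integer())
                          .withLoggingDisabled(); // no changelog topic, hence nothing to restore

            final StreamsBuilder builder = new StreamsBuilder();
            builder.addStateStore(storeBuilder);

            final KStream<Integer, Integer> input = builder.stream("input");
            // Before the fix, the task owning this processor kept its input
            // partitions paused forever, so process() was never invoked.
            input.process(new ProcessorSupplier<Integer, Integer>() {
                @Override
                public Processor<Integer, Integer> get() {
                    return new AbstractProcessor<Integer, Integer>() {
                        private KeyValueStore<Integer, Integer> kvStore;

                        @SuppressWarnings("unchecked")
                        @Override
                        public void init(final ProcessorContext context) {
                            super.init(context);
                            kvStore = (KeyValueStore<Integer, Integer>) context.getStateStore("latest-values");
                        }

                        @Override
                        public void process(final Integer key, final Integer value) {
                            kvStore.put(key, value); // keep only the latest value per key
                        }
                    };
                }
            }, "latest-values");

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "logging-disabled-sketch"); // illustrative
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
            new KafkaStreams(builder.build(), props).start();
        }
    }

With the patch applied, transitionToRunning() collects the partitions of such tasks into readyPartitions, and updateNewAndRestoringTasks() resumes them immediately instead of waiting for a changelog restoration that will never happen.
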
