[ https://issues.apache.org/jira/browse/KAFKA-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341331#comment-16341331 ]
ASF GitHub Bot commented on KAFKA-6205: --------------------------------------- guozhangwang closed pull request #4415: KAFKA-6205: initialize topology after state stores restoration completed URL: https://github.com/apache/kafka/pull/4415 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 2b8af6ddb48..fe7574ff403 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -199,7 +199,7 @@ void flushState() { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void initializeStateStores() { + void initStateStores() { if (topology.stateStores().isEmpty()) { return; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index b90ec10dafd..2cd82f461dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -99,7 +99,7 @@ void addNewTask(final T task) { for (final Iterator<Map.Entry<TaskId, T>> it = created.entrySet().iterator(); it.hasNext(); ) { final Map.Entry<TaskId, T> entry = it.next(); try { - if (!entry.getValue().initialize()) { + if (!entry.getValue().initializeStateStores()) { log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey()); addToRestoring(entry.getValue()); } else { @@ -272,6 +272,7 @@ private void addToRestoring(final T task) { private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) { log.debug("transitioning {} {} to running", taskTypeName, task.id()); running.put(task.id(), task); + task.initializeTopology(); for (TopicPartition topicPartition : task.partitions()) { runningByPartition.put(topicPartition, task); if (task.hasStateStores()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 837f6073cdc..1016c1e375f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -62,14 +62,19 @@ } @Override - public boolean initialize() { - initializeStateStores(); + public boolean initializeStateStores() { + initStateStores(); checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); processorContext.initialized(); taskInitialized = true; return true; } + @Override + public void initializeTopology() { + //no-op + } + /** * <pre> * - update offset limits diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7063e746ed7..91f7cf4b52c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -161,16 +161,19 @@ public StreamTask(final TaskId id, } @Override - public boolean initialize() { + public boolean initializeStateStores() { log.trace("Initializing"); - initializeStateStores(); + initStateStores(); + return changelogPartitions().isEmpty(); + } + + @Override + public void initializeTopology() { initTopology(); processorContext.initialized(); taskInitialized = true; - return changelogPartitions().isEmpty(); } - /** * <pre> * - re-initialize the task diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java index f066bffde11..5f221e3dc02 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java @@ -32,7 +32,9 @@ * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - boolean initialize(); + boolean initializeStateStores(); + + void initializeTopology(); void commit(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index bdc1c00153f..62ddacfb33c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -327,6 +327,7 @@ boolean updateNewAndRestoringTasks() { standby.initializeNewTasks(); final Collection<TopicPartition> restored = changelogReader.restore(active); + resumed.addAll(active.updateRestored(restored)); if (!resumed.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 776110c28db..cf218b21c54 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -101,7 +101,7 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasS final AbstractTask task = createTask(consumer, Collections.singletonMap(store, "dummy")); try { - task.initializeStateStores(); + task.initStateStores(); fail("Should have thrown LockException"); } catch (final LockException e) { // ok @@ -116,7 +116,7 @@ public void shouldNotAttemptToLockIfNoStores() { final AbstractTask task = createTask(consumer, Collections.<StateStore, String>emptyMap()); - task.initializeStateStores(); + task.initStateStores(); // should fail if lock is called EasyMock.verify(stateDirectory); @@ -254,9 +254,12 @@ public void close(final boolean clean, final boolean isZombie) {} public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {} @Override - public boolean initialize() { + public boolean initializeStateStores() { return false; } + + @Override + public void initializeTopology() {} }; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java index 3d33b0b7646..4bb7828fe25 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java @@ -89,7 +89,7 @@ public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() { @Test public void shouldInitializeNewTasks() { - EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.initializeStateStores()).andReturn(false); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); EasyMock.replay(t1); @@ -101,10 +101,14 @@ public void shouldInitializeNewTasks() { @Test public void shouldMoveInitializedTasksNeedingRestoreToRestoring() { - EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.initializeStateStores()).andReturn(false); + t1.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); - EasyMock.expect(t2.initialize()).andReturn(true); + EasyMock.expect(t2.initializeStateStores()).andReturn(true); + t2.initializeTopology(); + EasyMock.expectLastCall().once(); final Set<TopicPartition> t2partitions = Collections.singleton(tp2); EasyMock.expect(t2.partitions()).andReturn(t2partitions); EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); @@ -125,7 +129,9 @@ public void shouldMoveInitializedTasksNeedingRestoreToRestoring() { @Test public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() { - EasyMock.expect(t2.initialize()).andReturn(true); + EasyMock.expect(t2.initializeStateStores()).andReturn(true); + t2.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2)); EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); EasyMock.expect(t2.hasStateStores()).andReturn(false); @@ -142,10 +148,12 @@ public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() { @Test public void shouldTransitionFullyRestoredTasksToRunning() { final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1); - EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.initializeStateStores()).andReturn(false); EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes(); EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes(); EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes(); + t1.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.replay(t1); addAndInitTask(); @@ -169,7 +177,7 @@ public void shouldSuspendRunningTasks() { @Test public void shouldCloseRestoringTasks() { - EasyMock.expect(t1.initialize()).andReturn(false); + EasyMock.expect(t1.initializeStateStores()).andReturn(false); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet()); t1.close(false, false); @@ -236,6 +244,8 @@ public void shouldResumeMatchingSuspendedTasks() { mockRunningTaskSuspension(); t1.resume(); EasyMock.expectLastCall(); + t1.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.replay(t1); assertThat(suspendTask(), nullValue()); @@ -266,7 +276,9 @@ public void shouldCloseTaskOnResumeIfTaskMigratedException() { } private void mockTaskInitialization() { - EasyMock.expect(t1.initialize()).andReturn(true); + EasyMock.expect(t1.initializeStateStores()).andReturn(true); + t1.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()); EasyMock.expect(t1.hasStateStores()).andReturn(false); @@ -449,7 +461,9 @@ private RuntimeException suspendTask() { } private void mockRunningTaskSuspension() { - EasyMock.expect(t1.initialize()).andReturn(true); + EasyMock.expect(t1.initializeStateStores()).andReturn(true); + t1.initializeTopology(); + EasyMock.expectLastCall().once(); EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes(); EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes(); EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index ce655bc4990..605ab337983 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -155,7 +155,7 @@ public void cleanup() throws IOException { public void testStorePartitions() throws IOException { StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory); - task.initialize(); + task.initializeStateStores(); assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet())); } @@ -177,7 +177,7 @@ public void testUpdateNonPersistentStore() throws IOException { public void testUpdate() throws IOException { StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, consumer, changelogReader, config, null, stateDirectory); - task.initialize(); + task.initializeStateStores(); final Set<TopicPartition> partition = Collections.singleton(partition2); restoreStateConsumer.assign(partition); @@ -223,7 +223,7 @@ public void testUpdateKTable() throws IOException { StreamsConfig config = createConfig(baseDir); StandbyTask task = new StandbyTask(taskId, ktablePartitions, ktableTopology, consumer, changelogReader, config, null, stateDirectory); - task.initialize(); + task.initializeStateStores(); restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet())); for (ConsumerRecord<Integer, Integer> record : Arrays.asList( @@ -344,7 +344,7 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws IOException { null, stateDirectory ); - task.initialize(); + task.initializeStateStores(); restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet())); @@ -396,7 +396,7 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep closedStateManager.set(true); } }; - task.initialize(); + task.initializeStateStores(); try { task.close(true, false); fail("should have thrown exception"); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index a3ff328d078..92cfe66c3f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -318,7 +318,8 @@ public void testPauseResume() { @Test public void testMaybePunctuateStreamTime() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -384,7 +385,8 @@ public void testMaybePunctuateStreamTime() { @Test public void testCancelPunctuateStreamTime() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.addRecords(partition1, records( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue), @@ -416,7 +418,8 @@ public void testCancelPunctuateStreamTime() { @Test public void shouldPunctuateSystemTimeWhenIntervalElapsed() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); long now = time.milliseconds(); time.sleep(10); assertTrue(task.maybePunctuateSystemTime()); @@ -430,7 +433,8 @@ public void shouldPunctuateSystemTimeWhenIntervalElapsed() { @Test public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); long now = time.milliseconds(); assertTrue(task.maybePunctuateSystemTime()); // first time we always punctuate time.sleep(9); @@ -441,7 +445,8 @@ public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() { @Test public void testCancelPunctuateSystemTime() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); long now = time.milliseconds(); time.sleep(10); assertTrue(task.maybePunctuateSystemTime()); @@ -454,7 +459,8 @@ public void testCancelPunctuateSystemTime() { @Test public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() { task = createTaskThatThrowsException(); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.addRecords(partition2, Collections.singletonList( new ConsumerRecord<>(partition2.topic(), partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -469,7 +475,8 @@ public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() { @Test public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, new Punctuator() { @@ -489,7 +496,8 @@ public void punctuate(long timestamp) { @Test public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.punctuate(processorSystemTime, 1, PunctuationType.WALL_CLOCK_TIME, new Punctuator() { @@ -531,7 +539,8 @@ public void flush() { @Test public void shouldCheckpointOffsetsOnCommit() throws IOException { task = createStatefulTask(false, true); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.commit(); final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId00), ProcessorStateManager.CHECKPOINT_FILE_NAME)); @@ -542,7 +551,8 @@ public void shouldCheckpointOffsetsOnCommit() throws IOException { @Test public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() { task = createStatefulTask(true, true); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.commit(); final File checkpointFile = new File(stateDirectory.directoryForTask(taskId00), ProcessorStateManager.CHECKPOINT_FILE_NAME); @@ -553,7 +563,8 @@ public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() { @Test public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.processorContext.setCurrentNode(processorStreamTime); try { task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator); @@ -566,7 +577,8 @@ public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateC @Test public void shouldCallPunctuateOnPassedInProcessorNode() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator); assertThat(punctuatedAt, equalTo(5L)); task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, punctuator); @@ -576,7 +588,8 @@ public void shouldCallPunctuateOnPassedInProcessorNode() { @Test public void shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() { task = createStatelessTask(false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, punctuator); assertThat(((ProcessorContextImpl) task.context()).currentNode(), nullValue()); } @@ -607,7 +620,8 @@ public void punctuate(long timestamp) { @Test public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology() { task = createTaskThatThrowsException(); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.close(true, false); fail("should have thrown runtime exception"); @@ -760,7 +774,8 @@ public void shouldCloseProducerOnCloseWhenEosEnabled() { @Test public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() { task = createTaskThatThrowsException(); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.commit(); @@ -774,7 +789,8 @@ public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() { public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() { final StreamTask task = createTaskThatThrowsException(); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.suspend(); fail("should have thrown an exception"); @@ -786,7 +802,8 @@ public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() @Test public void shouldCloseStateManagerIfFailureOnTaskClose() { task = createStatefulTaskThatThrowsExceptionOnClose(true, false); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); try { task.close(true, false); @@ -813,14 +830,14 @@ public void shouldNotCloseTopologyProcessorNodesIfNotInitialized() { public void shouldBeInitializedIfChangelogPartitionsIsEmpty() { final StreamTask task = createStatefulTask(false, false); - assertTrue(task.initialize()); + assertTrue(task.initializeStateStores()); } @Test public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() { final StreamTask task = createStatefulTask(false, true); - assertFalse(task.initialize()); + assertFalse(task.initializeStateStores()); } @Test @@ -840,7 +857,8 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), topology, consumer, changelogReader, config, streamsMetrics, stateDirectory, null, time, producer); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); task.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 9821e4c98b4..85c282ca461 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -103,12 +103,12 @@ public void before() throws IOException { stateDirectory = new StateDirectory(streamsConfig, new MockTime()); taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology, new TaskId(0, 0)); - taskOne.initialize(); + taskOne.initializeStateStores(); tasks.put(new TaskId(0, 0), taskOne); taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology, new TaskId(0, 1)); - taskTwo.initialize(); + taskTwo.initializeStateStores(); tasks.put(new TaskId(0, 1), taskTwo); diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index ae0cc9cb909..e5f15bcd482 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -248,7 +248,8 @@ public ProcessorTopologyTestDriver(final StreamsConfig config, cache, new MockTime(), producer); - task.initialize(); + task.initializeStateStores(); + task.initializeTopology(); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Have State Stores Restore Before Initializing Toplogy > ----------------------------------------------------- > > Key: KAFKA-6205 > URL: https://issues.apache.org/jira/browse/KAFKA-6205 > Project: Kafka > Issue Type: Improvement > Components: streams > Affects Versions: 1.0.0, 0.11.0.2 > Reporter: Bill Bejeck > Assignee: Bill Bejeck > Priority: Major > Fix For: 1.0.1, 0.11.0.3 > > > Streams should restore state stores (if needed) before initializing the > topology. -- This message was sent by Atlassian JIRA (v7.6.3#76005)