[ 
https://issues.apache.org/jira/browse/KAFKA-6205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341331#comment-16341331
 ] 

ASF GitHub Bot commented on KAFKA-6205:
---------------------------------------

guozhangwang closed pull request #4415: KAFKA-6205: initialize topology after 
state stores restoration completed
URL: https://github.com/apache/kafka/pull/4415
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2b8af6ddb48..fe7574ff403 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -199,7 +199,7 @@ void flushState() {
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    void initializeStateStores() {
+    void initStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index b90ec10dafd..2cd82f461dd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -99,7 +99,7 @@ void addNewTask(final T task) {
         for (final Iterator<Map.Entry<TaskId, T>> it = 
created.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             try {
-                if (!entry.getValue().initialize()) {
+                if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
@@ -272,6 +272,7 @@ private void addToRestoring(final T task) {
     private void transitionToRunning(final T task, final Set<TopicPartition> 
readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
+        task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
             if (task.hasStateStores()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 837f6073cdc..1016c1e375f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -62,14 +62,19 @@
     }
 
     @Override
-    public boolean initialize() {
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        initStateStores();
         checkpointedOffsets = 
Collections.unmodifiableMap(stateMgr.checkpointed());
         processorContext.initialized();
         taskInitialized = true;
         return true;
     }
 
+    @Override
+    public void initializeTopology() {
+        //no-op
+    }
+
     /**
      * <pre>
      * - update offset limits
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7063e746ed7..91f7cf4b52c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -161,16 +161,19 @@ public StreamTask(final TaskId id,
     }
 
     @Override
-    public boolean initialize() {
+    public boolean initializeStateStores() {
         log.trace("Initializing");
-        initializeStateStores();
+        initStateStores();
+        return changelogPartitions().isEmpty();
+    }
+
+    @Override
+    public void initializeTopology() {
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return changelogPartitions().isEmpty();
     }
 
-
     /**
      * <pre>
      * - re-initialize the task
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f066bffde11..5f221e3dc02 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -32,7 +32,9 @@
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    boolean initialize();
+    boolean initializeStateStores();
+
+    void initializeTopology();
 
     void commit();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00153f..62ddacfb33c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,6 +327,7 @@ boolean updateNewAndRestoringTasks() {
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = 
changelogReader.restore(active);
+
         resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 776110c28db..cf218b21c54 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -101,7 +101,7 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasS
         final AbstractTask task = createTask(consumer, 
Collections.singletonMap(store, "dummy"));
 
         try {
-            task.initializeStateStores();
+            task.initStateStores();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // ok
@@ -116,7 +116,7 @@ public void shouldNotAttemptToLockIfNoStores() {
 
         final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
 
-        task.initializeStateStores();
+        task.initStateStores();
 
         // should fail if lock is called
         EasyMock.verify(stateDirectory);
@@ -254,9 +254,12 @@ public void close(final boolean clean, final boolean 
isZombie) {}
             public void closeSuspended(final boolean clean, final boolean 
isZombie, final RuntimeException e) {}
 
             @Override
-            public boolean initialize() {
+            public boolean initializeStateStores() {
                 return false;
             }
+
+            @Override
+            public void initializeTopology() {}
         };
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 3d33b0b7646..4bb7828fe25 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -89,7 +89,7 @@ public void 
shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
 
     @Test
     public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.replay(t1);
@@ -101,10 +101,14 @@ public void shouldInitializeNewTasks() {
 
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -125,7 +129,9 @@ public void 
shouldMoveInitializedTasksNeedingRestoreToRestoring() {
 
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t2.hasStateStores()).andReturn(false);
@@ -142,10 +148,12 @@ public void 
shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
     @Test
     public void shouldTransitionFullyRestoredTasksToRunning() {
         final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, 
changeLog2)).anyTimes();
         EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -169,7 +177,7 @@ public void shouldSuspendRunningTasks() {
 
     @Test
     public void shouldCloseRestoringTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         t1.close(false, false);
@@ -236,6 +244,8 @@ public void shouldResumeMatchingSuspendedTasks() {
         mockRunningTaskSuspension();
         t1.resume();
         EasyMock.expectLastCall();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
@@ -266,7 +276,9 @@ public void 
shouldCloseTaskOnResumeIfTaskMigratedException() {
     }
 
     private void mockTaskInitialization() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t1.hasStateStores()).andReturn(false);
@@ -449,7 +461,9 @@ private RuntimeException suspendTask() {
     }
 
     private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ce655bc4990..605ab337983 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -155,7 +155,7 @@ public void cleanup() throws IOException {
     public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, 
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new 
HashSet<>(task.checkpointedOffsets().keySet()));
     }
 
@@ -177,7 +177,7 @@ public void testUpdateNonPersistentStore() throws 
IOException {
     public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, 
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         final Set<TopicPartition> partition = 
Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
@@ -223,7 +223,7 @@ public void testUpdateKTable() throws IOException {
 
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, ktablePartitions, 
ktableTopology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         restoreStateConsumer.assign(new 
ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -344,7 +344,7 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws 
IOException {
                                                  null,
                                                  stateDirectory
         );
-        task.initialize();
+        task.initializeStateStores();
 
 
         restoreStateConsumer.assign(new 
ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -396,7 +396,7 @@ void closeStateManager(final boolean writeCheckpoint) 
throws ProcessorStateExcep
                 closedStateManager.set(true);
             }
         };
-        task.initialize();
+        task.initializeStateStores();
         try {
             task.close(true, false);
             fail("should have thrown exception");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a3ff328d078..92cfe66c3f6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -318,7 +318,8 @@ public void testPauseResume() {
     @Test
     public void testMaybePunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
@@ -384,7 +385,8 @@ public void testMaybePunctuateStreamTime() {
     @Test
     public void testCancelPunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
@@ -416,7 +418,8 @@ public void testCancelPunctuateStreamTime() {
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +433,8 @@ public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always 
punctuate
         time.sleep(9);
@@ -441,7 +445,8 @@ public void 
shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
     @Test
     public void testCancelPunctuateSystemTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -454,7 +459,8 @@ public void testCancelPunctuateSystemTime() {
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.addRecords(partition2, Collections.singletonList(
                 new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)));
 
@@ -469,7 +475,8 @@ public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
 {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorStreamTime, 1, 
PunctuationType.STREAM_TIME, new Punctuator() {
@@ -489,7 +496,8 @@ public void punctuate(long timestamp) {
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime()
 {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorSystemTime, 1, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -531,7 +539,8 @@ public void flush() {
     @Test
     public void shouldCheckpointOffsetsOnCommit() throws IOException {
         task = createStatefulTask(false, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId00),
                                                                           
ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -542,7 +551,8 @@ public void shouldCheckpointOffsetsOnCommit() throws 
IOException {
     @Test
     public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
         task = createStatefulTask(true, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final File checkpointFile = new 
File(stateDirectory.directoryForTask(taskId00),
                                              
ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -553,7 +563,8 @@ public void 
shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
     @Test
     public void 
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.processorContext.setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, 
PunctuationType.STREAM_TIME, punctuator);
@@ -566,7 +577,8 @@ public void 
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateC
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, 
punctuator);
@@ -576,7 +588,8 @@ public void shouldCallPunctuateOnPassedInProcessorNode() {
     @Test
     public void 
shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
         assertThat(((ProcessorContextImpl) task.context()).currentNode(), 
nullValue());
     }
@@ -607,7 +620,8 @@ public void punctuate(long timestamp) {
     @Test
     public void 
shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology()
 {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.close(true, false);
             fail("should have thrown runtime exception");
@@ -760,7 +774,8 @@ public void shouldCloseProducerOnCloseWhenEosEnabled() {
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() 
{
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.commit();
@@ -774,7 +789,8 @@ public void 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() {
     public void 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
         final StreamTask task = createTaskThatThrowsException();
 
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.suspend();
             fail("should have thrown an exception");
@@ -786,7 +802,8 @@ public void 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension()
     @Test
     public void shouldCloseStateManagerIfFailureOnTaskClose() {
         task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.close(true, false);
@@ -813,14 +830,14 @@ public void 
shouldNotCloseTopologyProcessorNodesIfNotInitialized() {
     public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
         final StreamTask task = createStatefulTask(false, false);
 
-        assertTrue(task.initialize());
+        assertTrue(task.initializeStateStores());
     }
 
     @Test
     public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
         final StreamTask task = createStatefulTask(false, true);
 
-        assertFalse(task.initialize());
+        assertFalse(task.initializeStateStores());
     }
 
     @Test
@@ -840,7 +857,8 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 
         task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), 
topology, consumer, changelogReader, config,
                 streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9821e4c98b4..85c282ca461 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,12 +103,12 @@ public void before() throws IOException {
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, 
clientSupplier, topology,
                                     new TaskId(0, 0));
-        taskOne.initialize();
+        taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, 
clientSupplier, topology,
                                     new TaskId(0, 1));
-        taskTwo.initialize();
+        taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ae0cc9cb909..e5f15bcd482 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -248,7 +248,8 @@ public ProcessorTopologyTestDriver(final StreamsConfig 
config,
                                   cache,
                                   new MockTime(),
                                   producer);
-            task.initialize();
+            task.initializeStateStores();
+            task.initializeTopology();
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Have State Stores Restore Before Initializing Topology
> ------------------------------------------------------
>
>                 Key: KAFKA-6205
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6205
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.0.0, 0.11.0.2
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 1.0.1, 0.11.0.3
>
>
> Streams should restore state stores (if needed) before initializing the 
> topology.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to