This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c615c59  KAFKA-6205: initialize topology after state stores restoration completed
c615c59 is described below

commit c615c597aa24cc5a4552102f1b6d27fcf4e24d12
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Jan 26 09:24:40 2018 -0800

    KAFKA-6205: initialize topology after state stores restoration completed
    
    Initialize topology after state store restoration.
    Although IMHO updating some of the existing tests demonstrates the correct order of operations, I'll probably add an integration test, but I wanted to get this PR in for feedback on the approach.
    
    Author: Bill Bejeck <[email protected]>
    
    Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax <[email protected]>
    
    Closes #4415 from bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology
    
    minor log4j edits
---
 .../streams/processor/internals/AbstractTask.java  |  5 +-
 .../streams/processor/internals/AssignedTasks.java |  3 +-
 .../streams/processor/internals/StandbyTask.java   | 10 +++-
 .../streams/processor/internals/StreamTask.java    | 13 +++--
 .../kafka/streams/processor/internals/Task.java    |  4 +-
 .../streams/processor/internals/TaskManager.java   |  1 +
 .../processor/internals/AbstractTaskTest.java      |  9 ++--
 .../internals/AssignedStreamsTasksTest.java        | 30 ++++++++---
 .../processor/internals/StandbyTaskTest.java       | 10 ++--
 .../processor/internals/StreamTaskTest.java        | 58 ++++++++++++++--------
 .../StreamThreadStateStoreProviderTest.java        |  4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java    |  3 +-
 12 files changed, 100 insertions(+), 50 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2b8af6d..d9c827f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -196,10 +196,11 @@ public abstract class AbstractTask implements Task {
     }
 
     /**
-     * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
+     *
+     * Package-private for testing only
      */
-    void initializeStateStores() {
+    void registerStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index b90ec10..2cd82f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -99,7 +99,7 @@ abstract class AssignedTasks<T extends Task> {
         for (final Iterator<Map.Entry<TaskId, T>> it = 
created.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             try {
-                if (!entry.getValue().initialize()) {
+                if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
@@ -272,6 +272,7 @@ abstract class AssignedTasks<T extends Task> {
     private void transitionToRunning(final T task, final Set<TopicPartition> 
readyPartitions) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
+        task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
             if (task.hasStateStores()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 837f607..39d34d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -62,14 +62,20 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean initialize() {
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
         checkpointedOffsets = 
Collections.unmodifiableMap(stateMgr.checkpointed());
         processorContext.initialized();
         taskInitialized = true;
         return true;
     }
 
+    @Override
+    public void initializeTopology() {
+        //no-op
+    }
+
     /**
      * <pre>
      * - update offset limits
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7063e74..e293f25 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -161,16 +161,19 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator
     }
 
     @Override
-    public boolean initialize() {
-        log.trace("Initializing");
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
+        return changelogPartitions().isEmpty();
+    }
+
+    @Override
+    public void initializeTopology() {
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return changelogPartitions().isEmpty();
     }
 
-
     /**
      * <pre>
      * - re-initialize the task
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f066bff..5f221e3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -32,7 +32,9 @@ public interface Task {
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    boolean initialize();
+    boolean initializeStateStores();
+
+    void initializeTopology();
 
     void commit();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00..62ddacf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,6 +327,7 @@ class TaskManager {
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = 
changelogReader.restore(active);
+
         resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 776110c..4569858 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -101,7 +101,7 @@ public class AbstractTaskTest {
         final AbstractTask task = createTask(consumer, 
Collections.singletonMap(store, "dummy"));
 
         try {
-            task.initializeStateStores();
+            task.registerStateStores();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // ok
@@ -116,7 +116,7 @@ public class AbstractTaskTest {
 
         final AbstractTask task = createTask(consumer, 
Collections.<StateStore, String>emptyMap());
 
-        task.initializeStateStores();
+        task.registerStateStores();
 
         // should fail if lock is called
         EasyMock.verify(stateDirectory);
@@ -254,9 +254,12 @@ public class AbstractTaskTest {
             public void closeSuspended(final boolean clean, final boolean 
isZombie, final RuntimeException e) {}
 
             @Override
-            public boolean initialize() {
+            public boolean initializeStateStores() {
                 return false;
             }
+
+            @Override
+            public void initializeTopology() {}
         };
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 3d33b0b..4bb7828 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -89,7 +89,7 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.replay(t1);
@@ -101,10 +101,14 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -125,7 +129,9 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t2.hasStateStores()).andReturn(false);
@@ -142,10 +148,12 @@ public class AssignedStreamsTasksTest {
     @Test
     public void shouldTransitionFullyRestoredTasksToRunning() {
         final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, 
changeLog2)).anyTimes();
         EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -169,7 +177,7 @@ public class AssignedStreamsTasksTest {
 
     @Test
     public void shouldCloseRestoringTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         t1.close(false, false);
@@ -236,6 +244,8 @@ public class AssignedStreamsTasksTest {
         mockRunningTaskSuspension();
         t1.resume();
         EasyMock.expectLastCall();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
@@ -266,7 +276,9 @@ public class AssignedStreamsTasksTest {
     }
 
     private void mockTaskInitialization() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t1.hasStateStores()).andReturn(false);
@@ -449,7 +461,9 @@ public class AssignedStreamsTasksTest {
     }
 
     private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ce655bc..605ab33 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -155,7 +155,7 @@ public class StandbyTaskTest {
     public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, 
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new 
HashSet<>(task.checkpointedOffsets().keySet()));
     }
 
@@ -177,7 +177,7 @@ public class StandbyTaskTest {
     public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, topicPartitions, topology, 
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         final Set<TopicPartition> partition = 
Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
@@ -223,7 +223,7 @@ public class StandbyTaskTest {
 
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, ktablePartitions, 
ktableTopology, consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         restoreStateConsumer.assign(new 
ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -344,7 +344,7 @@ public class StandbyTaskTest {
                                                  null,
                                                  stateDirectory
         );
-        task.initialize();
+        task.initializeStateStores();
 
 
         restoreStateConsumer.assign(new 
ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -396,7 +396,7 @@ public class StandbyTaskTest {
                 closedStateManager.set(true);
             }
         };
-        task.initialize();
+        task.initializeStateStores();
         try {
             task.close(true, false);
             fail("should have thrown exception");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a3ff328..92cfe66 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -318,7 +318,8 @@ public class StreamTaskTest {
     @Test
     public void testMaybePunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
@@ -384,7 +385,8 @@ public class StreamTaskTest {
     @Test
     public void testCancelPunctuateStreamTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, records(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue),
@@ -416,7 +418,8 @@ public class StreamTaskTest {
     @Test
     public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +433,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         assertTrue(task.maybePunctuateSystemTime()); // first time we always 
punctuate
         time.sleep(9);
@@ -441,7 +445,8 @@ public class StreamTaskTest {
     @Test
     public void testCancelPunctuateSystemTime() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         long now = time.milliseconds();
         time.sleep(10);
         assertTrue(task.maybePunctuateSystemTime());
@@ -454,7 +459,8 @@ public class StreamTaskTest {
     @Test
     public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.addRecords(partition2, Collections.singletonList(
                 new ConsumerRecord<>(partition2.topic(), 
partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)));
 
@@ -469,7 +475,8 @@ public class StreamTaskTest {
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
 {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorStreamTime, 1, 
PunctuationType.STREAM_TIME, new Punctuator() {
@@ -489,7 +496,8 @@ public class StreamTaskTest {
     @Test
     public void 
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime()
 {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.punctuate(processorSystemTime, 1, 
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -531,7 +539,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCheckpointOffsetsOnCommit() throws IOException {
         task = createStatefulTask(false, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new 
File(stateDirectory.directoryForTask(taskId00),
                                                                           
ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -542,7 +551,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
         task = createStatefulTask(true, true);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.commit();
         final File checkpointFile = new 
File(stateDirectory.directoryForTask(taskId00),
                                              
ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -553,7 +563,8 @@ public class StreamTaskTest {
     @Test
     public void 
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.processorContext.setCurrentNode(processorStreamTime);
         try {
             task.punctuate(processorStreamTime, 10, 
PunctuationType.STREAM_TIME, punctuator);
@@ -566,7 +577,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCallPunctuateOnPassedInProcessorNode() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
         assertThat(punctuatedAt, equalTo(5L));
         task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME, 
punctuator);
@@ -576,7 +588,8 @@ public class StreamTaskTest {
     @Test
     public void 
shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
         task = createStatelessTask(false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME, 
punctuator);
         assertThat(((ProcessorContextImpl) task.context()).currentNode(), 
nullValue());
     }
@@ -607,7 +620,8 @@ public class StreamTaskTest {
     @Test
     public void 
shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology()
 {
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.close(true, false);
             fail("should have thrown runtime exception");
@@ -760,7 +774,8 @@ public class StreamTaskTest {
     @Test
     public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing() 
{
         task = createTaskThatThrowsException();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.commit();
@@ -774,7 +789,8 @@ public class StreamTaskTest {
     public void 
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
         final StreamTask task = createTaskThatThrowsException();
 
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.suspend();
             fail("should have thrown an exception");
@@ -786,7 +802,8 @@ public class StreamTaskTest {
     @Test
     public void shouldCloseStateManagerIfFailureOnTaskClose() {
         task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         try {
             task.close(true, false);
@@ -813,14 +830,14 @@ public class StreamTaskTest {
     public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
         final StreamTask task = createStatefulTask(false, false);
 
-        assertTrue(task.initialize());
+        assertTrue(task.initializeStateStores());
     }
 
     @Test
     public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
         final StreamTask task = createStatefulTask(false, true);
 
-        assertFalse(task.initialize());
+        assertFalse(task.initializeStateStores());
     }
 
     @Test
@@ -840,7 +857,8 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition), 
topology, consumer, changelogReader, config,
                 streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
 
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), 
partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, 
recordValue)));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9821e4c..85c282c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,12 +103,12 @@ public class StreamThreadStateStoreProviderTest {
         stateDirectory = new StateDirectory(streamsConfig, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, 
clientSupplier, topology,
                                     new TaskId(0, 0));
-        taskOne.initialize();
+        taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, 
clientSupplier, topology,
                                     new TaskId(0, 1));
-        taskTwo.initialize();
+        taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ae0cc9c..e5f15bc 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -248,7 +248,8 @@ public class ProcessorTopologyTestDriver {
                                   cache,
                                   new MockTime(),
                                   producer);
-            task.initialize();
+            task.initializeStateStores();
+            task.initializeTopology();
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to