This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c615c59 KAFKA-6205: initialize topology after state stores
restoration completed
c615c59 is described below
commit c615c597aa24cc5a4552102f1b6d27fcf4e24d12
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Jan 26 09:24:40 2018 -0800
KAFKA-6205: initialize topology after state stores restoration completed
Initialize topology after state store restoration.
Although IMHO updating some of the existing tests demonstrates the correct
order of operations, I'll probably add an integration test as well; for now I
wanted to get this PR in for feedback on the approach.
Author: Bill Bejeck <[email protected]>
Reviewers: Guozhang Wang <[email protected]>, Matthias J. Sax
<[email protected]>
Closes #4415 from
bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology
minor log4j edits
---
.../streams/processor/internals/AbstractTask.java | 5 +-
.../streams/processor/internals/AssignedTasks.java | 3 +-
.../streams/processor/internals/StandbyTask.java | 10 +++-
.../streams/processor/internals/StreamTask.java | 13 +++--
.../kafka/streams/processor/internals/Task.java | 4 +-
.../streams/processor/internals/TaskManager.java | 1 +
.../processor/internals/AbstractTaskTest.java | 9 ++--
.../internals/AssignedStreamsTasksTest.java | 30 ++++++++---
.../processor/internals/StandbyTaskTest.java | 10 ++--
.../processor/internals/StreamTaskTest.java | 58 ++++++++++++++--------
.../StreamThreadStateStoreProviderTest.java | 4 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 3 +-
12 files changed, 100 insertions(+), 50 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 2b8af6d..d9c827f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -196,10 +196,11 @@ public abstract class AbstractTask implements Task {
}
/**
- * @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
+ *
+ * Package-private for testing only
*/
- void initializeStateStores() {
+ void registerStateStores() {
if (topology.stateStores().isEmpty()) {
return;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index b90ec10..2cd82f4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -99,7 +99,7 @@ abstract class AssignedTasks<T extends Task> {
for (final Iterator<Map.Entry<TaskId, T>> it =
created.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, T> entry = it.next();
try {
- if (!entry.getValue().initialize()) {
+ if (!entry.getValue().initializeStateStores()) {
log.debug("Transitioning {} {} to restoring",
taskTypeName, entry.getKey());
addToRestoring(entry.getValue());
} else {
@@ -272,6 +272,7 @@ abstract class AssignedTasks<T extends Task> {
private void transitionToRunning(final T task, final Set<TopicPartition>
readyPartitions) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
+ task.initializeTopology();
for (TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
if (task.hasStateStores()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 837f607..39d34d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -62,14 +62,20 @@ public class StandbyTask extends AbstractTask {
}
@Override
- public boolean initialize() {
- initializeStateStores();
+ public boolean initializeStateStores() {
+ log.trace("Initializing state stores");
+ registerStateStores();
checkpointedOffsets =
Collections.unmodifiableMap(stateMgr.checkpointed());
processorContext.initialized();
taskInitialized = true;
return true;
}
+ @Override
+ public void initializeTopology() {
+ //no-op
+ }
+
/**
* <pre>
* - update offset limits
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 7063e74..e293f25 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -161,16 +161,19 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator
}
@Override
- public boolean initialize() {
- log.trace("Initializing");
- initializeStateStores();
+ public boolean initializeStateStores() {
+ log.trace("Initializing state stores");
+ registerStateStores();
+ return changelogPartitions().isEmpty();
+ }
+
+ @Override
+ public void initializeTopology() {
initTopology();
processorContext.initialized();
taskInitialized = true;
- return changelogPartitions().isEmpty();
}
-
/**
* <pre>
* - re-initialize the task
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index f066bff..5f221e3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -32,7 +32,9 @@ public interface Task {
* @throws IllegalStateException If store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not contain the
partition
*/
- boolean initialize();
+ boolean initializeStateStores();
+
+ void initializeTopology();
void commit();
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index bdc1c00..62ddacf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,6 +327,7 @@ class TaskManager {
standby.initializeNewTasks();
final Collection<TopicPartition> restored =
changelogReader.restore(active);
+
resumed.addAll(active.updateRestored(restored));
if (!resumed.isEmpty()) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 776110c..4569858 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -101,7 +101,7 @@ public class AbstractTaskTest {
final AbstractTask task = createTask(consumer,
Collections.singletonMap(store, "dummy"));
try {
- task.initializeStateStores();
+ task.registerStateStores();
fail("Should have thrown LockException");
} catch (final LockException e) {
// ok
@@ -116,7 +116,7 @@ public class AbstractTaskTest {
final AbstractTask task = createTask(consumer,
Collections.<StateStore, String>emptyMap());
- task.initializeStateStores();
+ task.registerStateStores();
// should fail if lock is called
EasyMock.verify(stateDirectory);
@@ -254,9 +254,12 @@ public class AbstractTaskTest {
public void closeSuspended(final boolean clean, final boolean
isZombie, final RuntimeException e) {}
@Override
- public boolean initialize() {
+ public boolean initializeStateStores() {
return false;
}
+
+ @Override
+ public void initializeTopology() {}
};
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 3d33b0b..4bb7828 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -89,7 +89,7 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldInitializeNewTasks() {
- EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
EasyMock.replay(t1);
@@ -101,10 +101,14 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
- EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+ t1.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
- EasyMock.expect(t2.initialize()).andReturn(true);
+ EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+ t2.initializeTopology();
+ EasyMock.expectLastCall().once();
final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
EasyMock.expect(t2.partitions()).andReturn(t2partitions);
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -125,7 +129,9 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
- EasyMock.expect(t2.initialize()).andReturn(true);
+ EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+ t2.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
EasyMock.expect(t2.hasStateStores()).andReturn(false);
@@ -142,10 +148,12 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldTransitionFullyRestoredTasksToRunning() {
final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
- EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1,
changeLog2)).anyTimes();
EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+ t1.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.replay(t1);
addAndInitTask();
@@ -169,7 +177,7 @@ public class AssignedStreamsTasksTest {
@Test
public void shouldCloseRestoringTasks() {
- EasyMock.expect(t1.initialize()).andReturn(false);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
t1.close(false, false);
@@ -236,6 +244,8 @@ public class AssignedStreamsTasksTest {
mockRunningTaskSuspension();
t1.resume();
EasyMock.expectLastCall();
+ t1.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.replay(t1);
assertThat(suspendTask(), nullValue());
@@ -266,7 +276,9 @@ public class AssignedStreamsTasksTest {
}
private void mockTaskInitialization() {
- EasyMock.expect(t1.initialize()).andReturn(true);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+ t1.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
EasyMock.expect(t1.hasStateStores()).andReturn(false);
@@ -449,7 +461,9 @@ public class AssignedStreamsTasksTest {
}
private void mockRunningTaskSuspension() {
- EasyMock.expect(t1.initialize()).andReturn(true);
+ EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+ t1.initializeTopology();
+ EasyMock.expectLastCall().once();
EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index ce655bc..605ab33 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -155,7 +155,7 @@ public class StandbyTaskTest {
public void testStorePartitions() throws IOException {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology,
consumer, changelogReader, config, null, stateDirectory);
- task.initialize();
+ task.initializeStateStores();
assertEquals(Utils.mkSet(partition2, partition1), new
HashSet<>(task.checkpointedOffsets().keySet()));
}
@@ -177,7 +177,7 @@ public class StandbyTaskTest {
public void testUpdate() throws IOException {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, topicPartitions, topology,
consumer, changelogReader, config, null, stateDirectory);
- task.initialize();
+ task.initializeStateStores();
final Set<TopicPartition> partition =
Collections.singleton(partition2);
restoreStateConsumer.assign(partition);
@@ -223,7 +223,7 @@ public class StandbyTaskTest {
StreamsConfig config = createConfig(baseDir);
StandbyTask task = new StandbyTask(taskId, ktablePartitions,
ktableTopology, consumer, changelogReader, config, null, stateDirectory);
- task.initialize();
+ task.initializeStateStores();
restoreStateConsumer.assign(new
ArrayList<>(task.checkpointedOffsets().keySet()));
for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -344,7 +344,7 @@ public class StandbyTaskTest {
null,
stateDirectory
);
- task.initialize();
+ task.initializeStateStores();
restoreStateConsumer.assign(new
ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -396,7 +396,7 @@ public class StandbyTaskTest {
closedStateManager.set(true);
}
};
- task.initialize();
+ task.initializeStateStores();
try {
task.close(true, false);
fail("should have thrown exception");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a3ff328..92cfe66 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -318,7 +318,8 @@ public class StreamTaskTest {
@Test
public void testMaybePunctuateStreamTime() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
@@ -384,7 +385,8 @@ public class StreamTaskTest {
@Test
public void testCancelPunctuateStreamTime() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.addRecords(partition1, records(
new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 20, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue),
@@ -416,7 +418,8 @@ public class StreamTaskTest {
@Test
public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@@ -430,7 +433,8 @@ public class StreamTaskTest {
@Test
public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
long now = time.milliseconds();
assertTrue(task.maybePunctuateSystemTime()); // first time we always
punctuate
time.sleep(9);
@@ -441,7 +445,8 @@ public class StreamTaskTest {
@Test
public void testCancelPunctuateSystemTime() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
long now = time.milliseconds();
time.sleep(10);
assertTrue(task.maybePunctuateSystemTime());
@@ -454,7 +459,8 @@ public class StreamTaskTest {
@Test
public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContext() {
task = createTaskThatThrowsException();
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.addRecords(partition2, Collections.singletonList(
new ConsumerRecord<>(partition2.topic(),
partition2.partition(), 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)));
@@ -469,7 +475,8 @@ public class StreamTaskTest {
@Test
public void
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime()
{
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, new Punctuator() {
@@ -489,7 +496,8 @@ public class StreamTaskTest {
@Test
public void
shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime()
{
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.punctuate(processorSystemTime, 1,
PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
@@ -531,7 +539,8 @@ public class StreamTaskTest {
@Test
public void shouldCheckpointOffsetsOnCommit() throws IOException {
task = createStatefulTask(false, true);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.commit();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new
File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME));
@@ -542,7 +551,8 @@ public class StreamTaskTest {
@Test
public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
task = createStatefulTask(true, true);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.commit();
final File checkpointFile = new
File(stateDirectory.directoryForTask(taskId00),
ProcessorStateManager.CHECKPOINT_FILE_NAME);
@@ -553,7 +563,8 @@ public class StreamTaskTest {
@Test
public void
shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.processorContext.setCurrentNode(processorStreamTime);
try {
task.punctuate(processorStreamTime, 10,
PunctuationType.STREAM_TIME, punctuator);
@@ -566,7 +577,8 @@ public class StreamTaskTest {
@Test
public void shouldCallPunctuateOnPassedInProcessorNode() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME,
punctuator);
assertThat(punctuatedAt, equalTo(5L));
task.punctuate(processorStreamTime, 10, PunctuationType.STREAM_TIME,
punctuator);
@@ -576,7 +588,8 @@ public class StreamTaskTest {
@Test
public void
shouldSetProcessorNodeOnContextBackToNullAfterSuccesfullPunctuate() {
task = createStatelessTask(false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.punctuate(processorStreamTime, 5, PunctuationType.STREAM_TIME,
punctuator);
assertThat(((ProcessorContextImpl) task.context()).currentNode(),
nullValue());
}
@@ -607,7 +620,8 @@ public class StreamTaskTest {
@Test
public void
shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology()
{
task = createTaskThatThrowsException();
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.close(true, false);
fail("should have thrown runtime exception");
@@ -760,7 +774,8 @@ public class StreamTaskTest {
@Test
public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushing()
{
task = createTaskThatThrowsException();
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.commit();
@@ -774,7 +789,8 @@ public class StreamTaskTest {
public void
shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() {
final StreamTask task = createTaskThatThrowsException();
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.suspend();
fail("should have thrown an exception");
@@ -786,7 +802,8 @@ public class StreamTaskTest {
@Test
public void shouldCloseStateManagerIfFailureOnTaskClose() {
task = createStatefulTaskThatThrowsExceptionOnClose(true, false);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
try {
task.close(true, false);
@@ -813,14 +830,14 @@ public class StreamTaskTest {
public void shouldBeInitializedIfChangelogPartitionsIsEmpty() {
final StreamTask task = createStatefulTask(false, false);
- assertTrue(task.initialize());
+ assertTrue(task.initializeStateStores());
}
@Test
public void shouldNotBeInitializedIfChangelogPartitionsIsNonEmpty() {
final StreamTask task = createStatefulTask(false, true);
- assertFalse(task.initialize());
+ assertFalse(task.initializeStateStores());
}
@Test
@@ -840,7 +857,8 @@ public class StreamTaskTest {
task = new StreamTask(taskId00, Utils.mkSet(partition1, repartition),
topology, consumer, changelogReader, config,
streamsMetrics, stateDirectory, null, time, producer);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
task.addRecords(partition1, Collections.singletonList(
new ConsumerRecord<>(partition1.topic(),
partition1.partition(), 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey,
recordValue)));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 9821e4c..85c282c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -103,12 +103,12 @@ public class StreamThreadStateStoreProviderTest {
stateDirectory = new StateDirectory(streamsConfig, new MockTime());
taskOne = createStreamsTask(applicationId, streamsConfig,
clientSupplier, topology,
new TaskId(0, 0));
- taskOne.initialize();
+ taskOne.initializeStateStores();
tasks.put(new TaskId(0, 0),
taskOne);
taskTwo = createStreamsTask(applicationId, streamsConfig,
clientSupplier, topology,
new TaskId(0, 1));
- taskTwo.initialize();
+ taskTwo.initializeStateStores();
tasks.put(new TaskId(0, 1),
taskTwo);
diff --git
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index ae0cc9c..e5f15bc 100644
---
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -248,7 +248,8 @@ public class ProcessorTopologyTestDriver {
cache,
new MockTime(),
producer);
- task.initialize();
+ task.initializeStateStores();
+ task.initializeTopology();
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].