This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 73dc373714c KAFKA-18913: Remove all code related to non-stateupdater path (#21059)
73dc373714c is described below
commit 73dc373714c3602ba4211fbe5602ddfd8fb252e2
Author: Shashank <[email protected]>
AuthorDate: Wed Dec 10 05:08:04 2025 -0800
KAFKA-18913: Remove all code related to non-stateupdater path (#21059)
This PR cleans up all non-state-updater code paths. It also removes the
internal config `__state.updater.enabled__`, which means users no longer
have the option to disable the state updater.
Reviewers: Lucas Brutschy <[email protected]>
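For readers less familiar with this internal knob, here is a minimal, hypothetical sketch (not part of this commit; the class name and property values are made up for illustration) of where the removed flag used to be set, mirroring the `InternalConfig.STATE_UPDATER_ENABLED` entries deleted from the tests further down:

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

// Illustrative sketch only: shows where the removed internal flag used to be set.
public class StateUpdaterFlagExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // illustrative value
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative value

        // Before this commit, internal callers and tests could opt out of the state updater:
        //     props.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, "false");
        // The constant and every code path guarded by it are removed here, so restoration
        // now always runs on the dedicated state-updater thread.

        System.out.println(props);
    }
}
```

Because the flag can no longer be turned off, every branch guarded by `stateUpdaterEnabled` in `StreamThread`, `TaskManager`, `ProcessorStateManager`, and related classes is deleted outright in the diff that follows.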
---
committer-tools/README.md | 2 +-
.../org/apache/kafka/streams/StreamsConfig.java | 7 -
.../processor/internals/ActiveTaskCreator.java | 10 +-
.../processor/internals/ProcessorStateManager.java | 56 +---
.../processor/internals/StandbyTaskCreator.java | 12 +-
.../processor/internals/StateDirectory.java | 4 +-
.../processor/internals/StoreChangelogReader.java | 10 +-
.../streams/processor/internals/StreamThread.java | 131 +++------
.../streams/processor/internals/TaskManager.java | 315 +++++++--------------
.../kafka/streams/processor/internals/Tasks.java | 11 -
.../streams/processor/internals/TasksRegistry.java | 2 -
.../processor/internals/ActiveTaskCreatorTest.java | 2 -
.../internals/ProcessorStateManagerTest.java | 57 +---
.../processor/internals/StreamThreadTest.java | 18 +-
.../processor/internals/TaskManagerTest.java | 5 +-
.../StreamThreadStateStoreProviderTest.java | 14 +-
.../apache/kafka/streams/TopologyTestDriver.java | 4 +-
.../tests/streams/streams_upgrade_test.py | 23 +-
18 files changed, 187 insertions(+), 496 deletions(-)
diff --git a/committer-tools/README.md b/committer-tools/README.md
index 94f714f959a..4944c6f257f 100644
--- a/committer-tools/README.md
+++ b/committer-tools/README.md
@@ -101,5 +101,5 @@ python find-unfinished-test.py ~/Downloads/logs_28218821016/5_build\ _\ JUnit\ t
Found tests that were started, but not finished:
-2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test Executor 47 > StreamThreadTest > shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean) > "shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED
+2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test Executor 47 > StreamThreadTest > shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean) > "shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean).processingThreadsEnabled=true" STARTED
```
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index fdb409d1258..1fdf6654b90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1309,13 +1309,6 @@ public class StreamsConfig extends AbstractConfig {
     // Private API used to control the prefix of the auto created topics
     public static final String TOPIC_PREFIX_ALTERNATIVE = "__internal.override.topic.prefix__";
-    // Private API to enable the state updater (i.e. state updating on a dedicated thread)
-    public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
-
-    public static boolean stateUpdaterEnabled(final Map<String, Object> configs) {
-        return true; // always enabled
-    }
-
     // Private API to enable processing threads (i.e. polling is decoupled from processing)
     public static final String PROCESSING_THREADS_ENABLED = "__processing.threads.enabled__";
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 448853c6367..24ff8fabf0c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -52,7 +52,6 @@ class ActiveTaskCreator {
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
- private final ChangelogReader storeChangelogReader;
private final ThreadCache cache;
private final Time time;
private final KafkaClientSupplier clientSupplier;
@@ -62,7 +61,6 @@ class ActiveTaskCreator {
private final Logger log;
private final Sensor createTaskSensor;
private final StreamsProducer streamsProducer;
- private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
private boolean isClosed = false;
@@ -70,7 +68,6 @@ class ActiveTaskCreator {
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
- final ChangelogReader storeChangelogReader,
final ThreadCache cache,
final Time time,
final KafkaClientSupplier clientSupplier,
@@ -78,13 +75,11 @@ class ActiveTaskCreator {
final int threadIdx,
final UUID processId,
final LogContext logContext,
- final boolean stateUpdaterEnabled,
final boolean processingThreadsEnabled) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
- this.storeChangelogReader = storeChangelogReader;
this.cache = cache;
this.time = time;
this.clientSupplier = clientSupplier;
@@ -92,7 +87,6 @@ class ActiveTaskCreator {
this.threadIdx = threadIdx;
this.processId = processId;
this.log = logContext.logger(getClass());
- this.stateUpdaterEnabled = stateUpdaterEnabled;
this.processingThreadsEnabled = processingThreadsEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId,
streamsMetrics);
@@ -154,10 +148,8 @@ class ActiveTaskCreator {
eosEnabled(applicationConfig),
logContext,
stateDirectory,
- storeChangelogReader,
topology.storeToChangelogTopic(),
- partitions,
- stateUpdaterEnabled);
+ partitions);
final InternalProcessorContext<Object, Object> context = new
ProcessorContextImpl(
taskId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..5f68ff4e067 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -40,7 +40,6 @@ import org.slf4j.Logger;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -170,7 +169,6 @@ public class ProcessorStateManager implements StateManager {
private final TaskId taskId;
private final boolean eosEnabled;
- private ChangelogRegister changelogReader;
private final Collection<TopicPartition> sourcePartitions;
private final Map<String, String> storeToChangelogTopic;
@@ -180,7 +178,6 @@ public class ProcessorStateManager implements StateManager {
private final File baseDir;
private final OffsetCheckpoint checkpointFile;
- private final boolean stateUpdaterEnabled;
private TaskType taskType;
private Logger log;
@@ -202,19 +199,15 @@ public class ProcessorStateManager implements
StateManager {
final boolean eosEnabled,
final LogContext logContext,
final StateDirectory stateDirectory,
- final ChangelogRegister changelogReader,
final Map<String, String>
storeToChangelogTopic,
- final Collection<TopicPartition>
sourcePartitions,
- final boolean stateUpdaterEnabled) throws
ProcessorStateException {
+ final Collection<TopicPartition>
sourcePartitions) throws ProcessorStateException {
this.storeToChangelogTopic = storeToChangelogTopic;
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
this.taskId = taskId;
this.taskType = taskType;
this.eosEnabled = eosEnabled;
- this.changelogReader = changelogReader;
this.sourcePartitions = sourcePartitions;
- this.stateUpdaterEnabled = stateUpdaterEnabled;
this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
this.checkpointFile = new
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
@@ -225,16 +218,15 @@ public class ProcessorStateManager implements
StateManager {
/**
* Special constructor used by {@link StateDirectory} to partially
initialize startup tasks for local state, before
* they're assigned to a thread. When the task is assigned to a thread,
the initialization of this StateManager is
- * completed in {@link #assignToStreamThread(LogContext,
ChangelogRegister, Collection)}.
+ * completed in {@link #assignToStreamThread(LogContext, Collection)}.
*/
static ProcessorStateManager createStartupTaskStateManager(final TaskId
taskId,
final boolean
eosEnabled,
final
LogContext logContext,
final
StateDirectory stateDirectory,
final
Map<String, String> storeToChangelogTopic,
- final
Set<TopicPartition> sourcePartitions,
- final boolean
stateUpdaterEnabled) {
- return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions,
stateUpdaterEnabled);
+ final
Set<TopicPartition> sourcePartitions) {
+ return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
}
/**
@@ -243,26 +235,17 @@ public class ProcessorStateManager implements
StateManager {
* assigned StreamThread's context.
*/
void assignToStreamThread(final LogContext logContext,
- final ChangelogRegister changelogReader,
final Collection<TopicPartition>
sourcePartitions) {
- if (this.changelogReader != null) {
- throw new IllegalStateException("Attempted to replace an existing
changelogReader on a StateManager without closing it.");
- }
this.sourcePartitions.clear();
this.log = logContext.logger(ProcessorStateManager.class);
this.logPrefix = logContext.logPrefix();
- this.changelogReader = changelogReader;
this.sourcePartitions.addAll(sourcePartitions);
}
void registerStateStores(final List<StateStore> allStores, final
InternalProcessorContext<?, ?> processorContext) {
processorContext.uninitialize();
for (final StateStore store : allStores) {
- if (stores.containsKey(store.name())) {
- if (!stateUpdaterEnabled) {
- maybeRegisterStoreWithChangelogReader(store.name());
- }
- } else {
+ if (!stores.containsKey(store.name())) {
store.init(processorContext, store);
}
log.trace("Registered state store {}", store.name());
@@ -346,22 +329,6 @@ public class ProcessorStateManager implements StateManager
{
}
}
- private void maybeRegisterStoreWithChangelogReader(final String storeName)
{
- if (isLoggingEnabled(storeName) && changelogReader != null) {
- changelogReader.register(getStorePartition(storeName), this);
- }
- }
-
- private List<TopicPartition> getAllChangelogTopicPartitions() {
- final List<TopicPartition> allChangelogPartitions = new ArrayList<>();
- for (final StateStoreMetadata storeMetadata : stores.values()) {
- if (storeMetadata.changelogPartition != null) {
- allChangelogPartitions.add(storeMetadata.changelogPartition);
- }
- }
- return allChangelogPartitions;
- }
-
@Override
public File baseDir() {
return baseDir;
@@ -404,10 +371,6 @@ public class ProcessorStateManager implements StateManager
{
// on the state manager this state store would be closed as well
stores.put(storeName, storeMetadata);
- if (!stateUpdaterEnabled) {
- maybeRegisterStoreWithChangelogReader(storeName);
- }
-
log.debug("Registered state store {} to its state manager", storeName);
}
@@ -616,10 +579,6 @@ public class ProcessorStateManager implements StateManager
{
public void close() throws ProcessorStateException {
log.debug("Closing its state manager and all the registered state
stores: {}", stores);
- if (!stateUpdaterEnabled && changelogReader != null) {
- changelogReader.unregister(getAllChangelogTopicPartitions());
- }
-
RuntimeException firstException = null;
// attempting to close the stores, just in case they
// are not closed by a ProcessorNode yet
@@ -664,11 +623,6 @@ public class ProcessorStateManager implements StateManager
{
void recycle() {
log.debug("Recycling state for {} task {}.", taskType, taskId);
- if (!stateUpdaterEnabled && changelogReader != null) {
- final List<TopicPartition> allChangelogs =
getAllChangelogTopicPartitions();
- changelogReader.unregister(allChangelogs);
- }
-
// when the state manager is recycled to be used, future writes may
bypass its store's caching
// layer if they are from restoration, hence we need to clear the
state store's caches just in case
// See KAFKA-14172 for details
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index eb8bcafea69..693cb4ed63a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -40,27 +40,21 @@ class StandbyTaskCreator {
private final StreamsConfig applicationConfig;
private final StreamsMetricsImpl streamsMetrics;
private final StateDirectory stateDirectory;
- private final ChangelogReader storeChangelogReader;
private final ThreadCache dummyCache;
private final Logger log;
private final Sensor createTaskSensor;
- private final boolean stateUpdaterEnabled;
StandbyTaskCreator(final TopologyMetadata topologyMetadata,
final StreamsConfig applicationConfig,
final StreamsMetricsImpl streamsMetrics,
final StateDirectory stateDirectory,
- final ChangelogReader storeChangelogReader,
final String threadId,
- final LogContext logContext,
- final boolean stateUpdaterEnabled) {
+ final LogContext logContext) {
this.topologyMetadata = topologyMetadata;
this.applicationConfig = applicationConfig;
this.streamsMetrics = streamsMetrics;
this.stateDirectory = stateDirectory;
- this.storeChangelogReader = storeChangelogReader;
this.log = logContext.logger(getClass());
- this.stateUpdaterEnabled = stateUpdaterEnabled;
createTaskSensor = ThreadMetrics.createTaskSensor(threadId,
streamsMetrics);
@@ -87,10 +81,8 @@ class StandbyTaskCreator {
eosEnabled(applicationConfig),
getLogContext(taskId),
stateDirectory,
- storeChangelogReader,
topology.storeToChangelogTopic(),
- partitions,
- stateUpdaterEnabled);
+ partitions);
final InternalProcessorContext<?, ?> context = new
ProcessorContextImpl(
taskId,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8ea2d3ae65a..bad10567ddc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -213,7 +213,6 @@ public class StateDirectory implements AutoCloseable {
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final ThreadCache dummyCache = new ThreadCache(logContext, 0,
streamsMetrics);
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
- final boolean stateUpdaterEnabled =
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
// discover all non-empty task directories in StateDirectory
for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
@@ -235,8 +234,7 @@ public class StateDirectory implements AutoCloseable {
logContext,
this,
subTopology.storeToChangelogTopic(),
- inputPartitions,
- stateUpdaterEnabled
+ inputPartitions
);
final InternalProcessorContext<Object, Object> context =
new ProcessorContextImpl(
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5e09ceb62da..3ec8aec6fca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -212,8 +211,6 @@ public class StoreChangelogReader implements
ChangelogReader {
private final Consumer<byte[], byte[]> restoreConsumer;
private final StateRestoreListener stateRestoreListener;
- private final boolean stateUpdaterEnabled;
-
// source of the truth of the current registered changelogs;
// NOTE a changelog would only be removed when its corresponding task
// is being removed from the thread; otherwise it would stay in this map
even after completed
@@ -243,8 +240,6 @@ public class StoreChangelogReader implements
ChangelogReader {
this.stateRestoreListener = stateRestoreListener;
this.standbyUpdateListener = standbyUpdateListener;
- this.stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
-
this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.pollTime =
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
this.updateOffsetIntervalMs =
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
@@ -491,15 +486,12 @@ public class StoreChangelogReader implements
ChangelogReader {
private ConsumerRecords<byte[], byte[]>
pollRecordsFromRestoreConsumer(final Map<TaskId, Task> tasks,
final Set<TopicPartition> restoringChangelogs) {
- // If we are updating only standby tasks, and are not using a separate
thread, we should
- // use a non-blocking poll to unblock the processing as soon as
possible.
- final boolean useNonBlockingPoll = state ==
ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled;
final ConsumerRecords<byte[], byte[]> polledRecords;
try {
pauseResumePartitions(tasks, restoringChangelogs);
- polledRecords = restoreConsumer.poll(useNonBlockingPoll ?
Duration.ZERO : pollTime);
+ polledRecords = restoreConsumer.poll(pollTime);
// TODO (?) If we cannot fetch records during restore, should we
trigger `task.timeout.ms` ?
// TODO (?) If we cannot fetch records for standby task, should we
trigger `task.timeout.ms` ?
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f208567c32d..84c4f51734a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -372,7 +372,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
new
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
- private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
private volatile long fetchDeadlineClientInstanceId = -1;
@@ -399,15 +398,12 @@ public class StreamThread extends Thread implements
ProcessingThread {
final Runnable shutdownErrorHook,
final BiConsumer<Throwable, Boolean>
streamsUncaughtExceptionHandler) {
- final boolean stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
-
final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING,
STATE_UPDATER_ID_SUBSTRING);
- final String restorationThreadId = stateUpdaterEnabled ?
stateUpdaterId : threadId;
final String logPrefix = String.format("stream-thread [%s] ",
threadId);
final LogContext logContext = new LogContext(logPrefix);
- final LogContext restorationLogContext = stateUpdaterEnabled ? new
LogContext(String.format("state-updater [%s] ", restorationThreadId)) :
logContext;
+ final LogContext restorationLogContext = new
LogContext(String.format("state-updater [%s] ", stateUpdaterId));
final Logger log = LoggerFactory.getLogger(StreamThread.class);
final ReferenceContainer referenceContainer = new ReferenceContainer();
@@ -417,7 +413,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
referenceContainer.clientTags = config.getClientTags();
log.info("Creating restore consumer client for thread {}", threadId);
- final Map<String, Object> restoreConsumerConfigs =
config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
+ final Map<String, Object> restoreConsumerConfigs =
config.getRestoreConsumerConfigs(restoreConsumerClientId(stateUpdaterId));
final Consumer<byte[], byte[]> restoreConsumer =
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
final StoreChangelogReader changelogReader = new StoreChangelogReader(
@@ -438,7 +434,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
config,
streamsMetrics,
stateDirectory,
- changelogReader,
cache,
time,
clientSupplier,
@@ -446,7 +441,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
threadIdx,
processId,
logContext,
- stateUpdaterEnabled,
processingThreadsEnabled
);
final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
@@ -454,20 +448,17 @@ public class StreamThread extends Thread implements
ProcessingThread {
config,
streamsMetrics,
stateDirectory,
- changelogReader,
threadId,
- logContext,
- stateUpdaterEnabled);
+ logContext);
final Tasks tasks = new Tasks(logContext);
final boolean processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
final DefaultTaskManager schedulingTaskManager =
- maybeCreateSchedulingTaskManager(processingThreadsEnabled,
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+ maybeCreateSchedulingTaskManager(processingThreadsEnabled,
topologyMetadata, time, threadId, tasks);
final StateUpdater stateUpdater =
- maybeCreateStateUpdater(
- stateUpdaterEnabled,
+ createStateUpdater(
streamsMetrics,
config,
restoreConsumer,
@@ -622,16 +613,11 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
private static DefaultTaskManager maybeCreateSchedulingTaskManager(final
boolean processingThreadsEnabled,
- final
boolean stateUpdaterEnabled,
final
TopologyMetadata topologyMetadata,
final
Time time,
final
String threadId,
final
Tasks tasks) {
if (processingThreadsEnabled) {
- if (!stateUpdaterEnabled) {
- throw new IllegalStateException("Processing threads require
the state updater to be enabled");
- }
-
final DefaultTaskManager defaultTaskManager = new
DefaultTaskManager(
time,
threadId,
@@ -646,29 +632,24 @@ public class StreamThread extends Thread implements
ProcessingThread {
return null;
}
-    private static StateUpdater maybeCreateStateUpdater(final boolean stateUpdaterEnabled,
-                                                        final StreamsMetricsImpl streamsMetrics,
-                                                        final StreamsConfig streamsConfig,
-                                                        final Consumer<byte[], byte[]> restoreConsumer,
-                                                        final ChangelogReader changelogReader,
-                                                        final TopologyMetadata topologyMetadata,
-                                                        final Time time,
-                                                        final String clientId,
-                                                        final int threadIdx) {
-        if (stateUpdaterEnabled) {
-            final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
-            return new DefaultStateUpdater(
-                name,
-                streamsMetrics,
-                streamsConfig,
-                restoreConsumer,
-                changelogReader,
-                topologyMetadata,
-                time
-            );
-        } else {
-            return null;
-        }
+    private static StateUpdater createStateUpdater(final StreamsMetricsImpl streamsMetrics,
+                                                   final StreamsConfig streamsConfig,
+                                                   final Consumer<byte[], byte[]> restoreConsumer,
+                                                   final ChangelogReader changelogReader,
+                                                   final TopologyMetadata topologyMetadata,
+                                                   final Time time,
+                                                   final String clientId,
+                                                   final int threadIdx) {
+        final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
+        return new DefaultStateUpdater(
+            name,
+            streamsMetrics,
+            streamsConfig,
+            restoreConsumer,
+            changelogReader,
+            topologyMetadata,
+            time
+        );
}
private static Optional<StreamsRebalanceData.HostInfo> parseHostInfo(final
String endpoint) {
@@ -875,7 +856,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
- this.stateUpdaterEnabled =
InternalConfig.stateUpdaterEnabled(config.originals());
this.processingThreadsEnabled =
InternalConfig.processingThreadsEnabled(config.originals());
this.logSummaryIntervalMs =
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
@@ -905,9 +885,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
boolean cleanRun = false;
try {
- if (stateUpdaterEnabled) {
- taskManager.init();
- }
+ taskManager.init();
cleanRun = runLoop();
} catch (final Throwable e) {
failedStreamThreadSensor.record();
@@ -1024,27 +1002,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
}
-
- if (!stateUpdaterEnabled &&
!restoreConsumerInstanceIdFuture.isDone()) {
- if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
- try {
-
restoreConsumerInstanceIdFuture.complete(restoreConsumer.clientInstanceId(Duration.ZERO));
- } catch (final IllegalStateException disabledError) {
- // if telemetry is disabled on a client, we swallow
the error,
- // to allow returning a partial result for all other
clients
- restoreConsumerInstanceIdFuture.complete(null);
- } catch (final TimeoutException swallow) {
- // swallow
- } catch (final Exception error) {
-
restoreConsumerInstanceIdFuture.completeExceptionally(error);
- }
- } else {
- restoreConsumerInstanceIdFuture.completeExceptionally(
- new TimeoutException("Could not retrieve restore
consumer client instance id.")
- );
- }
- }
-
if (!producerInstanceIdFuture.isDone()) {
if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
try {
@@ -1067,10 +1024,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
}
- if (mainConsumerInstanceIdFuture.isDone()
- && (!stateUpdaterEnabled &&
restoreConsumerInstanceIdFuture.isDone())
- && producerInstanceIdFuture.isDone()) {
-
+ if (mainConsumerInstanceIdFuture.isDone() &&
producerInstanceIdFuture.isDone()) {
fetchDeadlineClientInstanceId = -1L;
}
}
@@ -1220,10 +1174,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
return;
}
- if (!stateUpdaterEnabled) {
- initializeAndRestorePhase();
- }
-
// TODO: we should record the restore latency and its relative time
spent ratio after
// we figure out how to move this method out of the stream thread
advanceNowAndComputeLatency();
@@ -1232,8 +1182,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
long totalCommitLatency = 0L;
long totalProcessLatency = 0L;
long totalPunctuateLatency = 0L;
- if (state == State.RUNNING
- || (stateUpdaterEnabled &&
isStartingRunningOrPartitionAssigned())) {
+ if (isStartingRunningOrPartitionAssigned()) {
taskManager.updateLags();
@@ -1248,9 +1197,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
*/
do {
- if (stateUpdaterEnabled) {
- checkStateUpdater();
- }
+ checkStateUpdater();
log.debug("Processing tasks with {} iterations.",
numIterations);
final int processed = taskManager.process(numIterations, time);
@@ -1469,15 +1416,11 @@ public class StreamThread extends Thread implements
ProcessingThread {
final ConsumerRecords<byte[], byte[]> records;
log.debug("Invoking poll on main Consumer");
- if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
- // try to fetch some records with zero poll millis
- // to unblock the restoration as soon as possible
- records = pollRequests(Duration.ZERO);
- } else if (state == State.PARTITIONS_REVOKED) {
+ if (state == State.PARTITIONS_REVOKED) {
// try to fetch some records with zero poll millis to unblock
// other useful work while waiting for the join response
records = pollRequests(Duration.ZERO);
- } else if (state == State.RUNNING || state == State.STARTING || (state
== State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) {
+ } else if (state == State.RUNNING || state == State.STARTING || state
== State.PARTITIONS_ASSIGNED) {
// try to fetch some records with normal poll time
// in order to get long polling
records = pollRequests(pollTime);
@@ -1839,7 +1782,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
committed = taskManager.commit(
- taskManager.allOwnedTasks()
+ taskManager.allRunningTasks()
.values()
.stream()
.filter(t -> t.state() == Task.State.RUNNING || t.state()
== Task.State.RESTORING)
@@ -2079,18 +2022,8 @@ public class StreamThread extends Thread implements
ProcessingThread {
}
result.put(getName() + "-consumer", mainConsumerInstanceIdFuture);
- if (stateUpdaterEnabled) {
- restoreConsumerInstanceIdFuture =
stateUpdater.restoreConsumerInstanceId(timeout);
- } else {
- if (restoreConsumerInstanceIdFuture.isDone()) {
- if
(restoreConsumerInstanceIdFuture.isCompletedExceptionally()) {
- restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
- setDeadline = true;
- }
- } else {
- setDeadline = true;
- }
- }
+ restoreConsumerInstanceIdFuture =
stateUpdater.restoreConsumerInstanceId(timeout);
+
result.put(getName() + "-restore-consumer",
restoreConsumerInstanceIdFuture);
if (producerInstanceIdFuture.isDone()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4092381fdf4..57f9ecc38d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -151,9 +151,7 @@ public class TaskManager {
}
void init() {
- if (stateUpdater != null) {
- this.stateUpdater.start();
- }
+ this.stateUpdater.start();
}
void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
this.mainConsumer = mainConsumer;
@@ -198,18 +196,14 @@ public class TaskManager {
void handleRebalanceComplete() {
// we should pause consumer only within the listener since
// before then the assignment has not been updated yet.
- if (stateUpdater == null) {
- mainConsumer.pause(mainConsumer.assignment());
- } else {
- // All tasks that are owned by the task manager are ready and do
not need to be paused
- final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
- .stream()
- .flatMap(task -> task.inputPartitions().stream())
- .collect(Collectors.toSet());
- final Set<TopicPartition> partitionsToPause = new
HashSet<>(mainConsumer.assignment());
- partitionsToPause.removeAll(partitionsNotToPause);
- mainConsumer.pause(partitionsToPause);
- }
+ // All tasks that are owned by the task manager are ready and do not
need to be paused
+ final Set<TopicPartition> partitionsNotToPause =
tasks.allNonFailedTasks()
+ .stream()
+ .flatMap(task -> task.inputPartitions().stream())
+ .collect(Collectors.toSet());
+ final Set<TopicPartition> partitionsToPause = new
HashSet<>(mainConsumer.assignment());
+ partitionsToPause.removeAll(partitionsNotToPause);
+ mainConsumer.pause(partitionsToPause);
releaseLockedUnassignedTaskDirectories();
@@ -272,14 +266,6 @@ public class TaskManager {
private void closeDirtyAndRevive(final Collection<Task>
taskWithChangelogs, final boolean markAsCorrupted) {
for (final Task task : taskWithChangelogs) {
if (task.state() != State.CLOSED) {
- final Collection<TopicPartition> corruptedPartitions =
task.changelogPartitions();
-
- // mark corrupted partitions to not be checkpointed, and then
close the task as dirty
- // TODO: this step should be removed as we complete migrating
to state updater
- if (markAsCorrupted && stateUpdater == null) {
- task.markChangelogAsCorrupted(corruptedPartitions);
- }
-
try {
// we do not need to take the returned offsets since we
are not going to commit anyways;
// this call is only used for active tasks to flush the
cache before suspending and
@@ -324,20 +310,15 @@ public class TaskManager {
task.addPartitionsForOffsetReset(assignedToPauseAndReset);
}
- if (stateUpdater != null) {
- tasks.removeTask(task);
- }
+
+ tasks.removeTask(task);
task.revive();
- if (stateUpdater != null) {
- tasks.addPendingTasksToInit(Collections.singleton(task));
- }
+ tasks.addPendingTasksToInit(Collections.singleton(task));
}
}
private Map<Task, Set<TopicPartition>> assignStartupTasks(final
Map<TaskId, Set<TopicPartition>> tasksToAssign,
- final String
threadLogPrefix,
- final
TopologyMetadata topologyMetadata,
- final
ChangelogRegister changelogReader) {
+ final String
threadLogPrefix) {
if (stateDirectory.hasStartupTasks()) {
final Map<Task, Set<TopicPartition>> assignedTasks = new
HashMap<>(tasksToAssign.size());
for (final Map.Entry<TaskId, Set<TopicPartition>> entry :
tasksToAssign.entrySet()) {
@@ -346,7 +327,7 @@ public class TaskManager {
if (task != null) {
// replace our dummy values with the real ones, now we
know our thread and assignment
final Set<TopicPartition> inputPartitions =
entry.getValue();
- task.stateManager().assignToStreamThread(new
LogContext(threadLogPrefix), changelogReader, inputPartitions);
+ task.stateManager().assignToStreamThread(new
LogContext(threadLogPrefix), inputPartitions);
updateInputPartitionsOfStandbyTaskIfTheyChanged(task,
inputPartitions);
assignedTasks.put(task, inputPartitions);
@@ -401,18 +382,14 @@ public class TaskManager {
// 2. for tasks that have changed active/standby status, just recycle
and skip re-creating them
// 3. otherwise, close them since they are no longer owned
final Map<TaskId, RuntimeException> failedTasks = new
LinkedHashMap<>();
- if (stateUpdater == null) {
- handleTasksWithoutStateUpdater(activeTasksToCreate,
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
- } else {
- handleTasksWithStateUpdater(
- activeTasksToCreate,
- standbyTasksToCreate,
- tasksToRecycle,
- tasksToCloseClean,
- failedTasks
- );
-
failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
- }
+ handleTasks(
+ activeTasksToCreate,
+ standbyTasksToCreate,
+ tasksToRecycle,
+ tasksToCloseClean,
+ failedTasks
+ );
+ failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
final Map<TaskId, RuntimeException> taskCloseExceptions =
closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
@@ -475,53 +452,8 @@ public class TaskManager {
final Collection<Task> newActiveTasks =
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
final Collection<Task> newStandbyTasks =
standbyTaskCreator.createTasks(standbyTasksToCreate);
- if (stateUpdater == null) {
- tasks.addActiveTasks(newActiveTasks);
- tasks.addStandbyTasks(newStandbyTasks);
- } else {
- tasks.addPendingTasksToInit(newActiveTasks);
- tasks.addPendingTasksToInit(newStandbyTasks);
- }
- }
-
- private void handleTasksWithoutStateUpdater(final Map<TaskId,
Set<TopicPartition>> activeTasksToCreate,
- final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
- final Map<Task,
Set<TopicPartition>> tasksToRecycle,
- final Set<Task>
tasksToCloseClean) {
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle =
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse =
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
-
- // recycle the startup standbys to active
- tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
- // use startup Standbys as real Standby tasks
- tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
- for (final Task task : tasks.allTasks()) {
- final TaskId taskId = task.id();
- if (activeTasksToCreate.containsKey(taskId)) {
- if (task.isActive()) {
- final Set<TopicPartition> topicPartitions =
activeTasksToCreate.get(taskId);
- if (tasks.updateActiveTaskInputPartitions(task,
topicPartitions)) {
- task.updateInputPartitions(topicPartitions,
topologyMetadata.nodeToSourceTopics(task.id()));
- }
- task.resume();
- } else {
- tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
- }
- activeTasksToCreate.remove(taskId);
- } else if (standbyTasksToCreate.containsKey(taskId)) {
- if (!task.isActive()) {
- updateInputPartitionsOfStandbyTaskIfTheyChanged(task,
standbyTasksToCreate.get(taskId));
- task.resume();
- } else {
- tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
- }
- standbyTasksToCreate.remove(taskId);
- } else {
- tasksToCloseClean.add(task);
- }
- }
+ tasks.addPendingTasksToInit(newActiveTasks);
+ tasks.addPendingTasksToInit(newStandbyTasks);
}
private void updateInputPartitionsOfStandbyTaskIfTheyChanged(final Task
task,
@@ -545,11 +477,11 @@ public class TaskManager {
}
}
- private void handleTasksWithStateUpdater(final Map<TaskId,
Set<TopicPartition>> activeTasksToCreate,
- final Map<TaskId,
Set<TopicPartition>> standbyTasksToCreate,
- final Map<Task,
Set<TopicPartition>> tasksToRecycle,
- final Set<Task> tasksToCloseClean,
- final Map<TaskId,
RuntimeException> failedTasks) {
+ private void handleTasks(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
+ final Map<Task, Set<TopicPartition>>
tasksToRecycle,
+ final Set<Task> tasksToCloseClean,
+ final Map<TaskId, RuntimeException> failedTasks) {
handleTasksPendingInitialization();
handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate,
failedTasks);
handleRestoringAndUpdatingTasks(activeTasksToCreate,
standbyTasksToCreate, failedTasks);
@@ -572,8 +504,8 @@ public class TaskManager {
private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>>
activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>>
standbyTasksToCreate,
final Map<TaskId, RuntimeException>
failedTasks) {
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle =
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
- final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse =
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata,
changelogReader);
+ final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle =
assignStartupTasks(activeTasksToCreate, logPrefix);
+ final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse =
assignStartupTasks(standbyTasksToCreate, logPrefix);
// recycle the startup standbys to active, and remove them from the
set of actives that need to be created
if (!startupStandbyTasksToRecycle.isEmpty()) {
@@ -802,7 +734,7 @@ public class TaskManager {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
- || (stateUpdater != null &&
stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
+ || (stateUpdater.tasks().stream().anyMatch(task ->
task.id().equals(taskId)));
if (taskId.topologyName() != null && !taskIsOwned &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's
topology name cannot be recognized, will put it " +
"aside as pending for now and create later when
topology metadata gets refreshed", taskId);
@@ -880,12 +812,8 @@ public class TaskManager {
try {
if (oldTask.isActive()) {
final StandbyTask standbyTask =
convertActiveToStandby((StreamTask) oldTask, inputPartitions);
- if (stateUpdater != null) {
- tasks.removeTask(oldTask);
-
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
- } else {
- tasks.replaceActiveWithStandby(standbyTask);
- }
+ tasks.removeTask(oldTask);
+
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
} else {
final StreamTask activeTask =
convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
tasks.replaceStandbyWithActive(activeTask);
@@ -1265,24 +1193,22 @@ public class TaskManager {
}
private void revokeTasksInStateUpdater(final Set<TopicPartition>
remainingRevokedPartitions) {
- if (stateUpdater != null) {
- final Map<TaskId,
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new
LinkedHashMap<>();
- final Map<TaskId, RuntimeException> failedTasksFromStateUpdater =
new HashMap<>();
- for (final Task restoringTask : stateUpdater.tasks()) {
- if (restoringTask.isActive()) {
- if
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
- futures.put(restoringTask.id(),
stateUpdater.remove(restoringTask.id()));
-
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
- }
+ final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>>
futures = new LinkedHashMap<>();
+ final Map<TaskId, RuntimeException> failedTasksFromStateUpdater = new
HashMap<>();
+ for (final Task restoringTask : stateUpdater.tasks()) {
+ if (restoringTask.isActive()) {
+ if
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+ futures.put(restoringTask.id(),
stateUpdater.remove(restoringTask.id()));
+
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
}
}
- getNonFailedTasks(futures,
failedTasksFromStateUpdater).forEach(task -> {
- task.suspend();
- tasks.addTask(task);
- });
-
- maybeThrowTaskExceptions(failedTasksFromStateUpdater);
}
+ getNonFailedTasks(futures, failedTasksFromStateUpdater).forEach(task
-> {
+ task.suspend();
+ tasks.addTask(task);
+ });
+
+ maybeThrowTaskExceptions(failedTasksFromStateUpdater);
}
private void prepareCommitAndAddOffsetsToMap(final Set<Task>
tasksToPrepare,
@@ -1335,31 +1261,27 @@ public class TaskManager {
}
private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
- if (stateUpdater != null) {
- final Map<TaskId,
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new
LinkedHashMap<>();
- final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
- tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
- final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
- for (final Task restoringTask : stateUpdater.tasks()) {
- if (restoringTask.isActive()) {
- futures.put(restoringTask.id(),
stateUpdater.remove(restoringTask.id()));
- }
+ final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>>
futures = new LinkedHashMap<>();
+ final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
+ tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
+ for (final Task restoringTask : stateUpdater.tasks()) {
+ if (restoringTask.isActive()) {
+ futures.put(restoringTask.id(),
stateUpdater.remove(restoringTask.id()));
}
+ }
- addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
- for (final Task task : tasksToCloseClean) {
- closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
- }
- for (final Task task : tasksToCloseDirty) {
- closeTaskDirty(task, false);
- }
+ addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
+ for (final Task task : tasksToCloseClean) {
+ closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
+ }
+ for (final Task task : tasksToCloseDirty) {
+ closeTaskDirty(task, false);
}
}
public void signalResume() {
- if (stateUpdater != null) {
- stateUpdater.signalResume();
- }
+ stateUpdater.signalResume();
if (schedulingTaskManager != null) {
schedulingTaskManager.signalTaskExecutors();
}
@@ -1581,40 +1503,38 @@ public class TaskManager {
}
private void shutdownStateUpdater() {
- if (stateUpdater != null) {
- // If there are failed tasks handling them first
- for (final StateUpdater.ExceptionAndTask exceptionAndTask :
stateUpdater.drainExceptionsAndFailedTasks()) {
- final Task failedTask = exceptionAndTask.task();
- closeTaskDirty(failedTask, false);
- }
+ // If there are failed tasks handling them first
+ for (final StateUpdater.ExceptionAndTask exceptionAndTask :
stateUpdater.drainExceptionsAndFailedTasks()) {
+ final Task failedTask = exceptionAndTask.task();
+ closeTaskDirty(failedTask, false);
+ }
- final Map<TaskId,
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new
LinkedHashMap<>();
- for (final Task task : stateUpdater.tasks()) {
- final CompletableFuture<StateUpdater.RemovedTaskResult> future
= stateUpdater.remove(task.id());
- futures.put(task.id(), future);
- }
- final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
- final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
- addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
- // at this point we removed all tasks, so the shutdown should not
take a lot of time
- stateUpdater.shutdown(Duration.ofMinutes(1L));
+ final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>>
futures = new LinkedHashMap<>();
+ for (final Task task : stateUpdater.tasks()) {
+ final CompletableFuture<StateUpdater.RemovedTaskResult> future =
stateUpdater.remove(task.id());
+ futures.put(task.id(), future);
+ }
+ final Set<Task> tasksToCloseClean = new
TreeSet<>(Comparator.comparing(Task::id));
+ final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
+ addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
+ // at this point we removed all tasks, so the shutdown should not take
a lot of time
+ stateUpdater.shutdown(Duration.ofMinutes(1L));
- for (final Task task : tasksToCloseClean) {
- tasks.addTask(task);
- }
- for (final Task task : tasksToCloseDirty) {
- closeTaskDirty(task, false);
- }
- // Handling all failures that occurred during the remove process
- for (final StateUpdater.ExceptionAndTask exceptionAndTask :
stateUpdater.drainExceptionsAndFailedTasks()) {
- final Task failedTask = exceptionAndTask.task();
- closeTaskDirty(failedTask, false);
- }
+ for (final Task task : tasksToCloseClean) {
+ tasks.addTask(task);
+ }
+ for (final Task task : tasksToCloseDirty) {
+ closeTaskDirty(task, false);
+ }
+ // Handling all failures that occurred during the remove process
+ for (final StateUpdater.ExceptionAndTask exceptionAndTask :
stateUpdater.drainExceptionsAndFailedTasks()) {
+ final Task failedTask = exceptionAndTask.task();
+ closeTaskDirty(failedTask, false);
+ }
- // If there is anything left unhandled due to timeouts, handling
now
- for (final Task task : stateUpdater.tasks()) {
- closeTaskDirty(task, false);
- }
+ // If there is anything left unhandled due to timeouts, handling now
+ for (final Task task : stateUpdater.tasks()) {
+ closeTaskDirty(task, false);
}
}
@@ -1793,24 +1713,18 @@ public class TaskManager {
Map<TaskId, Task> allTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
- if (stateUpdater != null) {
- final Map<TaskId, Task> ret =
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
- ret.putAll(tasks.allTasksPerId());
-
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
x -> x)));
- return ret;
- } else {
- return tasks.allTasksPerId();
- }
+ final Map<TaskId, Task> ret =
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+ ret.putAll(tasks.allTasksPerId());
+
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
x -> x)));
+ return ret;
}
/**
- * Returns tasks owned by the stream thread. With state updater disabled,
these are all tasks. With
- * state updater enabled, this does not return any tasks currently owned
by the state updater.
+ * Returns tasks owned by the stream thread.
+ * This does not return any tasks currently owned by the state updater.
*
- * TODO: after we complete switching to state updater, we could rename
this function as allRunningTasks
- * to be differentiated from allTasks including running and
restoring tasks
*/
- Map<TaskId, Task> allOwnedTasks() {
+ Map<TaskId, Task> allRunningTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
return tasks.allTasksPerId();
@@ -1819,13 +1733,9 @@ public class TaskManager {
Set<Task> readOnlyAllTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
- if (stateUpdater != null) {
- final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
- ret.addAll(tasks.allTasks());
- return Collections.unmodifiableSet(ret);
- } else {
- return Collections.unmodifiableSet(tasks.allTasks());
- }
+ final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
+ ret.addAll(tasks.allTasks());
+ return Collections.unmodifiableSet(ret);
}
Map<TaskId, Task> notPausedTasks() {
@@ -1848,13 +1758,10 @@ public class TaskManager {
}
private Stream<Task> activeTaskStream() {
- if (stateUpdater != null) {
- return Stream.concat(
- activeRunningTaskStream(),
- stateUpdater.tasks().stream().filter(Task::isActive)
- );
- }
- return activeRunningTaskStream();
+ return Stream.concat(
+ activeRunningTaskStream(),
+ stateUpdater.tasks().stream().filter(Task::isActive)
+ );
}
private Stream<Task> activeRunningTaskStream() {
@@ -1871,14 +1778,10 @@ public class TaskManager {
private Stream<Task> standbyTaskStream() {
final Stream<Task> standbyTasksInTaskRegistry =
tasks.allTasks().stream().filter(t -> !t.isActive());
- if (stateUpdater != null) {
- return Stream.concat(
- stateUpdater.standbyTasks().stream(),
- standbyTasksInTaskRegistry
- );
- } else {
- return standbyTasksInTaskRegistry;
- }
+ return Stream.concat(
+ stateUpdater.standbyTasks().stream(),
+ standbyTasksInTaskRegistry
+ );
}
// For testing only.
int commitAll() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 76d63490683..f5d007a5915 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -206,17 +206,6 @@ class Tasks implements TasksRegistry {
failedTaskIds.remove(taskToRemove.id());
}
- @Override
- public synchronized void replaceActiveWithStandby(final StandbyTask
standbyTask) {
- final TaskId taskId = standbyTask.id();
- if (activeTasksPerId.remove(taskId) == null) {
- throw new IllegalStateException("Attempted to replace unknown
active task with standby task: " + taskId);
- }
- removePartitionsForActiveTask(taskId);
-
- standbyTasksPerId.put(standbyTask.id(), standbyTask);
- }
-
@Override
public synchronized void replaceStandbyWithActive(final StreamTask
activeTask) {
final TaskId taskId = activeTask.id();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 20bee575ebc..09c5a79ae0f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -55,8 +55,6 @@ public interface TasksRegistry {
void removeTask(final Task taskToRemove);
- void replaceActiveWithStandby(final StandbyTask standbyTask);
-
void replaceStandbyWithActive(final StreamTask activeTask);
boolean updateActiveTaskInputPartitions(final Task task, final
Set<TopicPartition> topicPartitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 96597e63523..9ddff25ff94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -272,7 +272,6 @@ public class ActiveTaskCreatorTest {
config,
streamsMetrics,
stateDirectory,
- changeLogReader,
new ThreadCache(new LogContext(), 0L, streamsMetrics),
new MockTime(),
mockClientSupplier,
@@ -280,7 +279,6 @@ public class ActiveTaskCreatorTest {
0,
uuid,
new LogContext(),
- false,
false);
assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8c28ae6a33d..6c1a032371c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -120,7 +120,6 @@ public class ProcessorStateManagerTest {
private final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
private final ConsumerRecord<byte[], byte[]> consumerRecord =
new ConsumerRecord<>(persistentStoreTopicName, 1, 100L, keyBytes,
valueBytes);
- private final MockChangelogReader changelogReader = new
MockChangelogReader();
private final LogContext logContext = new
LogContext("process-state-manager-test ");
private final StateRestoreCallback noopStateRestoreCallback = (k, v) -> {
};
@@ -202,14 +201,12 @@ public class ProcessorStateManagerTest {
false,
logContext,
stateDirectory,
- changelogReader,
mkMap(
mkEntry(persistentStoreName, persistentStoreTopicName),
mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
),
- Set.of(persistentStorePartition, nonPersistentStorePartition),
- false);
+ Set.of(persistentStorePartition, nonPersistentStorePartition));
assertTrue(stateMgr.changelogAsSource(persistentStorePartition));
assertTrue(stateMgr.changelogAsSource(nonPersistentStorePartition));
@@ -224,12 +221,11 @@ public class ProcessorStateManagerTest {
false,
logContext,
stateDirectory,
- changelogReader, mkMap(
+ mkMap(
mkEntry(persistentStoreName, persistentStoreTopicName),
mkEntry(persistentStoreTwoName, persistentStoreTopicName)
),
- Collections.emptySet(),
- false);
+ Collections.emptySet());
stateMgr.registerStore(persistentStore,
persistentStore.stateRestoreCallback, null);
stateMgr.registerStore(persistentStoreTwo,
persistentStore.stateRestoreCallback, null);
@@ -306,7 +302,7 @@ public class ProcessorStateManagerTest {
}
@Test
- public void shouldUnregisterChangelogsDuringClose() {
+ public void shouldCloseStateStoresOnStateManagerClose() {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
final StateStore store = mock(StateStore.class);
when(store.name()).thenReturn(persistentStoreName);
@@ -317,16 +313,13 @@ public class ProcessorStateManagerTest {
verify(store).init(context, store);
stateMgr.registerStore(store, noopStateRestoreCallback, null);
-
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
stateMgr.close();
verify(store).close();
-
-
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
}
@Test
- public void shouldRecycleStoreAndReregisterChangelog() {
+ public void shouldRecycleAndReinitializeStore() {
final ProcessorStateManager stateMgr =
getStateManager(Task.TaskType.ACTIVE);
final StateStore store = mock(StateStore.class);
when(store.name()).thenReturn(persistentStoreName);
@@ -336,16 +329,14 @@ public class ProcessorStateManagerTest {
verify(store).init(context, store);
stateMgr.registerStore(store, noopStateRestoreCallback, null);
-
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
stateMgr.recycle();
-
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
assertThat(stateMgr.store(persistentStoreName), equalTo(store));
stateMgr.registerStateStores(singletonList(store), context);
+ verify(store).init(context, store);
verify(context, times(2)).uninitialize();
-
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
}
@Test
@@ -359,10 +350,9 @@ public class ProcessorStateManagerTest {
verify(store).init(context, store);
stateMgr.registerStore(store, noopStateRestoreCallback, null);
-
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+ assertThat(stateMgr.store(persistentStoreName), equalTo(store));
stateMgr.recycle();
-
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
assertThat(stateMgr.store(persistentStoreName), equalTo(store));
verify(store).clearCache();
@@ -374,7 +364,7 @@ public class ProcessorStateManagerTest {
try {
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
- assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+ assertEquals(persistentStore, stateMgr.store(persistentStoreName));
} finally {
stateMgr.close();
}
@@ -386,28 +376,7 @@ public class ProcessorStateManagerTest {
try {
stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback, null);
- assertTrue(changelogReader.isPartitionRegistered(nonPersistentStorePartition));
- } finally {
- stateMgr.close();
- }
- }
-
- @Test
- public void shouldNotRegisterNonLoggedStore() {
- final ProcessorStateManager stateMgr = new ProcessorStateManager(
- taskId,
- Task.TaskType.STANDBY,
- false,
- logContext,
- stateDirectory,
- changelogReader,
- emptyMap(),
- emptySet(),
- false);
-
- try {
- stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
- assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
+ assertEquals(nonPersistentStore, stateMgr.store(nonPersistentStoreName));
} finally {
stateMgr.close();
}
@@ -673,10 +642,8 @@ public class ProcessorStateManagerTest {
false,
logContext,
stateDirectory,
- changelogReader,
emptyMap(),
- emptySet(),
- false);
+ emptySet());
try {
stateMgr.registerStore(persistentStore, persistentStore.stateRestoreCallback, null);
@@ -1230,14 +1197,12 @@ public class ProcessorStateManagerTest {
eosEnabled,
logContext,
stateDirectory,
- changelogReader,
mkMap(
mkEntry(persistentStoreName, persistentStoreTopicName),
mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
),
- emptySet(),
- false);
+ emptySet());
}
private ProcessorStateManager getStateManager(final Task.TaskType taskType) {
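
The ProcessorStateManagerTest churn above follows one pattern: every `ProcessorStateManager` construction drops the `changelogReader` argument and the trailing state-updater boolean, because the `__state.updater.enabled__` switch no longer exists. A minimal before/after sketch of one such call site (not compilable on its own: `taskId`, `logContext`, `stateDirectory`, and the use of `Task.TaskType.ACTIVE` are illustrative stand-ins taken from the hunks above):

```java
// Before this commit: the caller supplied a changelog reader and a state-updater flag.
final ProcessorStateManager oldStateManager = new ProcessorStateManager(
    taskId,
    Task.TaskType.ACTIVE,
    false,               // eosEnabled
    logContext,
    stateDirectory,
    changelogReader,     // argument removed by this commit
    emptyMap(),          // store name -> changelog topic
    emptySet(),          // changelog/source partitions
    false);              // state-updater flag, removed together with __state.updater.enabled__

// After this commit: the same call site with both parameters gone.
final ProcessorStateManager newStateManager = new ProcessorStateManager(
    taskId,
    Task.TaskType.ACTIVE,
    false,               // eosEnabled
    logContext,
    stateDirectory,
    emptyMap(),          // store name -> changelog topic
    emptySet());         // changelog/source partitions
```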
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8a60f4f14ea..152b00918c1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -259,7 +259,6 @@ public class StreamThreadTest {
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArraySerde.class.getName()),
- mkEntry(InternalConfig.STATE_UPDATER_ENABLED, Boolean.toString(true)),
mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, Boolean.toString(processingThreadsEnabled)),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
));
@@ -591,7 +590,7 @@ public class StreamThreadTest {
final Task runningTask = statelessTask(taskId)
.inState(Task.State.RUNNING).build();
final TaskManager taskManager = mock(TaskManager.class);
- when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
+ when(taskManager.allRunningTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
@@ -905,7 +904,6 @@ public class StreamThreadTest {
);
final Properties properties = new Properties();
- properties.put(InternalConfig.STATE_UPDATER_ENABLED, true);
properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, false);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID,
@@ -2263,7 +2261,7 @@ public class StreamThreadTest {
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 1,
- "Never get the assignment");
+ "Never got the assignment");
mockRestoreConsumer.addRecord(new ConsumerRecord<>(
"stream-thread-test-count-changelog",
@@ -2287,7 +2285,7 @@ public class StreamThreadTest {
// registered again with the changelog reader
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().size() == 1,
- "Never get the assignment");
+ "Never got the assignment");
// after handling the exception and reviving the task, the position
// should be reset to the beginning.
@@ -2310,7 +2308,7 @@ public class StreamThreadTest {
TestUtils.waitForCondition(
() -> mockRestoreConsumer.assignment().isEmpty(),
- "Never get the assignment");
+ "Never got the assignment");
}
@ParameterizedTest
@@ -2776,7 +2774,7 @@ public class StreamThreadTest {
when(task2.state()).thenReturn(Task.State.RESTORING);
when(task3.state()).thenReturn(Task.State.CREATED);
- when(taskManager.allOwnedTasks()).thenReturn(mkMap(
+ when(taskManager.allRunningTasks()).thenReturn(mkMap(
mkEntry(taskId1, task1),
mkEntry(taskId2, task2),
mkEntry(taskId3, task3)
@@ -4036,7 +4034,7 @@ public class StreamThreadTest {
final TaskId taskId = new TaskId(0, 0);
when(runningTask.state()).thenReturn(Task.State.RUNNING);
- when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
+ when(taskManager.allRunningTasks()).thenReturn(Collections.singletonMap(taskId, runningTask));
return taskManager;
}
@@ -4073,10 +4071,8 @@ public class StreamThreadTest {
config,
streamsMetrics,
stateDirectory,
- new MockChangelogReader(),
CLIENT_ID,
- logContext,
- false);
+ logContext);
return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), emptySet()));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 63638d12c9b..17e47ca2f8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -878,7 +878,7 @@ public class TaskManagerTest {
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask)));
- assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask)));
+ assertEquals(taskManager.allRunningTasks(), mkMap(mkEntry(taskId03, activeTask)));
}
@Test
@@ -2186,7 +2186,6 @@ public class TaskManagerTest {
verify(task00).prepareCommit(false);
verify(task00).postCommit(true);
verify(task00).addPartitionsForOffsetReset(taskId00Partitions);
- verify(task00).changelogPartitions();
verify(task00).closeDirty();
verify(task00).revive();
verify(tasks).removeTask(task00);
@@ -2258,7 +2257,6 @@ public class TaskManagerTest {
verify(nonCorruptedTask).prepareCommit(true);
verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
- verify(corruptedTask).changelogPartitions();
verify(corruptedTask).postCommit(true);
// check that we should not commit empty map either
@@ -2373,7 +2371,6 @@ public class TaskManagerTest {
verify(uncorruptedActive, never()).prepareCommit(anyBoolean());
verify(uncorruptedActive, never()).postCommit(anyBoolean());
- verify(corruptedActive).changelogPartitions();
verify(corruptedActive).postCommit(true);
verify(corruptedActive).addPartitionsForOffsetReset(taskId00Partitions);
verify(consumer, never()).commitSync(emptyMap());
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 212a65eacdd..34d8d279778 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -48,7 +48,6 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
-import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
@@ -63,8 +62,6 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockStandbyUpdateListener;
-import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
@@ -437,17 +434,8 @@ public class StreamThreadStateStoreProviderTest {
StreamsConfigUtils.eosEnabled(streamsConfig),
logContext,
stateDirectory,
- new StoreChangelogReader(
- new MockTime(),
- streamsConfig,
- logContext,
- adminClient,
- restoreConsumer,
- new MockStateRestoreListener(),
- new MockStandbyUpdateListener()),
topology.storeToChangelogTopic(),
- partitions,
- false);
+ partitions);
final RecordCollector recordCollector = new RecordCollectorImpl(
logContext,
taskId,
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f0e748f35ea..06bce3a1bac 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -472,10 +472,8 @@ public class TopologyTestDriver implements Closeable {
StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
logContext,
stateDirectory,
- new MockChangelogRegister(),
processorTopology.storeToChangelogTopic(),
- new HashSet<>(partitionsByInputTopic.values()),
- false);
+ new HashSet<>(partitionsByInputTopic.values()));
final RecordCollector recordCollector = new RecordCollectorImpl(
logContext,
TASK_ID,
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 6b7041167ca..8f2f8859836 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, \
- LATEST_4_0, LATEST_4_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
+ LATEST_4_0, LATEST_4_1, LATEST_4_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
@@ -174,26 +174,31 @@ class StreamsUpgradeTest(Test):
self.stop_and_await()
@cluster(num_nodes=6)
- @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)], upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
+ @matrix(from_version=[str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(LATEST_4_1), str(LATEST_4_2)],
+ upgrade=[True, False],
+ metadata_quorum=[quorum.combined_kraft])
def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
"""
- Starts 3 KafkaStreams instances, and enables / disables state restoration
+ Starts 3 KafkaStreams instances, and tests upgrade/downgrade state restoration
for the instances in a rolling bounce.
- Once same-thread state restoration is removed from the code, this test
- should use different versions of the code.
+ For versions before 3.8, state updater did not exist (always disabled).
+ For versions 3.8 to 4.2, state updater can be disabled via config.
+ For DEV_VERSION (4.3+), state updater is always enabled (config removed).
"""
to_version=str(DEV_VERSION)
if upgrade:
- extra_properties_first = { '__state.updater.enabled__': 'false' }
+ # state updater disabled to always enabled
+ extra_properties_first = {'__state.updater.enabled__': 'false'}
first_version = from_version
- extra_properties_second = { '__state.updater.enabled__': 'true' }
+ extra_properties_second = {} # config is removed
second_version = to_version
else:
- extra_properties_first = { '__state.updater.enabled__': 'true' }
+ # state updater always enabled to disabled
+ extra_properties_first = {} # config is removed
first_version = to_version
- extra_properties_second = { '__state.updater.enabled__': 'false' }
+ extra_properties_second = {'__state.updater.enabled__': 'false'}
second_version = from_version
self.set_up_services()