This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 73dc373714c KAFKA-18913: Remove all code related to non-stateupdater 
path (#21059)
73dc373714c is described below

commit 73dc373714c3602ba4211fbe5602ddfd8fb252e2
Author: Shashank <[email protected]>
AuthorDate: Wed Dec 10 05:08:04 2025 -0800

    KAFKA-18913: Remove all code related to non-stateupdater path (#21059)
    
    This PR cleans up all non-stateupdater code. We also remove the config
    `__state.updater.enabled__`, which means users will no longer have the
    option to disable the state updater.
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 committer-tools/README.md                          |   2 +-
 .../org/apache/kafka/streams/StreamsConfig.java    |   7 -
 .../processor/internals/ActiveTaskCreator.java     |  10 +-
 .../processor/internals/ProcessorStateManager.java |  56 +---
 .../processor/internals/StandbyTaskCreator.java    |  12 +-
 .../processor/internals/StateDirectory.java        |   4 +-
 .../processor/internals/StoreChangelogReader.java  |  10 +-
 .../streams/processor/internals/StreamThread.java  | 131 +++------
 .../streams/processor/internals/TaskManager.java   | 315 +++++++--------------
 .../kafka/streams/processor/internals/Tasks.java   |  11 -
 .../streams/processor/internals/TasksRegistry.java |   2 -
 .../processor/internals/ActiveTaskCreatorTest.java |   2 -
 .../internals/ProcessorStateManagerTest.java       |  57 +---
 .../processor/internals/StreamThreadTest.java      |  18 +-
 .../processor/internals/TaskManagerTest.java       |   5 +-
 .../StreamThreadStateStoreProviderTest.java        |  14 +-
 .../apache/kafka/streams/TopologyTestDriver.java   |   4 +-
 .../tests/streams/streams_upgrade_test.py          |  23 +-
 18 files changed, 187 insertions(+), 496 deletions(-)

diff --git a/committer-tools/README.md b/committer-tools/README.md
index 94f714f959a..4944c6f257f 100644
--- a/committer-tools/README.md
+++ b/committer-tools/README.md
@@ -101,5 +101,5 @@ python find-unfinished-test.py 
~/Downloads/logs_28218821016/5_build\ _\ JUnit\ t
 
 Found tests that were started, but not finished:
 
-2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test 
Executor 47 > StreamThreadTest > 
shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean) > 
"shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, 
boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED
+2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test 
Executor 47 > StreamThreadTest > 
shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean) > 
"shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean).processingThreadsEnabled=true"
 STARTED
 ```
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index fdb409d1258..1fdf6654b90 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1309,13 +1309,6 @@ public class StreamsConfig extends AbstractConfig {
         // Private API used to control the prefix of the auto created topics
         public static final String TOPIC_PREFIX_ALTERNATIVE = 
"__internal.override.topic.prefix__";
 
-        // Private API to enable the state updater (i.e. state updating on a 
dedicated thread)
-        public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
-
-        public static boolean stateUpdaterEnabled(final Map<String, Object> 
configs) {
-            return true; // always enabled
-        }
-
         // Private API to enable processing threads (i.e. polling is decoupled 
from processing)
         public static final String PROCESSING_THREADS_ENABLED = 
"__processing.threads.enabled__";
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 448853c6367..24ff8fabf0c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -52,7 +52,6 @@ class ActiveTaskCreator {
     private final StreamsConfig applicationConfig;
     private final StreamsMetricsImpl streamsMetrics;
     private final StateDirectory stateDirectory;
-    private final ChangelogReader storeChangelogReader;
     private final ThreadCache cache;
     private final Time time;
     private final KafkaClientSupplier clientSupplier;
@@ -62,7 +61,6 @@ class ActiveTaskCreator {
     private final Logger log;
     private final Sensor createTaskSensor;
     private final StreamsProducer streamsProducer;
-    private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
     private boolean isClosed = false;
 
@@ -70,7 +68,6 @@ class ActiveTaskCreator {
                       final StreamsConfig applicationConfig,
                       final StreamsMetricsImpl streamsMetrics,
                       final StateDirectory stateDirectory,
-                      final ChangelogReader storeChangelogReader,
                       final ThreadCache cache,
                       final Time time,
                       final KafkaClientSupplier clientSupplier,
@@ -78,13 +75,11 @@ class ActiveTaskCreator {
                       final int threadIdx,
                       final UUID processId,
                       final LogContext logContext,
-                      final boolean stateUpdaterEnabled,
                       final boolean processingThreadsEnabled) {
         this.topologyMetadata = topologyMetadata;
         this.applicationConfig = applicationConfig;
         this.streamsMetrics = streamsMetrics;
         this.stateDirectory = stateDirectory;
-        this.storeChangelogReader = storeChangelogReader;
         this.cache = cache;
         this.time = time;
         this.clientSupplier = clientSupplier;
@@ -92,7 +87,6 @@ class ActiveTaskCreator {
         this.threadIdx = threadIdx;
         this.processId = processId;
         this.log = logContext.logger(getClass());
-        this.stateUpdaterEnabled = stateUpdaterEnabled;
         this.processingThreadsEnabled = processingThreadsEnabled;
 
         createTaskSensor = ThreadMetrics.createTaskSensor(threadId, 
streamsMetrics);
@@ -154,10 +148,8 @@ class ActiveTaskCreator {
                 eosEnabled(applicationConfig),
                 logContext,
                 stateDirectory,
-                storeChangelogReader,
                 topology.storeToChangelogTopic(),
-                partitions,
-                stateUpdaterEnabled);
+                partitions);
 
             final InternalProcessorContext<Object, Object> context = new 
ProcessorContextImpl(
                 taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 3506845d288..5f68ff4e067 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -40,7 +40,6 @@ import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -170,7 +169,6 @@ public class ProcessorStateManager implements StateManager {
 
     private final TaskId taskId;
     private final boolean eosEnabled;
-    private ChangelogRegister changelogReader;
     private final Collection<TopicPartition> sourcePartitions;
     private final Map<String, String> storeToChangelogTopic;
 
@@ -180,7 +178,6 @@ public class ProcessorStateManager implements StateManager {
 
     private final File baseDir;
     private final OffsetCheckpoint checkpointFile;
-    private final boolean stateUpdaterEnabled;
 
     private TaskType taskType;
     private Logger log;
@@ -202,19 +199,15 @@ public class ProcessorStateManager implements 
StateManager {
                                  final boolean eosEnabled,
                                  final LogContext logContext,
                                  final StateDirectory stateDirectory,
-                                 final ChangelogRegister changelogReader,
                                  final Map<String, String> 
storeToChangelogTopic,
-                                 final Collection<TopicPartition> 
sourcePartitions,
-                                 final boolean stateUpdaterEnabled) throws 
ProcessorStateException {
+                                 final Collection<TopicPartition> 
sourcePartitions) throws ProcessorStateException {
         this.storeToChangelogTopic = storeToChangelogTopic;
         this.log = logContext.logger(ProcessorStateManager.class);
         this.logPrefix = logContext.logPrefix();
         this.taskId = taskId;
         this.taskType = taskType;
         this.eosEnabled = eosEnabled;
-        this.changelogReader = changelogReader;
         this.sourcePartitions = sourcePartitions;
-        this.stateUpdaterEnabled = stateUpdaterEnabled;
 
         this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
         this.checkpointFile = new 
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
@@ -225,16 +218,15 @@ public class ProcessorStateManager implements 
StateManager {
     /**
      * Special constructor used by {@link StateDirectory} to partially 
initialize startup tasks for local state, before
      * they're assigned to a thread. When the task is assigned to a thread, 
the initialization of this StateManager is
-     * completed in {@link #assignToStreamThread(LogContext, 
ChangelogRegister, Collection)}.
+     * completed in {@link #assignToStreamThread(LogContext, Collection)}.
      */
     static ProcessorStateManager createStartupTaskStateManager(final TaskId 
taskId,
                                                                final boolean 
eosEnabled,
                                                                final 
LogContext logContext,
                                                                final 
StateDirectory stateDirectory,
                                                                final 
Map<String, String> storeToChangelogTopic,
-                                                               final 
Set<TopicPartition> sourcePartitions,
-                                                               final boolean 
stateUpdaterEnabled) {
-        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions, 
stateUpdaterEnabled);
+                                                               final 
Set<TopicPartition> sourcePartitions) {
+        return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled, 
logContext, stateDirectory, storeToChangelogTopic, sourcePartitions);
     }
 
     /**
@@ -243,26 +235,17 @@ public class ProcessorStateManager implements 
StateManager {
      * assigned StreamThread's context.
      */
     void assignToStreamThread(final LogContext logContext,
-                              final ChangelogRegister changelogReader,
                               final Collection<TopicPartition> 
sourcePartitions) {
-        if (this.changelogReader != null) {
-            throw new IllegalStateException("Attempted to replace an existing 
changelogReader on a StateManager without closing it.");
-        }
         this.sourcePartitions.clear();
         this.log = logContext.logger(ProcessorStateManager.class);
         this.logPrefix = logContext.logPrefix();
-        this.changelogReader = changelogReader;
         this.sourcePartitions.addAll(sourcePartitions);
     }
 
     void registerStateStores(final List<StateStore> allStores, final 
InternalProcessorContext<?, ?> processorContext) {
         processorContext.uninitialize();
         for (final StateStore store : allStores) {
-            if (stores.containsKey(store.name())) {
-                if (!stateUpdaterEnabled) {
-                    maybeRegisterStoreWithChangelogReader(store.name());
-                }
-            } else {
+            if (!stores.containsKey(store.name())) {
                 store.init(processorContext, store);
             }
             log.trace("Registered state store {}", store.name());
@@ -346,22 +329,6 @@ public class ProcessorStateManager implements StateManager 
{
         }
     }
 
-    private void maybeRegisterStoreWithChangelogReader(final String storeName) 
{
-        if (isLoggingEnabled(storeName) && changelogReader != null) {
-            changelogReader.register(getStorePartition(storeName), this);
-        }
-    }
-
-    private List<TopicPartition> getAllChangelogTopicPartitions() {
-        final List<TopicPartition> allChangelogPartitions = new ArrayList<>();
-        for (final StateStoreMetadata storeMetadata : stores.values()) {
-            if (storeMetadata.changelogPartition != null) {
-                allChangelogPartitions.add(storeMetadata.changelogPartition);
-            }
-        }
-        return allChangelogPartitions;
-    }
-
     @Override
     public File baseDir() {
         return baseDir;
@@ -404,10 +371,6 @@ public class ProcessorStateManager implements StateManager 
{
         // on the state manager this state store would be closed as well
         stores.put(storeName, storeMetadata);
 
-        if (!stateUpdaterEnabled) {
-            maybeRegisterStoreWithChangelogReader(storeName);
-        }
-
         log.debug("Registered state store {} to its state manager", storeName);
     }
 
@@ -616,10 +579,6 @@ public class ProcessorStateManager implements StateManager 
{
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state 
stores: {}", stores);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
-            changelogReader.unregister(getAllChangelogTopicPartitions());
-        }
-
         RuntimeException firstException = null;
         // attempting to close the stores, just in case they
         // are not closed by a ProcessorNode yet
@@ -664,11 +623,6 @@ public class ProcessorStateManager implements StateManager 
{
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        if (!stateUpdaterEnabled && changelogReader != null) {
-            final List<TopicPartition> allChangelogs = 
getAllChangelogTopicPartitions();
-            changelogReader.unregister(allChangelogs);
-        }
-
         // when the state manager is recycled to be used, future writes may 
bypass its store's caching
         // layer if they are from restoration, hence we need to clear the 
state store's caches just in case
         // See KAFKA-14172 for details
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
index eb8bcafea69..693cb4ed63a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java
@@ -40,27 +40,21 @@ class StandbyTaskCreator {
     private final StreamsConfig applicationConfig;
     private final StreamsMetricsImpl streamsMetrics;
     private final StateDirectory stateDirectory;
-    private final ChangelogReader storeChangelogReader;
     private final ThreadCache dummyCache;
     private final Logger log;
     private final Sensor createTaskSensor;
-    private final boolean stateUpdaterEnabled;
 
     StandbyTaskCreator(final TopologyMetadata topologyMetadata,
                        final StreamsConfig applicationConfig,
                        final StreamsMetricsImpl streamsMetrics,
                        final StateDirectory stateDirectory,
-                       final ChangelogReader storeChangelogReader,
                        final String threadId,
-                       final LogContext logContext,
-                       final boolean stateUpdaterEnabled) {
+                       final LogContext logContext) {
         this.topologyMetadata = topologyMetadata;
         this.applicationConfig = applicationConfig;
         this.streamsMetrics = streamsMetrics;
         this.stateDirectory = stateDirectory;
-        this.storeChangelogReader = storeChangelogReader;
         this.log = logContext.logger(getClass());
-        this.stateUpdaterEnabled = stateUpdaterEnabled;
 
         createTaskSensor = ThreadMetrics.createTaskSensor(threadId, 
streamsMetrics);
 
@@ -87,10 +81,8 @@ class StandbyTaskCreator {
                     eosEnabled(applicationConfig),
                     getLogContext(taskId),
                     stateDirectory,
-                    storeChangelogReader,
                     topology.storeToChangelogTopic(),
-                    partitions,
-                    stateUpdaterEnabled);
+                    partitions);
 
                 final InternalProcessorContext<?, ?> context = new 
ProcessorContextImpl(
                     taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 8ea2d3ae65a..bad10567ddc 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -213,7 +213,6 @@ public class StateDirectory implements AutoCloseable {
         if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
             final ThreadCache dummyCache = new ThreadCache(logContext, 0, 
streamsMetrics);
             final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
-            final boolean stateUpdaterEnabled = 
StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals());
 
             // discover all non-empty task directories in StateDirectory
             for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) {
@@ -235,8 +234,7 @@ public class StateDirectory implements AutoCloseable {
                         logContext,
                         this,
                         subTopology.storeToChangelogTopic(),
-                        inputPartitions,
-                        stateUpdaterEnabled
+                        inputPartitions
                     );
 
                     final InternalProcessorContext<Object, Object> context = 
new ProcessorContextImpl(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 5e09ceb62da..3ec8aec6fca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
@@ -212,8 +211,6 @@ public class StoreChangelogReader implements 
ChangelogReader {
     private final Consumer<byte[], byte[]> restoreConsumer;
     private final StateRestoreListener stateRestoreListener;
 
-    private final boolean stateUpdaterEnabled;
-
     // source of the truth of the current registered changelogs;
     // NOTE a changelog would only be removed when its corresponding task
     // is being removed from the thread; otherwise it would stay in this map 
even after completed
@@ -243,8 +240,6 @@ public class StoreChangelogReader implements 
ChangelogReader {
         this.stateRestoreListener = stateRestoreListener;
         this.standbyUpdateListener = standbyUpdateListener;
 
-        this.stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
-
         this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
         this.updateOffsetIntervalMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) == Long.MAX_VALUE ?
@@ -491,15 +486,12 @@ public class StoreChangelogReader implements 
ChangelogReader {
 
     private ConsumerRecords<byte[], byte[]> 
pollRecordsFromRestoreConsumer(final Map<TaskId, Task> tasks,
                                                                            
final Set<TopicPartition> restoringChangelogs) {
-        // If we are updating only standby tasks, and are not using a separate 
thread, we should
-        // use a non-blocking poll to unblock the processing as soon as 
possible.
-        final boolean useNonBlockingPoll = state == 
ChangelogReaderState.STANDBY_UPDATING && !stateUpdaterEnabled;
         final ConsumerRecords<byte[], byte[]> polledRecords;
 
         try {
             pauseResumePartitions(tasks, restoringChangelogs);
 
-            polledRecords = restoreConsumer.poll(useNonBlockingPoll ? 
Duration.ZERO : pollTime);
+            polledRecords = restoreConsumer.poll(pollTime);
 
             // TODO (?) If we cannot fetch records during restore, should we 
trigger `task.timeout.ms` ?
             // TODO (?) If we cannot fetch records for standby task, should we 
trigger `task.timeout.ms` ?
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f208567c32d..84c4f51734a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -372,7 +372,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         new 
AtomicReference<>(org.apache.kafka.streams.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
     private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
     private final boolean eosEnabled;
-    private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
 
     private volatile long fetchDeadlineClientInstanceId = -1;
@@ -399,15 +398,12 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                                       final Runnable shutdownErrorHook,
                                       final BiConsumer<Throwable, Boolean> 
streamsUncaughtExceptionHandler) {
 
-        final boolean stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
-
         final String threadId = clientId + THREAD_ID_SUBSTRING + threadIdx;
         final String stateUpdaterId = threadId.replace(THREAD_ID_SUBSTRING, 
STATE_UPDATER_ID_SUBSTRING);
-        final String restorationThreadId = stateUpdaterEnabled ? 
stateUpdaterId : threadId;
 
         final String logPrefix = String.format("stream-thread [%s] ", 
threadId);
         final LogContext logContext = new LogContext(logPrefix);
-        final LogContext restorationLogContext = stateUpdaterEnabled ? new 
LogContext(String.format("state-updater [%s] ", restorationThreadId)) : 
logContext;
+        final LogContext restorationLogContext = new 
LogContext(String.format("state-updater [%s] ", stateUpdaterId));
         final Logger log = LoggerFactory.getLogger(StreamThread.class);
 
         final ReferenceContainer referenceContainer = new ReferenceContainer();
@@ -417,7 +413,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         referenceContainer.clientTags = config.getClientTags();
 
         log.info("Creating restore consumer client for thread {}", threadId);
-        final Map<String, Object> restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(restoreConsumerClientId(restorationThreadId));
+        final Map<String, Object> restoreConsumerConfigs = 
config.getRestoreConsumerConfigs(restoreConsumerClientId(stateUpdaterId));
         final Consumer<byte[], byte[]> restoreConsumer = 
clientSupplier.getRestoreConsumer(restoreConsumerConfigs);
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(
@@ -438,7 +434,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             config,
             streamsMetrics,
             stateDirectory,
-            changelogReader,
             cache,
             time,
             clientSupplier,
@@ -446,7 +441,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             threadIdx,
             processId,
             logContext,
-            stateUpdaterEnabled,
             proceessingThreadsEnabled
         );
         final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator(
@@ -454,20 +448,17 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             config,
             streamsMetrics,
             stateDirectory,
-            changelogReader,
             threadId,
-            logContext,
-            stateUpdaterEnabled);
+            logContext);
 
         final Tasks tasks = new Tasks(logContext);
         final boolean processingThreadsEnabled =
             InternalConfig.processingThreadsEnabled(config.originals());
 
         final DefaultTaskManager schedulingTaskManager =
-            maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
stateUpdaterEnabled, topologyMetadata, time, threadId, tasks);
+            maybeCreateSchedulingTaskManager(processingThreadsEnabled, 
topologyMetadata, time, threadId, tasks);
         final StateUpdater stateUpdater =
-            maybeCreateStateUpdater(
-                stateUpdaterEnabled,
+            createStateUpdater(
                 streamsMetrics,
                 config,
                 restoreConsumer,
@@ -622,16 +613,11 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     }
 
     private static DefaultTaskManager maybeCreateSchedulingTaskManager(final 
boolean processingThreadsEnabled,
-                                                                       final 
boolean stateUpdaterEnabled,
                                                                        final 
TopologyMetadata topologyMetadata,
                                                                        final 
Time time,
                                                                        final 
String threadId,
                                                                        final 
Tasks tasks) {
         if (processingThreadsEnabled) {
-            if (!stateUpdaterEnabled) {
-                throw new IllegalStateException("Processing threads require 
the state updater to be enabled");
-            }
-
             final DefaultTaskManager defaultTaskManager = new 
DefaultTaskManager(
                 time,
                 threadId,
@@ -646,29 +632,24 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         return null;
     }
 
-    private static StateUpdater maybeCreateStateUpdater(final boolean 
stateUpdaterEnabled,
-                                                                final 
StreamsMetricsImpl streamsMetrics,
-                                                                final 
StreamsConfig streamsConfig,
-                                                                final 
Consumer<byte[], byte[]> restoreConsumer,
-                                                                final 
ChangelogReader changelogReader,
-                                                                final 
TopologyMetadata topologyMetadata,
-                                                                final Time 
time,
-                                                                final String 
clientId,
-                                                                final int 
threadIdx) {
-        if (stateUpdaterEnabled) {
-            final String name = clientId + STATE_UPDATER_ID_SUBSTRING + 
threadIdx;
-            return new DefaultStateUpdater(
-                name,
-                streamsMetrics,
-                streamsConfig,
-                restoreConsumer,
-                changelogReader,
-                topologyMetadata,
-                time
-            );
-        } else {
-            return null;
-        }
+    private static StateUpdater createStateUpdater(final StreamsMetricsImpl 
streamsMetrics,
+                                                        final StreamsConfig 
streamsConfig,
+                                                        final Consumer<byte[], 
byte[]> restoreConsumer,
+                                                        final ChangelogReader 
changelogReader,
+                                                        final TopologyMetadata 
topologyMetadata,
+                                                        final Time time,
+                                                        final String clientId,
+                                                        final int threadIdx) {
+        final String name = clientId + STATE_UPDATER_ID_SUBSTRING + threadIdx;
+        return new DefaultStateUpdater(
+            name,
+            streamsMetrics,
+            streamsConfig,
+            restoreConsumer,
+            changelogReader,
+            topologyMetadata,
+            time
+        );
     }
 
     private static Optional<StreamsRebalanceData.HostInfo> parseHostInfo(final 
String endpoint) {
@@ -875,7 +856,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
-        this.stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());
         this.processingThreadsEnabled = 
InternalConfig.processingThreadsEnabled(config.originals());
         this.logSummaryIntervalMs = 
config.getLong(StreamsConfig.LOG_SUMMARY_INTERVAL_MS_CONFIG);
 
@@ -905,9 +885,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
         boolean cleanRun = false;
         try {
-            if (stateUpdaterEnabled) {
-                taskManager.init();
-            }
+            taskManager.init();
             cleanRun = runLoop();
         } catch (final Throwable e) {
             failedStreamThreadSensor.record();
@@ -1024,27 +1002,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                 }
             }
 
-
-            if (!stateUpdaterEnabled && 
!restoreConsumerInstanceIdFuture.isDone()) {
-                if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
-                    try {
-                        
restoreConsumerInstanceIdFuture.complete(restoreConsumer.clientInstanceId(Duration.ZERO));
-                    } catch (final IllegalStateException disabledError) {
-                        // if telemetry is disabled on a client, we swallow 
the error,
-                        // to allow returning a partial result for all other 
clients
-                        restoreConsumerInstanceIdFuture.complete(null);
-                    } catch (final TimeoutException swallow) {
-                        // swallow
-                    } catch (final Exception error) {
-                        
restoreConsumerInstanceIdFuture.completeExceptionally(error);
-                    }
-                } else {
-                    restoreConsumerInstanceIdFuture.completeExceptionally(
-                        new TimeoutException("Could not retrieve restore 
consumer client instance id.")
-                    );
-                }
-            }
-
             if (!producerInstanceIdFuture.isDone()) {
                 if (fetchDeadlineClientInstanceId >= time.milliseconds()) {
                     try {
@@ -1067,10 +1024,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
                 }
             }
 
-            if (mainConsumerInstanceIdFuture.isDone()
-                && (!stateUpdaterEnabled && 
restoreConsumerInstanceIdFuture.isDone())
-                && producerInstanceIdFuture.isDone()) {
-
+            if (mainConsumerInstanceIdFuture.isDone() && 
producerInstanceIdFuture.isDone()) {
                 fetchDeadlineClientInstanceId = -1L;
             }
         }
@@ -1220,10 +1174,6 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             return;
         }
 
-        if (!stateUpdaterEnabled) {
-            initializeAndRestorePhase();
-        }
-
         // TODO: we should record the restore latency and its relative time 
spent ratio after
         //       we figure out how to move this method out of the stream thread
         advanceNowAndComputeLatency();
@@ -1232,8 +1182,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         long totalCommitLatency = 0L;
         long totalProcessLatency = 0L;
         long totalPunctuateLatency = 0L;
-        if (state == State.RUNNING
-            || (stateUpdaterEnabled && 
isStartingRunningOrPartitionAssigned())) {
+        if (isStartingRunningOrPartitionAssigned()) {
 
             taskManager.updateLags();
 
@@ -1248,9 +1197,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
              */
             do {
 
-                if (stateUpdaterEnabled) {
-                    checkStateUpdater();
-                }
+                checkStateUpdater();
 
                 log.debug("Processing tasks with {} iterations.", 
numIterations);
                 final int processed = taskManager.process(numIterations, time);
@@ -1469,15 +1416,11 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         final ConsumerRecords<byte[], byte[]> records;
         log.debug("Invoking poll on main Consumer");
 
-        if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
-            // try to fetch some records with zero poll millis
-            // to unblock the restoration as soon as possible
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.PARTITIONS_REVOKED) {
+        if (state == State.PARTITIONS_REVOKED) {
             // try to fetch some records with zero poll millis to unblock
             // other useful work while waiting for the join response
             records = pollRequests(Duration.ZERO);
-        } else if (state == State.RUNNING || state == State.STARTING || (state 
== State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) {
+        } else if (state == State.RUNNING || state == State.STARTING || state 
== State.PARTITIONS_ASSIGNED) {
             // try to fetch some records with normal poll time
             // in order to get long polling
             records = pollRequests(pollTime);
@@ -1839,7 +1782,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             }
 
             committed = taskManager.commit(
-                taskManager.allOwnedTasks()
+                taskManager.allRunningTasks()
                     .values()
                     .stream()
                     .filter(t -> t.state() == Task.State.RUNNING || t.state() 
== Task.State.RESTORING)
@@ -2079,18 +2022,8 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         }
         result.put(getName() + "-consumer", mainConsumerInstanceIdFuture);
 
-        if (stateUpdaterEnabled) {
-            restoreConsumerInstanceIdFuture = 
stateUpdater.restoreConsumerInstanceId(timeout);
-        } else {
-            if (restoreConsumerInstanceIdFuture.isDone()) {
-                if 
(restoreConsumerInstanceIdFuture.isCompletedExceptionally()) {
-                    restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
-                    setDeadline = true;
-                }
-            } else {
-                setDeadline = true;
-            }
-        }
+        restoreConsumerInstanceIdFuture = 
stateUpdater.restoreConsumerInstanceId(timeout);
+
         result.put(getName() + "-restore-consumer", 
restoreConsumerInstanceIdFuture);
 
         if (producerInstanceIdFuture.isDone()) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 4092381fdf4..57f9ecc38d4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -151,9 +151,7 @@ public class TaskManager {
     }
 
     void init() {
-        if (stateUpdater != null) {
-            this.stateUpdater.start();
-        }
+        this.stateUpdater.start();
     }
     void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
         this.mainConsumer = mainConsumer;
@@ -198,18 +196,14 @@ public class TaskManager {
     void handleRebalanceComplete() {
         // we should pause consumer only within the listener since
         // before then the assignment has not been updated yet.
-        if (stateUpdater == null) {
-            mainConsumer.pause(mainConsumer.assignment());
-        } else {
-            // All tasks that are owned by the task manager are ready and do 
not need to be paused
-            final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedTasks()
-                .stream()
-                .flatMap(task -> task.inputPartitions().stream())
-                .collect(Collectors.toSet());
-            final Set<TopicPartition> partitionsToPause = new 
HashSet<>(mainConsumer.assignment());
-            partitionsToPause.removeAll(partitionsNotToPause);
-            mainConsumer.pause(partitionsToPause);
-        }
+        // All tasks that are owned by the task manager are ready and do not 
need to be paused
+        final Set<TopicPartition> partitionsNotToPause = 
tasks.allNonFailedTasks()
+            .stream()
+            .flatMap(task -> task.inputPartitions().stream())
+            .collect(Collectors.toSet());
+        final Set<TopicPartition> partitionsToPause = new 
HashSet<>(mainConsumer.assignment());
+        partitionsToPause.removeAll(partitionsNotToPause);
+        mainConsumer.pause(partitionsToPause);
 
         releaseLockedUnassignedTaskDirectories();
 
@@ -272,14 +266,6 @@ public class TaskManager {
     private void closeDirtyAndRevive(final Collection<Task> 
taskWithChangelogs, final boolean markAsCorrupted) {
         for (final Task task : taskWithChangelogs) {
             if (task.state() != State.CLOSED) {
-                final Collection<TopicPartition> corruptedPartitions = 
task.changelogPartitions();
-
-                // mark corrupted partitions to not be checkpointed, and then 
close the task as dirty
-                // TODO: this step should be removed as we complete migrating 
to state updater
-                if (markAsCorrupted && stateUpdater == null) {
-                    task.markChangelogAsCorrupted(corruptedPartitions);
-                }
-
                 try {
                     // we do not need to take the returned offsets since we 
are not going to commit anyways;
                     // this call is only used for active tasks to flush the 
cache before suspending and
@@ -324,20 +310,15 @@ public class TaskManager {
 
                 task.addPartitionsForOffsetReset(assignedToPauseAndReset);
             }
-            if (stateUpdater != null) {
-                tasks.removeTask(task);
-            }
+
+            tasks.removeTask(task);
             task.revive();
-            if (stateUpdater != null) {
-                tasks.addPendingTasksToInit(Collections.singleton(task));
-            }
+            tasks.addPendingTasksToInit(Collections.singleton(task));
         }
     }
 
     private Map<Task, Set<TopicPartition>> assignStartupTasks(final 
Map<TaskId, Set<TopicPartition>> tasksToAssign,
-                                                              final String 
threadLogPrefix,
-                                                              final 
TopologyMetadata topologyMetadata,
-                                                              final 
ChangelogRegister changelogReader) {
+                                                              final String 
threadLogPrefix) {
         if (stateDirectory.hasStartupTasks()) {
             final Map<Task, Set<TopicPartition>> assignedTasks = new 
HashMap<>(tasksToAssign.size());
             for (final Map.Entry<TaskId, Set<TopicPartition>> entry : 
tasksToAssign.entrySet()) {
@@ -346,7 +327,7 @@ public class TaskManager {
                 if (task != null) {
                     // replace our dummy values with the real ones, now we 
know our thread and assignment
                     final Set<TopicPartition> inputPartitions = 
entry.getValue();
-                    task.stateManager().assignToStreamThread(new 
LogContext(threadLogPrefix), changelogReader, inputPartitions);
+                    task.stateManager().assignToStreamThread(new 
LogContext(threadLogPrefix), inputPartitions);
                     updateInputPartitionsOfStandbyTaskIfTheyChanged(task, 
inputPartitions);
 
                     assignedTasks.put(task, inputPartitions);
@@ -401,18 +382,14 @@ public class TaskManager {
         // 2. for tasks that have changed active/standby status, just recycle 
and skip re-creating them
         // 3. otherwise, close them since they are no longer owned
         final Map<TaskId, RuntimeException> failedTasks = new 
LinkedHashMap<>();
-        if (stateUpdater == null) {
-            handleTasksWithoutStateUpdater(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
-        } else {
-            handleTasksWithStateUpdater(
-                activeTasksToCreate,
-                standbyTasksToCreate,
-                tasksToRecycle,
-                tasksToCloseClean,
-                failedTasks
-            );
-            
failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
-        }
+        handleTasks(
+            activeTasksToCreate,
+            standbyTasksToCreate,
+            tasksToRecycle,
+            tasksToCloseClean,
+            failedTasks
+        );
+        failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
 
         final Map<TaskId, RuntimeException> taskCloseExceptions = 
closeAndRecycleTasks(tasksToRecycle, tasksToCloseClean);
 
@@ -475,53 +452,8 @@ public class TaskManager {
         final Collection<Task> newActiveTasks = 
activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate);
         final Collection<Task> newStandbyTasks = 
standbyTaskCreator.createTasks(standbyTasksToCreate);
 
-        if (stateUpdater == null) {
-            tasks.addActiveTasks(newActiveTasks);
-            tasks.addStandbyTasks(newStandbyTasks);
-        } else {
-            tasks.addPendingTasksToInit(newActiveTasks);
-            tasks.addPendingTasksToInit(newStandbyTasks);
-        }
-    }
-
-    private void handleTasksWithoutStateUpdater(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
-                                                final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
-                                                final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
-                                                final Set<Task> 
tasksToCloseClean) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-
-        // recycle the startup standbys to active
-        tasks.addStandbyTasks(startupStandbyTasksToRecycle.keySet());
-
-        // use startup Standbys as real Standby tasks
-        tasks.addStandbyTasks(startupStandbyTasksToUse.keySet());
-
-        for (final Task task : tasks.allTasks()) {
-            final TaskId taskId = task.id();
-            if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = 
activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    tasksToRecycle.put(task, activeTasksToCreate.get(taskId));
-                }
-                activeTasksToCreate.remove(taskId);
-            } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    updateInputPartitionsOfStandbyTaskIfTheyChanged(task, 
standbyTasksToCreate.get(taskId));
-                    task.resume();
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
-                standbyTasksToCreate.remove(taskId);
-            } else {
-                tasksToCloseClean.add(task);
-            }
-        }
+        tasks.addPendingTasksToInit(newActiveTasks);
+        tasks.addPendingTasksToInit(newStandbyTasks);
     }
 
     private void updateInputPartitionsOfStandbyTaskIfTheyChanged(final Task 
task,
@@ -545,11 +477,11 @@ public class TaskManager {
         }
     }
 
-    private void handleTasksWithStateUpdater(final Map<TaskId, 
Set<TopicPartition>> activeTasksToCreate,
-                                             final Map<TaskId, 
Set<TopicPartition>> standbyTasksToCreate,
-                                             final Map<Task, 
Set<TopicPartition>> tasksToRecycle,
-                                             final Set<Task> tasksToCloseClean,
-                                             final Map<TaskId, 
RuntimeException> failedTasks) {
+    private void handleTasks(final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
+                             final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
+                             final Map<Task, Set<TopicPartition>> 
tasksToRecycle,
+                             final Set<Task> tasksToCloseClean,
+                             final Map<TaskId, RuntimeException> failedTasks) {
         handleTasksPendingInitialization();
         handleStartupTaskReuse(activeTasksToCreate, standbyTasksToCreate, 
failedTasks);
         handleRestoringAndUpdatingTasks(activeTasksToCreate, 
standbyTasksToCreate, failedTasks);
@@ -572,8 +504,8 @@ public class TaskManager {
     private void handleStartupTaskReuse(final Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
                                         final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
                                         final Map<TaskId, RuntimeException> 
failedTasks) {
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
-        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix, topologyMetadata, 
changelogReader);
+        final Map<Task, Set<TopicPartition>> startupStandbyTasksToRecycle = 
assignStartupTasks(activeTasksToCreate, logPrefix);
+        final Map<Task, Set<TopicPartition>> startupStandbyTasksToUse = 
assignStartupTasks(standbyTasksToCreate, logPrefix);
 
         // recycle the startup standbys to active, and remove them from the 
set of actives that need to be created
         if (!startupStandbyTasksToRecycle.isEmpty()) {
@@ -802,7 +734,7 @@ public class TaskManager {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
             final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
-                || (stateUpdater != null && 
stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
+                || (stateUpdater.tasks().stream().anyMatch(task -> 
task.id().equals(taskId)));
             if (taskId.topologyName() != null && !taskIsOwned && 
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
                 log.info("Cannot create the assigned task {} since it's 
topology name cannot be recognized, will put it " +
                         "aside as pending for now and create later when 
topology metadata gets refreshed", taskId);
@@ -880,12 +812,8 @@ public class TaskManager {
             try {
                 if (oldTask.isActive()) {
                     final StandbyTask standbyTask = 
convertActiveToStandby((StreamTask) oldTask, inputPartitions);
-                    if (stateUpdater != null) {
-                        tasks.removeTask(oldTask);
-                        
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
-                    } else {
-                        tasks.replaceActiveWithStandby(standbyTask);
-                    }
+                    tasks.removeTask(oldTask);
+                    
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
                 } else {
                     final StreamTask activeTask = 
convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
                     tasks.replaceStandbyWithActive(activeTask);
@@ -1265,24 +1193,22 @@ public class TaskManager {
     }
 
     private void revokeTasksInStateUpdater(final Set<TopicPartition> 
remainingRevokedPartitions) {
-        if (stateUpdater != null) {
-            final Map<TaskId, 
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new 
LinkedHashMap<>();
-            final Map<TaskId, RuntimeException> failedTasksFromStateUpdater = 
new HashMap<>();
-            for (final Task restoringTask : stateUpdater.tasks()) {
-                if (restoringTask.isActive()) {
-                    if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                        futures.put(restoringTask.id(), 
stateUpdater.remove(restoringTask.id()));
-                        
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
-                    }
+        final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> 
futures = new LinkedHashMap<>();
+        final Map<TaskId, RuntimeException> failedTasksFromStateUpdater = new 
HashMap<>();
+        for (final Task restoringTask : stateUpdater.tasks()) {
+            if (restoringTask.isActive()) {
+                if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
+                    futures.put(restoringTask.id(), 
stateUpdater.remove(restoringTask.id()));
+                    
remainingRevokedPartitions.removeAll(restoringTask.inputPartitions());
                 }
             }
-            getNonFailedTasks(futures, 
failedTasksFromStateUpdater).forEach(task -> {
-                task.suspend();
-                tasks.addTask(task);
-            });
-
-            maybeThrowTaskExceptions(failedTasksFromStateUpdater);
         }
+        getNonFailedTasks(futures, failedTasksFromStateUpdater).forEach(task 
-> {
+            task.suspend();
+            tasks.addTask(task);
+        });
+
+        maybeThrowTaskExceptions(failedTasksFromStateUpdater);
     }
 
     private void prepareCommitAndAddOffsetsToMap(final Set<Task> 
tasksToPrepare,
@@ -1335,31 +1261,27 @@ public class TaskManager {
     }
 
     private void removeLostActiveTasksFromStateUpdaterAndPendingTasksToInit() {
-        if (stateUpdater != null) {
-            final Map<TaskId, 
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new 
LinkedHashMap<>();
-            final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
-            tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
-            final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
-            for (final Task restoringTask : stateUpdater.tasks()) {
-                if (restoringTask.isActive()) {
-                    futures.put(restoringTask.id(), 
stateUpdater.remove(restoringTask.id()));
-                }
+        final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> 
futures = new LinkedHashMap<>();
+        final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
+        tasksToCloseClean.addAll(tasks.drainPendingActiveTasksToInit());
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
+        for (final Task restoringTask : stateUpdater.tasks()) {
+            if (restoringTask.isActive()) {
+                futures.put(restoringTask.id(), 
stateUpdater.remove(restoringTask.id()));
             }
+        }
 
-            addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
-            for (final Task task : tasksToCloseClean) {
-                closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
-            }
-            for (final Task task : tasksToCloseDirty) {
-                closeTaskDirty(task, false);
-            }
+        addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
+        for (final Task task : tasksToCloseClean) {
+            closeTaskClean(task, tasksToCloseDirty, new HashMap<>());
+        }
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task, false);
         }
     }
 
     public void signalResume() {
-        if (stateUpdater != null) {
-            stateUpdater.signalResume();
-        }
+        stateUpdater.signalResume();
         if (schedulingTaskManager != null) {
             schedulingTaskManager.signalTaskExecutors();
         }
@@ -1581,40 +1503,38 @@ public class TaskManager {
     }
 
     private void shutdownStateUpdater() {
-        if (stateUpdater != null) {
-            // If there are failed tasks handling them first
-            for (final StateUpdater.ExceptionAndTask exceptionAndTask : 
stateUpdater.drainExceptionsAndFailedTasks()) {
-                final Task failedTask = exceptionAndTask.task();
-                closeTaskDirty(failedTask, false);
-            }
+        // If there are failed tasks handling them first
+        for (final StateUpdater.ExceptionAndTask exceptionAndTask : 
stateUpdater.drainExceptionsAndFailedTasks()) {
+            final Task failedTask = exceptionAndTask.task();
+            closeTaskDirty(failedTask, false);
+        }
 
-            final Map<TaskId, 
CompletableFuture<StateUpdater.RemovedTaskResult>> futures = new 
LinkedHashMap<>();
-            for (final Task task : stateUpdater.tasks()) {
-                final CompletableFuture<StateUpdater.RemovedTaskResult> future 
= stateUpdater.remove(task.id());
-                futures.put(task.id(), future);
-            }
-            final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
-            final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
-            addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
-            // at this point we removed all tasks, so the shutdown should not 
take a lot of time
-            stateUpdater.shutdown(Duration.ofMinutes(1L));
+        final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> 
futures = new LinkedHashMap<>();
+        for (final Task task : stateUpdater.tasks()) {
+            final CompletableFuture<StateUpdater.RemovedTaskResult> future = 
stateUpdater.remove(task.id());
+            futures.put(task.id(), future);
+        }
+        final Set<Task> tasksToCloseClean = new 
TreeSet<>(Comparator.comparing(Task::id));
+        final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
+        addToTasksToClose(futures, tasksToCloseClean, tasksToCloseDirty);
+        // at this point we removed all tasks, so the shutdown should not take 
a lot of time
+        stateUpdater.shutdown(Duration.ofMinutes(1L));
 
-            for (final Task task : tasksToCloseClean) {
-                tasks.addTask(task);
-            }
-            for (final Task task : tasksToCloseDirty) {
-                closeTaskDirty(task, false);
-            }
-            // Handling all failures that occurred during the remove process
-            for (final StateUpdater.ExceptionAndTask exceptionAndTask : 
stateUpdater.drainExceptionsAndFailedTasks()) {
-                final Task failedTask = exceptionAndTask.task();
-                closeTaskDirty(failedTask, false);
-            }
+        for (final Task task : tasksToCloseClean) {
+            tasks.addTask(task);
+        }
+        for (final Task task : tasksToCloseDirty) {
+            closeTaskDirty(task, false);
+        }
+        // Handling all failures that occurred during the remove process
+        for (final StateUpdater.ExceptionAndTask exceptionAndTask : 
stateUpdater.drainExceptionsAndFailedTasks()) {
+            final Task failedTask = exceptionAndTask.task();
+            closeTaskDirty(failedTask, false);
+        }
 
-            // If there is anything left unhandled due to timeouts, handling 
now
-            for (final Task task : stateUpdater.tasks()) {
-                closeTaskDirty(task, false);
-            }
+        // If there is anything left unhandled due to timeouts, handling now
+        for (final Task task : stateUpdater.tasks()) {
+            closeTaskDirty(task, false);
         }
     }
 
@@ -1793,24 +1713,18 @@ public class TaskManager {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        if (stateUpdater != null) {
-            final Map<TaskId, Task> ret = 
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
-            ret.putAll(tasks.allTasksPerId());
-            
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));
-            return ret;
-        } else {
-            return tasks.allTasksPerId();
-        }
+        final Map<TaskId, Task> ret = 
stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+        ret.putAll(tasks.allTasksPerId());
+        
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));
+        return ret;
     }
 
     /**
-     * Returns tasks owned by the stream thread. With state updater disabled, 
these are all tasks. With
-     * state updater enabled, this does not return any tasks currently owned 
by the state updater.
+     * Returns tasks owned by the stream thread.
+     * This does not return any tasks currently owned by the state updater.
      *
-     * TODO: after we complete switching to state updater, we could rename 
this function as allRunningTasks
-     *       to be differentiated from allTasks including running and 
restoring tasks
      */
-    Map<TaskId, Task> allOwnedTasks() {
+    Map<TaskId, Task> allRunningTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
         return tasks.allTasksPerId();
@@ -1819,13 +1733,9 @@ public class TaskManager {
     Set<Task> readOnlyAllTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
-        if (stateUpdater != null) {
-            final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
-            ret.addAll(tasks.allTasks());
-            return Collections.unmodifiableSet(ret);
-        } else {
-            return Collections.unmodifiableSet(tasks.allTasks());
-        }
+        final HashSet<Task> ret = new HashSet<>(stateUpdater.tasks());
+        ret.addAll(tasks.allTasks());
+        return Collections.unmodifiableSet(ret);
     }
 
     Map<TaskId, Task> notPausedTasks() {
@@ -1848,13 +1758,10 @@ public class TaskManager {
     }
 
     private Stream<Task> activeTaskStream() {
-        if (stateUpdater != null) {
-            return Stream.concat(
-                activeRunningTaskStream(),
-                stateUpdater.tasks().stream().filter(Task::isActive)
-            );
-        }
-        return activeRunningTaskStream();
+        return Stream.concat(
+            activeRunningTaskStream(),
+            stateUpdater.tasks().stream().filter(Task::isActive)
+        );
     }
 
     private Stream<Task> activeRunningTaskStream() {
@@ -1871,14 +1778,10 @@ public class TaskManager {
 
     private Stream<Task> standbyTaskStream() {
         final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allTasks().stream().filter(t -> !t.isActive());
-        if (stateUpdater != null) {
-            return Stream.concat(
-                stateUpdater.standbyTasks().stream(),
-                standbyTasksInTaskRegistry
-            );
-        } else {
-            return standbyTasksInTaskRegistry;
-        }
+        return Stream.concat(
+            stateUpdater.standbyTasks().stream(),
+            standbyTasksInTaskRegistry
+        );
     }
     // For testing only.
     int commitAll() {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 76d63490683..f5d007a5915 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -206,17 +206,6 @@ class Tasks implements TasksRegistry {
         failedTaskIds.remove(taskToRemove.id());
     }
 
-    @Override
-    public synchronized void replaceActiveWithStandby(final StandbyTask 
standbyTask) {
-        final TaskId taskId = standbyTask.id();
-        if (activeTasksPerId.remove(taskId) == null) {
-            throw new IllegalStateException("Attempted to replace unknown 
active task with standby task: " + taskId);
-        }
-        removePartitionsForActiveTask(taskId);
-
-        standbyTasksPerId.put(standbyTask.id(), standbyTask);
-    }
-
     @Override
     public synchronized void replaceStandbyWithActive(final StreamTask 
activeTask) {
         final TaskId taskId = activeTask.id();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index 20bee575ebc..09c5a79ae0f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -55,8 +55,6 @@ public interface TasksRegistry {
 
     void removeTask(final Task taskToRemove);
 
-    void replaceActiveWithStandby(final StandbyTask standbyTask);
-
     void replaceStandbyWithActive(final StreamTask activeTask);
 
     boolean updateActiveTaskInputPartitions(final Task task, final 
Set<TopicPartition> topicPartitions);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
index 96597e63523..9ddff25ff94 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java
@@ -272,7 +272,6 @@ public class ActiveTaskCreatorTest {
             config,
             streamsMetrics,
             stateDirectory,
-            changeLogReader,
             new ThreadCache(new LogContext(), 0L, streamsMetrics),
             new MockTime(),
             mockClientSupplier,
@@ -280,7 +279,6 @@ public class ActiveTaskCreatorTest {
             0,
             uuid,
             new LogContext(),
-            false,
             false);
 
         assertThat(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 8c28ae6a33d..6c1a032371c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -120,7 +120,6 @@ public class ProcessorStateManagerTest {
     private final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
     private final ConsumerRecord<byte[], byte[]> consumerRecord =
         new ConsumerRecord<>(persistentStoreTopicName, 1, 100L, keyBytes, 
valueBytes);
-    private final MockChangelogReader changelogReader = new 
MockChangelogReader();
     private final LogContext logContext = new 
LogContext("process-state-manager-test ");
     private final StateRestoreCallback noopStateRestoreCallback = (k, v) -> { 
};
 
@@ -202,14 +201,12 @@ public class ProcessorStateManagerTest {
             false,
             logContext,
             stateDirectory,
-            changelogReader,
             mkMap(
                 mkEntry(persistentStoreName, persistentStoreTopicName),
                 mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
                 mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
             ),
-            Set.of(persistentStorePartition, nonPersistentStorePartition),
-            false);
+            Set.of(persistentStorePartition, nonPersistentStorePartition));
 
         assertTrue(stateMgr.changelogAsSource(persistentStorePartition));
         assertTrue(stateMgr.changelogAsSource(nonPersistentStorePartition));
@@ -224,12 +221,11 @@ public class ProcessorStateManagerTest {
             false,
             logContext,
             stateDirectory,
-            changelogReader, mkMap(
+            mkMap(
                 mkEntry(persistentStoreName, persistentStoreTopicName),
                 mkEntry(persistentStoreTwoName, persistentStoreTopicName)
             ),
-            Collections.emptySet(),
-            false);
+            Collections.emptySet());
 
         stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
         stateMgr.registerStore(persistentStoreTwo, 
persistentStore.stateRestoreCallback, null);
@@ -306,7 +302,7 @@ public class ProcessorStateManagerTest {
     }
 
     @Test
-    public void shouldUnregisterChangelogsDuringClose() {
+    public void shouldCloseStateStoresOnStateManagerClose() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
         final StateStore store = mock(StateStore.class);
         when(store.name()).thenReturn(persistentStoreName);
@@ -317,16 +313,13 @@ public class ProcessorStateManagerTest {
         verify(store).init(context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
-        
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
         stateMgr.close();
         verify(store).close();
-
-        
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
     @Test
-    public void shouldRecycleStoreAndReregisterChangelog() {
+    public void shouldRecycleAndReinitializeStore() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
         final StateStore store = mock(StateStore.class);
         when(store.name()).thenReturn(persistentStoreName);
@@ -336,16 +329,14 @@ public class ProcessorStateManagerTest {
         verify(store).init(context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
-        
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
         stateMgr.recycle();
-        
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.store(persistentStoreName), equalTo(store));
 
         stateMgr.registerStateStores(singletonList(store), context);
 
+        verify(store).init(context, store);
         verify(context, times(2)).uninitialize();
-        
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
     }
 
     @Test
@@ -359,10 +350,9 @@ public class ProcessorStateManagerTest {
         verify(store).init(context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
-        
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+        assertThat(stateMgr.store(persistentStoreName), equalTo(store));
 
         stateMgr.recycle();
-        
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.store(persistentStoreName), equalTo(store));
 
         verify(store).clearCache();
@@ -374,7 +364,7 @@ public class ProcessorStateManagerTest {
 
         try {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
+            assertEquals(persistentStore, stateMgr.store(persistentStoreName));
         } finally {
             stateMgr.close();
         }
@@ -386,28 +376,7 @@ public class ProcessorStateManagerTest {
 
         try {
             stateMgr.registerStore(nonPersistentStore, 
nonPersistentStore.stateRestoreCallback, null);
-            
assertTrue(changelogReader.isPartitionRegistered(nonPersistentStorePartition));
-        } finally {
-            stateMgr.close();
-        }
-    }
-
-    @Test
-    public void shouldNotRegisterNonLoggedStore() {
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(
-            taskId,
-            Task.TaskType.STANDBY,
-            false,
-            logContext,
-            stateDirectory,
-            changelogReader,
-            emptyMap(),
-            emptySet(),
-            false);
-
-        try {
-            stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
-            
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
+            assertEquals(nonPersistentStore, 
stateMgr.store(nonPersistentStoreName));
         } finally {
             stateMgr.close();
         }
@@ -673,10 +642,8 @@ public class ProcessorStateManagerTest {
             false,
             logContext,
             stateDirectory,
-            changelogReader,
             emptyMap(),
-            emptySet(),
-            false);
+            emptySet());
 
         try {
             stateMgr.registerStore(persistentStore, 
persistentStore.stateRestoreCallback, null);
@@ -1230,14 +1197,12 @@ public class ProcessorStateManagerTest {
             eosEnabled,
             logContext,
             stateDirectory,
-            changelogReader,
             mkMap(
                 mkEntry(persistentStoreName, persistentStoreTopicName),
                 mkEntry(persistentStoreTwoName, persistentStoreTwoTopicName),
                 mkEntry(nonPersistentStoreName, nonPersistentStoreTopicName)
             ),
-            emptySet(),
-            false);
+            emptySet());
     }
 
     private ProcessorStateManager getStateManager(final Task.TaskType 
taskType) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8a60f4f14ea..152b00918c1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -259,7 +259,6 @@ public class StreamThreadTest {
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, enableEoS ? 
StreamsConfig.EXACTLY_ONCE_V2 : StreamsConfig.AT_LEAST_ONCE),
             mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName()),
             mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName()),
-            mkEntry(InternalConfig.STATE_UPDATER_ENABLED, 
Boolean.toString(true)),
             mkEntry(InternalConfig.PROCESSING_THREADS_ENABLED, 
Boolean.toString(processingThreadsEnabled)),
             mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1")
         ));
@@ -591,7 +590,7 @@ public class StreamThreadTest {
         final Task runningTask = statelessTask(taskId)
             .inState(Task.State.RUNNING).build();
         final TaskManager taskManager = mock(TaskManager.class);
-        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
+        
when(taskManager.allRunningTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         
when(taskManager.commit(Collections.singleton(runningTask))).thenReturn(0);
 
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
@@ -905,7 +904,6 @@ public class StreamThreadTest {
         );
 
         final Properties properties = new Properties();
-        properties.put(InternalConfig.STATE_UPDATER_ENABLED, true);
         properties.put(InternalConfig.PROCESSING_THREADS_ENABLED, false);
         properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
         final StreamsConfig config = new 
StreamsConfig(StreamsTestUtils.getStreamsConfig(APPLICATION_ID,
@@ -2263,7 +2261,7 @@ public class StreamThreadTest {
 
         TestUtils.waitForCondition(
             () -> mockRestoreConsumer.assignment().size() == 1,
-            "Never get the assignment");
+            "Never got the assignment");
 
         mockRestoreConsumer.addRecord(new ConsumerRecord<>(
             "stream-thread-test-count-changelog",
@@ -2287,7 +2285,7 @@ public class StreamThreadTest {
         // registered again with the changelog reader
         TestUtils.waitForCondition(
             () -> mockRestoreConsumer.assignment().size() == 1,
-            "Never get the assignment");
+            "Never got the assignment");
 
         // after handling the exception and reviving the task, the position
         // should be reset to the beginning.
@@ -2310,7 +2308,7 @@ public class StreamThreadTest {
 
         TestUtils.waitForCondition(
             () -> mockRestoreConsumer.assignment().isEmpty(),
-            "Never get the assignment");
+            "Never got the assignment");
     }
 
     @ParameterizedTest
@@ -2776,7 +2774,7 @@ public class StreamThreadTest {
         when(task2.state()).thenReturn(Task.State.RESTORING);
         when(task3.state()).thenReturn(Task.State.CREATED);
 
-        when(taskManager.allOwnedTasks()).thenReturn(mkMap(
+        when(taskManager.allRunningTasks()).thenReturn(mkMap(
             mkEntry(taskId1, task1),
             mkEntry(taskId2, task2),
             mkEntry(taskId3, task3)
@@ -4036,7 +4034,7 @@ public class StreamThreadTest {
         final TaskId taskId = new TaskId(0, 0);
 
         when(runningTask.state()).thenReturn(Task.State.RUNNING);
-        
when(taskManager.allOwnedTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
+        
when(taskManager.allRunningTasks()).thenReturn(Collections.singletonMap(taskId, 
runningTask));
         return taskManager;
     }
 
@@ -4073,10 +4071,8 @@ public class StreamThreadTest {
             config,
             streamsMetrics,
             stateDirectory,
-            new MockChangelogReader(),
             CLIENT_ID,
-            logContext,
-            false);
+            logContext);
         return standbyTaskCreator.createTasks(singletonMap(new TaskId(1, 2), 
emptySet()));
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 63638d12c9b..17e47ca2f8e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -878,7 +878,7 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, 
activeTask)));
-        assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
+        assertEquals(taskManager.allRunningTasks(), mkMap(mkEntry(taskId03, 
activeTask)));
     }
 
     @Test
@@ -2186,7 +2186,6 @@ public class TaskManagerTest {
         verify(task00).prepareCommit(false);
         verify(task00).postCommit(true);
         verify(task00).addPartitionsForOffsetReset(taskId00Partitions);
-        verify(task00).changelogPartitions();
         verify(task00).closeDirty();
         verify(task00).revive();
         verify(tasks).removeTask(task00);
@@ -2258,7 +2257,6 @@ public class TaskManagerTest {
         verify(nonCorruptedTask).prepareCommit(true);
         verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any());
         verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions);
-        verify(corruptedTask).changelogPartitions();
         verify(corruptedTask).postCommit(true);
 
         // check that we should not commit empty map either
@@ -2373,7 +2371,6 @@ public class TaskManagerTest {
         verify(uncorruptedActive, never()).prepareCommit(anyBoolean());
         verify(uncorruptedActive, never()).postCommit(anyBoolean());
 
-        verify(corruptedActive).changelogPartitions();
         verify(corruptedActive).postCommit(true);
         
verify(corruptedActive).addPartitionsForOffsetReset(taskId00Partitions);
         verify(consumer, never()).commitSync(emptyMap());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 212a65eacdd..34d8d279778 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -48,7 +48,6 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
-import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsProducer;
@@ -63,8 +62,6 @@ import 
org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.MockStandbyUpdateListener;
-import org.apache.kafka.test.MockStateRestoreListener;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
@@ -437,17 +434,8 @@ public class StreamThreadStateStoreProviderTest {
             StreamsConfigUtils.eosEnabled(streamsConfig),
             logContext,
             stateDirectory,
-            new StoreChangelogReader(
-                new MockTime(),
-                streamsConfig,
-                logContext,
-                adminClient,
-                restoreConsumer,
-                new MockStateRestoreListener(),
-                new MockStandbyUpdateListener()),
             topology.storeToChangelogTopic(),
-            partitions,
-            false);
+            partitions);
         final RecordCollector recordCollector = new RecordCollectorImpl(
             logContext,
             taskId,
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index f0e748f35ea..06bce3a1bac 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -472,10 +472,8 @@ public class TopologyTestDriver implements Closeable {
                 
StreamsConfig.EXACTLY_ONCE_V2.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
                 logContext,
                 stateDirectory,
-                new MockChangelogRegister(),
                 processorTopology.storeToChangelogTopic(),
-                new HashSet<>(partitionsByInputTopic.values()),
-                false);
+                new HashSet<>(partitionsByInputTopic.values()));
             final RecordCollector recordCollector = new RecordCollectorImpl(
                 logContext,
                 TASK_ID,
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py 
b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 6b7041167ca..8f2f8859836 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -24,7 +24,7 @@ from kafkatest.services.streams import 
StreamsSmokeTestDriverService, StreamsSmo
 from kafkatest.tests.streams.utils import extract_generation_from_logs, 
extract_generation_id
 from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, 
LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
     LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, 
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, \
-    LATEST_4_0, LATEST_4_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
+    LATEST_4_0, LATEST_4_1, LATEST_4_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 # broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() 
(since v2.2.1)
@@ -174,26 +174,31 @@ class StreamsUpgradeTest(Test):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)],  upgrade=[True, 
False], metadata_quorum=[quorum.combined_kraft])
+    @matrix(from_version=[str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), 
str(LATEST_4_0), str(LATEST_4_1), str(LATEST_4_2)],
+            upgrade=[True, False],
+            metadata_quorum=[quorum.combined_kraft])
     def test_upgrade_downgrade_state_updater(self, from_version, upgrade, 
metadata_quorum):
         """
-        Starts 3 KafkaStreams instances, and enables / disables state 
restoration
+        Starts 3 KafkaStreams instances, and tests upgrade/downgrade state 
restoration
         for the instances in a rolling bounce.
 
-        Once same-thread state restoration is removed from the code, this test
-        should use different versions of the code.
+        For versions before 3.8, state updater did not exist (always disabled).
+        For versions 3.8 to 4.2, state updater can be disabled via config.
+        For DEV_VERSION (4.3+), state updater is always enabled (config 
removed).
         """
         to_version=str(DEV_VERSION)
 
         if upgrade:
-            extra_properties_first = { '__state.updater.enabled__': 'false' }
+            # state updater disabled to always enabled
+            extra_properties_first = {'__state.updater.enabled__': 'false'}
             first_version = from_version
-            extra_properties_second = { '__state.updater.enabled__': 'true' }
+            extra_properties_second = {}  # config is removed
             second_version = to_version
         else:
-            extra_properties_first = { '__state.updater.enabled__': 'true' }
+            # state updater always enabled to disabled
+            extra_properties_first = {}  # config is removed
             first_version = to_version
-            extra_properties_second = { '__state.updater.enabled__': 'false' }
+            extra_properties_second = {'__state.updater.enabled__': 'false'}
             second_version = from_version
 
         self.set_up_services()

Reply via email to