This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c32d2338a7e KAFKA-10199: Enable state updater by default (#13927)
c32d2338a7e is described below

commit c32d2338a7e0079e539b74eb16f0095380a1ce85
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Oct 4 13:58:44 2023 +0200

    KAFKA-10199: Enable state updater by default (#13927)
    
    Now that the implementation for the state updater is done, we can enable it 
by default.
    
    This PR enables the state updater by default and fixes code that made 
assumptions that are not true when the state updater is enabled (mainly tests).
    
    Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax 
<[email protected]>, Walker Carlson <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |   4 +
 .../streams/processor/internals/ReadOnlyTask.java  |   2 +-
 .../processor/internals/StoreChangelogReader.java  |   3 +-
 .../streams/processor/internals/StreamThread.java  |   9 +-
 .../streams/processor/internals/TaskExecutor.java  |   2 +-
 .../streams/processor/internals/TaskManager.java   |  57 +++++--
 .../integration/EosV2UpgradeIntegrationTest.java   |   3 +-
 ...yInnerJoinCustomPartitionerIntegrationTest.java |   5 +-
 .../integration/LagFetchIntegrationTest.java       |  17 +-
 .../integration/NamedTopologyIntegrationTest.java  |  85 +++++-----
 .../PurgeRepartitionTopicIntegrationTest.java      |   8 +-
 .../integration/utils/IntegrationTestUtils.java    |  23 ++-
 .../internals/DefaultStateUpdaterTest.java         |   4 +-
 .../processor/internals/ReadOnlyTaskTest.java      |   1 +
 .../internals/StoreChangelogReaderTest.java        |  56 +++----
 .../processor/internals/StreamThreadTest.java      | 181 +++++++++++++++++----
 .../processor/internals/TaskExecutorTest.java      |   2 +-
 .../processor/internals/TaskManagerTest.java       |  51 ++++--
 18 files changed, 356 insertions(+), 157 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b8defc5ee0c..2b36a0820b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1198,6 +1198,10 @@ public class StreamsConfig extends AbstractConfig {
         // Private API to enable the state updater (i.e. state updating on a 
dedicated thread)
         public static final String STATE_UPDATER_ENABLED = 
"__state.updater.enabled__";
 
+        public static boolean getStateUpdaterEnabled(final Map<String, Object> 
configs) {
+            return InternalConfig.getBoolean(configs, 
InternalConfig.STATE_UPDATER_ENABLED, true);
+        }
+
         public static boolean getBoolean(final Map<String, Object> configs, 
final String key, final boolean defaultValue) {
             final Object value = configs.getOrDefault(key, defaultValue);
             if (value instanceof Boolean) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index 2fb37564155..cfac1f13b21 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -211,7 +211,7 @@ public class ReadOnlyTask implements Task {
 
     @Override
     public StateStore getStore(final String name) {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.getStore(name);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 2a8c215e9b1..1097402ebb3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -230,8 +230,7 @@ public class StoreChangelogReader implements 
ChangelogReader {
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;
 
-        this.stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), 
InternalConfig.STATE_UPDATER_ENABLED, false);
+        this.stateUpdaterEnabled = 
InternalConfig.getStateUpdaterEnabled(config.originals());
 
         this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
         this.pollTime = 
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a7ac2566248..bfce0b32608 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -374,8 +374,7 @@ public class StreamThread extends Thread {
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, 
streamsMetrics);
 
-        final boolean stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), 
InternalConfig.STATE_UPDATER_ENABLED, false);
+        final boolean stateUpdaterEnabled = 
InternalConfig.getStateUpdaterEnabled(config.originals());
         final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
             topologyMetadata,
             config,
@@ -558,7 +557,7 @@ public class StreamThread extends Thread {
 
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
-        this.stateUpdaterEnabled = 
InternalConfig.getBoolean(config.originals(), 
InternalConfig.STATE_UPDATER_ENABLED, false);
+        this.stateUpdaterEnabled = 
InternalConfig.getStateUpdaterEnabled(config.originals());
     }
 
     private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -859,7 +858,7 @@ public class StreamThread extends Thread {
 
                     if (log.isDebugEnabled()) {
                         log.debug("Committed all active tasks {} and standby 
tasks {} in {}ms",
-                            taskManager.activeTaskIds(), 
taskManager.standbyTaskIds(), commitLatency);
+                            taskManager.activeRunningTaskIds(), 
taskManager.standbyTaskIds(), commitLatency);
                     }
                 }
 
@@ -1128,7 +1127,7 @@ public class StreamThread extends Thread {
         if (now - lastCommitMs > commitTimeMs) {
             if (log.isDebugEnabled()) {
                 log.debug("Committing all active tasks {} and standby tasks {} 
since {}ms has elapsed (commit interval is {}ms)",
-                          taskManager.activeTaskIds(), 
taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
+                          taskManager.activeRunningTaskIds(), 
taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
             }
 
             committed = taskManager.commit(
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 56359676718..3ec2035a5cd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -178,7 +178,7 @@ public class TaskExecutor {
         final Set<TaskId> corruptedTasks = new HashSet<>();
 
         if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
-            for (final Task task : taskManager.activeTaskIterable()) {
+            for (final Task task : taskManager.activeRunningTaskIterable()) {
                 final Map<TopicPartition, OffsetAndMetadata> 
taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
                 if (!taskOffsetsToCommit.isEmpty() || 
taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
                     try {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e117e3bd5ea..cf6f6b6326f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -572,7 +572,9 @@ public class TaskManager {
         while (iter.hasNext()) {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
-            if (taskId.topologyName() != null && 
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+            final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+                || (stateUpdater != null && 
stateUpdater.getTasks().stream().anyMatch(task -> task.id() == taskId));
+            if (taskId.topologyName() != null && !taskIsOwned && 
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
                 log.info("Cannot create the assigned task {} since it's 
topology name cannot be recognized, will put it " +
                         "aside as pending for now and create later when 
topology metadata gets refreshed", taskId);
                 pendingTasks.put(taskId, entry.getValue());
@@ -649,7 +651,12 @@ public class TaskManager {
             try {
                 if (oldTask.isActive()) {
                     final StandbyTask standbyTask = 
convertActiveToStandby((StreamTask) oldTask, inputPartitions);
-                    tasks.replaceActiveWithStandby(standbyTask);
+                    if (stateUpdater != null) {
+                        tasks.removeTask(oldTask);
+                        
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
+                    } else {
+                        tasks.replaceActiveWithStandby(standbyTask);
+                    }
                 } else {
                     final StreamTask activeTask = 
convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
                     tasks.replaceStandbyWithActive(activeTask);
@@ -915,7 +922,7 @@ public class TaskManager {
     }
 
     private void handleRestoredTasksFromStateUpdater(final long now,
-                                                        final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+                                                     final 
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
         final Map<TaskId, RuntimeException> taskExceptions = new 
LinkedHashMap<>();
         final Set<Task> tasksToCloseDirty = new 
TreeSet<>(Comparator.comparing(Task::id));
 
@@ -964,7 +971,7 @@ public class TaskManager {
         final Map<Task, Map<TopicPartition, OffsetAndMetadata>> 
consumedOffsetsPerTask = new HashMap<>();
         final AtomicReference<RuntimeException> firstException = new 
AtomicReference<>(null);
 
-        for (final Task task : activeTaskIterable()) {
+        for (final Task task : activeRunningTaskIterable()) {
             if 
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
                 // when the task input partitions are included in the revoked 
list,
                 // this is an active task and should be revoked
@@ -1557,12 +1564,16 @@ public class TaskManager {
             .collect(Collectors.toSet());
     }
 
-    Set<TaskId> standbyTaskIds() {
-        return standbyTaskStream()
+    Set<TaskId> activeRunningTaskIds() {
+        return activeRunningTaskStream()
             .map(Task::id)
             .collect(Collectors.toSet());
     }
 
+    Set<TaskId> standbyTaskIds() {
+        return standbyTaskStream().map(Task::id).collect(Collectors.toSet());
+    }
+
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves 
are mutable, but
         // if any outside code modifies the map or the tasks, it would be a 
severe transgression.
@@ -1615,7 +1626,21 @@ public class TaskManager {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {
+        return activeRunningTaskStream().collect(Collectors.toList());
+    }
+
     private Stream<Task> activeTaskStream() {
+        if (stateUpdater != null) {
+            return Stream.concat(
+                activeRunningTaskStream(),
+                stateUpdater.getTasks().stream().filter(Task::isActive)
+            );
+        }
+        return activeRunningTaskStream();
+    }
+
+    private Stream<Task> activeRunningTaskStream() {
         return tasks.allTasks().stream().filter(Task::isActive);
     }
 
@@ -1628,7 +1653,15 @@ public class TaskManager {
     }
 
     private Stream<Task> standbyTaskStream() {
-        return tasks.allTasks().stream().filter(t -> !t.isActive());
+        final Stream<Task> standbyTasksInTaskRegistry = 
tasks.allTasks().stream().filter(t -> !t.isActive());
+        if (stateUpdater != null) {
+            return Stream.concat(
+                stateUpdater.getStandbyTasks().stream(),
+                standbyTasksInTaskRegistry
+            );
+        } else {
+            return standbyTasksInTaskRegistry;
+        }
     }
 
     // For testing only.
@@ -1704,9 +1737,9 @@ public class TaskManager {
         if (rebalanceInProgress) {
             return -1;
         } else {
-            for (final Task task : activeTaskIterable()) {
+            for (final Task task : activeRunningTaskIterable()) {
                 if (task.commitRequested() && task.commitNeeded()) {
-                    return commit(activeTaskIterable());
+                    return commit(activeRunningTaskIterable());
                 }
             }
             return 0;
@@ -1791,7 +1824,7 @@ public class TaskManager {
     }
 
     void recordTaskProcessRatio(final long totalProcessLatencyMs, final long 
now) {
-        for (final Task task : activeTaskIterable()) {
+        for (final Task task : activeRunningTaskIterable()) {
             task.recordProcessTimeRatioAndBufferSize(totalProcessLatencyMs, 
now);
         }
     }
@@ -1815,7 +1848,7 @@ public class TaskManager {
             }
 
             final Map<TopicPartition, RecordsToDelete> recordsToDelete = new 
HashMap<>();
-            for (final Task task : activeTaskIterable()) {
+            for (final Task task : activeRunningTaskIterable()) {
                 for (final Map.Entry<TopicPartition, Long> entry : 
task.purgeableOffsets().entrySet()) {
                     recordsToDelete.put(entry.getKey(), 
RecordsToDelete.beforeOffset(entry.getValue()));
                 }
@@ -1915,7 +1948,7 @@ public class TaskManager {
     }
 
     boolean needsInitializationOrRestoration() {
-        return 
activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
+        return 
activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
     }
 
     // for testing only
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index aacb9e0483b..5d8e4bf3190 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -783,7 +783,7 @@ public class EosV2UpgradeIntegrationTest {
             commitRequested.set(0);
             stateTransitions1.clear();
             stateTransitions2.clear();
-            streams2V2 = getKafkaStreams(APP_DIR_1, 
StreamsConfig.EXACTLY_ONCE_V2);
+            streams2V2 = getKafkaStreams(APP_DIR_2, 
StreamsConfig.EXACTLY_ONCE_V2);
             streams2V2.setStateListener(
                 (newState, oldState) -> 
stateTransitions2.add(KeyValue.pair(oldState, newState))
             );
@@ -1149,7 +1149,6 @@ public class EosV2UpgradeIntegrationTest {
                         keys.add(row.key);
                     }
                 }
-
                 return true;
             },
             MAX_WAIT_TIME_MS,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index a065cf1d3f5..12a5056730d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -207,11 +207,12 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
             });
         }
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+        for (final KafkaStreams stream: kafkaStreamsList) {
+            stream.start();
+        }
 
         // the streams applications should have shut down into `ERROR` due to 
the IllegalStateException
         waitForApplicationState(Arrays.asList(streams, streamsTwo, 
streamsThree), KafkaStreams.State.ERROR, ofSeconds(60));
-
     }
 
     private void verifyKTableKTableJoin(final Set<KeyValue<String, String>> 
expectedResult) throws Exception {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 03461fcbbc5..d2016901888 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -274,8 +274,9 @@ public class LagFetchIntegrationTest {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, Long> t1 = builder.table(inputTopicName, 
Materialized.as(stateStoreName));
         t1.toStream().to(outputTopicName);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
 
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+        final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
         try {
             // First start up the active.
             TestUtils.waitForCondition(() -> 
streams.allLocalStorePartitionLags().size() == 0,
@@ -291,7 +292,6 @@ public class LagFetchIntegrationTest {
                 WAIT_TIMEOUT_MS);
 
             // check for proper lag values.
-            final AtomicReference<LagInfo> zeroLagRef = new 
AtomicReference<>();
             TestUtils.waitForCondition(() -> {
                 final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap = 
streams.allLocalStorePartitionLags();
                 assertThat(offsetLagInfoMap.size(), equalTo(1));
@@ -312,9 +312,14 @@ public class LagFetchIntegrationTest {
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " 
could not be deleted"));
+        } finally {
+            streams.close();
+            streams.cleanUp();
+        }
 
-            // wait till the lag goes down to 0
-            final KafkaStreams restartedStreams = new 
KafkaStreams(builder.build(), props);
+        // wait till the lag goes down to 0
+        final KafkaStreams restartedStreams = new 
KafkaStreams(builder.build(), props);
+        try {
             // set a state restoration listener to track progress of 
restoration
             final CountDownLatch restorationEndLatch = new CountDownLatch(1);
             final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new 
HashMap<>();
@@ -356,8 +361,8 @@ public class LagFetchIntegrationTest {
 
             assertThat(restoreEndLagInfo.get(stateStoreName).get(0), 
equalTo(zeroLagRef.get()));
         } finally {
-            streams.close();
-            streams.cleanUp();
+            restartedStreams.close();
+            restartedStreams.cleanUp();
         }
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index de1241e9536..ee290211b6d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
@@ -86,6 +87,7 @@ import java.util.stream.Collectors;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.streams.KeyQueryMetadata.NOT_AVAILABLE;
 import static org.apache.kafka.streams.KeyValue.pair;
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
@@ -172,9 +174,9 @@ public class NamedTopologyIntegrationTest {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 
400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of 
count operation with caching
+        asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), 
pair("C", 2L));
     private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA =
-        asList(pair("B", 200L), pair("A", 400L), pair("C", 350L)); // output 
of summation with caching
+        asList(pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C", 
400L), pair("C", 350L));
     private static final String TOPIC_PREFIX = "unique_topic_prefix";
 
     private final KafkaClientSupplier clientSupplier = new 
DefaultKafkaClientSupplier();
@@ -207,6 +209,7 @@ public class NamedTopologyIntegrationTest {
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 
* 1000);
+        
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
         
streamsConfiguration.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE, 
TOPIC_PREFIX);
         return streamsConfiguration;
     }
@@ -318,7 +321,7 @@ public class NamedTopologyIntegrationTest {
             .toStream().to(OUTPUT_STREAM_1);
         streams.addNamedTopology(topology1Builder.build());
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
-        final List<KeyValue<String, Long>> results = 
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
+        final List<KeyValue<String, Long>> results = 
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5);
         assertThat(results, equalTo(COUNT_OUTPUT_DATA));
 
         final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
@@ -336,9 +339,9 @@ public class NamedTopologyIntegrationTest {
         streams.addNamedTopology(topology3Builder.build());
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
 
         
assertThat(CLUSTER.getAllTopicsInCluster().containsAll(asList(changelog1, 
changelog2, changelog3)), is(true));
     }
@@ -365,7 +368,7 @@ public class NamedTopologyIntegrationTest {
             streams.addNamedTopology(topology2Builder.build());
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
             assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SINGLE_PARTITION_OUTPUT_STREAM, 3), equalTo(COUNT_OUTPUT_DATA));
 
             final ReadOnlyKeyValueStore<String, Long> store =
@@ -437,7 +440,7 @@ public class NamedTopologyIntegrationTest {
         streams.start();
         streams.addNamedTopology(topology1Builder.build()).all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -448,8 +451,10 @@ public class NamedTopologyIntegrationTest {
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         streams.addNamedTopology(topology2Builder.build()).all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams),
 State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
+
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -463,9 +468,11 @@ public class NamedTopologyIntegrationTest {
 
         streams.addNamedTopology(topology3Builder.build()).all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+        
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams),
 State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
+
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -481,14 +488,14 @@ public class NamedTopologyIntegrationTest {
         streams2.addNamedTopology(topology1Builder2.build());
         
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, 
streams2));
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
 
         final AddNamedTopologyResult result = 
streams.addNamedTopology(topology2Builder.build());
         final AddNamedTopologyResult result2 = 
streams2.addNamedTopology(topology2Builder2.build());
         result.all().get();
         result2.all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -501,7 +508,7 @@ public class NamedTopologyIntegrationTest {
         streams2.addNamedTopology(topology1Builder2.build());
         
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, 
streams2));
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
 
         final RemoveNamedTopologyResult result = 
streams.removeNamedTopology(TOPOLOGY_1, true);
         streams2.removeNamedTopology(TOPOLOGY_1, true).all().get();
@@ -537,7 +544,7 @@ public class NamedTopologyIntegrationTest {
         assertThat(streams.getAllTopologies(), 
equalTo(singleton(topology2Client1)));
         assertThat(streams2.getAllTopologies(), 
equalTo(singleton(topology2Client2)));
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -559,7 +566,7 @@ public class NamedTopologyIntegrationTest {
             produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
             produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
         } finally {
             CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, 
DELAYED_INPUT_STREAM_2);
         }
@@ -578,8 +585,8 @@ public class NamedTopologyIntegrationTest {
             streams.addNamedTopology(topology1Builder.build());
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
             streams.removeNamedTopology(TOPOLOGY_1).all().get();
             streams.cleanUpNamedTopology(TOPOLOGY_1);
 
@@ -592,8 +599,8 @@ public class NamedTopologyIntegrationTest {
             produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
             streams.addNamedTopology(topology1Builder2.build()).all().get();
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
         } finally {
             CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
             CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
@@ -610,9 +617,9 @@ public class NamedTopologyIntegrationTest {
         streams.addNamedTopology(topology3Builder.build());
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -625,9 +632,9 @@ public class NamedTopologyIntegrationTest {
         streams.addNamedTopology(topology3Builder.build());
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
     }
 
     @Test
@@ -642,8 +649,8 @@ public class NamedTopologyIntegrationTest {
             final NamedTopology namedTopology = topology1Builder.build();
             streams.addNamedTopology(namedTopology).all().get();
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
             streams.removeNamedTopology("topology-1", true).all().get();
             streams.cleanUpNamedTopology("topology-1");
 
@@ -662,8 +669,8 @@ public class NamedTopologyIntegrationTest {
             final NamedTopology namedTopologyDup = topology1BuilderDup.build();
             streams.addNamedTopology(namedTopologyDup).all().get();
 
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
         } finally {
             CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
         }
@@ -680,8 +687,8 @@ public class NamedTopologyIntegrationTest {
         final NamedTopology namedTopology = topology1Builder.build();
         streams.addNamedTopology(namedTopology).all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
         streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
         streams.cleanUpNamedTopology(TOPOLOGY_1);
 
@@ -700,8 +707,8 @@ public class NamedTopologyIntegrationTest {
         final NamedTopology namedTopologyDup = topology1BuilderDup.build();
         streams.addNamedTopology(namedTopologyDup).all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
 
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
@@ -723,7 +730,7 @@ public class NamedTopologyIntegrationTest {
             streams2.addNamedTopology(topology1Builder2.build());
             
             
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, 
streams2));
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
 
             topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
             topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
@@ -746,7 +753,7 @@ public class NamedTopologyIntegrationTest {
 
             CLUSTER.createTopic(NEW_STREAM, 2, 1);
             produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
-            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
 
             // Make sure the threads were not actually killed and replaced
             assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
@@ -793,7 +800,7 @@ public class NamedTopologyIntegrationTest {
             produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
 
             final List<KeyValue<String, Integer>> output =
-                waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3);
+                waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 5);
             output.retainAll(COUNT_OUTPUT_DATA);
 
             assertThat(output, equalTo(COUNT_OUTPUT_DATA));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 774fa57b988..9a6b21d8a1f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -86,7 +87,7 @@ public class PurgeRepartitionTopicIntegrationTest {
     }
 
 
-    private final Time time = CLUSTER.time;
+    private final Time time = new MockTime(1);
 
     private class RepartitionTopicCreatedWithExpectedConfigs implements 
TestCondition {
         @Override
@@ -212,10 +213,11 @@ public class PurgeRepartitionTopicIntegrationTest {
         );
 
         // we need long enough timeout to by-pass the log manager's 
InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
             new RepartitionTopicVerified(currentSize -> currentSize <= 
PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 
60000 ms."
+            waitForPurgeMs,
+            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 
" + waitForPurgeMs + " ms."
         );
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d5bb594cdaa..9fda2e9f9d8 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -775,6 +775,7 @@ public class IntegrationTestUtils {
 
     /**
      * Wait until final key-value mappings have been consumed.
+     * Duplicate records are not considered in the comparison.
      *
      * @param consumerConfig     Kafka Consumer configuration
      * @param topic              Kafka topic to consume from
@@ -791,6 +792,7 @@ public class IntegrationTestUtils {
 
     /**
      * Wait until final key-value mappings have been consumed.
+     * Duplicate records are not considered in the comparison.
      *
      * @param consumerConfig     Kafka Consumer configuration
      * @param topic              Kafka topic to consume from
@@ -807,6 +809,7 @@ public class IntegrationTestUtils {
 
     /**
      * Wait until final key-value mappings have been consumed.
+     * Duplicate records are not considered in the comparison.
      *
      * @param consumerConfig     Kafka Consumer configuration
      * @param topic              Kafka topic to consume from
@@ -850,15 +853,19 @@ public class IntegrationTestUtils {
                 // still need to check that for each key, the ordering is 
expected
                 final Map<K, List<T>> finalAccumData = new HashMap<>();
                 for (final T kv : accumulatedActual) {
-                    finalAccumData.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : 
((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) 
kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = 
finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }
                 }
                 final Map<K, List<T>> finalExpected = new HashMap<>();
                 for (final T kv : expectedRecords) {
-                    finalExpected.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : 
((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) 
kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = finalExpected.computeIfAbsent(key, 
k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }
                 }
 
                 // returns true only if the remaining records in both lists 
are the same and in the same order
@@ -1037,8 +1044,8 @@ public class IntegrationTestUtils {
                 final long millisRemaining = expectedEnd - 
System.currentTimeMillis();
                 if (millisRemaining <= 0) {
                     fail(
-                        "Application did not reach a RUNNING state for all 
streams instances. " +
-                            "Non-running instances: " + nonRunningStreams
+                        nonRunningStreams.size() + " out of " + 
streamsList.size() + " Streams clients did not reach the RUNNING state. " +
+                            "Non-running Streams clients: " + nonRunningStreams
                     );
                 }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 13ac8e48f10..1c5c4292336 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1269,7 +1269,7 @@ class DefaultStateUpdaterTest {
     }
 
     @Test
-    public void 
shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws 
Exception {
+    public void shouldHandleTaskCorruptedExceptionAndAddFailedTasksToQueue() 
throws Exception {
         final StreamTask task1 = statefulTask(TASK_0_0, 
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
         final StandbyTask task2 = standbyTask(TASK_0_2, 
mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
         final StreamTask task3 = statefulTask(TASK_1_0, 
mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
@@ -1293,6 +1293,8 @@ class DefaultStateUpdaterTest {
         verifyRestoredActiveTasks();
         verifyRemovedTasks();
         verify(changelogReader).unregister(mkSet(TOPIC_PARTITION_A_0, 
TOPIC_PARTITION_B_0));
+        verify(task1).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_A_0));
+        verify(task2).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_B_0));
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index e25c7014c68..e0c8eaf4056 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -45,6 +45,7 @@ class ReadOnlyTaskTest {
             add("changelogOffsets");
             add("state");
             add("id");
+            add("getStore");
         }
     };
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e544f485f46..5a19d8af734 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -390,37 +390,28 @@ public class StoreChangelogReaderTest extends 
EasyMockSupport {
     }
 
     @Test
-    public void shouldPollWithRightTimeout() {
-        final TaskId taskId = new TaskId(0, 0);
-
-        
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp, 
5L));
-        EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes();
-        EasyMock.replay(stateManager, storeMetadata, store);
-
-        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
-        adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
-
-        final StoreChangelogReader changelogReader =
-            new StoreChangelogReader(time, config, logContext, adminClient, 
consumer, callback);
-
-        changelogReader.register(tp, stateManager);
-
-        if (type == STANDBY) {
-            changelogReader.transitToUpdateStandby();
-        }
+    public void shouldPollWithRightTimeoutWithStateUpdater() {
+        shouldPollWithRightTimeout(true);
+    }
 
-        changelogReader.restore(Collections.singletonMap(taskId, 
mock(Task.class)));
+    @Test
+    public void shouldPollWithRightTimeoutWithoutStateUpdater() {
+        shouldPollWithRightTimeout(false);
+    }
 
-        if (type == ACTIVE) {
-            
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
-        } else {
-            assertEquals(Duration.ZERO, consumer.lastPollTimeout());
-        }
+    private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled) 
{
+        final Properties properties = new Properties();
+        properties.put(InternalConfig.STATE_UPDATER_ENABLED, 
stateUpdaterEnabled);
+        shouldPollWithRightTimeout(properties);
     }
 
     @Test
-    public void shouldPollWithRightTimeoutWithStateUpdater() {
+    public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
+        final Properties properties = new Properties();
+        shouldPollWithRightTimeout(properties);
+    }
+
+    private void shouldPollWithRightTimeout(final Properties properties) {
         final TaskId taskId = new TaskId(0, 0);
 
         
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
@@ -431,8 +422,6 @@ public class StoreChangelogReaderTest extends 
EasyMockSupport {
         consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
         adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
 
-        final Properties properties = new Properties();
-        properties.put(InternalConfig.STATE_UPDATER_ENABLED, true);
         final StreamsConfig config = new 
StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
 
         final StoreChangelogReader changelogReader =
@@ -445,7 +434,16 @@ public class StoreChangelogReaderTest extends 
EasyMockSupport {
         }
 
         changelogReader.restore(Collections.singletonMap(taskId, 
mock(Task.class)));
-        
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+        if (type == ACTIVE) {
+            
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+        } else {
+            if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
+                    || (boolean) 
properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
+                
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)), 
consumer.lastPollTimeout());
+            } else {
+                assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+            }
+        }
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9cda974cb95..f9c2a0a6080 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -47,8 +47,10 @@ import org.apache.kafka.common.metrics.MetricsContext;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -73,7 +75,6 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread.State;
 import 
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.common.utils.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -130,6 +131,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
 import static 
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
+import static org.easymock.EasyMock.anyLong;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
@@ -241,6 +243,13 @@ public class StreamThreadTest {
     private StreamThread 
createStreamThread(@SuppressWarnings("SameParameterValue") final String 
clientId,
                                             final StreamsConfig config,
                                             final boolean eosEnabled) {
+        return createStreamThread(clientId, config, mockTime, eosEnabled);
+    }
+
+    private StreamThread 
createStreamThread(@SuppressWarnings("SameParameterValue") final String 
clientId,
+                                            final StreamsConfig config,
+                                            final Time time,
+                                            final boolean eosEnabled) {
         if (eosEnabled) {
             clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
         }
@@ -251,7 +260,7 @@ public class StreamThreadTest {
             metrics,
             APPLICATION_ID,
             config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
-            mockTime
+            time
         );
 
         final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
@@ -265,7 +274,7 @@ public class StreamThreadTest {
             PROCESS_ID,
             clientId,
             streamsMetrics,
-            mockTime,
+            time,
             streamsMetadataState,
             0,
             stateDirectory,
@@ -334,7 +343,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldChangeStateAtStartClose() throws Exception {
-        final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, new 
MockTime(1), false);
 
         final StateListenerStub stateListener = new StateListenerStub();
         thread.setStateListener(stateListener);
@@ -634,6 +643,7 @@ public class StreamThreadTest {
 
     @Test
     public void 
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws 
InterruptedException {
+        final Time mockTime = new MockTime(1);
         final StreamsConfig config = new StreamsConfig(configProps(false));
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
@@ -694,6 +704,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws 
InterruptedException {
+        final Time mockTime = new MockTime(1);
         final StreamsConfig config = new StreamsConfig(configProps(false));
         final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
             metrics,
@@ -704,6 +715,7 @@ public class StreamThreadTest {
 
         final Consumer<byte[], byte[]> mockConsumer = 
EasyMock.createNiceMock(Consumer.class);
         
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());
+        expect(mockConsumer.assignment()).andStubReturn(emptySet());
         final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
         
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
         
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
@@ -1034,6 +1046,7 @@ public class StreamThreadTest {
         expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
         
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
         expect(consumer.poll(anyObject())).andStubReturn(new 
ConsumerRecords<>(Collections.emptyMap()));
+        expect(consumer.assignment()).andStubReturn(emptySet());
         final Task task = niceMock(Task.class);
         expect(task.id()).andStubReturn(task1);
         
expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
@@ -1046,7 +1059,9 @@ public class StreamThreadTest {
         final StandbyTaskCreator standbyTaskCreator = 
mock(StandbyTaskCreator.class);
         
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
 
-        EasyMock.replay(consumer, consumerGroupMetadata, task, 
activeTaskCreator, standbyTaskCreator);
+        EasyMock.replay(consumer, consumerGroupMetadata, activeTaskCreator, 
standbyTaskCreator);
+
+        final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);
 
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
@@ -1064,7 +1079,7 @@ public class StreamThreadTest {
             topologyMetadata,
             null,
             null,
-            null
+            stateUpdater
         ) {
             @Override
             int commit(final Collection<Task> tasksToCommit) {
@@ -1246,7 +1261,17 @@ public class StreamThreadTest {
     public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws 
InterruptedException {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
-        final StreamThread thread = createStreamThread(CLIENT_ID, new 
StreamsConfig(configProps(true)), true);
+        final Properties props = configProps(true);
+
+        // The state updater is disabled for this test because this test 
relies on the fact the mainConsumer.resume()
+        // is not called. This is not true when the state updater is enabled 
which leads to
+        // java.lang.IllegalStateException: No current assignment for 
partition topic1-2.
+        // Since this tests verifies an aspect that is independent from the 
state updater, it is OK to disable
+        // the state updater and leave the rewriting of the test to later, 
when the code path for disabled state updater
+        // is removed.
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+        final StreamThread thread =
+            createStreamThread(CLIENT_ID, new StreamsConfig(props), new 
MockTime(1), true);
 
         
thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));
 
@@ -1290,7 +1315,12 @@ public class StreamThreadTest {
     public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws 
InterruptedException {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, 
topic1);
 
-        final StreamThread thread = createStreamThread(CLIENT_ID, new 
StreamsConfig(configProps(true)), true);
+        final StreamThread thread = createStreamThread(
+            CLIENT_ID,
+            new StreamsConfig(configProps(true)),
+            new MockTime(1),
+            true
+        );
 
         thread.start();
         TestUtils.waitForCondition(
@@ -1353,42 +1383,68 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldNotReturnDataAfterTaskMigrated() {
+    public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterEnabled() {
+        shouldNotReturnDataAfterTaskMigrated(true);
+    }
+
+    @Test
+    public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterDisabled() 
{
+        shouldNotReturnDataAfterTaskMigrated(false);
+    }
+
+    private void shouldNotReturnDataAfterTaskMigrated(final boolean 
stateUpdaterEnabled) {
         final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
         
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        expect(taskManager.allOwnedTasks()).andStubReturn(emptyMap());
         final InternalTopologyBuilder internalTopologyBuilder = 
EasyMock.createNiceMock(InternalTopologyBuilder.class);
         
expect(internalTopologyBuilder.fullSourceTopicNames()).andReturn(Collections.singletonList(topic1)).times(2);
 
         final MockConsumer<byte[], byte[]> consumer = new 
MockConsumer<>(OffsetResetStrategy.LATEST);
+        final MockConsumer<byte[], byte[]> restoreConsumer = new 
MockConsumer<>(OffsetResetStrategy.EARLIEST);
 
         consumer.subscribe(Collections.singletonList(topic1), new 
MockRebalanceListener());
         consumer.rebalance(Collections.singletonList(t1p1));
         consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
         consumer.seekToEnd(Collections.singletonList(t1p1));
 
-        final ChangelogReader changelogReader = new MockChangelogReader() {
-            @Override
-            public long restore(final Map<TaskId, Task> tasks) {
+        final TaskMigratedException taskMigratedException = new 
TaskMigratedException(
+            "Changelog restore found task migrated", new 
RuntimeException("restore task migrated"));
+        ChangelogReader changelogReader = this.changelogReader;
+        if (stateUpdaterEnabled) {
+            expect(taskManager.checkStateUpdater(anyLong(), 
anyObject())).andAnswer(() -> {
                 consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
                 consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
 
-                throw new TaskMigratedException(
-                    "Changelog restore found task migrated", new 
RuntimeException("restore task migrated"));
-            }
-        };
+                throw taskMigratedException;
+            });
+        } else {
+            changelogReader = new MockChangelogReader() {
+                @Override
+                public long restore(final Map<TaskId, Task> tasks) {
+                    consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
+                    consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
+
+                    throw taskMigratedException;
+                }
+            };
+        }
 
         taskManager.handleLostAll();
+
         EasyMock.replay(taskManager, internalTopologyBuilder);
 
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
 
+        final Properties props = configProps(false);
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final StreamsConfig config = new StreamsConfig(props);
         final StreamThread thread = new StreamThread(
-            mockTime,
+            new MockTime(1),
             config,
             null,
             consumer,
-            consumer,
+            restoreConsumer,
             changelogReader,
             null,
             taskManager,
@@ -1584,8 +1640,20 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldReinitializeRevivedTasksInAnyState() {
-        final StreamThread thread = createStreamThread(CLIENT_ID, new 
StreamsConfig(configProps(false)), false);
+    public void 
shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterEnabled() throws 
Exception {
+        shouldReinitializeRevivedTasksInAnyState(true);
+    }
+
+    @Test
+    public void 
shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterDisabled() throws 
Exception {
+        shouldReinitializeRevivedTasksInAnyState(false);
+    }
+
+    private void shouldReinitializeRevivedTasksInAnyState(final boolean 
stateUpdaterEnabled) throws Exception {
+        final Properties streamsConfigProps = configProps(false);
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, 
stateUpdaterEnabled);
+        final StreamsConfig config = new StreamsConfig(streamsConfigProps);
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, new 
MockTime(1), false);
 
         final String storeName = "store";
         final String storeChangelog = "stream-thread-test-store-changelog";
@@ -1652,12 +1720,30 @@ public class StreamThreadTest {
         thread.runOnce();
 
         // the third actually polls, processes the record, and throws the 
corruption exception
+        if (stateUpdaterEnabled) {
+            TestUtils.waitForCondition(
+                () -> thread.taskManager().checkStateUpdater(
+                    mockTime.milliseconds(),
+                    topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
+                ),
+                10 * 1000,
+                "State updater never returned tasks.");
+        }
         addRecord(mockConsumer, 0L);
         shouldThrow.set(true);
         final TaskCorruptedException taskCorruptedException = 
assertThrows(TaskCorruptedException.class, thread::runOnce);
 
         // Now, we can handle the corruption
         
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
+        if (stateUpdaterEnabled) {
+            TestUtils.waitForCondition(
+                () -> thread.taskManager().checkStateUpdater(
+                    mockTime.milliseconds(),
+                    topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
+                ),
+                10 * 1000,
+                "State updater never returned tasks.");
+        }
 
         // again, complete the restoration
         thread.runOnce();
@@ -1836,7 +1922,7 @@ public class StreamThreadTest {
                               
.groupByKey().count(Materialized.as("count-one"));
 
         internalStreamsBuilder.buildAndOptimizeTopology();
-        final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, new 
MockTime(1), false);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(
             STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
@@ -1883,6 +1969,10 @@ public class StreamThreadTest {
         final String storeName2 = "table-two";
         final String changelogName1 = APPLICATION_ID + "-" + storeName1 + 
"-changelog";
         final String changelogName2 = APPLICATION_ID + "-" + storeName2 + 
"-changelog";
+        final Properties props = configProps(false);
+        // Updating standby tasks on the stream thread only happens when the 
state updater is disabled
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+        final StreamsConfig config = new StreamsConfig(props);
         final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
 
@@ -2004,6 +2094,10 @@ public class StreamThreadTest {
         final String storeName2 = "table-two";
         final String changelogName1 = APPLICATION_ID + "-" + storeName1 + 
"-changelog";
         final String changelogName2 = APPLICATION_ID + "-" + storeName2 + 
"-changelog";
+        final Properties props = configProps(false);
+        // Updating standby tasks on the stream thread only happens when the 
state updater is disabled
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+        final StreamsConfig config = new StreamsConfig(props);
         final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
         final MockConsumer<byte[], byte[]> restoreConsumer = 
clientSupplier.restoreConsumer;
 
@@ -2243,13 +2337,25 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws 
Exception {
+    public void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterEnabled()
 throws Exception {
+        shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(true);
+    }
+
+    @Test
+    public void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterDiabled()
 throws Exception {
+        
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(false);
+    }
+
+    private void 
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean 
stateUpdaterEnabled) throws Exception {
         internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
             .groupByKey()
             .count(Materialized.as("count"));
         internalStreamsBuilder.buildAndOptimizeTopology();
 
-        final StreamThread thread = createStreamThread("clientId", config, 
false);
+        final Properties props = configProps(false);
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+        final StreamsConfig config = new StreamsConfig(props);
+        final StreamThread thread = createStreamThread("clientId", config, new 
MockTime(1), false);
         final MockConsumer<byte[], byte[]> mockConsumer = 
(MockConsumer<byte[], byte[]>) thread.mainConsumer();
         final MockConsumer<byte[], byte[]> mockRestoreConsumer = 
(MockConsumer<byte[], byte[]>) thread.restoreConsumer();
         final MockAdminClient mockAdminClient = (MockAdminClient) 
thread.adminClient();
@@ -2276,6 +2382,8 @@ public class StreamThreadTest {
             )
         );
         
mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition, 
0L));
+        mockConsumer.subscribe(mkSet(topicPartition.topic()));
+        mockConsumer.rebalance(Collections.singleton(topicPartition));
 
         mockRestoreConsumer.updatePartitions(
             "stream-thread-test-count-changelog",
@@ -2325,6 +2433,12 @@ public class StreamThreadTest {
                 }
             });
 
+            // after handling the exception and reviving the task, with the 
state updater the changelog topic is
+            // registered again with the changelog reader
+            TestUtils.waitForCondition(
+                () -> mockRestoreConsumer.assignment().size() == 1,
+                "Never get the assignment");
+
             // after handling the exception and reviving the task, the position
             // should be reset to the beginning.
             TestUtils.waitForCondition(
@@ -2344,12 +2458,18 @@ public class StreamThreadTest {
                 "K2".getBytes(),
                 "V2".getBytes()));
 
-            TestUtils.waitForCondition(
-                () -> {
-                    mockRestoreConsumer.assign(changelogPartitionSet);
-                    return mockRestoreConsumer.position(changelogPartition) == 
2L;
-                },
-                "Never finished restore");
+            if (stateUpdaterEnabled) {
+                TestUtils.waitForCondition(
+                    () -> mockRestoreConsumer.assignment().size() == 0,
+                    "Never get the assignment");
+            } else {
+                TestUtils.waitForCondition(
+                    () -> {
+                        mockRestoreConsumer.assign(changelogPartitionSet);
+                        return 
mockRestoreConsumer.position(changelogPartition) == 2L;
+                    },
+                    "Never finished restore");
+            }
         } finally {
             thread.shutdown();
             thread.join(10000);
@@ -3190,6 +3310,7 @@ public class StreamThreadTest {
     @Test
     public void 
shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
         final Properties streamsConfigProps = 
StreamsTestUtils.getStreamsConfig();
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);
         final StreamThread streamThread = setUpThread(streamsConfigProps);
         streamThread.setState(State.STARTING);
         streamThread.setState(State.PARTITIONS_ASSIGNED);
@@ -3320,7 +3441,7 @@ public class StreamThreadTest {
         return null;
     }
     StandbyTask standbyTask(final TaskManager taskManager, final 
TopicPartition partition) {
-        final Stream<Task> standbys = 
taskManager.allTasks().values().stream().filter(t -> !t.isActive());
+        final Stream<Task> standbys = 
taskManager.standbyTaskMap().values().stream();
         for (final Task task : (Iterable<Task>) standbys::iterator) {
             if (task.inputPartitions().contains(partition)) {
                 return (StandbyTask) task;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 68658d80f60..1cae7db0f42 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -72,7 +72,7 @@ public class TaskExecutorTest {
         final Tasks tasks = mock(Tasks.class);
         final ConsumerGroupMetadata groupMetadata = 
mock(ConsumerGroupMetadata.class);
         final TaskManager taskManager = mock(TaskManager.class);
-        
when(taskManager.activeTaskIterable()).thenReturn(Collections.singletonList(task));
+        
when(taskManager.activeRunningTaskIterable()).thenReturn(Collections.singletonList(task));
         when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index d534077cf2e..a876d8ab4e1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -590,28 +590,49 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldAssignActiveTaskInTasksRegistryToBeRecycledWithStateUpdaterEnabled() {
-        final StreamTask activeTaskToRecycle = statefulTask(taskId03, 
taskId03ChangelogPartitions)
-            .inState(State.SUSPENDED)
-            .withInputPartitions(taskId03Partitions).build();
-        final StandbyTask recycledStandbyTask = standbyTask(taskId03, 
taskId03ChangelogPartitions)
-            .inState(State.CREATED)
-            .withInputPartitions(taskId03Partitions).build();
-        final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+    public void 
shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled()
 {
+        final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING).build();
+        final StandbyTask standbyTask = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.CREATED).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+        
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
+            .thenReturn(standbyTask);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
activeTaskToRecycle.inputPartitions()))
-            .thenReturn(recycledStandbyTask);
 
-        taskManager.handleAssignment(
-            Collections.emptyMap(),
-            mkMap(mkEntry(activeTaskToRecycle.id(), 
activeTaskToRecycle.inputPartitions()))
-        );
+        taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
 
+        Mockito.verify(activeTaskToRecycle).prepareCommit();
         
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+        Mockito.verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
+        Mockito.verify(tasks).removeTask(activeTaskToRecycle);
         Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
+        Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+    }
+
+    @Test
+    public void 
shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
+        final StreamTask activeTaskToRecycle = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.RUNNING).build();
+        final StandbyTask standbyTask = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .withInputPartitions(taskId01Partitions)
+            .inState(State.CREATED).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+        
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, 
taskId01Partitions))
+            .thenReturn(standbyTask);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+
+        taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, 
taskId01Partitions)));
+
         Mockito.verify(activeTaskToRecycle).prepareCommit();
-        Mockito.verify(tasks).replaceActiveWithStandby(recycledStandbyTask);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+        Mockito.verify(tasks).replaceActiveWithStandby(standbyTask);
+        Mockito.verify(activeTaskCreator).createTasks(consumer, 
Collections.emptyMap());
         Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
     }
 

Reply via email to