This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c32d2338a7e KAFKA-10199: Enable state updater by default (#13927)
c32d2338a7e is described below
commit c32d2338a7e0079e539b74eb16f0095380a1ce85
Author: Bruno Cadonna <[email protected]>
AuthorDate: Wed Oct 4 13:58:44 2023 +0200
KAFKA-10199: Enable state updater by default (#13927)
Now that the implementation for the state updater is done, we can enable it
by default.
This PR enables the state updater by default and fixes code (mainly tests)
that relied on assumptions which no longer hold when the state updater is enabled.
Reviewers: Lucas Brutschy <[email protected]>, Matthias J. Sax
<[email protected]>, Walker Carlson <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 4 +
.../streams/processor/internals/ReadOnlyTask.java | 2 +-
.../processor/internals/StoreChangelogReader.java | 3 +-
.../streams/processor/internals/StreamThread.java | 9 +-
.../streams/processor/internals/TaskExecutor.java | 2 +-
.../streams/processor/internals/TaskManager.java | 57 +++++--
.../integration/EosV2UpgradeIntegrationTest.java | 3 +-
...yInnerJoinCustomPartitionerIntegrationTest.java | 5 +-
.../integration/LagFetchIntegrationTest.java | 17 +-
.../integration/NamedTopologyIntegrationTest.java | 85 +++++-----
.../PurgeRepartitionTopicIntegrationTest.java | 8 +-
.../integration/utils/IntegrationTestUtils.java | 23 ++-
.../internals/DefaultStateUpdaterTest.java | 4 +-
.../processor/internals/ReadOnlyTaskTest.java | 1 +
.../internals/StoreChangelogReaderTest.java | 56 +++----
.../processor/internals/StreamThreadTest.java | 181 +++++++++++++++++----
.../processor/internals/TaskExecutorTest.java | 2 +-
.../processor/internals/TaskManagerTest.java | 51 ++++--
18 files changed, 356 insertions(+), 157 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b8defc5ee0c..2b36a0820b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1198,6 +1198,10 @@ public class StreamsConfig extends AbstractConfig {
// Private API to enable the state updater (i.e. state updating on a
dedicated thread)
public static final String STATE_UPDATER_ENABLED =
"__state.updater.enabled__";
+ public static boolean getStateUpdaterEnabled(final Map<String, Object>
configs) {
+ return InternalConfig.getBoolean(configs,
InternalConfig.STATE_UPDATER_ENABLED, true);
+ }
+
public static boolean getBoolean(final Map<String, Object> configs,
final String key, final boolean defaultValue) {
final Object value = configs.getOrDefault(key, defaultValue);
if (value instanceof Boolean) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index 2fb37564155..cfac1f13b21 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -211,7 +211,7 @@ public class ReadOnlyTask implements Task {
@Override
public StateStore getStore(final String name) {
- throw new UnsupportedOperationException("This task is read-only");
+ return task.getStore(name);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 2a8c215e9b1..1097402ebb3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -230,8 +230,7 @@ public class StoreChangelogReader implements
ChangelogReader {
this.restoreConsumer = restoreConsumer;
this.stateRestoreListener = stateRestoreListener;
- this.stateUpdaterEnabled =
- InternalConfig.getBoolean(config.originals(),
InternalConfig.STATE_UPDATER_ENABLED, false);
+ this.stateUpdaterEnabled =
InternalConfig.getStateUpdaterEnabled(config.originals());
this.groupId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
this.pollTime =
Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a7ac2566248..bfce0b32608 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -374,8 +374,7 @@ public class StreamThread extends Thread {
final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes,
streamsMetrics);
- final boolean stateUpdaterEnabled =
- InternalConfig.getBoolean(config.originals(),
InternalConfig.STATE_UPDATER_ENABLED, false);
+ final boolean stateUpdaterEnabled =
InternalConfig.getStateUpdaterEnabled(config.originals());
final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator(
topologyMetadata,
config,
@@ -558,7 +557,7 @@ public class StreamThread extends Thread {
this.numIterations = 1;
this.eosEnabled = eosEnabled(config);
- this.stateUpdaterEnabled =
InternalConfig.getBoolean(config.originals(),
InternalConfig.STATE_UPDATER_ENABLED, false);
+ this.stateUpdaterEnabled =
InternalConfig.getStateUpdaterEnabled(config.originals());
}
private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -859,7 +858,7 @@ public class StreamThread extends Thread {
if (log.isDebugEnabled()) {
log.debug("Committed all active tasks {} and standby
tasks {} in {}ms",
- taskManager.activeTaskIds(),
taskManager.standbyTaskIds(), commitLatency);
+ taskManager.activeRunningTaskIds(),
taskManager.standbyTaskIds(), commitLatency);
}
}
@@ -1128,7 +1127,7 @@ public class StreamThread extends Thread {
if (now - lastCommitMs > commitTimeMs) {
if (log.isDebugEnabled()) {
log.debug("Committing all active tasks {} and standby tasks {}
since {}ms has elapsed (commit interval is {}ms)",
- taskManager.activeTaskIds(),
taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
+ taskManager.activeRunningTaskIds(),
taskManager.standbyTaskIds(), now - lastCommitMs, commitTimeMs);
}
committed = taskManager.commit(
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index 56359676718..3ec2035a5cd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -178,7 +178,7 @@ public class TaskExecutor {
final Set<TaskId> corruptedTasks = new HashSet<>();
if (executionMetadata.processingMode() == EXACTLY_ONCE_ALPHA) {
- for (final Task task : taskManager.activeTaskIterable()) {
+ for (final Task task : taskManager.activeRunningTaskIterable()) {
final Map<TopicPartition, OffsetAndMetadata>
taskOffsetsToCommit = offsetsPerTask.getOrDefault(task, emptyMap());
if (!taskOffsetsToCommit.isEmpty() ||
taskManager.streamsProducerForTask(task.id()).transactionInFlight()) {
try {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index e117e3bd5ea..cf6f6b6326f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -572,7 +572,9 @@ public class TaskManager {
while (iter.hasNext()) {
final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
final TaskId taskId = entry.getKey();
- if (taskId.topologyName() != null &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+ final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
+ || (stateUpdater != null &&
stateUpdater.getTasks().stream().anyMatch(task -> task.id() == taskId));
+ if (taskId.topologyName() != null && !taskIsOwned &&
!topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
log.info("Cannot create the assigned task {} since it's
topology name cannot be recognized, will put it " +
"aside as pending for now and create later when
topology metadata gets refreshed", taskId);
pendingTasks.put(taskId, entry.getValue());
@@ -649,7 +651,12 @@ public class TaskManager {
try {
if (oldTask.isActive()) {
final StandbyTask standbyTask =
convertActiveToStandby((StreamTask) oldTask, inputPartitions);
- tasks.replaceActiveWithStandby(standbyTask);
+ if (stateUpdater != null) {
+ tasks.removeTask(oldTask);
+
tasks.addPendingTasksToInit(Collections.singleton(standbyTask));
+ } else {
+ tasks.replaceActiveWithStandby(standbyTask);
+ }
} else {
final StreamTask activeTask =
convertStandbyToActive((StandbyTask) oldTask, inputPartitions);
tasks.replaceStandbyWithActive(activeTask);
@@ -915,7 +922,7 @@ public class TaskManager {
}
private void handleRestoredTasksFromStateUpdater(final long now,
- final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
+ final
java.util.function.Consumer<Set<TopicPartition>> offsetResetter) {
final Map<TaskId, RuntimeException> taskExceptions = new
LinkedHashMap<>();
final Set<Task> tasksToCloseDirty = new
TreeSet<>(Comparator.comparing(Task::id));
@@ -964,7 +971,7 @@ public class TaskManager {
final Map<Task, Map<TopicPartition, OffsetAndMetadata>>
consumedOffsetsPerTask = new HashMap<>();
final AtomicReference<RuntimeException> firstException = new
AtomicReference<>(null);
- for (final Task task : activeTaskIterable()) {
+ for (final Task task : activeRunningTaskIterable()) {
if
(remainingRevokedPartitions.containsAll(task.inputPartitions())) {
// when the task input partitions are included in the revoked
list,
// this is an active task and should be revoked
@@ -1557,12 +1564,16 @@ public class TaskManager {
.collect(Collectors.toSet());
}
- Set<TaskId> standbyTaskIds() {
- return standbyTaskStream()
+ Set<TaskId> activeRunningTaskIds() {
+ return activeRunningTaskStream()
.map(Task::id)
.collect(Collectors.toSet());
}
+ Set<TaskId> standbyTaskIds() {
+ return standbyTaskStream().map(Task::id).collect(Collectors.toSet());
+ }
+
Map<TaskId, Task> allTasks() {
// not bothering with an unmodifiable map, since the tasks themselves
are mutable, but
// if any outside code modifies the map or the tasks, it would be a
severe transgression.
@@ -1615,7 +1626,21 @@ public class TaskManager {
return activeTaskStream().collect(Collectors.toList());
}
+ List<Task> activeRunningTaskIterable() {
+ return activeRunningTaskStream().collect(Collectors.toList());
+ }
+
private Stream<Task> activeTaskStream() {
+ if (stateUpdater != null) {
+ return Stream.concat(
+ activeRunningTaskStream(),
+ stateUpdater.getTasks().stream().filter(Task::isActive)
+ );
+ }
+ return activeRunningTaskStream();
+ }
+
+ private Stream<Task> activeRunningTaskStream() {
return tasks.allTasks().stream().filter(Task::isActive);
}
@@ -1628,7 +1653,15 @@ public class TaskManager {
}
private Stream<Task> standbyTaskStream() {
- return tasks.allTasks().stream().filter(t -> !t.isActive());
+ final Stream<Task> standbyTasksInTaskRegistry =
tasks.allTasks().stream().filter(t -> !t.isActive());
+ if (stateUpdater != null) {
+ return Stream.concat(
+ stateUpdater.getStandbyTasks().stream(),
+ standbyTasksInTaskRegistry
+ );
+ } else {
+ return standbyTasksInTaskRegistry;
+ }
}
// For testing only.
@@ -1704,9 +1737,9 @@ public class TaskManager {
if (rebalanceInProgress) {
return -1;
} else {
- for (final Task task : activeTaskIterable()) {
+ for (final Task task : activeRunningTaskIterable()) {
if (task.commitRequested() && task.commitNeeded()) {
- return commit(activeTaskIterable());
+ return commit(activeRunningTaskIterable());
}
}
return 0;
@@ -1791,7 +1824,7 @@ public class TaskManager {
}
void recordTaskProcessRatio(final long totalProcessLatencyMs, final long
now) {
- for (final Task task : activeTaskIterable()) {
+ for (final Task task : activeRunningTaskIterable()) {
task.recordProcessTimeRatioAndBufferSize(totalProcessLatencyMs,
now);
}
}
@@ -1815,7 +1848,7 @@ public class TaskManager {
}
final Map<TopicPartition, RecordsToDelete> recordsToDelete = new
HashMap<>();
- for (final Task task : activeTaskIterable()) {
+ for (final Task task : activeRunningTaskIterable()) {
for (final Map.Entry<TopicPartition, Long> entry :
task.purgeableOffsets().entrySet()) {
recordsToDelete.put(entry.getKey(),
RecordsToDelete.beforeOffset(entry.getValue()));
}
@@ -1915,7 +1948,7 @@ public class TaskManager {
}
boolean needsInitializationOrRestoration() {
- return
activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
+ return
activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
}
// for testing only
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
index aacb9e0483b..5d8e4bf3190 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosV2UpgradeIntegrationTest.java
@@ -783,7 +783,7 @@ public class EosV2UpgradeIntegrationTest {
commitRequested.set(0);
stateTransitions1.clear();
stateTransitions2.clear();
- streams2V2 = getKafkaStreams(APP_DIR_1,
StreamsConfig.EXACTLY_ONCE_V2);
+ streams2V2 = getKafkaStreams(APP_DIR_2,
StreamsConfig.EXACTLY_ONCE_V2);
streams2V2.setStateListener(
(newState, oldState) ->
stateTransitions2.add(KeyValue.pair(oldState, newState))
);
@@ -1149,7 +1149,6 @@ public class EosV2UpgradeIntegrationTest {
keys.add(row.key);
}
}
-
return true;
},
MAX_WAIT_TIME_MS,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index a065cf1d3f5..12a5056730d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -207,11 +207,12 @@ public class
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
});
}
- startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+ for (final KafkaStreams stream: kafkaStreamsList) {
+ stream.start();
+ }
// the streams applications should have shut down into `ERROR` due to
the IllegalStateException
waitForApplicationState(Arrays.asList(streams, streamsTwo,
streamsThree), KafkaStreams.State.ERROR, ofSeconds(60));
-
}
private void verifyKTableKTableJoin(final Set<KeyValue<String, String>>
expectedResult) throws Exception {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 03461fcbbc5..d2016901888 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -274,8 +274,9 @@ public class LagFetchIntegrationTest {
final StreamsBuilder builder = new StreamsBuilder();
final KTable<String, Long> t1 = builder.table(inputTopicName,
Materialized.as(stateStoreName));
t1.toStream().to(outputTopicName);
- final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ final KafkaStreams streams = new KafkaStreams(builder.build(), props);
+ final AtomicReference<LagInfo> zeroLagRef = new AtomicReference<>();
try {
// First start up the active.
TestUtils.waitForCondition(() ->
streams.allLocalStorePartitionLags().size() == 0,
@@ -291,7 +292,6 @@ public class LagFetchIntegrationTest {
WAIT_TIMEOUT_MS);
// check for proper lag values.
- final AtomicReference<LagInfo> zeroLagRef = new
AtomicReference<>();
TestUtils.waitForCondition(() -> {
final Map<String, Map<Integer, LagInfo>> offsetLagInfoMap =
streams.allLocalStorePartitionLags();
assertThat(offsetLagInfoMap.size(), equalTo(1));
@@ -312,9 +312,14 @@ public class LagFetchIntegrationTest {
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(f -> assertTrue(f.delete(), "Some state " + f + "
could not be deleted"));
+ } finally {
+ streams.close();
+ streams.cleanUp();
+ }
- // wait till the lag goes down to 0
- final KafkaStreams restartedStreams = new
KafkaStreams(builder.build(), props);
+ // wait till the lag goes down to 0
+ final KafkaStreams restartedStreams = new
KafkaStreams(builder.build(), props);
+ try {
// set a state restoration listener to track progress of
restoration
final CountDownLatch restorationEndLatch = new CountDownLatch(1);
final Map<String, Map<Integer, LagInfo>> restoreStartLagInfo = new
HashMap<>();
@@ -356,8 +361,8 @@ public class LagFetchIntegrationTest {
assertThat(restoreEndLagInfo.get(stateStoreName).get(0),
equalTo(zeroLagRef.get()));
} finally {
- streams.close();
- streams.cleanUp();
+ restartedStreams.close();
+ restartedStreams.cleanUp();
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index de1241e9536..ee290211b6d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -59,6 +59,7 @@ import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
@@ -86,6 +87,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.apache.kafka.streams.KeyQueryMetadata.NOT_AVAILABLE;
import static org.apache.kafka.streams.KeyValue.pair;
+import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.DEFAULT_TIMEOUT;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
@@ -172,9 +174,9 @@ public class NamedTopologyIntegrationTest {
private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C",
400L), pair("C", -50L));
private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
- asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of
count operation with caching
+ asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L),
pair("C", 2L));
private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA =
- asList(pair("B", 200L), pair("A", 400L), pair("C", 350L)); // output
of summation with caching
+ asList(pair("A", 100L), pair("B", 200L), pair("A", 400L), pair("C",
400L), pair("C", 350L));
private static final String TOPIC_PREFIX = "unique_topic_prefix";
private final KafkaClientSupplier clientSupplier = new
DefaultKafkaClientSupplier();
@@ -207,6 +209,7 @@ public class NamedTopologyIntegrationTest {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10
* 1000);
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.InternalConfig.TOPIC_PREFIX_ALTERNATIVE,
TOPIC_PREFIX);
return streamsConfiguration;
}
@@ -318,7 +321,7 @@ public class NamedTopologyIntegrationTest {
.toStream().to(OUTPUT_STREAM_1);
streams.addNamedTopology(topology1Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- final List<KeyValue<String, Long>> results =
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
+ final List<KeyValue<String, Long>> results =
waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 5);
assertThat(results, equalTo(COUNT_OUTPUT_DATA));
final Set<String> allTopics = CLUSTER.getAllTopicsInCluster();
@@ -336,9 +339,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(CLUSTER.getAllTopicsInCluster().containsAll(asList(changelog1,
changelog2, changelog3)), is(true));
}
@@ -365,7 +368,7 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology2Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SINGLE_PARTITION_OUTPUT_STREAM, 3), equalTo(COUNT_OUTPUT_DATA));
final ReadOnlyKeyValueStore<String, Long> store =
@@ -437,7 +440,7 @@ public class NamedTopologyIntegrationTest {
streams.start();
streams.addNamedTopology(topology1Builder.build()).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -448,8 +451,10 @@ public class NamedTopologyIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
streams.addNamedTopology(topology2Builder.build()).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams),
State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
+
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -463,9 +468,11 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build()).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+
IntegrationTestUtils.waitForApplicationState(Collections.singletonList(streams),
State.RUNNING, Duration.ofMillis(DEFAULT_TIMEOUT));
+
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -481,14 +488,14 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams,
streams2));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
final AddNamedTopologyResult result =
streams.addNamedTopology(topology2Builder.build());
final AddNamedTopologyResult result2 =
streams2.addNamedTopology(topology2Builder2.build());
result.all().get();
result2.all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -501,7 +508,7 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams,
streams2));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
final RemoveNamedTopologyResult result =
streams.removeNamedTopology(TOPOLOGY_1, true);
streams2.removeNamedTopology(TOPOLOGY_1, true).all().get();
@@ -537,7 +544,7 @@ public class NamedTopologyIntegrationTest {
assertThat(streams.getAllTopologies(),
equalTo(singleton(topology2Client1)));
assertThat(streams2.getAllTopologies(),
equalTo(singleton(topology2Client2)));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -559,7 +566,7 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1,
DELAYED_INPUT_STREAM_2);
}
@@ -578,8 +585,8 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology1Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology(TOPOLOGY_1).all().get();
streams.cleanUpNamedTopology(TOPOLOGY_1);
@@ -592,8 +599,8 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
streams.addNamedTopology(topology1Builder2.build()).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
@@ -610,9 +617,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -625,9 +632,9 @@ public class NamedTopologyIntegrationTest {
streams.addNamedTopology(topology3Builder.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_3, 5), equalTo(COUNT_OUTPUT_DATA));
}
@Test
@@ -642,8 +649,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology("topology-1", true).all().get();
streams.cleanUpNamedTopology("topology-1");
@@ -662,8 +669,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
@@ -680,8 +687,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopology = topology1Builder.build();
streams.addNamedTopology(namedTopology).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
streams.cleanUpNamedTopology(TOPOLOGY_1);
@@ -700,8 +707,8 @@ public class NamedTopologyIntegrationTest {
final NamedTopology namedTopologyDup = topology1BuilderDup.build();
streams.addNamedTopology(namedTopologyDup).all().get();
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
}
@@ -723,7 +730,7 @@ public class NamedTopologyIntegrationTest {
streams2.addNamedTopology(topology1Builder2.build());
IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams,
streams2));
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5), equalTo(COUNT_OUTPUT_DATA));
topology2Builder.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
topology2Builder2.stream(NEW_STREAM).groupBy((k, v) ->
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
@@ -746,7 +753,7 @@ public class NamedTopologyIntegrationTest {
CLUSTER.createTopic(NEW_STREAM, 2, 1);
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
- assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+ assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_2, 5), equalTo(COUNT_OUTPUT_DATA));
// Make sure the threads were not actually killed and replaced
assertThat(streams.metadataForLocalThreads().size(), equalTo(2));
@@ -793,7 +800,7 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
final List<KeyValue<String, Integer>> output =
- waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 3);
+ waitUntilMinKeyValueRecordsReceived(consumerConfig,
OUTPUT_STREAM_1, 5);
output.retainAll(COUNT_OUTPUT_DATA);
assertThat(output, equalTo(COUNT_OUTPUT_DATA));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 774fa57b988..9a6b21d8a1f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
@@ -86,7 +87,7 @@ public class PurgeRepartitionTopicIntegrationTest {
}
- private final Time time = CLUSTER.time;
+ private final Time time = new MockTime(1);
private class RepartitionTopicCreatedWithExpectedConfigs implements
TestCondition {
@Override
@@ -212,10 +213,11 @@ public class PurgeRepartitionTopicIntegrationTest {
);
// we need long enough timeout to by-pass the log manager's
InitialTaskDelayMs, which is hard-coded on server side
+ final long waitForPurgeMs = 60000;
TestUtils.waitForCondition(
new RepartitionTopicVerified(currentSize -> currentSize <=
PURGE_SEGMENT_BYTES),
- 60000,
- "Repartition topic " + REPARTITION_TOPIC + " not purged data after
60000 ms."
+ waitForPurgeMs,
+ "Repartition topic " + REPARTITION_TOPIC + " not purged data after
" + waitForPurgeMs + " ms."
);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index d5bb594cdaa..9fda2e9f9d8 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -775,6 +775,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
+ * Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -791,6 +792,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
+ * Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -807,6 +809,7 @@ public class IntegrationTestUtils {
/**
* Wait until final key-value mappings have been consumed.
+ * Duplicate records are not considered in the comparison.
*
* @param consumerConfig Kafka Consumer configuration
* @param topic Kafka topic to consume from
@@ -850,15 +853,19 @@ public class IntegrationTestUtils {
// still need to check that for each key, the ordering is
expected
final Map<K, List<T>> finalAccumData = new HashMap<>();
for (final T kv : accumulatedActual) {
- finalAccumData.computeIfAbsent(
- withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() :
((KeyValue<K, V>) kv).key,
- key -> new ArrayList<>()).add(kv);
+ final K key = withTimestamp ? ((KeyValueTimestamp<K, V>)
kv).key() : ((KeyValue<K, V>) kv).key;
+ final List<T> records =
finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
+ if (!records.contains(kv)) {
+ records.add(kv);
+ }
}
final Map<K, List<T>> finalExpected = new HashMap<>();
for (final T kv : expectedRecords) {
- finalExpected.computeIfAbsent(
- withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() :
((KeyValue<K, V>) kv).key,
- key -> new ArrayList<>()).add(kv);
+ final K key = withTimestamp ? ((KeyValueTimestamp<K, V>)
kv).key() : ((KeyValue<K, V>) kv).key;
+ final List<T> records = finalExpected.computeIfAbsent(key,
k -> new ArrayList<>());
+ if (!records.contains(kv)) {
+ records.add(kv);
+ }
}
// returns true only if the remaining records in both lists
are the same and in the same order
@@ -1037,8 +1044,8 @@ public class IntegrationTestUtils {
final long millisRemaining = expectedEnd -
System.currentTimeMillis();
if (millisRemaining <= 0) {
fail(
- "Application did not reach a RUNNING state for all
streams instances. " +
- "Non-running instances: " + nonRunningStreams
+ nonRunningStreams.size() + " out of " +
streamsList.size() + " Streams clients did not reach the RUNNING state. " +
+ "Non-running Streams clients: " + nonRunningStreams
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 13ac8e48f10..1c5c4292336 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -1269,7 +1269,7 @@ class DefaultStateUpdaterTest {
}
@Test
- public void
shouldAddFailedTasksToQueueWhenRestoreThrowsTaskCorruptedException() throws
Exception {
+ public void shouldHandleTaskCorruptedExceptionAndAddFailedTasksToQueue()
throws Exception {
final StreamTask task1 = statefulTask(TASK_0_0,
mkSet(TOPIC_PARTITION_A_0)).inState(State.RESTORING).build();
final StandbyTask task2 = standbyTask(TASK_0_2,
mkSet(TOPIC_PARTITION_B_0)).inState(State.RUNNING).build();
final StreamTask task3 = statefulTask(TASK_1_0,
mkSet(TOPIC_PARTITION_C_0)).inState(State.RESTORING).build();
@@ -1293,6 +1293,8 @@ class DefaultStateUpdaterTest {
verifyRestoredActiveTasks();
verifyRemovedTasks();
verify(changelogReader).unregister(mkSet(TOPIC_PARTITION_A_0,
TOPIC_PARTITION_B_0));
+ verify(task1).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_A_0));
+ verify(task2).markChangelogAsCorrupted(mkSet(TOPIC_PARTITION_B_0));
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
index e25c7014c68..e0c8eaf4056 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ReadOnlyTaskTest.java
@@ -45,6 +45,7 @@ class ReadOnlyTaskTest {
add("changelogOffsets");
add("state");
add("id");
+ add("getStore");
}
};
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index e544f485f46..5a19d8af734 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -390,37 +390,28 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
}
@Test
- public void shouldPollWithRightTimeout() {
- final TaskId taskId = new TaskId(0, 0);
-
-
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
-
EasyMock.expect(stateManager.changelogOffsets()).andReturn(singletonMap(tp,
5L));
- EasyMock.expect(stateManager.taskId()).andReturn(taskId).anyTimes();
- EasyMock.replay(stateManager, storeMetadata, store);
-
- consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
- adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
-
- final StoreChangelogReader changelogReader =
- new StoreChangelogReader(time, config, logContext, adminClient,
consumer, callback);
-
- changelogReader.register(tp, stateManager);
-
- if (type == STANDBY) {
- changelogReader.transitToUpdateStandby();
- }
+ public void shouldPollWithRightTimeoutWithStateUpdater() {
+ shouldPollWithRightTimeout(true);
+ }
- changelogReader.restore(Collections.singletonMap(taskId,
mock(Task.class)));
+ @Test
+ public void shouldPollWithRightTimeoutWithoutStateUpdater() {
+ shouldPollWithRightTimeout(false);
+ }
- if (type == ACTIVE) {
-
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
consumer.lastPollTimeout());
- } else {
- assertEquals(Duration.ZERO, consumer.lastPollTimeout());
- }
+ private void shouldPollWithRightTimeout(final boolean stateUpdaterEnabled)
{
+ final Properties properties = new Properties();
+ properties.put(InternalConfig.STATE_UPDATER_ENABLED,
stateUpdaterEnabled);
+ shouldPollWithRightTimeout(properties);
}
@Test
- public void shouldPollWithRightTimeoutWithStateUpdater() {
+ public void shouldPollWithRightTimeoutWithStateUpdaterDefault() {
+ final Properties properties = new Properties();
+ shouldPollWithRightTimeout(properties);
+ }
+
+ private void shouldPollWithRightTimeout(final Properties properties) {
final TaskId taskId = new TaskId(0, 0);
EasyMock.expect(storeMetadata.offset()).andReturn(null).andReturn(9L).anyTimes();
@@ -431,8 +422,6 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 5L));
adminClient.updateEndOffsets(Collections.singletonMap(tp, 11L));
- final Properties properties = new Properties();
- properties.put(InternalConfig.STATE_UPDATER_ENABLED, true);
final StreamsConfig config = new
StreamsConfig(StreamsTestUtils.getStreamsConfig("test-reader", properties));
final StoreChangelogReader changelogReader =
@@ -445,7 +434,16 @@ public class StoreChangelogReaderTest extends
EasyMockSupport {
}
changelogReader.restore(Collections.singletonMap(taskId,
mock(Task.class)));
-
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
consumer.lastPollTimeout());
+ if (type == ACTIVE) {
+
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
consumer.lastPollTimeout());
+ } else {
+ if (!properties.containsKey(InternalConfig.STATE_UPDATER_ENABLED)
+ || (boolean)
properties.get(InternalConfig.STATE_UPDATER_ENABLED)) {
+
assertEquals(Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)),
consumer.lastPollTimeout());
+ } else {
+ assertEquals(Duration.ZERO, consumer.lastPollTimeout());
+ }
+ }
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 9cda974cb95..f9c2a0a6080 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -47,8 +47,10 @@ import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
@@ -73,7 +75,6 @@ import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.StreamThread.State;
import
org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
@@ -130,6 +131,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.streams.processor.internals.ClientUtils.getSharedAdminClientId;
import static
org.apache.kafka.streams.processor.internals.StateManagerUtil.CHECKPOINT_FILE_NAME;
import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
+import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
@@ -241,6 +243,13 @@ public class StreamThreadTest {
private StreamThread
createStreamThread(@SuppressWarnings("SameParameterValue") final String
clientId,
final StreamsConfig config,
final boolean eosEnabled) {
+ return createStreamThread(clientId, config, mockTime, eosEnabled);
+ }
+
+ private StreamThread
createStreamThread(@SuppressWarnings("SameParameterValue") final String
clientId,
+ final StreamsConfig config,
+ final Time time,
+ final boolean eosEnabled) {
if (eosEnabled) {
clientSupplier.setApplicationIdForProducer(APPLICATION_ID);
}
@@ -251,7 +260,7 @@ public class StreamThreadTest {
metrics,
APPLICATION_ID,
config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
- mockTime
+ time
);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
@@ -265,7 +274,7 @@ public class StreamThreadTest {
PROCESS_ID,
clientId,
streamsMetrics,
- mockTime,
+ time,
streamsMetadataState,
0,
stateDirectory,
@@ -334,7 +343,7 @@ public class StreamThreadTest {
@Test
public void shouldChangeStateAtStartClose() throws Exception {
- final StreamThread thread = createStreamThread(CLIENT_ID, config,
false);
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, new
MockTime(1), false);
final StateListenerStub stateListener = new StateListenerStub();
thread.setStateListener(stateListener);
@@ -634,6 +643,7 @@ public class StreamThreadTest {
@Test
public void
shouldEnforceRebalanceWhenScheduledAndNotCurrentlyRebalancing() throws
InterruptedException {
+ final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false));
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
@@ -694,6 +704,7 @@ public class StreamThreadTest {
@Test
public void shouldNotEnforceRebalanceWhenCurrentlyRebalancing() throws
InterruptedException {
+ final Time mockTime = new MockTime(1);
final StreamsConfig config = new StreamsConfig(configProps(false));
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
@@ -704,6 +715,7 @@ public class StreamThreadTest {
final Consumer<byte[], byte[]> mockConsumer =
EasyMock.createNiceMock(Consumer.class);
expect(mockConsumer.poll(anyObject())).andStubReturn(ConsumerRecords.empty());
+ expect(mockConsumer.assignment()).andStubReturn(emptySet());
final ConsumerGroupMetadata consumerGroupMetadata =
mock(ConsumerGroupMetadata.class);
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
@@ -1034,6 +1046,7 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
expect(consumer.poll(anyObject())).andStubReturn(new
ConsumerRecords<>(Collections.emptyMap()));
+ expect(consumer.assignment()).andStubReturn(emptySet());
final Task task = niceMock(Task.class);
expect(task.id()).andStubReturn(task1);
expect(task.inputPartitions()).andStubReturn(Collections.singleton(t1p1));
@@ -1046,7 +1059,9 @@ public class StreamThreadTest {
final StandbyTaskCreator standbyTaskCreator =
mock(StandbyTaskCreator.class);
expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet());
- EasyMock.replay(consumer, consumerGroupMetadata, task,
activeTaskCreator, standbyTaskCreator);
+ EasyMock.replay(consumer, consumerGroupMetadata, activeTaskCreator,
standbyTaskCreator);
+
+ final StateUpdater stateUpdater = Mockito.mock(StateUpdater.class);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST, mockTime);
@@ -1064,7 +1079,7 @@ public class StreamThreadTest {
topologyMetadata,
null,
null,
- null
+ stateUpdater
) {
@Override
int commit(final Collection<Task> tasksToCommit) {
@@ -1246,7 +1261,17 @@ public class StreamThreadTest {
public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws
InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
- final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(true)), true);
+ final Properties props = configProps(true);
+
+ // The state updater is disabled for this test because this test
relies on the fact that mainConsumer.resume()
+ // is not called. This is not true when the state updater is enabled,
which leads to
+ // java.lang.IllegalStateException: No current assignment for
partition topic1-2.
+ // Since this test verifies an aspect that is independent of the
state updater, it is OK to disable
+ // the state updater and leave the rewriting of the test to later,
when the code path for disabled state updater
+ // is removed.
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+ final StreamThread thread =
+ createStreamThread(CLIENT_ID, new StreamsConfig(props), new
MockTime(1), true);
thread.taskManager().handleRebalanceStart(Collections.singleton(topic1));
@@ -1290,7 +1315,12 @@ public class StreamThreadTest {
public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws
InterruptedException {
internalTopologyBuilder.addSource(null, "source1", null, null, null,
topic1);
- final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(true)), true);
+ final StreamThread thread = createStreamThread(
+ CLIENT_ID,
+ new StreamsConfig(configProps(true)),
+ new MockTime(1),
+ true
+ );
thread.start();
TestUtils.waitForCondition(
@@ -1353,42 +1383,68 @@ public class StreamThreadTest {
}
@Test
- public void shouldNotReturnDataAfterTaskMigrated() {
+ public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterEnabled() {
+ shouldNotReturnDataAfterTaskMigrated(true);
+ }
+
+ @Test
+ public void shouldNotReturnDataAfterTaskMigratedWithStateUpdaterDisabled()
{
+ shouldNotReturnDataAfterTaskMigrated(false);
+ }
+
+ private void shouldNotReturnDataAfterTaskMigrated(final boolean
stateUpdaterEnabled) {
final TaskManager taskManager =
EasyMock.createNiceMock(TaskManager.class);
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+ expect(taskManager.allOwnedTasks()).andStubReturn(emptyMap());
final InternalTopologyBuilder internalTopologyBuilder =
EasyMock.createNiceMock(InternalTopologyBuilder.class);
expect(internalTopologyBuilder.fullSourceTopicNames()).andReturn(Collections.singletonList(topic1)).times(2);
final MockConsumer<byte[], byte[]> consumer = new
MockConsumer<>(OffsetResetStrategy.LATEST);
+ final MockConsumer<byte[], byte[]> restoreConsumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
consumer.subscribe(Collections.singletonList(topic1), new
MockRebalanceListener());
consumer.rebalance(Collections.singletonList(t1p1));
consumer.updateEndOffsets(Collections.singletonMap(t1p1, 10L));
consumer.seekToEnd(Collections.singletonList(t1p1));
- final ChangelogReader changelogReader = new MockChangelogReader() {
- @Override
- public long restore(final Map<TaskId, Task> tasks) {
+ final TaskMigratedException taskMigratedException = new
TaskMigratedException(
+ "Changelog restore found task migrated", new
RuntimeException("restore task migrated"));
+ ChangelogReader changelogReader = this.changelogReader;
+ if (stateUpdaterEnabled) {
+ expect(taskManager.checkStateUpdater(anyLong(),
anyObject())).andAnswer(() -> {
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new
byte[0], new byte[0]));
consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new
byte[1], new byte[0]));
- throw new TaskMigratedException(
- "Changelog restore found task migrated", new
RuntimeException("restore task migrated"));
- }
- };
+ throw taskMigratedException;
+ });
+ } else {
+ changelogReader = new MockChangelogReader() {
+ @Override
+ public long restore(final Map<TaskId, Task> tasks) {
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new
byte[0], new byte[0]));
+ consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new
byte[1], new byte[0]));
+
+ throw taskMigratedException;
+ }
+ };
+ }
taskManager.handleLostAll();
+
EasyMock.replay(taskManager, internalTopologyBuilder);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID,
StreamsConfig.METRICS_LATEST, mockTime);
+ final Properties props = configProps(false);
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+ final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = new StreamThread(
- mockTime,
+ new MockTime(1),
config,
null,
consumer,
- consumer,
+ restoreConsumer,
changelogReader,
null,
taskManager,
@@ -1584,8 +1640,20 @@ public class StreamThreadTest {
}
@Test
- public void shouldReinitializeRevivedTasksInAnyState() {
- final StreamThread thread = createStreamThread(CLIENT_ID, new
StreamsConfig(configProps(false)), false);
+ public void
shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterEnabled() throws
Exception {
+ shouldReinitializeRevivedTasksInAnyState(true);
+ }
+
+ @Test
+ public void
shouldReinitializeRevivedTasksInAnyStateWithStateUpdaterDisabled() throws
Exception {
+ shouldReinitializeRevivedTasksInAnyState(false);
+ }
+
+ private void shouldReinitializeRevivedTasksInAnyState(final boolean
stateUpdaterEnabled) throws Exception {
+ final Properties streamsConfigProps = configProps(false);
+ streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED,
stateUpdaterEnabled);
+ final StreamsConfig config = new StreamsConfig(streamsConfigProps);
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, new
MockTime(1), false);
final String storeName = "store";
final String storeChangelog = "stream-thread-test-store-changelog";
@@ -1652,12 +1720,30 @@ public class StreamThreadTest {
thread.runOnce();
// the third actually polls, processes the record, and throws the
corruption exception
+ if (stateUpdaterEnabled) {
+ TestUtils.waitForCondition(
+ () -> thread.taskManager().checkStateUpdater(
+ mockTime.milliseconds(),
+ topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
+ ),
+ 10 * 1000,
+ "State updater never returned tasks.");
+ }
addRecord(mockConsumer, 0L);
shouldThrow.set(true);
final TaskCorruptedException taskCorruptedException =
assertThrows(TaskCorruptedException.class, thread::runOnce);
// Now, we can handle the corruption
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
+ if (stateUpdaterEnabled) {
+ TestUtils.waitForCondition(
+ () -> thread.taskManager().checkStateUpdater(
+ mockTime.milliseconds(),
+ topicPartitions ->
mockConsumer.seekToBeginning(singleton(t1p1))
+ ),
+ 10 * 1000,
+ "State updater never returned tasks.");
+ }
// again, complete the restoration
thread.runOnce();
@@ -1836,7 +1922,7 @@ public class StreamThreadTest {
.groupByKey().count(Materialized.as("count-one"));
internalStreamsBuilder.buildAndOptimizeTopology();
- final StreamThread thread = createStreamThread(CLIENT_ID, config,
false);
+ final StreamThread thread = createStreamThread(CLIENT_ID, config, new
MockTime(1), false);
final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
restoreConsumer.updatePartitions(
STREAM_THREAD_TEST_COUNT_ONE_CHANGELOG,
@@ -1883,6 +1969,10 @@ public class StreamThreadTest {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 +
"-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 +
"-changelog";
+ final Properties props = configProps(false);
+ // Updating standby tasks on the stream thread only happens when the
state updater is disabled
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+ final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = createStreamThread(CLIENT_ID, config,
false);
final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
@@ -2004,6 +2094,10 @@ public class StreamThreadTest {
final String storeName2 = "table-two";
final String changelogName1 = APPLICATION_ID + "-" + storeName1 +
"-changelog";
final String changelogName2 = APPLICATION_ID + "-" + storeName2 +
"-changelog";
+ final Properties props = configProps(false);
+ // Updating standby tasks on the stream thread only happens when the
state updater is disabled
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, false);
+ final StreamsConfig config = new StreamsConfig(props);
final StreamThread thread = createStreamThread(CLIENT_ID, config,
false);
final MockConsumer<byte[], byte[]> restoreConsumer =
clientSupplier.restoreConsumer;
@@ -2243,13 +2337,25 @@ public class StreamThreadTest {
}
@Test
- public void
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore() throws
Exception {
+ public void
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterEnabled()
throws Exception {
+ shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(true);
+ }
+
+ @Test
+ public void
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestoreWithStateUpdaterDiabled()
throws Exception {
+
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(false);
+ }
+
+ private void
shouldRecoverFromInvalidOffsetExceptionOnRestoreAndFinishRestore(final boolean
stateUpdaterEnabled) throws Exception {
internalStreamsBuilder.stream(Collections.singleton("topic"), consumed)
.groupByKey()
.count(Materialized.as("count"));
internalStreamsBuilder.buildAndOptimizeTopology();
- final StreamThread thread = createStreamThread("clientId", config,
false);
+ final Properties props = configProps(false);
+ props.put(InternalConfig.STATE_UPDATER_ENABLED, stateUpdaterEnabled);
+ final StreamsConfig config = new StreamsConfig(props);
+ final StreamThread thread = createStreamThread("clientId", config, new
MockTime(1), false);
final MockConsumer<byte[], byte[]> mockConsumer =
(MockConsumer<byte[], byte[]>) thread.mainConsumer();
final MockConsumer<byte[], byte[]> mockRestoreConsumer =
(MockConsumer<byte[], byte[]>) thread.restoreConsumer();
final MockAdminClient mockAdminClient = (MockAdminClient)
thread.adminClient();
@@ -2276,6 +2382,8 @@ public class StreamThreadTest {
)
);
mockConsumer.updateBeginningOffsets(Collections.singletonMap(topicPartition,
0L));
+ mockConsumer.subscribe(mkSet(topicPartition.topic()));
+ mockConsumer.rebalance(Collections.singleton(topicPartition));
mockRestoreConsumer.updatePartitions(
"stream-thread-test-count-changelog",
@@ -2325,6 +2433,12 @@ public class StreamThreadTest {
}
});
+ // after handling the exception and reviving the task, with the
state updater the changelog topic is
+ // registered again with the changelog reader
+ TestUtils.waitForCondition(
+ () -> mockRestoreConsumer.assignment().size() == 1,
+ "Never get the assignment");
+
// after handling the exception and reviving the task, the position
// should be reset to the beginning.
TestUtils.waitForCondition(
@@ -2344,12 +2458,18 @@ public class StreamThreadTest {
"K2".getBytes(),
"V2".getBytes()));
- TestUtils.waitForCondition(
- () -> {
- mockRestoreConsumer.assign(changelogPartitionSet);
- return mockRestoreConsumer.position(changelogPartition) ==
2L;
- },
- "Never finished restore");
+ if (stateUpdaterEnabled) {
+ TestUtils.waitForCondition(
+ () -> mockRestoreConsumer.assignment().size() == 0,
+ "Never get the assignment");
+ } else {
+ TestUtils.waitForCondition(
+ () -> {
+ mockRestoreConsumer.assign(changelogPartitionSet);
+ return
mockRestoreConsumer.position(changelogPartition) == 2L;
+ },
+ "Never finished restore");
+ }
} finally {
thread.shutdown();
thread.join(10000);
@@ -3190,6 +3310,7 @@ public class StreamThreadTest {
@Test
public void
shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
final Properties streamsConfigProps =
StreamsTestUtils.getStreamsConfig();
+ streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);
final StreamThread streamThread = setUpThread(streamsConfigProps);
streamThread.setState(State.STARTING);
streamThread.setState(State.PARTITIONS_ASSIGNED);
@@ -3320,7 +3441,7 @@ public class StreamThreadTest {
return null;
}
StandbyTask standbyTask(final TaskManager taskManager, final
TopicPartition partition) {
- final Stream<Task> standbys =
taskManager.allTasks().values().stream().filter(t -> !t.isActive());
+ final Stream<Task> standbys =
taskManager.standbyTaskMap().values().stream();
for (final Task task : (Iterable<Task>) standbys::iterator) {
if (task.inputPartitions().contains(partition)) {
return (StandbyTask) task;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
index 68658d80f60..1cae7db0f42 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskExecutorTest.java
@@ -72,7 +72,7 @@ public class TaskExecutorTest {
final Tasks tasks = mock(Tasks.class);
final ConsumerGroupMetadata groupMetadata =
mock(ConsumerGroupMetadata.class);
final TaskManager taskManager = mock(TaskManager.class);
-
when(taskManager.activeTaskIterable()).thenReturn(Collections.singletonList(task));
+
when(taskManager.activeRunningTaskIterable()).thenReturn(Collections.singletonList(task));
when(taskManager.consumerGroupMetadata()).thenReturn(groupMetadata);
final StreamsProducer producer = mock(StreamsProducer.class);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index d534077cf2e..a876d8ab4e1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -590,28 +590,49 @@ public class TaskManagerTest {
}
@Test
- public void
shouldAssignActiveTaskInTasksRegistryToBeRecycledWithStateUpdaterEnabled() {
- final StreamTask activeTaskToRecycle = statefulTask(taskId03,
taskId03ChangelogPartitions)
- .inState(State.SUSPENDED)
- .withInputPartitions(taskId03Partitions).build();
- final StandbyTask recycledStandbyTask = standbyTask(taskId03,
taskId03ChangelogPartitions)
- .inState(State.CREATED)
- .withInputPartitions(taskId03Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ public void
shouldAddRecycledStandbyTaskfromActiveToPendingTasksToInitWithStateUpdaterEnabled()
{
+ final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final StandbyTask standbyTask = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.CREATED).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
+ .thenReturn(standbyTask);
final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
activeTaskToRecycle.inputPartitions()))
- .thenReturn(recycledStandbyTask);
- taskManager.handleAssignment(
- Collections.emptyMap(),
- mkMap(mkEntry(activeTaskToRecycle.id(),
activeTaskToRecycle.inputPartitions()))
- );
+ taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
+ Mockito.verify(activeTaskToRecycle).prepareCommit();
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+ Mockito.verify(tasks).addPendingTasksToInit(mkSet(standbyTask));
+ Mockito.verify(tasks).removeTask(activeTaskToRecycle);
Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
+ Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
+ }
+
+ @Test
+ public void
shouldAddRecycledStandbyTaskfromActiveToTaskRegistryWithStateUpdaterDisabled() {
+ final StreamTask activeTaskToRecycle = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.RUNNING).build();
+ final StandbyTask standbyTask = standbyTask(taskId01,
taskId01ChangelogPartitions)
+ .withInputPartitions(taskId01Partitions)
+ .inState(State.CREATED).build();
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.allTasks()).thenReturn(mkSet(activeTaskToRecycle));
+
when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle,
taskId01Partitions))
+ .thenReturn(standbyTask);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
+
+ taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
+
Mockito.verify(activeTaskToRecycle).prepareCommit();
- Mockito.verify(tasks).replaceActiveWithStandby(recycledStandbyTask);
+
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(activeTaskToRecycle.id());
+ Mockito.verify(tasks).replaceActiveWithStandby(standbyTask);
+ Mockito.verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
}