This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a696b4d6f49 KAFKA-17978: Fix invalid topology on Task assignment
(#17778)
a696b4d6f49 is described below
commit a696b4d6f4917ccd942a7ba21ef660f351e47b01
Author: Nick Telford <[email protected]>
AuthorDate: Tue Nov 12 16:18:42 2024 +0000
KAFKA-17978: Fix invalid topology on Task assignment (#17778)
When we introduced "startup tasks" in #16922, we initialized them with
no input partitions, because the partitions aren't known until assignment.
However, when we later updated them during assignment, we could update
the topology with incorrect source topics for some internal topics,
because internal topics are handled differently for StandbyTasks.
To resolve this, we now initialize startup tasks with the correct input
partitions, by calculating them from the Topology.
When we assign our startup tasks, we now update their input partitions
only if they have actually changed, just as we do for regular
StandbyTasks.
With this, the E2E tests now pass, as expected.
Reviewer: Bruno Cadonna <[email protected]>
---
.../kafka/streams/processor/internals/ProcessorStateManager.java | 4 ++--
.../apache/kafka/streams/processor/internals/StateDirectory.java | 9 ++++++++-
.../apache/kafka/streams/processor/internals/TaskManager.java | 2 +-
3 files changed, 11 insertions(+), 4 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f4702c469f8..30334abc53e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -44,7 +44,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -233,8 +232,9 @@ public class ProcessorStateManager implements StateManager {
final
LogContext logContext,
final
StateDirectory stateDirectory,
final
Map<String, String> storeToChangelogTopic,
+ final
Set<TopicPartition> sourcePartitions,
final boolean
stateUpdaterEnabled) {
- return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, null, storeToChangelogTopic, new HashSet<>(0),
stateUpdaterEnabled);
+ return new ProcessorStateManager(taskId, TaskType.STANDBY, eosEnabled,
logContext, stateDirectory, null, storeToChangelogTopic, sourcePartitions,
stateUpdaterEnabled);
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 1423fb934e7..04a62bad1bf 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -48,6 +49,7 @@ import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -215,12 +217,17 @@ public class StateDirectory implements AutoCloseable {
// because it's possible that the topology has changed since
that data was written, and is now stateless
// this therefore prevents us from creating unnecessary Tasks
just because of some left-over state
if (subTopology.hasStateWithChangelogs()) {
+ final Set<TopicPartition> inputPartitions =
topologyMetadata.nodeToSourceTopics(id).values().stream()
+ .flatMap(Collection::stream)
+ .map(t -> new TopicPartition(t, id.partition()))
+ .collect(Collectors.toSet());
final ProcessorStateManager stateManager =
ProcessorStateManager.createStartupTaskStateManager(
id,
eosEnabled,
logContext,
this,
subTopology.storeToChangelogTopic(),
+ inputPartitions,
stateUpdaterEnabled
);
@@ -234,7 +241,7 @@ public class StateDirectory implements AutoCloseable {
final Task task = new StandbyTask(
id,
- new HashSet<>(),
+ inputPartitions,
subTopology,
topologyMetadata.taskConfig(id),
streamsMetrics,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 1ca98aeba5d..8a6e27b4c99 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -337,7 +337,7 @@ public class TaskManager {
// replace our dummy values with the real ones, now we
know our thread and assignment
final Set<TopicPartition> inputPartitions =
entry.getValue();
task.stateManager().assignToStreamThread(new
LogContext(threadLogPrefix), changelogReader, inputPartitions);
- task.updateInputPartitions(inputPartitions,
topologyMetadata.nodeToSourceTopics(taskId));
+ updateInputPartitionsOfStandbyTaskIfTheyChanged(task,
inputPartitions);
assignedTasks.put(task, inputPartitions);
}