This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 03e9e301694 KAFKA-20372: Fix flaky test in NamedTopologyIntegrationTest (#21968)
03e9e301694 is described below

commit 03e9e301694f75991fd80cb32d21d7d5b81879d7
Author: ChickenchickenLove <[email protected]>
AuthorDate: Wed May 6 04:04:39 2026 +0900

    KAFKA-20372: Fix flaky test in NamedTopologyIntegrationTest (#21968)
    
    This PR fixes a flaky test in `NamedTopologyIntegrationTest`:
    `shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning`.
    
    The failure is caused by a race in the task lifecycle during named
    topology removal.
    
    In some runs, the local tasks for the topology become active and reach
    RUNNING quickly. In other runs, some tasks remain in a transitional
    state (for example, state updating / restoration) for a short time
    before becoming fully active.
    
    The test currently removes and cleans up the topology without waiting
    for this lifecycle to settle. Because of that,
    `removeNamedTopology(...)` may run while some local tasks are still
    transitioning, and `cleanUpNamedTopology(...)` may follow before those
    tasks are fully gone. This makes the subsequent re-add flow
    non-deterministic and leads to intermittent failures.
    
    To make the test deterministic, this PR adds explicit waits around the
    remove / cleanup sequence:
    - wait until all local tasks for the topology are running before calling
    `removeNamedTopology(...)`
    - wait until no local tasks remain for the topology before calling
    `cleanUpNamedTopology(...)`
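    The wait-then-act pattern described in the list above can be sketched in
    plain Java. The helper below is a hypothetical, simplified stand-in for
    `TestUtils.waitForCondition`; its name, parameters, and polling interval
    are assumptions, not Kafka's actual signature. It polls a condition until
    it holds or a deadline passes, and it builds the failure message lazily,
    so diagnostic state (like `streams.state()` in the test) is only captured
    on timeout. A background thread simulates tasks that finish restoring
    after a short delay.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

public class WaitForConditionSketch {

    // Hypothetical helper (not Kafka's TestUtils): polls `condition` every `pollMs`
    // until it returns true, or throws once `timeoutMs` has elapsed.
    static void waitForCondition(final BooleanSupplier condition,
                                 final long timeoutMs,
                                 final long pollMs,
                                 final Supplier<String> details)
            throws InterruptedException, TimeoutException {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            // Overflow-safe deadline comparison; details are only built on failure.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException(
                        "Condition not met within " + timeoutMs + " ms: " + details.get());
            }
            Thread.sleep(pollMs);
        }
    }

    public static void main(final String[] args) throws Exception {
        // Simulated lifecycle: a background "stream thread" finishes restoring after ~100 ms.
        final AtomicBoolean running = new AtomicBoolean(false);
        final Thread lifecycle = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            running.set(true);
        });
        lifecycle.start();

        // Only proceed to the "remove" step once the simulated tasks are RUNNING.
        waitForCondition(running::get, 5_000, 10, () -> "tasks still restoring");
        System.out.println("running=" + running.get()); // prints "running=true"
        lifecycle.join();
    }
}
```

    Polling with a bounded timeout, rather than a fixed `Thread.sleep`, lets
    the test continue as soon as the lifecycle settles while still failing
    with a descriptive message if it never does.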
    
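    This keeps the test semantics unchanged: the waits only synchronize the
    test with the task lifecycle. The query methods they rely on, added to
    `StreamThread`, `TaskManager`, and `KafkaStreamsNamedTopologyWrapper`,
    are marked as visible for testing.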
    
    ## Local test results
    - Before: 12/17 runs passed
    - After: 89/89 runs passed
    
    ## Considered Alternatives
    1. Send records to partition 1 as well
    This would make it more likely that both partition 0 and partition 1
    tasks are initialized before removal.
    
    However, this changes the test input and can also introduce additional
    rebalance and task-transition timing effects, since partition 1
    processing becomes part of the exercised path. In other words, it may
    reduce one symptom while introducing a different concurrency surface
    related to task initialization and reassignment.
    
    More importantly, the root cause of the flake is that the test does not
    wait for task lifecycle transitions to settle before calling
    `removeNamedTopology(...)` and `cleanUpNamedTopology(...)`. Adding more
    input does not address that directly.
    
    2. Reduce the input topic partition count from 2 to 1
    This would avoid the multi-partition timing issue by construction.
    
    However, this also weakens the coverage of the test by removing the
    multi-partition setup entirely. Since the production code supports
    multiple partitions, reducing the topic to a single partition would hide
    the race instead of synchronizing with the intended lifecycle of the
    test.
    
    Reviewers: Lianet Magrans <[email protected]>, Chia-Yi Chiu <[email protected]>
    
    ---------
    
    Co-authored-by: Lianet Magrans <[email protected]>
---
 .../integration/NamedTopologyIntegrationTest.java  | 16 +++++++++++
 .../streams/processor/internals/StreamThread.java  | 11 ++++++++
 .../streams/processor/internals/TaskManager.java   | 32 ++++++++++++++++++++++
 .../KafkaStreamsNamedTopologyWrapper.java          | 14 ++++++++++
 4 files changed, 73 insertions(+)

diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 8186a5b3049..b46bb036e47 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -657,7 +657,23 @@ public class NamedTopologyIntegrationTest {
 
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
+
+        TestUtils.waitForCondition(
+                () -> streams.areAllLocalTasksRunningForTopology(TOPOLOGY_1),
+                () -> "Not all local tasks for topology " + TOPOLOGY_1
+                        + " are initialized and in RUNNING state before remove. "
+                        + "streamsState=" + streams.state()
+                        + ", localThreads=" + streams.metadataForLocalThreads()
+        );
         streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
+        
+        TestUtils.waitForCondition(
+                () -> !streams.hasAnyLocalTaskForTopology(TOPOLOGY_1),
+                () -> "Topology " + TOPOLOGY_1
+                        + " still has local tasks after remove. "
+                        + "streamsState=" + streams.state()
+                        + ", localThreads=" + streams.metadataForLocalThreads()
+        );
         streams.cleanUpNamedTopology(TOPOLOGY_1);
 
         CLUSTER.getAllTopicsInCluster().stream().filter(t -> t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f9a9ba0d5e2..882b427d1ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -2083,6 +2083,17 @@ public class StreamThread extends Thread implements ProcessingThread {
         return taskManager;
     }
 
+    // VisibleForTesting
+    public boolean hasAnyTaskForTopology(final String topologyName) {
+        return taskManager.hasAnyTaskForTopology(topologyName);
+    }
+
+
+    // VisibleForTesting
+    public boolean areAllTasksRunningForTopology(final String topologyName) {
+        return taskManager.areAllTasksRunningForTopology(topologyName);
+    }
+    
     int currentNumIterations() {
         return numIterations;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 6834976756d..e8a3442ec13 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -1631,6 +1631,38 @@ public class TaskManager {
         return ret;
     }
 
+    // VisibleForTesting
+    boolean hasAnyTaskForTopology(final String topologyName) {
+        return allTasks().keySet().stream().anyMatch(taskId -> topologyName.equals(taskId.topologyName()));
+    }
+
+    /**
+     * Returns {@code true} if every task for the given topology is initialized and in
+     * {@link State#RUNNING}.
+     *
+     * <p>If there are no tasks for the given topology, this method returns {@code true}.
+     *
+     * @param topologyName the topology name
+     * @return {@code true} if all matching tasks are initialized and in {@link State#RUNNING},
+     *         or if there are no matching tasks; {@code false} otherwise
+     */
+    // VisibleForTesting
+    boolean areAllTasksRunningForTopology(final String topologyName) {
+        final Map<TaskId, Task> allTasks = allTasks();
+        final Set<TaskId> initializedTaskIds = tasks.allInitializedTaskIds();
+
+        for (final Map.Entry<TaskId, Task> entry : allTasks.entrySet()) {
+            final TaskId taskId = entry.getKey();
+            if (topologyName.equals(taskId.topologyName())) {
+                if (!initializedTaskIds.contains(taskId) || entry.getValue().state() != State.RUNNING) {
+                    return false;
+                }
+            }
+        }
+
+        return true;
+    }
+    
     /**
      * Returns tasks owned by the stream thread.
      * This does not return any tasks currently owned by the state updater.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 5acc2f424a9..655a7d46565 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -443,4 +443,18 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
                 .collect(Collectors.toList())));
         return allLocalStorePartitionLags(allTopologyTasks);
     }
+
+    // VisibleForTesting
+    public boolean hasAnyLocalTaskForTopology(final String topologyName) {
+        synchronized (threads) {
+            return threads.stream().anyMatch(thread -> thread.hasAnyTaskForTopology(topologyName));
+        }
+    }
+    
+    // VisibleForTesting
+    public boolean areAllLocalTasksRunningForTopology(final String topologyName) {
+        synchronized (threads) {
+            return threads.stream().allMatch(thread -> thread.areAllTasksRunningForTopology(topologyName));
+        }
+    }
 }

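The two `TaskManager` predicates in the diff above can be modeled outside Kafka to make their edge cases explicit. In this sketch, `State`, `SimpleTaskId`, and `SimpleTask` are hypothetical stand-ins for Kafka's task state, `TaskId`, and `Task`, and the `initialized` flag stands in for membership in `tasks.allInitializedTaskIds()`. As the Javadoc states, `areAllTasksRunningForTopology` is vacuously `true` when no task matches the topology, which is why the model also exposes `hasAnyTaskForTopology`.

```java
import java.util.Map;

public class TopologyTaskPredicatesSketch {

    // Hypothetical, simplified stand-ins for Kafka's task state and identifiers.
    enum State { CREATED, RESTORING, RUNNING }

    record SimpleTaskId(String topologyName, int subtopology, int partition) { }

    // `initialized` models membership in the TaskManager's initialized-task set.
    record SimpleTask(State state, boolean initialized) { }

    // Models hasAnyTaskForTopology: true if at least one task belongs to the topology.
    static boolean hasAnyTaskForTopology(final Map<SimpleTaskId, SimpleTask> tasks,
                                         final String topology) {
        return tasks.keySet().stream().anyMatch(id -> topology.equals(id.topologyName()));
    }

    // Models areAllTasksRunningForTopology: every matching task must be initialized
    // and RUNNING; allMatch on an empty stream returns true, so no tasks => true.
    static boolean areAllTasksRunningForTopology(final Map<SimpleTaskId, SimpleTask> tasks,
                                                 final String topology) {
        return tasks.entrySet().stream()
                .filter(e -> topology.equals(e.getKey().topologyName()))
                .allMatch(e -> e.getValue().initialized() && e.getValue().state() == State.RUNNING);
    }

    public static void main(final String[] args) {
        final Map<SimpleTaskId, SimpleTask> tasks = Map.of(
                new SimpleTaskId("topology-1", 0, 0), new SimpleTask(State.RUNNING, true),
                new SimpleTaskId("topology-1", 0, 1), new SimpleTask(State.RESTORING, true),
                new SimpleTaskId("topology-2", 0, 0), new SimpleTask(State.RUNNING, true));

        System.out.println(hasAnyTaskForTopology(tasks, "topology-1"));         // true
        System.out.println(areAllTasksRunningForTopology(tasks, "topology-1")); // false: one task RESTORING
        System.out.println(areAllTasksRunningForTopology(tasks, "topology-2")); // true
        System.out.println(areAllTasksRunningForTopology(tasks, "topology-3")); // true: no tasks
    }
}
```

Because the "all running" check passes trivially for a topology with no local tasks, a caller that needs the tasks to exist would have to pair it with the "has any task" check.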