This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3202459394 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest
(#12468)
3202459394 is described below
commit 32024593947f8bb497f2f5d392e0d1d892a16ff3
Author: Guozhang Wang <[email protected]>
AuthorDate: Wed Aug 3 09:17:38 2022 -0700
KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468)
In the current test, we check for tag distribution immediately after
everyone is on the running state, however due to the fact of the follow-up
rebalances, "everyone is now in running state" does not mean that the cluster
is now stable. In fact, a follow-up rebalance may occur, upon which the local
thread metadata would return empty which would cause the distribution verifier
to fail.
Reviewers: Divij Vaidya <[email protected]>, Luke Chen <[email protected]>
---
.../kafka/streams/processor/internals/StreamTask.java | 2 +-
.../streams/integration/RackAwarenessIntegrationTest.java | 14 +++++++-------
2 files changed, 8 insertions(+), 8 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 07c4494225..f7bf8a5e74 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1267,7 +1267,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
final SourceNode<?, ?> source = topology.source(partition.topic());
if (source == null) {
throw new TopologyException(
- "Topic is unknown to the topology. " +
+ "Topic " + partition.topic() + " is unknown to the
topology. " +
"This may happen if different KafkaStreams
instances of the same application execute different Topologies. " +
"Note that Topologies are only identical if
all operators are added in the same order."
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
index 677633e9b0..7c93b769f5 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
@@ -55,8 +55,8 @@ import java.util.stream.Stream;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
@@ -143,13 +143,13 @@ public class RackAwarenessIntegrationTest {
createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
- assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+ waitForCondition(() ->
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly
distributed");
stopKafkaStreamsInstanceWithIndex(0);
waitUntilAllKafkaStreamsClientsAreRunning();
- assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys));
+ waitForCondition(() ->
isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly
distributed");
}
@Test
@@ -165,7 +165,7 @@ public class RackAwarenessIntegrationTest {
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B,
TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER),
numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
-
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
+ waitForCondition(() ->
isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags
are evenly distributed");
}
@Test
@@ -186,7 +186,7 @@ public class RackAwarenessIntegrationTest {
createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C,
TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER),
numberOfStandbyReplicas);
waitUntilAllKafkaStreamsClientsAreRunning();
- assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE,
TAG_CLUSTER)));
+ waitForCondition(() ->
isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)), "not all
tags are evenly distributed");
}
@Test
@@ -204,8 +204,8 @@ public class RackAwarenessIntegrationTest {
waitUntilAllKafkaStreamsClientsAreRunning();
-
assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)));
-
assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)));
+ waitForCondition(() ->
isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags
are evenly distributed");
+ waitForCondition(() ->
isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all
tags are evenly distributed");
}
private void stopKafkaStreamsInstanceWithIndex(final int index) {