This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 3202459394 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468) 3202459394 is described below commit 32024593947f8bb497f2f5d392e0d1d892a16ff3 Author: Guozhang Wang <wangg...@gmail.com> AuthorDate: Wed Aug 3 09:17:38 2022 -0700 KAFKA-13877: Fix flakiness in RackAwarenessIntegrationTest (#12468) In the current test, we check for tag distribution immediately after everyone is in the running state; however, because of follow-up rebalances, "everyone is now in running state" does not mean that the cluster is now stable. In fact, a follow-up rebalance may occur, upon which the local thread metadata would be returned empty, which would cause the distribution verifier to fail. Reviewers: Divij Vaidya <di...@amazon.com>, Luke Chen <show...@gmail.com> --- .../kafka/streams/processor/internals/StreamTask.java | 2 +- .../streams/integration/RackAwarenessIntegrationTest.java | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 07c4494225..f7bf8a5e74 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -1267,7 +1267,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, final SourceNode<?, ?> source = topology.source(partition.topic()); if (source == null) { throw new TopologyException( - "Topic is unknown to the topology. " + + "Topic " + partition.topic() + " is unknown to the topology. " + "This may happen if different KafkaStreams instances of the same application execute different Topologies. " + "Note that Topologies are only identical if all operators are added in the same order." 
); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java index 677633e9b0..7c93b769f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java @@ -55,8 +55,8 @@ import java.util.stream.Stream; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(600) @Tag("integration") @@ -143,13 +143,13 @@ public class RackAwarenessIntegrationTest { createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); stopKafkaStreamsInstanceWithIndex(0); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(clientTagKeys)); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(clientTagKeys), "not all tags are evenly distributed"); } @Test @@ -165,7 +165,7 @@ public class RackAwarenessIntegrationTest { createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); } 
@Test @@ -186,7 +186,7 @@ public class RackAwarenessIntegrationTest { createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), asList(TAG_ZONE, TAG_CLUSTER), numberOfStandbyReplicas); waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(asList(TAG_ZONE, TAG_CLUSTER)), "not all tags are evenly distributed"); } @Test @@ -204,8 +204,8 @@ public class RackAwarenessIntegrationTest { waitUntilAllKafkaStreamsClientsAreRunning(); - assertTrue(isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE))); - assertTrue(isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER))); + waitForCondition(() -> isIdealTaskDistributionReachedForTags(singletonList(TAG_ZONE)), "not all tags are evenly distributed"); + waitForCondition(() -> isPartialTaskDistributionReachedForTags(singletonList(TAG_CLUSTER)), "not all tags are evenly distributed"); } private void stopKafkaStreamsInstanceWithIndex(final int index) {