This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new b3eed510235 KAFKA-19660: JoinWithIncompleteMetadataIntegrationTest fails in isolated run of one parameter (#20483) b3eed510235 is described below commit b3eed510235d49a0cc8eb322cdfb8486dfc16748 Author: Jinhe Zhang <jhz.tra...@outlook.com> AuthorDate: Mon Sep 8 11:02:11 2025 -0400 KAFKA-19660: JoinWithIncompleteMetadataIntegrationTest fails in isolated run of one parameter (#20483) The original test timeout when using new protocol, because it use `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` as the exception's timeout, which is 300s. Also the test for new protocol and old protocol use the same group ID, so the failure will be hidden. What I do: 1. Set the timeout as 5 secs so it can be captured within 10s 2. Use new appId for new protocol Reviewers: Lucas Brutschy <lucas...@apache.org> --- .../integration/JoinWithIncompleteMetadataIntegrationTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java index 04f35dcfce4..291401c82fd 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java @@ -57,6 +57,7 @@ public class JoinWithIncompleteMetadataIntegrationTest { STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL); + STREAMS_CONFIG.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), MISSING_TOPIC_DETECTION_TIMEOUT_MS); } @AfterAll @@ -66,6 +67,7 @@ public class JoinWithIncompleteMetadataIntegrationTest { private static final String APP_ID = "join-incomplete-metadata-integration-test"; private static final Long COMMIT_INTERVAL = 100L; + private static final int MISSING_TOPIC_DETECTION_TIMEOUT_MS = 5000; static final Properties STREAMS_CONFIG = new Properties(); static final String INPUT_TOPIC_RIGHT = "inputTopicRight"; static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist"; @@ -93,7 +95,8 @@ public class JoinWithIncompleteMetadataIntegrationTest { @ParameterizedTest @ValueSource(booleans = {false, true}) public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final boolean useNewProtocol) throws InterruptedException { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID); + final String appId = APP_ID + "-" + (useNewProtocol ? "new" : "old"); + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); if (useNewProtocol) {