This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch 3.7 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push: new 6e998cffdd3 MINOR: Add junit properties to display parameterized test names (#14983) 6e998cffdd3 is described below commit 6e998cffdd33e343945877ccee1fec8337c7d57d Author: Bruno Cadonna <cado...@apache.org> AuthorDate: Wed Dec 13 09:42:18 2023 +0100 MINOR: Add junit properties to display parameterized test names (#14983) In many parameterized tests, the display name is broken. Example - testMetadataFetch appears as [1] true, [2] false link This is because of the default constant in @ParameterizedTest: String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}"; This PR adds a new junit-platform.properties which overrides that default to add a {displayName}, which shows the display name of the method. Existing tests which override the name should work as is. The precedence rules are: (1) the name attribute in @ParameterizedTest, if present; (2) the value of the junit.jupiter.params.displayname.default configuration parameter, if present; (3) the DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest. Source: https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names Sample test run output Before: [1] true link After: testMetadataExpiry(boolean).false link This commit is an extension of https://github.com/apache/kafka/commit/bdf6d46b41bba22d5a9adf6eb1c87e46d8b9c095 which needed to be reverted because it introduced test failures. 
Reviewers: David Jacot <dja...@confluent.io>, Lucas Brutschy <lbruts...@confluent.io> --- .../src/test/resources/junit-platform.properties | 15 ++++ .../src/test/resources/junit-platform.properties | 15 ++++ .../org/apache/kafka/streams/KafkaStreamsTest.java | 6 +- .../integration/AbstractResetIntegrationTest.java | 8 +- .../integration/AdjustStreamThreadCountTest.java | 2 +- .../ConsistencyVectorIntegrationTest.java | 8 +- .../integration/EmitOnChangeIntegrationTest.java | 2 +- .../GlobalKTableEOSIntegrationTest.java | 7 +- .../integration/GlobalKTableIntegrationTest.java | 7 +- .../integration/GlobalThreadShutDownOrderTest.java | 2 +- ...ighAvailabilityTaskAssignorIntegrationTest.java | 2 +- .../streams/integration/IQv2IntegrationTest.java | 11 +-- .../JoinGracePeriodDurabilityIntegrationTest.java | 2 +- .../KStreamAggregationDedupIntegrationTest.java | 9 +- .../KStreamAggregationIntegrationTest.java | 11 ++- .../integration/KStreamKStreamIntegrationTest.java | 2 +- .../KStreamRepartitionIntegrationTest.java | 4 +- ...yInnerJoinCustomPartitionerIntegrationTest.java | 2 +- .../KTableKTableForeignKeyJoinDistributedTest.java | 8 +- ...reignKeyJoinMaterializationIntegrationTest.java | 2 - .../KTableSourceTopicRestartIntegrationTest.java | 5 +- .../KafkaStreamsCloseOptionsIntegrationTest.java | 11 +-- .../integration/LagFetchIntegrationTest.java | 2 +- .../integration/MetricsIntegrationTest.java | 2 +- .../MetricsReporterIntegrationTest.java | 2 +- .../integration/NamedTopologyIntegrationTest.java | 2 +- .../OptimizedKTableIntegrationTest.java | 8 +- .../integration/PauseResumeIntegrationTest.java | 2 +- .../integration/QueryableStateIntegrationTest.java | 13 ++- .../integration/RegexSourceIntegrationTest.java | 2 +- .../streams/integration/ResetIntegrationTest.java | 18 ++-- .../ResetPartitionTimeIntegrationTest.java | 2 +- .../integration/RestoreIntegrationTest.java | 2 +- .../integration/RocksDBMetricsIntegrationTest.java | 2 +- 
.../SelfJoinUpgradeIntegrationTest.java | 97 ++++++++++++++-------- .../SlidingWindowedKStreamIntegrationTest.java | 6 +- .../StandbyTaskCreationIntegrationTest.java | 9 +- .../integration/StandbyTaskEOSIntegrationTest.java | 2 +- .../integration/StateDirectoryIntegrationTest.java | 4 +- .../integration/StoreQueryIntegrationTest.java | 2 +- .../integration/StoreUpgradeIntegrationTest.java | 4 +- ...bleJoinTopologyOptimizationIntegrationTest.java | 4 +- ...amsUncaughtExceptionHandlerIntegrationTest.java | 2 +- .../SuppressionDurabilityIntegrationTest.java | 2 +- .../integration/TaskAssignorIntegrationTest.java | 2 +- .../integration/TaskMetadataIntegrationTest.java | 2 +- .../TimeWindowedKStreamIntegrationTest.java | 6 +- .../VersionedKeyValueStoreIntegrationTest.java | 9 +- .../integration/utils/IntegrationTestUtils.java | 38 +++++---- .../KTableKTableForeignKeyJoinScenarioTest.java | 2 +- ...HandlingSourceTopicDeletionIntegrationTest.java | 2 +- 51 files changed, 226 insertions(+), 165 deletions(-) diff --git a/clients/src/test/resources/junit-platform.properties b/clients/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..05069923a7f --- /dev/null +++ b/clients/src/test/resources/junit-platform.properties @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}" diff --git a/server-common/src/test/resources/junit-platform.properties b/server-common/src/test/resources/junit-platform.properties new file mode 100644 index 00000000000..05069923a7f --- /dev/null +++ b/server-common/src/test/resources/junit-platform.properties @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}" diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index e03d562ed1a..f5a8dfdece4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -1175,7 +1175,7 @@ public class KafkaStreamsTest { @Test public void statelessTopologyShouldNotCreateStateDirectory() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final String inputTopic = safeTestName + "-input"; final String outputTopic = safeTestName + "-output"; final Topology topology = new Topology(); @@ -1201,7 +1201,7 @@ public class KafkaStreamsTest { @Test public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final String inputTopic = safeTestName + "-input"; final String outputTopic = safeTestName + "-output"; final String globalTopicName = safeTestName + "-global"; @@ -1213,7 +1213,7 @@ public class KafkaStreamsTest { @Test public void statefulTopologyShouldCreateStateDirectory() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final String inputTopic = safeTestName + "-input"; final String outputTopic = safeTestName + "-output"; final String globalTopicName = safeTestName + "-global"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java index 05b119da064..fe41e6c1e82 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java @@ -164,7 +164,7 @@ public abstract class AbstractResetIntegrationTest { protected static final int TIMEOUT_MULTIPLIER = 15; void prepareTest() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); prepareConfigs(appID); prepareEnvironment(); @@ -205,7 +205,7 @@ public abstract class AbstractResetIntegrationTest { @Test public void testResetWhenInternalTopicsAreSpecified() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -233,7 +233,7 @@ public abstract class AbstractResetIntegrationTest { @Test public void testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -278,7 +278,7 @@ public abstract class AbstractResetIntegrationTest { cluster.createTopic(INTERMEDIATE_USER_TOPIC); } - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java index a74af0a25ac..9cee7618494 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java @@ -100,7 +100,7 @@ public class AdjustStreamThreadCountTest { @BeforeEach public void setup(final TestInfo testInfo) { - final String testId = safeUniqueTestName(getClass(), testInfo); + final String testId = safeUniqueTestName(testInfo); appId = "appId_" + testId; inputTopic = "input" + testId; IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java index dc720559a9b..3e809467b08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java @@ -115,8 +115,9 @@ public class ConsistencyVectorIntegrationTest { .toStream() .peek((k, v) -> semaphore.release()); - final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration()); - final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration()); + final String safeTestName = safeUniqueTestName(testName); + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); try { @@ -209,8 +210,7 @@ public class ConsistencyVectorIntegrationTest { ); } - private Properties streamsConfiguration() { - final String safeTestName = safeUniqueTestName(getClass(), testName); + private Properties streamsConfiguration(final String safeTestName) { final Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, 
"localhost:" + (++port)); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java index 876c5ed0190..00dd2d436d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java @@ -72,7 +72,7 @@ public class EmitOnChangeIntegrationTest { @BeforeEach public void setup(final TestInfo testInfo) { - final String testId = safeUniqueTestName(getClass(), testInfo); + final String testId = safeUniqueTestName(testInfo); appId = "appId_" + testId; inputTopic = "input" + testId; inputTopic2 = "input2" + testId; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index c6ef6c72694..7b2c8674083 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -130,9 +130,9 @@ public class GlobalKTableEOSIntegrationTest { @Before public void before() throws Exception { builder = new StreamsBuilder(); - createTopics(); + final String safeTestName = safeUniqueTestName(testName); + createTopics(safeTestName); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); @@ -464,8 +464,7 @@ public class GlobalKTableEOSIntegrationTest { ); } - private void createTopics() throws Exception { - 
final String safeTestName = safeUniqueTestName(getClass(), testName); + private void createTopics(final String safeTestName) throws Exception { streamTopic = "stream-" + safeTestName; globalTableTopic = "globalTable-" + safeTestName; CLUSTER.createTopics(streamTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index e184b4d6a15..6bf28c52d6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -100,9 +100,9 @@ public class GlobalKTableIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws Exception { builder = new StreamsBuilder(); - createTopics(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); + createTopics(safeTestName); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -341,8 +341,7 @@ public class GlobalKTableIntegrationTest { kafkaStreams.close(); } - private void createTopics(final TestInfo testInfo) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + private void createTopics(final String safeTestName) throws Exception { streamTopic = "stream-" + safeTestName; globalTableTopic = "globalTable-" + safeTestName; CLUSTER.createTopics(streamTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java index 
06314adbd40..18db670d90d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java @@ -110,7 +110,7 @@ public class GlobalThreadShutDownOrderTest { builder = new StreamsBuilder(); createTopics(); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java index 7de06e2c273..7d512c14f57 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java @@ -134,7 +134,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest { final TestInfo testInfo, final String rackAwareStrategy) throws InterruptedException { // Replace "balance_subtopology" with shorter name since max name length is 249 - final String testId = safeUniqueTestName(getClass(), testInfo).replaceAll("balance_subtopology", "balance"); + final String testId = safeUniqueTestName(testInfo).replaceAll("balance_subtopology", "balance"); final String appId = "appId_" + System.currentTimeMillis() + "_" + testId; final String inputTopic = "input" + testId; final Set<TopicPartition> inputTopicPartitions = mkSet( diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java index 455ee976e0e..becbe301ecb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java @@ -159,7 +159,9 @@ public class IQv2IntegrationTest { Materialized.as(STORE_NAME) ); - kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo)); + + final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); } @@ -420,7 +422,8 @@ public class IQv2IntegrationTest { // Discard the basic streams and replace with test-specific topology kafkaStreams.close(); - kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(testInfo)); + final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration(safeTestName)); kafkaStreams.cleanUp(); kafkaStreams.start(); @@ -438,9 +441,7 @@ public class IQv2IntegrationTest { } - private Properties streamsConfiguration(final TestInfo testInfo) { - final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo); - + private Properties streamsConfiguration(final String safeTestName) { final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java index b4861882cab..6fa378cb4a7 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java @@ -118,7 +118,7 @@ public class JoinGracePeriodDurabilityIntegrationTest { @Test @SuppressWarnings("deprecation") public void shouldRecoverBufferAfterShutdown() { - final String testId = safeUniqueTestName(getClass(), testName); + final String testId = safeUniqueTestName(testName); final String appId = "appId_" + testId; final String streamInput = "Streaminput" + testId; final String tableInput = "Tableinput" + testId; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index cf1732b16be..887bf416da3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -96,9 +96,9 @@ public class KStreamAggregationDedupIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws InterruptedException { builder = new StreamsBuilder(); - createTopics(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); + createTopics(safeTestName); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -234,8 +234,7 @@ public class KStreamAggregationDedupIntegrationTest { } - private void createTopics(final TestInfo testInfo) throws InterruptedException { - final String safeTestName = 
safeUniqueTestName(getClass(), testInfo); + private void createTopics(final String safeTestName) throws InterruptedException { streamOneInput = "stream-one-" + safeTestName; outputTopic = "output-" + safeTestName; CLUSTER.createTopic(streamOneInput, 3, 1); @@ -254,7 +253,7 @@ public class KStreamAggregationDedupIntegrationTest { final TestInfo testInfo) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 27f09293872..03877c68595 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -132,9 +132,9 @@ public class KStreamAggregationIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws InterruptedException { builder = new StreamsBuilder(); - createTopics(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); + createTopics(safeTestName); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -1041,8 +1041,7 @@ public class KStreamAggregationIntegrationTest { } - private void createTopics(final 
TestInfo testInfo) throws InterruptedException { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + private void createTopics(final String safeTestName) throws InterruptedException { streamOneInput = "stream-one-" + safeTestName; outputTopic = "output-" + safeTestName; userSessionsStream = "user-sessions-" + safeTestName; @@ -1071,7 +1070,7 @@ public class KStreamAggregationIntegrationTest { final TestInfo testInfo) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); @@ -1095,7 +1094,7 @@ public class KStreamAggregationIntegrationTest { final Class innerClass, final int numMessages, final TestInfo testInfo) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java index c6cb077e6c0..1d9a77b5bf4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java @@ -96,7 +96,7 @@ public class KStreamKStreamIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws IOException { final String stateDirBasePath = 
TestUtils.tempDirectory().getPath(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); streamsConfig = getStreamsConfig(safeTestName); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java index 61f9cde5974..79e62c48ef1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java @@ -135,7 +135,7 @@ public class KStreamRepartitionIntegrationTest { streamsConfiguration = new Properties(); kafkaStreamsInstances = new ArrayList<>(); - safeTestName = safeUniqueTestName(getClass(), testName); + safeTestName = safeUniqueTestName(testName); topicB = "topic-b-" + safeTestName; inputTopic = "input-topic-" + safeTestName; @@ -890,7 +890,7 @@ public class KStreamRepartitionIntegrationTest { final List<KeyValue<K, V>> expectedRecords, final String outputTopic) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java index 12a5056730d..a792717c1d3 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java @@ -152,7 +152,7 @@ public class KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws IOException { final String stateDirBasePath = TestUtils.tempDirectory().getPath(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); streamsConfig = getStreamsConfig(safeTestName); streamsConfigTwo = getStreamsConfig(safeTestName); streamsConfigThree = getStreamsConfig(safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java index d775c9e54c9..a6d5b1a70f2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java @@ -126,8 +126,7 @@ public class KTableKTableForeignKeyJoinDistributedTest { quietlyCleanStateAfterTest(CLUSTER, client2); } - public Properties getStreamsConfiguration(final TestInfo testInfo) { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + public Properties getStreamsConfiguration(final String safeTestName) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -158,8 +157,9 @@ public class KTableKTableForeignKeyJoinDistributedTest { @Test public void shouldBeInitializedWithDefaultSerde(final 
TestInfo testInfo) throws Exception { - final Properties streamsConfiguration1 = getStreamsConfiguration(testInfo); - final Properties streamsConfiguration2 = getStreamsConfiguration(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); + final Properties streamsConfiguration1 = getStreamsConfiguration(safeTestName); + final Properties streamsConfiguration2 = getStreamsConfiguration(safeTestName); //Each streams client needs to have it's own StreamsBuilder in order to simulate //a truly distributed run diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java index 2a36556c99f..6655f697bb7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java @@ -54,7 +54,6 @@ import static java.util.Collections.emptyMap; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; -import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -82,7 +81,6 @@ public class KTableKTableForeignKeyJoinMaterializationIntegrationTest { @Before public void before() { - final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfig = mkProperties(mkMap( mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()) )); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java index cefc48d0161..d16aa2f0254 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java @@ -98,10 +98,11 @@ public class KTableSourceTopicRestartIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws Exception { - sourceTopic = SOURCE_TOPIC + "-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo); + final String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo); + sourceTopic = SOURCE_TOPIC + "-" + safeTestName; CLUSTER.createTopic(sourceTopic); - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo)); + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName); final KTable<String, String> kTable = streamsBuilder.table(sourceTopic, Materialized.as("store")); kTable.toStream().foreach(readKeyValues::put); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java index 00da6a89e18..0a7feba5354 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java @@ -102,12 +102,14 @@ public class KafkaStreamsCloseOptionsIntegrationTest { public void before() throws Exception { mockTime = CLUSTER.time; - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); commonClientConfig = new Properties(); 
commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfig = new Properties(); + streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); + streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance"); streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); @@ -153,19 +155,14 @@ public class KafkaStreamsCloseOptionsIntegrationTest { @Test public void testCloseOptions() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); - streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); - streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "someGroupInstance"); // Test with two threads to show that each of the threads is being called to remove clients from the CG. 
streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - - // RUN streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams); IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, OUTPUT_TOPIC, 10); streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30))); - waitForEmptyConsumerGroup(adminClient, appID, 0); + waitForEmptyConsumerGroup(adminClient, streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0); } protected Topology setupTopologyWithoutIntermediateUserTopic() { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java index d2016901888..0d7be9b8e14 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java @@ -100,7 +100,7 @@ public class LagFetchIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); inputTopicName = "input-topic-" + safeTestName; outputTopicName = "output-topic-" + safeTestName; stateStoreName = "lagfetch-test-store" + safeTestName; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java index a577a29c6ba..663fb22c5b2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java @@ -245,7 +245,7 @@ public class MetricsIntegrationTest { builder = new StreamsBuilder(); CLUSTER.createTopics(STREAM_INPUT, 
STREAM_OUTPUT_1, STREAM_OUTPUT_2, STREAM_OUTPUT_3, STREAM_OUTPUT_4); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); appId = "app-" + safeTestName; streamsConfiguration = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java index bb67dddcf89..9f95d0a0823 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java @@ -73,7 +73,7 @@ public class MetricsReporterIntegrationTest { public void before(final TestInfo testInfo) throws InterruptedException { builder = new StreamsBuilder(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + final String safeTestName = safeUniqueTestName(testInfo); final String appId = "app-" + safeTestName; streamsConfiguration = new Properties(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index e6cd96b26cc..1dede6617c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -217,7 +217,7 @@ public class NamedTopologyIntegrationTest { @BeforeEach public void setup(final TestInfo testInfo) throws Exception { - appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testInfo); + appId = safeUniqueTestName(testInfo); changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog"; changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog"; changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog"; 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index be30c447f8c..5d1a46f89e3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -116,8 +116,9 @@ public class OptimizedKTableIntegrationTest { .toStream() .peek((k, v) -> semaphore.release()); - final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(testInfo)); - final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(testInfo)); + final String safeTestName = safeUniqueTestName(testInfo); + final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); + final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, streamsConfiguration(safeTestName)); final List<KafkaStreams> kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); try { @@ -199,8 +200,7 @@ public class OptimizedKTableIntegrationTest { return streams; } - private Properties streamsConfiguration(final TestInfo testInfo) { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + private Properties streamsConfiguration(final String safeTestName) { final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java index 5c3f8aa90d0..3f4412256b7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java @@ -125,7 +125,7 @@ public class PauseResumeIntegrationTest { @BeforeEach public void createTopics(final TestInfo testInfo) throws InterruptedException { cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, OUTPUT_STREAM_1, OUTPUT_STREAM_2); - appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo); + appId = safeUniqueTestName(testInfo); } private Properties props(final boolean stateUpdaterEnabled) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 97816ebdc1b..df87793420b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -160,8 +160,7 @@ public class QueryableStateIntegrationTest { private Comparator<KeyValue<String, String>> stringComparator; private Comparator<KeyValue<String, Long>> stringLongComparator; - private void createTopics(final TestInfo testInfo) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); + private void createTopics(final String safeTestName) throws Exception { streamOne = streamOne + "-" + safeTestName; streamConcurrent = streamConcurrent + "-" + safeTestName; streamThree = streamThree + "-" + safeTestName; @@ -214,9 +213,9 @@ public class QueryableStateIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws Exception { - createTopics(testInfo); + final String safeTestName = safeUniqueTestName(testInfo); + createTopics(safeTestName); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testInfo); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); 
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -444,7 +443,7 @@ public class QueryableStateIntegrationTest { @Test public void shouldRejectNonExistentStoreName(final TestInfo testInfo) throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); + final String uniqueTestName = safeUniqueTestName(testInfo); final String input = uniqueTestName + "-input"; final String storeName = uniqueTestName + "-input-table"; @@ -458,7 +457,7 @@ public class QueryableStateIntegrationTest { ); final Properties properties = mkProperties(mkMap( - mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, safeUniqueTestName(getClass(), testInfo)), + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, uniqueTestName), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) )); @@ -482,7 +481,7 @@ public class QueryableStateIntegrationTest { @Test public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testInfo); + final String uniqueTestName = safeUniqueTestName(testInfo); final String input = uniqueTestName + "-input"; final String storeName = uniqueTestName + "-input-table"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index e5f3080f8bb..cb5a3da1631 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -137,7 +137,7 @@ public class RegexSourceIntegrationTest { properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000); streamsConfiguration = StreamsTestUtils.getStreamsConfig( - IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, testInfo), + 
IntegrationTestUtils.safeUniqueTestName(testInfo), CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java index 7a851c63bfb..fe83e9d2b5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java @@ -101,7 +101,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhileStreamsIsRunning() { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -125,7 +125,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenInputTopicAbsent() { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -141,7 +141,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenIntermediateTopicAbsent() { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -157,7 +157,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void 
shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -173,7 +173,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); final String[] parameters = new String[] { "--application-id", appID, "--bootstrap-server", cluster.bootstrapServers(), @@ -189,7 +189,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.toString(STREAMS_CONSUMER_TIMEOUT * 100)); @@ -225,7 +225,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -266,7 +266,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws Exception { - final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN @@ -311,7 +311,7 @@ public class ResetIntegrationTest extends AbstractResetIntegrationTest { @Test public void testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws Exception { - final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + final String appID = IntegrationTestUtils.safeUniqueTestName(testName); streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID); // RUN diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java index 7fe905ae7d4..40c03e6d891 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java @@ -112,7 +112,7 @@ public class ResetPartitionTimeIntegrationTest { @Test public void shouldPreservePartitionTimeOnKafkaStreamRestart() { - final String appId = "app-" + safeUniqueTestName(getClass(), testName); + final String appId = "app-" + safeUniqueTestName(testName); final String input = "input"; final String outputRaw = "output-raw"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java index f61092106e7..812da300747 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java @@ -139,7 +139,7 @@ public class RestoreIntegrationTest { @BeforeEach public void createTopics(final TestInfo testInfo) throws InterruptedException { - 
appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo); + appId = safeUniqueTestName(testInfo); inputStream = appId + "-input-stream"; CLUSTER.createTopic(inputStream, 2, 1); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java index 02d34b51fb7..706b27c4063 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java @@ -193,7 +193,7 @@ public class RocksDBMetricsIntegrationTest { private Properties streamsConfig() { final Properties streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-application-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java index ff8cd3b8339..21bcb609541 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java @@ -79,17 +79,19 @@ public class SelfJoinUpgradeIntegrationTest { @Rule public TestName testName = new TestName(); + private String safeTestName; + @Before public void createTopics() throws Exception { - inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName); - outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), 
testName); + safeTestName = safeUniqueTestName(testName); + inputTopic = INPUT_TOPIC + safeTestName; + outputTopic = OUTPUT_TOPIC + safeTestName; CLUSTER.createTopic(inputTopic); CLUSTER.createTopic(outputTopic); } private Properties props() { final Properties streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); @@ -113,7 +115,6 @@ public class SelfJoinUpgradeIntegrationTest { @Test - @SuppressWarnings("unchecked") public void shouldUpgradeWithTopologyOptimizationOff() throws Exception { final StreamsBuilder streamsBuilderOld = new StreamsBuilder(); @@ -127,20 +128,30 @@ public class SelfJoinUpgradeIntegrationTest { ); joinedOld.to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); - + final String safeTestName = safeUniqueTestName(testName); final Properties props = props(); props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props); kafkaStreams.start(); final long currentTime = CLUSTER.time.milliseconds(); - processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList( - new KeyValueTimestamp<String, String>("1", "AA", currentTime + 42L))); + processKeyValueAndVerifyCount( + "1", + "A", + currentTime + 42L, + asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L)) + ); - processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList( - new KeyValueTimestamp("1", "BA", currentTime + 43L), - new KeyValueTimestamp("1", "AB", currentTime + 43L), - new KeyValueTimestamp("1", "BB", currentTime + 43L))); + processKeyValueAndVerifyCount( + "1", + "B", + currentTime + 43L, + asList( + new KeyValueTimestamp<>("1", "BA", currentTime + 43L), + 
new KeyValueTimestamp<>("1", "AB", currentTime + 43L), + new KeyValueTimestamp<>("1", "BB", currentTime + 43L) + ) + ); kafkaStreams.close(); @@ -152,19 +163,23 @@ public class SelfJoinUpgradeIntegrationTest { final long currentTimeNew = CLUSTER.time.milliseconds(); - processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList( - new KeyValueTimestamp("1", "CA", currentTimeNew + 44L), - new KeyValueTimestamp("1", "CB", currentTimeNew + 44L), - new KeyValueTimestamp("1", "AC", currentTimeNew + 44L), - new KeyValueTimestamp("1", "BC", currentTimeNew + 44L), - new KeyValueTimestamp("1", "CC", currentTimeNew + 44L))); - + processKeyValueAndVerifyCount( + "1", + "C", + currentTimeNew + 44L, + asList( + new KeyValueTimestamp<>("1", "CA", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "CB", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "AC", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "BC", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "CC", currentTimeNew + 44L) + ) + ); kafkaStreams.close(); } @Test - @SuppressWarnings("unchecked") public void shouldRestartWithTopologyOptimizationOn() throws Exception { final StreamsBuilder streamsBuilderOld = new StreamsBuilder(); @@ -185,14 +200,23 @@ public class SelfJoinUpgradeIntegrationTest { kafkaStreams.start(); final long currentTime = CLUSTER.time.milliseconds(); - processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList( - new KeyValueTimestamp("1", "AA", currentTime + 42L))); - - processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList( - new KeyValueTimestamp("1", "BA", currentTime + 43L), - new KeyValueTimestamp("1", "AB", currentTime + 43L), - new KeyValueTimestamp("1", "BB", currentTime + 43L))); + processKeyValueAndVerifyCount( + "1", + "A", + currentTime + 42L, + asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L)) + ); + processKeyValueAndVerifyCount( + "1", + "B", + currentTime + 43L, + asList( + new KeyValueTimestamp<>("1", "BA", currentTime + 
43L), + new KeyValueTimestamp<>("1", "AB", currentTime + 43L), + new KeyValueTimestamp<>("1", "BB", currentTime + 43L) + ) + ); kafkaStreams.close(); kafkaStreams = null; @@ -203,12 +227,18 @@ public class SelfJoinUpgradeIntegrationTest { final long currentTimeNew = CLUSTER.time.milliseconds(); - processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList( - new KeyValueTimestamp("1", "CA", currentTimeNew + 44L), - new KeyValueTimestamp("1", "CB", currentTimeNew + 44L), - new KeyValueTimestamp("1", "AC", currentTimeNew + 44L), - new KeyValueTimestamp("1", "BC", currentTimeNew + 44L), - new KeyValueTimestamp("1", "CC", currentTimeNew + 44L))); + processKeyValueAndVerifyCount( + "1", + "C", + currentTimeNew + 44L, + asList( + new KeyValueTimestamp<>("1", "CA", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "CB", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "AC", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "BC", currentTimeNew + 44L), + new KeyValueTimestamp<>("1", "CC", currentTimeNew + 44L) + ) + ); kafkaStreams.close(); } @@ -230,7 +260,6 @@ public class SelfJoinUpgradeIntegrationTest { timestamp); - final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); @@ -251,6 +280,4 @@ public class SelfJoinUpgradeIntegrationTest { return actual.equals(expected); } - - } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java index 395ef6efd66..6dcbd80384a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java @@ -121,6 +121,8 @@ public class SlidingWindowedKStreamIntegrationTest { private EmitStrategy emitStrategy; private boolean emitFinal; + private String safeTestName; + @Parameterized.Parameters(name = "{0}_cache:{1}") public static Collection<Object[]> getEmitStrategy() { return asList(new Object[][] { @@ -134,9 +136,9 @@ public class SlidingWindowedKStreamIntegrationTest { @Before public void before() throws InterruptedException { builder = new StreamsBuilder(); + safeTestName = safeUniqueTestName(testName); createTopics(); streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -445,7 +447,6 @@ public class SlidingWindowedKStreamIntegrationTest { } private void createTopics() throws InterruptedException { - final String safeTestName = safeUniqueTestName(getClass(), testName); streamOneInput = "stream-one-" + safeTestName; streamTwoInput = "stream-two-" + safeTestName; outputTopic = "output-" + safeTestName; @@ -464,7 +465,6 @@ public class SlidingWindowedKStreamIntegrationTest { final long windowSize, final Class innerClass, final int numMessages) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java index 95eab415c33..2ecb24b94cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; @@ -54,12 +55,19 @@ public class StandbyTaskCreationIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); + private String safeTestName; + @BeforeAll public static void startCluster() throws IOException, InterruptedException { CLUSTER.start(); CLUSTER.createTopic(INPUT_TOPIC, 2, 1); } + @BeforeEach + public void setUp(final TestInfo testInfo) { + safeTestName = safeUniqueTestName(testInfo); + } + @AfterAll public static void closeCluster() { CLUSTER.stop(); @@ -79,7 +87,6 @@ public class StandbyTaskCreationIntegrationTest { } private Properties streamsConfiguration(final TestInfo testInfo) { - final String safeTestName = safeUniqueTestName(getClass(), testInfo); final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java index eaa24e8005a..bac33419819 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java @@ -121,7 +121,7 @@ public class StandbyTaskEOSIntegrationTest { @Before public void createTopics() throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); appId = "app-" + safeTestName; inputTopic = "input-" + safeTestName; outputTopic = "output-" + safeTestName; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java index 1515debe845..69466197bca 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java @@ -78,7 +78,7 @@ public class StateDirectoryIntegrationTest { @Test public void testCleanUpStateDirIfEmpty() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + final String uniqueTestName = safeUniqueTestName(testName); // Create Topic final String input = uniqueTestName + "-input"; @@ -184,7 +184,7 @@ public class StateDirectoryIntegrationTest { @Test public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + final String uniqueTestName = safeUniqueTestName(testName); // Create Topic final String input = uniqueTestName + "-input"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index 8b5b1ebd62a..e20f7168059 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -112,7 +112,7 @@ public class StoreQueryIntegrationTest { @BeforeEach public void before(final TestInfo testInfo) throws InterruptedException, IOException { - this.appId = safeUniqueTestName(getClass(), testInfo); + this.appId = safeUniqueTestName(testInfo); } @AfterEach diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 57d5bcad162..c6ed805ac4c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -85,13 +85,13 @@ public class StoreUpgradeIntegrationTest { @Before public void createTopics() throws Exception { - inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName); + inputStream = "input-stream-" + safeUniqueTestName(testName); CLUSTER.createTopic(inputStream); } private Properties props() { final Properties streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java index 282ceeed0ec..073e80a7287 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java @@ -110,7 +110,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest { public void before() throws InterruptedException { streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); tableTopic = "table-topic" + safeTestName; inputTopic = "stream-topic-" + safeTestName; @@ -241,7 +241,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest { final Deserializer<V> valueSerializer, final List<KeyValue<K, V>> expectedRecords) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java index 916afc201d3..63bb1a15b07 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java @@ -97,7 +97,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest { @Rule public final TestName testName = new TestName(); - private final String testId = safeUniqueTestName(getClass(), testName); + private final String testId = safeUniqueTestName(testName); private final String appId = "appId_" + testId; private final String inputTopic = "input" + testId; private final String inputTopic2 = 
"input2" + testId; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java index 1dc6e6a6072..d0e90e6882a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -130,7 +130,7 @@ public class SuppressionDurabilityIntegrationTest { @Test @SuppressWarnings("deprecation") public void shouldRecoverBufferAfterShutdown() { - final String testId = safeUniqueTestName(getClass(), testName); + final String testId = safeUniqueTestName(testName); final String appId = "appId_" + testId; final String input = "input" + testId; final String storeName = "counts"; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java index 7f00f9ddd48..de7ece8423d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java @@ -92,7 +92,7 @@ public class TaskAssignorIntegrationTest { // ensure these configurations wind up where they belong, and any number of future code changes // could break this change. 
- final String testId = safeUniqueTestName(getClass(), testName); + final String testId = safeUniqueTestName(testName); final String appId = "appId_" + testId; final String inputTopic = "input" + testId; diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java index cc2d9a0fe23..62d3758c86c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java @@ -88,7 +88,7 @@ public class TaskMetadataIntegrationTest { @Before public void setup() { - final String testId = safeUniqueTestName(getClass(), testName); + final String testId = safeUniqueTestName(testName); appId = appIdPrefix + testId; inputTopic = "input" + testId; IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java index 91aa583060c..6f5a2b09be6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java @@ -129,6 +129,8 @@ public class TimeWindowedKStreamIntegrationTest { private boolean emitFinal; + private String safeTestName; + @Parameterized.Parameters(name = "{0}_{1}") public static Collection<Object[]> getEmitStrategy() { return asList(new Object[][] { @@ -142,9 +144,9 @@ public class TimeWindowedKStreamIntegrationTest { @Before public void before() throws InterruptedException { builder = new StreamsBuilder(); + safeTestName = safeUniqueTestName(testName); createTopics(); streamsConfiguration = new Properties(); - final String safeTestName = 
safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -477,7 +479,6 @@ public class TimeWindowedKStreamIntegrationTest { } private void createTopics() throws InterruptedException { - final String safeTestName = safeUniqueTestName(getClass(), testName); streamOneInput = "stream-one-" + safeTestName; streamTwoInput = "stream-two-" + safeTestName; outputTopic = "output-" + safeTestName; @@ -496,7 +497,6 @@ public class TimeWindowedKStreamIntegrationTest { final long windowSize, final Class innerClass, final int numMessages) throws Exception { - final String safeTestName = safeUniqueTestName(getClass(), testName); final Properties consumerProperties = new Properties(); consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java index 94d2644905b..15b1ae1ea80 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java @@ -108,7 +108,7 @@ public class VersionedKeyValueStoreIntegrationTest { @Before public void beforeTest() throws InterruptedException { - final String uniqueTestName = safeUniqueTestName(getClass(), testName); + final String uniqueTestName = safeUniqueTestName(testName); inputStream = "input-stream-" + uniqueTestName; globalTableTopic = "global-table-" + uniqueTestName; outputStream = "output-stream-" + 
uniqueTestName; @@ -203,7 +203,7 @@ public class VersionedKeyValueStoreIntegrationTest { 1); // verify changelog topic properties - final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog"; + final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-versioned-store-changelog"; final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic); assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact")); assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L))); @@ -361,7 +361,7 @@ public class VersionedKeyValueStoreIntegrationTest { private void shouldManualUpgradeFromNonVersionedToVersioned(final Topology originalTopology) throws Exception { // build original (non-versioned) topology and start app - Properties props = props(); + final Properties props = props(); // additional property to prevent premature compaction of older record versions while using timestamped store props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, 60_000L); kafkaStreams = new KafkaStreams(originalTopology, props); @@ -407,7 +407,6 @@ public class VersionedKeyValueStoreIntegrationTest { .process(() -> new VersionedStoreContentCheckerProcessor(true, data), STORE_NAME) .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer())); - props = props(); kafkaStreams = new KafkaStreams(streamsBuilder.build(), props); kafkaStreams.start(); @@ -430,8 +429,8 @@ public class VersionedKeyValueStoreIntegrationTest { } private Properties props() { + final String safeTestName = safeUniqueTestName(testName); final Properties streamsConfiguration = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index cc6c96b30f8..4f1d8d3d426 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState; @@ -231,8 +232,9 @@ public class IntegrationTestUtils { * The name is safe even for parameterized methods. * Used by tests not yet migrated from JUnit 4. */ - public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) { - return safeUniqueTestName(testClass, testName.getMethodName()); + public static String safeUniqueTestName(final TestName testName) { + final String methodName = testName.getMethodName(); + return safeUniqueTestName(methodName); } /** @@ -240,21 +242,25 @@ public class IntegrationTestUtils { * JUnit 5 instead of a TestName from JUnit 4. * Used by tests migrated to JUnit 5. 
*/ - public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) { - final String displayName = testInfo.getDisplayName(); + public static String safeUniqueTestName(final TestInfo testInfo) { final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName"); - final String testName = displayName.contains(methodName) ? methodName : methodName + displayName; - return safeUniqueTestName(testClass, testName); - } - - private static String safeUniqueTestName(final Class<?> testClass, final String testName) { - return (testClass.getSimpleName() + testName) - .replace(':', '_') - .replace('.', '_') - .replace('[', '_') - .replace(']', '_') - .replace(' ', '_') - .replace('=', '_'); + return safeUniqueTestName(methodName); + } + + private static String safeUniqueTestName(final String testName) { + return sanitize(testName + Uuid.randomUuid().toString()); + } + + private static String sanitize(final String str) { + return str + // The `-` is used in Streams' thread name as a separator and some tests rely on this. 
+ .replace('-', '_') + .replace(':', '_') + .replace('.', '_') + .replace('[', '_') + .replace(']', '_') + .replace(' ', '_') + .replace('=', '_'); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java index a111e829f53..f85596c0cc1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java @@ -240,7 +240,7 @@ public class KTableKTableForeignKeyJoinScenarioTest { private void validateTopologyCanProcessData(final StreamsBuilder builder) { final Properties config = new Properties(); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName()); config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java index 017cccf69d8..d6a8a40874b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java @@ -85,7 +85,7 @@ public class HandlingSourceTopicDeletionIntegrationTest { builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String())) .to(OUTPUT_TOPIC, 
Produced.with(Serdes.Integer(), Serdes.String())); - final String safeTestName = safeUniqueTestName(getClass(), testName); + final String safeTestName = safeUniqueTestName(testName); final String appId = "app-" + safeTestName; final Properties streamsConfiguration = new Properties();