This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.7 by this push:
     new 6e998cffdd3 MINOR: Add junit properties to display parameterized test 
names (#14983)
6e998cffdd3 is described below

commit 6e998cffdd33e343945877ccee1fec8337c7d57d
Author: Bruno Cadonna <cado...@apache.org>
AuthorDate: Wed Dec 13 09:42:18 2023 +0100

    MINOR: Add junit properties to display parameterized test names (#14983)
    
    In many parameterized tests, the display name is broken. For example, 
testMetadataFetch appears as [1] true, [2] false link
    This is because of the default constant defined in @ParameterizedTest:
    
    String DEFAULT_DISPLAY_NAME = "[{index}] {argumentsWithNames}";
    
    This PR adds a new junit-platform.properties which overrides that default to 
add a {displayName} placeholder, which shows the display name of the method.
    
    Existing tests which override the name should work as is. The 
precedence rules are as follows:
    
        name attribute in @ParameterizedTest, if present
        value of the junit.jupiter.params.displayname.default configuration 
parameter, if present
        DEFAULT_DISPLAY_NAME constant defined in @ParameterizedTest
    
    Source: 
https://junit.org/junit5/docs/current/user-guide/#writing-tests-parameterized-tests-display-names
    
    Sample test run output
    Before: [1] true link
    After: testMetadataExpiry(boolean).false link
    
    This commit is an extension of 
https://github.com/apache/kafka/commit/bdf6d46b41bba22d5a9adf6eb1c87e46d8b9c095 
which needed to be reverted because it introduced test failures.
    
    Reviewers: David Jacot <dja...@confluent.io>, Lucas Brutschy 
<lbruts...@confluent.io>
---
 .../src/test/resources/junit-platform.properties   | 15 ++++
 .../src/test/resources/junit-platform.properties   | 15 ++++
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  6 +-
 .../integration/AbstractResetIntegrationTest.java  |  8 +-
 .../integration/AdjustStreamThreadCountTest.java   |  2 +-
 .../ConsistencyVectorIntegrationTest.java          |  8 +-
 .../integration/EmitOnChangeIntegrationTest.java   |  2 +-
 .../GlobalKTableEOSIntegrationTest.java            |  7 +-
 .../integration/GlobalKTableIntegrationTest.java   |  7 +-
 .../integration/GlobalThreadShutDownOrderTest.java |  2 +-
 ...ighAvailabilityTaskAssignorIntegrationTest.java |  2 +-
 .../streams/integration/IQv2IntegrationTest.java   | 11 +--
 .../JoinGracePeriodDurabilityIntegrationTest.java  |  2 +-
 .../KStreamAggregationDedupIntegrationTest.java    |  9 +-
 .../KStreamAggregationIntegrationTest.java         | 11 ++-
 .../integration/KStreamKStreamIntegrationTest.java |  2 +-
 .../KStreamRepartitionIntegrationTest.java         |  4 +-
 ...yInnerJoinCustomPartitionerIntegrationTest.java |  2 +-
 .../KTableKTableForeignKeyJoinDistributedTest.java |  8 +-
 ...reignKeyJoinMaterializationIntegrationTest.java |  2 -
 .../KTableSourceTopicRestartIntegrationTest.java   |  5 +-
 .../KafkaStreamsCloseOptionsIntegrationTest.java   | 11 +--
 .../integration/LagFetchIntegrationTest.java       |  2 +-
 .../integration/MetricsIntegrationTest.java        |  2 +-
 .../MetricsReporterIntegrationTest.java            |  2 +-
 .../integration/NamedTopologyIntegrationTest.java  |  2 +-
 .../OptimizedKTableIntegrationTest.java            |  8 +-
 .../integration/PauseResumeIntegrationTest.java    |  2 +-
 .../integration/QueryableStateIntegrationTest.java | 13 ++-
 .../integration/RegexSourceIntegrationTest.java    |  2 +-
 .../streams/integration/ResetIntegrationTest.java  | 18 ++--
 .../ResetPartitionTimeIntegrationTest.java         |  2 +-
 .../integration/RestoreIntegrationTest.java        |  2 +-
 .../integration/RocksDBMetricsIntegrationTest.java |  2 +-
 .../SelfJoinUpgradeIntegrationTest.java            | 97 ++++++++++++++--------
 .../SlidingWindowedKStreamIntegrationTest.java     |  6 +-
 .../StandbyTaskCreationIntegrationTest.java        |  9 +-
 .../integration/StandbyTaskEOSIntegrationTest.java |  2 +-
 .../integration/StateDirectoryIntegrationTest.java |  4 +-
 .../integration/StoreQueryIntegrationTest.java     |  2 +-
 .../integration/StoreUpgradeIntegrationTest.java   |  4 +-
 ...bleJoinTopologyOptimizationIntegrationTest.java |  4 +-
 ...amsUncaughtExceptionHandlerIntegrationTest.java |  2 +-
 .../SuppressionDurabilityIntegrationTest.java      |  2 +-
 .../integration/TaskAssignorIntegrationTest.java   |  2 +-
 .../integration/TaskMetadataIntegrationTest.java   |  2 +-
 .../TimeWindowedKStreamIntegrationTest.java        |  6 +-
 .../VersionedKeyValueStoreIntegrationTest.java     |  9 +-
 .../integration/utils/IntegrationTestUtils.java    | 38 +++++----
 .../KTableKTableForeignKeyJoinScenarioTest.java    |  2 +-
 ...HandlingSourceTopicDeletionIntegrationTest.java |  2 +-
 51 files changed, 226 insertions(+), 165 deletions(-)

diff --git a/clients/src/test/resources/junit-platform.properties 
b/clients/src/test/resources/junit-platform.properties
new file mode 100644
index 00000000000..05069923a7f
--- /dev/null
+++ b/clients/src/test/resources/junit-platform.properties
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
diff --git a/server-common/src/test/resources/junit-platform.properties 
b/server-common/src/test/resources/junit-platform.properties
new file mode 100644
index 00000000000..05069923a7f
--- /dev/null
+++ b/server-common/src/test/resources/junit-platform.properties
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+junit.jupiter.params.displayname.default = "{displayName}.{argumentsWithNames}"
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index e03d562ed1a..f5a8dfdece4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -1175,7 +1175,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void statelessTopologyShouldNotCreateStateDirectory() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final String inputTopic = safeTestName + "-input";
         final String outputTopic = safeTestName + "-output";
         final Topology topology = new Topology();
@@ -1201,7 +1201,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void inMemoryStatefulTopologyShouldNotCreateStateDirectory() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final String inputTopic = safeTestName + "-input";
         final String outputTopic = safeTestName + "-output";
         final String globalTopicName = safeTestName + "-global";
@@ -1213,7 +1213,7 @@ public class KafkaStreamsTest {
 
     @Test
     public void statefulTopologyShouldCreateStateDirectory() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final String inputTopic = safeTestName + "-input";
         final String outputTopic = safeTestName + "-output";
         final String globalTopicName = safeTestName + "-global";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index 05b119da064..fe41e6c1e82 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -164,7 +164,7 @@ public abstract class AbstractResetIntegrationTest {
     protected static final int TIMEOUT_MULTIPLIER = 15;
 
     void prepareTest() throws Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         prepareConfigs(appID);
         prepareEnvironment();
 
@@ -205,7 +205,7 @@ public abstract class AbstractResetIntegrationTest {
 
     @Test
     public void testResetWhenInternalTopicsAreSpecified() throws Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
@@ -233,7 +233,7 @@ public abstract class AbstractResetIntegrationTest {
 
     @Test
     public void 
testReprocessingFromScratchAfterResetWithoutIntermediateUserTopic() throws 
Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
@@ -278,7 +278,7 @@ public abstract class AbstractResetIntegrationTest {
             cluster.createTopic(INTERMEDIATE_USER_TOPIC);
         }
 
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index a74af0a25ac..9cee7618494 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -100,7 +100,7 @@ public class AdjustStreamThreadCountTest {
 
     @BeforeEach
     public void setup(final TestInfo testInfo) {
-        final String testId = safeUniqueTestName(getClass(), testInfo);
+        final String testId = safeUniqueTestName(testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
index dc720559a9b..3e809467b08 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
@@ -115,8 +115,9 @@ public class ConsistencyVectorIntegrationTest {
                .toStream()
                .peek((k, v) -> semaphore.release());
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final String safeTestName = safeUniqueTestName(testName);
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(safeTestName));
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(safeTestName));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         try {
@@ -209,8 +210,7 @@ public class ConsistencyVectorIntegrationTest {
         );
     }
 
-    private Properties streamsConfiguration() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private Properties streamsConfiguration(final String safeTestName) {
         final Properties config = new Properties();
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 
(++port));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 876c5ed0190..00dd2d436d5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -72,7 +72,7 @@ public class EmitOnChangeIntegrationTest {
 
     @BeforeEach
     public void setup(final TestInfo testInfo) {
-        final String testId = safeUniqueTestName(getClass(), testInfo);
+        final String testId = safeUniqueTestName(testInfo);
         appId = "appId_" + testId;
         inputTopic = "input" + testId;
         inputTopic2 = "input2" + testId;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
index c6ef6c72694..7b2c8674083 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java
@@ -130,9 +130,9 @@ public class GlobalKTableEOSIntegrationTest {
     @Before
     public void before() throws Exception {
         builder = new StreamsBuilder();
-        createTopics();
+        final String safeTestName = safeUniqueTestName(testName);
+        createTopics(safeTestName);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
@@ -464,8 +464,7 @@ public class GlobalKTableEOSIntegrationTest {
         );
     }
 
-    private void createTopics() throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private void createTopics(final String safeTestName) throws Exception {
         streamTopic = "stream-" + safeTestName;
         globalTableTopic = "globalTable-" + safeTestName;
         CLUSTER.createTopics(streamTopic);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index e184b4d6a15..6bf28c52d6a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -100,9 +100,9 @@ public class GlobalKTableIntegrationTest {
     @BeforeEach
     public void before(final TestInfo testInfo) throws Exception {
         builder = new StreamsBuilder();
-        createTopics(testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
+        createTopics(safeTestName);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
@@ -341,8 +341,7 @@ public class GlobalKTableIntegrationTest {
         kafkaStreams.close();
     }
 
-    private void createTopics(final TestInfo testInfo) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    private void createTopics(final String safeTestName) throws Exception {
         streamTopic = "stream-" + safeTestName;
         globalTableTopic = "globalTable-" + safeTestName;
         CLUSTER.createTopics(streamTopic);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
index 06314adbd40..18db670d90d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalThreadShutDownOrderTest.java
@@ -110,7 +110,7 @@ public class GlobalThreadShutDownOrderTest {
         builder = new StreamsBuilder();
         createTopics();
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 7de06e2c273..7d512c14f57 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -134,7 +134,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
                                                final TestInfo testInfo,
                                                final String rackAwareStrategy) 
throws InterruptedException {
         // Replace "balance_subtopology" with shorter name since max name 
length is 249
-        final String testId = safeUniqueTestName(getClass(), 
testInfo).replaceAll("balance_subtopology", "balance");
+        final String testId = 
safeUniqueTestName(testInfo).replaceAll("balance_subtopology", "balance");
         final String appId = "appId_" + System.currentTimeMillis() + "_" + 
testId;
         final String inputTopic = "input" + testId;
         final Set<TopicPartition> inputTopicPartitions = mkSet(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 455ee976e0e..becbe301ecb 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -159,7 +159,9 @@ public class IQv2IntegrationTest {
             Materialized.as(STORE_NAME)
         );
 
-        kafkaStreams = new KafkaStreams(builder.build(), 
streamsConfiguration(testInfo));
+
+        final String safeTestName = 
IntegrationTestUtils.safeUniqueTestName(testInfo);
+        kafkaStreams = new KafkaStreams(builder.build(), 
streamsConfiguration(safeTestName));
         kafkaStreams.cleanUp();
     }
 
@@ -420,7 +422,8 @@ public class IQv2IntegrationTest {
 
         // Discard the basic streams and replace with test-specific topology
         kafkaStreams.close();
-        kafkaStreams = new KafkaStreams(builder.build(), 
streamsConfiguration(testInfo));
+        final String safeTestName = 
IntegrationTestUtils.safeUniqueTestName(testInfo);
+        kafkaStreams = new KafkaStreams(builder.build(), 
streamsConfiguration(safeTestName));
         kafkaStreams.cleanUp();
 
         kafkaStreams.start();
@@ -438,9 +441,7 @@ public class IQv2IntegrationTest {
     }
 
 
-    private Properties streamsConfiguration(final TestInfo testInfo) {
-        final String safeTestName = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
-
+    private Properties streamsConfiguration(final String safeTestName) {
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index b4861882cab..6fa378cb4a7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -118,7 +118,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldRecoverBufferAfterShutdown() {
-        final String testId = safeUniqueTestName(getClass(), testName);
+        final String testId = safeUniqueTestName(testName);
         final String appId = "appId_" + testId;
         final String streamInput = "Streaminput" + testId;
         final String tableInput = "Tableinput" + testId;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index cf1732b16be..887bf416da3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -96,9 +96,9 @@ public class KStreamAggregationDedupIntegrationTest {
     @BeforeEach
     public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
-        createTopics(testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
+        createTopics(safeTestName);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
@@ -234,8 +234,7 @@ public class KStreamAggregationDedupIntegrationTest {
     }
 
 
-    private void createTopics(final TestInfo testInfo) throws 
InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    private void createTopics(final String safeTestName) throws 
InterruptedException {
         streamOneInput = "stream-one-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         CLUSTER.createTopic(streamOneInput, 3, 1);
@@ -254,7 +253,7 @@ public class KStreamAggregationDedupIntegrationTest {
                                                  final TestInfo testInfo)
             throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         final Properties consumerProperties = new Properties();
         
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"group-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 27f09293872..03877c68595 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -132,9 +132,9 @@ public class KStreamAggregationIntegrationTest {
     @BeforeEach
     public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
-        createTopics(testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
+        createTopics(safeTestName);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
@@ -1041,8 +1041,7 @@ public class KStreamAggregationIntegrationTest {
     }
 
 
-    private void createTopics(final TestInfo testInfo) throws 
InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    private void createTopics(final String safeTestName) throws 
InterruptedException {
         streamOneInput = "stream-one-" + safeTestName;
         outputTopic = "output-" + safeTestName;
         userSessionsStream = "user-sessions-" + safeTestName;
@@ -1071,7 +1070,7 @@ public class KStreamAggregationIntegrationTest {
                                                                  final 
TestInfo testInfo)
             throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         final Properties consumerProperties = new Properties();
         
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"group-" + safeTestName);
@@ -1095,7 +1094,7 @@ public class KStreamAggregationIntegrationTest {
                                                                               
final Class innerClass,
                                                                               
final int numMessages,
                                                                               
final TestInfo testInfo) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         final Properties consumerProperties = new Properties();
         
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"group-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
index c6cb077e6c0..1d9a77b5bf4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKStreamIntegrationTest.java
@@ -96,7 +96,7 @@ public class KStreamKStreamIntegrationTest {
     @BeforeEach
     public void before(final TestInfo testInfo) throws IOException {
         final String stateDirBasePath = TestUtils.tempDirectory().getPath();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         streamsConfig = getStreamsConfig(safeTestName);
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 61f9cde5974..79e62c48ef1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -135,7 +135,7 @@ public class KStreamRepartitionIntegrationTest {
         streamsConfiguration = new Properties();
         kafkaStreamsInstances = new ArrayList<>();
 
-        safeTestName = safeUniqueTestName(getClass(), testName);
+        safeTestName = safeUniqueTestName(testName);
 
         topicB = "topic-b-" + safeTestName;
         inputTopic = "input-topic-" + safeTestName;
@@ -890,7 +890,7 @@ public class KStreamRepartitionIntegrationTest {
                                                  final List<KeyValue<K, V>> 
expectedRecords,
                                                  final String outputTopic) 
throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final Properties consumerProperties = new Properties();
         
consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"group-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
index 12a5056730d..a792717c1d3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java
@@ -152,7 +152,7 @@ public class 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest {
     @BeforeEach
     public void before(final TestInfo testInfo) throws IOException {
         final String stateDirBasePath = TestUtils.tempDirectory().getPath();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         streamsConfig = getStreamsConfig(safeTestName);
         streamsConfigTwo = getStreamsConfig(safeTestName);
         streamsConfigThree = getStreamsConfig(safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index d775c9e54c9..a6d5b1a70f2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -126,8 +126,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
         quietlyCleanStateAfterTest(CLUSTER, client2);
     }
 
-    public Properties getStreamsConfiguration(final TestInfo testInfo) {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    public Properties getStreamsConfiguration(final String safeTestName) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -158,8 +157,9 @@ public class KTableKTableForeignKeyJoinDistributedTest {
 
     @Test
     public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) 
throws Exception {
-        final Properties streamsConfiguration1 = 
getStreamsConfiguration(testInfo);
-        final Properties streamsConfiguration2 = 
getStreamsConfiguration(testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
+        final Properties streamsConfiguration1 = 
getStreamsConfiguration(safeTestName);
+        final Properties streamsConfiguration2 = 
getStreamsConfiguration(safeTestName);
 
         //Each streams client needs to have it's own StreamsBuilder in order 
to simulate
         //a truly distributed run
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
index 2a36556c99f..6655f697bb7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinMaterializationIntegrationTest.java
@@ -54,7 +54,6 @@ import static java.util.Collections.emptyMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
-import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
@@ -82,7 +81,6 @@ public class 
KTableKTableForeignKeyJoinMaterializationIntegrationTest {
 
     @Before
     public void before() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfig = mkProperties(mkMap(
             mkEntry(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath())
         ));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index cefc48d0161..d16aa2f0254 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -98,10 +98,11 @@ public class KTableSourceTopicRestartIntegrationTest {
 
     @BeforeEach
     public void before(final TestInfo testInfo) throws Exception {
-        sourceTopic = SOURCE_TOPIC + "-" + 
IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = 
IntegrationTestUtils.safeUniqueTestName(testInfo);
+        sourceTopic = SOURCE_TOPIC + "-" + safeTestName;
         CLUSTER.createTopic(sourceTopic);
 
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, 
IntegrationTestUtils.safeUniqueTestName(getClass(), testInfo));
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, safeTestName);
 
         final KTable<String, String> kTable = 
streamsBuilder.table(sourceTopic, Materialized.as("store"));
         kTable.toStream().foreach(readKeyValues::put);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index 00da6a89e18..0a7feba5354 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -102,12 +102,14 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
     public void before() throws Exception {
         mockTime = CLUSTER.time;
 
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
 
         commonClientConfig = new Properties();
         commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
 
         streamsConfig = new Properties();
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
"someGroupInstance");
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
         streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
         streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
@@ -153,19 +155,14 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
 
     @Test
     public void testCloseOptions() throws Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
-        streamsConfig.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, 
"someGroupInstance");
         // Test with two threads to show that each of the threads is being 
called to remove clients from the CG.
         streamsConfig.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
-
-        // RUN
         streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
         
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig, 
OUTPUT_TOPIC, 10);
 
         streams.close(new 
CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
-        waitForEmptyConsumerGroup(adminClient, appID, 0);
+        waitForEmptyConsumerGroup(adminClient, 
streamsConfig.getProperty(StreamsConfig.APPLICATION_ID_CONFIG), 0);
     }
 
     protected Topology setupTopologyWithoutIntermediateUserTopic() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index d2016901888..0d7be9b8e14 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -100,7 +100,7 @@ public class LagFetchIntegrationTest {
 
     @BeforeEach
     public void before(final TestInfo testInfo) {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         inputTopicName = "input-topic-" + safeTestName;
         outputTopicName = "output-topic-" + safeTestName;
         stateStoreName = "lagfetch-test-store" + safeTestName;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index a577a29c6ba..663fb22c5b2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -245,7 +245,7 @@ public class MetricsIntegrationTest {
         builder = new StreamsBuilder();
         CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, 
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
 
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         appId = "app-" + safeTestName;
 
         streamsConfiguration = new Properties();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
index bb67dddcf89..9f95d0a0823 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -73,7 +73,7 @@ public class MetricsReporterIntegrationTest {
     public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
 
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
         final String appId = "app-" + safeTestName;
 
         streamsConfiguration = new Properties();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index e6cd96b26cc..1dede6617c5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -217,7 +217,7 @@ public class NamedTopologyIntegrationTest {
 
     @BeforeEach
     public void setup(final TestInfo testInfo) throws Exception {
-        appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, 
testInfo);
+        appId = safeUniqueTestName(testInfo);
         changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
         changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
         changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index be30c447f8c..5d1a46f89e3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -116,8 +116,9 @@ public class OptimizedKTableIntegrationTest {
             .toStream()
             .peek((k, v) -> semaphore.release());
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(testInfo));
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(testInfo));
+        final String safeTestName = safeUniqueTestName(testInfo);
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(safeTestName));
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(safeTestName));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         try {
@@ -199,8 +200,7 @@ public class OptimizedKTableIntegrationTest {
         return streams;
     }
 
-    private Properties streamsConfiguration(final TestInfo testInfo) {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    private Properties streamsConfiguration(final String safeTestName) {
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
index 5c3f8aa90d0..3f4412256b7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PauseResumeIntegrationTest.java
@@ -125,7 +125,7 @@ public class PauseResumeIntegrationTest {
     @BeforeEach
     public void createTopics(final TestInfo testInfo) throws 
InterruptedException {
         cleanStateBeforeTest(CLUSTER, 1, INPUT_STREAM_1, INPUT_STREAM_2, 
OUTPUT_STREAM_1, OUTPUT_STREAM_2);
-        appId = safeUniqueTestName(PauseResumeIntegrationTest.class, testInfo);
+        appId = safeUniqueTestName(testInfo);
     }
 
     private Properties props(final boolean stateUpdaterEnabled) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 97816ebdc1b..df87793420b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -160,8 +160,7 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
 
-    private void createTopics(final TestInfo testInfo) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
+    private void createTopics(final String safeTestName) throws Exception {
         streamOne = streamOne + "-" + safeTestName;
         streamConcurrent = streamConcurrent + "-" + safeTestName;
         streamThree = streamThree + "-" + safeTestName;
@@ -214,9 +213,9 @@ public class QueryableStateIntegrationTest {
 
     @BeforeEach
     public void before(final TestInfo testInfo) throws Exception {
-        createTopics(testInfo);
+        final String safeTestName = safeUniqueTestName(testInfo);
+        createTopics(safeTestName);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -444,7 +443,7 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldRejectNonExistentStoreName(final TestInfo testInfo) 
throws InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
+        final String uniqueTestName = safeUniqueTestName(testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
 
@@ -458,7 +457,7 @@ public class QueryableStateIntegrationTest {
         );
 
         final Properties properties = mkProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
safeUniqueTestName(getClass(), testInfo)),
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, uniqueTestName),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers())
         ));
 
@@ -482,7 +481,7 @@ public class QueryableStateIntegrationTest {
 
     @Test
     public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws 
InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
+        final String uniqueTestName = safeUniqueTestName(testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e5f3080f8bb..cb5a3da1631 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -137,7 +137,7 @@ public class RegexSourceIntegrationTest {
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
-            
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, 
testInfo),
+            IntegrationTestUtils.safeUniqueTestName(testInfo),
             CLUSTER.bootstrapServers(),
             STRING_SERDE_CLASSNAME,
             STRING_SERDE_CLASSNAME,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 7a851c63bfb..fe83e9d2b5f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -101,7 +101,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhileStreamsIsRunning() {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-server", cluster.bootstrapServers(),
@@ -125,7 +125,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhenInputTopicAbsent() {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-server", cluster.bootstrapServers(),
@@ -141,7 +141,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhenIntermediateTopicAbsent() {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-server", cluster.bootstrapServers(),
@@ -157,7 +157,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhenSpecifiedInternalTopicDoesNotExist() {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-server", cluster.bootstrapServers(),
@@ -173,7 +173,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void shouldNotAllowToResetWhenSpecifiedInternalTopicIsNotInternal() 
{
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         final String[] parameters = new String[] {
             "--application-id", appID,
             "--bootstrap-server", cluster.bootstrapServers(),
@@ -189,7 +189,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
         streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
Integer.toString(STREAMS_CONSUMER_TIMEOUT * 100));
 
@@ -225,7 +225,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void 
testReprocessingFromFileAfterResetWithoutIntermediateUserTopic() throws 
Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
@@ -266,7 +266,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void 
testReprocessingFromDateTimeAfterResetWithoutIntermediateUserTopic() throws 
Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
@@ -311,7 +311,7 @@ public class ResetIntegrationTest extends 
AbstractResetIntegrationTest {
 
     @Test
     public void 
testReprocessingByDurationAfterResetWithoutIntermediateUserTopic() throws 
Exception {
-        final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+        final String appID = IntegrationTestUtils.safeUniqueTestName(testName);
         streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
 
         // RUN
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index 7fe905ae7d4..40c03e6d891 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -112,7 +112,7 @@ public class ResetPartitionTimeIntegrationTest {
 
     @Test
     public void shouldPreservePartitionTimeOnKafkaStreamRestart() {
-        final String appId = "app-" + safeUniqueTestName(getClass(), testName);
+        final String appId = "app-" + safeUniqueTestName(testName);
         final String input = "input";
         final String outputRaw = "output-raw";
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index f61092106e7..812da300747 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -139,7 +139,7 @@ public class RestoreIntegrationTest {
 
     @BeforeEach
     public void createTopics(final TestInfo testInfo) throws 
InterruptedException {
-        appId = safeUniqueTestName(RestoreIntegrationTest.class, testInfo);
+        appId = safeUniqueTestName(testInfo);
         inputStream = appId + "-input-stream";
         CLUSTER.createTopic(inputStream, 2, 1);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 02d34b51fb7..706b27c4063 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -193,7 +193,7 @@ public class RocksDBMetricsIntegrationTest {
 
     private Properties streamsConfig() {
         final Properties streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"test-application-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
index ff8cd3b8339..21bcb609541 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SelfJoinUpgradeIntegrationTest.java
@@ -79,17 +79,19 @@ public class SelfJoinUpgradeIntegrationTest {
     @Rule
     public TestName testName = new TestName();
 
+    private String safeTestName;
+
     @Before
     public void createTopics() throws Exception {
-        inputTopic = INPUT_TOPIC + safeUniqueTestName(getClass(), testName);
-        outputTopic = OUTPUT_TOPIC + safeUniqueTestName(getClass(), testName);
+        safeTestName = safeUniqueTestName(testName);
+        inputTopic = INPUT_TOPIC + safeTestName;
+        outputTopic = OUTPUT_TOPIC + safeTestName;
         CLUSTER.createTopic(inputTopic);
         CLUSTER.createTopic(outputTopic);
     }
 
     private Properties props() {
         final Properties streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -113,7 +115,6 @@ public class SelfJoinUpgradeIntegrationTest {
 
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
 
         final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
@@ -127,20 +128,30 @@ public class SelfJoinUpgradeIntegrationTest {
         );
         joinedOld.to(outputTopic, Produced.with(Serdes.String(), 
Serdes.String()));
 
-
+        final String safeTestName = safeUniqueTestName(testName);
         final Properties props = props();
         props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.NO_OPTIMIZATION);
         kafkaStreams = new KafkaStreams(streamsBuilderOld.build(), props);
         kafkaStreams.start();
 
         final long currentTime = CLUSTER.time.milliseconds();
-        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
-            new KeyValueTimestamp<String, String>("1", "AA", currentTime + 
42L)));
+        processKeyValueAndVerifyCount(
+            "1",
+            "A",
+            currentTime + 42L,
+            asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
+        );
 
-        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
-            new KeyValueTimestamp("1", "BA", currentTime + 43L),
-            new KeyValueTimestamp("1", "AB", currentTime + 43L),
-            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+        processKeyValueAndVerifyCount(
+            "1",
+            "B",
+            currentTime + 43L,
+            asList(
+                new KeyValueTimestamp<>("1", "BA", currentTime + 43L),
+                new KeyValueTimestamp<>("1", "AB", currentTime + 43L),
+                new KeyValueTimestamp<>("1", "BB", currentTime + 43L)
+            )
+        );
 
 
         kafkaStreams.close();
@@ -152,19 +163,23 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final long currentTimeNew = CLUSTER.time.milliseconds();
 
-        processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
-            new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
-
+        processKeyValueAndVerifyCount(
+            "1",
+            "C",
+            currentTimeNew + 44L,
+            asList(
+                new KeyValueTimestamp<>("1", "CA", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "CB", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "AC", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "BC", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "CC", currentTimeNew + 44L)
+            )
+        );
 
         kafkaStreams.close();
     }
 
     @Test
-    @SuppressWarnings("unchecked")
     public void shouldRestartWithTopologyOptimizationOn() throws Exception {
 
         final StreamsBuilder streamsBuilderOld = new StreamsBuilder();
@@ -185,14 +200,23 @@ public class SelfJoinUpgradeIntegrationTest {
         kafkaStreams.start();
 
         final long currentTime = CLUSTER.time.milliseconds();
-        processKeyValueAndVerifyCount("1", "A", currentTime + 42L, asList(
-            new KeyValueTimestamp("1", "AA", currentTime + 42L)));
-
-        processKeyValueAndVerifyCount("1", "B", currentTime + 43L, asList(
-            new KeyValueTimestamp("1", "BA", currentTime + 43L),
-            new KeyValueTimestamp("1", "AB", currentTime + 43L),
-            new KeyValueTimestamp("1", "BB", currentTime + 43L)));
+        processKeyValueAndVerifyCount(
+            "1",
+            "A",
+            currentTime + 42L,
+            asList(new KeyValueTimestamp<>("1", "AA", currentTime + 42L))
+        );
 
+        processKeyValueAndVerifyCount(
+            "1",
+            "B",
+            currentTime + 43L,
+            asList(
+                new KeyValueTimestamp<>("1", "BA", currentTime + 43L),
+                new KeyValueTimestamp<>("1", "AB", currentTime + 43L),
+                new KeyValueTimestamp<>("1", "BB", currentTime + 43L)
+            )
+        );
 
         kafkaStreams.close();
         kafkaStreams = null;
@@ -203,12 +227,18 @@ public class SelfJoinUpgradeIntegrationTest {
 
         final long currentTimeNew = CLUSTER.time.milliseconds();
 
-        processKeyValueAndVerifyCount("1", "C", currentTimeNew + 44L, asList(
-            new KeyValueTimestamp("1", "CA", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "CB", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "AC", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "BC", currentTimeNew + 44L),
-            new KeyValueTimestamp("1", "CC", currentTimeNew + 44L)));
+        processKeyValueAndVerifyCount(
+            "1",
+            "C",
+            currentTimeNew + 44L,
+            asList(
+                new KeyValueTimestamp<>("1", "CA", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "CB", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "AC", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "BC", currentTimeNew + 44L),
+                new KeyValueTimestamp<>("1", "CC", currentTimeNew + 44L)
+            )
+        );
 
         kafkaStreams.close();
     }
@@ -230,7 +260,6 @@ public class SelfJoinUpgradeIntegrationTest {
             timestamp);
 
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
@@ -251,6 +280,4 @@ public class SelfJoinUpgradeIntegrationTest {
 
         return actual.equals(expected);
     }
-
-
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
index 395ef6efd66..6dcbd80384a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SlidingWindowedKStreamIntegrationTest.java
@@ -121,6 +121,8 @@ public class SlidingWindowedKStreamIntegrationTest {
     private EmitStrategy emitStrategy;
     private boolean emitFinal;
 
+    private String safeTestName;
+
     @Parameterized.Parameters(name = "{0}_cache:{1}")
     public static Collection<Object[]> getEmitStrategy() {
         return asList(new Object[][] {
@@ -134,9 +136,9 @@ public class SlidingWindowedKStreamIntegrationTest {
     @Before
     public void before() throws InterruptedException {
         builder = new StreamsBuilder();
+        safeTestName = safeUniqueTestName(testName);
         createTopics();
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -445,7 +447,6 @@ public class SlidingWindowedKStreamIntegrationTest {
     }
 
     private void createTopics() throws InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamOneInput = "stream-one-" + safeTestName;
         streamTwoInput = "stream-two-" + safeTestName;
         outputTopic = "output-" + safeTestName;
@@ -464,7 +465,6 @@ public class SlidingWindowedKStreamIntegrationTest {
                                                                               
final long windowSize,
                                                                               
final Class innerClass,
                                                                               
final int numMessages) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index 95eab415c33..2ecb24b94cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
@@ -54,12 +55,19 @@ public class StandbyTaskCreationIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
+    private String safeTestName;
+
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
     }
 
+    @BeforeEach
+    public void setUp(final TestInfo testInfo) {
+        safeTestName = safeUniqueTestName(testInfo);
+    }
+
     @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
@@ -79,7 +87,6 @@ public class StandbyTaskCreationIntegrationTest {
     }
 
     private Properties streamsConfiguration(final TestInfo testInfo) {
-        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index eaa24e8005a..bac33419819 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -121,7 +121,7 @@ public class StandbyTaskEOSIntegrationTest {
 
     @Before
     public void createTopics() throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         appId = "app-" + safeTestName;
         inputTopic = "input-" + safeTestName;
         outputTopic = "output-" + safeTestName;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
index 1515debe845..69466197bca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
@@ -78,7 +78,7 @@ public class StateDirectoryIntegrationTest {
 
     @Test
     public void testCleanUpStateDirIfEmpty() throws InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        final String uniqueTestName = safeUniqueTestName(testName);
 
         // Create Topic
         final String input = uniqueTestName + "-input";
@@ -184,7 +184,7 @@ public class StateDirectoryIntegrationTest {
 
     @Test
     public void testNotCleanUpStateDirIfNotEmpty() throws InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        final String uniqueTestName = safeUniqueTestName(testName);
 
         // Create Topic
         final String input = uniqueTestName + "-input";
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
index 8b5b1ebd62a..e20f7168059 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
@@ -112,7 +112,7 @@ public class StoreQueryIntegrationTest {
 
     @BeforeEach
     public void before(final TestInfo testInfo) throws InterruptedException, IOException {
-        this.appId = safeUniqueTestName(getClass(), testInfo);
+        this.appId = safeUniqueTestName(testInfo);
     }
 
     @AfterEach
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
index 57d5bcad162..c6ed805ac4c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
@@ -85,13 +85,13 @@ public class StoreUpgradeIntegrationTest {
 
     @Before
     public void createTopics() throws Exception {
-        inputStream = "input-stream-" + safeUniqueTestName(getClass(), testName);
+        inputStream = "input-stream-" + safeUniqueTestName(testName);
         CLUSTER.createTopic(inputStream);
     }
 
     private Properties props() {
         final Properties streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
index 282ceeed0ec..073e80a7287 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinTopologyOptimizationIntegrationTest.java
@@ -110,7 +110,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
     public void before() throws InterruptedException {
         streamsConfiguration = new Properties();
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
 
         tableTopic = "table-topic" + safeTestName;
         inputTopic = "stream-topic-" + safeTestName;
@@ -241,7 +241,7 @@ public class StreamTableJoinTopologyOptimizationIntegrationTest {
                                                  final Deserializer<V> 
valueSerializer,
                                                  final List<KeyValue<K, V>> 
expectedRecords) throws Exception {
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 916afc201d3..63bb1a15b07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -97,7 +97,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
     @Rule
     public final TestName testName = new TestName();
 
-    private final String testId = safeUniqueTestName(getClass(), testName);
+    private final String testId = safeUniqueTestName(testName);
     private final String appId = "appId_" + testId;
     private final String inputTopic = "input" + testId;
     private final String inputTopic2 = "input2" + testId;
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 1dc6e6a6072..d0e90e6882a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -130,7 +130,7 @@ public class SuppressionDurabilityIntegrationTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldRecoverBufferAfterShutdown() {
-        final String testId = safeUniqueTestName(getClass(), testName);
+        final String testId = safeUniqueTestName(testName);
         final String appId = "appId_" + testId;
         final String input = "input" + testId;
         final String storeName = "counts";
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
index 7f00f9ddd48..de7ece8423d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
@@ -92,7 +92,7 @@ public class TaskAssignorIntegrationTest {
         // ensure these configurations wind up where they belong, and any number of future code changes
         // could break this change.
 
-        final String testId = safeUniqueTestName(getClass(), testName);
+        final String testId = safeUniqueTestName(testName);
         final String appId = "appId_" + testId;
         final String inputTopic = "input" + testId;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index cc2d9a0fe23..62d3758c86c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -88,7 +88,7 @@ public class TaskMetadataIntegrationTest {
 
     @Before
     public void setup() {
-        final String testId = safeUniqueTestName(getClass(), testName);
+        final String testId = safeUniqueTestName(testName);
         appId = appIdPrefix + testId;
         inputTopic = "input" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
index 91aa583060c..6f5a2b09be6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.java
@@ -129,6 +129,8 @@ public class TimeWindowedKStreamIntegrationTest {
 
     private boolean emitFinal;
 
+    private String safeTestName;
+
     @Parameterized.Parameters(name = "{0}_{1}")
     public static Collection<Object[]> getEmitStrategy() {
         return asList(new Object[][] {
@@ -142,9 +144,9 @@ public class TimeWindowedKStreamIntegrationTest {
     @Before
     public void before() throws InterruptedException {
         builder = new StreamsBuilder();
+        safeTestName = safeUniqueTestName(testName);
         createTopics();
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -477,7 +479,6 @@ public class TimeWindowedKStreamIntegrationTest {
     }
 
     private void createTopics() throws InterruptedException {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamOneInput = "stream-one-" + safeTestName;
         streamTwoInput = "stream-two-" + safeTestName;
         outputTopic = "output-" + safeTestName;
@@ -496,7 +497,6 @@ public class TimeWindowedKStreamIntegrationTest {
                                                                               
final long windowSize,
                                                                               
final Class innerClass,
                                                                               
final int numMessages) throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         final Properties consumerProperties = new Properties();
         consumerProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
index 94d2644905b..15b1ae1ea80 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.java
@@ -108,7 +108,7 @@ public class VersionedKeyValueStoreIntegrationTest {
 
     @Before
     public void beforeTest() throws InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+        final String uniqueTestName = safeUniqueTestName(testName);
         inputStream = "input-stream-" + uniqueTestName;
         globalTableTopic = "global-table-" + uniqueTestName;
         outputStream = "output-stream-" + uniqueTestName;
@@ -203,7 +203,7 @@ public class VersionedKeyValueStoreIntegrationTest {
             1);
 
         // verify changelog topic properties
-        final String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
+        final String changelogTopic = props.getProperty(StreamsConfig.APPLICATION_ID_CONFIG) + "-versioned-store-changelog";
         final Properties changelogTopicConfig = CLUSTER.getLogConfig(changelogTopic);
         assertThat(changelogTopicConfig.getProperty("cleanup.policy"), equalTo("compact"));
         assertThat(changelogTopicConfig.getProperty("min.compaction.lag.ms"), equalTo(Long.toString(HISTORY_RETENTION + 24 * 60 * 60 * 1000L)));
@@ -361,7 +361,7 @@ public class VersionedKeyValueStoreIntegrationTest {
 
     private void shouldManualUpgradeFromNonVersionedToVersioned(final Topology originalTopology) throws Exception {
         // build original (non-versioned) topology and start app
-        Properties props = props();
+        final Properties props = props();
         // additional property to prevent premature compaction of older record versions while using timestamped store
         props.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, 60_000L);
         kafkaStreams = new KafkaStreams(originalTopology, props);
@@ -407,7 +407,6 @@ public class VersionedKeyValueStoreIntegrationTest {
             .process(() -> new VersionedStoreContentCheckerProcessor(true, data), STORE_NAME)
             .to(outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));
 
-        props = props();
         kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
         kafkaStreams.start();
 
@@ -430,8 +429,8 @@ public class VersionedKeyValueStoreIntegrationTest {
     }
 
     private Properties props() {
+        final String safeTestName = safeUniqueTestName(testName);
         final Properties streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index cc6c96b30f8..4f1d8d3d426 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
@@ -231,8 +232,9 @@ public class IntegrationTestUtils {
      * The name is safe even for parameterized methods.
      * Used by tests not yet migrated from JUnit 4.
      */
-    public static String safeUniqueTestName(final Class<?> testClass, final TestName testName) {
-        return safeUniqueTestName(testClass, testName.getMethodName());
+    public static String safeUniqueTestName(final TestName testName) {
+        final String methodName = testName.getMethodName();
+        return safeUniqueTestName(methodName);
     }
 
     /**
@@ -240,21 +242,25 @@ public class IntegrationTestUtils {
      * JUnit 5 instead of a TestName from JUnit 4.
      * Used by tests migrated to JUnit 5.
      */
-    public static String safeUniqueTestName(final Class<?> testClass, final TestInfo testInfo) {
-        final String displayName = testInfo.getDisplayName();
+    public static String safeUniqueTestName(final TestInfo testInfo) {
         final String methodName = testInfo.getTestMethod().map(Method::getName).orElse("unknownMethodName");
-        final String testName = displayName.contains(methodName) ? methodName : methodName + displayName;
-        return safeUniqueTestName(testClass, testName);
-    }
-
-    private static String safeUniqueTestName(final Class<?> testClass, final String testName) {
-        return (testClass.getSimpleName() + testName)
-                .replace(':', '_')
-                .replace('.', '_')
-                .replace('[', '_')
-                .replace(']', '_')
-                .replace(' ', '_')
-                .replace('=', '_');
+        return safeUniqueTestName(methodName);
+    }
+
+    private static String safeUniqueTestName(final String testName) {
+        return sanitize(testName + Uuid.randomUuid().toString());
+    }
+
+    private static String sanitize(final String str) {
+        return str
+            // The `-` is used in Streams' thread name as a separator and some tests rely on this.
+            .replace('-', '_')
+            .replace(':', '_')
+            .replace('.', '_')
+            .replace('[', '_')
+            .replace(']', '_')
+            .replace(' ', '_')
+            .replace('=', '_');
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
index a111e829f53..f85596c0cc1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableForeignKeyJoinScenarioTest.java
@@ -240,7 +240,7 @@ public class KTableKTableForeignKeyJoinScenarioTest {
 
     private void validateTopologyCanProcessData(final StreamsBuilder builder) {
         final Properties config = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         config.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class.getName());
         config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
         config.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
index 017cccf69d8..d6a8a40874b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
@@ -85,7 +85,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
         builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(testName);
         final String appId = "app-" + safeTestName;
 
         final Properties streamsConfiguration = new Properties();


Reply via email to