This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78d4458b94e KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 
2 (#12301)
78d4458b94e is described below

commit 78d4458b94e585bc602a4ae307d3de54fcedf2af
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Jan 11 10:26:48 2023 +0200

    KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (#12301)
    
    This pull request addresses 
https://issues.apache.org/jira/browse/KAFKA-14003. It is the second of a series 
of pull requests which address the move of Kafka Streams tests from JUnit 4 to 
JUnit 5.
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 ...bleForeignKeyInnerJoinMultiIntegrationTest.java | 26 +++++-----
 .../KTableKTableForeignKeyJoinDistributedTest.java | 42 +++++++----------
 .../integration/LagFetchIntegrationTest.java       | 37 +++++++--------
 .../integration/MetricsIntegrationTest.java        | 40 +++++++---------
 .../MetricsReporterIntegrationTest.java            | 30 +++++-------
 .../integration/NamedTopologyIntegrationTest.java  | 32 ++++++-------
 .../OptimizedKTableIntegrationTest.java            | 40 +++++++---------
 .../PurgeRepartitionTopicIntegrationTest.java      | 24 +++++-----
 .../integration/QueryableStateIntegrationTest.java | 55 ++++++++++------------
 .../integration/RegexSourceIntegrationTest.java    | 30 ++++++------
 .../SmokeTestDriverIntegrationTest.java            |  7 ++-
 .../StandbyTaskCreationIntegrationTest.java        | 39 +++++++--------
 12 files changed, 177 insertions(+), 225 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 82f6ffae188..08f4fe5cfc1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -42,14 +42,13 @@ import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -62,12 +61,11 @@ import java.util.function.Function;
 import static java.time.Duration.ofSeconds;
 import static java.util.Arrays.asList;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private final static int NUM_BROKERS = 1;
 
     public final static EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -88,7 +86,7 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
     private final static Properties PRODUCER_CONFIG_2 = new Properties();
     private final static Properties PRODUCER_CONFIG_3 = new Properties();
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
         //Use multiple partitions to ensure distribution of keys.
@@ -149,12 +147,12 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Before
+    @BeforeEach
     public void before() throws IOException {
         final String stateDirBasePath = TestUtils.tempDirectory().getPath();
         streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath + 
"-1");
@@ -162,7 +160,7 @@ public class 
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
         streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG, 
stateDirBasePath + "-3");
     }
 
-    @After
+    @AfterEach
     public void after() throws IOException {
         if (streams != null) {
             streams.close();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index c95b37ae855..ff5b67965f6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -34,15 +34,14 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -55,35 +54,30 @@ import java.util.function.Predicate;
 
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class KTableKTableForeignKeyJoinDistributedTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final String LEFT_TABLE = "left_table";
     private static final String RIGHT_TABLE = "right_table";
     private static final String OUTPUT = "output-topic";
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
     private static final Properties CONSUMER_CONFIG = new Properties();
 
-    @Rule
-    public TestName testName = new TestName();
-
-
     private static final String INPUT_TOPIC = "input-topic";
 
     private KafkaStreams client1;
@@ -92,7 +86,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
     private volatile boolean client1IsOk = false;
     private volatile boolean client2IsOk = false;
 
-    @Before
+    @BeforeEach
     public void setupTopics() throws InterruptedException {
         CLUSTER.createTopic(LEFT_TABLE, 1, 1);
         CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
@@ -125,7 +119,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
         CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
     }
 
-    @After
+    @AfterEach
     public void after() {
         client1.close();
         client2.close();
@@ -133,8 +127,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
         quietlyCleanStateAfterTest(CLUSTER, client2);
     }
 
-    public Properties getStreamsConfiguration() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    public Properties getStreamsConfiguration(final TestInfo testInfo) {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -164,9 +158,9 @@ public class KTableKTableForeignKeyJoinDistributedTest {
     }
 
     @Test
-    public void shouldBeInitializedWithDefaultSerde() throws Exception {
-        final Properties streamsConfiguration1 = getStreamsConfiguration();
-        final Properties streamsConfiguration2 = getStreamsConfiguration();
+    public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo) 
throws Exception {
+        final Properties streamsConfiguration1 = 
getStreamsConfiguration(testInfo);
+        final Properties streamsConfiguration2 = 
getStreamsConfiguration(testInfo);
 
         //Each streams client needs to have it's own StreamsBuilder in order 
to simulate
         //a truly distributed run
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 3b7251d6da1..363fed3b8e6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -39,15 +39,14 @@ import 
org.apache.kafka.streams.processor.internals.StreamThread;
 import 
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,20 +72,19 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class LagFetchIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -102,12 +100,9 @@ public class LagFetchIntegrationTest {
     private String outputTopicName;
     private String stateStoreName;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    @BeforeEach
+    public void before(final TestInfo testInfo) {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         inputTopicName = "input-topic-" + safeTestName;
         outputTopicName = "output-topic-" + safeTestName;
         stateStoreName = "lagfetch-test-store" + safeTestName;
@@ -128,7 +123,7 @@ public class LagFetchIntegrationTest {
         
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
 LongDeserializer.class.getName());
     }
 
-    @After
+    @AfterEach
     public void shutdown() throws Exception {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
@@ -318,7 +313,7 @@ public class LagFetchIntegrationTest {
             IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
-                .forEach(f -> assertTrue("Some state " + f + " could not be 
deleted", f.delete()));
+                .forEach(f -> assertTrue(f.delete(), "Some state " + f + " 
could not be deleted"));
 
             // wait till the lag goes down to 0
             final KafkaStreams restartedStreams = new 
KafkaStreams(builder.build(), props);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 5fb5188e5f2..8821a0cebec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -43,16 +43,14 @@ import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -65,23 +63,24 @@ import java.util.stream.Collectors;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class MetricsIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     private static final int NUM_THREADS = 2;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -242,15 +241,12 @@ public class MetricsIntegrationTest {
 
     private String appId;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws InterruptedException {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
         CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, 
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         appId = "app-" + safeTestName;
 
         streamsConfiguration = new Properties();
@@ -263,7 +259,7 @@ public class MetricsIntegrationTest {
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
     }
 
-    @After
+    @AfterEach
     public void after() throws InterruptedException {
         CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2, 
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
     }
@@ -752,9 +748,9 @@ public class MetricsIntegrationTest {
         final List<Metric> metrics = listMetric.stream()
             .filter(m -> m.metricName().name().equals(metricName))
             .collect(Collectors.toList());
-        Assert.assertEquals("Size of metrics of type:'" + metricName + "' must 
be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric, 
metrics.size());
+        assertEquals(numMetric, metrics.size(), "Size of metrics of type:'" + 
metricName + "' must be equal to " + numMetric + " but it's equal to " + 
metrics.size());
         for (final Metric m : metrics) {
-            Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not 
null", m.metricValue());
+            assertNotNull(m.metricValue(), "Metric:'" + m.metricName() + "' 
must be not null");
         }
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
index e07a8886900..d9441d8ca69 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -28,14 +28,13 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.test.IntegrationTest;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -47,10 +46,9 @@ import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class MetricsReporterIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -62,24 +60,21 @@ public class MetricsReporterIntegrationTest {
     private StreamsBuilder builder;
     private Properties streamsConfiguration;
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Before
-    public void before() throws InterruptedException {
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws InterruptedException {
         builder = new StreamsBuilder();
 
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final String appId = "app-" + safeTestName;
 
         streamsConfiguration = new Properties();
@@ -116,7 +111,6 @@ public class MetricsReporterIntegrationTest {
         @Override
         public void close() {
         }
-
     }
 
     @Test
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index d1659cbc9d4..9d4f270ed38 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -65,15 +65,14 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.time.Duration;
 import java.util.Collection;
@@ -105,10 +104,9 @@ import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class NamedTopologyIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
     
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
@@ -149,7 +147,7 @@ public class NamedTopologyIntegrationTest {
     private static Properties producerConfig;
     private static Properties consumerConfig;
 
-    @BeforeClass
+    @BeforeAll
     public static void initializeClusterAndStandardTopics() throws Exception {
         CLUSTER.start();
 
@@ -165,13 +163,11 @@ public class NamedTopologyIntegrationTest {
         produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Rule
-    public final TestName testName = new TestName();
     private String appId;
     private String changelog1;
     private String changelog2;
@@ -219,9 +215,9 @@ public class NamedTopologyIntegrationTest {
         return streamsConfiguration;
     }
 
-    @Before
-    public void setup() throws Exception {
-        appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, 
testName);
+    @BeforeEach
+    public void setup(final TestInfo testInfo) throws Exception {
+        appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, 
testInfo);
         changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
         changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
         changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";
@@ -246,7 +242,7 @@ public class NamedTopologyIntegrationTest {
         topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2);
     }
 
-    @After
+    @AfterEach
     public void shutdown() throws Exception {
         if (streams != null) {
             streams.close(Duration.ofSeconds(30));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 2e709af0b11..5ebe2d27575 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -55,22 +55,20 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Timeout(600)
 @Category(IntegrationTest.class)
 public class OptimizedKTableIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
     private static final int NUM_BROKERS = 1;
     private static int port = 0;
@@ -79,29 +77,25 @@ public class OptimizedKTableIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-
-    @Rule
-    public final TestName testName = new TestName();
-
     private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
     private final MockTime mockTime = CLUSTER.time;
 
-    @Before
+    @BeforeEach
     public void before() throws InterruptedException {
         CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
     }
 
-    @After
+    @AfterEach
     public void after() {
         for (final KafkaStreams kafkaStreams : streamsToCleanup) {
             kafkaStreams.close();
@@ -109,7 +103,7 @@ public class OptimizedKTableIntegrationTest {
     }
 
     @Test
-    public void shouldApplyUpdatesToStandbyStore() throws Exception {
+    public void shouldApplyUpdatesToStandbyStore(final TestInfo testInfo) 
throws Exception {
         final int batch1NumMessages = 100;
         final int batch2NumMessages = 100;
         final int key = 1;
@@ -123,8 +117,8 @@ public class OptimizedKTableIntegrationTest {
             .toStream()
             .peek((k, v) -> semaphore.release());
 
-        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration());
-        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration());
+        final KafkaStreams kafkaStreams1 = createKafkaStreams(builder, 
streamsConfiguration(testInfo));
+        final KafkaStreams kafkaStreams2 = createKafkaStreams(builder, 
streamsConfiguration(testInfo));
         final List<KafkaStreams> kafkaStreamsList = 
Arrays.asList(kafkaStreams1, kafkaStreams2);
 
         startApplicationAndWaitUntilRunning(kafkaStreamsList, 
Duration.ofSeconds(60));
@@ -201,8 +195,8 @@ public class OptimizedKTableIntegrationTest {
         return streams;
     }
 
-    private Properties streamsConfiguration() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private Properties streamsConfiguration(final TestInfo testInfo) {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties config = new Properties();
         config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 26720a00215..3b4db318eea 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -37,14 +37,13 @@ import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -55,10 +54,9 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class PurgeRepartitionTopicIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     private static final String INPUT_TOPIC = "input-stream";
@@ -77,13 +75,13 @@ public class PurgeRepartitionTopicIntegrationTest {
         }
     });
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -154,7 +152,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         }
     }
 
-    @Before
+    @BeforeEach
     public void setup() {
         // create admin client for verification
         final Properties adminConfig = new Properties();
@@ -181,7 +179,7 @@ public class PurgeRepartitionTopicIntegrationTest {
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration, 
time);
     }
 
-    @After
+    @AfterEach
     public void shutdown() {
         if (kafkaStreams != null) {
             kafkaStreams.close(Duration.ofSeconds(30));
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 82409af50f8..d0f04a1265e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -61,15 +61,14 @@ import org.apache.kafka.test.MockMapper;
 import org.apache.kafka.test.NoRetryException;
 import org.apache.kafka.test.TestUtils;
 import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -116,15 +115,14 @@ import static 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 @SuppressWarnings("deprecation")
 public class QueryableStateIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final Logger log = 
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
 
     private static final long DEFAULT_TIMEOUT_MS = 120 * 1000;
@@ -133,12 +131,12 @@ public class QueryableStateIntegrationTest {
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException {
         CLUSTER.start();
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -164,8 +162,8 @@ public class QueryableStateIntegrationTest {
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
 
-    private void createTopics() throws Exception {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private void createTopics(final TestInfo testInfo) throws Exception {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         streamOne = streamOne + "-" + safeTestName;
         streamConcurrent = streamConcurrent + "-" + safeTestName;
         streamThree = streamThree + "-" + safeTestName;
@@ -216,14 +214,11 @@ public class QueryableStateIntegrationTest {
         return input;
     }
 
-    @Rule
-    public TestName testName = new TestName();
-
-    @Before
-    public void before() throws Exception {
-        createTopics();
+    @BeforeEach
+    public void before(final TestInfo testInfo) throws Exception {
+        createTopics(testInfo);
         streamsConfiguration = new Properties();
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
 
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -244,7 +239,7 @@ public class QueryableStateIntegrationTest {
         }
     }
 
-    @After
+    @AfterEach
     public void shutdown() throws Exception {
         if (kafkaStreams != null) {
             kafkaStreams.close(ofSeconds(30));
@@ -450,8 +445,8 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void shouldRejectNonExistentStoreName() throws InterruptedException 
{
-        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+    public void shouldRejectNonExistentStoreName(final TestInfo testInfo) 
throws InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
 
@@ -465,7 +460,7 @@ public class QueryableStateIntegrationTest {
         );
 
         final Properties properties = mkProperties(mkMap(
-            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
safeUniqueTestName(getClass(), testName)),
+            mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
safeUniqueTestName(getClass(), testInfo)),
             mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers())
         ));
 
@@ -488,8 +483,8 @@ public class QueryableStateIntegrationTest {
     }
 
     @Test
-    public void shouldRejectWronglyTypedStore() throws InterruptedException {
-        final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+    public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws 
InterruptedException {
+        final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
         final String input = uniqueTestName + "-input";
         final String storeName = uniqueTestName + "-input-table";
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1a22c90fcf1..4dab0f3baec 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -47,15 +47,14 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
 import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -81,14 +80,13 @@ import static org.hamcrest.Matchers.greaterThan;
  * End-to-end integration test based on using regex and named topics for 
creating sources, using
  * an embedded Kafka cluster.
  */
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class RegexSourceIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
         CLUSTER.createTopics(
@@ -104,7 +102,7 @@ public class RegexSourceIntegrationTest {
         CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
@@ -129,8 +127,8 @@ public class RegexSourceIntegrationTest {
     private static volatile AtomicInteger topicSuffixGenerator = new 
AtomicInteger(0);
     private String outputTopic;
 
-    @Before
-    public void setUp() throws InterruptedException {
+    @BeforeEach
+    public void setUp(final TestInfo testInfo) throws InterruptedException {
         outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
         final Properties properties = new Properties();
         properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -141,7 +139,7 @@ public class RegexSourceIntegrationTest {
         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
 
         streamsConfiguration = StreamsTestUtils.getStreamsConfig(
-            
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new 
TestName()),
+            
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, 
testInfo),
             CLUSTER.bootstrapServers(),
             STRING_SERDE_CLASSNAME,
             STRING_SERDE_CLASSNAME,
@@ -149,7 +147,7 @@ public class RegexSourceIntegrationTest {
         );
     }
 
-    @After
+    @AfterEach
     public void tearDown() throws IOException {
         if (streams != null) {
             streams.close();
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 9f745072ef2..ee7cbe462b3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -25,11 +25,10 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.tests.SmokeTestClient;
 import org.apache.kafka.streams.tests.SmokeTestDriver;
-import org.junit.Assert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -42,11 +41,11 @@ import java.util.Set;
 
 import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
 import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(3);
 
     @BeforeAll
@@ -175,6 +174,6 @@ public class SmokeTestDriverIntegrationTest {
             driver.exception().printStackTrace();
             throw new AssertionError(driver.exception());
         }
-        Assert.assertTrue(driver.result().result(), driver.result().passed());
+        assertTrue(driver.result().passed(), driver.result().result());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index d59a7160040..d84d267b448 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -34,14 +34,13 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -49,28 +48,24 @@ import java.util.function.Predicate;
 
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 
+@Timeout(600)
 @Category({IntegrationTest.class})
 public class StandbyTaskCreationIntegrationTest {
-    @Rule
-    public Timeout globalTimeout = Timeout.seconds(600);
     private static final int NUM_BROKERS = 1;
 
     public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
 
-    @BeforeClass
+    @BeforeAll
     public static void startCluster() throws IOException, InterruptedException 
{
         CLUSTER.start();
         CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
     }
 
-    @AfterClass
+    @AfterAll
     public static void closeCluster() {
         CLUSTER.stop();
     }
 
-    @Rule
-    public TestName testName = new TestName();
-
     private static final String INPUT_TOPIC = "input-topic";
 
     private KafkaStreams client1;
@@ -78,14 +73,14 @@ public class StandbyTaskCreationIntegrationTest {
     private volatile boolean client1IsOk = false;
     private volatile boolean client2IsOk = false;
 
-    @After
+    @AfterEach
     public void after() {
         client1.close();
         client2.close();
     }
 
-    private Properties streamsConfiguration() {
-        final String safeTestName = safeUniqueTestName(getClass(), testName);
+    private Properties streamsConfiguration(final TestInfo testInfo) {
+        final String safeTestName = safeUniqueTestName(getClass(), testInfo);
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
@@ -98,7 +93,7 @@ public class StandbyTaskCreationIntegrationTest {
 
     @Test
     @SuppressWarnings("deprecation")
-    public void 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws 
Exception {
+    public void 
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo 
testInfo) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final String stateStoreName = "myTransformState";
         final StoreBuilder<KeyValueStore<Integer, Integer>> 
keyValueStoreBuilder =
@@ -121,7 +116,7 @@ public class StandbyTaskCreationIntegrationTest {
             }, stateStoreName);
 
         final Topology topology = builder.build();
-        createClients(topology, streamsConfiguration(), topology, 
streamsConfiguration());
+        createClients(topology, streamsConfiguration(testInfo), topology, 
streamsConfiguration(testInfo));
 
         setStateListenersForVerification(thread -> 
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
 
@@ -133,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest {
     }
 
     @Test
-    public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws 
Exception {
-        final Properties streamsConfiguration1 = streamsConfiguration();
+    public void 
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo 
testInfo) throws Exception {
+        final Properties streamsConfiguration1 = 
streamsConfiguration(testInfo);
         streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
-        final Properties streamsConfiguration2 = streamsConfiguration();
+        final Properties streamsConfiguration2 = 
streamsConfiguration(testInfo);
         streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, 
StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();


Reply via email to