This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78d4458b94e KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part
2 (#12301)
78d4458b94e is described below
commit 78d4458b94e585bc602a4ae307d3de54fcedf2af
Author: Christo Lolov <[email protected]>
AuthorDate: Wed Jan 11 10:26:48 2023 +0200
KAFKA-14003 Kafka Streams JUnit4 to JUnit5 migration part 2 (#12301)
This pull request addresses
https://issues.apache.org/jira/browse/KAFKA-14003. It is the second of a series
of pull requests which address the move of Kafka Streams tests from JUnit 4 to
JUnit 5.
Reviewer: Bruno Cadonna <[email protected]>
---
...bleForeignKeyInnerJoinMultiIntegrationTest.java | 26 +++++-----
.../KTableKTableForeignKeyJoinDistributedTest.java | 42 +++++++----------
.../integration/LagFetchIntegrationTest.java | 37 +++++++--------
.../integration/MetricsIntegrationTest.java | 40 +++++++---------
.../MetricsReporterIntegrationTest.java | 30 +++++-------
.../integration/NamedTopologyIntegrationTest.java | 32 ++++++-------
.../OptimizedKTableIntegrationTest.java | 40 +++++++---------
.../PurgeRepartitionTopicIntegrationTest.java | 24 +++++-----
.../integration/QueryableStateIntegrationTest.java | 55 ++++++++++------------
.../integration/RegexSourceIntegrationTest.java | 30 ++++++------
.../SmokeTestDriverIntegrationTest.java | 7 ++-
.../StandbyTaskCreationIntegrationTest.java | 39 +++++++--------
12 files changed, 177 insertions(+), 225 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
index 82f6ffae188..08f4fe5cfc1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java
@@ -42,14 +42,13 @@ import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Collections;
@@ -62,12 +61,11 @@ import java.util.function.Function;
import static java.time.Duration.ofSeconds;
import static java.util.Arrays.asList;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private final static int NUM_BROKERS = 1;
public final static EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -88,7 +86,7 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
private final static Properties PRODUCER_CONFIG_2 = new Properties();
private final static Properties PRODUCER_CONFIG_3 = new Properties();
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
CLUSTER.start();
//Use multiple partitions to ensure distribution of keys.
@@ -149,12 +147,12 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Before
+ @BeforeEach
public void before() throws IOException {
final String stateDirBasePath = TestUtils.tempDirectory().getPath();
streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, stateDirBasePath +
"-1");
@@ -162,7 +160,7 @@ public class
KTableKTableForeignKeyInnerJoinMultiIntegrationTest {
streamsConfigThree.put(StreamsConfig.STATE_DIR_CONFIG,
stateDirBasePath + "-3");
}
- @After
+ @AfterEach
public void after() throws IOException {
if (streams != null) {
streams.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
index c95b37ae855..ff5b67965f6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinDistributedTest.java
@@ -34,15 +34,14 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Arrays;
@@ -55,35 +54,30 @@ import java.util.function.Predicate;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.quietlyCleanStateAfterTest;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+@Timeout(600)
@Category({IntegrationTest.class})
public class KTableKTableForeignKeyJoinDistributedTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String LEFT_TABLE = "left_table";
private static final String RIGHT_TABLE = "right_table";
private static final String OUTPUT = "output-topic";
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
private static final Properties CONSUMER_CONFIG = new Properties();
- @Rule
- public TestName testName = new TestName();
-
-
private static final String INPUT_TOPIC = "input-topic";
private KafkaStreams client1;
@@ -92,7 +86,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
- @Before
+ @BeforeEach
public void setupTopics() throws InterruptedException {
CLUSTER.createTopic(LEFT_TABLE, 1, 1);
CLUSTER.createTopic(RIGHT_TABLE, 1, 1);
@@ -125,7 +119,7 @@ public class KTableKTableForeignKeyJoinDistributedTest {
CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
}
- @After
+ @AfterEach
public void after() {
client1.close();
client2.close();
@@ -133,8 +127,8 @@ public class KTableKTableForeignKeyJoinDistributedTest {
quietlyCleanStateAfterTest(CLUSTER, client2);
}
- public Properties getStreamsConfiguration() {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ public Properties getStreamsConfiguration(final TestInfo testInfo) {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -164,9 +158,9 @@ public class KTableKTableForeignKeyJoinDistributedTest {
}
@Test
- public void shouldBeInitializedWithDefaultSerde() throws Exception {
- final Properties streamsConfiguration1 = getStreamsConfiguration();
- final Properties streamsConfiguration2 = getStreamsConfiguration();
+ public void shouldBeInitializedWithDefaultSerde(final TestInfo testInfo)
throws Exception {
+ final Properties streamsConfiguration1 =
getStreamsConfiguration(testInfo);
+ final Properties streamsConfiguration2 =
getStreamsConfiguration(testInfo);
//Each streams client needs to have it's own StreamsBuilder in order
to simulate
//a truly distributed run
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
index 3b7251d6da1..363fed3b8e6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
@@ -39,15 +39,14 @@ import
org.apache.kafka.streams.processor.internals.StreamThread;
import
org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,20 +72,19 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+@Timeout(600)
@Category({IntegrationTest.class})
public class LagFetchIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -102,12 +100,9 @@ public class LagFetchIntegrationTest {
private String outputTopicName;
private String stateStoreName;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ @BeforeEach
+ public void before(final TestInfo testInfo) {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
inputTopicName = "input-topic-" + safeTestName;
outputTopicName = "output-topic-" + safeTestName;
stateStoreName = "lagfetch-test-store" + safeTestName;
@@ -128,7 +123,7 @@ public class LagFetchIntegrationTest {
consumerConfiguration.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
LongDeserializer.class.getName());
}
- @After
+ @AfterEach
public void shutdown() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
}
@@ -318,7 +313,7 @@ public class LagFetchIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
.map(Path::toFile)
- .forEach(f -> assertTrue("Some state " + f + " could not be
deleted", f.delete()));
+ .forEach(f -> assertTrue(f.delete(), "Some state " + f + "
could not be deleted"));
// wait till the lag goes down to 0
final KafkaStreams restartedStreams = new
KafkaStreams(builder.build(), props);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
index 5fb5188e5f2..8821a0cebec 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
@@ -43,16 +43,14 @@ import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -65,23 +63,24 @@ import java.util.stream.Collectors;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+@Timeout(600)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class MetricsIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final int NUM_THREADS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -242,15 +241,12 @@ public class MetricsIntegrationTest {
private String appId;
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws InterruptedException {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2,
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
@@ -263,7 +259,7 @@ public class MetricsIntegrationTest {
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
}
- @After
+ @AfterEach
public void after() throws InterruptedException {
CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT_1, STREAM_OUTPUT_2,
STREAM_OUTPUT_3, STREAM_OUTPUT_4);
}
@@ -752,9 +748,9 @@ public class MetricsIntegrationTest {
final List<Metric> metrics = listMetric.stream()
.filter(m -> m.metricName().name().equals(metricName))
.collect(Collectors.toList());
- Assert.assertEquals("Size of metrics of type:'" + metricName + "' must
be equal to " + numMetric + " but it's equal to " + metrics.size(), numMetric,
metrics.size());
+ assertEquals(numMetric, metrics.size(), "Size of metrics of type:'" +
metricName + "' must be equal to " + numMetric + " but it's equal to " +
metrics.size());
for (final Metric m : metrics) {
- Assert.assertNotNull("Metric:'" + m.metricName() + "' must be not
null", m.metricValue());
+ assertNotNull(m.metricValue(), "Metric:'" + m.metricName() + "'
must be not null");
}
}
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
index e07a8886900..d9441d8ca69 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/MetricsReporterIntegrationTest.java
@@ -28,14 +28,13 @@ import
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.HashMap;
@@ -47,10 +46,9 @@ import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.sa
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+@Timeout(600)
@Category({IntegrationTest.class})
public class MetricsReporterIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
@@ -62,24 +60,21 @@ public class MetricsReporterIntegrationTest {
private StreamsBuilder builder;
private Properties streamsConfiguration;
- @Rule
- public TestName testName = new TestName();
-
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Before
- public void before() throws InterruptedException {
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws InterruptedException {
builder = new StreamsBuilder();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final String appId = "app-" + safeTestName;
streamsConfiguration = new Properties();
@@ -116,7 +111,6 @@ public class MetricsReporterIntegrationTest {
@Override
public void close() {
}
-
}
@Test
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index d1659cbc9d4..9d4f270ed38 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -65,15 +65,14 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.util.Collection;
@@ -105,10 +104,9 @@ import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+@Timeout(600)
@Category(IntegrationTest.class)
public class NamedTopologyIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(45);
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@@ -149,7 +147,7 @@ public class NamedTopologyIntegrationTest {
private static Properties producerConfig;
private static Properties consumerConfig;
- @BeforeClass
+ @BeforeAll
public static void initializeClusterAndStandardTopics() throws Exception {
CLUSTER.start();
@@ -165,13 +163,11 @@ public class NamedTopologyIntegrationTest {
produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Rule
- public final TestName testName = new TestName();
private String appId;
private String changelog1;
private String changelog2;
@@ -219,9 +215,9 @@ public class NamedTopologyIntegrationTest {
return streamsConfiguration;
}
- @Before
- public void setup() throws Exception {
- appId = safeUniqueTestName(NamedTopologyIntegrationTest.class,
testName);
+ @BeforeEach
+ public void setup(final TestInfo testInfo) throws Exception {
+ appId = safeUniqueTestName(NamedTopologyIntegrationTest.class,
testInfo);
changelog1 = TOPIC_PREFIX + "-" + TOPOLOGY_1 + "-store-changelog";
changelog2 = TOPIC_PREFIX + "-" + TOPOLOGY_2 + "-store-changelog";
changelog3 = TOPIC_PREFIX + "-" + TOPOLOGY_3 + "-store-changelog";
@@ -246,7 +242,7 @@ public class NamedTopologyIntegrationTest {
topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2);
}
- @After
+ @AfterEach
public void shutdown() throws Exception {
if (streams != null) {
streams.close(Duration.ofSeconds(30));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
index 2e709af0b11..5ebe2d27575 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java
@@ -55,22 +55,20 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+@Timeout(600)
@Category(IntegrationTest.class)
public class OptimizedKTableIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger LOG =
LoggerFactory.getLogger(OptimizedKTableIntegrationTest.class);
private static final int NUM_BROKERS = 1;
private static int port = 0;
@@ -79,29 +77,25 @@ public class OptimizedKTableIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
-
- @Rule
- public final TestName testName = new TestName();
-
private final List<KafkaStreams> streamsToCleanup = new ArrayList<>();
private final MockTime mockTime = CLUSTER.time;
- @Before
+ @BeforeEach
public void before() throws InterruptedException {
CLUSTER.createTopic(INPUT_TOPIC_NAME, 2, 1);
}
- @After
+ @AfterEach
public void after() {
for (final KafkaStreams kafkaStreams : streamsToCleanup) {
kafkaStreams.close();
@@ -109,7 +103,7 @@ public class OptimizedKTableIntegrationTest {
}
@Test
- public void shouldApplyUpdatesToStandbyStore() throws Exception {
+ public void shouldApplyUpdatesToStandbyStore(final TestInfo testInfo)
throws Exception {
final int batch1NumMessages = 100;
final int batch2NumMessages = 100;
final int key = 1;
@@ -123,8 +117,8 @@ public class OptimizedKTableIntegrationTest {
.toStream()
.peek((k, v) -> semaphore.release());
- final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration());
- final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration());
+ final KafkaStreams kafkaStreams1 = createKafkaStreams(builder,
streamsConfiguration(testInfo));
+ final KafkaStreams kafkaStreams2 = createKafkaStreams(builder,
streamsConfiguration(testInfo));
final List<KafkaStreams> kafkaStreamsList =
Arrays.asList(kafkaStreams1, kafkaStreams2);
startApplicationAndWaitUntilRunning(kafkaStreamsList,
Duration.ofSeconds(60));
@@ -201,8 +195,8 @@ public class OptimizedKTableIntegrationTest {
return streams;
}
- private Properties streamsConfiguration() {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private Properties streamsConfiguration(final TestInfo testInfo) {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
index 26720a00215..3b4db318eea 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java
@@ -37,14 +37,13 @@ import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -55,10 +54,9 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
+@Timeout(600)
@Category({IntegrationTest.class})
public class PurgeRepartitionTopicIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
private static final String INPUT_TOPIC = "input-stream";
@@ -77,13 +75,13 @@ public class PurgeRepartitionTopicIntegrationTest {
}
});
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 1, 1);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -154,7 +152,7 @@ public class PurgeRepartitionTopicIntegrationTest {
}
}
- @Before
+ @BeforeEach
public void setup() {
// create admin client for verification
final Properties adminConfig = new Properties();
@@ -181,7 +179,7 @@ public class PurgeRepartitionTopicIntegrationTest {
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration,
time);
}
- @After
+ @AfterEach
public void shutdown() {
if (kafkaStreams != null) {
kafkaStreams.close(Duration.ofSeconds(30));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 82409af50f8..d0f04a1265e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -61,15 +61,14 @@ import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -116,15 +115,14 @@ import static
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+@Timeout(600)
@Category({IntegrationTest.class})
@SuppressWarnings("deprecation")
public class QueryableStateIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final Logger log =
LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
private static final long DEFAULT_TIMEOUT_MS = 120 * 1000;
@@ -133,12 +131,12 @@ public class QueryableStateIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -164,8 +162,8 @@ public class QueryableStateIntegrationTest {
private Comparator<KeyValue<String, String>> stringComparator;
private Comparator<KeyValue<String, Long>> stringLongComparator;
- private void createTopics() throws Exception {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private void createTopics(final TestInfo testInfo) throws Exception {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamOne = streamOne + "-" + safeTestName;
streamConcurrent = streamConcurrent + "-" + safeTestName;
streamThree = streamThree + "-" + safeTestName;
@@ -216,14 +214,11 @@ public class QueryableStateIntegrationTest {
return input;
}
- @Rule
- public TestName testName = new TestName();
-
- @Before
- public void before() throws Exception {
- createTopics();
+ @BeforeEach
+ public void before(final TestInfo testInfo) throws Exception {
+ createTopics(testInfo);
streamsConfiguration = new Properties();
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -244,7 +239,7 @@ public class QueryableStateIntegrationTest {
}
}
- @After
+ @AfterEach
public void shutdown() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close(ofSeconds(30));
@@ -450,8 +445,8 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldRejectNonExistentStoreName() throws InterruptedException
{
- final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+ public void shouldRejectNonExistentStoreName(final TestInfo testInfo)
throws InterruptedException {
+ final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
@@ -465,7 +460,7 @@ public class QueryableStateIntegrationTest {
);
final Properties properties = mkProperties(mkMap(
- mkEntry(StreamsConfig.APPLICATION_ID_CONFIG,
safeUniqueTestName(getClass(), testName)),
+ mkEntry(StreamsConfig.APPLICATION_ID_CONFIG,
safeUniqueTestName(getClass(), testInfo)),
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers())
));
@@ -488,8 +483,8 @@ public class QueryableStateIntegrationTest {
}
@Test
- public void shouldRejectWronglyTypedStore() throws InterruptedException {
- final String uniqueTestName = safeUniqueTestName(getClass(), testName);
+ public void shouldRejectWronglyTypedStore(final TestInfo testInfo) throws
InterruptedException {
+ final String uniqueTestName = safeUniqueTestName(getClass(), testInfo);
final String input = uniqueTestName + "-input";
final String storeName = uniqueTestName + "-input-table";
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 1a22c90fcf1..4dab0f3baec 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -47,15 +47,14 @@ import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
@@ -81,14 +80,13 @@ import static org.hamcrest.Matchers.greaterThan;
* End-to-end integration test based on using regex and named topics for
creating sources, using
* an embedded Kafka cluster.
*/
+@Timeout(600)
@Category({IntegrationTest.class})
public class RegexSourceIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
CLUSTER.start();
CLUSTER.createTopics(
@@ -104,7 +102,7 @@ public class RegexSourceIntegrationTest {
CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, 1);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
@@ -129,8 +127,8 @@ public class RegexSourceIntegrationTest {
private static volatile AtomicInteger topicSuffixGenerator = new
AtomicInteger(0);
private String outputTopic;
- @Before
- public void setUp() throws InterruptedException {
+ @BeforeEach
+ public void setUp(final TestInfo testInfo) throws InterruptedException {
outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());
final Properties properties = new Properties();
properties.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
@@ -141,7 +139,7 @@ public class RegexSourceIntegrationTest {
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
streamsConfiguration = StreamsTestUtils.getStreamsConfig(
-
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class, new
TestName()),
+
IntegrationTestUtils.safeUniqueTestName(RegexSourceIntegrationTest.class,
testInfo),
CLUSTER.bootstrapServers(),
STRING_SERDE_CLASSNAME,
STRING_SERDE_CLASSNAME,
@@ -149,7 +147,7 @@ public class RegexSourceIntegrationTest {
);
}
- @After
+ @AfterEach
public void tearDown() throws IOException {
if (streams != null) {
streams.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
index 9f745072ef2..ee7cbe462b3 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java
@@ -25,11 +25,10 @@ import
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
-import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -42,11 +41,11 @@ import java.util.Set;
import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
import static org.apache.kafka.streams.tests.SmokeTestDriver.verify;
+import static org.junit.jupiter.api.Assertions.assertTrue;
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
-
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
@BeforeAll
@@ -175,6 +174,6 @@ public class SmokeTestDriverIntegrationTest {
driver.exception().printStackTrace();
throw new AssertionError(driver.exception());
}
- Assert.assertTrue(driver.result().result(), driver.result().passed());
+ assertTrue(driver.result().passed(), driver.result().result());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
index d59a7160040..d84d267b448 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java
@@ -34,14 +34,13 @@ import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Properties;
@@ -49,28 +48,24 @@ import java.util.function.Predicate;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+@Timeout(600)
@Category({IntegrationTest.class})
public class StandbyTaskCreationIntegrationTest {
- @Rule
- public Timeout globalTimeout = Timeout.seconds(600);
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
- @BeforeClass
+ @BeforeAll
public static void startCluster() throws IOException, InterruptedException
{
CLUSTER.start();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
}
- @AfterClass
+ @AfterAll
public static void closeCluster() {
CLUSTER.stop();
}
- @Rule
- public TestName testName = new TestName();
-
private static final String INPUT_TOPIC = "input-topic";
private KafkaStreams client1;
@@ -78,14 +73,14 @@ public class StandbyTaskCreationIntegrationTest {
private volatile boolean client1IsOk = false;
private volatile boolean client2IsOk = false;
- @After
+ @AfterEach
public void after() {
client1.close();
client2.close();
}
- private Properties streamsConfiguration() {
- final String safeTestName = safeUniqueTestName(getClass(), testName);
+ private Properties streamsConfiguration(final TestInfo testInfo) {
+ final String safeTestName = safeUniqueTestName(getClass(), testInfo);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
@@ -98,7 +93,7 @@ public class StandbyTaskCreationIntegrationTest {
@Test
@SuppressWarnings("deprecation")
- public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws
Exception {
+ public void
shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final TestInfo
testInfo) throws Exception {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "myTransformState";
final StoreBuilder<KeyValueStore<Integer, Integer>>
keyValueStoreBuilder =
@@ -121,7 +116,7 @@ public class StandbyTaskCreationIntegrationTest {
}, stateStoreName);
final Topology topology = builder.build();
- createClients(topology, streamsConfiguration(), topology,
streamsConfiguration());
+ createClients(topology, streamsConfiguration(testInfo), topology,
streamsConfiguration(testInfo));
setStateListenersForVerification(thread ->
thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
@@ -133,10 +128,10 @@ public class StandbyTaskCreationIntegrationTest {
}
@Test
- public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws
Exception {
- final Properties streamsConfiguration1 = streamsConfiguration();
+ public void
shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final TestInfo
testInfo) throws Exception {
+ final Properties streamsConfiguration1 =
streamsConfiguration(testInfo);
streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
- final Properties streamsConfiguration2 = streamsConfiguration();
+ final Properties streamsConfiguration2 =
streamsConfiguration(testInfo);
streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
StreamsConfig.OPTIMIZE);
final StreamsBuilder builder = new StreamsBuilder();