This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 742f928 KAFKA-8611: Refactor KStreamRepartitionIntegrationTest (#8470)
742f928 is described below
commit 742f9281d9a747c35dfa2c3cfbd0a6b19f326508
Author: Levani Kokhreidze <[email protected]>
AuthorDate: Wed Apr 15 04:18:02 2020 +0300
KAFKA-8611: Refactor KStreamRepartitionIntegrationTest (#8470)
Reviewers: Matthias J. Sax <[email protected]>
---
.../KStreamRepartitionIntegrationTest.java | 24 +++++++++++-----------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
index 907d4c8..6140a25 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import java.io.IOException;
@@ -95,24 +96,22 @@ public class KStreamRepartitionIntegrationTest {
private Properties streamsConfiguration;
private List<KafkaStreams> kafkaStreamsInstances;
- @Parameters(name = "Optimization = {0}")
- public static Collection<Object[]> data() {
- final List<Object[]> values = new ArrayList<>();
-
- Arrays.asList(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION)
- .forEach(x -> values.add(new Object[]{x}));
+ @Parameter
+ public String topologyOptimization;
- return values;
+ @Parameters(name = "Optimization = {0}")
+ public static Collection<?> topologyOptimization() {
+ return Arrays.asList(new String[][]{
+ {StreamsConfig.OPTIMIZE},
+ {StreamsConfig.NO_OPTIMIZATION}
+ });
}
- public KStreamRepartitionIntegrationTest(final String
topologyOptimization) {
+ @Before
+ public void before() throws InterruptedException {
streamsConfiguration = new Properties();
kafkaStreamsInstances = new ArrayList<>();
- streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,
topologyOptimization);
- }
- @Before
- public void before() throws InterruptedException {
final int testNum = TEST_NUM.incrementAndGet();
topicB = "topic-b-" + testNum;
@@ -130,6 +129,7 @@ public class KStreamRepartitionIntegrationTest {
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.Integer().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,
topologyOptimization);
}
@After