This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1854d4b8a11 KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams
integration tests from EmbeddedZookeeper to KRaft (#17016)
1854d4b8a11 is described below
commit 1854d4b8a11461b53b59fa109b95f2a4f5003997
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Fri Sep 27 20:49:12 2024 +0100
KAFKA-14572: Migrate EmbeddedKafkaCluster used by Streams integration tests
from EmbeddedZookeeper to KRaft (#17016)
Migrate the EmbeddedKafkaCluster from the EmbeddedZookeeper to KRaft
Reviewers: Bill Bejeck <[email protected]>
---
build.gradle | 1 +
checkstyle/import-control.xml | 2 +
.../integration/AbstractResetIntegrationTest.java | 13 +-
.../integration/AdjustStreamThreadCountTest.java | 2 +-
.../streams/integration/EosIntegrationTest.java | 3 +-
...ighAvailabilityTaskAssignorIntegrationTest.java | 21 +-
.../integration/IQv2StoreIntegrationTest.java | 2 +-
.../JoinGracePeriodDurabilityIntegrationTest.java | 11 +-
.../integration/JoinStoreIntegrationTest.java | 2 +-
.../JoinWithIncompleteMetadataIntegrationTest.java | 2 +-
.../KTableSourceTopicRestartIntegrationTest.java | 16 +-
.../KafkaStreamsCloseOptionsIntegrationTest.java | 2 +-
.../integration/NamedTopologyIntegrationTest.java | 28 +-
.../PositionRestartIntegrationTest.java | 4 +-
.../integration/QueryableStateIntegrationTest.java | 2 +-
.../integration/RangeQueryIntegrationTest.java | 2 +-
.../integration/RegexSourceIntegrationTest.java | 6 +-
.../streams/integration/ResetIntegrationTest.java | 28 +-
.../integration/ResetIntegrationWithSslTest.java | 6 +-
.../ResetPartitionTimeIntegrationTest.java | 15 +-
.../integration/RestoreIntegrationTest.java | 2 +-
.../integration/RocksDBMetricsIntegrationTest.java | 2 +-
.../integration/StandbyTaskEOSIntegrationTest.java | 2 +-
...tandbyTaskEOSMultiRebalanceIntegrationTest.java | 6 +-
.../integration/StateDirectoryIntegrationTest.java | 4 +-
...amsUncaughtExceptionHandlerIntegrationTest.java | 18 +-
.../SuppressionDurabilityIntegrationTest.java | 37 +-
.../integration/SuppressionIntegrationTest.java | 11 +-
.../integration/TaskMetadataIntegrationTest.java | 2 +-
.../integration/utils/EmbeddedKafkaCluster.java | 473 ++++++++++++---------
.../integration/utils/IntegrationTestUtils.java | 36 +-
.../streams/integration/utils/KafkaEmbedded.java | 223 ----------
...HandlingSourceTopicDeletionIntegrationTest.java | 2 +-
33 files changed, 395 insertions(+), 591 deletions(-)
diff --git a/build.gradle b/build.gradle
index 0907f0506a4..0cd97d23186 100644
--- a/build.gradle
+++ b/build.gradle
@@ -2528,6 +2528,7 @@ project(':streams') {
// testCompileOnly prevents streams from exporting a dependency on test-utils, which would cause a dependency cycle
testCompileOnly project(':streams:test-utils')
+ testImplementation project(':metadata')
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':server')
testImplementation project(':core')
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index f7d7ef798bd..487d49f0466 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -393,6 +393,8 @@
</subpackage>
<subpackage name="integration">
+ <allow pkg="kafka.testkit"/>
+ <allow pkg="org.apache.kafka.metadata"/>
<allow pkg="kafka.admin" />
<allow pkg="kafka.api" />
<allow pkg="kafka.cluster" />
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
index b92e2ad135a..293c5f5d286 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
@@ -150,7 +150,7 @@ public abstract class AbstractResetIntegrationTest {
protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
protected static final int CLEANUP_CONSUMER_TIMEOUT = 2000;
- protected static final int TIMEOUT_MULTIPLIER = 15;
+ protected static final int TIMEOUT_MULTIPLIER = 30;
void prepareTest(final TestInfo testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
@@ -159,7 +159,7 @@ public abstract class AbstractResetIntegrationTest {
waitForEmptyConsumerGroup(adminClient, appID, TIMEOUT_MULTIPLIER *
CLEANUP_CONSUMER_TIMEOUT);
- cluster.deleteAllTopicsAndWait(120000);
+ cluster.deleteAllTopics();
cluster.createTopics(INPUT_TOPIC, OUTPUT_TOPIC, OUTPUT_TOPIC_2,
OUTPUT_TOPIC_2_RERUN);
add10InputElements();
@@ -199,7 +199,7 @@ public abstract class AbstractResetIntegrationTest {
// RUN
streams = new KafkaStreams(setupTopologyWithIntermediateTopic(true,
OUTPUT_TOPIC_2), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -272,7 +272,7 @@ public abstract class AbstractResetIntegrationTest {
// RUN
streams = new
KafkaStreams(setupTopologyWithIntermediateTopic(useRepartitioned,
OUTPUT_TOPIC_2), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> result =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
// receive only first values to make sure intermediate user topic is not consumed completely
// => required to test "seekToEnd" for intermediate topics
@@ -301,7 +301,7 @@ public abstract class AbstractResetIntegrationTest {
assertInternalTopicsGotDeleted(useRepartitioned ? null :
INTERMEDIATE_USER_TOPIC);
// RE-RUN
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
final List<KeyValue<Long, Long>> resultRerun2 =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC_2_RERUN, 40);
streams.close();
@@ -323,7 +323,7 @@ public abstract class AbstractResetIntegrationTest {
cleanGlobal(!useRepartitioned, null, null, appID);
if (!useRepartitioned) {
- cluster.deleteTopicAndWait(INTERMEDIATE_USER_TOPIC);
+ cluster.deleteTopic(INTERMEDIATE_USER_TOPIC);
}
}
@@ -420,7 +420,6 @@ public abstract class AbstractResetIntegrationTest {
}
protected void assertInternalTopicsGotDeleted(final String
additionalExistingTopic) throws Exception {
- // do not use list topics request, but read from the embedded cluster's zookeeper path directly to confirm
if (additionalExistingTopic != null) {
cluster.waitForRemainingTopics(30000, INPUT_TOPIC, OUTPUT_TOPIC,
OUTPUT_TOPIC_2, OUTPUT_TOPIC_2_RERUN,
Topic.GROUP_METADATA_TOPIC_NAME, additionalExistingTopic);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
index 4feaa2c6283..916940ce9bf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
@@ -107,7 +107,7 @@ public class AdjustStreamThreadCountTest {
builder = new StreamsBuilder();
builder.stream(inputTopic);
- properties = mkObjectProperties(
+ properties = mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index b5651e889ef..2b924e03727 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -162,8 +162,7 @@ public class EosIntegrationTest {
@BeforeEach
public void createTopics() throws Exception {
applicationId = "appId-" + TEST_NUMBER.getAndIncrement();
- CLUSTER.deleteTopicsAndWait(
- 60_000L,
+ CLUSTER.deleteTopics(
SINGLE_PARTITION_INPUT_TOPIC, MULTI_PARTITION_INPUT_TOPIC,
SINGLE_PARTITION_THROUGH_TOPIC, MULTI_PARTITION_THROUGH_TOPIC,
SINGLE_PARTITION_OUTPUT_TOPIC, MULTI_PARTITION_OUTPUT_TOPIC);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
index 52a3e839c21..b50cf4ad62c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/HighAvailabilityTaskAssignorIntegrationTest.java
@@ -67,7 +67,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
-import static java.util.Arrays.asList;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
@@ -81,19 +80,11 @@ import static org.hamcrest.Matchers.is;
@Tag("integration")
public class HighAvailabilityTaskAssignorIntegrationTest {
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3,
- new Properties(),
- asList(
- new Properties() {{
- setProperty(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_0);
- }},
- new Properties() {{
- setProperty(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_1);
- }},
- new Properties() {{
- setProperty(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_2);
- }}
- )
- );
+ new Properties(), mkMap(
+ mkEntry(0, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_0))),
+ mkEntry(1, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_1))),
+ mkEntry(2, mkMap(mkEntry(ServerConfigs.BROKER_RACK_CONFIG,
AssignmentTestUtils.RACK_2)))
+ ));
@BeforeAll
public static void startCluster() throws IOException {
@@ -258,7 +249,7 @@ public class HighAvailabilityTaskAssignorIntegrationTest {
restoreCompleteLatch.await();
// We should finalize the restoration without having restored any records (because they're already in
- // the store. Otherwise, we failed to properly re-use the state from the standby.
+ // the store). Otherwise, we failed to properly re-use the state from the standby.
assertThat(instance1TotalRestored.get(), is(0L));
// Belt-and-suspenders check that we never even attempt to restore any records.
assertThat(instance1NumRestored.get(), is(-1L));
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 425f1eb207f..be31768aa4a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -379,7 +379,7 @@ public class IQv2StoreIntegrationTest {
throws InterruptedException, IOException, ExecutionException,
TimeoutException {
CLUSTER.start();
- CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
+ CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
index 1ac11194f5f..61cbca6e3d2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinGracePeriodDurabilityIntegrationTest.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.time.Duration;
+import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -70,12 +71,8 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
public class JoinGracePeriodDurabilityIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(
- 3,
- mkProperties(mkMap()),
- 0L
- );
-
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
+ private static final long NOW = Instant.now().toEpochMilli();
@BeforeAll
public static void startCluster() throws IOException {
CLUSTER.start();
@@ -218,7 +215,7 @@ public class JoinGracePeriodDurabilityIntegrationTest {
* just to exercise that everything works properly in the presence of commits.
*/
private long scaledTime(final long unscaledTime) {
- return COMMIT_INTERVAL * 2 * unscaledTime;
+ return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronouslyToPartitionZero(final String
topic, final List<KeyValueTimestamp<String, String>> toProduce) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c71ac31672a..1041f323ae1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -97,7 +97,7 @@ public class JoinStoreIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException, IOException {
- CLUSTER.deleteAllTopicsAndWait(120000);
+ CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 58b00f81d53..c3059011cbf 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -84,7 +84,7 @@ public class JoinWithIncompleteMetadataIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException, IOException {
- CLUSTER.deleteAllTopicsAndWait(120000);
+ CLUSTER.deleteAllTopics();
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 35677aeaef2..5f742d95b3c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -120,8 +120,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void
shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosDisabled()
throws Exception {
try {
- streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
- streams.start();
+ streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG,
streamsBuilder, false);
produceKeyValues("a", "b", "c");
@@ -131,7 +130,7 @@ public class KTableSourceTopicRestartIntegrationTest {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new
UpdatingSourceTopicOnRestoreStartStateRestoreListener());
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
produceKeyValues("f", "g", "h");
@@ -149,8 +148,7 @@ public class KTableSourceTopicRestartIntegrationTest {
STREAMS_CONFIG.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
try {
- streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
- streams.start();
+ streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG,
streamsBuilder, false);
produceKeyValues("a", "b", "c");
@@ -160,7 +158,7 @@ public class KTableSourceTopicRestartIntegrationTest {
streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
// the state restore listener will append one record to the log
streams.setGlobalStateRestoreListener(new
UpdatingSourceTopicOnRestoreStartStateRestoreListener());
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
produceKeyValues("f", "g", "h");
@@ -176,16 +174,14 @@ public class KTableSourceTopicRestartIntegrationTest {
@Test
public void
shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration() throws
Exception {
try {
- streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
- streams.start();
+ streams = IntegrationTestUtils.getStartedStreams(STREAMS_CONFIG,
streamsBuilder, false);
produceKeyValues("a", "b", "c");
assertNumberValuesRead(readKeyValues, expectedInitialResultsMap,
"Table did not read all values");
streams.close();
- streams = new KafkaStreams(streamsBuilder.build(), STREAMS_CONFIG);
- streams.start();
+ streams = IntegrationTestUtils.getRunningStreams(STREAMS_CONFIG,
streamsBuilder, false);
produceKeyValues("f", "g", "h");
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
index d7ce484ba55..bcbc36f3152 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java
@@ -135,7 +135,7 @@ public class KafkaStreamsCloseOptionsIntegrationTest {
adminClient = Admin.create(commonClientConfig);
}
- CLUSTER.deleteAllTopicsAndWait(120_000L);
+ CLUSTER.deleteAllTopics();
CLUSTER.createTopic(INPUT_TOPIC, 2, 1);
CLUSTER.createTopic(OUTPUT_TOPIC, 2, 1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 4e341b07234..5f0fd659db7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -97,7 +97,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
@Tag("integration")
public class NamedTopologyIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
private static final String TOPOLOGY_1 = "topology-1";
private static final String TOPOLOGY_2 = "topology-2";
@@ -243,14 +243,14 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t ->
t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
assertThat("topic was not decorated",
t.contains(TOPIC_PREFIX));
- CLUSTER.deleteTopicsAndWait(t);
- } catch (final InterruptedException e) {
+ CLUSTER.deleteTopics(t);
+ } catch (final RuntimeException e) {
e.printStackTrace();
}
});
- CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2,
OUTPUT_STREAM_3);
- CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+ CLUSTER.deleteTopics(OUTPUT_STREAM_1, OUTPUT_STREAM_2,
OUTPUT_STREAM_3);
+ CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
@Test
@@ -518,8 +518,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t ->
t.contains("-changelog")).forEach(t -> {
try {
- CLUSTER.deleteTopicAndWait(t);
- } catch (final InterruptedException e) {
+ CLUSTER.deleteTopic(t);
+ } catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -570,7 +570,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
- CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+ CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
}
}
@@ -624,8 +624,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t ->
t.contains("changelog")).forEach(t -> {
try {
- CLUSTER.deleteTopicAndWait(t);
- } catch (final InterruptedException e) {
+ CLUSTER.deleteTopic(t);
+ } catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -640,7 +640,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
} finally {
- CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+ CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
}
@@ -662,8 +662,8 @@ public class NamedTopologyIntegrationTest {
CLUSTER.getAllTopicsInCluster().stream().filter(t ->
t.contains("-changelog") || t.contains("-repartition")).forEach(t -> {
try {
- CLUSTER.deleteTopicsAndWait(t);
- } catch (final InterruptedException e) {
+ CLUSTER.deleteTopics(t);
+ } catch (final RuntimeException e) {
e.printStackTrace();
}
});
@@ -678,7 +678,7 @@ public class NamedTopologyIntegrationTest {
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
COUNT_OUTPUT, 5), equalTo(COUNT_OUTPUT_DATA));
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig,
SUM_OUTPUT, 5), equalTo(SUM_OUTPUT_DATA));
- CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
+ CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT);
}
/**
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
index 044acfac8b0..bc21a2a8b6a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/PositionRestartIntegrationTest.java
@@ -102,7 +102,7 @@ import static org.hamcrest.Matchers.is;
public class PositionRestartIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(PositionRestartIntegrationTest.class);
private static final long SEED = new Random().nextLong();
- private static final int NUM_BROKERS = 1;
+ private static final int NUM_BROKERS = 3;
public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
private static int port = 0;
private static final String INPUT_TOPIC_NAME = "input-topic";
@@ -274,7 +274,7 @@ public class PositionRestartIntegrationTest {
throws InterruptedException, IOException, ExecutionException,
TimeoutException {
CLUSTER.start();
- CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
+ CLUSTER.deleteAllTopics();
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f16c3e57c39..2f84b0b4f4a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -242,7 +242,7 @@ public class QueryableStateIntegrationTest {
kafkaStreams.close(ofSeconds(30));
}
IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
- CLUSTER.deleteAllTopicsAndWait(0L);
+ CLUSTER.deleteAllTopics();
}
/**
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
index 477024dce53..6842c7718e1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RangeQueryIntegrationTest.java
@@ -153,7 +153,7 @@ public class RangeQueryIntegrationTest {
@AfterEach
public void cleanup() throws InterruptedException {
- CLUSTER.deleteAllTopicsAndWait(120000);
+ CLUSTER.deleteAllTopics();
}
@ParameterizedTest
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index ae953020520..7fff8099a16 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -196,7 +196,7 @@ public class RegexSourceIntegrationTest {
streams.close();
} finally {
- CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+ CLUSTER.deleteTopics("TEST-TOPIC-1", "TEST-TOPIC-2");
}
}
@@ -248,7 +248,7 @@ public class RegexSourceIntegrationTest {
streams.close();
} finally {
- CLUSTER.deleteTopicsAndWait(topic1, topic2);
+ CLUSTER.deleteTopics(topic1, topic2);
}
}
@@ -290,7 +290,7 @@ public class RegexSourceIntegrationTest {
streams.start();
TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
} finally {
- CLUSTER.deleteTopicAndWait("TEST-TOPIC-A");
+ CLUSTER.deleteTopic("TEST-TOPIC-A");
}
TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 1c8599a3b19..13856c58dc9 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -68,7 +68,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// expiration of connections by the brokers to avoid errors when `AdminClient` sends requests after potentially
// very long sleep times
brokerProps.put(SocketServerConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
-1L);
- CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+ CLUSTER = new EmbeddedKafkaCluster(3, brokerProps);
}
@BeforeAll
@@ -98,7 +98,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
}
@Test
- public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo
testInfo) {
+ public void shouldNotAllowToResetWhileStreamsIsRunning(final TestInfo
testInfo) throws Exception {
final String appID = IntegrationTestUtils.safeUniqueTestName(testInfo);
final String[] parameters = new String[] {
"--application-id", appID,
@@ -113,7 +113,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// RUN
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final int exitCode = new StreamsResetter().execute(parameters,
cleanUpConfig);
assertEquals(1, exitCode);
@@ -193,7 +193,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// Run
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> result =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -213,7 +214,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
assertInternalTopicsGotDeleted(null);
// RE-RUN
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -228,7 +229,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// RUN
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> result =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -251,7 +253,7 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 5);
streams.close();
@@ -269,7 +271,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// RUN
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> result =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -297,7 +300,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -314,7 +318,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
// RUN
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> result =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
@@ -337,7 +342,8 @@ public class ResetIntegrationTest extends
AbstractResetIntegrationTest {
resetFile.deleteOnExit();
// RE-RUN
- streams.start();
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+
final List<KeyValue<Long, Long>> resultRerun =
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(resultConsumerConfig,
OUTPUT_TOPIC, 10);
streams.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
index 65a99a0b1b5..7f47233361e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
@@ -33,8 +33,6 @@ import java.io.IOException;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.kafka.server.config.ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG;
-
/**
* Tests command line SSL setup for reset tool.
*/
@@ -54,9 +52,7 @@ public class ResetIntegrationWithSslTest extends
AbstractResetIntegrationTest {
try {
SSL_CONFIG = TestSslUtils.createSslConfig(false, true,
ConnectionMode.SERVER, TestUtils.tempFile(), "testCert");
-
- brokerProps.put(SocketServerConfigs.LISTENERS_CONFIG,
"SSL://localhost:0");
- brokerProps.put(INTER_BROKER_LISTENER_NAME_CONFIG, "SSL");
+
brokerProps.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"EXTERNAL:SSL,CONTROLLER:SSL,INTERNAL:SSL");
brokerProps.putAll(SSL_CONFIG);
} catch (final Exception e) {
throw new RuntimeException(e);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
index a0cedcb2840..cff0d74fcad 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.java
@@ -44,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -64,13 +65,15 @@ import static org.hamcrest.MatcherAssert.assertThat;
public class ResetPartitionTimeIntegrationTest {
private static final int NUM_BROKERS = 1;
private static final Properties BROKER_CONFIG;
+ private static final long NOW = Instant.now().toEpochMilli();
+
static {
BROKER_CONFIG = new Properties();
BROKER_CONFIG.put("transaction.state.log.replication.factor", (short)
1);
BROKER_CONFIG.put("transaction.state.log.min.isr", 1);
}
public static final EmbeddedKafkaCluster CLUSTER =
- new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
+ new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG);
@BeforeAll
public static void startCluster() throws IOException {
@@ -117,13 +120,13 @@ public class ResetPartitionTimeIntegrationTest {
produceSynchronouslyToPartitionZero(
input,
Collections.singletonList(
- new KeyValueTimestamp<>("k3", "v3", 5000)
+ new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
)
);
verifyOutput(
outputRaw,
Collections.singletonList(
- new KeyValueTimestamp<>("k3", "v3", 5000)
+ new KeyValueTimestamp<>("k3", "v3", NOW + 5000)
)
);
assertThat(lastRecordedTimestamp, is(-1L));
@@ -138,16 +141,16 @@ public class ResetPartitionTimeIntegrationTest {
produceSynchronouslyToPartitionZero(
input,
Collections.singletonList(
- new KeyValueTimestamp<>("k5", "v5", 4999)
+ new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
)
);
verifyOutput(
outputRaw,
Collections.singletonList(
- new KeyValueTimestamp<>("k5", "v5", 4999)
+ new KeyValueTimestamp<>("k5", "v5", NOW + 4999)
)
);
- assertThat(lastRecordedTimestamp, is(5000L));
+ assertThat(lastRecordedTimestamp, is(NOW + 5000L));
} finally {
kafkaStreams.close();
quietlyCleanStateAfterTest(CLUSTER, kafkaStreams);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 636c9c52f05..5fe61eed66e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -117,7 +117,7 @@ public class RestoreIntegrationTest {
private static final Duration RESTORATION_DELAY = Duration.ofMillis(2000);
- private static final int NUM_BROKERS = 1;
+ private static final int NUM_BROKERS = 2;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
index 5fd98bbd7d6..22ac5e5408e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/RocksDBMetricsIntegrationTest.java
@@ -135,7 +135,7 @@ public class RocksDBMetricsIntegrationTest {
@AfterEach
public void after() throws Exception {
- CLUSTER.deleteTopicsAndWait(STREAM_INPUT_ONE, STREAM_INPUT_TWO,
STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
+ CLUSTER.deleteTopics(STREAM_INPUT_ONE, STREAM_INPUT_TWO,
STREAM_OUTPUT_ONE, STREAM_OUTPUT_TWO);
}
@FunctionalInterface
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
index f910043d1e0..1c3bd11957e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
@@ -104,7 +104,7 @@ public class StandbyTaskEOSIntegrationTest {
inputTopic = "input-" + safeTestName;
outputTopic = "output-" + safeTestName;
storeName = "store-" + safeTestName;
- CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic, appId +
"-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
+ CLUSTER.deleteTopics(inputTopic, outputTopic, appId +
"-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
CLUSTER.createTopic(inputTopic, 1, 3);
CLUSTER.createTopic(outputTopic, 1, 3);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
index 66f5b5f0209..e4d355cdabe 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSMultiRebalanceIntegrationTest.java
@@ -18,9 +18,11 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
@@ -99,7 +101,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
storeName = "store-" + safeTestName;
counterName = "counter-" + safeTestName;
- CLUSTER.deleteTopicsAndWait(inputTopic, outputTopic);
+ CLUSTER.deleteTopics(inputTopic, outputTopic);
CLUSTER.createTopic(inputTopic, partitionCount, 3);
CLUSTER.createTopic(outputTopic, partitionCount, 3);
}
@@ -141,7 +143,7 @@ public class StandbyTaskEOSMultiRebalanceIntegrationTest {
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
IntegerSerializer.class,
- new Properties()
+
Utils.mkProperties(Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all"))
),
10L + time
);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
index df4359228c6..3c3a3721748 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StateDirectoryIntegrationTest.java
@@ -171,7 +171,7 @@ public class StateDirectoryIntegrationTest {
).findFirst().isPresent()
);
} finally {
- CLUSTER.deleteAllTopicsAndWait(0L);
+ CLUSTER.deleteAllTopics();
}
}
@@ -271,7 +271,7 @@ public class StateDirectoryIntegrationTest {
assertTrue((new File(stateDir)).exists()); // Root state store
exists
assertTrue(appDir.exists()); // Application state store exists
} finally {
- CLUSTER.deleteAllTopicsAndWait(0L);
+ CLUSTER.deleteAllTopics();
}
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 476c1857422..9ab5c7d574c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -54,6 +54,7 @@ import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.time.Duration;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -80,8 +81,9 @@ import static org.junit.jupiter.api.Assertions.fail;
@Tag("integration")
@Timeout(600)
public class StreamsUncaughtExceptionHandlerIntegrationTest {
+ private static final long NOW = Instant.now().toEpochMilli();
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException {
@@ -146,7 +148,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
kafkaStreams.setUncaughtExceptionHandler((t, e) ->
counter.incrementAndGet());
startApplicationAndWaitUntilRunning(kafkaStreams);
- produceMessages(0L, inputTopic, "A");
+ produceMessages(NOW, inputTopic, "A");
// should call the UncaughtExceptionHandler in current thread
TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was
called 1st time");
@@ -168,7 +170,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
startApplicationAndWaitUntilRunning(kafkaStreams);
- produceMessages(0L, inputTopic, "A");
+ produceMessages(NOW, inputTopic, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@@ -252,7 +254,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
startApplicationAndWaitUntilRunning(kafkaStreams);
- produceMessages(0L, inputTopic2, "A");
+ produceMessages(NOW, inputTopic2, "A");
waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@@ -297,7 +299,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
- 0L);
+ NOW);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic2,
@@ -310,7 +312,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
- 0L);
+ NOW);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(
@@ -365,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
startApplicationAndWaitUntilRunning(asList(kafkaStreams1,
kafkaStreams2));
- produceMessages(0L, inputTopic, "A");
+ produceMessages(NOW, inputTopic, "A");
waitForApplicationState(asList(kafkaStreams1, kafkaStreams2),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
@@ -386,7 +388,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
});
startApplicationAndWaitUntilRunning(kafkaStreams);
- produceMessages(0L, inputTopic, "A");
+ produceMessages(NOW, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads,
"finished replacing threads");
TestUtils.waitForCondition(() -> throwError.get(), "finished
replacing threads");
kafkaStreams.close();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
index 9215a0095e1..47d37f76d8c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java
@@ -55,11 +55,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashSet;
+import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -82,12 +81,8 @@ import static org.hamcrest.Matchers.equalTo;
@Tag("integration")
@Timeout(600)
public class SuppressionDurabilityIntegrationTest {
-
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(
- 3,
- mkProperties(mkMap()),
- 0L
- );
+ private static final long NOW = Instant.now().toEpochMilli();
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(3);
@BeforeAll
public static void startCluster() throws IOException {
@@ -173,11 +168,11 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- new HashSet<>(asList(
+ asList(
new KeyValueTimestamp<>("k1", 1L, scaledTime(1L)),
new KeyValueTimestamp<>("k2", 1L, scaledTime(2L)),
new KeyValueTimestamp<>("k3", 1L, scaledTime(3L))
- ))
+ )
);
assertThat(eventCount.get(), is(0));
@@ -191,10 +186,10 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- new HashSet<>(asList(
+ asList(
new KeyValueTimestamp<>("k4", 1L, scaledTime(4L)),
new KeyValueTimestamp<>("k5", 1L, scaledTime(5L))
- ))
+ )
);
assertThat(eventCount.get(), is(2));
verifyOutput(
@@ -225,11 +220,11 @@ public class SuppressionDurabilityIntegrationTest {
);
verifyOutput(
outputRaw,
- new HashSet<>(asList(
+ asList(
new KeyValueTimestamp<>("k6", 1L, scaledTime(6L)),
new KeyValueTimestamp<>("k7", 1L, scaledTime(7L)),
new KeyValueTimestamp<>("k8", 1L, scaledTime(8L))
- ))
+ )
);
assertThat("suppress has apparently produced some duplicates.
There should only be 5 output events.",
eventCount.get(), is(5));
@@ -303,24 +298,12 @@ public class SuppressionDurabilityIntegrationTest {
IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic,
keyValueTimestamps);
}
- private void verifyOutput(final String topic, final
Set<KeyValueTimestamp<String, Long>> keyValueTimestamps) {
- final Properties properties = mkProperties(
- mkMap(
- mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"),
- mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers()),
- mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()),
- mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName())
- )
- );
- IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic,
keyValueTimestamps);
- }
-
/**
* scaling to ensure that there are commits in between the various test
events,
* just to exercise that everything works properly in the presence of
commits.
*/
private long scaledTime(final long unscaledTime) {
- return COMMIT_INTERVAL * 2 * unscaledTime;
+ return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronouslyToPartitionZero(final String
topic, final List<KeyValueTimestamp<String, String>> toProduce) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index 9b9722e8539..3d7c141129f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -52,6 +52,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.io.IOException;
+import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
@@ -83,12 +84,8 @@ import static org.hamcrest.Matchers.empty;
@Tag("integration")
@Timeout(600)
public class SuppressionIntegrationTest {
-
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(
- 1,
- mkProperties(mkMap()),
- 0L
- );
+ private static final long NOW = Instant.now().toEpochMilli();
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
@BeforeAll
public static void startCluster() throws IOException {
@@ -525,7 +522,7 @@ public class SuppressionIntegrationTest {
* just to exercise that everything works properly in the presence of
commits.
*/
private static long scaledTime(final long unscaledTime) {
- return COMMIT_INTERVAL * 2 * unscaledTime;
+ return NOW + COMMIT_INTERVAL * 2 * unscaledTime;
}
private static void produceSynchronously(final String topic, final
List<KeyValueTimestamp<String, String>> toProduce) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
index 0238dcae621..9f025ee414a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.java
@@ -61,7 +61,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
@Timeout(600)
public class TaskMetadataIntegrationTest {
- public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0L, 0L);
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1, new Properties(), Collections.emptyMap(), 0L, 0L);
@BeforeAll
public static void startCluster() throws IOException {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index f4f03b98330..16334714168 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -16,19 +16,36 @@
*/
package org.apache.kafka.streams.integration.utils;
-import kafka.server.KafkaServer;
-import kafka.zk.EmbeddedZookeeper;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ConfigType;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.apache.kafka.test.TestCondition;
@@ -37,115 +54,143 @@ import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.time.Duration;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
/**
- * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and
supplied number of Kafka brokers.
+ * Sets up an embedded Kafka KRaft cluster for integration tests (using {@link
kafka.testkit.KafkaClusterTestKit} internally) with the
+ * specified number of brokers and the specified broker properties.
+ * Additional Kafka client properties can also be supplied if required.
+ * This class also provides various utility methods to easily create Kafka
topics, produce data, consume data etc.
*/
public class EmbeddedKafkaCluster {
private static final Logger log =
LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
- private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random
port being selected
- private static final int TOPIC_CREATION_TIMEOUT = 30000;
- private static final int TOPIC_DELETION_TIMEOUT = 30000;
- private EmbeddedZookeeper zookeeper = null;
- private final KafkaEmbedded[] brokers;
-
+ private final KafkaClusterTestKit cluster;
private final Properties brokerConfig;
- private final List<Properties> brokerConfigOverrides;
public final MockTime time;
-
public EmbeddedKafkaCluster(final int numBrokers) {
this(numBrokers, new Properties());
}
- public EmbeddedKafkaCluster(final int numBrokers,
- final Properties brokerConfig) {
- this(numBrokers, brokerConfig, System.currentTimeMillis());
+ public EmbeddedKafkaCluster(final int numBrokers, final Properties
brokerConfig) {
+ this(numBrokers, brokerConfig, Collections.emptyMap());
}
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
final long mockTimeMillisStart) {
- this(numBrokers, brokerConfig, Collections.emptyList(),
mockTimeMillisStart);
- }
-
- public EmbeddedKafkaCluster(final int numBrokers,
- final Properties brokerConfig,
- final List<Properties> brokerConfigOverrides) {
- this(numBrokers, brokerConfig, brokerConfigOverrides,
System.currentTimeMillis());
+ this(numBrokers, brokerConfig, Collections.emptyMap(),
mockTimeMillisStart, System.nanoTime());
}
-
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
- final List<Properties> brokerConfigOverrides,
- final long mockTimeMillisStart) {
- this(numBrokers, brokerConfig, brokerConfigOverrides,
mockTimeMillisStart, System.nanoTime());
+ final Map<Integer, Map<String, String>>
brokerConfigOverrides) {
+ this(numBrokers, brokerConfig, brokerConfigOverrides,
System.currentTimeMillis(), System.nanoTime());
}
-
public EmbeddedKafkaCluster(final int numBrokers,
final Properties brokerConfig,
- final List<Properties> brokerConfigOverrides,
+ final Map<Integer, Map<String, String>>
brokerConfigOverrides,
final long mockTimeMillisStart,
final long mockTimeNanoStart) {
+ addDefaultBrokerPropsIfAbsent(brokerConfig);
+
if (!brokerConfigOverrides.isEmpty() && brokerConfigOverrides.size()
!= numBrokers) {
throw new IllegalArgumentException("Size of brokerConfigOverrides
" + brokerConfigOverrides.size()
- + " must match broker number " + numBrokers);
+ + " must match broker number " + numBrokers);
+ }
+ try {
+ final KafkaClusterTestKit.Builder clusterBuilder = new
KafkaClusterTestKit.Builder(
+ new TestKitNodes.Builder()
+ .setCombined(true)
+ .setNumBrokerNodes(numBrokers)
+ .setPerServerProperties(brokerConfigOverrides)
+ // Reduce number of controllers for faster startup
+ // We may make this configurable in the future if
there's a use case for it
+ .setNumControllerNodes(1)
+ .build()
+ );
+
+ brokerConfig.forEach((k, v) ->
clusterBuilder.setConfigProp((String) k, v));
+ cluster = clusterBuilder.build();
+ cluster.nonFatalFaultHandler().setIgnore(true);
+ } catch (final Exception e) {
+ throw new KafkaException("Failed to create test Kafka cluster", e);
}
- brokers = new KafkaEmbedded[numBrokers];
this.brokerConfig = brokerConfig;
- time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
- this.brokerConfigOverrides = brokerConfigOverrides;
+ this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
+ }
+
+ public void start() {
+ try {
+ cluster.format();
+ cluster.startup();
+ cluster.waitForReadyBrokers();
+ } catch (final Exception e) {
+ throw new KafkaException("Failed to start test Kafka cluster", e);
+ }
+
+ verifyClusterReadiness();
}
/**
- * Creates and starts a Kafka cluster.
+ * Perform an extended check to ensure that the primary APIs of the
cluster are available, including:
+ * <ul>
+ * <li>Ability to create a topic</li>
+ * <li>Ability to produce to a topic</li>
+ * <li>Ability to form a consumer group</li>
+ * <li>Ability to consume from a topic</li>
+ * </ul>
+ * If this method completes successfully, all resources created to verify
the cluster health
+ * (such as topics and consumer groups) will be cleaned up before it
returns.
+ * <p>
+ * This provides extra guarantees compared to other cluster readiness
checks such as
+ * {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that
brokers have
+ * completed startup and joined the cluster, but do not verify that the
internal consumer
+ * offsets topic has been created or that it's actually possible for users
to create and
+ * interact with topics.
*/
- public void start() throws IOException {
- log.debug("Initiating embedded Kafka cluster startup");
- log.debug("Starting a ZooKeeper instance");
- zookeeper = new EmbeddedZookeeper();
- log.debug("ZooKeeper instance is running at {}", zKConnectString());
-
- brokerConfig.put(ZkConfigs.ZK_CONNECT_CONFIG, zKConnectString());
- putIfAbsent(brokerConfig, SocketServerConfigs.LISTENERS_CONFIG,
"PLAINTEXT://localhost:" + DEFAULT_BROKER_PORT);
- putIfAbsent(brokerConfig, ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG,
true);
- putIfAbsent(brokerConfig,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
- putIfAbsent(brokerConfig,
GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 0);
- putIfAbsent(brokerConfig,
GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0);
- putIfAbsent(brokerConfig,
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, (short) 1);
- putIfAbsent(brokerConfig,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, 5);
- putIfAbsent(brokerConfig,
TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, 5);
- putIfAbsent(brokerConfig,
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
-
- for (int i = 0; i < brokers.length; i++) {
- brokerConfig.put(ServerConfigs.BROKER_ID_CONFIG, i);
- log.debug("Starting a Kafka instance on {} ...",
brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
-
- final Properties effectiveConfig = new Properties();
- effectiveConfig.putAll(brokerConfig);
- if (brokerConfigOverrides != null && brokerConfigOverrides.size()
> i) {
- effectiveConfig.putAll(brokerConfigOverrides.get(i));
+ public void verifyClusterReadiness() {
+ final UUID uuid = UUID.randomUUID();
+ final String consumerGroupId = "group-warmup-" + uuid;
+ final Map<String, Object> consumerConfig =
Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+ final String topic = "topic-warmup-" + uuid;
+
+ createTopic(topic);
+ final Map<String, Object> producerProps = new
HashMap<>(clientDefaultConfig());
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "warmup-producer");
+ produce(producerProps, topic, null, "warmup message key", "warmup
message value");
+
+ try (Consumer<?, ?> consumer =
createConsumerAndSubscribeTo(consumerConfig, topic)) {
+ final ConsumerRecords<?, ?> records =
consumer.poll(Duration.ofMillis(TimeUnit.MINUTES.toMillis(2)));
+ if (records.isEmpty()) {
+ throw new AssertionError("Failed to verify availability of
group coordinator and produce/consume APIs on Kafka cluster in time");
}
- brokers[i] = new KafkaEmbedded(effectiveConfig, time);
-
- log.debug("Kafka instance is running at {}, connected to ZooKeeper
at {}",
- brokers[i].brokerList(), brokers[i].zookeeperConnect());
}
- }
- private void putIfAbsent(final Properties props, final String propertyKey,
final Object propertyValue) {
- if (!props.containsKey(propertyKey)) {
- brokerConfig.put(propertyKey, propertyValue);
+ try (Admin admin = createAdminClient()) {
+
admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30,
TimeUnit.SECONDS);
+ admin.deleteTopics(Collections.singleton(topic)).all().get(30,
TimeUnit.SECONDS);
+ } catch (final InterruptedException | ExecutionException |
TimeoutException e) {
+ throw new AssertionError("Failed to clean up cluster health check
resource(s)", e);
}
}
@@ -153,46 +198,22 @@ public class EmbeddedKafkaCluster {
* Stop the Kafka cluster.
*/
public void stop() {
- if (brokers.length > 1) {
- // delete the topics first to avoid cascading leader elections
while shutting down the brokers
- final Set<String> topics = getAllTopicsInCluster();
- if (!topics.isEmpty()) {
- try (final Admin adminClient = brokers[0].createAdminClient())
{
- adminClient.deleteTopics(topics).all().get();
- } catch (final InterruptedException e) {
- log.warn("Got interrupted while deleting topics in
preparation for stopping embedded brokers", e);
- throw new RuntimeException(e);
- } catch (final ExecutionException | RuntimeException e) {
- log.warn("Couldn't delete all topics before stopping
brokers", e);
- }
- }
+ final AtomicReference<Throwable> shutdownFailure = new
AtomicReference<>();
+ Utils.closeQuietly(cluster, "embedded Kafka cluster", shutdownFailure);
+ if (shutdownFailure.get() != null) {
+ throw new KafkaException("Failed to shut down producer / embedded
Kafka cluster", shutdownFailure.get());
}
- for (final KafkaEmbedded broker : brokers) {
- broker.stopAsync();
- }
- for (final KafkaEmbedded broker : brokers) {
- broker.awaitStoppedAndPurge();
- }
- zookeeper.shutdown();
}
- /**
- * The ZooKeeper connection string aka `zookeeper.connect` in
`hostnameOrIp:port` format.
- * Example: `127.0.0.1:2181`.
- * <p>
- * You can use this to e.g. tell Kafka brokers how to connect to this
instance.
- */
- public String zKConnectString() {
- return "127.0.0.1:" + zookeeper.port();
+ public String bootstrapServers() {
+ return cluster.bootstrapServers();
}
- /**
- * This cluster's `bootstrap.servers` value. Example: `127.0.0.1:9092`.
- * <p>
- * You can use this to tell Kafka producers how to connect to this cluster.
- */
- public String bootstrapServers() {
- return brokers[0].brokerList();
+ public boolean sslEnabled() {
+ final String listenerSecurityProtocolMap =
brokerConfig.getProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG);
+ if (listenerSecurityProtocolMap == null)
+ return false;
+ return listenerSecurityProtocolMap.contains(":SSL") ||
listenerSecurityProtocolMap.contains(":SASL_SSL");
}
/**
@@ -211,8 +232,18 @@ public class EmbeddedKafkaCluster {
*
* @param topic The name of the topic.
*/
- public void createTopic(final String topic) throws InterruptedException {
- createTopic(topic, 1, 1, Collections.emptyMap());
+ public void createTopic(final String topic) {
+ createTopic(topic, 1);
+ }
+
+ /**
+ * Create a Kafka topic with given partition and a replication factor of 1.
+ *
+ * @param topic The name of the topic.
+ * @param partitions The number of partitions for this topic.
+ */
+ public void createTopic(final String topic, final int partitions) {
+ createTopic(topic, partitions, 1, Collections.emptyMap());
}
/**
@@ -227,116 +258,177 @@ public class EmbeddedKafkaCluster {
}
/**
- * Create a Kafka topic with the given parameters.
+ * Create a Kafka topic with given partition, replication factor, and
topic config.
*
- * @param topic The name of the topic.
+ * @param topic The name of the topic.
* @param partitions The number of partitions for this topic.
* @param replication The replication factor for (partitions of) this
topic.
* @param topicConfig Additional topic-level configuration settings.
*/
- public void createTopic(final String topic,
- final int partitions,
- final int replication,
- final Map<String, String> topicConfig) throws
InterruptedException {
- brokers[0].createTopic(topic, partitions, replication, topicConfig);
- final List<TopicPartition> topicPartitions = new ArrayList<>();
- for (int partition = 0; partition < partitions; partition++) {
- topicPartitions.add(new TopicPartition(topic, partition));
+ public void createTopic(final String topic, final int partitions, final
int replication, final Map<String, String> topicConfig) {
+ if (replication > cluster.brokers().size()) {
+ throw new InvalidReplicationFactorException("Insufficient brokers
("
+ + cluster.brokers().size() + ") for desired replication ("
+ replication + ")");
}
- IntegrationTestUtils.waitForTopicPartitions(brokers(),
topicPartitions, TOPIC_CREATION_TIMEOUT);
- }
- /**
- * Deletes a topic returns immediately.
- *
- * @param topic the name of the topic
- */
- public void deleteTopic(final String topic) throws InterruptedException {
- deleteTopicsAndWait(-1L, topic);
+ log.info("Creating topic { name: {}, partitions: {}, replication: {},
config: {} }",
+ topic, partitions, replication, topicConfig);
+ final NewTopic newTopic = new NewTopic(topic, partitions, (short)
replication);
+ newTopic.configs(topicConfig);
+
+ try (final Admin adminClient = createAdminClient()) {
+
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+ TestUtils.waitForCondition(() ->
adminClient.listTopics().names().get().contains(topic),
+ "Wait for topic " + topic + " to get created.");
+ } catch (final TopicExistsException ignored) {
+ } catch (final InterruptedException | ExecutionException e) {
+ if (!(e.getCause() instanceof TopicExistsException)) {
+ throw new RuntimeException(e);
+ }
+ }
}
- /**
- * Deletes a topic and blocks for max 30 sec until the topic got deleted.
- *
- * @param topic the name of the topic
- */
- public void deleteTopicAndWait(final String topic) throws
InterruptedException {
- deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topic);
+ public void deleteTopics(final String... topics) {
+ for (final String topic : topics) {
+ deleteTopic(topic);
+ }
}
+
/**
- * Deletes multiple topics returns immediately.
+ * Delete a Kafka topic.
*
- * @param topics the name of the topics
+ * @param topic the topic to delete; may not be null
*/
- public void deleteTopics(final String... topics) throws
InterruptedException {
- deleteTopicsAndWait(-1, topics);
+ public void deleteTopic(final String topic) {
+ try (final Admin adminClient = createAdminClient()) {
+ adminClient.deleteTopics(Collections.singleton(topic)).all().get();
+ } catch (final InterruptedException | ExecutionException e) {
+ if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+ throw new RuntimeException(e);
+ }
+ }
}
/**
- * Deletes multiple topics and blocks for max 30 sec until all topics got
deleted.
- *
- * @param topics the name of the topics
+ * Delete all topics except internal topics.
*/
- public void deleteTopicsAndWait(final String... topics) throws
InterruptedException {
- deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
+ public void deleteAllTopics() {
+ try (final Admin adminClient = createAdminClient()) {
+ final Set<String> topics = adminClient.listTopics().names().get();
+ adminClient.deleteTopics(topics).all().get();
+ } catch (final UnknownTopicOrPartitionException ignored) {
+ } catch (final ExecutionException | InterruptedException e) {
+ if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
+ throw new RuntimeException(e);
+ }
+ }
}
/**
- * Deletes multiple topics and blocks until all topics got deleted.
- *
- * @param timeoutMs the max time to wait for the topics to be deleted
(does not block if {@code <= 0})
- * @param topics the name of the topics
+ * Produce given key and value to topic partition.
+ * @param topic the topic to produce to; may not be null.
+ * @param partition the topic partition to produce to.
+ * @param key the record key.
+ * @param value the record value.
*/
- public void deleteTopicsAndWait(final long timeoutMs, final String...
topics) throws InterruptedException {
- for (final String topic : topics) {
+ public void produce(final Map<String, Object> producerProps, final String
topic, final Integer partition, final String key, final String value) {
+ try (KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(producerProps, new ByteArraySerializer(), new
ByteArraySerializer())) {
+ final ProducerRecord<byte[], byte[]> msg = new
ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value
== null ? null : value.getBytes());
try {
- brokers[0].deleteTopic(topic);
- } catch (final UnknownTopicOrPartitionException ignored) { }
+ producer.send(msg).get(TimeUnit.SECONDS.toMillis(120),
TimeUnit.MILLISECONDS);
+ producer.flush();
+ } catch (final Exception e) {
+ throw new KafkaException("Could not produce message: " + msg,
e);
+ }
}
+ }
+
+ public Admin createAdminClient() {
+ return Admin.create(mkProperties(clientDefaultConfig()));
+ }
- if (timeoutMs > 0) {
- TestUtils.waitForCondition(new TopicsDeletedCondition(topics),
timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+ public Map<String, String> clientDefaultConfig() {
+ final Map<String, String> props = new HashMap<>();
+ props.putIfAbsent(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+ if (sslEnabled()) {
+ props.putIfAbsent(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG).toString());
+ props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password)
brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+ props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"SSL");
}
+ return props;
}
- /**
- * Deletes all topics and blocks until all topics got deleted.
- *
- * @param timeoutMs the max time to wait for the topics to be deleted
(does not block if {@code <= 0})
- */
- public void deleteAllTopicsAndWait(final long timeoutMs) throws
InterruptedException {
- final Set<String> topics = getAllTopicsInCluster();
- for (final String topic : topics) {
- try {
- brokers[0].deleteTopic(topic);
- } catch (final UnknownTopicOrPartitionException ignored) { }
+ public KafkaConsumer<byte[], byte[]> createConsumer(final Map<String,
Object> consumerProps) {
+ final Map<String, Object> props = new HashMap<>(clientDefaultConfig());
+ props.putAll(consumerProps);
+
+ props.putIfAbsent(GROUP_ID_CONFIG, UUID.randomUUID().toString());
+ props.putIfAbsent(ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.putIfAbsent(AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.putIfAbsent(KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ props.putIfAbsent(VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+ final KafkaConsumer<byte[], byte[]> consumer;
+ try {
+ consumer = new KafkaConsumer<>(props);
+ } catch (final Throwable t) {
+ throw new KafkaException("Failed to create consumer", t);
}
+ return consumer;
+ }
+
+ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final
Map<String, Object> consumerProps, final String... topics) {
+ return createConsumerAndSubscribeTo(consumerProps, null, topics);
+ }
- if (timeoutMs > 0) {
- TestUtils.waitForCondition(new TopicsDeletedCondition(topics),
timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final
Map<String, Object> consumerProps, final ConsumerRebalanceListener
rebalanceListener, final String... topics) {
+ final KafkaConsumer<byte[], byte[]> consumer =
createConsumer(consumerProps);
+ if (rebalanceListener != null) {
+ consumer.subscribe(Arrays.asList(topics), rebalanceListener);
+ } else {
+ consumer.subscribe(Arrays.asList(topics));
}
+ return consumer;
+ }
+
+ private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
+
brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 *
1024 * 1024L);
+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
"0");
+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG,
"0");
+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG,
"5");
+
brokerConfig.putIfAbsent(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1");
+
brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG,
"5");
+
brokerConfig.putIfAbsent(TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG,
"1");
+
brokerConfig.putIfAbsent(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG,
true);
+ brokerConfig.putIfAbsent(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG,
true);
}
public void waitForRemainingTopics(final long timeoutMs, final String...
topics) throws InterruptedException {
TestUtils.waitForCondition(new TopicsRemainingCondition(topics),
timeoutMs, "Topics are not expected after " + timeoutMs + " milli seconds.");
}
- private final class TopicsDeletedCondition implements TestCondition {
- final Set<String> deletedTopics = new HashSet<>();
-
- private TopicsDeletedCondition(final String... topics) {
- Collections.addAll(deletedTopics, topics);
- }
-
- private TopicsDeletedCondition(final Collection<String> topics) {
- deletedTopics.addAll(topics);
+ public Set<String> getAllTopicsInCluster() {
+ try (final Admin adminClient = createAdminClient()) {
+ return adminClient.listTopics(new
ListTopicsOptions().listInternal(true)).names().get();
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
}
+ }
- @Override
- public boolean conditionMet() {
- final Set<String> allTopics = getAllTopicsInCluster();
- return !allTopics.removeAll(deletedTopics);
+ public Properties getLogConfig(final String topic) {
+ try (final Admin adminClient = createAdminClient()) {
+ final ConfigResource configResource = new
ConfigResource(ConfigResource.Type.TOPIC, topic);
+ final Config config =
adminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource).get();
+ final Properties properties = new Properties();
+ for (final ConfigEntry configEntry : config.entries()) {
+ if (configEntry.source() ==
ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG) {
+ properties.put(configEntry.name(), configEntry.value());
+ }
+ }
+ return properties;
+ } catch (final InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
}
}
@@ -353,25 +445,4 @@ public class EmbeddedKafkaCluster {
return allTopics.equals(remainingTopics);
}
}
-
- private List<KafkaServer> brokers() {
- final List<KafkaServer> servers = new ArrayList<>();
- for (final KafkaEmbedded broker : brokers) {
- servers.add(broker.kafkaServer());
- }
- return servers;
- }
-
- public Properties getLogConfig(final String topic) {
- return
brokers[0].kafkaServer().zkClient().getEntityConfigs(ConfigType.TOPIC, topic);
- }
-
- public Set<String> getAllTopicsInCluster() {
- final scala.collection.Iterator<String> topicsIterator =
brokers[0].kafkaServer().zkClient().getAllTopicsInCluster(false).iterator();
- final Set<String> topics = new HashSet<>();
- while (topicsIterator.hasNext()) {
- topics.add(topicsIterator.next());
- }
- return topics;
- }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 007da4c0789..536e43b715a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -75,6 +75,7 @@ import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@@ -104,7 +105,6 @@ import static org.apache.kafka.common.utils.Utils.sleep;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.fail;
@@ -294,7 +294,7 @@ public class IntegrationTestUtils {
final int replicationCount,
final String... topics) {
try {
- cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
+ cluster.deleteAllTopics();
for (final String topic : topics) {
cluster.createTopic(topic, partitionCount, replicationCount);
}
@@ -306,9 +306,9 @@ public class IntegrationTestUtils {
public static void quietlyCleanStateAfterTest(final EmbeddedKafkaCluster
cluster, final KafkaStreams driver) {
try {
driver.cleanUp();
- cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT);
- } catch (final RuntimeException | InterruptedException e) {
- LOG.warn("Ignoring failure to clean test state", e);
+ cluster.deleteAllTopics();
+ } catch (final RuntimeException e) {
+ LOG.warn("Ignoring failure to clean test state");
}
}
@@ -1167,6 +1167,10 @@ public class IntegrationTestUtils {
if (results.size() != expected.size()) {
throw new AssertionError(printRecords(results) + " != " +
expected);
}
+ // sort expected and results by key before comparing them
+ expected.sort(Comparator.comparing(e -> e.key().toString()));
+ results.sort(Comparator.comparing(e -> e.key().toString()));
+
final Iterator<KeyValueTimestamp<K, V>> expectedIterator =
expected.iterator();
for (final ConsumerRecord<K, V> result : results) {
final KeyValueTimestamp<K, V> expected1 = expectedIterator.next();
@@ -1178,28 +1182,6 @@ public class IntegrationTestUtils {
}
}
- public static void verifyKeyValueTimestamps(final Properties
consumerConfig,
- final String topic,
- final
Set<KeyValueTimestamp<String, Long>> expected) {
- final List<ConsumerRecord<String, Long>> results;
- try {
- results = waitUntilMinRecordsReceived(consumerConfig, topic,
expected.size());
- } catch (final Exception e) {
- throw new RuntimeException(e);
- }
-
- if (results.size() != expected.size()) {
- throw new AssertionError(printRecords(results) + " != " +
expected);
- }
-
- final Set<KeyValueTimestamp<String, Long>> actual =
- results.stream()
- .map(result -> new KeyValueTimestamp<>(result.key(),
result.value(), result.timestamp()))
- .collect(Collectors.toSet());
-
- assertThat(actual, equalTo(expected));
- }
-
private static <K, V> void compareKeyValueTimestamp(final
ConsumerRecord<K, V> record,
final K expectedKey,
final V expectedValue,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
deleted file mode 100644
index 5937d287e0c..00000000000
---
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/KafkaEmbedded.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration.utils;
-
-import kafka.cluster.EndPoint;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.TestUtils;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.config.types.Password;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.network.SocketServerConfigs;
-import org.apache.kafka.server.config.ServerConfigs;
-import org.apache.kafka.server.config.ZkConfigs;
-import org.apache.kafka.server.util.MockTime;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-
-import static
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
-import static org.apache.kafka.server.config.ServerLogConfigs.LOG_DIR_CONFIG;
-import static
org.apache.kafka.server.config.ServerLogConfigs.NUM_PARTITIONS_CONFIG;
-
-
-/**
- * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at
`127.0.0.1:9092` by
- * default.
- * <p>
- * Requires a running ZooKeeper instance to connect to.
- */
-public class KafkaEmbedded {
-
- private static final Logger log =
LoggerFactory.getLogger(KafkaEmbedded.class);
-
- private static final String DEFAULT_ZK_CONNECT = "127.0.0.1:2181";
-
- private final Properties effectiveConfig;
- private final File logDir;
- private final File tmpFolder;
- private final KafkaServer kafka;
-
- /**
- * Creates and starts an embedded Kafka broker.
- *
- * @param config Broker configuration settings. Used to modify, for
example, on which port the
- * broker should listen to. Note that you cannot change the
`log.dirs` setting
- * currently.
- */
- @SuppressWarnings({"WeakerAccess", "this-escape"})
- public KafkaEmbedded(final Properties config, final MockTime time) throws
IOException {
- tmpFolder = org.apache.kafka.test.TestUtils.tempDirectory();
- logDir =
org.apache.kafka.test.TestUtils.tempDirectory(tmpFolder.toPath(), "log");
- effectiveConfig = effectiveConfigFrom(config);
- final boolean loggingEnabled = true;
- final KafkaConfig kafkaConfig = new KafkaConfig(effectiveConfig,
loggingEnabled);
- log.debug("Starting embedded Kafka broker (with log.dirs={} and ZK
ensemble at {}) ...",
- logDir, zookeeperConnect());
- kafka = TestUtils.createServer(kafkaConfig, time);
- log.debug("Startup of embedded Kafka broker at {} completed (with ZK
ensemble at {}) ...",
- brokerList(), zookeeperConnect());
- }
-
- /**
- * Creates the configuration for starting the Kafka broker by merging
default values with
- * overwrites.
- *
- * @param initialConfig Broker configuration settings that override the
default config.
- */
- private Properties effectiveConfigFrom(final Properties initialConfig) {
- final Properties effectiveConfig = new Properties();
- effectiveConfig.put(ServerConfigs.BROKER_ID_CONFIG, 0);
- effectiveConfig.put(NUM_PARTITIONS_CONFIG, 1);
- effectiveConfig.put(AUTO_CREATE_TOPICS_ENABLE_CONFIG, true);
- effectiveConfig.put(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG, 1000000);
- effectiveConfig.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG,
true);
- effectiveConfig.put(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, 10000);
-
- effectiveConfig.putAll(initialConfig);
- effectiveConfig.setProperty(LOG_DIR_CONFIG, logDir.getAbsolutePath());
- return effectiveConfig;
- }
-
- /**
- * This broker's `metadata.broker.list` value. Example: `localhost:9092`.
- * <p>
- * You can use this to tell Kafka producers and consumers how to connect
to this instance.
- */
- @SuppressWarnings("WeakerAccess")
- public String brokerList() {
- final EndPoint endPoint = kafka.advertisedListeners().head();
- return endPoint.host() + ":" + endPoint.port();
- }
-
-
- /**
- * The ZooKeeper connection string aka `zookeeper.connect`.
- */
- @SuppressWarnings("WeakerAccess")
- public String zookeeperConnect() {
- return effectiveConfig.getProperty("zookeeper.connect",
DEFAULT_ZK_CONNECT);
- }
-
- @SuppressWarnings("WeakerAccess")
- public void stopAsync() {
- log.debug("Shutting down embedded Kafka broker at {} (with ZK ensemble
at {}) ...",
- brokerList(), zookeeperConnect());
- kafka.shutdown();
- }
-
- @SuppressWarnings("WeakerAccess")
- public void awaitStoppedAndPurge() {
- kafka.awaitShutdown();
- log.debug("Removing log dir at {} ...", logDir);
- try {
- Utils.delete(tmpFolder);
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- log.debug("Shutdown of embedded Kafka broker at {} completed (with ZK
ensemble at {}) ...",
- brokerList(), zookeeperConnect());
- }
-
- /**
- * Create a Kafka topic with 1 partition and a replication factor of 1.
- *
- * @param topic The name of the topic.
- */
- public void createTopic(final String topic) {
- createTopic(topic, 1, 1, Collections.emptyMap());
- }
-
- /**
- * Create a Kafka topic with the given parameters.
- *
- * @param topic The name of the topic.
- * @param partitions The number of partitions for this topic.
- * @param replication The replication factor for (the partitions of) this
topic.
- */
- public void createTopic(final String topic, final int partitions, final
int replication) {
- createTopic(topic, partitions, replication, Collections.emptyMap());
- }
-
- /**
- * Create a Kafka topic with the given parameters.
- *
- * @param topic The name of the topic.
- * @param partitions The number of partitions for this topic.
- * @param replication The replication factor for (partitions of) this
topic.
- * @param topicConfig Additional topic-level configuration settings.
- */
- public void createTopic(final String topic,
- final int partitions,
- final int replication,
- final Map<String, String> topicConfig) {
- log.debug("Creating topic { name: {}, partitions: {}, replication: {},
config: {} }",
- topic, partitions, replication, topicConfig);
- final NewTopic newTopic = new NewTopic(topic, partitions, (short)
replication);
- newTopic.configs(topicConfig);
-
- try (final Admin adminClient = createAdminClient()) {
-
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
- } catch (final InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
- @SuppressWarnings("WeakerAccess")
- public Admin createAdminClient() {
- final Properties adminClientConfig = new Properties();
- adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList());
- final Object listeners =
effectiveConfig.get(SocketServerConfigs.LISTENERS_CONFIG);
- if (listeners != null && listeners.toString().contains("SSL")) {
- adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
- adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
((Password)
effectiveConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
-
adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
- }
- return Admin.create(adminClientConfig);
- }
-
- @SuppressWarnings("WeakerAccess")
- public void deleteTopic(final String topic) {
- log.debug("Deleting topic { name: {} }", topic);
- try (final Admin adminClient = createAdminClient()) {
-
adminClient.deleteTopics(Collections.singletonList(topic)).all().get();
- } catch (final InterruptedException | ExecutionException e) {
- if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @SuppressWarnings("WeakerAccess")
- public KafkaServer kafkaServer() {
- return kafka;
- }
-}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
index 56218c72900..1aeaa45d92c 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/HandlingSourceTopicDeletionIntegrationTest.java
@@ -114,7 +114,7 @@ public class HandlingSourceTopicDeletionIntegrationTest {
() -> "Kafka Streams clients did not reach state RUNNING"
);
- CLUSTER.deleteTopicAndWait(INPUT_TOPIC);
+ CLUSTER.deleteTopic(INPUT_TOPIC);
TestUtils.waitForCondition(
() -> kafkaStreams1.state() == State.ERROR &&
kafkaStreams2.state() == State.ERROR,