This is an automated email from the ASF dual-hosted git repository.
MartijnVisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new ab6182e9 [FLINK-39822][tests] Wait for partition assignment in test createTopics calls
ab6182e9 is described below
commit ab6182e981a68ebbfc47e9ec4719f21b9a0f268b
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Tue Jun 2 18:31:51 2026 +0200
[FLINK-39822][tests] Wait for partition assignment in test createTopics calls
---
.../tests/util/kafka/KafkaContainerClient.java | 22 +++------------
.../flink/tests/util/kafka/SmokeKafkaITCase.java | 20 ++++++-------
.../sink/testutils/KafkaSinkExternalContext.java | 12 ++++----
.../enumerator/KafkaSourceEnumeratorTest.java | 14 ++-------
.../kafka/source/reader/KafkaSourceReaderTest.java | 7 +----
.../FlinkKafkaIntegrationCompatibilityTest.java | 33 +++++++---------------
.../testutils/KafkaSourceExternalContext.java | 16 +++++------
7 files changed, 37 insertions(+), 87 deletions(-)
diff --git
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
index e70e1343..087c7069 100644
---
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
+++
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaContainerClient.java
@@ -19,12 +19,11 @@
package org.apache.flink.tests.util.kafka;
import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.kafka.testutils.TestKafkaContainer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -48,9 +47,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
/** A utility class that exposes common methods over a {@link
TestKafkaContainer}. */
@@ -63,22 +60,11 @@ public class KafkaContainerClient {
}
public void createTopic(int replicationFactor, int numPartitions, String
topic) {
- Map<String, Object> properties = new HashMap<>();
+ Properties properties = new Properties();
properties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
container.getBootstrapServers());
- try (AdminClient admin = AdminClient.create(properties)) {
- admin.createTopics(
- Collections.singletonList(
- new NewTopic(topic, numPartitions, (short)
replicationFactor)))
- .all()
- .get();
- } catch (Exception e) {
- throw new IllegalStateException(
- String.format(
- "Fail to create topic [%s partitions: %d
replication factor: %d].",
- topic, numPartitions, replicationFactor),
- e);
- }
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ topic, numPartitions, replicationFactor, properties);
}
public <T> void sendMessages(String topic, Serializer<T> valueSerializer,
T... messages) {
diff --git
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
index 6aab872f..3c796bf7 100644
---
a/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
+++
b/flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java
@@ -30,8 +30,6 @@ import org.apache.flink.test.resources.ResourceTestUtils;
import org.apache.flink.test.util.JobSubmission;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -50,7 +48,6 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -90,7 +87,6 @@ class SmokeKafkaITCase {
.withTestcontainersSettings(TESTCONTAINERS_SETTINGS)
.build();
- private static AdminClient admin;
private static KafkaProducer<Void, Integer> producer;
private static Configuration getConfiguration() {
@@ -117,7 +113,6 @@ class SmokeKafkaITCase {
adminProperties.put(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
KAFKA_CONTAINER.getBootstrapServers());
- admin = AdminClient.create(adminProperties);
final Properties producerProperties = new Properties();
producerProperties.putAll(adminProperties);
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
VoidSerializer.class);
@@ -128,7 +123,6 @@ class SmokeKafkaITCase {
@AfterAll
static void teardown() {
- admin.close();
producer.close();
}
@@ -141,12 +135,14 @@ class SmokeKafkaITCase {
// create the required topics
final short replicationFactor = 1;
- admin.createTopics(
- Arrays.asList(
- new NewTopic(inputTopic, 1, replicationFactor),
- new NewTopic(outputTopic, 1,
replicationFactor)))
- .all()
- .get();
+ final Properties adminProperties = new Properties();
+ adminProperties.put(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ KAFKA_CONTAINER.getBootstrapServers());
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ inputTopic, 1, replicationFactor, adminProperties);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ outputTopic, 1, replicationFactor, adminProperties);
producer.send(new ProducerRecord<>(inputTopic, 1));
producer.send(new ProducerRecord<>(inputTopic, 2));
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
index 392e5aa5..b179a653 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java
@@ -26,6 +26,7 @@ import
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
+import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
import
org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
@@ -35,7 +36,6 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -103,12 +103,10 @@ public class KafkaSinkExternalContext implements
DataStreamSinkV2ExternalContext
topicName,
numPartitions,
replicationFactor);
- NewTopic newTopic = new NewTopic(topicName, numPartitions,
replicationFactor);
- try {
-
kafkaAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
- } catch (Exception e) {
- throw new RuntimeException(String.format("Cannot create topic
'%s'", topicName), e);
- }
+ final Properties properties = new Properties();
+ properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ topicName, numPartitions, replicationFactor, properties);
}
private void deleteTopic(String topicName) {
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
index bb706084..1a8a0427 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumeratorTest.java
@@ -34,7 +34,6 @@ import
org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import com.google.common.collect.Iterables;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
@@ -283,8 +282,7 @@ public class KafkaSourceEnumeratorTest {
context,
ENABLE_PERIODIC_PARTITION_DISCOVERY,
INCLUDE_DYNAMIC_TOPIC,
- OffsetsInitializer.latest());
- AdminClient adminClient = KafkaSourceTestEnv.getAdminClient())
{
+ OffsetsInitializer.latest())) {
startEnumeratorAndRegisterReaders(context, enumerator,
OffsetsInitializer.latest());
@@ -295,15 +293,7 @@ public class KafkaSourceEnumeratorTest {
.hasSize(2);
// create the dynamic topic.
- adminClient
- .createTopics(
- Collections.singleton(
- new NewTopic(
- DYNAMIC_TOPIC_NAME,
- NUM_PARTITIONS_DYNAMIC_TOPIC,
- (short) 1)))
- .all()
- .get();
+ KafkaSourceTestEnv.createTestTopic(DYNAMIC_TOPIC_NAME,
NUM_PARTITIONS_DYNAMIC_TOPIC, 1);
// invoke partition discovery callable again.
while (true) {
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
index 134fe70f..9b5221f6 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/KafkaSourceReaderTest.java
@@ -43,7 +43,6 @@ import
org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -92,12 +91,8 @@ public class KafkaSourceReaderTest extends
SourceReaderTestBase<KafkaPartitionSp
@BeforeAll
public static void setup() throws Throwable {
KafkaSourceTestEnv.setup();
+ KafkaSourceTestEnv.createTestTopic(TOPIC, NUM_PARTITIONS, 1);
try (AdminClient adminClient = KafkaSourceTestEnv.getAdminClient()) {
- adminClient
- .createTopics(
- Collections.singleton(new NewTopic(TOPIC,
NUM_PARTITIONS, (short) 1)))
- .all()
- .get();
// Use the admin client to trigger the creation of internal
__consumer_offsets topic.
// This makes sure that we won't see unavailable coordinator in
the tests.
waitUtil(
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
index 938fb7e7..e2f36189 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/FlinkKafkaIntegrationCompatibilityTest.java
@@ -28,8 +28,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -51,6 +49,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.UUID;
import static
org.apache.flink.connector.kafka.testutils.DockerImageVersions.APACHE_KAFKA;
@@ -67,13 +66,9 @@ import static org.assertj.core.api.Assertions.assertThat;
class FlinkKafkaIntegrationCompatibilityTest {
private TestKafkaContainer kafkaContainer;
- private AdminClient adminClient;
@AfterEach
void tearDown() {
- if (adminClient != null) {
- adminClient.close();
- }
if (kafkaContainer != null) {
kafkaContainer.stop();
}
@@ -99,16 +94,12 @@ class FlinkKafkaIntegrationCompatibilityTest {
int numRecordsPerPartition = 5;
// Create topics
- Map<String, Object> adminConfig = new HashMap<>();
- adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- adminClient = AdminClient.create(adminConfig);
- adminClient
- .createTopics(
- Arrays.asList(
- new NewTopic(topic1, numPartitions, (short) 1),
- new NewTopic(topic2, numPartitions, (short)
1)))
- .all()
- .get();
+ Properties adminConfig = new Properties();
+ adminConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ topic1, numPartitions, 1, adminConfig);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ topic2, numPartitions, 1, adminConfig);
// Produce test data to both topics
// Values in partition N should be {N, N+1, N+2, ...,
numRecordsPerPartition-1}
@@ -192,13 +183,9 @@ class FlinkKafkaIntegrationCompatibilityTest {
int numRecords = 100;
// Create topic
- Map<String, Object> adminConfig = new HashMap<>();
- adminConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
- adminClient = AdminClient.create(adminConfig);
- adminClient
- .createTopics(Collections.singleton(new NewTopic(topic, 1,
(short) 1)))
- .all()
- .get();
+ Properties adminConfig = new Properties();
+ adminConfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(topic, 1, 1,
adminConfig);
// Create Flink KafkaSink and write records
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
diff --git
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
index 658a1c90..6e9db0ce 100644
---
a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
+++
b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
@@ -32,7 +32,6 @@ import
org.apache.flink.connector.testframe.external.source.TestingSourceSetting
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
@@ -178,10 +177,9 @@ public class KafkaSourceExternalContext implements
DataStreamSourceExternalConte
private KafkaPartitionDataWriter createSinglePartitionTopic(int
topicIndex) throws Exception {
String newTopicName = topicName + "-" + topicIndex;
LOG.info("Creating topic '{}'", newTopicName);
- adminClient
- .createTopics(Collections.singletonList(new
NewTopic(newTopicName, 1, (short) 1)))
- .all()
- .get();
+ final Properties adminProperties = new Properties();
+
adminProperties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(newTopicName, 1,
1, adminProperties);
return new KafkaPartitionDataWriter(
getKafkaProducerProperties(topicIndex), new
TopicPartition(newTopicName, 0));
}
@@ -207,10 +205,10 @@ public class KafkaSourceExternalContext implements
DataStreamSourceExternalConte
new TopicPartition(topicName, numPartitions));
} else {
LOG.info("Creating topic '{}'", topicName);
- adminClient
- .createTopics(Collections.singletonList(new
NewTopic(topicName, 1, (short) 1)))
- .all()
- .get();
+ final Properties adminProperties = new Properties();
+ adminProperties.setProperty(
+ AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(topicName,
1, 1, adminProperties);
return new KafkaPartitionDataWriter(
getKafkaProducerProperties(0), new
TopicPartition(topicName, 0));
}