This is an automated email from the ASF dual-hosted git repository.
MartijnVisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 84d2ee7a [FLINK-39723][tests] Wait for partition assignment in KafkaTableTestBase.createTestTopic
84d2ee7a is described below
commit 84d2ee7a53c892ed21d4d6ddadcd49322571b957
Author: Aleksandr Savonin <[email protected]>
AuthorDate: Mon Jun 1 20:20:15 2026 +0200
[FLINK-39723][tests] Wait for partition assignment in KafkaTableTestBase.createTestTopic
---
.../connectors/kafka/table/KafkaTableTestBase.java | 18 +++---------------
1 file changed, 3 insertions(+), 15 deletions(-)
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index 5db67f6a..32e76f68 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.test.util.AbstractTestBase;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
-import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
@@ -122,21 +121,10 @@ abstract class KafkaTableTestBase extends AbstractTestBase {
}
public void createTestTopic(String topic, int numPartitions, int
replicationFactor) {
- Map<String, Object> properties = new HashMap<>();
+ Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
getBootstrapServers());
- try (AdminClient admin = AdminClient.create(properties)) {
- admin.createTopics(
- Collections.singletonList(
- new NewTopic(topic, numPartitions, (short)
replicationFactor)))
- .all()
- .get();
- } catch (Exception e) {
- throw new IllegalStateException(
- String.format(
- "Fail to create topic [%s partitions: %d
replication factor: %d].",
- topic, numPartitions, replicationFactor),
- e);
- }
+ KafkaUtil.createNewTopicAndWaitForPartitionAssignment(
+ topic, numPartitions, replicationFactor, properties);
}
public Map<TopicPartition, OffsetAndMetadata> getConsumerOffset(String
groupId) {