This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9aba3a0 [kafka] support to explicitly create the kafka topic. (#910)
3d9aba3a0 is described below

commit 3d9aba3a044acaab09e147a02a8029a4817c016b
Author: calvin <[email protected]>
AuthorDate: Fri Apr 14 16:59:46 2023 +0800

    [kafka] support to explicitly create the kafka topic. (#910)
---
 .../paimon/flink/kafka/KafkaLogSinkProvider.java   | 38 +++++++++++++++++++++-
 .../paimon/flink/kafka/KafkaLogStoreFactory.java   |  4 ++-
 .../paimon/flink/FileSystemCatalogITCase.java      |  6 ++--
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
index 09c9c823f..2d055f6a4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogSinkProvider.java
@@ -26,10 +26,15 @@ import org.apache.paimon.flink.sink.LogSinkFunction;
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.errors.TopicExistsException;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.Properties;
 
 /** A Kafka {@link LogSinkProvider}. */
@@ -49,19 +54,25 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
 
     private final LogChangelogMode changelogMode;
 
+    private final Integer numBuckets;
+
+    public static final int DEFAULT_REPLICATION_FACTOR = 2;
+
     public KafkaLogSinkProvider(
             String topic,
             Properties properties,
             @Nullable SerializationSchema<RowData> primaryKeySerializer,
             SerializationSchema<RowData> valueSerializer,
             LogConsistency consistency,
-            LogChangelogMode changelogMode) {
+            LogChangelogMode changelogMode,
+            Integer numBuckets) {
         this.topic = topic;
         this.properties = properties;
         this.primaryKeySerializer = primaryKeySerializer;
         this.valueSerializer = valueSerializer;
         this.consistency = consistency;
         this.changelogMode = changelogMode;
+        this.numBuckets = numBuckets;
     }
 
     @Override
@@ -77,6 +88,7 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
             default:
                 throw new IllegalArgumentException("Unsupported: " + 
consistency);
         }
+        createTopicIfNotExists();
         return new KafkaSinkFunction(topic, createSerializationSchema(), 
properties, semantic);
     }
 
@@ -85,4 +97,28 @@ public class KafkaLogSinkProvider implements LogSinkProvider {
         return new KafkaLogSerializationSchema(
                 topic, primaryKeySerializer, valueSerializer, changelogMode);
     }
+
+    private void createTopicIfNotExists() {
+        try (final AdminClient adminClient = AdminClient.create(properties)) {
+            if (!adminClient.listTopics().names().get().contains(topic)) {
+                int numBrokers = 
adminClient.describeCluster().nodes().get().size();
+                int replicationFactor =
+                        DEFAULT_REPLICATION_FACTOR > numBrokers
+                                ? numBrokers
+                                : DEFAULT_REPLICATION_FACTOR;
+
+                NewTopic newTopic = new NewTopic(topic, numBuckets, (short) 
replicationFactor);
+
+                
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+            }
+        } catch (Exception e) {
+            if (e.getCause() instanceof TopicExistsException) {
+                throw new TableException(
+                        String.format(
+                                "Failed to create kafka topic. " + "Reason: 
topic %s exists. ",
+                                topic));
+            }
+            throw new TableException("Error in createTopicIfNotExists", e);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
index 9bb9e0f3a..8ea9e6c47 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ISOLATION_LEVEL_CONFIG;
+import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
 import static org.apache.paimon.CoreOptions.LogConsistency;
@@ -124,7 +125,8 @@ public class KafkaLogStoreFactory implements LogStoreTableFactory {
                 primaryKeySerializer,
                 valueSerializer,
                 options.get(LOG_CONSISTENCY),
-                options.get(LOG_CHANGELOG_MODE));
+                options.get(LOG_CHANGELOG_MODE),
+                options.get(BUCKET));
     }
 
     private int[] getPrimaryKeyIndexes(ResolvedSchema schema) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
index 202faea9f..2d7add076 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FileSystemCatalogITCase.java
@@ -97,7 +97,6 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
     @Test
     public void testLogWriteRead() throws Exception {
         String topic = UUID.randomUUID().toString();
-        createTopicIfNotExists(topic, 1);
 
         try {
             tEnv.useCatalog("fs");
@@ -135,9 +134,10 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
                                     + "'kafka.topic'='%s'"
                                     + ")",
                             getBootstrapServers(), topic));
+
+            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', 
'6')").await();
             BlockingIterator<Row, Row> iterator =
                     BlockingIterator.of(tEnv.from("T").execute().collect());
-            tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', 
'6')").await();
             List<Row> result = iterator.collectAndClose(2);
             assertThat(result)
                     .containsExactlyInAnyOrder(Row.of("1", "2", "3", 4), 
Row.of("4", "5", "6", 7));
@@ -191,9 +191,9 @@ public class FileSystemCatalogITCase extends KafkaTableTestBase {
     }
 
     private void innerTestWriteRead() throws Exception {
+        tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', 
'6')").await();
         BlockingIterator<Row, Row> iterator =
                 BlockingIterator.of(tEnv.from("T").execute().collect());
-        tEnv.executeSql("INSERT INTO T VALUES ('1', '2', '3'), ('4', '5', 
'6')").await();
         List<Row> result = iterator.collectAndClose(2);
         assertThat(result).containsExactlyInAnyOrder(Row.of("1", "2", "3"), 
Row.of("4", "5", "6"));
     }

Reply via email to