This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c964e6d8 [flink] Add kafka options to register
9c964e6d8 is described below

commit 9c964e6d8be84baeecabb8222a32e9659ca5d59b
Author: 李国君 <[email protected]>
AuthorDate: Wed Nov 1 15:05:46 2023 +0800

    [flink] Add kafka options to register
---
 .../org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
index af0740f1a..cbff022a3 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.kafka;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.log.LogStoreRegister;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
@@ -43,6 +42,8 @@ import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITION
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_REPLICATION;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static 
org.apache.paimon.flink.kafka.KafkaLogStoreFactory.toKafkaProperties;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** KafkaLogStoreRegister is used to register/unregister topics in Kafka for 
paimon table. */
 public class KafkaLogStoreRegister implements LogStoreRegister {
@@ -73,9 +74,9 @@ public class KafkaLogStoreRegister implements 
LogStoreRegister {
                                 this.identifier.getObjectName(),
                                 UUID.randomUUID().toString().replace("-", ""));
 
-        Preconditions.checkArgument(this.bootstrapServers != null);
-        Preconditions.checkArgument(this.topic != null);
-        Preconditions.checkArgument(this.identifier != null);
+        checkNotNull(context.getOptions().get(BOOTSTRAP_SERVERS));
+        checkNotNull(this.topic);
+        checkNotNull(this.identifier);
 
         // handle the type information missing when Map is converted to Options
         if (context.getOptions().get(REGISTER_TIMEOUT.key()) == null) {
@@ -94,8 +95,7 @@ public class KafkaLogStoreRegister implements 
LogStoreRegister {
 
         this.replicationFactor = 
context.getOptions().get(LOG_SYSTEM_REPLICATION);
 
-        this.properties = new Properties();
-        this.properties.put("bootstrap.servers", this.bootstrapServers);
+        this.properties = toKafkaProperties(context.getOptions());
     }
 
     @Override

Reply via email to