This is an automated email from the ASF dual-hosted git repository.
zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9c964e6d8 [flink] Add kafka options to register
9c964e6d8 is described below
commit 9c964e6d8be84baeecabb8222a32e9659ca5d59b
Author: 李国君 <[email protected]>
AuthorDate: Wed Nov 1 15:05:46 2023 +0800
[flink] Add kafka options to register
---
.../org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
index af0740f1a..cbff022a3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.kafka;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.log.LogStoreRegister;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
@@ -43,6 +42,8 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_PARTITION
import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_REPLICATION;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC;
+import static org.apache.paimon.flink.kafka.KafkaLogStoreFactory.toKafkaProperties;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** KafkaLogStoreRegister is used to register/unregister topics in Kafka for paimon table. */
public class KafkaLogStoreRegister implements LogStoreRegister {
@@ -73,9 +74,9 @@ public class KafkaLogStoreRegister implements LogStoreRegister {
this.identifier.getObjectName(),
UUID.randomUUID().toString().replace("-", ""));
- Preconditions.checkArgument(this.bootstrapServers != null);
- Preconditions.checkArgument(this.topic != null);
- Preconditions.checkArgument(this.identifier != null);
+ checkNotNull(context.getOptions().get(BOOTSTRAP_SERVERS));
+ checkNotNull(this.topic);
+ checkNotNull(this.identifier);
// handle the type information missing when Map is converted to Options
if (context.getOptions().get(REGISTER_TIMEOUT.key()) == null) {
@@ -94,8 +95,7 @@ public class KafkaLogStoreRegister implements LogStoreRegister {
this.replicationFactor =
context.getOptions().get(LOG_SYSTEM_REPLICATION);
- this.properties = new Properties();
- this.properties.put("bootstrap.servers", this.bootstrapServers);
+ this.properties = toKafkaProperties(context.getOptions());
}
@Override