This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 254223fdb [Imporve][Connector-V2]Parameter verification for connector
V2 kafka sink (#2866)
254223fdb is described below
commit 254223fdb92dd83d72ea6439457068ca38cb3d6d
Author: TaoZex <[email protected]>
AuthorDate: Mon Sep 26 16:48:09 2022 +0800
[Imporve][Connector-V2]Parameter verification for connector V2 kafka sink
(#2866)
* parameter verification
* update
* update
---
.../seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java | 10 ++++++++++
.../connectors/seatunnel/kafka/sink/KafkaSinkWriter.java | 3 ++-
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index 1c37dfd77..ef92d4152 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
+
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
@@ -26,6 +29,9 @@ import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
import
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
@@ -50,6 +56,10 @@ public class KafkaSink implements
SeaTunnelSink<SeaTunnelRow, KafkaSinkState, Ka
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
TOPIC, BOOTSTRAP_SERVERS);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK,
result.getMsg());
+ }
this.pluginConfig = pluginConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 3f71c3085..b577067ef 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
import org.apache.seatunnel.api.sink.SinkWriter;
@@ -136,7 +137,7 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
// todo: parse the target field from config
private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(Config
pluginConfig, SeaTunnelRowType seaTunnelRowType) {
- return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString("topics"),
seaTunnelRowType);
+ return new
DefaultSeaTunnelRowSerializer(pluginConfig.getString(TOPIC), seaTunnelRowType);
}
private KafkaSemantics getKafkaSemantics(Config pluginConfig) {