This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 38f1903be [Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424)
38f1903be is described below

commit 38f1903be2baebc6a903012c3a3129168aec34c8
Author: Eric <[email protected]>
AuthorDate: Tue Mar 28 21:24:03 2023 +0800

    [Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424)
---
 .../seatunnel/api/table/factory/FactoryUtil.java   |  4 ++
 .../connectors/seatunnel/kafka/config/Config.java  |  7 +++
 .../connectors/seatunnel/kafka/sink/KafkaSink.java | 37 +++--------
 .../seatunnel/kafka/sink/KafkaSinkCommitter.java   |  7 +--
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     | 10 +--
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 73 ++++++++++------------
 .../seatunnel/cassandra/CassandraIT.java           |  3 +
 .../test/resources/extractTopic_fake_to_kafka.conf |  1 +
 .../test/resources/kafkasink_fake_to_kafka.conf    |  1 +
 .../test/resources/kafkasource_canal_to_kafka.conf |  1 +
 10 files changed, 69 insertions(+), 75 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index a330b4e37..6ac939149 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -114,6 +115,7 @@ public final class FactoryUtil {
                     ReadonlyConfig options,
                     ClassLoader classLoader) {
         TableFactoryContext context = new TableFactoryContext(acceptedTables, 
options, classLoader);
+        
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         TableSource<T, SplitT, StateT> tableSource = 
factory.createSource(context);
         validateAndApplyMetadata(acceptedTables, tableSource);
         return tableSource.createSource();
@@ -136,6 +138,7 @@ public final class FactoryUtil {
             TableFactoryContext context =
                     new TableFactoryContext(
                             Collections.singletonList(catalogTable), options, 
classLoader);
+            
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
             return factory.createSink(context).createSink();
         } catch (Throwable t) {
             throw new FactoryException(
@@ -321,6 +324,7 @@ public final class FactoryUtil {
         TableFactoryContext context =
                 new TableFactoryContext(
                         Collections.singletonList(catalogTable), options, 
classLoader);
+        
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
         return factory.createTransform(context).createTransform();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index ff051b96d..2dffda4f4 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -164,4 +164,11 @@ public class Config {
                             "The processing method of data format error. The 
default value is fail, and the optional value is (fail, skip). "
                                     + "When fail is selected, data format 
error will block and an exception will be thrown. "
                                     + "When skip is selected, data format 
error will skip this line data.");
+
+    public static final Option<KafkaSemantics> SEMANTICS =
+            Options.key("semantics")
+                    .enumType(KafkaSemantics.class)
+                    .defaultValue(KafkaSemantics.NON)
+                    .withDescription(
+                            "Semantics that can be chosen 
EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON.");
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
index e09eb08e8..cbd409f99 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSink.java
@@ -20,7 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.ConfigValidator;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -29,60 +30,40 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaAggregatedCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import com.google.auto.service.AutoService;
+import lombok.NoArgsConstructor;
 
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
-
 /**
  * Kafka Sink implementation by using SeaTunnel sink API. This class contains 
the method to create
  * {@link KafkaSinkWriter} and {@link KafkaSinkCommitter}.
  */
 @AutoService(SeaTunnelSink.class)
+@NoArgsConstructor
 public class KafkaSink
         implements SeaTunnelSink<
                 SeaTunnelRow, KafkaSinkState, KafkaCommitInfo, 
KafkaAggregatedCommitInfo> {
 
-    private Config pluginConfig;
+    private ReadonlyConfig pluginConfig;
     private SeaTunnelRowType seaTunnelRowType;
 
-    public KafkaSink() {}
-
-    public KafkaSink(Config pluginConfig, SeaTunnelRowType rowType) {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
+    public KafkaSink(ReadonlyConfig pluginConfig, SeaTunnelRowType rowType) {
         this.pluginConfig = pluginConfig;
         this.seaTunnelRowType = rowType;
     }
 
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
BOOTSTRAP_SERVERS.key());
-        if (!result.isSuccess()) {
-            throw new KafkaConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        this.pluginConfig = pluginConfig;
+        ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
+                .validate(new KafkaSinkFactory().optionRule());
+        this.pluginConfig = ReadonlyConfig.fromConfig(pluginConfig);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 4d62c00c6..ed4e28080 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 
@@ -33,11 +32,11 @@ import java.util.Properties;
 @Slf4j
 public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
 
-    private final Config pluginConfig;
+    private final ReadonlyConfig pluginConfig;
 
     private KafkaInternalProducer<?, ?> kafkaProducer;
 
-    public KafkaSinkCommitter(Config pluginConfig) {
+    public KafkaSinkCommitter(ReadonlyConfig pluginConfig) {
         this.pluginConfig = pluginConfig;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index b0cadf736..a9a0315ab 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -47,7 +45,11 @@ public class KafkaSinkFactory implements TableSinkFactory {
                         Arrays.asList(
                                 MessageFormat.JSON, MessageFormat.CANAL_JSON, 
MessageFormat.TEXT),
                         Config.TOPIC)
-                .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, 
Config.TRANSACTION_PREFIX)
+                .optional(
+                        Config.KAFKA_CONFIG,
+                        Config.ASSIGN_PARTITIONS,
+                        Config.TRANSACTION_PREFIX,
+                        Config.SEMANTICS)
                 .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
                 .build();
     }
@@ -56,7 +58,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
     public TableSink createSink(TableFactoryContext context) {
         return () ->
                 new KafkaSink(
-                        ConfigFactory.parseMap(context.getOptions().toMap()),
+                        context.getOptions(),
                         
context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 6ed287f2d..c7fdb4cee 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,13 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
@@ -33,6 +30,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.serialize.SeaTunnelRowSer
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -45,12 +43,14 @@ import java.util.Properties;
 import java.util.Random;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.ASSIGN_PARTITIONS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOTSTRAP_SERVERS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SEMANTICS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
 
@@ -71,20 +71,22 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     public KafkaSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
-            Config pluginConfig,
+            ReadonlyConfig pluginConfig,
             List<KafkaSinkState> kafkaStates) {
         this.context = context;
         this.seaTunnelRowType = seaTunnelRowType;
-        if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
-            MessageContentPartitioner.setAssignPartitions(
-                    pluginConfig.getStringList(ASSIGN_PARTITIONS.key()));
+        if (pluginConfig.get(ASSIGN_PARTITIONS) != null
+                && 
!CollectionUtils.isEmpty(pluginConfig.get(ASSIGN_PARTITIONS))) {
+            
MessageContentPartitioner.setAssignPartitions(pluginConfig.get(ASSIGN_PARTITIONS));
         }
-        if (pluginConfig.hasPath(TRANSACTION_PREFIX.key())) {
-            this.transactionPrefix = 
pluginConfig.getString(TRANSACTION_PREFIX.key());
+
+        if (pluginConfig.get(TRANSACTION_PREFIX) != null) {
+            this.transactionPrefix = pluginConfig.get(TRANSACTION_PREFIX);
         } else {
             Random random = new Random();
             this.transactionPrefix = String.format("SeaTunnel%04d", 
random.nextInt(PREFIX_RANGE));
         }
+
         restoreState(kafkaStates);
         this.seaTunnelRowSerializer = getSerializer(pluginConfig, 
seaTunnelRowType);
         if 
(KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(pluginConfig))) {
@@ -141,21 +143,20 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
     }
 
-    private Properties getKafkaProperties(Config pluginConfig) {
+    private Properties getKafkaProperties(ReadonlyConfig pluginConfig) {
         Properties kafkaProperties = new Properties();
-        if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
-            pluginConfig
-                    .getObject(KAFKA_CONFIG.key())
-                    .forEach((key, value) -> kafkaProperties.put(key, 
value.unwrapped()));
+        if (pluginConfig.get(KAFKA_CONFIG) != null) {
+            pluginConfig.get(KAFKA_CONFIG).forEach((key, value) -> 
kafkaProperties.put(key, value));
         }
-        if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
+
+        if (pluginConfig.get(ASSIGN_PARTITIONS) != null) {
             kafkaProperties.put(
                     ProducerConfig.PARTITIONER_CLASS_CONFIG,
                     
"org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
         }
+
         kafkaProperties.put(
-                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-                
pluginConfig.getString(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
pluginConfig.get(BOOTSTRAP_SERVERS));
         kafkaProperties.put(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         kafkaProperties.put(
@@ -164,24 +165,18 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
-            Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
-        MessageFormat messageFormat = readonlyConfig.get(FORMAT);
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+        MessageFormat messageFormat = pluginConfig.get(FORMAT);
         String delimiter = DEFAULT_FIELD_DELIMITER;
-        if (pluginConfig.hasPath(FIELD_DELIMITER.key())) {
-            delimiter = pluginConfig.getString(FIELD_DELIMITER.key());
-        }
-        String topic = null;
-        if (pluginConfig.hasPath(TOPIC.key())) {
-            topic = pluginConfig.getString(TOPIC.key());
+
+        if (pluginConfig.get(FIELD_DELIMITER) != null) {
+            delimiter = pluginConfig.get(FIELD_DELIMITER);
         }
-        if (pluginConfig.hasPath(PARTITION.key())) {
+
+        String topic = pluginConfig.get(TOPIC);
+        if (pluginConfig.get(PARTITION) != null) {
             return DefaultSeaTunnelRowSerializer.create(
-                    topic,
-                    pluginConfig.getInt(PARTITION.key()),
-                    seaTunnelRowType,
-                    messageFormat,
-                    delimiter);
+                    topic, pluginConfig.get(PARTITION), seaTunnelRowType, 
messageFormat, delimiter);
         } else {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
@@ -192,9 +187,9 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
     }
 
-    private KafkaSemantics getKafkaSemantics(Config pluginConfig) {
-        if (pluginConfig.hasPath("semantics")) {
-            return pluginConfig.getEnum(KafkaSemantics.class, "semantics");
+    private KafkaSemantics getKafkaSemantics(ReadonlyConfig pluginConfig) {
+        if (pluginConfig.get(SEMANTICS) != null) {
+            return pluginConfig.get(SEMANTICS);
         }
         return KafkaSemantics.NON;
     }
@@ -211,10 +206,10 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     private List<String> getPartitionKeyFields(
-            Config pluginConfig, SeaTunnelRowType seaTunnelRowType) {
-        if (pluginConfig.hasPath(PARTITION_KEY_FIELDS.key())) {
-            List<String> partitionKeyFields =
-                    pluginConfig.getStringList(PARTITION_KEY_FIELDS.key());
+            ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
+
+        if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
+            List<String> partitionKeyFields = 
pluginConfig.get(PARTITION_KEY_FIELDS);
             List<String> rowTypeFieldNames = 
Arrays.asList(seaTunnelRowType.getFieldNames());
             for (String partitionKeyField : partitionKeyFields) {
                 if (!rowTypeFieldNames.contains(partitionKeyField)) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
index d991cea1b..ad548295b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cassandra-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cassandra/CassandraIT.java
@@ -68,6 +68,7 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -141,10 +142,12 @@ public class CassandraIT extends TestSuiteBase implements 
TestResource {
             session.execute(
                     SimpleStatement.builder(config.getString(SOURCE_TABLE))
                             .setKeyspace(KEYSPACE)
+                            .setTimeout(Duration.ofSeconds(10))
                             .build());
             session.execute(
                     SimpleStatement.builder(config.getString(SINK_TABLE))
                             .setKeyspace(KEYSPACE)
+                            .setTimeout(Duration.ofSeconds(10))
                             .build());
         } catch (Exception e) {
             throw new RuntimeException("Initializing Cassandra table failed!", 
e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
index 25f0bd75e..59bd7e9e8 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/extractTopic_fake_to_kafka.conf
@@ -73,6 +73,7 @@ sink {
     source_table_name = "fake1"
     bootstrap.servers = "kafkaCluster:9092"
     topic = "${c_string}"
+    format = json
     partition_key_fields = ["c_map","c_string"]
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
index 086136bf5..a4ad964d3 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasink_fake_to_kafka.conf
@@ -63,6 +63,7 @@ sink {
   Kafka {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test_topic"
+    format = json
     partition_key_fields = ["c_map","c_string"]
   }
 }
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
index 2b8e219aa..367b13c0f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafkasource_canal_to_kafka.conf
@@ -53,5 +53,6 @@ sink {
     bootstrap.servers = "kafkaCluster:9092"
     topic = "test-canal-sink"
     format = canal_json
+    partition = 0
   }
 }
\ No newline at end of file

Reply via email to