This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d9201108cf [improve] cassandra connector options (#8608)
d9201108cf is described below

commit d9201108cf4614497820cd56f625ce195e0ad0c5
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 13:57:20 2025 +0800

    [improve] cassandra connector options (#8608)
---
 docs/en/connector-v2/sink/Cassandra.md             |  4 +-
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 ...sandraConfig.java => CassandraBaseOptions.java} | 23 +-----
 .../cassandra/config/CassandraParameters.java      | 65 ++++------------
 ...sandraConfig.java => CassandraSinkOptions.java} | 33 ++------
 .../cassandra/config/CassandraSourceOptions.java   | 27 +++++++
 .../seatunnel/cassandra/sink/CassandraSink.java    | 59 +++++---------
 .../cassandra/sink/CassandraSinkFactory.java       | 36 ++++++---
 .../cassandra/source/CassandraSource.java          | 90 +++++++++++-----------
 .../cassandra/source/CassandraSourceFactory.java   | 31 +++++++-
 10 files changed, 169 insertions(+), 201 deletions(-)

diff --git a/docs/en/connector-v2/sink/Cassandra.md 
b/docs/en/connector-v2/sink/Cassandra.md
index 73c6d3aba5..71bcb1e6e7 100644
--- a/docs/en/connector-v2/sink/Cassandra.md
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -12,7 +12,7 @@ Write data to Apache Cassandra.
 
 ## Options
 
-|       name        |  type   | required | default value |
+|       name        | type    | required | default value |
 |-------------------|---------|----------|---------------|
 | host              | String  | Yes      | -             |
 | keyspace          | String  | Yes      | -             |
@@ -21,7 +21,7 @@ Write data to Apache Cassandra.
 | password          | String  | No       | -             |
 | datacenter        | String  | No       | datacenter1   |
 | consistency_level | String  | No       | LOCAL_ONE     |
-| fields            | String  | No       | LOCAL_ONE     |
+| fields            | Array   | No       | -             |
 | batch_size        | int     | No       | 5000          |
 | batch_type        | String  | No       | UNLOGGED      |
 | async_write       | boolean | No       | true          |
diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 1e0464952b..513606c183 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -213,7 +213,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("MaxcomputeSourceOptions");
         whiteList.add("InfluxDBSourceOptions");
         whiteList.add("InfluxDBSinkOptions");
-        whiteList.add("CassandraSinkOptions");
         whiteList.add("KuduSourceOptions");
         whiteList.add("SocketSinkOptions");
         whiteList.add("DataHubSinkOptions");
@@ -237,7 +236,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("SocketSourceOptions");
         whiteList.add("OpenMldbSourceOptions");
         whiteList.add("Web3jSourceOptions");
-        whiteList.add("CassandraSourceOptions");
         whiteList.add("RedisSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
similarity index 67%
copy from 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
copy to 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
index 7f38dc27d3..7addd8e876 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
@@ -20,7 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cassandra.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class CassandraConfig {
+public class CassandraBaseOptions {
 
     public static final Integer DEFAULT_BATCH_SIZE = 5000;
 
@@ -42,25 +42,4 @@ public class CassandraConfig {
                     .stringType()
                     .defaultValue("LOCAL_ONE")
                     .withDescription("");
-
-    public static final Option<String> TABLE =
-            
Options.key("table").stringType().noDefaultValue().withDescription("");
-
-    public static final Option<String> FIELDS =
-            
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
-
-    public static final Option<Integer> BATCH_SIZE =
-            Options.key("batch_size")
-                    .intType()
-                    .defaultValue(DEFAULT_BATCH_SIZE)
-                    .withDescription("");
-
-    public static final Option<String> BATCH_TYPE =
-            
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");
-
-    public static final Option<Boolean> ASYNC_WRITE =
-            
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
-
-    public static final Option<String> CQL =
-            
Options.key("cql").stringType().noDefaultValue().withDescription("");
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
index 14a66c6eed..97559df9f6 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
 import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
@@ -44,54 +44,19 @@ public class CassandraParameters implements Serializable {
     private DefaultBatchType batchType;
     private Boolean asyncWrite;
 
-    public void buildWithConfig(Config config) {
-        this.host = config.getString(CassandraConfig.HOST.key());
-        this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());
-
-        if (config.hasPath(CassandraConfig.USERNAME.key())) {
-            this.username = config.getString(CassandraConfig.USERNAME.key());
-        }
-        if (config.hasPath(CassandraConfig.PASSWORD.key())) {
-            this.password = config.getString(CassandraConfig.PASSWORD.key());
-        }
-        if (config.hasPath(CassandraConfig.DATACENTER.key())) {
-            this.datacenter = 
config.getString(CassandraConfig.DATACENTER.key());
-        } else {
-            this.datacenter = CassandraConfig.DATACENTER.defaultValue();
-        }
-        if (config.hasPath(CassandraConfig.TABLE.key())) {
-            this.table = config.getString(CassandraConfig.TABLE.key());
-        }
-        if (config.hasPath(CassandraConfig.CQL.key())) {
-            this.cql = config.getString(CassandraConfig.CQL.key());
-        }
-        if (config.hasPath(CassandraConfig.FIELDS.key())) {
-            this.fields = config.getStringList(CassandraConfig.FIELDS.key());
-        }
-        if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
-            this.consistencyLevel =
-                    DefaultConsistencyLevel.valueOf(
-                            
config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
-        } else {
-            this.consistencyLevel =
-                    DefaultConsistencyLevel.valueOf(
-                            CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
-        }
-        if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
-            this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
-        } else {
-            this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
-        }
-        if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
-            this.batchType =
-                    
DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
-        } else {
-            this.batchType = 
DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
-        }
-        if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
-            this.asyncWrite = 
config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
-        } else {
-            this.asyncWrite = true;
-        }
+    public void buildWithConfig(ReadonlyConfig config) {
+        this.host = config.get(CassandraBaseOptions.HOST);
+        this.keyspace = config.get(CassandraBaseOptions.KEYSPACE);
+        this.username = config.get(CassandraBaseOptions.USERNAME);
+        this.password = config.get(CassandraBaseOptions.PASSWORD);
+        this.datacenter = config.get(CassandraBaseOptions.DATACENTER);
+        this.table = config.get(CassandraSinkOptions.TABLE);
+        this.cql = config.get(CassandraSourceOptions.CQL);
+        this.fields = config.get(CassandraSinkOptions.FIELDS);
+        this.consistencyLevel =
+                
DefaultConsistencyLevel.valueOf(config.get(CassandraBaseOptions.CONSISTENCY_LEVEL));
+        this.batchSize = config.get(CassandraSinkOptions.BATCH_SIZE);
+        this.batchType = 
DefaultBatchType.valueOf(config.get(CassandraSinkOptions.BATCH_TYPE));
+        this.asyncWrite = config.get(CassandraSinkOptions.ASYNC_WRITE);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
similarity index 56%
rename from 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
rename to 
seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
index 7f38dc27d3..9197de9ef5 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
@@ -20,34 +20,18 @@ package 
org.apache.seatunnel.connectors.seatunnel.cassandra.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-public class CassandraConfig {
+import java.util.List;
 
-    public static final Integer DEFAULT_BATCH_SIZE = 5000;
-
-    public static final Option<String> HOST =
-            
Options.key("host").stringType().noDefaultValue().withDescription("");
-
-    public static final Option<String> KEYSPACE =
-            
Options.key("keyspace").stringType().noDefaultValue().withDescription("");
-
-    public static final Option<String> USERNAME =
-            
Options.key("username").stringType().noDefaultValue().withDescription("");
-    public static final Option<String> PASSWORD =
-            
Options.key("password").stringType().noDefaultValue().withDescription("");
-    public static final Option<String> DATACENTER =
-            
Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");
-
-    public static final Option<String> CONSISTENCY_LEVEL =
-            Options.key("consistency_level")
-                    .stringType()
-                    .defaultValue("LOCAL_ONE")
-                    .withDescription("");
+public class CassandraSinkOptions extends CassandraBaseOptions {
 
     public static final Option<String> TABLE =
             
Options.key("table").stringType().noDefaultValue().withDescription("");
 
-    public static final Option<String> FIELDS =
-            
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
+    public static final Option<List<String>> FIELDS =
+            Options.key("fields")
+                    .listType()
+                    .noDefaultValue()
+                    .withDescription("The fields need write to cassandra");
 
     public static final Option<Integer> BATCH_SIZE =
             Options.key("batch_size")
@@ -60,7 +44,4 @@ public class CassandraConfig {
 
     public static final Option<Boolean> ASYNC_WRITE =
             
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
-
-    public static final Option<String> CQL =
-            
Options.key("cql").stringType().noDefaultValue().withDescription("");
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java
new file mode 100644
index 0000000000..0ccc8ba471
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class CassandraSourceOptions extends CassandraBaseOptions {
+
+    public static final Option<String> CQL =
+            
Options.key("cql").stringType().noDefaultValue().withDescription("");
+}
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 9b37d94266..ed92ff6067 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -17,17 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
@@ -39,43 +33,26 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
-import com.google.auto.service.AutoService;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;
 
-@AutoService(SeaTunnelSink.class)
 public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private final CassandraParameters cassandraParameters = new 
CassandraParameters();
-    private SeaTunnelRowType seaTunnelRowType;
-
-    private ColumnDefinitions tableSchema;
-
-    @Override
-    public String getPluginName() {
-        return "Cassandra";
-    }
+    private final CassandraParameters cassandraParameters;
+    private final CatalogTable catalogTable;
+    private final ColumnDefinitions tableSchema;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
-        if (!checkResult.isSuccess()) {
-            throw new CassandraConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
-        }
-        this.cassandraParameters.buildWithConfig(pluginConfig);
+    public CassandraSink(
+            CassandraParameters cassandraParameters,
+            CatalogTable catalogTable,
+            ReadonlyConfig pluginConfig) {
+        this.cassandraParameters = cassandraParameters;
+        this.catalogTable = catalogTable;
         try (CqlSession session =
                 CassandraClient.getCqlSessionBuilder(
                                 cassandraParameters.getHost(),
@@ -85,8 +62,7 @@ public class CassandraSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                                 cassandraParameters.getDatacenter())
                         .build()) {
             List<String> fields = cassandraParameters.getFields();
-            this.tableSchema =
-                    CassandraClient.getTableSchema(session, 
pluginConfig.getString(TABLE.key()));
+            this.tableSchema = CassandraClient.getTableSchema(session, 
pluginConfig.get(TABLE));
             if (fields == null || fields.isEmpty()) {
                 List<String> newFields = new ArrayList<>();
                 for (int i = 0; i < tableSchema.size(); i++) {
@@ -101,7 +77,7 @@ public class CassandraSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
                                 "Field "
                                         + field
                                         + " does not exist in table "
-                                        + pluginConfig.getString(TABLE.key()));
+                                        + pluginConfig.get(TABLE));
                     }
                 }
             }
@@ -115,18 +91,19 @@ public class CassandraSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "Cassandra";
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType, 
tableSchema);
+        return new CassandraSinkWriter(
+                cassandraParameters, catalogTable.getSeaTunnelRowType(), 
tableSchema);
     }
 
     @Override
     public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
index 471847bbc6..a9e9d9e4c3 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
@@ -18,12 +18,26 @@
 package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 
 import com.google.auto.service.AutoService;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.ASYNC_WRITE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.CONSISTENCY_LEVEL;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.DATACENTER;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.HOST;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.KEYSPACE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.PASSWORD;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.USERNAME;
+
 @AutoService(Factory.class)
 public class CassandraSinkFactory implements TableSinkFactory {
     @Override
@@ -34,15 +48,19 @@ public class CassandraSinkFactory implements 
TableSinkFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, 
CassandraConfig.TABLE)
-                .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+                .required(HOST, KEYSPACE, TABLE)
+                .bundled(USERNAME, PASSWORD)
                 .optional(
-                        CassandraConfig.DATACENTER,
-                        CassandraConfig.CONSISTENCY_LEVEL,
-                        CassandraConfig.FIELDS,
-                        CassandraConfig.BATCH_SIZE,
-                        CassandraConfig.BATCH_TYPE,
-                        CassandraConfig.ASYNC_WRITE)
+                        DATACENTER, CONSISTENCY_LEVEL, FIELDS, BATCH_SIZE, 
BATCH_TYPE, ASYNC_WRITE)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CassandraParameters cassandraParameters = new CassandraParameters();
+        cassandraParameters.buildWithConfig(context.getOptions());
+        return () ->
+                new CassandraSink(
+                        cassandraParameters, context.getCatalogTable(), 
context.getOptions());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
index 661c3e37ce..46e19b5a37 100644
--- 
a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
+++ 
b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -17,19 +17,14 @@
 
 package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
 import 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
@@ -42,40 +37,25 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
 
 import com.datastax.oss.driver.api.core.CqlSession;
 import com.datastax.oss.driver.api.core.cql.Row;
-import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.CQL;
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
-import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CQL;
 
-@AutoService(SeaTunnelSource.class)
 public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
         implements SupportColumnProjection {
 
-    private SeaTunnelRowType rowTypeInfo;
-    private final CassandraParameters cassandraParameters = new 
CassandraParameters();
+    private final CassandraParameters cassandraParameters;
+    private final CatalogTable catalogTable;
 
-    @Override
-    public String getPluginName() {
-        return "Cassandra";
-    }
+    public CassandraSource(CassandraParameters cassandraParameters, 
ReadonlyConfig pluginConfig) {
+        this.cassandraParameters = cassandraParameters;
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(pluginConfig, HOST.key(), 
KEYSPACE.key(), CQL.key());
-        if (!checkResult.isSuccess()) {
-            throw new CassandraConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
checkResult.getMsg()));
-        }
-        this.cassandraParameters.buildWithConfig(pluginConfig);
         try (CqlSession currentSession =
                 CassandraClient.getCqlSessionBuilder(
-                                pluginConfig.getString(HOST.key()),
-                                pluginConfig.getString(KEYSPACE.key()),
+                                cassandraParameters.getHost(),
+                                cassandraParameters.getKeyspace(),
                                 cassandraParameters.getUsername(),
                                 cassandraParameters.getPassword(),
                                 cassandraParameters.getDatacenter())
@@ -84,23 +64,38 @@ public class CassandraSource extends 
AbstractSingleSplitSource<SeaTunnelRow>
                     currentSession
                             .execute(
                                     CassandraClient.createSimpleStatement(
-                                            pluginConfig.getString(CQL.key()),
+                                            pluginConfig.get(CQL),
                                             
cassandraParameters.getConsistencyLevel()))
                             .one();
             if (rs == null) {
                 throw new CassandraConnectorException(
                         CassandraConnectorErrorCode.NO_DATA_IN_SOURCE_TABLE,
-                        "No data select from this cql: " + 
pluginConfig.getConfig(CQL.key()));
+                        "No data select from this cql: " + 
pluginConfig.get(CQL));
             }
             int columnSize = rs.getColumnDefinitions().size();
-            String[] fieldNames = new String[columnSize];
-            SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType[columnSize];
+            TableSchema.Builder schemaBuilder = TableSchema.builder();
+            String tableName = "default";
             for (int i = 0; i < columnSize; i++) {
-                fieldNames[i] = 
rs.getColumnDefinitions().get(i).getName().asInternal();
-                seaTunnelDataTypes[i] =
-                        
TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType());
+                PhysicalColumn physicalColumn =
+                        PhysicalColumn.of(
+                                
rs.getColumnDefinitions().get(i).getName().asInternal(),
+                                
TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType()),
+                                null,
+                                null,
+                                true,
+                                null,
+                                null);
+                schemaBuilder.column(physicalColumn);
+                tableName = 
rs.getColumnDefinitions().get(i).getTable().asInternal();
             }
-            this.rowTypeInfo = new SeaTunnelRowType(fieldNames, 
seaTunnelDataTypes);
+            catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of(
+                                    getPluginName(), 
cassandraParameters.getKeyspace(), tableName),
+                            schemaBuilder.build(),
+                            Collections.emptyMap(),
+                            Collections.emptyList(),
+                            "");
         } catch (Exception e) {
             throw new CassandraConnectorException(
                     CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
@@ -109,14 +104,19 @@ public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
         }
     }
 
+    @Override
+    public String getPluginName() {
+        return "Cassandra";
+    }
+
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.rowTypeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
index 9c15411151..7160d19e1e 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
@@ -19,12 +19,25 @@ package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CONSISTENCY_LEVEL;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CQL;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.DATACENTER;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.HOST;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.KEYSPACE;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.PASSWORD;
+import static org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.USERNAME;
+
 @AutoService(Factory.class)
 public class CassandraSourceFactory implements TableSourceFactory {
     @Override
@@ -35,12 +48,22 @@ public class CassandraSourceFactory implements TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE, CassandraConfig.CQL)
-                .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
-                .optional(CassandraConfig.DATACENTER, CassandraConfig.CONSISTENCY_LEVEL)
+                .required(HOST, KEYSPACE, CQL)
+                .bundled(USERNAME, PASSWORD)
+                .optional(DATACENTER, CONSISTENCY_LEVEL)
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
+        CassandraParameters cassandraParameters = new CassandraParameters();
+        cassandraParameters.buildWithConfig(context.getOptions());
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new CassandraSource(cassandraParameters, context.getOptions());
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return CassandraSource.class;


Reply via email to