This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d9201108cf [improve] cassandra connector options (#8608)
d9201108cf is described below
commit d9201108cf4614497820cd56f625ce195e0ad0c5
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 13:57:20 2025 +0800
[improve] cassandra connector options (#8608)
---
docs/en/connector-v2/sink/Cassandra.md | 4 +-
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
...sandraConfig.java => CassandraBaseOptions.java} | 23 +-----
.../cassandra/config/CassandraParameters.java | 65 ++++------------
...sandraConfig.java => CassandraSinkOptions.java} | 33 ++------
.../cassandra/config/CassandraSourceOptions.java | 27 +++++++
.../seatunnel/cassandra/sink/CassandraSink.java | 59 +++++---------
.../cassandra/sink/CassandraSinkFactory.java | 36 ++++++---
.../cassandra/source/CassandraSource.java | 90 +++++++++++-----------
.../cassandra/source/CassandraSourceFactory.java | 31 +++++++-
10 files changed, 169 insertions(+), 201 deletions(-)
diff --git a/docs/en/connector-v2/sink/Cassandra.md b/docs/en/connector-v2/sink/Cassandra.md
index 73c6d3aba5..71bcb1e6e7 100644
--- a/docs/en/connector-v2/sink/Cassandra.md
+++ b/docs/en/connector-v2/sink/Cassandra.md
@@ -12,7 +12,7 @@ Write data to Apache Cassandra.
## Options
-| name | type | required | default value |
+| name | type | required | default value |
|-------------------|---------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
@@ -21,7 +21,7 @@ Write data to Apache Cassandra.
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
-| fields | String | No | LOCAL_ONE |
+| fields | Array | No | - |
| batch_size | int | No | 5000 |
| batch_type | String | No | UNLOGGED |
| async_write | boolean | No | true |
diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 1e0464952b..513606c183 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -213,7 +213,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("MaxcomputeSourceOptions");
whiteList.add("InfluxDBSourceOptions");
whiteList.add("InfluxDBSinkOptions");
- whiteList.add("CassandraSinkOptions");
whiteList.add("KuduSourceOptions");
whiteList.add("SocketSinkOptions");
whiteList.add("DataHubSinkOptions");
@@ -237,7 +236,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("SocketSourceOptions");
whiteList.add("OpenMldbSourceOptions");
whiteList.add("Web3jSourceOptions");
- whiteList.add("CassandraSourceOptions");
whiteList.add("RedisSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
similarity index 67%
copy from seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
copy to seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
index 7f38dc27d3..7addd8e876 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraBaseOptions.java
@@ -20,7 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.cassandra.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class CassandraConfig {
+public class CassandraBaseOptions {
public static final Integer DEFAULT_BATCH_SIZE = 5000;
@@ -42,25 +42,4 @@ public class CassandraConfig {
.stringType()
.defaultValue("LOCAL_ONE")
.withDescription("");
-
- public static final Option<String> TABLE =
-
Options.key("table").stringType().noDefaultValue().withDescription("");
-
- public static final Option<String> FIELDS =
-
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
-
- public static final Option<Integer> BATCH_SIZE =
- Options.key("batch_size")
- .intType()
- .defaultValue(DEFAULT_BATCH_SIZE)
- .withDescription("");
-
- public static final Option<String> BATCH_TYPE =
-
Options.key("batch_type").stringType().defaultValue("UNLOGGED").withDescription("");
-
- public static final Option<Boolean> ASYNC_WRITE =
-
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
-
- public static final Option<String> CQL =
-
Options.key("cql").stringType().noDefaultValue().withDescription("");
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
index 14a66c6eed..97559df9f6 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraParameters.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
@@ -44,54 +44,19 @@ public class CassandraParameters implements Serializable {
private DefaultBatchType batchType;
private Boolean asyncWrite;
- public void buildWithConfig(Config config) {
- this.host = config.getString(CassandraConfig.HOST.key());
- this.keyspace = config.getString(CassandraConfig.KEYSPACE.key());
-
- if (config.hasPath(CassandraConfig.USERNAME.key())) {
- this.username = config.getString(CassandraConfig.USERNAME.key());
- }
- if (config.hasPath(CassandraConfig.PASSWORD.key())) {
- this.password = config.getString(CassandraConfig.PASSWORD.key());
- }
- if (config.hasPath(CassandraConfig.DATACENTER.key())) {
- this.datacenter =
config.getString(CassandraConfig.DATACENTER.key());
- } else {
- this.datacenter = CassandraConfig.DATACENTER.defaultValue();
- }
- if (config.hasPath(CassandraConfig.TABLE.key())) {
- this.table = config.getString(CassandraConfig.TABLE.key());
- }
- if (config.hasPath(CassandraConfig.CQL.key())) {
- this.cql = config.getString(CassandraConfig.CQL.key());
- }
- if (config.hasPath(CassandraConfig.FIELDS.key())) {
- this.fields = config.getStringList(CassandraConfig.FIELDS.key());
- }
- if (config.hasPath(CassandraConfig.CONSISTENCY_LEVEL.key())) {
- this.consistencyLevel =
- DefaultConsistencyLevel.valueOf(
-
config.getString(CassandraConfig.CONSISTENCY_LEVEL.key()));
- } else {
- this.consistencyLevel =
- DefaultConsistencyLevel.valueOf(
- CassandraConfig.CONSISTENCY_LEVEL.defaultValue());
- }
- if (config.hasPath(CassandraConfig.BATCH_SIZE.key())) {
- this.batchSize = config.getInt(CassandraConfig.BATCH_SIZE.key());
- } else {
- this.batchSize = CassandraConfig.BATCH_SIZE.defaultValue();
- }
- if (config.hasPath(CassandraConfig.BATCH_TYPE.key())) {
- this.batchType =
-
DefaultBatchType.valueOf(config.getString(CassandraConfig.BATCH_TYPE.key()));
- } else {
- this.batchType =
DefaultBatchType.valueOf(CassandraConfig.BATCH_TYPE.defaultValue());
- }
- if (config.hasPath(CassandraConfig.ASYNC_WRITE.key())) {
- this.asyncWrite =
config.getBoolean(CassandraConfig.ASYNC_WRITE.key());
- } else {
- this.asyncWrite = true;
- }
+ public void buildWithConfig(ReadonlyConfig config) {
+ this.host = config.get(CassandraBaseOptions.HOST);
+ this.keyspace = config.get(CassandraBaseOptions.KEYSPACE);
+ this.username = config.get(CassandraBaseOptions.USERNAME);
+ this.password = config.get(CassandraBaseOptions.PASSWORD);
+ this.datacenter = config.get(CassandraBaseOptions.DATACENTER);
+ this.table = config.get(CassandraSinkOptions.TABLE);
+ this.cql = config.get(CassandraSourceOptions.CQL);
+ this.fields = config.get(CassandraSinkOptions.FIELDS);
+ this.consistencyLevel =
+
DefaultConsistencyLevel.valueOf(config.get(CassandraBaseOptions.CONSISTENCY_LEVEL));
+ this.batchSize = config.get(CassandraSinkOptions.BATCH_SIZE);
+ this.batchType =
DefaultBatchType.valueOf(config.get(CassandraSinkOptions.BATCH_TYPE));
+ this.asyncWrite = config.get(CassandraSinkOptions.ASYNC_WRITE);
}
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
similarity index 56%
rename from seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
rename to seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
index 7f38dc27d3..9197de9ef5 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraConfig.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSinkOptions.java
@@ -20,34 +20,18 @@ package
org.apache.seatunnel.connectors.seatunnel.cassandra.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-public class CassandraConfig {
+import java.util.List;
- public static final Integer DEFAULT_BATCH_SIZE = 5000;
-
- public static final Option<String> HOST =
-
Options.key("host").stringType().noDefaultValue().withDescription("");
-
- public static final Option<String> KEYSPACE =
-
Options.key("keyspace").stringType().noDefaultValue().withDescription("");
-
- public static final Option<String> USERNAME =
-
Options.key("username").stringType().noDefaultValue().withDescription("");
- public static final Option<String> PASSWORD =
-
Options.key("password").stringType().noDefaultValue().withDescription("");
- public static final Option<String> DATACENTER =
-
Options.key("datacenter").stringType().defaultValue("datacenter1").withDescription("");
-
- public static final Option<String> CONSISTENCY_LEVEL =
- Options.key("consistency_level")
- .stringType()
- .defaultValue("LOCAL_ONE")
- .withDescription("");
+public class CassandraSinkOptions extends CassandraBaseOptions {
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("");
- public static final Option<String> FIELDS =
-
Options.key("fields").stringType().defaultValue("LOCAL_ONE").withDescription("");
+ public static final Option<List<String>> FIELDS =
+ Options.key("fields")
+ .listType()
+ .noDefaultValue()
+ .withDescription("The fields need write to cassandra");
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
@@ -60,7 +44,4 @@ public class CassandraConfig {
public static final Option<Boolean> ASYNC_WRITE =
Options.key("async_write").booleanType().defaultValue(true).withDescription("");
-
- public static final Option<String> CQL =
-
Options.key("cql").stringType().noDefaultValue().withDescription("");
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java
new file mode 100644
index 0000000000..0ccc8ba471
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/config/CassandraSourceOptions.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cassandra.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class CassandraSourceOptions extends CassandraBaseOptions {
+
+ public static final Option<String> CQL =
+
Options.key("cql").stringType().noDefaultValue().withDescription("");
+}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
index 9b37d94266..ed92ff6067 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSink.java
@@ -17,17 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
@@ -39,43 +33,26 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
-import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;
-@AutoService(SeaTunnelSink.class)
public class CassandraSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private final CassandraParameters cassandraParameters = new
CassandraParameters();
- private SeaTunnelRowType seaTunnelRowType;
-
- private ColumnDefinitions tableSchema;
-
- @Override
- public String getPluginName() {
- return "Cassandra";
- }
+ private final CassandraParameters cassandraParameters;
+ private final CatalogTable catalogTable;
+ private final ColumnDefinitions tableSchema;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(
- pluginConfig, HOST.key(), KEYSPACE.key(), TABLE.key());
- if (!checkResult.isSuccess()) {
- throw new CassandraConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
checkResult.getMsg()));
- }
- this.cassandraParameters.buildWithConfig(pluginConfig);
+ public CassandraSink(
+ CassandraParameters cassandraParameters,
+ CatalogTable catalogTable,
+ ReadonlyConfig pluginConfig) {
+ this.cassandraParameters = cassandraParameters;
+ this.catalogTable = catalogTable;
try (CqlSession session =
CassandraClient.getCqlSessionBuilder(
cassandraParameters.getHost(),
@@ -85,8 +62,7 @@ public class CassandraSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
cassandraParameters.getDatacenter())
.build()) {
List<String> fields = cassandraParameters.getFields();
- this.tableSchema =
- CassandraClient.getTableSchema(session,
pluginConfig.getString(TABLE.key()));
+ this.tableSchema = CassandraClient.getTableSchema(session,
pluginConfig.get(TABLE));
if (fields == null || fields.isEmpty()) {
List<String> newFields = new ArrayList<>();
for (int i = 0; i < tableSchema.size(); i++) {
@@ -101,7 +77,7 @@ public class CassandraSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
"Field "
+ field
+ " does not exist in table "
- + pluginConfig.getString(TABLE.key()));
+ + pluginConfig.get(TABLE));
}
}
}
@@ -115,18 +91,19 @@ public class CassandraSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public String getPluginName() {
+ return "Cassandra";
}
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
throws IOException {
- return new CassandraSinkWriter(cassandraParameters, seaTunnelRowType,
tableSchema);
+ return new CassandraSinkWriter(
+ cassandraParameters, catalogTable.getSeaTunnelRowType(),
tableSchema);
}
@Override
public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
+ return Optional.of(catalogTable);
}
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
index 471847bbc6..a9e9d9e4c3 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkFactory.java
@@ -18,12 +18,26 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
-import
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.ASYNC_WRITE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.BATCH_TYPE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.CONSISTENCY_LEVEL;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.DATACENTER;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.FIELDS;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.KEYSPACE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.TABLE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSinkOptions.USERNAME;
+
@AutoService(Factory.class)
public class CassandraSinkFactory implements TableSinkFactory {
@Override
@@ -34,15 +48,19 @@ public class CassandraSinkFactory implements
TableSinkFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE,
CassandraConfig.TABLE)
- .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
+ .required(HOST, KEYSPACE, TABLE)
+ .bundled(USERNAME, PASSWORD)
.optional(
- CassandraConfig.DATACENTER,
- CassandraConfig.CONSISTENCY_LEVEL,
- CassandraConfig.FIELDS,
- CassandraConfig.BATCH_SIZE,
- CassandraConfig.BATCH_TYPE,
- CassandraConfig.ASYNC_WRITE)
+ DATACENTER, CONSISTENCY_LEVEL, FIELDS, BATCH_SIZE,
BATCH_TYPE, ASYNC_WRITE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ CassandraParameters cassandraParameters = new CassandraParameters();
+ cassandraParameters.buildWithConfig(context.getOptions());
+ return () ->
+ new CassandraSink(
+ cassandraParameters, context.getCatalogTable(),
context.getOptions());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
index 661c3e37ce..46e19b5a37 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSource.java
@@ -17,19 +17,14 @@
package org.apache.seatunnel.connectors.seatunnel.cassandra.source;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportColumnProjection;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
@@ -42,40 +37,25 @@ import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
-import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.CQL;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.HOST;
-import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig.KEYSPACE;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CQL;
-@AutoService(SeaTunnelSource.class)
public class CassandraSource extends AbstractSingleSplitSource<SeaTunnelRow>
implements SupportColumnProjection {
- private SeaTunnelRowType rowTypeInfo;
- private final CassandraParameters cassandraParameters = new
CassandraParameters();
+ private final CassandraParameters cassandraParameters;
+ private final CatalogTable catalogTable;
- @Override
- public String getPluginName() {
- return "Cassandra";
- }
+ public CassandraSource(CassandraParameters cassandraParameters,
ReadonlyConfig pluginConfig) {
+ this.cassandraParameters = cassandraParameters;
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult checkResult =
- CheckConfigUtil.checkAllExists(pluginConfig, HOST.key(),
KEYSPACE.key(), CQL.key());
- if (!checkResult.isSuccess()) {
- throw new CassandraConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SOURCE,
checkResult.getMsg()));
- }
- this.cassandraParameters.buildWithConfig(pluginConfig);
try (CqlSession currentSession =
CassandraClient.getCqlSessionBuilder(
- pluginConfig.getString(HOST.key()),
- pluginConfig.getString(KEYSPACE.key()),
+ cassandraParameters.getHost(),
+ cassandraParameters.getKeyspace(),
cassandraParameters.getUsername(),
cassandraParameters.getPassword(),
cassandraParameters.getDatacenter())
@@ -84,23 +64,38 @@ public class CassandraSource extends
AbstractSingleSplitSource<SeaTunnelRow>
currentSession
.execute(
CassandraClient.createSimpleStatement(
- pluginConfig.getString(CQL.key()),
+ pluginConfig.get(CQL),
cassandraParameters.getConsistencyLevel()))
.one();
if (rs == null) {
throw new CassandraConnectorException(
CassandraConnectorErrorCode.NO_DATA_IN_SOURCE_TABLE,
- "No data select from this cql: " +
pluginConfig.getConfig(CQL.key()));
+ "No data select from this cql: " +
pluginConfig.get(CQL));
}
int columnSize = rs.getColumnDefinitions().size();
- String[] fieldNames = new String[columnSize];
- SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType[columnSize];
+ TableSchema.Builder schemaBuilder = TableSchema.builder();
+ String tableName = "default";
for (int i = 0; i < columnSize; i++) {
- fieldNames[i] =
rs.getColumnDefinitions().get(i).getName().asInternal();
- seaTunnelDataTypes[i] =
-
TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType());
+ PhysicalColumn physicalColumn =
+ PhysicalColumn.of(
+
rs.getColumnDefinitions().get(i).getName().asInternal(),
+
TypeConvertUtil.convert(rs.getColumnDefinitions().get(i).getType()),
+ null,
+ null,
+ true,
+ null,
+ null);
+ schemaBuilder.column(physicalColumn);
+ tableName =
rs.getColumnDefinitions().get(i).getTable().asInternal();
}
- this.rowTypeInfo = new SeaTunnelRowType(fieldNames,
seaTunnelDataTypes);
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(
+ getPluginName(),
cassandraParameters.getKeyspace(), tableName),
+ schemaBuilder.build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ "");
} catch (Exception e) {
throw new CassandraConnectorException(
CommonErrorCodeDeprecated.TABLE_SCHEMA_GET_FAILED,
@@ -109,14 +104,19 @@ public class CassandraSource extends
AbstractSingleSplitSource<SeaTunnelRow>
}
}
+ @Override
+ public String getPluginName() {
+ return "Cassandra";
+ }
+
@Override
public Boundedness getBoundedness() {
return Boundedness.BOUNDED;
}
@Override
- public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
- return this.rowTypeInfo;
+ public List<CatalogTable> getProducedCatalogTables() {
+ return Collections.singletonList(catalogTable);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
index 9c15411151..7160d19e1e 100644
--- a/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cassandra/src/main/java/org/apache/seatunnel/connectors/seatunnel/cassandra/source/CassandraSourceFactory.java
@@ -19,12 +19,25 @@ package
org.apache.seatunnel.connectors.seatunnel.cassandra.source;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraConfig;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import com.google.auto.service.AutoService;
+import java.io.Serializable;
+
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CONSISTENCY_LEVEL;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.CQL;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.DATACENTER;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.HOST;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.KEYSPACE;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.PASSWORD;
+import static
org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraSourceOptions.USERNAME;
+
@AutoService(Factory.class)
public class CassandraSourceFactory implements TableSourceFactory {
@Override
@@ -35,12 +48,22 @@ public class CassandraSourceFactory implements
TableSourceFactory {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(CassandraConfig.HOST, CassandraConfig.KEYSPACE,
CassandraConfig.CQL)
- .bundled(CassandraConfig.USERNAME, CassandraConfig.PASSWORD)
- .optional(CassandraConfig.DATACENTER,
CassandraConfig.CONSISTENCY_LEVEL)
+ .required(HOST, KEYSPACE, CQL)
+ .bundled(USERNAME, PASSWORD)
+ .optional(DATACENTER, CONSISTENCY_LEVEL)
.build();
}
+ @Override
+ public <T, SplitT extends SourceSplit, StateT extends Serializable>
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
+ CassandraParameters cassandraParameters = new CassandraParameters();
+ cassandraParameters.buildWithConfig(context.getOptions());
+ return () ->
+ (SeaTunnelSource<T, SplitT, StateT>)
+ new CassandraSource(cassandraParameters,
context.getOptions());
+ }
+
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
return CassandraSource.class;