This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 145b68793f [Improve] table_store options (#9515)
145b68793f is described below
commit 145b68793fb936dedf3dc02301584daec03955c9
Author: Jarvis <[email protected]>
AuthorDate: Sun Jun 29 11:39:44 2025 +0800
[Improve] table_store options (#9515)
Co-authored-by: corgy-w <[email protected]>
---
.../seatunnel/api/ConnectorOptionCheckTest.java | 2 -
...oreConfig.java => TableStoreCommonOptions.java} | 21 +++--
.../tablestore/config/TableStoreConfig.java | 55 ++++++++++++
.../tablestore/config/TableStoreSinkOptions.java} | 19 ++---
.../config/TableStoreSourceOptions.java} | 15 +---
.../tablestore/config/TablestoreOptions.java | 79 ------------------
.../serialize/DefaultSeaTunnelRowSerializer.java | 15 ++--
.../{TablestoreWriter.java => TableStoreSink.java} | 37 +++++----
...reSinkClient.java => TableStoreSinkClient.java} | 22 ++---
...SinkFactory.java => TableStoreSinkFactory.java} | 35 ++++----
...TablestoreWriter.java => TableStoreWriter.java} | 13 ++-
.../seatunnel/tablestore/sink/TablestoreSink.java | 97 ----------------------
...bleStoreDBSource.java => TableStoreSource.java} | 55 ++++++------
...ceFactory.java => TableStoreSourceFactory.java} | 22 ++---
...urceReader.java => TableStoreSourceReader.java} | 47 ++++++-----
...SourceSplit.java => TableStoreSourceSplit.java} | 2 +-
...r.java => TableStoreSourceSplitEnumerator.java} | 55 ++++++------
...SourceState.java => TableStoreSourceState.java} | 4 +-
...FactoryTest.java => TableStoreFactoryTest.java} | 6 +-
19 files changed, 234 insertions(+), 367 deletions(-)
diff --git
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 4e5250bfb6..1d14dd556d 100644
---
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,8 +194,6 @@ public class ConnectorOptionCheckTest {
whiteList.add("TypesenseSinkOptions");
whiteList.add("MongodbSinkOptions");
whiteList.add("SelectDBSinkOptions");
- whiteList.add("TablestoreSinkOptions");
- whiteList.add("TableStoreDBSourceOptions");
whiteList.add("PostgresIncrementalSourceOptions");
whiteList.add("SqlServerIncrementalSourceOptions");
whiteList.add("OracleIncrementalSourceOptions");
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
similarity index 84%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
index 3e1714c551..cde4c97080 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
@@ -20,39 +20,42 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import java.io.Serializable;
+import java.util.List;
+
+public class TableStoreCommonOptions {
+
+ public static final String identifier = "Tablestore";
-public class TablestoreConfig implements Serializable {
public static final Option<String> END_POINT =
Options.key("end_point")
.stringType()
.noDefaultValue()
.withDescription(" Tablestore end_point");
+
public static final Option<String> INSTANCE_NAME =
Options.key("instance_name")
.stringType()
.noDefaultValue()
.withDescription(" Tablestore instance_name");
+
public static final Option<String> ACCESS_KEY_ID =
Options.key("access_key_id")
.stringType()
.noDefaultValue()
.withDescription(" Tablestore access_key_id");
+
public static final Option<String> ACCESS_KEY_SECRET =
Options.key("access_key_secret")
.stringType()
.noDefaultValue()
.withDescription(" Tablestore access_key_secret");
+
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription(" Tablestore
table");
- public static final Option<String> BATCH_SIZE =
- Options.key("batch_size")
- .stringType()
- .defaultValue("25")
- .withDescription(" Tablestore batch_size");
- public static final Option<String> PRIMARY_KEYS =
+
+ public static final Option<List<String>> PRIMARY_KEYS =
Options.key("primary_keys")
- .stringType()
+ .listType()
.noDefaultValue()
.withDescription(" Tablestore primary_keys");
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
new file mode 100644
index 0000000000..f2efde9603
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TableStoreConfig implements Serializable {
+
+ private String endpoint;
+
+ private String instanceName;
+
+ private String accessKeyId;
+
+ private String accessKeySecret;
+
+ private String table;
+
+ private List<String> primaryKeys;
+
+ public int batchSize;
+
+ public TableStoreConfig() {}
+
+ public TableStoreConfig(ReadonlyConfig config) {
+ this.endpoint = config.get(TableStoreCommonOptions.END_POINT);
+ this.instanceName = config.get(TableStoreCommonOptions.INSTANCE_NAME);
+ this.accessKeyId = config.get(TableStoreCommonOptions.ACCESS_KEY_ID);
+ this.accessKeySecret =
config.get(TableStoreCommonOptions.ACCESS_KEY_SECRET);
+ this.table = config.get(TableStoreCommonOptions.TABLE);
+ this.primaryKeys = config.get(TableStoreCommonOptions.PRIMARY_KEYS);
+ this.batchSize = config.get(TableStoreSinkOptions.BATCH_SIZE);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
similarity index 63%
copy from
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
copy to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
index 58920f8a51..0870218d93 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
@@ -15,17 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.tablestore;
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class TableStoreSinkOptions extends TableStoreCommonOptions {
-class TablestoreFactoryTest {
-
- @Test
- void optionRule() {
- Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
- }
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(25)
+ .withDescription(" Tablestore batch_size");
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
similarity index 67%
copy from
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
copy to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
index 58920f8a51..d7d717a951 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
@@ -15,17 +15,6 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.tablestore;
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class TablestoreFactoryTest {
-
- @Test
- void optionRule() {
- Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
- }
-}
+public class TableStoreSourceOptions extends TableStoreCommonOptions {}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
deleted file mode 100644
index be12181893..0000000000
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
-
-@Data
-@AllArgsConstructor
-public class TablestoreOptions implements Serializable {
-
- private String endpoint;
-
- private String instanceName;
-
- private String accessKeyId;
-
- private String accessKeySecret;
-
- private String table;
-
- private List<String> primaryKeys;
-
- public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
-
- public TablestoreOptions() {}
-
- public TablestoreOptions(Config config) {
- this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
- this.instanceName =
config.getString(TablestoreConfig.INSTANCE_NAME.key());
- this.accessKeyId =
config.getString(TablestoreConfig.ACCESS_KEY_ID.key());
- this.accessKeySecret =
config.getString(TablestoreConfig.ACCESS_KEY_SECRET.key());
- this.table = config.getString(TablestoreConfig.TABLE.key());
- this.primaryKeys =
config.getStringList(TablestoreConfig.PRIMARY_KEYS.key());
-
- if (config.hasPath(BATCH_SIZE.key())) {
- this.batchSize = config.getInt(BATCH_SIZE.key());
- }
- }
-
- public static TablestoreOptions of(ReadonlyConfig config) {
- Map<String, Object> map = config.getSourceMap();
- TablestoreOptions tablestoreOptions = new TablestoreOptions();
- tablestoreOptions.setEndpoint(config.get(TablestoreConfig.END_POINT));
-
tablestoreOptions.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
-
tablestoreOptions.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
-
tablestoreOptions.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
- tablestoreOptions.setTable(config.get(TablestoreConfig.TABLE));
- List<String> keys = (List<String>)
map.get(TablestoreConfig.PRIMARY_KEYS.key());
-
- tablestoreOptions.setPrimaryKeys(keys);
- return tablestoreOptions;
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
index 8b2b3dcb0f..c90072a376 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
import
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
import com.alicloud.openservices.tablestore.model.Column;
@@ -42,12 +42,12 @@ import java.util.List;
public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
private final SeaTunnelRowType seaTunnelRowType;
- private final TablestoreOptions tablestoreOptions;
+ private final TableStoreConfig tableStoreConfig;
public DefaultSeaTunnelRowSerializer(
- SeaTunnelRowType seaTunnelRowType, TablestoreOptions
tablestoreOptions) {
+ SeaTunnelRowType seaTunnelRowType, TableStoreConfig
tableStoreConfig) {
this.seaTunnelRowType = seaTunnelRowType;
- this.tablestoreOptions = tablestoreOptions;
+ this.tableStoreConfig = tableStoreConfig;
}
@Override
@@ -56,15 +56,14 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer {
PrimaryKeyBuilder primaryKeyBuilder =
PrimaryKeyBuilder.createPrimaryKeyBuilder();
List<Column> columns =
new ArrayList<>(
- seaTunnelRow.getFields().length
- - tablestoreOptions.getPrimaryKeys().size());
+ seaTunnelRow.getFields().length -
tableStoreConfig.getPrimaryKeys().size());
Arrays.stream(seaTunnelRowType.getFieldNames())
.forEach(
fieldName -> {
Object field =
seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName));
int index = seaTunnelRowType.indexOf(fieldName);
- if
(tablestoreOptions.getPrimaryKeys().contains(fieldName)) {
+ if
(tableStoreConfig.getPrimaryKeys().contains(fieldName)) {
primaryKeyBuilder.addPrimaryKeyColumn(
this.convertPrimaryKeyColumn(
fieldName,
@@ -81,7 +80,7 @@ public class DefaultSeaTunnelRowSerializer implements
SeaTunnelRowSerializer {
}
});
RowPutChange rowPutChange =
- new RowPutChange(tablestoreOptions.getTable(),
primaryKeyBuilder.build());
+ new RowPutChange(tableStoreConfig.getTable(),
primaryKeyBuilder.build());
rowPutChange.setCondition(new
Condition(RowExistenceExpectation.IGNORE));
columns.forEach(rowPutChange::addColumn);
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
similarity index 51%
copy from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
copy to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
index 22bfe1be27..382e94cdab 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
@@ -17,40 +17,41 @@
package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSinkOptions;
import java.io.IOException;
import java.util.Optional;
-public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class TableStoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
- private final TablestoreSinkClient tablestoreSinkClient;
- private final SeaTunnelRowSerializer serializer;
+ private final CatalogTable catalogTable;
+ private final TableStoreConfig tableStoreConfig;
- public TablestoreWriter(
- TablestoreOptions tablestoreOptions, SeaTunnelRowType
seaTunnelRowType) {
- tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions,
seaTunnelRowType);
- serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
tablestoreOptions);
+ public TableStoreSink(ReadonlyConfig pluginConfig, CatalogTable
catalogTable) {
+ this.tableStoreConfig = new TableStoreConfig(pluginConfig);
+ this.catalogTable = catalogTable;
}
@Override
- public void write(SeaTunnelRow element) throws IOException {
- tablestoreSinkClient.write(serializer.serialize(element));
+ public String getPluginName() {
+ return TableStoreSinkOptions.identifier;
}
@Override
- public void close() throws IOException {
- tablestoreSinkClient.close();
+ public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
+ throws IOException {
+ return new TableStoreWriter(tableStoreConfig,
catalogTable.getSeaTunnelRowType());
}
@Override
- public Optional<Void> prepareCommit() {
- tablestoreSinkClient.flush();
- return super.prepareCommit();
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.of(catalogTable);
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
similarity index 84%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
index e4ff70f405..889ea64ea0 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
import
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
@@ -34,15 +34,15 @@ import java.util.ArrayList;
import java.util.List;
@Slf4j
-public class TablestoreSinkClient {
- private final TablestoreOptions tablestoreOptions;
+public class TableStoreSinkClient {
+ private final TableStoreConfig tableStoreConfig;
private volatile boolean initialize;
private volatile Exception flushException;
private SyncClient syncClient;
private final List<RowPutChange> batchList;
- public TablestoreSinkClient(TablestoreOptions tablestoreOptions,
SeaTunnelRowType typeInfo) {
- this.tablestoreOptions = tablestoreOptions;
+ public TableStoreSinkClient(TableStoreConfig tableStoreConfig,
SeaTunnelRowType typeInfo) {
+ this.tableStoreConfig = tableStoreConfig;
this.batchList = new ArrayList<>();
}
@@ -52,10 +52,10 @@ public class TablestoreSinkClient {
}
syncClient =
new SyncClient(
- tablestoreOptions.getEndpoint(),
- tablestoreOptions.getAccessKeyId(),
- tablestoreOptions.getAccessKeySecret(),
- tablestoreOptions.getInstanceName());
+ tableStoreConfig.getEndpoint(),
+ tableStoreConfig.getAccessKeyId(),
+ tableStoreConfig.getAccessKeySecret(),
+ tableStoreConfig.getInstanceName());
initialize = true;
}
@@ -64,8 +64,8 @@ public class TablestoreSinkClient {
tryInit();
checkFlushException();
batchList.add(rowPutChange);
- if (tablestoreOptions.getBatchSize() > 0
- && batchList.size() >= tablestoreOptions.getBatchSize()) {
+ if (tableStoreConfig.getBatchSize() > 0
+ && batchList.size() >= tableStoreConfig.getBatchSize()) {
flush();
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
similarity index 57%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
index 7e417f9828..bdf9a39473 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
@@ -19,38 +19,39 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSinkOptions;
import com.google.auto.service.AutoService;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
-
@AutoService(Factory.class)
-public class TablestoreSinkFactory implements TableSinkFactory {
+public class TableStoreSinkFactory implements TableSinkFactory {
+
@Override
public String factoryIdentifier() {
- return "Tablestore";
+ return TableStoreSinkOptions.identifier;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- END_POINT,
- TABLE,
- INSTANCE_NAME,
- ACCESS_KEY_ID,
- ACCESS_KEY_SECRET,
- PRIMARY_KEYS,
+ TableStoreSinkOptions.END_POINT,
+ TableStoreSinkOptions.TABLE,
+ TableStoreSinkOptions.INSTANCE_NAME,
+ TableStoreSinkOptions.ACCESS_KEY_ID,
+ TableStoreSinkOptions.ACCESS_KEY_SECRET,
+ TableStoreSinkOptions.PRIMARY_KEYS,
ConnectorCommonOptions.SCHEMA)
- .optional(BATCH_SIZE)
+ .optional(TableStoreSinkOptions.BATCH_SIZE)
.build();
}
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new TableStoreSink(context.getOptions(),
context.getCatalogTable());
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
similarity index 83%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
index 22bfe1be27..890e71bba6 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
@@ -20,22 +20,21 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
import
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
import java.io.IOException;
import java.util.Optional;
-public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class TableStoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
- private final TablestoreSinkClient tablestoreSinkClient;
+ private final TableStoreSinkClient tablestoreSinkClient;
private final SeaTunnelRowSerializer serializer;
- public TablestoreWriter(
- TablestoreOptions tablestoreOptions, SeaTunnelRowType
seaTunnelRowType) {
- tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions,
seaTunnelRowType);
- serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
tablestoreOptions);
+ public TableStoreWriter(TableStoreConfig tableStoreConfig,
SeaTunnelRowType seaTunnelRowType) {
+ tablestoreSinkClient = new TableStoreSinkClient(tableStoreConfig,
seaTunnelRowType);
+ serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType,
tableStoreConfig);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
deleted file mode 100644
index 2656263850..0000000000
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
-
-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-import java.util.Optional;
-
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
-import static
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
-
-@AutoService(SeaTunnelSink.class)
-public class TablestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
- private SeaTunnelRowType rowType;
-
- private TablestoreOptions tablestoreOptions;
-
- @Override
- public String getPluginName() {
- return "Tablestore";
- }
-
- @Override
- public void prepare(Config pluginConfig) throws PrepareFailException {
- CheckResult result =
- CheckConfigUtil.checkAllExists(
- pluginConfig,
- END_POINT.key(),
- TABLE.key(),
- INSTANCE_NAME.key(),
- ACCESS_KEY_ID.key(),
- ACCESS_KEY_SECRET.key(),
- PRIMARY_KEYS.key());
- if (!result.isSuccess()) {
- throw new TablestoreConnectorException(
- SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
- String.format(
- "PluginName: %s, PluginType: %s, Message: %s",
- getPluginName(), PluginType.SINK,
result.getMsg()));
- }
- tablestoreOptions = new TablestoreOptions(pluginConfig);
- }
-
- @Override
- public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.rowType = seaTunnelRowType;
- }
-
- @Override
- public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context)
- throws IOException {
- return new TablestoreWriter(tablestoreOptions, rowType);
- }
-
- @Override
- public Optional<CatalogTable> getWriteCatalogTable() {
- return super.getWriteCatalogTable();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
similarity index 61%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
index 85c0062ed3..96c2f40707 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
@@ -28,38 +28,38 @@ import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSourceOptions;
import lombok.extern.slf4j.Slf4j;
+import java.util.Collections;
import java.util.List;
@Slf4j
-public class TableStoreDBSource
- implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit,
TableStoreDBSourceState>,
+public class TableStoreSource
+ implements SeaTunnelSource<SeaTunnelRow, TableStoreSourceSplit,
TableStoreSourceState>,
SupportParallelism,
SupportColumnProjection {
- private TablestoreOptions tablestoreOptions;
- private SeaTunnelRowType typeInfo;
+ private final TableStoreConfig tableStoreConfig;
+ private final CatalogTable catalogTable;
private JobContext jobContext;
+ public TableStoreSource(ReadonlyConfig config) {
+ this.tableStoreConfig = new TableStoreConfig(config);
+ this.catalogTable = CatalogTableUtil.buildWithConfig(config);
+ }
+
@Override
public String getPluginName() {
- return "Tablestore";
+ return TableStoreSourceOptions.identifier;
}
@Override
public List<CatalogTable> getProducedCatalogTables() {
- return SeaTunnelSource.super.getProducedCatalogTables();
- }
-
- public TableStoreDBSource(ReadonlyConfig config) {
- this.tablestoreOptions = TablestoreOptions.of(config);
- CatalogTableUtil.buildWithConfig(config);
- this.typeInfo =
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+ return Collections.singletonList(catalogTable);
}
@Override
@@ -70,29 +70,28 @@ public class TableStoreDBSource
}
@Override
- public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit>
createReader(Context readerContext)
+ public SourceReader<SeaTunnelRow, TableStoreSourceSplit>
createReader(Context readerContext)
throws Exception {
- return new TableStoreDBSourceReader(readerContext, tablestoreOptions,
typeInfo);
+ return new TableStoreSourceReader(
+ readerContext, tableStoreConfig,
catalogTable.getSeaTunnelRowType());
}
@Override
- public SourceSplitEnumerator<TableStoreDBSourceSplit,
TableStoreDBSourceState> createEnumerator(
-
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
+ public SourceSplitEnumerator<TableStoreSourceSplit, TableStoreSourceState>
createEnumerator(
+
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreSourceSplit>
enumeratorContext)
throws Exception {
- return new TableStoreDBSourceSplitEnumerator(enumeratorContext,
tablestoreOptions);
+ return new TableStoreSourceSplitEnumerator(enumeratorContext,
tableStoreConfig);
}
@Override
- public SourceSplitEnumerator<TableStoreDBSourceSplit,
TableStoreDBSourceState>
- restoreEnumerator(
-
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
- TableStoreDBSourceSplit>
- enumeratorContext,
- TableStoreDBSourceState checkpointState)
- throws Exception {
- return new TableStoreDBSourceSplitEnumerator(
- enumeratorContext, tablestoreOptions, checkpointState);
+ public SourceSplitEnumerator<TableStoreSourceSplit, TableStoreSourceState>
restoreEnumerator(
+
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreSourceSplit>
+ enumeratorContext,
+ TableStoreSourceState checkpointState)
+ throws Exception {
+ return new TableStoreSourceSplitEnumerator(
+ enumeratorContext, tableStoreConfig, checkpointState);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
similarity index 77%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
index f93ae4bfe3..f17b72644b 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
@@ -23,30 +23,30 @@ import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSourceOptions;
import com.google.auto.service.AutoService;
import java.io.Serializable;
@AutoService(Factory.class)
-public class TableStoreDbSourceFactory implements TableSourceFactory {
+public class TableStoreSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "Tablestore";
+ return TableStoreSourceOptions.identifier;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(
- TablestoreConfig.END_POINT,
- TablestoreConfig.INSTANCE_NAME,
- TablestoreConfig.ACCESS_KEY_ID,
- TablestoreConfig.ACCESS_KEY_SECRET,
- TablestoreConfig.TABLE,
- TablestoreConfig.PRIMARY_KEYS)
+ TableStoreSourceOptions.END_POINT,
+ TableStoreSourceOptions.INSTANCE_NAME,
+ TableStoreSourceOptions.ACCESS_KEY_ID,
+ TableStoreSourceOptions.ACCESS_KEY_SECRET,
+ TableStoreSourceOptions.TABLE,
+ TableStoreSourceOptions.PRIMARY_KEYS)
.build();
}
@@ -54,11 +54,11 @@ public class TableStoreDbSourceFactory implements
TableSourceFactory {
public <T, SplitT extends SourceSplit, StateT extends Serializable>
TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () ->
- (SeaTunnelSource<T, SplitT, StateT>) new
TableStoreDBSource(context.getOptions());
+ (SeaTunnelSource<T, SplitT, StateT>) new
TableStoreSource(context.getOptions());
}
@Override
public Class<? extends SeaTunnelSource> getSourceClass() {
- return TableStoreDBSource.class;
+ return TableStoreSource.class;
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
similarity index 82%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
index eefd4aae03..d99ce6f784 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
@@ -1,4 +1,3 @@
-package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -15,11 +14,14 @@ package
org.apache.seatunnel.connectors.seatunnel.tablestore.source;
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TunnelClient;
@@ -42,24 +44,23 @@ import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
@Slf4j
-public class TableStoreDBSourceReader
- implements SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> {
+public class TableStoreSourceReader implements SourceReader<SeaTunnelRow,
TableStoreSourceSplit> {
protected SourceReader.Context context;
- protected TablestoreOptions tablestoreOptions;
+ protected TableStoreConfig tableStoreConfig;
protected SeaTunnelRowType seaTunnelRowType;
- Queue<TableStoreDBSourceSplit> pendingSplits = new
ConcurrentLinkedDeque<>();
+ Queue<TableStoreSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
private SyncClient client;
private volatile boolean noMoreSplit;
private TunnelClient tunnelClient;
- public TableStoreDBSourceReader(
+ public TableStoreSourceReader(
SourceReader.Context context,
- TablestoreOptions options,
+ TableStoreConfig options,
SeaTunnelRowType seaTunnelRowType) {
this.context = context;
- this.tablestoreOptions = options;
+ this.tableStoreConfig = options;
this.seaTunnelRowType = seaTunnelRowType;
}
@@ -67,16 +68,16 @@ public class TableStoreDBSourceReader
public void open() throws Exception {
client =
new SyncClient(
- tablestoreOptions.getEndpoint(),
- tablestoreOptions.getAccessKeyId(),
- tablestoreOptions.getAccessKeySecret(),
- tablestoreOptions.getInstanceName());
+ tableStoreConfig.getEndpoint(),
+ tableStoreConfig.getAccessKeyId(),
+ tableStoreConfig.getAccessKeySecret(),
+ tableStoreConfig.getInstanceName());
tunnelClient =
new TunnelClient(
- tablestoreOptions.getEndpoint(),
- tablestoreOptions.getAccessKeyId(),
- tablestoreOptions.getAccessKeySecret(),
- tablestoreOptions.getInstanceName());
+ tableStoreConfig.getEndpoint(),
+ tableStoreConfig.getAccessKeyId(),
+ tableStoreConfig.getAccessKeySecret(),
+ tableStoreConfig.getInstanceName());
}
@Override
@@ -88,7 +89,7 @@ public class TableStoreDBSourceReader
@Override
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
synchronized (output.getCheckpointLock()) {
- TableStoreDBSourceSplit split = pendingSplits.poll();
+ TableStoreSourceSplit split = pendingSplits.poll();
if (Objects.nonNull(split)) {
read(split, output);
}
@@ -108,7 +109,7 @@ public class TableStoreDBSourceReader
}
}
- private void read(TableStoreDBSourceSplit split, Collector<SeaTunnelRow>
output) {
+ private void read(TableStoreSourceSplit split, Collector<SeaTunnelRow>
output) {
String tunnelId = getTunel(split);
TableStoreProcessor processor =
new TableStoreProcessor(split.getTableName(),
split.getPrimaryKey(), output);
@@ -122,7 +123,7 @@ public class TableStoreDBSourceReader
}
}
- public String getTunel(TableStoreDBSourceSplit split) {
+ public String getTunel(TableStoreSourceSplit split) {
deleteTunel(split);
String tunnelId = null;
String tunnelName = split.getTableName() + "_migration2aws_tunnel4" +
split.getSplitId();
@@ -142,7 +143,7 @@ public class TableStoreDBSourceReader
return tunnelId;
}
- public void deleteTunel(TableStoreDBSourceSplit split) {
+ public void deleteTunel(TableStoreSourceSplit split) {
String tunnelName = split.getTableName() + "_migration2aws_tunnel4" +
split.getSplitId();
try {
DeleteTunnelRequest drequest =
@@ -155,12 +156,12 @@ public class TableStoreDBSourceReader
}
@Override
- public List<TableStoreDBSourceSplit> snapshotState(long checkpointId)
throws Exception {
+ public List<TableStoreSourceSplit> snapshotState(long checkpointId) throws
Exception {
return new ArrayList<>(pendingSplits);
}
@Override
- public void addSplits(List<TableStoreDBSourceSplit> splits) {
+ public void addSplits(List<TableStoreSourceSplit> splits) {
this.pendingSplits.addAll(splits);
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
similarity index 95%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
index b1ad1fb856..5881a07643 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
@@ -25,7 +25,7 @@ import lombok.Setter;
@AllArgsConstructor
@Getter
@Setter
-public class TableStoreDBSourceSplit implements SourceSplit {
+public class TableStoreSourceSplit implements SourceSplit {
private static final long serialVersionUID = 6471832674315580956L;
private Integer splitId;
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
similarity index 70%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
index 3dd58b7e69..7239d6185c 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
@@ -17,7 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
import lombok.extern.slf4j.Slf4j;
@@ -32,32 +32,31 @@ import java.util.Map;
import java.util.Set;
@Slf4j
-public class TableStoreDBSourceSplitEnumerator
- implements SourceSplitEnumerator<TableStoreDBSourceSplit,
TableStoreDBSourceState> {
+public class TableStoreSourceSplitEnumerator
+ implements SourceSplitEnumerator<TableStoreSourceSplit,
TableStoreSourceState> {
- private final SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
enumeratorContext;
- private final Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
- private final TablestoreOptions tablestoreOptions;
+ private final SourceSplitEnumerator.Context<TableStoreSourceSplit>
enumeratorContext;
+ private final Map<Integer, List<TableStoreSourceSplit>> pendingSplits;
+ private final TableStoreConfig tableStoreConfig;
private final Object stateLock = new Object();
private volatile boolean shouldEnumerate;
/**
* @param enumeratorContext
- * @param tablestoreOptions
+ * @param tableStoreConfig
*/
- public TableStoreDBSourceSplitEnumerator(
- Context<TableStoreDBSourceSplit> enumeratorContext,
- TablestoreOptions tablestoreOptions) {
- this(enumeratorContext, tablestoreOptions, null);
+ public TableStoreSourceSplitEnumerator(
+ Context<TableStoreSourceSplit> enumeratorContext, TableStoreConfig
tableStoreConfig) {
+ this(enumeratorContext, tableStoreConfig, null);
}
- public TableStoreDBSourceSplitEnumerator(
- Context<TableStoreDBSourceSplit> enumeratorContext,
- TablestoreOptions tablestoreOptions,
- TableStoreDBSourceState sourceState) {
+ public TableStoreSourceSplitEnumerator(
+ Context<TableStoreSourceSplit> enumeratorContext,
+ TableStoreConfig tableStoreConfig,
+ TableStoreSourceState sourceState) {
this.enumeratorContext = enumeratorContext;
- this.tablestoreOptions = tablestoreOptions;
+ this.tableStoreConfig = tableStoreConfig;
this.pendingSplits = new HashMap<>();
this.shouldEnumerate = sourceState == null;
if (sourceState != null) {
@@ -73,7 +72,7 @@ public class TableStoreDBSourceSplitEnumerator
public void run() throws Exception {
Set<Integer> readers = enumeratorContext.registeredReaders();
if (shouldEnumerate) {
- Set<TableStoreDBSourceSplit> newSplits =
getTableStoreDBSourceSplit();
+ Set<TableStoreSourceSplit> newSplits =
getTableStoreDBSourceSplit();
synchronized (stateLock) {
addPendingSplit(newSplits);
shouldEnumerate = false;
@@ -84,7 +83,7 @@ public class TableStoreDBSourceSplitEnumerator
private void assignSplit(Set<Integer> readers) {
for (int reader : readers) {
- List<TableStoreDBSourceSplit> assignmentForReader =
pendingSplits.remove(reader);
+ List<TableStoreSourceSplit> assignmentForReader =
pendingSplits.remove(reader);
if (assignmentForReader != null && !assignmentForReader.isEmpty())
{
log.info("Assign splits {} to reader {}", assignmentForReader,
reader);
try {
@@ -101,22 +100,22 @@ public class TableStoreDBSourceSplitEnumerator
}
}
- private Set<TableStoreDBSourceSplit> getTableStoreDBSourceSplit() {
+ private Set<TableStoreSourceSplit> getTableStoreDBSourceSplit() {
- Set<TableStoreDBSourceSplit> allSplit = new HashSet<>();
- String tables = tablestoreOptions.getTable();
+ Set<TableStoreSourceSplit> allSplit = new HashSet<>();
+ String tables = tableStoreConfig.getTable();
String[] tableArr = tables.split(",");
for (int i = 0; i < tableArr.length; i++) {
allSplit.add(
- new TableStoreDBSourceSplit(
- i, tableArr[i],
tablestoreOptions.getPrimaryKeys().get(i)));
+ new TableStoreSourceSplit(
+ i, tableArr[i],
tableStoreConfig.getPrimaryKeys().get(i)));
}
return allSplit;
}
- private void addPendingSplit(Collection<TableStoreDBSourceSplit> splits) {
+ private void addPendingSplit(Collection<TableStoreSourceSplit> splits) {
int readerCount = enumeratorContext.currentParallelism();
- for (TableStoreDBSourceSplit split : splits) {
+ for (TableStoreSourceSplit split : splits) {
int ownerReader = split.getSplitId() % readerCount;
pendingSplits.computeIfAbsent(ownerReader, k -> new
ArrayList<>()).add(split);
}
@@ -129,7 +128,7 @@ public class TableStoreDBSourceSplitEnumerator
}
@Override
- public void addSplitsBack(List<TableStoreDBSourceSplit> splits, int
subtaskId) {
+ public void addSplitsBack(List<TableStoreSourceSplit> splits, int
subtaskId) {
log.debug("Add back splits {} to tablestore.", splits);
if (!splits.isEmpty()) {
addPendingSplit(splits);
@@ -155,9 +154,9 @@ public class TableStoreDBSourceSplitEnumerator
}
@Override
- public TableStoreDBSourceState snapshotState(long checkpointId) throws
Exception {
+ public TableStoreSourceState snapshotState(long checkpointId) throws
Exception {
synchronized (stateLock) {
- return new TableStoreDBSourceState(shouldEnumerate, pendingSplits);
+ return new TableStoreSourceState(shouldEnumerate, pendingSplits);
}
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
similarity index 89%
rename from
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
index e18d0ea952..7d5ca178a3 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
@@ -27,9 +27,9 @@ import java.util.Map;
@Getter
@Setter
@AllArgsConstructor
-public class TableStoreDBSourceState implements Serializable {
+public class TableStoreSourceState implements Serializable {
private static final long serialVersionUID = -2942147037830134078L;
private boolean shouldEnumerate;
- private Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+ private Map<Integer, List<TableStoreSourceSplit>> pendingSplits;
}
diff --git
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
b/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
similarity index 90%
rename from
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
rename to
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
index 58920f8a51..26984454d8 100644
---
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++
b/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
@@ -17,15 +17,15 @@
package org.apache.seatunnel.connectors.seatunnel.tablestore;
-import
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
+import
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TableStoreSinkFactory;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-class TablestoreFactoryTest {
+class TableStoreFactoryTest {
@Test
void optionRule() {
- Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
+ Assertions.assertNotNull((new TableStoreSinkFactory()).optionRule());
}
}