This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 145b68793f [Improve] table_store options (#9515)
145b68793f is described below

commit 145b68793fb936dedf3dc02301584daec03955c9
Author: Jarvis <[email protected]>
AuthorDate: Sun Jun 29 11:39:44 2025 +0800

    [Improve] table_store options (#9515)
    
    Co-authored-by: corgy-w <[email protected]>
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  2 -
 ...oreConfig.java => TableStoreCommonOptions.java} | 21 +++--
 .../tablestore/config/TableStoreConfig.java        | 55 ++++++++++++
 .../tablestore/config/TableStoreSinkOptions.java}  | 19 ++---
 .../config/TableStoreSourceOptions.java}           | 15 +---
 .../tablestore/config/TablestoreOptions.java       | 79 ------------------
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 15 ++--
 .../{TablestoreWriter.java => TableStoreSink.java} | 37 +++++----
 ...reSinkClient.java => TableStoreSinkClient.java} | 22 ++---
 ...SinkFactory.java => TableStoreSinkFactory.java} | 35 ++++----
 ...TablestoreWriter.java => TableStoreWriter.java} | 13 ++-
 .../seatunnel/tablestore/sink/TablestoreSink.java  | 97 ----------------------
 ...bleStoreDBSource.java => TableStoreSource.java} | 55 ++++++------
 ...ceFactory.java => TableStoreSourceFactory.java} | 22 ++---
 ...urceReader.java => TableStoreSourceReader.java} | 47 ++++++-----
 ...SourceSplit.java => TableStoreSourceSplit.java} |  2 +-
 ...r.java => TableStoreSourceSplitEnumerator.java} | 55 ++++++------
 ...SourceState.java => TableStoreSourceState.java} |  4 +-
 ...FactoryTest.java => TableStoreFactoryTest.java} |  6 +-
 19 files changed, 234 insertions(+), 367 deletions(-)

diff --git a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 4e5250bfb6..1d14dd556d 100644
--- a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -194,8 +194,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("TypesenseSinkOptions");
         whiteList.add("MongodbSinkOptions");
         whiteList.add("SelectDBSinkOptions");
-        whiteList.add("TablestoreSinkOptions");
-        whiteList.add("TableStoreDBSourceOptions");
         whiteList.add("PostgresIncrementalSourceOptions");
         whiteList.add("SqlServerIncrementalSourceOptions");
         whiteList.add("OracleIncrementalSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
similarity index 84%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
index 3e1714c551..cde4c97080 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreConfig.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreCommonOptions.java
@@ -20,39 +20,42 @@ package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
-import java.io.Serializable;
+import java.util.List;
+
+public class TableStoreCommonOptions {
+
+    public static final String identifier = "Tablestore";
 
-public class TablestoreConfig implements Serializable {
     public static final Option<String> END_POINT =
             Options.key("end_point")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(" Tablestore end_point");
+
     public static final Option<String> INSTANCE_NAME =
             Options.key("instance_name")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(" Tablestore instance_name");
+
     public static final Option<String> ACCESS_KEY_ID =
             Options.key("access_key_id")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(" Tablestore access_key_id");
+
     public static final Option<String> ACCESS_KEY_SECRET =
             Options.key("access_key_secret")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(" Tablestore access_key_secret");
+
     public static final Option<String> TABLE =
             Options.key("table").stringType().noDefaultValue().withDescription(" Tablestore table");
-    public static final Option<String> BATCH_SIZE =
-            Options.key("batch_size")
-                    .stringType()
-                    .defaultValue("25")
-                    .withDescription(" Tablestore batch_size");
-    public static final Option<String> PRIMARY_KEYS =
+
+    public static final Option<List<String>> PRIMARY_KEYS =
             Options.key("primary_keys")
-                    .stringType()
+                    .listType()
                     .noDefaultValue()
                     .withDescription(" Tablestore primary_keys");
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
new file mode 100644
index 0000000000..f2efde9603
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreConfig.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.List;
+
+@Data
+public class TableStoreConfig implements Serializable {
+
+    private String endpoint;
+
+    private String instanceName;
+
+    private String accessKeyId;
+
+    private String accessKeySecret;
+
+    private String table;
+
+    private List<String> primaryKeys;
+
+    public int batchSize;
+
+    public TableStoreConfig() {}
+
+    public TableStoreConfig(ReadonlyConfig config) {
+        this.endpoint = config.get(TableStoreCommonOptions.END_POINT);
+        this.instanceName = config.get(TableStoreCommonOptions.INSTANCE_NAME);
+        this.accessKeyId = config.get(TableStoreCommonOptions.ACCESS_KEY_ID);
+        this.accessKeySecret = config.get(TableStoreCommonOptions.ACCESS_KEY_SECRET);
+        this.table = config.get(TableStoreCommonOptions.TABLE);
+        this.primaryKeys = config.get(TableStoreCommonOptions.PRIMARY_KEYS);
+        this.batchSize = config.get(TableStoreSinkOptions.BATCH_SIZE);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
similarity index 63%
copy from 
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
copy to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
index 58920f8a51..0870218d93 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSinkOptions.java
@@ -15,17 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.tablestore;
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
 
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+public class TableStoreSinkOptions extends TableStoreCommonOptions {
 
-class TablestoreFactoryTest {
-
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
-    }
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(25)
+                    .withDescription(" Tablestore batch_size");
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
similarity index 67%
copy from 
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
copy to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
index 58920f8a51..d7d717a951 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TableStoreSourceOptions.java
@@ -15,17 +15,6 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.tablestore;
+package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
 
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class TablestoreFactoryTest {
-
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
-    }
-}
+public class TableStoreSourceOptions extends TableStoreCommonOptions {}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
deleted file mode 100644
index be12181893..0000000000
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/config/TablestoreOptions.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.tablestore.config;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
-
-@Data
-@AllArgsConstructor
-public class TablestoreOptions implements Serializable {
-
-    private String endpoint;
-
-    private String instanceName;
-
-    private String accessKeyId;
-
-    private String accessKeySecret;
-
-    private String table;
-
-    private List<String> primaryKeys;
-
-    public int batchSize = Integer.parseInt(BATCH_SIZE.defaultValue());
-
-    public TablestoreOptions() {}
-
-    public TablestoreOptions(Config config) {
-        this.endpoint = config.getString(TablestoreConfig.END_POINT.key());
-        this.instanceName = 
config.getString(TablestoreConfig.INSTANCE_NAME.key());
-        this.accessKeyId = 
config.getString(TablestoreConfig.ACCESS_KEY_ID.key());
-        this.accessKeySecret = 
config.getString(TablestoreConfig.ACCESS_KEY_SECRET.key());
-        this.table = config.getString(TablestoreConfig.TABLE.key());
-        this.primaryKeys = 
config.getStringList(TablestoreConfig.PRIMARY_KEYS.key());
-
-        if (config.hasPath(BATCH_SIZE.key())) {
-            this.batchSize = config.getInt(BATCH_SIZE.key());
-        }
-    }
-
-    public static TablestoreOptions of(ReadonlyConfig config) {
-        Map<String, Object> map = config.getSourceMap();
-        TablestoreOptions tablestoreOptions = new TablestoreOptions();
-        tablestoreOptions.setEndpoint(config.get(TablestoreConfig.END_POINT));
-        
tablestoreOptions.setInstanceName(config.get(TablestoreConfig.INSTANCE_NAME));
-        
tablestoreOptions.setAccessKeyId(config.get(TablestoreConfig.ACCESS_KEY_ID));
-        
tablestoreOptions.setAccessKeySecret(config.get(TablestoreConfig.ACCESS_KEY_SECRET));
-        tablestoreOptions.setTable(config.get(TablestoreConfig.TABLE));
-        List<String> keys = (List<String>) 
map.get(TablestoreConfig.PRIMARY_KEYS.key());
-
-        tablestoreOptions.setPrimaryKeys(keys);
-        return tablestoreOptions;
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
index 8b2b3dcb0f..c90072a376 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/serialize/DefaultSeaTunnelRowSerializer.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
 
 import com.alicloud.openservices.tablestore.model.Column;
@@ -42,12 +42,12 @@ import java.util.List;
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    private final TablestoreOptions tablestoreOptions;
+    private final TableStoreConfig tableStoreConfig;
 
     public DefaultSeaTunnelRowSerializer(
-            SeaTunnelRowType seaTunnelRowType, TablestoreOptions tablestoreOptions) {
+            SeaTunnelRowType seaTunnelRowType, TableStoreConfig tableStoreConfig) {
         this.seaTunnelRowType = seaTunnelRowType;
-        this.tablestoreOptions = tablestoreOptions;
+        this.tableStoreConfig = tableStoreConfig;
     }
 
     @Override
@@ -56,15 +56,14 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
         PrimaryKeyBuilder primaryKeyBuilder = 
PrimaryKeyBuilder.createPrimaryKeyBuilder();
         List<Column> columns =
                 new ArrayList<>(
-                        seaTunnelRow.getFields().length
-                                - tablestoreOptions.getPrimaryKeys().size());
+                        seaTunnelRow.getFields().length - tableStoreConfig.getPrimaryKeys().size());
         Arrays.stream(seaTunnelRowType.getFieldNames())
                 .forEach(
                         fieldName -> {
                             Object field =
                                     
seaTunnelRow.getField(seaTunnelRowType.indexOf(fieldName));
                             int index = seaTunnelRowType.indexOf(fieldName);
-                            if 
(tablestoreOptions.getPrimaryKeys().contains(fieldName)) {
+                            if 
(tableStoreConfig.getPrimaryKeys().contains(fieldName)) {
                                 primaryKeyBuilder.addPrimaryKeyColumn(
                                         this.convertPrimaryKeyColumn(
                                                 fieldName,
@@ -81,7 +80,7 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                             }
                         });
         RowPutChange rowPutChange =
-                new RowPutChange(tablestoreOptions.getTable(), 
primaryKeyBuilder.build());
+                new RowPutChange(tableStoreConfig.getTable(), 
primaryKeyBuilder.build());
         rowPutChange.setCondition(new 
Condition(RowExistenceExpectation.IGNORE));
         columns.forEach(rowPutChange::addColumn);
 
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
similarity index 51%
copy from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
copy to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
index 22bfe1be27..382e94cdab 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSink.java
@@ -17,40 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSinkOptions;
 
 import java.io.IOException;
 import java.util.Optional;
 
-public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class TableStoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
 
-    private final TablestoreSinkClient tablestoreSinkClient;
-    private final SeaTunnelRowSerializer serializer;
+    private final CatalogTable catalogTable;
+    private final TableStoreConfig tableStoreConfig;
 
-    public TablestoreWriter(
-            TablestoreOptions tablestoreOptions, SeaTunnelRowType 
seaTunnelRowType) {
-        tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions, 
seaTunnelRowType);
-        serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
tablestoreOptions);
+    public TableStoreSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.tableStoreConfig = new TableStoreConfig(pluginConfig);
+        this.catalogTable = catalogTable;
     }
 
     @Override
-    public void write(SeaTunnelRow element) throws IOException {
-        tablestoreSinkClient.write(serializer.serialize(element));
+    public String getPluginName() {
+        return TableStoreSinkOptions.identifier;
     }
 
     @Override
-    public void close() throws IOException {
-        tablestoreSinkClient.close();
+    public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
+            throws IOException {
+        return new TableStoreWriter(tableStoreConfig, catalogTable.getSeaTunnelRowType());
     }
 
     @Override
-    public Optional<Void> prepareCommit() {
-        tablestoreSinkClient.flush();
-        return super.prepareCommit();
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
similarity index 84%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
index e4ff70f405..889ea64ea0 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkClient.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkClient.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
 
@@ -34,15 +34,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 @Slf4j
-public class TablestoreSinkClient {
-    private final TablestoreOptions tablestoreOptions;
+public class TableStoreSinkClient {
+    private final TableStoreConfig tableStoreConfig;
     private volatile boolean initialize;
     private volatile Exception flushException;
     private SyncClient syncClient;
     private final List<RowPutChange> batchList;
 
-    public TablestoreSinkClient(TablestoreOptions tablestoreOptions, 
SeaTunnelRowType typeInfo) {
-        this.tablestoreOptions = tablestoreOptions;
+    public TableStoreSinkClient(TableStoreConfig tableStoreConfig, SeaTunnelRowType typeInfo) {
+        this.tableStoreConfig = tableStoreConfig;
         this.batchList = new ArrayList<>();
     }
 
@@ -52,10 +52,10 @@ public class TablestoreSinkClient {
         }
         syncClient =
                 new SyncClient(
-                        tablestoreOptions.getEndpoint(),
-                        tablestoreOptions.getAccessKeyId(),
-                        tablestoreOptions.getAccessKeySecret(),
-                        tablestoreOptions.getInstanceName());
+                        tableStoreConfig.getEndpoint(),
+                        tableStoreConfig.getAccessKeyId(),
+                        tableStoreConfig.getAccessKeySecret(),
+                        tableStoreConfig.getInstanceName());
 
         initialize = true;
     }
@@ -64,8 +64,8 @@ public class TablestoreSinkClient {
         tryInit();
         checkFlushException();
         batchList.add(rowPutChange);
-        if (tablestoreOptions.getBatchSize() > 0
-                && batchList.size() >= tablestoreOptions.getBatchSize()) {
+        if (tableStoreConfig.getBatchSize() > 0
+                && batchList.size() >= tableStoreConfig.getBatchSize()) {
             flush();
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
similarity index 57%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
index 7e417f9828..bdf9a39473 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreSinkFactory.java
@@ -19,38 +19,39 @@ package 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSinkOptions;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
-
 @AutoService(Factory.class)
-public class TablestoreSinkFactory implements TableSinkFactory {
+public class TableStoreSinkFactory implements TableSinkFactory {
+
     @Override
     public String factoryIdentifier() {
-        return "Tablestore";
+        return TableStoreSinkOptions.identifier;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        END_POINT,
-                        TABLE,
-                        INSTANCE_NAME,
-                        ACCESS_KEY_ID,
-                        ACCESS_KEY_SECRET,
-                        PRIMARY_KEYS,
+                        TableStoreSinkOptions.END_POINT,
+                        TableStoreSinkOptions.TABLE,
+                        TableStoreSinkOptions.INSTANCE_NAME,
+                        TableStoreSinkOptions.ACCESS_KEY_ID,
+                        TableStoreSinkOptions.ACCESS_KEY_SECRET,
+                        TableStoreSinkOptions.PRIMARY_KEYS,
                         ConnectorCommonOptions.SCHEMA)
-                .optional(BATCH_SIZE)
+                .optional(TableStoreSinkOptions.BATCH_SIZE)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new TableStoreSink(context.getOptions(), context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
similarity index 83%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
index 22bfe1be27..890e71bba6 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreWriter.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TableStoreWriter.java
@@ -20,22 +20,21 @@ package 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.DefaultSeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.tablestore.serialize.SeaTunnelRowSerializer;
 
 import java.io.IOException;
 import java.util.Optional;
 
-public class TablestoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
+public class TableStoreWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
-    private final TablestoreSinkClient tablestoreSinkClient;
+    private final TableStoreSinkClient tablestoreSinkClient;
     private final SeaTunnelRowSerializer serializer;
 
-    public TablestoreWriter(
-            TablestoreOptions tablestoreOptions, SeaTunnelRowType 
seaTunnelRowType) {
-        tablestoreSinkClient = new TablestoreSinkClient(tablestoreOptions, 
seaTunnelRowType);
-        serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
tablestoreOptions);
+    public TableStoreWriter(TableStoreConfig tableStoreConfig, 
SeaTunnelRowType seaTunnelRowType) {
+        tablestoreSinkClient = new TableStoreSinkClient(tableStoreConfig, 
seaTunnelRowType);
+        serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
tableStoreConfig);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
deleted file mode 100644
index 2656263850..0000000000
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/sink/TablestoreSink.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.tablestore.sink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.exception.TablestoreConnectorException;
-
-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-import java.util.Optional;
-
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.ACCESS_KEY_SECRET;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.END_POINT;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.INSTANCE_NAME;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.PRIMARY_KEYS;
-import static 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig.TABLE;
-
-@AutoService(SeaTunnelSink.class)
-public class TablestoreSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
-
-    private SeaTunnelRowType rowType;
-
-    private TablestoreOptions tablestoreOptions;
-
-    @Override
-    public String getPluginName() {
-        return "Tablestore";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        END_POINT.key(),
-                        TABLE.key(),
-                        INSTANCE_NAME.key(),
-                        ACCESS_KEY_ID.key(),
-                        ACCESS_KEY_SECRET.key(),
-                        PRIMARY_KEYS.key());
-        if (!result.isSuccess()) {
-            throw new TablestoreConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        tablestoreOptions = new TablestoreOptions(pluginConfig);
-    }
-
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.rowType = seaTunnelRowType;
-    }
-
-    @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
-            throws IOException {
-        return new TablestoreWriter(tablestoreOptions, rowType);
-    }
-
-    @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
similarity index 61%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
index 85c0062ed3..96c2f40707 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSource.java
@@ -28,38 +28,38 @@ import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.JobMode;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSourceOptions;
 
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Collections;
 import java.util.List;
 
 @Slf4j
-public class TableStoreDBSource
-        implements SeaTunnelSource<SeaTunnelRow, TableStoreDBSourceSplit, 
TableStoreDBSourceState>,
+public class TableStoreSource
+        implements SeaTunnelSource<SeaTunnelRow, TableStoreSourceSplit, 
TableStoreSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private TablestoreOptions tablestoreOptions;
-    private SeaTunnelRowType typeInfo;
+    private final TableStoreConfig tableStoreConfig;
+    private final CatalogTable catalogTable;
     private JobContext jobContext;
 
+    public TableStoreSource(ReadonlyConfig config) {
+        this.tableStoreConfig = new TableStoreConfig(config);
+        this.catalogTable = CatalogTableUtil.buildWithConfig(config);
+    }
+
     @Override
     public String getPluginName() {
-        return "Tablestore";
+        return TableStoreSourceOptions.identifier;
     }
 
     @Override
     public List<CatalogTable> getProducedCatalogTables() {
-        return SeaTunnelSource.super.getProducedCatalogTables();
-    }
-
-    public TableStoreDBSource(ReadonlyConfig config) {
-        this.tablestoreOptions = TablestoreOptions.of(config);
-        CatalogTableUtil.buildWithConfig(config);
-        this.typeInfo = 
CatalogTableUtil.buildWithConfig(config).getSeaTunnelRowType();
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
@@ -70,29 +70,28 @@ public class TableStoreDBSource
     }
 
     @Override
-    public SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> 
createReader(Context readerContext)
+    public SourceReader<SeaTunnelRow, TableStoreSourceSplit> 
createReader(Context readerContext)
             throws Exception {
-        return new TableStoreDBSourceReader(readerContext, tablestoreOptions, 
typeInfo);
+        return new TableStoreSourceReader(
+                readerContext, tableStoreConfig, 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
-    public SourceSplitEnumerator<TableStoreDBSourceSplit, 
TableStoreDBSourceState> createEnumerator(
-            
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreDBSourceSplit>
+    public SourceSplitEnumerator<TableStoreSourceSplit, TableStoreSourceState> 
createEnumerator(
+            
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreSourceSplit>
                     enumeratorContext)
             throws Exception {
-        return new TableStoreDBSourceSplitEnumerator(enumeratorContext, 
tablestoreOptions);
+        return new TableStoreSourceSplitEnumerator(enumeratorContext, 
tableStoreConfig);
     }
 
     @Override
-    public SourceSplitEnumerator<TableStoreDBSourceSplit, 
TableStoreDBSourceState>
-            restoreEnumerator(
-                    
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<
-                                    TableStoreDBSourceSplit>
-                            enumeratorContext,
-                    TableStoreDBSourceState checkpointState)
-                    throws Exception {
-        return new TableStoreDBSourceSplitEnumerator(
-                enumeratorContext, tablestoreOptions, checkpointState);
+    public SourceSplitEnumerator<TableStoreSourceSplit, TableStoreSourceState> 
restoreEnumerator(
+            
org.apache.seatunnel.api.source.SourceSplitEnumerator.Context<TableStoreSourceSplit>
+                    enumeratorContext,
+            TableStoreSourceState checkpointState)
+            throws Exception {
+        return new TableStoreSourceSplitEnumerator(
+                enumeratorContext, tableStoreConfig, checkpointState);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
similarity index 77%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
index f93ae4bfe3..f17b72644b 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDbSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceFactory.java
@@ -23,30 +23,30 @@ import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreSourceOptions;
 
 import com.google.auto.service.AutoService;
 
 import java.io.Serializable;
 
 @AutoService(Factory.class)
-public class TableStoreDbSourceFactory implements TableSourceFactory {
+public class TableStoreSourceFactory implements TableSourceFactory {
 
     @Override
     public String factoryIdentifier() {
-        return "Tablestore";
+        return TableStoreSourceOptions.identifier;
     }
 
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(
-                        TablestoreConfig.END_POINT,
-                        TablestoreConfig.INSTANCE_NAME,
-                        TablestoreConfig.ACCESS_KEY_ID,
-                        TablestoreConfig.ACCESS_KEY_SECRET,
-                        TablestoreConfig.TABLE,
-                        TablestoreConfig.PRIMARY_KEYS)
+                        TableStoreSourceOptions.END_POINT,
+                        TableStoreSourceOptions.INSTANCE_NAME,
+                        TableStoreSourceOptions.ACCESS_KEY_ID,
+                        TableStoreSourceOptions.ACCESS_KEY_SECRET,
+                        TableStoreSourceOptions.TABLE,
+                        TableStoreSourceOptions.PRIMARY_KEYS)
                 .build();
     }
 
@@ -54,11 +54,11 @@ public class TableStoreDbSourceFactory implements 
TableSourceFactory {
     public <T, SplitT extends SourceSplit, StateT extends Serializable>
             TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
         return () ->
-                (SeaTunnelSource<T, SplitT, StateT>) new 
TableStoreDBSource(context.getOptions());
+                (SeaTunnelSource<T, SplitT, StateT>) new 
TableStoreSource(context.getOptions());
     }
 
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
-        return TableStoreDBSource.class;
+        return TableStoreSource.class;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
similarity index 82%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
index eefd4aae03..d99ce6f784 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceReader.java
@@ -1,4 +1,3 @@
-package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -15,11 +14,14 @@ package 
org.apache.seatunnel.connectors.seatunnel.tablestore.source;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
+package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
 
 import com.alicloud.openservices.tablestore.SyncClient;
 import com.alicloud.openservices.tablestore.TunnelClient;
@@ -42,24 +44,23 @@ import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedDeque;
 
 @Slf4j
-public class TableStoreDBSourceReader
-        implements SourceReader<SeaTunnelRow, TableStoreDBSourceSplit> {
+public class TableStoreSourceReader implements SourceReader<SeaTunnelRow, 
TableStoreSourceSplit> {
 
     protected SourceReader.Context context;
-    protected TablestoreOptions tablestoreOptions;
+    protected TableStoreConfig tableStoreConfig;
     protected SeaTunnelRowType seaTunnelRowType;
-    Queue<TableStoreDBSourceSplit> pendingSplits = new 
ConcurrentLinkedDeque<>();
+    Queue<TableStoreSourceSplit> pendingSplits = new ConcurrentLinkedDeque<>();
     private SyncClient client;
     private volatile boolean noMoreSplit;
     private TunnelClient tunnelClient;
 
-    public TableStoreDBSourceReader(
+    public TableStoreSourceReader(
             SourceReader.Context context,
-            TablestoreOptions options,
+            TableStoreConfig options,
             SeaTunnelRowType seaTunnelRowType) {
 
         this.context = context;
-        this.tablestoreOptions = options;
+        this.tableStoreConfig = options;
         this.seaTunnelRowType = seaTunnelRowType;
     }
 
@@ -67,16 +68,16 @@ public class TableStoreDBSourceReader
     public void open() throws Exception {
         client =
                 new SyncClient(
-                        tablestoreOptions.getEndpoint(),
-                        tablestoreOptions.getAccessKeyId(),
-                        tablestoreOptions.getAccessKeySecret(),
-                        tablestoreOptions.getInstanceName());
+                        tableStoreConfig.getEndpoint(),
+                        tableStoreConfig.getAccessKeyId(),
+                        tableStoreConfig.getAccessKeySecret(),
+                        tableStoreConfig.getInstanceName());
         tunnelClient =
                 new TunnelClient(
-                        tablestoreOptions.getEndpoint(),
-                        tablestoreOptions.getAccessKeyId(),
-                        tablestoreOptions.getAccessKeySecret(),
-                        tablestoreOptions.getInstanceName());
+                        tableStoreConfig.getEndpoint(),
+                        tableStoreConfig.getAccessKeyId(),
+                        tableStoreConfig.getAccessKeySecret(),
+                        tableStoreConfig.getInstanceName());
     }
 
     @Override
@@ -88,7 +89,7 @@ public class TableStoreDBSourceReader
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
         synchronized (output.getCheckpointLock()) {
-            TableStoreDBSourceSplit split = pendingSplits.poll();
+            TableStoreSourceSplit split = pendingSplits.poll();
             if (Objects.nonNull(split)) {
                 read(split, output);
             }
@@ -108,7 +109,7 @@ public class TableStoreDBSourceReader
         }
     }
 
-    private void read(TableStoreDBSourceSplit split, Collector<SeaTunnelRow> 
output) {
+    private void read(TableStoreSourceSplit split, Collector<SeaTunnelRow> 
output) {
         String tunnelId = getTunel(split);
         TableStoreProcessor processor =
                 new TableStoreProcessor(split.getTableName(), 
split.getPrimaryKey(), output);
@@ -122,7 +123,7 @@ public class TableStoreDBSourceReader
         }
     }
 
-    public String getTunel(TableStoreDBSourceSplit split) {
+    public String getTunel(TableStoreSourceSplit split) {
         deleteTunel(split);
         String tunnelId = null;
         String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + 
split.getSplitId();
@@ -142,7 +143,7 @@ public class TableStoreDBSourceReader
         return tunnelId;
     }
 
-    public void deleteTunel(TableStoreDBSourceSplit split) {
+    public void deleteTunel(TableStoreSourceSplit split) {
         String tunnelName = split.getTableName() + "_migration2aws_tunnel4" + 
split.getSplitId();
         try {
             DeleteTunnelRequest drequest =
@@ -155,12 +156,12 @@ public class TableStoreDBSourceReader
     }
 
     @Override
-    public List<TableStoreDBSourceSplit> snapshotState(long checkpointId) 
throws Exception {
+    public List<TableStoreSourceSplit> snapshotState(long checkpointId) throws 
Exception {
         return new ArrayList<>(pendingSplits);
     }
 
     @Override
-    public void addSplits(List<TableStoreDBSourceSplit> splits) {
+    public void addSplits(List<TableStoreSourceSplit> splits) {
         this.pendingSplits.addAll(splits);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
similarity index 95%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
index b1ad1fb856..5881a07643 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplit.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplit.java
@@ -25,7 +25,7 @@ import lombok.Setter;
 @AllArgsConstructor
 @Getter
 @Setter
-public class TableStoreDBSourceSplit implements SourceSplit {
+public class TableStoreSourceSplit implements SourceSplit {
 
     private static final long serialVersionUID = 6471832674315580956L;
     private Integer splitId;
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
similarity index 70%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
index 3dd58b7e69..7239d6185c 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceSplitEnumerator.java
@@ -17,7 +17,7 @@
 package org.apache.seatunnel.connectors.seatunnel.tablestore.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TablestoreOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.config.TableStoreConfig;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -32,32 +32,31 @@ import java.util.Map;
 import java.util.Set;
 
 @Slf4j
-public class TableStoreDBSourceSplitEnumerator
-        implements SourceSplitEnumerator<TableStoreDBSourceSplit, 
TableStoreDBSourceState> {
+public class TableStoreSourceSplitEnumerator
+        implements SourceSplitEnumerator<TableStoreSourceSplit, 
TableStoreSourceState> {
 
-    private final SourceSplitEnumerator.Context<TableStoreDBSourceSplit> 
enumeratorContext;
-    private final Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
-    private final TablestoreOptions tablestoreOptions;
+    private final SourceSplitEnumerator.Context<TableStoreSourceSplit> 
enumeratorContext;
+    private final Map<Integer, List<TableStoreSourceSplit>> pendingSplits;
+    private final TableStoreConfig tableStoreConfig;
 
     private final Object stateLock = new Object();
     private volatile boolean shouldEnumerate;
 
     /**
      * @param enumeratorContext
-     * @param tablestoreOptions
+     * @param tableStoreConfig
      */
-    public TableStoreDBSourceSplitEnumerator(
-            Context<TableStoreDBSourceSplit> enumeratorContext,
-            TablestoreOptions tablestoreOptions) {
-        this(enumeratorContext, tablestoreOptions, null);
+    public TableStoreSourceSplitEnumerator(
+            Context<TableStoreSourceSplit> enumeratorContext, TableStoreConfig 
tableStoreConfig) {
+        this(enumeratorContext, tableStoreConfig, null);
     }
 
-    public TableStoreDBSourceSplitEnumerator(
-            Context<TableStoreDBSourceSplit> enumeratorContext,
-            TablestoreOptions tablestoreOptions,
-            TableStoreDBSourceState sourceState) {
+    public TableStoreSourceSplitEnumerator(
+            Context<TableStoreSourceSplit> enumeratorContext,
+            TableStoreConfig tableStoreConfig,
+            TableStoreSourceState sourceState) {
         this.enumeratorContext = enumeratorContext;
-        this.tablestoreOptions = tablestoreOptions;
+        this.tableStoreConfig = tableStoreConfig;
         this.pendingSplits = new HashMap<>();
         this.shouldEnumerate = sourceState == null;
         if (sourceState != null) {
@@ -73,7 +72,7 @@ public class TableStoreDBSourceSplitEnumerator
     public void run() throws Exception {
         Set<Integer> readers = enumeratorContext.registeredReaders();
         if (shouldEnumerate) {
-            Set<TableStoreDBSourceSplit> newSplits = 
getTableStoreDBSourceSplit();
+            Set<TableStoreSourceSplit> newSplits = 
getTableStoreDBSourceSplit();
             synchronized (stateLock) {
                 addPendingSplit(newSplits);
                 shouldEnumerate = false;
@@ -84,7 +83,7 @@ public class TableStoreDBSourceSplitEnumerator
 
     private void assignSplit(Set<Integer> readers) {
         for (int reader : readers) {
-            List<TableStoreDBSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
+            List<TableStoreSourceSplit> assignmentForReader = 
pendingSplits.remove(reader);
             if (assignmentForReader != null && !assignmentForReader.isEmpty()) 
{
                 log.info("Assign splits {} to reader {}", assignmentForReader, 
reader);
                 try {
@@ -101,22 +100,22 @@ public class TableStoreDBSourceSplitEnumerator
         }
     }
 
-    private Set<TableStoreDBSourceSplit> getTableStoreDBSourceSplit() {
+    private Set<TableStoreSourceSplit> getTableStoreDBSourceSplit() {
 
-        Set<TableStoreDBSourceSplit> allSplit = new HashSet<>();
-        String tables = tablestoreOptions.getTable();
+        Set<TableStoreSourceSplit> allSplit = new HashSet<>();
+        String tables = tableStoreConfig.getTable();
         String[] tableArr = tables.split(",");
         for (int i = 0; i < tableArr.length; i++) {
             allSplit.add(
-                    new TableStoreDBSourceSplit(
-                            i, tableArr[i], 
tablestoreOptions.getPrimaryKeys().get(i)));
+                    new TableStoreSourceSplit(
+                            i, tableArr[i], 
tableStoreConfig.getPrimaryKeys().get(i)));
         }
         return allSplit;
     }
 
-    private void addPendingSplit(Collection<TableStoreDBSourceSplit> splits) {
+    private void addPendingSplit(Collection<TableStoreSourceSplit> splits) {
         int readerCount = enumeratorContext.currentParallelism();
-        for (TableStoreDBSourceSplit split : splits) {
+        for (TableStoreSourceSplit split : splits) {
             int ownerReader = split.getSplitId() % readerCount;
             pendingSplits.computeIfAbsent(ownerReader, k -> new 
ArrayList<>()).add(split);
         }
@@ -129,7 +128,7 @@ public class TableStoreDBSourceSplitEnumerator
     }
 
     @Override
-    public void addSplitsBack(List<TableStoreDBSourceSplit> splits, int 
subtaskId) {
+    public void addSplitsBack(List<TableStoreSourceSplit> splits, int 
subtaskId) {
         log.debug("Add back splits {} to tablestore.", splits);
         if (!splits.isEmpty()) {
             addPendingSplit(splits);
@@ -155,9 +154,9 @@ public class TableStoreDBSourceSplitEnumerator
     }
 
     @Override
-    public TableStoreDBSourceState snapshotState(long checkpointId) throws 
Exception {
+    public TableStoreSourceState snapshotState(long checkpointId) throws 
Exception {
         synchronized (stateLock) {
-            return new TableStoreDBSourceState(shouldEnumerate, pendingSplits);
+            return new TableStoreSourceState(shouldEnumerate, pendingSplits);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
similarity index 89%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
index e18d0ea952..7d5ca178a3 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreDBSourceState.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/main/java/org/apache/seatunnel/connectors/seatunnel/tablestore/source/TableStoreSourceState.java
@@ -27,9 +27,9 @@ import java.util.Map;
 @Getter
 @Setter
 @AllArgsConstructor
-public class TableStoreDBSourceState implements Serializable {
+public class TableStoreSourceState implements Serializable {
 
     private static final long serialVersionUID = -2942147037830134078L;
     private boolean shouldEnumerate;
-    private Map<Integer, List<TableStoreDBSourceSplit>> pendingSplits;
+    private Map<Integer, List<TableStoreSourceSplit>> pendingSplits;
 }
diff --git 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
 
b/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
similarity index 90%
rename from 
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
rename to 
seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
index 58920f8a51..26984454d8 100644
--- 
a/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TablestoreFactoryTest.java
+++ 
b/seatunnel-connectors-v2/connector-tablestore/src/test/java/org/apache/seatunnel/connectors/seatunnel/tablestore/TableStoreFactoryTest.java
@@ -17,15 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.tablestore;
 
-import 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TablestoreSinkFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.tablestore.sink.TableStoreSinkFactory;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-class TablestoreFactoryTest {
+class TableStoreFactoryTest {
 
     @Test
     void optionRule() {
-        Assertions.assertNotNull((new TablestoreSinkFactory()).optionRule());
+        Assertions.assertNotNull((new TableStoreSinkFactory()).optionRule());
     }
 }

Reply via email to