This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a69efca0fd [improve] update amazondynamodb connector (#8601)
a69efca0fd is described below

commit a69efca0fdca28bdb46afde395e0bea7c404c82b
Author: Jarvis <[email protected]>
AuthorDate: Fri Feb 7 14:01:26 2025 +0800

    [improve] update amazondynamodb connector (#8601)
---
 .../seatunnel/api/ConnectorOptionCheckTest.java    |  1 -
 ...BConfig.java => AmazonDynamoDBBaseOptions.java} | 28 +-------
 .../config/AmazonDynamoDBConfig.java               | 84 +++++++++-------------
 .../config/AmazonDynamoDBSinkOptions.java          | 30 ++++++++
 .../config/AmazonDynamoDBSourceOptions.java        | 62 ++++------------
 .../serialize/DefaultSeaTunnelRowSerializer.java   | 11 ++-
 .../amazondynamodb/sink/AmazonDynamoDBSink.java    | 62 ++++------------
 .../sink/AmazonDynamoDBSinkFactory.java            | 25 ++++---
 .../amazondynamodb/sink/AmazonDynamoDBWriter.java  | 10 ++-
 .../amazondynamodb/sink/DynamoDbSinkClient.java    | 22 +++---
 .../source/AmazonDynamoDBSource.java               | 67 +++++------------
 .../source/AmazonDynamoDBSourceFactory.java        | 41 +++++++----
 .../source/AmazonDynamoDBSourceReader.java         | 18 ++---
 .../AmazonDynamoDBSourceSplitEnumerator.java       | 16 ++---
 14 files changed, 190 insertions(+), 287 deletions(-)

diff --git 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
index 88f4f69a29..a473f0f685 100644
--- 
a/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
+++ 
b/seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java
@@ -224,7 +224,6 @@ public class ConnectorOptionCheckTest {
         whiteList.add("RocketMqSourceOptions");
         whiteList.add("TablestoreSinkOptions");
         whiteList.add("TableStoreDBSourceOptions");
-        whiteList.add("AmazonDynamoDBSinkOptions");
         whiteList.add("KuduSinkOptions");
         whiteList.add("TDengineSinkOptions");
         whiteList.add("Neo4jSourceOptions");
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
similarity index 65%
copy from 
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
copy to 
seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
index 87d69c74c1..4507e7c46d 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBBaseOptions.java
@@ -22,7 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 
 import java.io.Serializable;
 
-public class AmazonDynamoDBConfig implements Serializable {
+public class AmazonDynamoDBBaseOptions implements Serializable {
     public static final Option<String> URL =
             Options.key("url")
                     .stringType()
@@ -48,30 +48,4 @@ public class AmazonDynamoDBConfig implements Serializable {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("The table of Amazon DynamoDB");
-
-    public static final Option<Integer> BATCH_SIZE =
-            Options.key("batch_size")
-                    .intType()
-                    .defaultValue(25)
-                    .withDescription("The batch size of Amazon DynamoDB");
-
-    public static final Option<Integer> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .intType()
-                    .defaultValue(1000)
-                    .withDescription("The batch interval of Amazon DynamoDB");
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static final Option<Integer> SCAN_ITEM_LIMIT =
-            Options.key("scan_item_limit")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("number of item each scan request should 
return");
-
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static final Option<Integer> PARALLEL_SCAN_THREADS =
-            Options.key("parallel_scan_threads")
-                    .intType()
-                    .defaultValue(2)
-                    .withDescription("number of logical segments for parallel 
scan");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index 87d69c74c1..0c72819776 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -17,61 +17,47 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
 
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
 import java.io.Serializable;
 
+@Data
+@AllArgsConstructor
 public class AmazonDynamoDBConfig implements Serializable {
-    public static final Option<String> URL =
-            Options.key("url")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("url to read to Amazon DynamoDB");
-    public static final Option<String> REGION =
-            Options.key("region")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The region of Amazon DynamoDB");
-    public static final Option<String> ACCESS_KEY_ID =
-            Options.key("access_key_id")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The access id of Amazon DynamoDB");
-    public static final Option<String> SECRET_ACCESS_KEY =
-            Options.key("secret_access_key")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The access secret key of Amazon 
DynamoDB");
-    public static final Option<String> TABLE =
-            Options.key("table")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("The table of Amazon DynamoDB");
 
-    public static final Option<Integer> BATCH_SIZE =
-            Options.key("batch_size")
-                    .intType()
-                    .defaultValue(25)
-                    .withDescription("The batch size of Amazon DynamoDB");
+    private String url;
+
+    private String region;
+
+    private String accessKeyId;
+
+    private String secretAccessKey;
+
+    private String table;
 
-    public static final Option<Integer> BATCH_INTERVAL_MS =
-            Options.key("batch_interval_ms")
-                    .intType()
-                    .defaultValue(1000)
-                    .withDescription("The batch interval of Amazon DynamoDB");
+    private Config schema;
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static final Option<Integer> SCAN_ITEM_LIMIT =
-            Options.key("scan_item_limit")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("number of item each scan request should 
return");
+    public int batchSize;
+    public int scanItemLimit;
+    public int parallelScanThreads;
 
-    @SuppressWarnings("checkstyle:MagicNumber")
-    public static final Option<Integer> PARALLEL_SCAN_THREADS =
-            Options.key("parallel_scan_threads")
-                    .intType()
-                    .defaultValue(2)
-                    .withDescription("number of logical segments for parallel 
scan");
+    public AmazonDynamoDBConfig(ReadonlyConfig config) {
+        this.url = config.get(AmazonDynamoDBBaseOptions.URL);
+        this.region = config.get(AmazonDynamoDBBaseOptions.REGION);
+        this.accessKeyId = config.get(AmazonDynamoDBBaseOptions.ACCESS_KEY_ID);
+        this.secretAccessKey = 
config.get(AmazonDynamoDBBaseOptions.SECRET_ACCESS_KEY);
+        this.table = config.get(AmazonDynamoDBBaseOptions.TABLE);
+        if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+            this.schema = 
ReadonlyConfig.fromMap(config.get(TableSchemaOptions.SCHEMA)).toConfig();
+        }
+        this.batchSize = config.get(AmazonDynamoDBSinkOptions.BATCH_SIZE);
+        this.scanItemLimit = 
config.get(AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT);
+        this.parallelScanThreads = 
config.get(AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
new file mode 100644
index 0000000000..48aebf8be1
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class AmazonDynamoDBSinkOptions extends AmazonDynamoDBBaseOptions {
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(25)
+                    .withDescription("The batch size of Amazon DynamoDB");
+}
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
index bd4f32b08f..57d44adb0a 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSourceOptions.java
@@ -17,57 +17,25 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
 
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-@AllArgsConstructor
-public class AmazonDynamoDBSourceOptions implements Serializable {
-
-    private String url;
-
-    private String region;
-
-    private String accessKeyId;
-
-    private String secretAccessKey;
+import java.util.Map;
 
-    private String table;
+public class AmazonDynamoDBSourceOptions extends AmazonDynamoDBBaseOptions {
 
-    private Config schema;
+    public static final Option<Integer> SCAN_ITEM_LIMIT =
+            Options.key("scan_item_limit")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("number of item each scan request should 
return");
 
-    public int batchSize = AmazonDynamoDBConfig.BATCH_SIZE.defaultValue();
-    public int batchIntervalMs = 
AmazonDynamoDBConfig.BATCH_INTERVAL_MS.defaultValue();
-    public int scanItemLimit = 
AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.defaultValue();
-    public int parallelScanThreads = 
AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.defaultValue();
+    public static final Option<Integer> PARALLEL_SCAN_THREADS =
+            Options.key("parallel_scan_threads")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription("number of logical segments for parallel 
scan");
 
-    public AmazonDynamoDBSourceOptions(Config config) {
-        this.url = config.getString(AmazonDynamoDBConfig.URL.key());
-        this.region = config.getString(AmazonDynamoDBConfig.REGION.key());
-        this.accessKeyId = 
config.getString(AmazonDynamoDBConfig.ACCESS_KEY_ID.key());
-        this.secretAccessKey = 
config.getString(AmazonDynamoDBConfig.SECRET_ACCESS_KEY.key());
-        this.table = config.getString(AmazonDynamoDBConfig.TABLE.key());
-        if (config.hasPath(TableSchemaOptions.SCHEMA.key())) {
-            this.schema = config.getConfig(TableSchemaOptions.SCHEMA.key());
-        }
-        if (config.hasPath(AmazonDynamoDBConfig.BATCH_SIZE.key())) {
-            this.batchSize = 
config.getInt(AmazonDynamoDBConfig.BATCH_SIZE.key());
-        }
-        if (config.hasPath(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key())) {
-            this.batchIntervalMs = 
config.getInt(AmazonDynamoDBConfig.BATCH_INTERVAL_MS.key());
-        }
-        if (config.hasPath(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key())) {
-            this.scanItemLimit = 
config.getInt(AmazonDynamoDBConfig.SCAN_ITEM_LIMIT.key());
-        }
-        if (config.hasPath(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key())) {
-            this.parallelScanThreads =
-                    
config.getInt(AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS.key());
-        }
-    }
+    public static final Option<Map<String, Object>> SCHEMA = 
TableSchemaOptions.SCHEMA;
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
index 5cef8d2bc6..6d31eecca6 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/serialize/DefaultSeaTunnelRowSerializer.java
@@ -23,7 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -40,14 +40,13 @@ import java.util.stream.Stream;
 public class DefaultSeaTunnelRowSerializer implements SeaTunnelRowSerializer {
 
     private final SeaTunnelRowType seaTunnelRowType;
-    private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+    private final AmazonDynamoDBConfig amazondynamodbConfig;
     private final List<AttributeValue.Type> measurementsType;
 
     public DefaultSeaTunnelRowSerializer(
-            SeaTunnelRowType seaTunnelRowType,
-            AmazonDynamoDBSourceOptions amazondynamodbSourceOptions) {
+            SeaTunnelRowType seaTunnelRowType, AmazonDynamoDBConfig 
amazondynamodbConfig) {
         this.seaTunnelRowType = seaTunnelRowType;
-        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+        this.amazondynamodbConfig = amazondynamodbConfig;
         this.measurementsType = convertTypes(seaTunnelRowType);
     }
 
@@ -65,7 +64,7 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                             measurementsType.get(index)));
         }
         return PutItemRequest.builder()
-                .tableName(amazondynamodbSourceOptions.getTable())
+                .tableName(amazondynamodbConfig.getTable())
                 .item(itemValues)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index 68dcc84a42..b9a27a7c29 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -17,79 +17,41 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
-import com.google.auto.service.AutoService;
-
 import java.io.IOException;
 import java.util.Optional;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
-
-@AutoService(SeaTunnelSink.class)
 public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> 
{
 
-    private SeaTunnelRowType rowType;
+    private CatalogTable catalogTable;
 
-    private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+    private AmazonDynamoDBConfig amazondynamodbConfig;
 
-    @Override
-    public String getPluginName() {
-        return "AmazonDynamodb";
+    public AmazonDynamoDBSink(
+            CatalogTable catalogTable, AmazonDynamoDBConfig 
amazondynamodbConfig) {
+        this.catalogTable = catalogTable;
+        this.amazondynamodbConfig = amazondynamodbConfig;
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        URL.key(),
-                        TABLE.key(),
-                        REGION.key(),
-                        ACCESS_KEY_ID.key(),
-                        SECRET_ACCESS_KEY.key());
-        if (!result.isSuccess()) {
-            throw new AmazonDynamoDBConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        amazondynamodbSourceOptions = new 
AmazonDynamoDBSourceOptions(pluginConfig);
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
     }
 
     @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.rowType = seaTunnelRowType;
+    public String getPluginName() {
+        return "AmazonDynamodb";
     }
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
             throws IOException {
-        return new AmazonDynamoDBWriter(amazondynamodbSourceOptions, rowType);
-    }
-
-    @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+        return new AmazonDynamoDBWriter(amazondynamodbConfig, 
catalogTable.getSeaTunnelRowType());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
index 14aaf5fc87..715da7c1b1 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -18,18 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_INTERVAL_MS;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.BATCH_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.URL;
 
 @AutoService(Factory.class)
 public class AmazonDynamoDBSinkFactory implements TableSinkFactory {
@@ -42,7 +44,14 @@ public class AmazonDynamoDBSinkFactory implements 
TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
-                .optional(BATCH_SIZE, BATCH_INTERVAL_MS)
+                .optional(BATCH_SIZE)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () ->
+                new AmazonDynamoDBSink(
+                        context.getCatalogTable(), new 
AmazonDynamoDBConfig(context.getOptions()));
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index aa27a4b714..f7e39b11a2 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowSerializer;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
@@ -33,11 +33,9 @@ public class AmazonDynamoDBWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void>
     private final SeaTunnelRowSerializer serializer;
 
     public AmazonDynamoDBWriter(
-            AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
-            SeaTunnelRowType seaTunnelRowType) {
-        dynamoDbSinkClient = new 
DynamoDbSinkClient(amazondynamodbSourceOptions);
-        serializer =
-                new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
amazondynamodbSourceOptions);
+            AmazonDynamoDBConfig amazondynamodbConfig, SeaTunnelRowType 
seaTunnelRowType) {
+        dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbConfig);
+        serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
amazondynamodbConfig);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index b12ba15d9d..29ac8b7d4f 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
@@ -35,13 +35,13 @@ import java.util.List;
 import java.util.Map;
 
 public class DynamoDbSinkClient {
-    private final AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+    private final AmazonDynamoDBConfig amazondynamodbConfig;
     private volatile boolean initialize;
     private DynamoDbClient dynamoDbClient;
     private final List<WriteRequest> batchList;
 
-    public DynamoDbSinkClient(AmazonDynamoDBSourceOptions 
amazondynamodbSourceOptions) {
-        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+    public DynamoDbSinkClient(AmazonDynamoDBConfig amazondynamodbConfig) {
+        this.amazondynamodbConfig = amazondynamodbConfig;
         this.batchList = new ArrayList<>();
     }
 
@@ -51,15 +51,15 @@ public class DynamoDbSinkClient {
         }
         dynamoDbClient =
                 DynamoDbClient.builder()
-                        
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+                        
.endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
                         // The region is meaningless for local DynamoDb but 
required for client
                         // builder validation
-                        
.region(Region.of(amazondynamodbSourceOptions.getRegion()))
+                        .region(Region.of(amazondynamodbConfig.getRegion()))
                         .credentialsProvider(
                                 StaticCredentialsProvider.create(
                                         AwsBasicCredentials.create(
-                                                
amazondynamodbSourceOptions.getAccessKeyId(),
-                                                
amazondynamodbSourceOptions.getSecretAccessKey())))
+                                                
amazondynamodbConfig.getAccessKeyId(),
+                                                
amazondynamodbConfig.getSecretAccessKey())))
                         .build();
         initialize = true;
     }
@@ -70,8 +70,8 @@ public class DynamoDbSinkClient {
                 WriteRequest.builder()
                         
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
                         .build());
-        if (amazondynamodbSourceOptions.getBatchSize() > 0
-                && batchList.size() >= 
amazondynamodbSourceOptions.getBatchSize()) {
+        if (amazondynamodbConfig.getBatchSize() > 0
+                && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
             flush();
         }
     }
@@ -88,7 +88,7 @@ public class DynamoDbSinkClient {
             return;
         }
         Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
-        requestItems.put(amazondynamodbSourceOptions.getTable(), batchList);
+        requestItems.put(amazondynamodbConfig.getTable(), batchList);
         dynamoDbClient.batchWriteItem(
                 
BatchWriteItemRequest.builder().requestItems(requestItems).build());
 
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
index e0175b4da7..12f7f8435d 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSource.java
@@ -17,83 +17,50 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.exception.AmazonDynamoDBConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
-import com.google.auto.service.AutoService;
 import lombok.extern.slf4j.Slf4j;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import java.util.Collections;
+import java.util.List;
 
 @Slf4j
-@AutoService(SeaTunnelSource.class)
 public class AmazonDynamoDBSource
         implements SeaTunnelSource<
                         SeaTunnelRow, AmazonDynamoDBSourceSplit, 
AmazonDynamoDBSourceState>,
                 SupportParallelism,
                 SupportColumnProjection {
 
-    private AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+    private AmazonDynamoDBConfig amazondynamodbConfig;
+    private CatalogTable catalogTable;
 
-    private SeaTunnelRowType typeInfo;
+    public AmazonDynamoDBSource(
+            AmazonDynamoDBConfig amazondynamodbConfig, CatalogTable 
catalogTable) {
+        this.amazondynamodbConfig = amazondynamodbConfig;
+        this.catalogTable = catalogTable;
+    }
 
     @Override
     public String getPluginName() {
         return "AmazonDynamodb";
     }
 
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        URL.key(),
-                        TABLE.key(),
-                        REGION.key(),
-                        ACCESS_KEY_ID.key(),
-                        SECRET_ACCESS_KEY.key(),
-                        TableSchemaOptions.SCHEMA.key());
-        if (!result.isSuccess()) {
-            throw new AmazonDynamoDBConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
-        }
-        amazondynamodbSourceOptions = new 
AmazonDynamoDBSourceOptions(pluginConfig);
-        typeInfo = 
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-    }
-
     @Override
     public Boundedness getBoundedness() {
         return Boundedness.BOUNDED;
     }
 
     @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.typeInfo;
+    public List<CatalogTable> getProducedCatalogTables() {
+        return Collections.singletonList(catalogTable);
     }
 
     @Override
@@ -101,8 +68,7 @@ public class AmazonDynamoDBSource
             createEnumerator(
                     SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> 
enumeratorContext)
                     throws Exception {
-        return new AmazonDynamoDBSourceSplitEnumerator(
-                enumeratorContext, amazondynamodbSourceOptions);
+        return new AmazonDynamoDBSourceSplitEnumerator(enumeratorContext, 
amazondynamodbConfig);
     }
 
     @Override
@@ -112,12 +78,13 @@ public class AmazonDynamoDBSource
                     AmazonDynamoDBSourceState checkpointState)
                     throws Exception {
         return new AmazonDynamoDBSourceSplitEnumerator(
-                enumeratorContext, amazondynamodbSourceOptions, 
checkpointState);
+                enumeratorContext, amazondynamodbConfig, checkpointState);
     }
 
     @Override
     public SourceReader<SeaTunnelRow, AmazonDynamoDBSourceSplit> createReader(
             SourceReader.Context readerContext) throws Exception {
-        return new AmazonDynamoDBSourceReader(readerContext, 
amazondynamodbSourceOptions, typeInfo);
+        return new AmazonDynamoDBSourceReader(
+                readerContext, amazondynamodbConfig, 
catalogTable.getSeaTunnelRowType());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
index 047d78c0d6..3218e5a29a 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceFactory.java
@@ -19,19 +19,26 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
 import com.google.auto.service.AutoService;
 
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.ACCESS_KEY_ID;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.PARALLEL_SCAN_THREADS;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.REGION;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SCAN_ITEM_LIMIT;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.SECRET_ACCESS_KEY;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.TABLE;
-import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig.URL;
+import java.io.Serializable;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.ACCESS_KEY_ID;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SCHEMA;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.SECRET_ACCESS_KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions.URL;
 
 @AutoService(Factory.class)
 public class AmazonDynamoDBSourceFactory implements TableSourceFactory {
@@ -43,17 +50,21 @@ public class AmazonDynamoDBSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(
-                        URL,
-                        REGION,
-                        ACCESS_KEY_ID,
-                        SECRET_ACCESS_KEY,
-                        TABLE,
-                        TableSchemaOptions.SCHEMA)
+                .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, 
TABLE, SCHEMA)
                 .optional(SCAN_ITEM_LIMIT, PARALLEL_SCAN_THREADS)
                 .build();
     }
 
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable>
+            TableSource<T, SplitT, StateT> 
createSource(TableSourceFactoryContext context) {
+        return () ->
+                (SeaTunnelSource<T, SplitT, StateT>)
+                        new AmazonDynamoDBSource(
+                                new AmazonDynamoDBConfig(context.getOptions()),
+                                
CatalogTableUtil.buildWithConfig(context.getOptions()));
+    }
+
     @Override
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return AmazonDynamoDBSource.class;
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
index 3f1962a3eb..a9e72d9977 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceReader.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.DefaultSeaTunnelRowDeserializer;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.serialize.SeaTunnelRowDeserializer;
 
@@ -46,7 +46,7 @@ public class AmazonDynamoDBSourceReader
 
     protected DynamoDbClient dynamoDbClient;
     protected SourceReader.Context context;
-    protected AmazonDynamoDBSourceOptions amazondynamodbSourceOptions;
+    protected AmazonDynamoDBConfig amazondynamodbConfig;
     protected SeaTunnelRowDeserializer seaTunnelRowDeserializer;
     Queue<AmazonDynamoDBSourceSplit> pendingSplits = new 
ConcurrentLinkedDeque<>();
 
@@ -54,10 +54,10 @@ public class AmazonDynamoDBSourceReader
 
     public AmazonDynamoDBSourceReader(
             SourceReader.Context context,
-            AmazonDynamoDBSourceOptions amazondynamodbSourceOptions,
+            AmazonDynamoDBConfig amazondynamodbConfig,
             SeaTunnelRowType typeInfo) {
         this.context = context;
-        this.amazondynamodbSourceOptions = amazondynamodbSourceOptions;
+        this.amazondynamodbConfig = amazondynamodbConfig;
         this.seaTunnelRowDeserializer = new 
DefaultSeaTunnelRowDeserializer(typeInfo);
     }
 
@@ -65,15 +65,15 @@ public class AmazonDynamoDBSourceReader
     public void open() {
         dynamoDbClient =
                 DynamoDbClient.builder()
-                        
.endpointOverride(URI.create(amazondynamodbSourceOptions.getUrl()))
+                        
.endpointOverride(URI.create(amazondynamodbConfig.getUrl()))
                         // The region is meaningless for local DynamoDb but 
required for client
                         // builder validation
-                        
.region(Region.of(amazondynamodbSourceOptions.getRegion()))
+                        .region(Region.of(amazondynamodbConfig.getRegion()))
                         .credentialsProvider(
                                 StaticCredentialsProvider.create(
                                         AwsBasicCredentials.create(
-                                                
amazondynamodbSourceOptions.getAccessKeyId(),
-                                                
amazondynamodbSourceOptions.getSecretAccessKey())))
+                                                
amazondynamodbConfig.getAccessKeyId(),
+                                                
amazondynamodbConfig.getSecretAccessKey())))
                         .build();
     }
 
@@ -124,7 +124,7 @@ public class AmazonDynamoDBSourceReader
         ScanIterable scan;
         ScanRequest scanRequest =
                 ScanRequest.builder()
-                        .tableName(amazondynamodbSourceOptions.getTable())
+                        .tableName(amazondynamodbConfig.getTable())
                         .limit(split.getItemCount())
                         .segment(split.getSplitId())
                         .totalSegments(split.getTotalSegments())
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
index 03c7e9d5d2..50099f4b54 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/source/AmazonDynamoDBSourceSplitEnumerator.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.source;
 
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSourceOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,23 +41,23 @@ public class AmazonDynamoDBSourceSplitEnumerator
 
     private final SourceSplitEnumerator.Context<AmazonDynamoDBSourceSplit> 
enumeratorContext;
     private final Map<Integer, List<AmazonDynamoDBSourceSplit>> pendingSplits;
-    private final AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions;
+    private final AmazonDynamoDBConfig amazonDynamoDBConfig;
 
     private final Object stateLock = new Object();
     private volatile boolean shouldEnumerate;
 
     public AmazonDynamoDBSourceSplitEnumerator(
             Context<AmazonDynamoDBSourceSplit> enumeratorContext,
-            AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions) {
-        this(enumeratorContext, amazonDynamoDBSourceOptions, null);
+            AmazonDynamoDBConfig amazonDynamoDBConfig) {
+        this(enumeratorContext, amazonDynamoDBConfig, null);
     }
 
     public AmazonDynamoDBSourceSplitEnumerator(
             Context<AmazonDynamoDBSourceSplit> enumeratorContext,
-            AmazonDynamoDBSourceOptions amazonDynamoDBSourceOptions,
+            AmazonDynamoDBConfig amazonDynamoDBConfig,
             AmazonDynamoDBSourceState sourceState) {
         this.enumeratorContext = enumeratorContext;
-        this.amazonDynamoDBSourceOptions = amazonDynamoDBSourceOptions;
+        this.amazonDynamoDBConfig = amazonDynamoDBConfig;
         this.pendingSplits = new HashMap<>();
         this.shouldEnumerate = sourceState == null;
         if (sourceState != null) {
@@ -119,8 +119,8 @@ public class AmazonDynamoDBSourceSplitEnumerator
 
     private Set<AmazonDynamoDBSourceSplit> discoverySplits() {
         Set<AmazonDynamoDBSourceSplit> allSplit = new HashSet<>();
-        int totalSegments = amazonDynamoDBSourceOptions.parallelScanThreads;
-        int itemLimit = amazonDynamoDBSourceOptions.scanItemLimit;
+        int totalSegments = amazonDynamoDBConfig.parallelScanThreads;
+        int itemLimit = amazonDynamoDBConfig.scanItemLimit;
         for (int i = 0; i < totalSegments; i++) {
             AmazonDynamoDBSourceSplit split =
                     new AmazonDynamoDBSourceSplit(i, totalSegments, itemLimit);

Reply via email to