This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 0b1c9a52c9 [Feature][Connector-V2] Add multi-table sink support for 
AmazonDynamo… (#10497)
0b1c9a52c9 is described below

commit 0b1c9a52c99eb63fa0f6fc8a93d87b5c1ee89ec2
Author: Mohamed Talal Seif <[email protected]>
AuthorDate: Sun Mar 22 13:16:34 2026 +0200

    [Feature][Connector-V2] Add multi-table sink support for AmazonDynamo… 
(#10497)
---
 docs/en/connectors/sink/AmazonDynamoDB.md          |  42 +++++-
 docs/zh/connectors/sink/AmazonDynamoDB.md          |  42 +++++-
 .../config/AmazonDynamoDBConfig.java               |  12 ++
 .../config/AmazonDynamoDBSinkOptions.java          |  18 +++
 .../amazondynamodb/sink/AmazonDynamoDBSink.java    |  10 +-
 .../sink/AmazonDynamoDBSinkFactory.java            |  11 +-
 .../amazondynamodb/sink/AmazonDynamoDBWriter.java  |  36 ++++-
 .../amazondynamodb/sink/DynamoDbSinkClient.java    | 150 +++++++++++++++++----
 .../AmazonDynamoDBMultiTableSinkTest.java          |  43 ++++++
 9 files changed, 322 insertions(+), 42 deletions(-)

diff --git a/docs/en/connectors/sink/AmazonDynamoDB.md 
b/docs/en/connectors/sink/AmazonDynamoDB.md
index 4d7b05e15a..1bae54cdbc 100644
--- a/docs/en/connectors/sink/AmazonDynamoDB.md
+++ b/docs/en/connectors/sink/AmazonDynamoDB.md
@@ -11,6 +11,7 @@ Write data to Amazon DynamoDB
 ## Key Features
 
 - [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+- [x] [support multiple table 
write](../../introduction/concepts/connector-v2-features.md)
 
 ## Options
 
@@ -21,8 +22,11 @@ Write data to Amazon DynamoDB
 | access_key_id     | string | yes      | -             |
 | secret_access_key | string | yes      | -             |
 | table             | string | yes      | -             |
-| batch_size        | string | no       | 25            |
-| common-options    |        | no       | -             |
+| batch_size          | int    | no       | 25            |
+| max_retries         | int    | no       | 10            |
+| retry_base_delay_ms | long   | no       | 100           |
+| retry_max_delay_ms  | long   | no       | 5000          |
+| common-options      |        | no       | -             |
 
 ### url [string]
 
@@ -42,7 +46,23 @@ The access secret of Amazon DynamoDB.
 
 ### table [string]
 
-The table of Amazon DynamoDB.
+The table of Amazon DynamoDB. Supports `${table_name}` placeholder for 
multi-table sink scenarios.
+
+### batch_size [int]
+
+The number of records to batch before writing to Amazon DynamoDB.
+
+### max_retries [int]
+
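+Maximum number of retries when DynamoDB returns unprocessed items in a batch write. Must be a non-negative integer. If items are still unprocessed after the last retry, the write fails with an error.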
+
+### retry_base_delay_ms [long]
+
+Base delay in milliseconds for exponential backoff between retries. The n-th retry waits `retry_base_delay_ms * 2^n` milliseconds, capped by `retry_max_delay_ms`.
+
+### retry_max_delay_ms [long]
+
+Upper bound in milliseconds for the exponential backoff delay, regardless of retry count. A random jitter of up to 50% of the capped delay is added on top of this value.
 
 ### common options
 
@@ -50,14 +70,26 @@ Sink plugin common parameters, please refer to [Sink Common 
Options](../common-o
 
 ## Example
 
+### Single table
 ```bash
-Amazondynamodb {
+AmazonDynamoDB {
     url = "http://127.0.0.1:8000";
     region = "us-east-1"
     access_key_id = "dummy-key"
     secret_access_key = "dummy-secret"
     table = "TableName"
-  }
+}
+```
+
+### Multiple table
+```bash
+AmazonDynamoDB {
+    url = "http://127.0.0.1:8000"
+    region = "us-east-1"
+    access_key_id = "dummy-key"
+    secret_access_key = "dummy-secret"
+    table = "${table_name}"
+}
 ```
 
 ## Changelog
diff --git a/docs/zh/connectors/sink/AmazonDynamoDB.md 
b/docs/zh/connectors/sink/AmazonDynamoDB.md
index 3caff15298..d7084211d5 100644
--- a/docs/zh/connectors/sink/AmazonDynamoDB.md
+++ b/docs/zh/connectors/sink/AmazonDynamoDB.md
@@ -11,6 +11,7 @@ import ChangeLog from 
'../changelog/connector-amazondynamodb.md';
 ## 关键特性
 
 - [ ] [精确一次](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持多表写入](../../introduction/concepts/connector-v2-features.md)
 
 ## 选项
 
@@ -21,8 +22,11 @@ import ChangeLog from 
'../changelog/connector-amazondynamodb.md';
 | access_key_id     | string | 是  | -             |
 | secret_access_key | string | 是  | -             |
 | table             | string | 是  | -             |
-| batch_size        | string | 否  | 25            |
-| common-options    |        | 否 | -             |
+| batch_size          | int    | 否  | 25   |
+| max_retries         | int    | 否  | 10   |
+| retry_base_delay_ms | long   | 否  | 100  |
+| retry_max_delay_ms  | long   | 否  | 5000 |
+| common-options      |        | 否  | -    |
 
 ### url [string]
 
@@ -42,7 +46,23 @@ Amazon DynamoDB的访问密钥.
 
 ### table [string]
 
-Amazon DynamoDB 的表名.
+Amazon DynamoDB 的表名. 支持使用 `${table_name}` 占位符,用于多表写入场景.
+
+### batch_size [int]
+
+写入 Amazon DynamoDB 前批量缓存的记录数.
+
+### max_retries [int]
+
+当 DynamoDB 返回未处理数据时,批量写入请求的最大重试次数.
+
+### retry_base_delay_ms [long]
+
+重试之间指数退避的基础延迟时间(毫秒).
+
+### retry_max_delay_ms [long]
+
+指数退避延迟的上限(毫秒),与重试次数无关. 实际等待时间会在该上限基础上额外增加最多 50% 的随机抖动.
 
 ### 常见选项
 
@@ -50,14 +70,26 @@ Sink插件常用参数,请参考 [Sink Common Options](../common-options/sink-
 
 ## 示例
 
+### 单表写入
 ```bash
-Amazondynamodb {
+AmazonDynamoDB {
     url = "http://127.0.0.1:8000";
     region = "us-east-1"
     access_key_id = "dummy-key"
     secret_access_key = "dummy-secret"
     table = "TableName"
-  }
+}
+```
+
+### 多表写入
+```bash
+AmazonDynamoDB {
+    url = "http://127.0.0.1:8000"
+    region = "us-east-1"
+    access_key_id = "dummy-key"
+    secret_access_key = "dummy-secret"
+    table = "${table_name}"
+}
 ```
 
 ## 变更日志
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
index c1f46b5e40..d6554fe6e7 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBConfig.java
@@ -46,6 +46,9 @@ public class AmazonDynamoDBConfig implements Serializable {
     public int batchSize;
     public int scanItemLimit;
     public int parallelScanThreads;
+    private int maxRetries;
+    private long retryBaseDelayMs;
+    private long retryMaxDelayMs;
 
     public AmazonDynamoDBConfig(ReadonlyConfig config) {
         this.url = config.get(AmazonDynamoDBBaseOptions.URL);
@@ -60,5 +63,14 @@ public class AmazonDynamoDBConfig implements Serializable {
         this.batchSize = config.get(AmazonDynamoDBSinkOptions.BATCH_SIZE);
         this.scanItemLimit = 
config.get(AmazonDynamoDBSourceOptions.SCAN_ITEM_LIMIT);
         this.parallelScanThreads = 
config.get(AmazonDynamoDBSourceOptions.PARALLEL_SCAN_THREADS);
+        this.maxRetries = config.get(AmazonDynamoDBSinkOptions.MAX_RETRIES);
+        if (this.maxRetries < 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "max_retries must be a non-negative integer, but 
got: %d",
+                            this.maxRetries));
+        }
+        this.retryBaseDelayMs = 
config.get(AmazonDynamoDBSinkOptions.RETRY_BASE_DELAY_MS);
+        this.retryMaxDelayMs = 
config.get(AmazonDynamoDBSinkOptions.RETRY_MAX_DELAY_MS);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
index 48aebf8be1..cdab19a5c2 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/config/AmazonDynamoDBSinkOptions.java
@@ -27,4 +27,22 @@ public class AmazonDynamoDBSinkOptions extends 
AmazonDynamoDBBaseOptions {
                     .intType()
                     .defaultValue(25)
                     .withDescription("The batch size of Amazon DynamoDB");
+
+    public static final Option<Integer> MAX_RETRIES =
+            Options.key("max_retries")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription("Maximum number of retries for batch 
write requests");
+
+    public static final Option<Long> RETRY_BASE_DELAY_MS =
+            Options.key("retry_base_delay_ms")
+                    .longType()
+                    .defaultValue(100L)
+                    .withDescription("Base delay in milliseconds for 
exponential backoff");
+
+    public static final Option<Long> RETRY_MAX_DELAY_MS =
+            Options.key("retry_max_delay_ms")
+                    .longType()
+                    .defaultValue(5000L)
+                    .withDescription("Maximum delay in milliseconds for 
exponential backoff");
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
index b9a27a7c29..9da22a3ace 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSink.java
@@ -18,16 +18,17 @@
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
-import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import java.io.IOException;
 import java.util.Optional;
 
-public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> 
{
+public class AmazonDynamoDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
+        implements SupportMultiTableSink {
 
     private CatalogTable catalogTable;
 
@@ -50,8 +51,7 @@ public class AmazonDynamoDBSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context)
-            throws IOException {
-        return new AmazonDynamoDBWriter(amazondynamodbConfig, 
catalogTable.getSeaTunnelRowType());
+    public AmazonDynamoDBWriter createWriter(SinkWriter.Context context) 
throws IOException {
+        return new AmazonDynamoDBWriter(amazondynamodbConfig, catalogTable);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
index 715da7c1b1..6dcafdc8d3 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBSinkFactory.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.options.SinkConnectorCommonOptions;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -28,7 +29,10 @@ import com.google.auto.service.AutoService;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.ACCESS_KEY_ID;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.MAX_RETRIES;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.REGION;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.RETRY_BASE_DELAY_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.RETRY_MAX_DELAY_MS;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.SECRET_ACCESS_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.TABLE;
 import static 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBSinkOptions.URL;
@@ -44,7 +48,12 @@ public class AmazonDynamoDBSinkFactory implements 
TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(URL, REGION, ACCESS_KEY_ID, SECRET_ACCESS_KEY, TABLE)
-                .optional(BATCH_SIZE)
+                .optional(
+                        BATCH_SIZE,
+                        SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA,
+                        MAX_RETRIES,
+                        RETRY_BASE_DELAY_MS,
+                        RETRY_MAX_DELAY_MS)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
index f7e39b11a2..8badb2971b 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/AmazonDynamoDBWriter.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
@@ -27,20 +31,46 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import java.io.IOException;
 import java.util.Optional;
 
-public class AmazonDynamoDBWriter extends AbstractSinkWriter<SeaTunnelRow, 
Void> {
+public class AmazonDynamoDBWriter extends AbstractSinkWriter<SeaTunnelRow, 
Void>
+        implements SupportMultiTableSinkWriter<Void> {
 
     private final DynamoDbSinkClient dynamoDbSinkClient;
     private final SeaTunnelRowSerializer serializer;
+    private final AmazonDynamoDBConfig amazondynamodbConfig;
+
+    public AmazonDynamoDBWriter(
+            AmazonDynamoDBConfig amazondynamodbConfig,
+            CatalogTable catalogTable,
+            DynamoDbSinkClient dynamoDbSinkClient) {
+        this.amazondynamodbConfig = amazondynamodbConfig;
+        this.dynamoDbSinkClient = dynamoDbSinkClient;
+
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        this.serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
amazondynamodbConfig);
+    }
 
     public AmazonDynamoDBWriter(
-            AmazonDynamoDBConfig amazondynamodbConfig, SeaTunnelRowType 
seaTunnelRowType) {
+            AmazonDynamoDBConfig amazondynamodbConfig, CatalogTable 
catalogTable) {
+
+        this.amazondynamodbConfig = amazondynamodbConfig;
+
+        SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+
         dynamoDbSinkClient = new DynamoDbSinkClient(amazondynamodbConfig);
         serializer = new DefaultSeaTunnelRowSerializer(seaTunnelRowType, 
amazondynamodbConfig);
     }
 
     @Override
     public void write(SeaTunnelRow element) throws IOException {
-        dynamoDbSinkClient.write(serializer.serialize(element));
+        // In multi-table pipelines, row.tableId identifies the target table.
+        // Falls back to the configured table name for single-table usage.
+        String tableName = element.getTableId();
+
+        if (StringUtils.isEmpty(tableName)) {
+            tableName = amazondynamodbConfig.getTable();
+        }
+
+        dynamoDbSinkClient.write(serializer.serialize(element), tableName);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
index 29ac8b7d4f..4f11b7dadb 100644
--- 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/sink/DynamoDbSinkClient.java
@@ -19,11 +19,13 @@ package 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink;
 
 import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.config.AmazonDynamoDBConfig;
 
+import lombok.extern.slf4j.Slf4j;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
 import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
 import software.amazon.awssdk.services.dynamodb.model.PutRequest;
 import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
@@ -34,15 +36,25 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@Slf4j
 public class DynamoDbSinkClient {
     private final AmazonDynamoDBConfig amazondynamodbConfig;
     private volatile boolean initialize;
     private DynamoDbClient dynamoDbClient;
-    private final List<WriteRequest> batchList;
+    private final Map<String, List<WriteRequest>> batchListByTable;
+    private final Object lock = new Object();
+
+    public DynamoDbSinkClient(
+            AmazonDynamoDBConfig amazondynamodbConfig, DynamoDbClient 
dynamoDbClient) {
+        this.amazondynamodbConfig = amazondynamodbConfig;
+        this.dynamoDbClient = dynamoDbClient;
+        this.batchListByTable = new HashMap<>();
+        this.initialize = true;
+    }
 
     public DynamoDbSinkClient(AmazonDynamoDBConfig amazondynamodbConfig) {
         this.amazondynamodbConfig = amazondynamodbConfig;
-        this.batchList = new ArrayList<>();
+        this.batchListByTable = new HashMap<>();
     }
 
     private void tryInit() {
@@ -64,34 +76,126 @@ public class DynamoDbSinkClient {
         initialize = true;
     }
 
-    public synchronized void write(PutItemRequest putItemRequest) {
-        tryInit();
-        batchList.add(
-                WriteRequest.builder()
-                        
.putRequest(PutRequest.builder().item(putItemRequest.item()).build())
-                        .build());
-        if (amazondynamodbConfig.getBatchSize() > 0
-                && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
-            flush();
+    public void write(PutItemRequest putItemRequest, String tableName) {
+        List<WriteRequest> toFlush = null;
+
+        synchronized (lock) {
+            tryInit();
+
+            batchListByTable.computeIfAbsent(tableName, k -> new 
ArrayList<>());
+            batchListByTable
+                    .get(tableName)
+                    .add(
+                            WriteRequest.builder()
+                                    .putRequest(
+                                            PutRequest.builder()
+                                                    
.item(putItemRequest.item())
+                                                    .build())
+                                    .build());
+
+            if (amazondynamodbConfig.getBatchSize() > 0
+                    && batchListByTable.get(tableName).size()
+                            >= amazondynamodbConfig.getBatchSize()) {
+                // Copy batch and remove from map inside lock (fast)
+                toFlush = new ArrayList<>(batchListByTable.get(tableName));
+                batchListByTable.remove(tableName);
+            }
+        }
+
+        // Execute network I/O outside lock (other threads can continue)
+        if (toFlush != null) {
+            flushTable(tableName, toFlush);
         }
     }
 
-    public synchronized void close() {
-        if (dynamoDbClient != null) {
-            flush();
-            dynamoDbClient.close();
+    public void close() {
+        flush();
+        synchronized (lock) {
+            if (dynamoDbClient != null) {
+                dynamoDbClient.close();
+            }
         }
     }
 
-    synchronized void flush() {
-        if (batchList.isEmpty()) {
-            return;
+    void flush() {
+        Map<String, List<WriteRequest>> batchToFlush = new HashMap<>();
+
+        synchronized (lock) {
+            if (dynamoDbClient == null || batchListByTable.isEmpty()) {
+                return;
+            }
+            batchToFlush.putAll(batchListByTable);
+            batchListByTable.clear();
+        }
+
+        for (Map.Entry<String, List<WriteRequest>> entry : 
batchToFlush.entrySet()) {
+            flushTable(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void flushTable(String tableName, List<WriteRequest> requests) {
+        if (!requests.isEmpty()) {
+            flushWithRetry(tableName, requests);
         }
-        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
-        requestItems.put(amazondynamodbConfig.getTable(), batchList);
-        dynamoDbClient.batchWriteItem(
-                
BatchWriteItemRequest.builder().requestItems(requestItems).build());
+    }
+
+    private void flushWithRetry(String tableName, List<WriteRequest> requests) 
{
+        List<WriteRequest> pendingRequests = new ArrayList<>(requests);
+
+        int maxRetries = amazondynamodbConfig.getMaxRetries();
+        long baseDelayMs = amazondynamodbConfig.getRetryBaseDelayMs();
+        long maxDelayMs = amazondynamodbConfig.getRetryMaxDelayMs();
+
+        int retryCount = 0;
+
+        while (!pendingRequests.isEmpty() && retryCount <= maxRetries) {
+            Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
+            requestItems.put(tableName, pendingRequests);
+
+            BatchWriteItemResponse response =
+                    dynamoDbClient.batchWriteItem(
+                            
BatchWriteItemRequest.builder().requestItems(requestItems).build());
+
+            Map<String, List<WriteRequest>> unprocessedKeys = 
response.unprocessedItems();
+            pendingRequests = unprocessedKeys.getOrDefault(tableName, new 
ArrayList<>());
+
+            if (!pendingRequests.isEmpty()) {
+                retryCount++;
 
-        batchList.clear();
+                long delay = Math.min(baseDelayMs * (1L << retryCount), 
maxDelayMs);
+
+                long jitter = (long) (delay * Math.random() * 0.5);
+                delay += jitter;
+
+                log.warn(
+                        "Retrying batch write to table '{}': attempt {}/{}, "
+                                + "{} unprocessed items remaining, retrying in 
{} ms",
+                        tableName,
+                        retryCount,
+                        maxRetries,
+                        pendingRequests.size(),
+                        delay);
+
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Interrupted during retry", e);
+                }
+            }
+        }
+
+        if (!pendingRequests.isEmpty()) {
+            log.error(
+                    "Failed to write {} items to table '{}' after {} retries",
+                    pendingRequests.size(),
+                    tableName,
+                    maxRetries);
+
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to write %d items to table %s after %d 
retries",
+                            pendingRequests.size(), tableName, maxRetries));
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java
 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java
new file mode 100644
index 0000000000..055b415481
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-amazondynamodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/amazondynamodb/AmazonDynamoDBMultiTableSinkTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.amazondynamodb;
+
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink.AmazonDynamoDBSink;
+import 
org.apache.seatunnel.connectors.seatunnel.amazondynamodb.sink.AmazonDynamoDBWriter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AmazonDynamoDBMultiTableSinkTest {
+
+    @Test
+    public void testSinkImplementsMultiTableSinkInterface() {
+        Assertions.assertTrue(
+                
SupportMultiTableSink.class.isAssignableFrom(AmazonDynamoDBSink.class),
+                "AmazonDynamoDBSink must implement SupportMultiTableSink");
+    }
+
+    @Test
+    public void testWriterImplementsMultiTableSinkWriterInterface() {
+        Assertions.assertTrue(
+                
SupportMultiTableSinkWriter.class.isAssignableFrom(AmazonDynamoDBWriter.class),
+                "AmazonDynamoDBWriter must implement 
SupportMultiTableSinkWriter");
+    }
+}

Reply via email to