davidzollo commented on PR #10497:
URL: https://github.com/apache/seatunnel/pull/10497#issuecomment-3904249873

   Good job.
   
   The overall design follows the standard SeaTunnel pattern by implementing 
`SupportMultiTableSinkWriter`. However, I found **critical concurrency issues** 
and **reliability concerns** that must be addressed before merging.
   
   
   ## Critical Issues

   ### 1. Concurrency Bug: Mismatched Locks Cause a Crash
   
   In `DynamoDbSinkClient.java`, the `write` method synchronizes on a dedicated `lock` object, while the `flush` method is declared `synchronized`, which locks on the instance itself (`this`).
   
   ```java
   // Uses 'lock' object
   public void write(PutItemRequest putItemRequest, String tableName) {
       synchronized (lock) {
           // ... modifies batchListByTable (HashMap)
       }
   }
   
   // Uses 'this' instance
   synchronized void flush() {
       // ... iterates over batchListByTable
   }
   ```
   
   **Impact**:
   - `write` and `flush` can execute concurrently on different threads (Stream 
thread vs Checkpoint thread).
   - Because `batchListByTable` is a `HashMap` (not thread-safe), concurrent modification during iteration (`flush`) will throw **`ConcurrentModificationException`** and crash the job during a checkpoint. A minimal standalone repro is sketched below.
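   
   For illustration, a minimal standalone sketch of this failure mode (assumed names, not the connector's code): one thread mutates a plain `HashMap` while another iterates it without sharing a monitor, mirroring the `write`/`flush` split above.
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class HashMapRaceRepro {
       public static void main(String[] args) throws InterruptedException {
           Map<String, List<Integer>> batches = new HashMap<>();

           // "Stream thread": keeps adding to per-table batches.
           Thread writer = new Thread(() -> {
               for (int i = 0; i < 1_000_000; i++) {
                   batches.computeIfAbsent("table-" + (i % 64), k -> new ArrayList<>()).add(i);
               }
           });

           // "Checkpoint thread": iterates the map while it is being modified
           // under a different (here: no) monitor; this typically dies with
           // ConcurrentModificationException.
           Thread flusher = new Thread(() -> {
               while (writer.isAlive()) {
                   for (Map.Entry<String, List<Integer>> e : batches.entrySet()) {
                       e.getValue().size();
                   }
               }
           });

           writer.start();
           flusher.start();
           writer.join();
           flusher.join();
       }
   }
   ```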
   
   **Fix**: Ensure both methods synchronize on the same object (specifically 
`lock`).
   ```java
   // Remove 'synchronized' keyword from method signature and use block
   public void flush() {
       synchronized (lock) {
           // implementation
       }
   }
   ```
   
   ### 2. Weak Retry Strategy for Throttling
   
   The current retry logic in `flushWithRetry` is insufficient for production 
workloads, especially given DynamoDB's strict throughput limits.
   
   ```java
   int maxRetries = 3;
   // ...
   Thread.sleep(100 * retryCount);
   ```
   
   **Impact**:
   - Only ~600ms total wait time across 3 retries (100 + 200 + 300).
   - No jitter, leading to "thundering herd" problems if multiple tasks retry 
simultaneously.
   - High risk of `RuntimeException` ("Failed to write ... items") under 
backpressure, causing job failure.
   
   **Suggestion**:
   - Increase `maxRetries` significantly (e.g., 10-15).
   - Use exponential backoff with jitter (e.g., start at 100 ms and cap each wait at 2-5 s); see the sketch below.
   - Consider making the retry parameters configurable via `AmazonDynamoDBConfig`.
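   
   A hedged sketch of exponential backoff with full jitter; the method shape, the field names `maxRetries`, `baseBackoffMs`, and `maxBackoffMs`, and the `Runnable` parameter are illustrative, not the connector's actual code:
   ```java
   import java.util.concurrent.ThreadLocalRandom;

   public final class BackoffSketch {
       static void flushWithRetry(Runnable flushAttempt) throws InterruptedException {
           final int maxRetries = 10;       // suggested: read from AmazonDynamoDBConfig
           final long baseBackoffMs = 100;  // first retry delay
           final long maxBackoffMs = 5_000; // per-retry cap

           for (int attempt = 0; ; attempt++) {
               try {
                   flushAttempt.run();
                   return;
               } catch (RuntimeException e) { // e.g. a throttled batch write
                   if (attempt >= maxRetries) {
                       throw e; // retries exhausted, surface the failure
                   }
                   // Exponential growth capped at maxBackoffMs ...
                   long cap = Math.min(maxBackoffMs, baseBackoffMs << attempt);
                   // ... with full jitter: a random sleep in [0, cap] keeps
                   // parallel tasks from retrying in lockstep.
                   Thread.sleep(ThreadLocalRandom.current().nextLong(cap + 1));
               }
           }
       }
   }
   ```
   Full jitter trades a slightly longer average wait for a much better spread of retries under contention, which is exactly the scenario throttled batch writes hit first.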
   
   ## Logic Implementation Correctness
   
   ### 1. NPE Handling in Writer (Verified)
   The `AmazonDynamoDBWriter` correctly handles empty table identifiers:
   ```java
   String tableName = element.getTableId();
   if (StringUtils.isEmpty(tableName)) {
       tableName = amazondynamodbConfig.getTable();
   }
   ```
   This is robust and safely falls back to the default configured table, 
ensuring backward compatibility for single-table jobs.
   
   ### 2. Batch Flush Logic (Verified)
   The refactored `write` method correctly moves the network I/O outside the 
synchronized block:
   ```java
   // Note: 'toFlush' is declared (null) before this block; declaration elided here
   synchronized (lock) {
       // ... adds to buffer ...
       if (batchSizeReached) {
           toFlush = new ArrayList<>(batchListByTable.get(tableName));
           batchListByTable.remove(tableName);
       }
   }
   if (toFlush != null) {
       // Correctly executed outside lock
       flushTableAsync(tableName, toFlush);
   }
   ```
   This reduces lock contention significantly.
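   
   As a self-contained reference, a hedged sketch of that copy-then-flush idiom; the class name and element type are placeholders (only `flushTableAsync` and `batchListByTable` appear in the PR):
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public final class BufferSwapSketch {
       private final Object lock = new Object();
       private final Map<String, List<String>> batchListByTable = new HashMap<>();
       private final int batchSize = 25; // DynamoDB BatchWriteItem caps at 25 items

       public void write(String item, String tableName) {
           List<String> toFlush = null;
           synchronized (lock) {
               List<String> buffer =
                       batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
               buffer.add(item);
               if (buffer.size() >= batchSize) {
                   // Swap the full buffer out while holding the lock...
                   toFlush = batchListByTable.remove(tableName);
               }
           }
           if (toFlush != null) {
               // ...and perform the network I/O without holding it.
               flushTableAsync(tableName, toFlush);
           }
       }

       private void flushTableAsync(String tableName, List<String> batch) {
           // placeholder for the actual DynamoDB batch write
       }
   }
   ```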
   
   

