davidzollo commented on PR #10497:
URL: https://github.com/apache/seatunnel/pull/10497#issuecomment-3904249873
Good job.
The overall design follows the standard SeaTunnel pattern by implementing
`SupportMultiTableSinkWriter`. However, I found **critical concurrency issues**
and **reliability concerns** that must be addressed before merging.
### 1. Concurrency Bug: Mismatched Locks Causing a Crash
In `DynamoDbSinkClient.java`, the `write` method synchronizes on a specific
`lock` object, while the `flush` method is declared `synchronized` (which locks
on `this` instance).
```java
// Uses 'lock' object
public void write(PutItemRequest putItemRequest, String tableName) {
    synchronized (lock) {
        // ... modifies batchListByTable (HashMap)
    }
}

// Uses 'this' instance
synchronized void flush() {
    // ... iterates over batchListByTable
}
```
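For context, a `synchronized` instance method is shorthand for locking on `this`, so the two methods above guard the shared map with two different monitors:
```java
// Equivalent form of the flush() declaration above: the monitor is the instance.
void flush() {
    synchronized (this) {
        // ... iterates over batchListByTable
    }
}
```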
**Impact**:
- `write` and `flush` can execute concurrently on different threads (the stream
thread vs. the checkpoint thread).
- Because `batchListByTable` is a `HashMap` (not thread-safe), concurrent
modification while `flush` iterates can throw
**`ConcurrentModificationException`** and crash the job during a checkpoint.
**Fix**: Ensure both methods synchronize on the same object (specifically
`lock`).
```java
// Remove 'synchronized' keyword from method signature and use block
public void flush() {
    synchronized (lock) {
        // implementation
    }
}
```
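For clarity, a minimal self-contained sketch of the fixed locking discipline (class and field names follow the snippets above; the value type is simplified to `Object` so the sketch compiles without the AWS SDK):
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: write() and flush() guard batchListByTable with the same monitor,
// so the HashMap is never iterated while another thread mutates it.
public class DynamoDbSinkClientSketch {
    private final Object lock = new Object();
    private final Map<String, List<Object>> batchListByTable = new HashMap<>();

    public void write(Object putItemRequest, String tableName) {
        synchronized (lock) {
            batchListByTable
                    .computeIfAbsent(tableName, k -> new ArrayList<>())
                    .add(putItemRequest);
        }
    }

    public void flush() {
        Map<String, List<Object>> snapshot;
        synchronized (lock) {
            // Snapshot and clear under the lock; do the network I/O outside it.
            snapshot = new HashMap<>(batchListByTable);
            batchListByTable.clear();
        }
        snapshot.forEach((table, batch) -> {
            // ... issue the batch write for 'table' here ...
        });
    }
}
```
This also keeps the network I/O out of the critical section, matching the pattern the `write` path already follows (see section 2 under "Logic Implementation Correctness" below).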
### 2. Weak Retry Strategy for Throttling
The current retry logic in `flushWithRetry` is insufficient for production
workloads, especially given DynamoDB's strict throughput limits.
```java
int maxRetries = 3;
// ...
Thread.sleep(100 * retryCount);
```
**Impact**:
- Only ~600ms total wait time across 3 retries (100 + 200 + 300).
- No jitter, leading to "thundering herd" problems if multiple tasks retry
simultaneously.
- High risk of `RuntimeException` ("Failed to write ... items") under
backpressure, causing job failure.
**Suggestion** (a sketch follows this list):
- Increase `maxRetries` significantly (e.g., 10-15).
- Use exponential backoff with jitter (e.g., start at 100ms and cap each wait
at 2-5s).
- Consider making the retry parameters configurable via `AmazonDynamoDBConfig`.
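A minimal sketch of full-jitter exponential backoff (`BackoffSketch`, `backoffMillis`, the constants, and the `Runnable` wrapper are illustrative names, not from the PR):
```java
import java.util.concurrent.ThreadLocalRandom;

public class BackoffSketch {
    private static final long BASE_DELAY_MS = 100;  // first retry delay
    private static final long MAX_DELAY_MS = 5_000; // cap per individual wait

    // Full jitter: a random wait in [0, min(cap, base * 2^attempt)].
    static long backoffMillis(int attempt) {
        long cap = Math.min(MAX_DELAY_MS, BASE_DELAY_MS << Math.min(attempt, 20));
        return ThreadLocalRandom.current().nextLong(cap + 1);
    }

    static void flushWithRetry(Runnable doFlush, int maxRetries) throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            try {
                doFlush.run(); // the actual BatchWriteItem call would go here
                return;
            } catch (RuntimeException e) { // e.g. throttling from DynamoDB
                if (attempt == maxRetries - 1) {
                    throw e; // out of retries: surface the failure
                }
                Thread.sleep(backoffMillis(attempt));
            }
        }
    }
}
```
With `maxRetries` around 12 and a 5s cap, the worst-case cumulative wait stays under roughly a minute, and the randomized delays spread retries from parallel subtasks apart instead of letting them hit DynamoDB in lockstep.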
## Logic Implementation Correctness
### 1. NPE Handling in Writer (Verified)
The `AmazonDynamoDBWriter` correctly handles empty table identifiers:
```java
String tableName = element.getTableId();
if (StringUtils.isEmpty(tableName)) {
    tableName = amazondynamodbConfig.getTable();
}
```
This is robust and safely falls back to the default configured table,
ensuring backward compatibility for single-table jobs.
### 2. Batch Flush Logic (Verified)
The refactored `write` method correctly moves the network I/O outside the
synchronized block:
```java
List<PutItemRequest> toFlush = null;
synchronized (lock) {
    // ... adds to buffer ...
    if (batchSizeReached) {
        toFlush = new ArrayList<>(batchListByTable.get(tableName));
        batchListByTable.remove(tableName);
    }
}
if (toFlush != null) {
    // Correctly executed outside the lock
    flushTableAsync(tableName, toFlush);
}
```
This reduces lock contention significantly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]