DanielCarter-stack commented on PR #10497:
URL: https://github.com/apache/seatunnel/pull/10497#issuecomment-3902939303
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10497", "part": 1,
"total": 1} -->
### Issue 1: Missing null/empty `tableId` validation causes runtime failures
**Location**: `AmazonDynamoDBWriter.java:48-49`
**Modified code**:
```java
public void write(SeaTunnelRow element) throws IOException {
    String tableName = element.getTableId();
    dynamoDbSinkClient.write(serializer.serialize(element), tableName);
}
```
**Related context**:
- Parent class/interface: `AbstractSinkWriter.java`
(seatunnel-connectors-v2/connector-common)
- Interface: `SupportMultiTableSinkWriter.java` (seatunnel-api)
- SeaTunnelRow definition: `SeaTunnelRow.java:31` default `private String
tableId = ""`
**Problem description**:
When `SeaTunnelRow.getTableId()` returns an empty string or null (single-table scenario, or a CDC source that does not set `tableId`), the code passes the empty string straight to `DynamoDbSinkClient.write()`. The AWS SDK rejects empty table names with an exception, so the job fails at runtime instead of degrading gracefully.
**Potential risks**:
- Risk 1: In single-table scenarios, when users don't configure multiple
tables, `tableId` is an empty string, causing task failures
- Risk 2: Backward compatibility breakage: original single-table jobs may
not work properly
**Impact scope**:
- Direct impact: `AmazonDynamoDBWriter.write()` method
- Indirect impact: All jobs using DynamoDB Sink (single-table and
multi-table)
- Impact area: Single Connector
**Severity**: MAJOR
**Improvement suggestion**:
```java
public void write(SeaTunnelRow element) throws IOException {
    String tableName = element.getTableId();
    // Fall back to the configured table name (single-table compatibility)
    if (StringUtils.isEmpty(tableName)) {
        tableName = catalogTable.getTableId().toTablePath().getTableName();
    }
    dynamoDbSinkClient.write(serializer.serialize(element), tableName);
}
```
Import needs to be added:
```java
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
```
**Rationale**:
Following the handling in `AssertSinkWriter`: when `tableId` is empty, fall back to the table name configured in `CatalogTable` to preserve backward compatibility.
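As a sketch of that fallback, the check reduces to a small pure function that is easy to unit-test (the class and parameter names below are illustrative, not part of the connector):

```java
public class TableNameResolver {

    // Fall back to the configured default table when the row carries no tableId.
    // Mirrors the suggested StringUtils.isEmpty() check as a pure function.
    static String resolve(String rowTableId, String configuredTable) {
        if (rowTableId == null || rowTableId.isEmpty()) {
            return configuredTable;
        }
        return rowTableId;
    }
}
```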
---
### Issue 2: Per-table batch counting triggers a global flush
**Location**: `DynamoDbSinkClient.java:78-80`
**Modified code**:
```java
if (amazondynamodbConfig.getBatchSize() > 0
        && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
    flush();
}
```
**Original code** (dev branch):
```java
if (amazondynamodbConfig.getBatchSize() > 0
        && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
    flush();
}
```
**Related context**:
- Caller: `AmazonDynamoDBWriter.write()`
- AWS API: `BatchWriteItemRequest` maximum 25 operations per request
**Problem description**:
The current logic is "when any single table's batch reaches the threshold, trigger a global flush". This means:
1. Table A accumulates 25 records and triggers a flush
2. Table B, with only 3 records, is flushed along with it
3. Table B loses its batching opportunity
**Potential risks**:
- Risk 1: High-frequency tables trigger frequent global flushes, reducing
overall throughput
- Risk 2: Low-frequency tables' batch sizes cannot reach user-configured
thresholds
**Impact scope**:
- Direct impact: `DynamoDbSinkClient` batch logic
- Indirect impact: All jobs using batch writes
- Impact area: Single Connector
**Severity**: MINOR
**Improvement suggestion**:
```java
public synchronized void write(PutItemRequest putItemRequest, String tableName) {
    tryInit();
    batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
    batchListByTable.get(tableName).add(...);
    // Only flush the current table
    if (amazondynamodbConfig.getBatchSize() > 0
            && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
        flushTable(tableName); // New method
    }
}

private void flushTable(String tableName) {
    List<WriteRequest> requests = batchListByTable.get(tableName);
    if (requests != null && !requests.isEmpty()) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, requests);
        dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
        batchListByTable.remove(tableName); // Only remove the flushed table
    }
}
```
**Rationale**:
Change global flush to per-table flush to avoid high-frequency tables
affecting batch optimization of low-frequency tables.
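Note that per-table flushing must still respect the 25-operation limit mentioned in the context above: a single table's batch can exceed it if the user configures a larger `batch_size`. A minimal chunking helper (hypothetical, not part of the connector) illustrates the split:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartitioner {

    // DynamoDB BatchWriteItem accepts at most 25 operations per request
    static final int BATCH_LIMIT = 25;

    // Split one table's pending requests into chunks that respect the API limit
    static <T> List<List<T>> partition(List<T> requests) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < requests.size(); i += BATCH_LIMIT) {
            chunks.add(new ArrayList<>(
                    requests.subList(i, Math.min(i + BATCH_LIMIT, requests.size()))));
        }
        return chunks;
    }
}
```

Each chunk would then be submitted as its own `BatchWriteItemRequest`.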
---
### Issue 3: `synchronized` methods hold the lock during network I/O
**Location**: `DynamoDbSinkClient.java:67, 91`
**Modified code**:
```java
public synchronized void write(PutItemRequest putItemRequest, String tableName) {
    tryInit();
    batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
    batchListByTable.get(tableName).add(...);
    if (...) {
        flush(); // Network I/O inside the lock
    }
}

synchronized void flush() {
    for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
        // ...
        dynamoDbClient.batchWriteItem(...); // AWS API call
    }
    batchListByTable.clear();
}
```
**Related context**:
- Parent class: `AbstractSinkWriter` (non-synchronized)
- Caller: `AmazonDynamoDBWriter.write()` (may be called by multiple threads)
- AWS SDK: `DynamoDbClient` (SDK v2) is itself thread-safe, but the shared `batchListByTable` state is not
**Problem description**:
1. The `write()` method is `synchronized`, serializing multi-threaded writes
2. `flush()` performs network I/O (AWS API calls) inside the `synchronized` block
3. During network latency (possibly 100-500 ms), all other threads are blocked
4. Concurrent performance is severely degraded
**Potential risks**:
- Risk 1: In high-concurrency scenarios, throughput limited by network
latency
- Risk 2: Multi-core CPUs cannot write in parallel
**Impact scope**:
- Direct impact: `DynamoDbSinkClient` concurrent performance
- Indirect impact: All high-throughput jobs
- Impact area: Single Connector
**Severity**: MAJOR
**Improvement suggestion**:
```java
private final Object lock = new Object();
private final Map<String, List<WriteRequest>> batchListByTable;
public void write(PutItemRequest putItemRequest, String tableName) {
synchronized (lock) {
tryInit();
batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
batchListByTable.get(tableName).add(...);
if (amazondynamodbConfig.getBatchSize() > 0
&& batchListByTable.get(tableName).size() >=
amazondynamodbConfig.getBatchSize()) {
// Copy current table batch
List<WriteRequest> toFlush = new
ArrayList<>(batchListByTable.get(tableName));
batchListByTable.get(tableName).clear();
// Execute network I/O outside lock
flushAsync(tableName, toFlush);
}
}
}
private void flushAsync(String tableName, List<WriteRequest> requests) {
try {
Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
requestItems.put(tableName, requests);
dynamoDbClient.batchWriteItem(
BatchWriteItemRequest.builder().requestItems(requestItems).build());
} catch (Exception e) {
// Handle exception and retry
log.error("Failed to flush table: {}", tableName, e);
}
}
```
**Rationale**:
Move the network I/O outside the `synchronized` block and use a fine-grained lock to protect shared state, improving concurrent performance.
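If contention across tables also matters, a further refinement (hypothetical, not part of this PR) is one lock per table, e.g. a `ConcurrentHashMap` of per-table buffers, so writers to different tables never block each other:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class PerTableBuffers {

    private final ConcurrentHashMap<String, List<String>> buffers = new ConcurrentHashMap<>();

    // Add an item; returns a drained batch when the threshold is hit, else null.
    // Locking is per table, so different tables do not contend with each other.
    List<String> add(String table, String item, int batchSize) {
        List<String> buf = buffers.computeIfAbsent(table, k -> new ArrayList<>());
        synchronized (buf) {
            buf.add(item);
            if (buf.size() >= batchSize) {
                List<String> drained = new ArrayList<>(buf);
                buf.clear();
                return drained;
            }
            return null;
        }
    }
}
```

The caller would perform the network I/O on the returned batch outside any lock, as in the suggestion above.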
---
### Issue 4: Unprocessed items returned by AWS API not handled
**Location**: `DynamoDbSinkClient.java:96-109`
**Modified code**:
```java
for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
    String tableName = entry.getKey();
    List<WriteRequest> requests = entry.getValue();
    if (!requests.isEmpty()) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, requests);
        dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
        // Return value is ignored
    }
}
batchListByTable.clear(); // Cleared unconditionally, assuming every write succeeded
```
**Related context**:
- AWS SDK: `BatchWriteItemResponse.unprocessedItems()` returns the items that were not written
- AWS documentation: unprocessed items must be retried by the caller
**Problem description**:
AWS DynamoDB `batchWriteItem` API has the following limitations:
- Maximum 25 operations per request
- Maximum 16 MB data per request
- Table-level throughput limits
Operations that exceed these limits are returned in `unprocessedItems`. The current code:
1. Does not check the response
2. Directly clears the cache
3. Causes **data loss**
**Potential risks**:
- Risk 1: Data silently lost under high load or insufficient quota
- Risk 2: Cannot guarantee data integrity
**Impact scope**:
- Direct impact: `DynamoDbSinkClient.flush()` method
- Indirect impact: All data writes
- Impact area: Single Connector, data correctness
**Severity**: CRITICAL
**Improvement suggestion**:
```java
synchronized void flush() {
    if (batchListByTable.isEmpty()) {
        return;
    }
    for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
        String tableName = entry.getKey();
        List<WriteRequest> requests = entry.getValue();
        if (!requests.isEmpty()) {
            flushWithRetry(tableName, requests);
        }
    }
    batchListByTable.clear();
}

private void flushWithRetry(String tableName, List<WriteRequest> requests) {
    List<WriteRequest> pendingRequests = new ArrayList<>(requests);
    int maxRetries = 3;
    int retryCount = 0;
    while (!pendingRequests.isEmpty() && retryCount < maxRetries) {
        Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
        requestItems.put(tableName, pendingRequests);
        BatchWriteItemResponse response = dynamoDbClient.batchWriteItem(
                BatchWriteItemRequest.builder().requestItems(requestItems).build());
        pendingRequests = response.unprocessedItems()
                .getOrDefault(tableName, Collections.emptyList());
        if (!pendingRequests.isEmpty()) {
            retryCount++;
            log.warn("Table {} has {} unprocessed items, retry {}/{}",
                    tableName, pendingRequests.size(), retryCount, maxRetries);
            try {
                Thread.sleep(100L << retryCount); // Exponential backoff: 200 ms, 400 ms, 800 ms
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted during retry", e);
            }
        }
    }
    if (!pendingRequests.isEmpty()) {
        throw new RuntimeException(
                String.format("Failed to write %d items to table %s after %d retries",
                        pendingRequests.size(), tableName, maxRetries));
    }
}
```
**Rationale**:
Following AWS best practices, retry `unprocessedItems` with exponential backoff to ensure data integrity.
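For clarity, a genuinely exponential backoff doubles the delay on each attempt. A tiny illustrative helper (the name is not from the connector) makes the schedule explicit:

```java
public class RetryBackoff {

    // Delay after the Nth failed attempt (1-based): 200 ms, 400 ms, 800 ms, ...
    static long delayMillis(int retryCount) {
        return 100L << retryCount;
    }
}
```

By contrast, `100 * retryCount` grows only linearly, which gives far less breathing room when the table is throttled.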
---
### Issue 5: Missing multi-table feature tests
**Location**: Test file directory
**Current status**:
- Existing tests: `AmazonDynamoDBSourceFactoryTest.java` (only tests configuration rules)
- Missing tests:
- Multi-table write scenario tests
- Fallback tests when `element.getTableId()` is empty
- `DynamoDbSinkClient` multi-table batch tests
- `UnprocessedKeys` retry tests
**Related context**:
- Parent class tests: `AbstractSinkWriter` test pattern
- Comparable connectors: JDBC and Hudi both have `MultiTableResourceManager` tests
**Problem description**:
PR submitter claims "Unit tests verify interface implementation", but no new
test code has actually been added.
**Potential risks**:
- Risk 1: Multi-table features cannot be automatically verified by CI/CD
- Risk 2: Multi-table logic may be broken during refactoring
**Impact scope**:
- Direct impact: Test coverage
- Indirect impact: Code quality assurance
- Impact area: Single Connector
**Severity**: MAJOR
**Improvement suggestion**:
Add new `AmazonDynamoDBMultiTableSinkTest.java`:
```java
public class AmazonDynamoDBMultiTableSinkTest {

    @Test
    public void testMultiTableWrite() throws Exception {
        // Simulate a multi-table write scenario
        SeaTunnelRow row1 = createRow("table1", ...);
        SeaTunnelRow row2 = createRow("table2", ...);
        SeaTunnelRow row3 = createRow("table1", ...);
        writer.write(row1);
        writer.write(row2);
        writer.write(row3);
        writer.prepareCommit();
        // Verify each table is flushed (flush() issues one request per table)
        verify(dynamoDbClient).batchWriteItem(
                argThat((BatchWriteItemRequest req) -> req.requestItems().containsKey("table1")));
        verify(dynamoDbClient).batchWriteItem(
                argThat((BatchWriteItemRequest req) -> req.requestItems().containsKey("table2")));
    }

    @Test
    public void testEmptyTableIdFallback() throws Exception {
        SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
        row.setTableId(""); // Empty table name
        writer.write(row);
        // Should fall back to the configured table name
        verify(dynamoDbSinkClient).write(any(), eq("configTable"));
    }
}
```
**Rationale**:
Add unit tests and integration tests to verify the correctness of
multi-table logic.
---
### Issue 6: Typo (minor)
**Location**: `AmazonDynamoDBSinkFactory.java:48`
**Modified code**:
```java
.optional(BATCH_SIZE, SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
```
**Problem description**:
- `MULTI_TABLE_SINK_REPLICA` missing letter `L`, should be
`MULTI_TABLE_SINK_REPLICA`
- This is a typo in API definition (`SinkConnectorCommonOptions.java:27`)
- All Connectors are using this misspelled constant name
**Potential risks**:
- Risk 1: Reduced code readability
- Risk 2: May need compatibility fix in the future
**Impact scope**:
- Direct impact: Code readability
- Impact area: Entire project (API definition)
**Severity**: MINOR
**Improvement suggestion**:
Although this is an API-level typo, it does not need to be fixed in this PR. Suggest a separate PR that:
1. Renames `MULTI_TABLE_SINK_REPLICA` to `MULTI_TABLE_SINK_REPLICA`
2. Adds a `@Deprecated` annotation to the old constant
3. Updates all Connectors
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]