DanielCarter-stack commented on PR #10497:
URL: https://github.com/apache/seatunnel/pull/10497#issuecomment-3902939303

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10497", "part": 1, "total": 1} -->
   ### Issue 1: Missing null/empty `tableId` validation causes runtime failures
   
   **Location**: `AmazonDynamoDBWriter.java:48-49`
   
   **Modified code**:
   ```java
   public void write(SeaTunnelRow element) throws IOException {
       String tableName = element.getTableId();
       dynamoDbSinkClient.write(serializer.serialize(element), tableName);
   }
   ```
   
   **Related context**:
   - Parent class/interface: `AbstractSinkWriter.java` (seatunnel-connectors-v2/connector-common)
   - Interface: `SupportMultiTableSinkWriter.java` (seatunnel-api)
   - SeaTunnelRow definition: `SeaTunnelRow.java:31` declares the default `private String tableId = ""`
   
   **Problem description**:
   When `SeaTunnelRow.getTableId()` returns an empty string or null (single-table scenarios, or CDC sources that do not set `tableId`), the code passes that value straight to `DynamoDbSinkClient.write()`. The AWS SDK rejects empty table names and throws an exception, so the job fails at runtime instead of degrading gracefully.
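   
   The failure mode and the suggested fallback can be sketched with a minimal stand-in (the `Row` class and `resolve` helper below are hypothetical, not the real SeaTunnel types):
   ```java
   public class EmptyTableIdDemo {
   
       // Hypothetical stand-in mirroring the SeaTunnelRow default of tableId = ""
       static class Row {
           private String tableId = "";
           String getTableId() { return tableId; }
       }
   
       // Fallback resolution: use the row's tableId if set, else the configured table name
       static String resolve(String tableId, String configuredTable) {
           return (tableId == null || tableId.isEmpty()) ? configuredTable : tableId;
       }
   
       public static void main(String[] args) {
           Row row = new Row();
           // Without the fallback, "" would be sent to DynamoDB and rejected at runtime
           System.out.println(resolve(row.getTableId(), "orders"));  // prints "orders"
       }
   }
   ```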
   
   **Potential risks**:
   - Risk 1: In single-table scenarios, when users don't configure multiple 
tables, `tableId` is an empty string, causing task failures
   - Risk 2: Backward compatibility breakage: original single-table jobs may 
not work properly
   
   **Impact scope**:
   - Direct impact: `AmazonDynamoDBWriter.write()` method
   - Indirect impact: All jobs using DynamoDB Sink (single-table and 
multi-table)
   - Impact area: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement suggestion**:
   ```java
   public void write(SeaTunnelRow element) throws IOException {
       String tableName = element.getTableId();
       
       // Fallback to configured table name (single table compatibility)
       if (StringUtils.isEmpty(tableName)) {
           tableName = catalogTable.getTableId().toTablePath().getTableName();
       }
       
       dynamoDbSinkClient.write(serializer.serialize(element), tableName);
   }
   ```
   
   Import needs to be added:
   ```java
   import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
   ```
   
   **Rationale**:
   Referencing the handling approach in `AssertSinkWriter`, when `tableId` is 
empty, it should fall back to the table name configured in `CatalogTable` to 
ensure backward compatibility.
   
   ---
   
   ### Issue 2: Batch size counted per table, logic has flaws
   
   **Location**: `DynamoDbSinkClient.java:78-80`
   
   **Modified code**:
   ```java
   if (amazondynamodbConfig.getBatchSize() > 0
           && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
       flush();
   }
   ```
   
   **Original code** (dev branch):
   ```java
   if (amazondynamodbConfig.getBatchSize() > 0
           && batchList.size() >= amazondynamodbConfig.getBatchSize()) {
       flush();
   }
   ```
   
   **Related context**:
   - Caller: `AmazonDynamoDBWriter.write()`
   - AWS API: `BatchWriteItemRequest` maximum 25 operations per request
   
   **Problem description**:
   Current logic is "when a single table's batch reaches the threshold, trigger 
global flush". This means:
   1. Table A has 25 records, triggers flush
   2. Table B only has 3 records, will also be written out
   3. Table B loses batch optimization opportunity
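   
   A toy simulation (hypothetical code, not the connector's) makes the effect concrete: table A fills its batch every round while table B only accumulates 3 records, so B is flushed far below the configured threshold:
   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class GlobalFlushSim {
   
       // Returns table B's pending batch size at each global flush
       static List<Integer> bBatchSizesAtFlush(int rounds, int batchSize) {
           Map<String, Integer> pending = new HashMap<>();
           List<Integer> bSizes = new ArrayList<>();
           for (int r = 0; r < rounds; r++) {
               pending.merge("B", 3, Integer::sum);          // low-frequency table
               pending.merge("A", batchSize, Integer::sum);  // high-frequency table
               if (pending.get("A") >= batchSize) {          // A triggers a global flush
                   bSizes.add(pending.getOrDefault("B", 0)); // B is flushed early too
                   pending.clear();
               }
           }
           return bSizes;
       }
   
       public static void main(String[] args) {
           // B never accumulates a full batch of 25; it is flushed at size 3 every time
           System.out.println(bBatchSizesAtFlush(3, 25));  // [3, 3, 3]
       }
   }
   ```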
   
   **Potential risks**:
   - Risk 1: High-frequency tables trigger frequent global flushes, reducing 
overall throughput
   - Risk 2: Low-frequency tables' batch sizes cannot reach user-configured 
thresholds
   
   **Impact scope**:
   - Direct impact: `DynamoDbSinkClient` batch logic
   - Indirect impact: All jobs using batch writes
   - Impact area: Single Connector
   
   **Severity**: MINOR
   
   **Improvement suggestion**:
   ```java
   public synchronized void write(PutItemRequest putItemRequest, String tableName) {
       tryInit();
   
       batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
       batchListByTable.get(tableName).add(...);
   
       // Only flush the current table
       if (amazondynamodbConfig.getBatchSize() > 0
               && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
           flushTable(tableName);  // New method
       }
   }
   
   private void flushTable(String tableName) {
       List<WriteRequest> requests = batchListByTable.get(tableName);
       if (requests != null && !requests.isEmpty()) {
           Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
           requestItems.put(tableName, requests);
           dynamoDbClient.batchWriteItem(
                   BatchWriteItemRequest.builder().requestItems(requestItems).build());
           batchListByTable.remove(tableName);  // Only remove the flushed table
       }
   }
   ```
   
   **Rationale**:
   Change global flush to per-table flush to avoid high-frequency tables 
affecting batch optimization of low-frequency tables.
   
   ---
   
   ### Issue 3: Concurrency safety issues with synchronized methods
   
   **Location**: `DynamoDbSinkClient.java:67, 91`
   
   **Modified code**:
   ```java
   public synchronized void write(PutItemRequest putItemRequest, String tableName) {
       tryInit();
       batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
       batchListByTable.get(tableName).add(...);
       if (...) {
           flush();  // Network I/O inside the lock
       }
   }
   
   synchronized void flush() {
       for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
           // ...
           dynamoDbClient.batchWriteItem(...);  // AWS API call
       }
       batchListByTable.clear();
   }
   ```
   
   **Related context**:
   - Parent class: `AbstractSinkWriter` (non-synchronized)
   - Caller: `AmazonDynamoDBWriter.write()` (may be called by multiple threads)
   - AWS SDK: `DynamoDbClient` (SDK v2) is thread-safe, so the API calls themselves do not require external locking
   
   **Problem description**:
   1. `write()` method uses `synchronized`, serializing multi-thread writes
   2. `flush()` performs network IO (AWS API calls) within `synchronized` block
   3. During network latency (possibly 100-500ms), other threads are blocked
   4. Concurrent performance severely degraded
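   
   The serialization effect can be demonstrated with a small timing sketch (a simulated 200 ms sleep stands in for the network call; this is illustrative code, not the connector's):
   ```java
   public class LockedIoDemo {
       private static final Object lock = new Object();
   
       static void writeWithIoInsideLock() {
           synchronized (lock) {
               try {
                   Thread.sleep(200);  // stand-in for a 100-500 ms AWS API call
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }
       }
   
       public static void main(String[] args) throws InterruptedException {
           long start = System.nanoTime();
           Thread t1 = new Thread(LockedIoDemo::writeWithIoInsideLock);
           Thread t2 = new Thread(LockedIoDemo::writeWithIoInsideLock);
           t1.start();
           t2.start();
           t1.join();
           t2.join();
           long elapsedMs = (System.nanoTime() - start) / 1_000_000;
           // ~400 ms total: the second writer waited out the first writer's "I/O"
           System.out.println(elapsedMs >= 400 ? "serialized" : "overlapped");
       }
   }
   ```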
   
   **Potential risks**:
   - Risk 1: In high-concurrency scenarios, throughput limited by network 
latency
   - Risk 2: Multi-core CPUs cannot write in parallel
   
   **Impact scope**:
   - Direct impact: `DynamoDbSinkClient` concurrent performance
   - Indirect impact: All high-throughput jobs
   - Impact area: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement suggestion**:
   ```java
   private final Object lock = new Object();
   private final Map<String, List<WriteRequest>> batchListByTable;
   
   public void write(PutItemRequest putItemRequest, String tableName) {
       List<WriteRequest> toFlush = null;
       synchronized (lock) {
           tryInit();
           batchListByTable.computeIfAbsent(tableName, k -> new ArrayList<>());
           batchListByTable.get(tableName).add(...);
   
           if (amazondynamodbConfig.getBatchSize() > 0
                   && batchListByTable.get(tableName).size() >= amazondynamodbConfig.getBatchSize()) {
               // Copy the current table's batch and clear it while holding the lock
               toFlush = new ArrayList<>(batchListByTable.get(tableName));
               batchListByTable.get(tableName).clear();
           }
       }
   
       // Execute the network I/O outside the lock
       if (toFlush != null) {
           flushAsync(tableName, toFlush);
       }
   }
   
   private void flushAsync(String tableName, List<WriteRequest> requests) {
       try {
           Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
           requestItems.put(tableName, requests);
           dynamoDbClient.batchWriteItem(
                   BatchWriteItemRequest.builder().requestItems(requestItems).build());
       } catch (Exception e) {
           // Propagate or retry here; merely logging would silently drop data
           log.error("Failed to flush table: {}", tableName, e);
       }
   }
   ```
   
   **Rationale**:
   Move network IO outside synchronized block, use fine-grained locks to 
protect shared state, improving concurrent performance.
   
   ---
   
   ### Issue 4: Unprocessed items returned by AWS API not handled
   
   **Location**: `DynamoDbSinkClient.java:96-109`
   
   **Modified code**:
   ```java
   for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
       String tableName = entry.getKey();
       List<WriteRequest> requests = entry.getValue();
   
       if (!requests.isEmpty()) {
           Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
           requestItems.put(tableName, requests);
           dynamoDbClient.batchWriteItem(
                   BatchWriteItemRequest.builder().requestItems(requestItems).build());
           // Missing handling of the return value
       }
   }
   
   batchListByTable.clear();  // Cleared directly, assuming all writes succeeded
   ```
   
   **Related context**:
   - AWS SDK: `BatchWriteItemResponse.unprocessedItems()` returns the items that were not written
   - AWS documentation: unprocessed items must be retried by the caller
   
   **Problem description**:
   AWS DynamoDB `batchWriteItem` API has the following limitations:
   - Maximum 25 operations per request
   - Maximum 16 MB data per request
   - Table-level throughput limits
   
   Items exceeding these limits are returned in `UnprocessedItems`. Current code:
   1. Does not check return value
   2. Directly clears cache
   3. Causes **data loss**
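   
   The required drain loop can be sketched without AWS: the hypothetical `fakeBatchWrite` function below plays the role of `batchWriteItem` returning `unprocessedItems()`, and the loop re-submits only the leftovers instead of discarding them:
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;
   import java.util.function.UnaryOperator;
   
   public class UnprocessedRetryDemo {
   
       // Re-submits unprocessed items until done or maxRetries is hit; returns retry count
       static int drain(List<String> items, UnaryOperator<List<String>> fakeBatchWrite, int maxRetries) {
           List<String> pending = new ArrayList<>(items);
           int retries = 0;
           while (!pending.isEmpty() && retries < maxRetries) {
               pending = new ArrayList<>(fakeBatchWrite.apply(pending));  // leftovers only
               if (!pending.isEmpty()) {
                   retries++;
               }
           }
           if (!pending.isEmpty()) {
               throw new IllegalStateException(pending.size() + " items lost after retries");
           }
           return retries;
       }
   
       public static void main(String[] args) {
           // Fake service: processes 2 items per call, returns the rest as unprocessed
           UnaryOperator<List<String>> service =
                   batch -> batch.subList(Math.min(2, batch.size()), batch.size());
           System.out.println(drain(Arrays.asList("a", "b", "c", "d", "e"), service, 3));  // 2
       }
   }
   ```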
   
   **Potential risks**:
   - Risk 1: Data silently lost under high load or insufficient quota
   - Risk 2: Cannot guarantee data integrity
   
   **Impact scope**:
   - Direct impact: `DynamoDbSinkClient.flush()` method
   - Indirect impact: All data writes
   - Impact area: Single Connector, data correctness
   
   **Severity**: CRITICAL
   
   **Improvement suggestion**:
   ```java
   synchronized void flush() {
       if (batchListByTable.isEmpty()) {
           return;
       }
   
       for (Map.Entry<String, List<WriteRequest>> entry : batchListByTable.entrySet()) {
           String tableName = entry.getKey();
           List<WriteRequest> requests = entry.getValue();
   
           if (!requests.isEmpty()) {
               flushWithRetry(tableName, requests);
           }
       }
   
       batchListByTable.clear();
   }
   
   private void flushWithRetry(String tableName, List<WriteRequest> requests) {
       List<WriteRequest> pendingRequests = new ArrayList<>(requests);
       int maxRetries = 3;
       int retryCount = 0;
   
       while (!pendingRequests.isEmpty() && retryCount < maxRetries) {
           Map<String, List<WriteRequest>> requestItems = new HashMap<>(1);
           requestItems.put(tableName, pendingRequests);
   
           BatchWriteItemResponse response =
                   dynamoDbClient.batchWriteItem(
                           BatchWriteItemRequest.builder().requestItems(requestItems).build());
   
           Map<String, List<WriteRequest>> unprocessedItems = response.unprocessedItems();
           pendingRequests = unprocessedItems.getOrDefault(tableName, Collections.emptyList());
   
           if (!pendingRequests.isEmpty()) {
               retryCount++;
               log.warn(
                       "Table {} has {} unprocessed items, retry {}/{}",
                       tableName, pendingRequests.size(), retryCount, maxRetries);
   
               try {
                   Thread.sleep(100L << (retryCount - 1));  // exponential backoff: 100, 200, 400 ms
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new RuntimeException("Interrupted during retry", e);
               }
           }
       }
   
       if (!pendingRequests.isEmpty()) {
           throw new RuntimeException(
                   String.format(
                           "Failed to write %d items to table %s after %d retries",
                           pendingRequests.size(), tableName, maxRetries));
       }
   }
   ```
   
   **Rationale**:
   Following AWS best practices, handle `UnprocessedItems` with exponential backoff retries to ensure data integrity.
   
   ---
   
   ### Issue 5: Missing multi-table feature tests
   
   **Location**: Test file directory
   
   **Current status**:
   - Existing tests: `AmazonDynamoDBSourceFactoryTest.java` (only tests configuration rules)
   - Missing tests:
     - Multi-table write scenario tests
     - Fallback tests when `element.getTableId()` is empty
     - `DynamoDbSinkClient` multi-table batch tests
     - `UnprocessedItems` retry tests
   
   **Related context**:
   - Parent class tests: `AbstractSinkWriter` test pattern
   - Compared Connectors: JDBC, Hudi both have `MultiTableResourceManager` tests
   
   **Problem description**:
   PR submitter claims "Unit tests verify interface implementation", but no new 
test code has actually been added.
   
   **Potential risks**:
   - Risk 1: Multi-table features cannot be automatically verified by CI/CD
   - Risk 2: Multi-table logic may be broken during refactoring
   
   **Impact scope**:
   - Direct impact: Test coverage
   - Indirect impact: Code quality assurance
   - Impact area: Single Connector
   
   **Severity**: MAJOR
   
   **Improvement suggestion**:
   Add new `AmazonDynamoDBMultiTableSinkTest.java`:
   ```java
   public class AmazonDynamoDBMultiTableSinkTest {
   
       @Test
       public void testMultiTableWrite() {
           // Simulate a multi-table write scenario
           SeaTunnelRow row1 = createRow("table1", ...);
           SeaTunnelRow row2 = createRow("table2", ...);
           SeaTunnelRow row3 = createRow("table1", ...);
   
           writer.write(row1);
           writer.write(row2);
           writer.write(row3);
   
           writer.prepareCommit();
   
           // Verify both tables are written
           verify(dynamoDbClient, times(1)).batchWriteItem(argThat((BatchWriteItemRequest req) ->
                   req.requestItems().containsKey("table1")
                           && req.requestItems().containsKey("table2")));
       }
   
       @Test
       public void testEmptyTableIdFallback() {
           SeaTunnelRow row = new SeaTunnelRow(new Object[0]);
           row.setTableId("");  // Empty table name
   
           writer.write(row);
   
           // Should fall back to the configured table name
           verify(dynamoDbSinkClient).write(any(), eq("configTable"));
       }
   }
   ```
   
   **Rationale**:
   Add unit tests and integration tests to verify the correctness of 
multi-table logic.
   
   ---
   
   ### Issue 6: Typo (minor)
   
   **Location**: `AmazonDynamoDBSinkFactory.java:48`
   
   **Modified code**:
   ```java
   .optional(BATCH_SIZE, SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPICA)
   ```
   
   **Problem description**:
   - `MULTI_TABLE_SINK_REPICA` is missing the letter `L`; it should be `MULTI_TABLE_SINK_REPLICA`
   - The typo originates in the API definition (`SinkConnectorCommonOptions.java:27`)
   - All Connectors currently use this misspelled constant name
   
   **Potential risks**:
   - Risk 1: Reduced code readability
   - Risk 2: May need compatibility fix in the future
   
   **Impact scope**:
   - Direct impact: Code readability
   - Impact area: Entire project (API definition)
   
   **Severity**: MINOR
   
   **Improvement suggestion**:
   Although this is an API-level typo, this PR does not need to fix it. Suggest 
submitting a separate PR to fix:
   1. Rename `MULTI_TABLE_SINK_REPICA` to `MULTI_TABLE_SINK_REPLICA`
   2. Add `@Deprecated` annotation to old constant
   3. Update all Connectors
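   
   The suggested migration could look like the following sketch (the class name, key string, and `String`-typed constants are illustrative; the real options use SeaTunnel's `Option` API):
   ```java
   public final class SinkOptionsSketch {
   
       /** Corrected name. */
       public static final String MULTI_TABLE_SINK_REPLICA = "multi_table_sink_replica";
   
       /** @deprecated use {@link #MULTI_TABLE_SINK_REPLICA}; kept so existing connectors compile. */
       @Deprecated
       public static final String MULTI_TABLE_SINK_REPICA = MULTI_TABLE_SINK_REPLICA;
   
       private SinkOptionsSketch() {}
   
       public static void main(String[] args) {
           // Both names resolve to the same option key during the migration window
           System.out.println(MULTI_TABLE_SINK_REPICA.equals(MULTI_TABLE_SINK_REPLICA));  // true
       }
   }
   ```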
   
   ---


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
