DanielCarter-stack commented on PR #10558:
URL: https://github.com/apache/seatunnel/pull/10558#issuecomment-3999110144

   ### Issue 1: Exceptions thrown in whenComplete callback are ineffective
   
   **Location**: `PulsarSinkWriter.java:91-99`
   
   ```java
   future.whenComplete(
       (id, ex) -> {
           pendingMessages.decrementAndGet();
           if (ex != null) {
               throw new PulsarConnectorException(
                       PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
                       "Send message failed");
           }
       });
   ```
   
   **Related Context**:
   - Async callbacks execute in the Pulsar client's thread pool
   - Exceptions thrown in a callback do not propagate to the writer thread
   
   **Problem Description**:
   An exception thrown inside a `CompletableFuture.whenComplete()` action never reaches the thread that called `write()`. It only affects the *new* future returned by `whenComplete()`. Because this action throws only when `future` has already failed, that returned future completes with the original send exception, and since it is discarded, nothing ever observes the failure. As a result:
   1. Message send failures go unnoticed by users (the exception is swallowed)
   2. `pendingMessages` is still decremented, so the flush and checkpoint logic treats the failed message as completed
   3. Data consistency cannot be guaranteed
   
   This issue already existed in the original code, but this refactoring did not fix it.
   
   **Potential Risks**:
   - Silent loss of failed message sends
   - Violation of at-least-once semantics (AT_LEAST_ONCE)
   - Difficult to troubleshoot in production environments
   
   **Impact Scope**:
   - Direct impact: Exception handling in `PulsarSinkWriter.write()`
   - Indirect impact: All scenarios using EXACTLY_ONCE or AT_LEAST_ONCE semantics
   - Affected scope: All Pulsar Connector users
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   ```java
   future.whenComplete(
       (id, ex) -> {
           pendingMessages.decrementAndGet();
           if (ex != null) {
               // Log instead of throwing: an exception thrown here never reaches the writer thread
               log.error("Failed to send message to topic: {}", topic, ex);
               // Logging alone still lets the checkpoint succeed, so also hand the failure to a
               // mechanism the writer thread checks (errorHandler is a placeholder name)
               errorHandler.handleSendFailure(topic, element, ex);
           }
       });
   ```
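   One common way to make such a mechanism concrete is to record the first asynchronous failure and rethrow it on the writer thread at the next `write()`, `prepareCommit()` or `snapshotState()`, so the checkpoint fails instead of silently succeeding. The sketch below is self-contained and hypothetical: `AsyncFailureTracker`, `trackSend` and `checkError` are illustrative names, not existing SeaTunnel or Pulsar APIs, and a plain `CompletableFuture` stands in for the future returned by the producer's `sendAsync()`.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;

   /** Hypothetical helper: records async send failures so the writer thread can rethrow them. */
   final class AsyncFailureTracker {
       // Sends that have not completed yet (plays the role of pendingMessages).
       private final AtomicInteger pendingMessages = new AtomicInteger();
       // First failure reported by any callback; later failures are ignored.
       private final AtomicReference<Throwable> asyncError = new AtomicReference<>();

       /** Attaches a callback to an async send; the callback itself never throws. */
       void trackSend(CompletableFuture<?> sendFuture) {
           pendingMessages.incrementAndGet();
           sendFuture.whenComplete(
                   (id, ex) -> {
                       if (ex != null) {
                           // Record the error before decrementing, so a thread that sees
                           // pending == 0 is guaranteed to also see the failure.
                           asyncError.compareAndSet(null, ex);
                       }
                       pendingMessages.decrementAndGet();
                   });
       }

       /** Call on the writer thread from write(), prepareCommit() and snapshotState(). */
       void checkError() throws IOException {
           Throwable error = asyncError.get();
           if (error != null) {
               throw new IOException("Asynchronous send to Pulsar failed", error);
           }
       }

       int pending() {
           return pendingMessages.get();
       }
   }
   ```

   Because the exception is rethrown on the thread that drives the checkpoint, a failed send aborts that checkpoint, which is what AT_LEAST_ONCE and EXACTLY_ONCE rely on.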
   
   ---
   
   ### Issue 2: Removal of Apache License header
   
   **Locations**:
   - `PulsarSink.java:1-17` (original) → removed in new code
   - `PulsarSinkWriter.java:1-33` (original) → removed in new code
   
   **Problem Description**:
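   The Apache License headers have been removed from all modified Java files, including:
   - The `/* Licensed to the Apache Software Foundation (ASF) ... */` block comment
   - The reference to the NOTICE file for copyright ownership
   - The link to the Apache License, Version 2.0
   
   This violates the ASF source header policy, which expects every source file to carry the license header.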
   
   **Potential Risks**:
   - Legal compliance risk
   - Cannot pass Apache PMC IP review
   - Affects project compliance
   
   **Impact Scope**:
   - Direct impact: All modified Java files
   - Affected scope: Project compliance
   
   **Severity**: BLOCKER (Must fix)
   
   **Improvement Suggestions**:
   Restore Apache License headers for all files.
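   For reference, this is the standard header text published in the ASF source header policy. SeaTunnel's existing files may wrap these lines differently, so the safest fix is to copy the header verbatim from the original versions of these files (or any untouched file in the repository), so that any license-header check run by the build matches exactly.

   ```java
   /*
    * Licensed to the Apache Software Foundation (ASF) under one
    * or more contributor license agreements.  See the NOTICE file
    * distributed with this work for additional information
    * regarding copyright ownership.  The ASF licenses this file
    * to you under the Apache License, Version 2.0 (the
    * "License"); you may not use this file except in compliance
    * with the License.  You may obtain a copy of the License at
    *
    *   http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing,
    * software distributed under the License is distributed on an
    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    * KIND, either express or implied.  See the License for the
    * specific language governing permissions and limitations
    * under the License.
    */
   ```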
   
   ---
   
   ### Issue 3: Removal of PARTITION_KEY_FIELDS configuration causes breaking change
   
   **Locations**:
   - `PulsarSinkFactory.java:39-41` (original `optionRule()` included `PARTITION_KEY_FIELDS`)
   - `PulsarSinkWriter.java` (removed the `getPartitionKeyFields()` and `createKeySerializationSchema()` methods)
   - `PulsarSinkOptions.java:64-69` (definition still exists but is unused)
   
   **Problem Description**:
   1. The original `PulsarSinkFactory.optionRule()` had `PARTITION_KEY_FIELDS` as an optional configuration
   2. The new code removed all related logic:
      - `keySerializationSchema` field
      - `getPartitionKeyFields()` method
      - `createKeySerializationSchema()` method
   3. **But this is not documented in `incompatible-changes.md`**
   4. The PR description does not call out this breaking change
   
   **Potential Risks**:
   - Existing jobs that set `partition_key_fields` may fail option validation after the upgrade or, if the option is silently ignored, lose key-based partitioning
   - Violates semantic versioning principles
   - Poor user experience
   
   **Impact Scope**:
   - Direct impact: Users using Pulsar partition key functionality
   - Indirect impact: Scenarios relying on message key for routing/partitioning
   - Affected scope: Some Pulsar Connector users
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   1. If this feature must be removed:
      - Document in `incompatible-changes.md`
      - Prominently note in PR description
      - Explain alternative approaches in migration documentation
      
   2. Or retain this functionality:
      - In multi-table scenarios, partition key configuration remains valid
      - Only ignore partition key configuration when using `getTableId()` routing
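   If the functionality is retained, a per-table key extractor is one way to keep `partition_key_fields` working alongside `getTableId()` routing. The sketch below is a hypothetical, self-contained illustration (`PartitionKeyExtractor` is not an existing SeaTunnel class): it resolves the configured field names against one table's field names once, then builds a message key from a row's values. The removed `createKeySerializationSchema()` built a serialization schema for the key; a plain `|`-joined string is substituted here only to keep the example dependency-free.

   ```java
   import java.util.List;

   /** Hypothetical per-table helper that builds a Pulsar message key from selected fields. */
   final class PartitionKeyExtractor {
       private final int[] keyFieldIndexes;

       /** Resolves partition_key_fields against one table's field names, once per table. */
       PartitionKeyExtractor(List<String> fieldNames, List<String> partitionKeyFields) {
           keyFieldIndexes = new int[partitionKeyFields.size()];
           for (int i = 0; i < partitionKeyFields.size(); i++) {
               String keyField = partitionKeyFields.get(i);
               int index = fieldNames.indexOf(keyField);
               if (index < 0) {
                   throw new IllegalArgumentException(
                           "partition_key_fields entry not found in table schema: " + keyField);
               }
               keyFieldIndexes[i] = index;
           }
       }

       /** Returns null when no key fields are configured, i.e. the message is sent without a key. */
       String extractKey(Object[] rowFields) {
           if (keyFieldIndexes.length == 0) {
               return null;
           }
           StringBuilder key = new StringBuilder();
           for (int i = 0; i < keyFieldIndexes.length; i++) {
               if (i > 0) {
                   key.append('|');
               }
               key.append(rowFields[keyFieldIndexes[i]]); // null values become "null"
           }
           return key.toString();
       }
   }
   ```

   In the writer, one extractor could be cached per table ID next to the producer, and a non-null key passed to Pulsar's `TypedMessageBuilder.key(String)` (for example `producer.newMessage(txn).key(key).value(bytes).sendAsync()`). A `|`-joined key is ambiguous when field values can contain `|`, which a real implementation would need to handle.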
   
   ---
   
   ### Issue 4: Race condition in transaction management logic after prepareCommit
   
   **Location**: `PulsarSinkWriter.java:145-163`
   
   ```java
   @Override
   public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
       if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
           return Optional.empty();
       }
   
       while (pendingMessages.get() > 0) {
           Thread.yield();
       }
   
       if (currentTransaction == null) {
           return Optional.empty();
       }
   
       TxnID txnID = currentTransaction.getTxnID();
       currentTransaction = null;  // ← Set to null
   
       return Optional.of(new PulsarCommitInfo(txnID));
   }
   
   private Transaction getOrCreateTransaction() {
       if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
           return null;
       }
   
       if (currentTransaction == null) {  // ← Recreate
           try {
               currentTransaction =
                       PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
           } catch (Exception e) {
               throw new PulsarConnectorException(
                       PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                       "Transaction create failed");
           }
       }
   
       return currentTransaction;
   }
   ```
   
   **Problem Description**:
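   1. `prepareCommit()` sets `currentTransaction` to null
   2. A new transaction is created lazily, only when the next `write()` call invokes `getOrCreateTransaction()`
   3. **Race condition**: if `write()` runs concurrently with `prepareCommit()`, it can send into a transaction that has already been handed off for commit, and any code path that reads `currentTransaction` directly instead of through `getOrCreateTransaction()` will hit an NPE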
   
   Compared to original implementation:
   ```java
   // The original snapshotState() creates a new transaction before returning the state
   List<PulsarSinkState> pulsarSinkStates =
           Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
   this.transaction =
           (TransactionImpl) PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
   return pulsarSinkStates;
   ```
   
   **Potential Risks**:
   - If `write()` calls run concurrently after `currentTransaction` is set to null, a null or already-committed transaction may be used
   - Violates EXACTLY_ONCE semantics
   
   **Impact Scope**:
   - Direct impact: Multi-table writes in EXACTLY_ONCE mode
   - Indirect impact: Scenarios with large checkpoint intervals
   - Affected scope: Pulsar Connector EXACTLY_ONCE users
   
   **Severity**: CRITICAL
   
   **Improvement Suggestions**:
   Create next transaction before `prepareCommit()` returns:
   ```java
   TxnID txnID = currentTransaction.getTxnID();
   currentTransaction = null;
   
   // Create the next transaction immediately
   try {
       currentTransaction = PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
   } catch (Exception e) {
       throw new PulsarConnectorException(
               PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
               "Failed to create next transaction after prepareCommit", e);
   }
   
   return Optional.of(new PulsarCommitInfo(txnID));
   ```
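   The eager-rotation pattern can be checked in isolation with a stand-in for the transaction factory. In this sketch, `TransactionRotator` and `TxnFactory` are hypothetical, `long` IDs replace Pulsar's `Transaction`/`TxnID`, and `TxnFactory.newTransaction()` plays the role of `PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout)`.

   ```java
   import java.util.Optional;

   /** Hypothetical sketch: always keep an open transaction and rotate it in prepareCommit(). */
   final class TransactionRotator {
       /** Stand-in for PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout). */
       interface TxnFactory {
           long newTransaction() throws Exception;
       }

       private final TxnFactory factory;
       private Long currentTxn; // non-null once open() has succeeded

       TransactionRotator(TxnFactory factory) {
           this.factory = factory;
       }

       /** Opens the first transaction eagerly, as the original writer did at startup. */
       void open() {
           currentTxn = create();
       }

       /** The transaction that write() should send into. */
       long current() {
           if (currentTxn == null) {
               throw new IllegalStateException("open() has not been called");
           }
           return currentTxn;
       }

       /** Returns the transaction to commit; the next one is already open when this returns. */
       Optional<Long> prepareCommit() {
           if (currentTxn == null) {
               return Optional.empty();
           }
           Long committing = currentTxn;
           // Create the next transaction first; if this throws, the checkpoint fails and
           // currentTxn still refers to the transaction that was about to be committed.
           currentTxn = create();
           return Optional.of(committing);
       }

       private long create() {
           try {
               return factory.newTransaction();
           } catch (Exception e) {
               throw new IllegalStateException("Failed to create transaction", e);
           }
       }
   }
   ```

   Compared with the suggestion above, this variant creates the next transaction before overwriting the field, so `currentTxn` is never observed as null between checkpoints.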
   
   ---
   
   ### Issue 5: TOPIC configuration becomes optional, but default-value scenarios are not sufficiently validated
   
   **Locations**: `PulsarSinkFactory.java:38-39` (new optionRule), `PulsarSinkWriter.java:103-108`
   
   ```java
   private String resolveTopic(SeaTunnelRow row) {
       if (row.getTableId() != null) {
           return row.getTableId();
       }
       return pluginConfig.get(PulsarSinkOptions.TOPIC);  // ← May return null
   }
   ```
   
   **Problem Description**:
   1. `PulsarSinkFactory.optionRule()` removes `TOPIC` from the `required` options
   2. `resolveTopic()` falls back to the configured `TOPIC` when `row.getTableId() == null`
   3. **But `pluginConfig.get(TOPIC)` may return null** (the user did not configure it)
   4. The subsequent `createProducer(topic)` call receives a null topic, causing a Pulsar client exception
   
   **Potential Risks**:
   - In single-table scenarios, if the user forgets to configure `topic`, the failure surfaces only at runtime, when the first row is written
   - The resulting error message is unclear
   
   **Impact Scope**:
   - Direct impact: Single-table sink users
   - Affected scope: All Pulsar Sink users
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   1. Validate in `PulsarSinkFactory.createSink()`: if in single-table mode and `topic` is not configured, throw an exception
   2. Or add validation in `resolveTopic()`:
   ```java
   private String resolveTopic(SeaTunnelRow row) {
       if (row.getTableId() != null) {
           return row.getTableId();
       }
       String topic = pluginConfig.get(PulsarSinkOptions.TOPIC);
       if (topic == null) {
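           // Assumes an ILLEGAL_ARGUMENT-style error code is available; otherwise reuse an
           // existing connector or common error code
           throw new PulsarConnectorException(
                   PulsarConnectorErrorCode.ILLEGAL_ARGUMENT,
                   "Topic must be configured when row.getTableId() is null");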
       }
       return topic;
   }
   ```
   
   ---
   
   ### Issue 6: Using RuntimeException instead of SeaTunnelJsonFormatException
   
   **Location**: `PulsarSinkWriter.java:216`
   
   ```java
   // Original code:
   throw new SeaTunnelJsonFormatException(
           CommonErrorCode.UNSUPPORTED_DATA_TYPE, 
           "Unsupported format: " + format);
   
   // New code:
   throw new RuntimeException("Unsupported format: " + format);
   ```
   
   **Problem Description**:
   Using a generic `RuntimeException` instead of the framework-defined exception types violates SeaTunnel conventions.
   
   **Potential Risks**:
   - The error code is lost, making issues harder to troubleshoot
   - The framework's exception handling cannot recognize the error
   
   **Impact Scope**:
   - Direct impact: Scenarios using unsupported formats
   - Affected scope: Error handling and monitoring
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   Restore the use of `SeaTunnelJsonFormatException` or another framework-defined exception type.
   
   ---
   
   ### Issue 7: Lack of testing for multi-table scenarios
   
   **Problem Description**:
   - PR description explicitly states: "No additional unit tests were added"
   - Core logic modified but no test coverage
   - No sink tests under `connector-pulsar/src/test/`
   
   **Potential Risks**:
   - Code quality not guaranteed
   - High regression risk
   - Edge cases not covered
   
   **Impact Scope**:
   - Direct impact: Code quality and stability
   - Indirect impact: Future maintenance costs
   - Affected scope: All Pulsar Connector users
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   Add the following tests:
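   ```java
   // PulsarSinkWriterTest.java
   // Assumes `writer` and `configTopic` are initialized in a @BeforeEach method, and that
   // resolveTopic() is made package-private (it is private in the code under review)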
   @Test
   public void testResolveTopicWithTableId() {
       SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
       row.setTableId("persistent://tenant/ns/topic1");
       String topic = writer.resolveTopic(row);
       assertEquals("persistent://tenant/ns/topic1", topic);
   }
   
   @Test
   public void testResolveTopicWithoutTableId() {
       SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
       String topic = writer.resolveTopic(row);
       assertEquals(configTopic, topic);
   }
   
   @Test
   public void testMultipleProducerCreation() {
       // Verify that multiple producers are cached in producerMap
   }
   
   @Test
   public void testExactlyOnceMultiTable() {
       // Verify multi-table writes in EXACTLY_ONCE mode
   }
   ```
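   `testMultipleProducerCreation` can be exercised without a Pulsar broker if producer creation is hidden behind a factory. A minimal sketch, assuming a hypothetical `ProducerCache` wrapper around `producerMap` that uses `computeIfAbsent`; a test would inject a counting factory and check that each topic's producer is created exactly once and then reused.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.function.Function;

   /** Hypothetical wrapper around producerMap so producer creation can be faked in tests. */
   final class ProducerCache<P> {
       private final Map<String, P> producerMap = new ConcurrentHashMap<>();
       private final Function<String, P> producerFactory;

       ProducerCache(Function<String, P> producerFactory) {
           this.producerFactory = producerFactory;
       }

       /** Returns the producer for a topic, invoking the factory only on first use. */
       P getOrCreate(String topic) {
           return producerMap.computeIfAbsent(topic, producerFactory);
       }

       int size() {
           return producerMap.size();
       }
   }
   ```

   In the real writer the factory would call `pulsarClient.newProducer().topic(topic).create()`, which throws the checked `PulsarClientException`, so the lambda would need to wrap it in an unchecked exception.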
   
   ---
   
   ### Issue 8: SupportMultiTableSinkWriter interface not implemented
   
   **Location**: `PulsarSinkWriter.java:36-37`
   
   ```java
   public class PulsarSinkWriter
           implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
       // SupportMultiTableSinkWriter not implemented
   ```
   
   **Related Context**:
   - Redis Connector: `RedisSinkWriter implements SupportMultiTableSinkWriter<Void>`
   - InfluxDB Connector: `InfluxDBSinkWriter implements SupportMultiTableSinkWriter`
   - Console Connector: `ConsoleSinkWriter implements SupportMultiTableSinkWriter<Void>`
   
   **Problem Description**:
   Although `PulsarSink` implements `SupportMultiTableSink`, `PulsarSinkWriter` does not implement `SupportMultiTableSinkWriter`, which is inconsistent with other connector implementations.
   
   **Potential Risks**:
   - API inconsistency
   - May require refactoring in the future to support more complex multi-table scenarios (e.g., schema evolution)
   
   **Impact Scope**:
   - Direct impact: Code architecture consistency
   - Indirect impact: Future extensibility
   - Affected scope: Pulsar Connector architecture
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   Consider implementing the `SupportMultiTableSinkWriter<Void>` interface to maintain consistency with other connectors.
   
   ---
   
   ### Issue 9: snapshotState returning empty list may cause state loss
   
   **Location**: `PulsarSinkWriter.java:165-178`
   
   ```java
   @Override
   public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
       for (Producer<byte[]> producer : producerMap.values()) {
           producer.flush();
       }
   
       while (pendingMessages.get() > 0) {
           for (Producer<byte[]> producer : producerMap.values()) {
               producer.flush();
           }
       }
   
       return Collections.emptyList();  // ← Return empty list
   }
   ```
   
   **Problem Description**:
   Compared to original implementation:
   - The original EXACTLY_ONCE mode returned `new PulsarSinkState(transaction.getTxnID())`
   - The new implementation returns an empty list and relies on `prepareCommit()` to return the commit info
   
   Although SeaTunnel's checkpoint mechanism allows `snapshotState` to return an empty list (with the state carried by `prepareCommit`), this differs from the original implementation.
   
   **Potential Risks**:
   - Recovery from old version savepoint may fail
   - State management logic unclear
   
   **Impact Scope**:
   - Direct impact: Users upgrading from old versions
   - Indirect impact: Savepoint recovery
   - Affected scope: EXACTLY_ONCE users
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   1. Confirm whether the SeaTunnel framework allows `snapshotState` to return an empty list
   2. If recovery from old version savepoint is needed, maintain compatibility:
   ```java
   @Override
   public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
       // flush logic...
       
       if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
           // Maintain compatibility with the old version
           if (currentTransaction != null) {
               return Collections.singletonList(
                       new PulsarSinkState(currentTransaction.getTxnID()));
           }
       }
       return Collections.emptyList();
   }
   ```
   
   ---
   
   ### Issue 10: Visibility issue with currentTransaction in concurrent scenarios
   
   **Location**: `PulsarSinkWriter.java:49`
   
   ```java
   private Transaction currentTransaction;  // ← No volatile
   ```
   
   **Problem Description**:
   Although SeaTunnel's `SinkWriter` is typically single-threaded, if the framework ever moves to concurrent writes:
   - `currentTransaction` has no `volatile` modifier
   - After it is set to null in `prepareCommit()`, other threads may see stale values
   
   **Potential Risks**:
   - In concurrent scenarios, threads may use committed or aborted transactions
   - Violates EXACTLY_ONCE semantics
   
   **Impact Scope**:
   - Direct impact: Concurrent write scenarios (if supported in the future)
   - Indirect impact: Code robustness
   - Affected scope: Architecture design
   
   **Severity**: MINOR (current framework assumes single-threading)
   
   **Improvement Suggestions**:
   Use `AtomicReference<Transaction>` or add `volatile`:
   ```java
   private final AtomicReference<Transaction> currentTransaction = new AtomicReference<>();
   ```
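   Note that `volatile` alone fixes visibility but not the read-then-clear sequence in `prepareCommit()`; with `AtomicReference`, the hand-off can be a single `getAndSet`. The sketch below uses a hypothetical `TxnHolder` with `String` IDs and a `Supplier` standing in for Pulsar's `Transaction` and `PulsarConfigUtil.getTransaction(...)`.

   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   /** Hypothetical holder showing an atomic transaction hand-off with AtomicReference. */
   final class TxnHolder {
       private final AtomicReference<String> currentTransaction = new AtomicReference<>();
       private final Supplier<String> txnFactory;

       TxnHolder(Supplier<String> txnFactory) {
           this.txnFactory = txnFactory;
           // Open the first transaction up front so readers never observe null.
           currentTransaction.set(txnFactory.get());
       }

       /** Transaction that a write should join. */
       String current() {
           return currentTransaction.get();
       }

       /** Installs a fresh transaction and returns the previous one, in a single atomic step. */
       String rotate() {
           String next = txnFactory.get(); // create first so the field never holds null
           return currentTransaction.getAndSet(next);
       }
   }
   ```

   This guarantees that no thread reads `null`, but a thread that fetched `current()` just before `rotate()` can still send into the transaction being committed; ruling that out requires serializing writes with the hand-off, for example with a lock.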
   
   ---


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
