DanielCarter-stack commented on PR #10558:
URL: https://github.com/apache/seatunnel/pull/10558#issuecomment-3999110144
### Issue 1: Exceptions thrown in whenComplete callback are ineffective
**Location**: `PulsarSinkWriter.java:91-99`
```java
future.whenComplete(
        (id, ex) -> {
            pendingMessages.decrementAndGet();
            if (ex != null) {
                throw new PulsarConnectorException(
                        PulsarConnectorErrorCode.SEND_MESSAGE_FAILED,
                        "Send message failed");
            }
        });
```
**Related Context**:
- Async callbacks execute in Pulsar client thread pool
- Exceptions thrown in callbacks are not caught by the main thread
**Problem Description**:
The `CompletableFuture.whenComplete()` callback runs on the thread that
completes the future (normally a Pulsar client thread), not on the writer
thread. An exception thrown inside the callback is captured by the future that
`whenComplete()` returns; because that returned future is discarded here, the
exception is never observed and does not propagate to the calling thread. As a
result, when a send fails:
1. The failure goes unnoticed by users (the exception is swallowed)
2. `pendingMessages` is still decremented, so the message is treated as done
3. Data consistency cannot be guaranteed
This issue already existed in the original code, but it was not fixed in
this refactoring.
**Potential Risks**:
- Silent loss of failed message sends
- Violation of at-least-once semantics (AT_LEAST_ONCE)
- Difficult to troubleshoot in production environments
**Impact Scope**:
- Direct impact: Exception handling in `PulsarSinkWriter.write()`
- Indirect impact: All scenarios using EXACTLY_ONCE or AT_LEAST_ONCE
semantics
- Affected scope: All Pulsar Connector users
**Severity**: MAJOR
**Improvement Suggestions**:
```java
future.whenComplete(
        (id, ex) -> {
            pendingMessages.decrementAndGet();
            if (ex != null) {
                // Log errors instead of throwing exceptions
                log.error("Failed to send message to topic: {}", topic, ex);
                // Or use a custom error handling mechanism
                errorHandler.handleSendFailure(topic, element, ex);
            }
        });
```
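Logging alone still lets the job continue as if the send had succeeded. One possible alternative, sketched below with JDK types only, is to record the first asynchronous failure and rethrow it on the writer thread the next time `write()` or `prepareCommit()` runs. The names `AsyncSendFailureTracker`, `track`, `checkForFailure` and `pending` are hypothetical and not part of the PR or of SeaTunnel. In the connector, the rethrown exception would presumably be a `PulsarConnectorException` rather than the `IllegalStateException` used here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical helper: remembers the first async send failure for the writer thread. */
class AsyncSendFailureTracker {
    private final AtomicInteger pendingMessages = new AtomicInteger();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    /** Registers an in-flight send; the callback records a failure instead of throwing. */
    <T> void track(CompletableFuture<T> sendFuture) {
        pendingMessages.incrementAndGet();
        sendFuture.whenComplete((id, ex) -> {
            if (ex != null) {
                // Keep only the first failure; later ones are often consequences of it.
                firstFailure.compareAndSet(null, ex);
            }
            // Decrement last, so a thread that reads zero also sees any recorded failure.
            pendingMessages.decrementAndGet();
        });
    }

    /** Intended to be called from write() and prepareCommit() on the writer thread. */
    void checkForFailure() {
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new IllegalStateException("Send message failed", failure);
        }
    }

    int pending() {
        return pendingMessages.get();
    }

    public static void main(String[] args) {
        AsyncSendFailureTracker tracker = new AsyncSendFailureTracker();
        CompletableFuture<String> send = new CompletableFuture<>();
        tracker.track(send);
        send.completeExceptionally(new RuntimeException("broker unavailable"));
        try {
            tracker.checkForFailure();
        } catch (IllegalStateException e) {
            System.out.println("Surfaced on caller thread: " + e.getCause().getMessage());
        }
    }
}
```

In this sketch the failure is sticky: once recorded, every later `checkForFailure()` call throws, which fails the job rather than silently dropping the message. Whether that is the right policy for the connector is a design decision for the PR author.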
---
### Issue 2: Removal of Apache License header
**Locations**:
- `PulsarSink.java:1-17` (original) → removed in new code
- `PulsarSinkWriter.java:1-33` (original) → removed in new code
**Problem Description**:
All modified Java files have had their Apache License headers removed,
including:
- `/* Licensed to the Apache Software Foundation... */`
- Copyright notices
- LICENSE file references
This violates Apache project guidelines.
**Potential Risks**:
- Legal compliance risk
- Cannot pass Apache PMC IP review
- Affects project compliance
**Impact Scope**:
- Direct impact: All modified Java files
- Affected scope: Project compliance
**Severity**: BLOCKER (Must fix)
**Improvement Suggestions**:
Restore Apache License headers for all files.
---
### Issue 3: Removal of PARTITION_KEY_FIELDS configuration causes breaking change
**Locations**:
- `PulsarSinkFactory.java:39-41` (original optionRule included
`PARTITION_KEY_FIELDS`)
- `PulsarSinkWriter.java` (removed `getPartitionKeyFields()`,
`createKeySerializationSchema()` methods)
- `PulsarSinkOptions.java:64-69` (definition still exists but unused)
**Problem Description**:
1. The original `PulsarSinkFactory.optionRule()` had `PARTITION_KEY_FIELDS` as
   an optional configuration
2. The new code removed all related logic:
   - `keySerializationSchema` field
   - `getPartitionKeyFields()` method
   - `createKeySerializationSchema()` method
3. **The removal is not documented in `incompatible-changes.md`**
4. The PR description does not explicitly mention this breaking change
**Potential Risks**:
- Existing jobs that set the `partition_key_fields` option can no longer run
as before after upgrading
- Violates semantic versioning principles
- Poor user experience
**Impact Scope**:
- Direct impact: Users using Pulsar partition key functionality
- Indirect impact: Scenarios relying on message key for routing/partitioning
- Affected scope: Some Pulsar Connector users
**Severity**: MAJOR
**Improvement Suggestions**:
1. If this feature must be removed:
   - Document it in `incompatible-changes.md`
   - Note it prominently in the PR description
   - Explain alternative approaches in the migration documentation
2. Or retain this functionality:
   - In multi-table scenarios, the partition key configuration remains valid
   - Only ignore the partition key configuration when using `getTableId()`
     routing
---
### Issue 4: Race condition in transaction management logic after prepareCommit
**Location**: `PulsarSinkWriter.java:145-163`
```java
@Override
public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
    if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
        return Optional.empty();
    }
    while (pendingMessages.get() > 0) {
        Thread.yield();
    }
    if (currentTransaction == null) {
        return Optional.empty();
    }
    TxnID txnID = currentTransaction.getTxnID();
    currentTransaction = null; // ← Set to null
    return Optional.of(new PulsarCommitInfo(txnID));
}

private Transaction getOrCreateTransaction() {
    if (PulsarSemantics.EXACTLY_ONCE != pulsarSemantics) {
        return null;
    }
    if (currentTransaction == null) { // ← Recreate
        try {
            currentTransaction =
                    PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
        } catch (Exception e) {
            throw new PulsarConnectorException(
                    PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                    "Transaction create failed");
        }
    }
    return currentTransaction;
}
```
**Problem Description**:
1. `prepareCommit()` sets `currentTransaction` to null
2. The next `write()` call creates a new transaction lazily, only when
`getOrCreateTransaction()` is invoked
3. **Race condition**: If a `write()` call overlaps with `prepareCommit()` and
reads `currentTransaction` before the next transaction is ready, it may see
null and an NPE will occur
Compared to original implementation:
```java
// The original snapshotState creates a new transaction before returning the state
List<PulsarSinkState> pulsarSinkStates =
        Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
this.transaction =
        (TransactionImpl) PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
return pulsarSinkStates;
```
**Potential Risks**:
- After `currentTransaction` is set to null, if there are concurrent write
calls, null transaction may be used
- Violates EXACTLY_ONCE semantics
**Impact Scope**:
- Direct impact: Multi-table writes in EXACTLY_ONCE mode
- Indirect impact: Scenarios with large checkpoint intervals
- Affected scope: Pulsar Connector EXACTLY_ONCE users
**Severity**: CRITICAL
**Improvement Suggestions**:
Create next transaction before `prepareCommit()` returns:
```java
TxnID txnID = currentTransaction.getTxnID();
currentTransaction = null;
// Create the next transaction immediately
try {
    currentTransaction =
            PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout);
} catch (Exception e) {
    throw new PulsarConnectorException(
            PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
            "Failed to create next transaction after prepareCommit", e);
}
return Optional.of(new PulsarCommitInfo(txnID));
```
---
### Issue 5: TOPIC configuration becomes optional but default value scenarios not sufficiently validated
**Locations**: `PulsarSinkFactory.java:38-39` (new optionRule),
`PulsarSinkWriter.java:103-108`
```java
private String resolveTopic(SeaTunnelRow row) {
    if (row.getTableId() != null) {
        return row.getTableId();
    }
    return pluginConfig.get(PulsarSinkOptions.TOPIC); // ← May return null
}
```
**Problem Description**:
1. `PulsarSinkFactory.optionRule()` removes `TOPIC` from `required`
2. `resolveTopic()` falls back to the configured `TOPIC` when
`row.getTableId() == null`
3. **But `pluginConfig.get(TOPIC)` may return null** (the user did not
configure it)
4. The subsequent `createProducer(topic)` call receives a null topic, causing
a Pulsar client exception
**Potential Risks**:
- In single-table scenarios, if user forgets to configure `topic`, runtime
exception occurs
- Error message unclear
**Impact Scope**:
- Direct impact: Single-table sink users
- Affected scope: All Pulsar Sink users
**Severity**: MINOR
**Improvement Suggestions**:
1. Validate in `PulsarSinkFactory.createSink()`: if single-table mode and
topic not configured, throw exception
2. Or add validation in `resolveTopic()`:
```java
private String resolveTopic(SeaTunnelRow row) {
    if (row.getTableId() != null) {
        return row.getTableId();
    }
    String topic = pluginConfig.get(PulsarSinkOptions.TOPIC);
    if (topic == null) {
        throw new PulsarConnectorException(
                PulsarConnectorErrorCode.ILLEGAL_ARGUMENT,
                "Topic must be configured when row.getTableId() is null");
    }
    return topic;
}
```
---
### Issue 6: Using RuntimeException instead of SeaTunnelJsonFormatException
**Location**: `PulsarSinkWriter.java:216`
```java
// Original code:
throw new SeaTunnelJsonFormatException(
        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
        "Unsupported format: " + format);

// New code:
throw new RuntimeException("Unsupported format: " + format);
```
**Problem Description**:
Using a generic `RuntimeException` instead of a framework-defined exception
type drops the error code and departs from SeaTunnel's error-handling
conventions.
**Potential Risks**:
- Error codes are lost, which makes issues harder to troubleshoot
- The exception handling framework cannot recognize the exception type
**Impact Scope**:
- Direct impact: Scenarios using unsupported formats
- Affected scope: Error handling and monitoring
**Severity**: MINOR
**Improvement Suggestions**:
Restore use of `SeaTunnelJsonFormatException` or other framework-defined
exception types.
---
### Issue 7: Lack of testing for multi-table scenarios
**Problem Description**:
- PR description explicitly states: "No additional unit tests were added"
- Core logic modified but no test coverage
- No sink tests under `connector-pulsar/src/test/`
**Potential Risks**:
- Code quality not guaranteed
- High regression risk
- Edge cases not covered
**Impact Scope**:
- Direct impact: Code quality and stability
- Indirect impact: Future maintenance costs
- Affected scope: All Pulsar Connector users
**Severity**: MAJOR
**Improvement Suggestions**:
Add the following tests:
```java
// PulsarSinkWriterTest.java
// Note: `writer` and `configTopic` are test fixture fields, and `resolveTopic`
// (private in the snippet under Issue 5) would need to be visible to the test.
@Test
public void testResolveTopicWithTableId() {
    SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
    row.setTableId("persistent://tenant/ns/topic1");
    String topic = writer.resolveTopic(row);
    assertEquals("persistent://tenant/ns/topic1", topic);
}

@Test
public void testResolveTopicWithoutTableId() {
    SeaTunnelRow row = new SeaTunnelRow(new Object[]{});
    String topic = writer.resolveTopic(row);
    assertEquals(configTopic, topic);
}

@Test
public void testMultipleProducerCreation() {
    // Verify that multiple producers are cached in producerMap
}

@Test
public void testExactlyOnceMultiTable() {
    // Verify multi-table writes in EXACTLY_ONCE mode
}
```
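As a rough illustration of what `testMultipleProducerCreation` could assert, the sketch below models a per-topic producer cache using JDK types only. `ProducerCache`, `producerFor` and the `Function` factory are hypothetical stand-ins: the PR's actual `producerMap` and `createProducer(topic)` are not reproduced here, and it is an assumption that the cache is keyed by topic name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical stand-in for the writer's per-topic producer cache. */
class ProducerCache<P> {
    private final Map<String, P> producerMap = new ConcurrentHashMap<>();
    private final Function<String, P> factory;

    ProducerCache(Function<String, P> factory) {
        this.factory = factory;
    }

    /** Returns the cached producer for the topic, creating it on first use. */
    P producerFor(String topic) {
        return producerMap.computeIfAbsent(topic, factory);
    }

    int size() {
        return producerMap.size();
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        ProducerCache<String> cache = new ProducerCache<>(topic -> {
            created.incrementAndGet();
            return "producer-for-" + topic;
        });
        cache.producerFor("topic1");
        cache.producerFor("topic1"); // second lookup should reuse the cached producer
        cache.producerFor("topic2");
        System.out.println(created.get() + " producers created, " + cache.size() + " cached");
    }
}
```

A real test would replace the string "producers" with mocked Pulsar `Producer<byte[]>` instances and check that rows for the same table ID reuse one producer while rows for different table IDs get separate ones.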
---
### Issue 8: SupportMultiTableSinkWriter interface not implemented
**Location**: `PulsarSinkWriter.java:36-37`
```java
public class PulsarSinkWriter
        implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
    // SupportMultiTableSinkWriter not implemented
```
**Related Context**:
- Redis Connector: `RedisSinkWriter implements SupportMultiTableSinkWriter<Void>`
- InfluxDB Connector: `InfluxDBSinkWriter implements SupportMultiTableSinkWriter`
- Console Connector: `ConsoleSinkWriter implements SupportMultiTableSinkWriter<Void>`
**Problem Description**:
Although `PulsarSink` implements `SupportMultiTableSink`, `PulsarSinkWriter`
does not implement `SupportMultiTableSinkWriter`. This is inconsistent with
other Connector implementations.
**Potential Risks**:
- API inconsistency
- May require refactoring in the future to support more complex multi-table
scenarios (e.g., schema evolution)
**Impact Scope**:
- Direct impact: Code architecture consistency
- Indirect impact: Future extensibility
- Affected scope: Pulsar Connector architecture
**Severity**: MINOR
**Improvement Suggestions**:
Consider implementing `SupportMultiTableSinkWriter<Void>` interface to
maintain consistency with other Connectors.
---
### Issue 9: snapshotState returning empty list may cause state loss
**Location**: `PulsarSinkWriter.java:165-178`
```java
@Override
public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
    for (Producer<byte[]> producer : producerMap.values()) {
        producer.flush();
    }
    while (pendingMessages.get() > 0) {
        for (Producer<byte[]> producer : producerMap.values()) {
            producer.flush();
        }
    }
    return Collections.emptyList(); // ← Return empty list
}
```
**Problem Description**:
Compared to original implementation:
- The original EXACTLY_ONCE mode returned
`new PulsarSinkState(transaction.getTxnID())`
- The new implementation returns an empty list and relies on `prepareCommit`
to return the commit info
SeaTunnel's checkpoint mechanism appears to allow `snapshotState` to return an
empty list (with the transaction carried by `prepareCommit` instead), but this
differs from the original implementation.
**Potential Risks**:
- Recovery from old version savepoint may fail
- State management logic unclear
**Impact Scope**:
- Direct impact: Users upgrading from old versions
- Indirect impact: Savepoint recovery
- Affected scope: EXACTLY_ONCE users
**Severity**: MAJOR
**Improvement Suggestions**:
1. Confirm whether SeaTunnel framework allows `snapshotState` to return
empty list
2. If recovery from old version savepoint is needed, maintain compatibility:
```java
@Override
public List<PulsarSinkState> snapshotState(long checkpointId) throws IOException {
    // flush logic...
    if (PulsarSemantics.EXACTLY_ONCE == pulsarSemantics) {
        // Maintain compatibility with the old version
        if (currentTransaction != null) {
            return Collections.singletonList(
                    new PulsarSinkState(currentTransaction.getTxnID()));
        }
    }
    return Collections.emptyList();
}
```
---
### Issue 10: Visibility issue with currentTransaction in concurrent scenarios
**Location**: `PulsarSinkWriter.java:49`
```java
private Transaction currentTransaction; // ← No volatile
```
**Problem Description**:
Although SeaTunnel's `SinkWriter` is typically single-threaded, if the
framework later supports concurrent writes:
- `currentTransaction` has no `volatile` modifier
- After it is set to null in `prepareCommit()`, other threads may still see
the stale value
**Potential Risks**:
- In concurrent scenarios, threads may use committed or aborted transactions
- Violates EXACTLY_ONCE semantics
**Impact Scope**:
- Direct impact: Concurrent write scenarios (if supported in the future)
- Indirect impact: Code robustness
- Affected scope: Architecture design
**Severity**: MINOR (current framework assumes single-threading)
**Improvement Suggestions**:
Use `AtomicReference<Transaction>` or add `volatile`:
```java
private final AtomicReference<Transaction> currentTransaction =
        new AtomicReference<>();
```
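If the field does become an `AtomicReference`, the take-and-clear step in `prepareCommit()` and the lazy creation in `getOrCreateTransaction()` can each be expressed as one atomic operation. The following is a minimal sketch using JDK types only. `TransactionHolder`, `getOrCreate` and `takeForCommit` are hypothetical names, and the `Supplier` stands in for `PulsarConfigUtil.getTransaction(pulsarClient, transactionTimeout)`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical holder for the writer's current transaction of type T. */
class TransactionHolder<T> {
    private final AtomicReference<T> current = new AtomicReference<>();
    private final Supplier<T> factory;

    TransactionHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns the current transaction, creating one if none is set. */
    T getOrCreate() {
        while (true) {
            T existing = current.get();
            if (existing != null) {
                return existing;
            }
            T created = factory.get();
            if (current.compareAndSet(null, created)) {
                return created;
            }
            // Lost a race with another thread: `created` is discarded here.
            // A real transaction would need to be aborted before retrying.
        }
    }

    /** Atomically hands the current transaction to the committer and clears it. */
    Optional<T> takeForCommit() {
        return Optional.ofNullable(current.getAndSet(null));
    }

    public static void main(String[] args) {
        int[] counter = {0};
        TransactionHolder<String> holder = new TransactionHolder<>(() -> "txn-" + (++counter[0]));
        System.out.println(holder.getOrCreate());   // first transaction, created lazily
        System.out.println(holder.takeForCommit()); // handed over, holder is now empty
        System.out.println(holder.getOrCreate());   // a fresh transaction is created
    }
}
```

This only addresses visibility and the atomic hand-off of the reference. A writer thread that obtained a transaction just before `takeForCommit()` could still send on it afterwards, so genuinely concurrent writes would need additional coordination.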
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.