DanielCarter-stack commented on PR #10557:
URL: https://github.com/apache/seatunnel/pull/10557#issuecomment-3993273343
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10557", "part": 1,
"total": 1} -->
## Issue 1: Concurrent access risk in sourceSplits
**Location**: `RabbitmqSourceReader.java:75, 186`
```java
private final Set<RabbitmqSplit> sourceSplits; // HashSet - Not thread-safe
public void addSplits(List<RabbitmqSplit> splits) {
// ...
sourceSplits.add(split); // ❌ No synchronization protection
}
public List<RabbitmqSplit> snapshotState(long checkpointId) {
return new ArrayList<>(sourceSplits); // ❌ No synchronization protection
}
```
**Context**:
- Caller 1: `RabbitmqSplitEnumerator.assignSplitsToReaders()` →
`context.assignSplit()` → `addSplits()` (Enumerator thread)
- Caller 2: Checkpoint coordinator → `snapshotState()` (Checkpoint thread)
**Problem Description**:
Two threads may concurrently access `sourceSplits` HashSet, leading to
`ConcurrentModificationException` or inconsistent state. For example:
1. Thread A (Enumerator) is executing `sourceSplits.add(split1)`
2. Thread B (Checkpoint) is simultaneously executing `new
ArrayList<>(sourceSplits)`
3. This may throw a `ConcurrentModificationException` or silently omit split1 from the snapshot
**Potential Risks**:
- **Risk 1**: Incomplete state during Checkpoint, causing split loss after
recovery
- **Risk 2**: Concurrent modification exception causing job failure
- **Risk 3**: Hard to reproduce; typically surfaces only under production-level concurrency
**Scope of Impact**:
- **Direct Impact**: `RabbitmqSourceReader` class
- **Indirect Impact**: Checkpoint recovery mechanism may fail
- **Affected Area**: Single Connector (RabbitMQ)
**Severity**: Medium-High
**Improvement Suggestions**:
```java
// Solution 1: Use ConcurrentHashMap (recommended)
private final Set<RabbitmqSplit> sourceSplits =
ConcurrentHashMap.newKeySet();
// Solution 2: Use Collections.synchronizedSet
private final Set<RabbitmqSplit> sourceSplits =
Collections.synchronizedSet(new HashSet<>());
// Solution 3: Add locks around every access (affects performance, not recommended)
private final Set<RabbitmqSplit> sourceSplits = new HashSet<>();
private final Object splitsLock = new Object();
public void addSplits(List<RabbitmqSplit> splits) {
    synchronized (splitsLock) {
        sourceSplits.addAll(splits);
    }
}
// snapshotState must take the same lock, otherwise the protection is incomplete
public List<RabbitmqSplit> snapshotState(long checkpointId) {
    synchronized (splitsLock) {
        return new ArrayList<>(sourceSplits);
    }
}
```
**Rationale**:
- `ConcurrentHashMap.newKeySet()` provides a thread-safe `Set` with `HashSet`-like semantics
- Negligible performance impact: reads are lock-free and iteration is weakly consistent
- Consistent with the existing `pendingDeliveryTagsToCommit` implementation
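The difference can be shown in a small standalone sketch (class and queue names are made up for illustration): the `ConcurrentHashMap`-backed set tolerates additions while a copy is being taken, which is exactly the `addSplits` / `snapshotState` race described above, whereas a plain `HashSet` iterator would throw.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentSplitSetDemo {
    public static void main(String[] args) {
        // Solution 1: a thread-safe Set view backed by ConcurrentHashMap
        Set<String> sourceSplits = ConcurrentHashMap.newKeySet();
        sourceSplits.add("queue-a");
        sourceSplits.add("queue-b");

        // The iterator is weakly consistent: adding while copying (as the
        // checkpoint thread does in snapshotState) never throws
        // ConcurrentModificationException. A plain HashSet would throw here.
        List<String> snapshot = new ArrayList<>();
        for (String split : sourceSplits) {
            snapshot.add(split);
            sourceSplits.add("queue-c");
        }
        System.out.println(sourceSplits.size()); // 3
    }
}
```

The snapshot copy may or may not contain `queue-c` (weak consistency), but it is always a valid, non-corrupted view.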
---
## Issue 2: Silent message dropping
**Location**: `RabbitmqSourceReader.java:152-161`
```java
DeserializationSchema<SeaTunnelRow> schema =
schemaMap.get(message.getSplitId());
String exactTableId = exactTableIdMap.get(message.getSplitId());
if (schema != null && exactTableId != null) {
SeaTunnelRow row = schema.deserialize(delivery.getBody());
if (row != null) {
row.setTableId(exactTableId);
output.collect(row);
}
} else {
log.warn("Cannot find schema or tableId for queue: {}",
message.getSplitId());
// ❌ Messages are silently discarded
}
```
**Context**:
- `schemaMap` and `exactTableIdMap` are initialized in the constructor
- If messages from unconfigured queues are received at runtime (e.g.,
configuration error, queue name typo)
**Problem Description**:
When the corresponding splitId cannot be found in `schemaMap` or
`exactTableIdMap`, the message is only logged at WARN level and then discarded.
This means:
1. When users have configuration errors, data is lost but the job continues
running
2. No metrics or alerts notify the user
3. Inconsistent with SeaTunnel's error handling philosophy (should typically
fail or explicitly reject)
**Potential Risks**:
- **Risk 1**: Silent data loss due to configuration errors, difficult for
users to detect
- **Risk 2**: Potential loss of critical data in production environments
- **Risk 3**: Difficult to debug (logs may be drowned out)
**Scope of Impact**:
- **Direct Impact**: Message processing in multi-table scenarios
- **Indirect Impact**: Data reliability
- **Affected Area**: Users using multi-table functionality
**Severity**: Medium-High
**Improvement Suggestions**:
```java
// Solution 1: Escalate to error and throw exception (strict mode)
if (schema != null && exactTableId != null) {
SeaTunnelRow row = schema.deserialize(delivery.getBody());
if (row != null) {
row.setTableId(exactTableId);
output.collect(row);
}
} else {
String errorMsg = String.format(
"Cannot find schema or tableId for queue: %s. " +
"This queue is not configured in tables_configs. " +
"Available queues: %s",
message.getSplitId(), schemaMap.keySet());
log.error(errorMsg);
throw new RabbitmqConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, errorMsg);
}
// Solution 2: Add configuration option (lenient mode)
if (config.isStrictSchemaValidation()) {
// Throw exception
} else {
// Only log warning (current behavior)
log.warn("...");
}
// Solution 3: Add Metrics
if (schema == null || exactTableId == null) {
context.counter("rabbitmq.unknown.queue.messages").add(1);
log.warn("...");
}
```
**Rationale**:
- Configuration errors should fail-fast
- Provides clear error messages to help users diagnose
- Optional strict mode allows users to choose behavior
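The fail-fast idea can also be applied earlier, at split-assignment time, so a misconfigured queue aborts the job before any message is fetched and dropped. A minimal standalone sketch (the method, map contents, and exception type are illustrative, not the connector's actual API):

```java
import java.util.List;
import java.util.Map;

public class SplitValidationDemo {
    // Illustrative: reject a split whose queue has no configured schema,
    // instead of discarding its messages later at consume time.
    static void validateSplits(List<String> splitIds, Map<String, String> schemaMap) {
        for (String id : splitIds) {
            if (!schemaMap.containsKey(id)) {
                throw new IllegalStateException(
                        "Queue '" + id + "' is not configured. Known queues: "
                                + schemaMap.keySet());
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> schemaMap = Map.of("multi_table_1", "schema1");
        validateSplits(List.of("multi_table_1"), schemaMap); // passes
        try {
            validateSplits(List.of("typo_queue"), schemaMap); // a queue-name typo
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This catches queue-name typos at startup, which is cheaper to diagnose than per-message warnings.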
---
## Issue 3: Consumer resource leak risk
**Location**: `RabbitmqSourceReader.java:174-194`
```java
@Override
public void addSplits(List<RabbitmqSplit> splits) {
for (RabbitmqSplit split : splits) {
System.out.println(...); // ⚠️ Debug code
try {
DefaultConsumer consumer =
rabbitMQClient.getQueueingConsumer(queue, split.splitId());
channel.basicConsume(split.splitId(), autoAck, consumer); // ⚠️ Creates a new consumer every time
sourceSplits.add(split);
} catch (IOException e) {
throw new RabbitmqConnectorException(...);
}
}
}
@Override
public void close() throws IOException {
if (rabbitMQClient != null) {
rabbitMQClient.close(); // ✅ Close client
}
}
```
**Context**:
- RabbitMQ's `basicConsume()` creates a consumer and registers it to the
channel
- Each call creates a new `DefaultConsumer` instance
- No corresponding `basicCancel()` call
**Problem Description**:
1. **Every `addSplits()` creates a new consumer**: If a split is reassigned
(e.g., `addSplitsBack`), duplicate consumption may occur
2. **No consumer cleanup mechanism**: If a split is removed, the
corresponding consumer is still running
3. **Debug code not cleaned up**: Production code contains
`System.out.println`
**Potential Risks**:
- **Risk 1**: Duplicate consumption of the same queue (if split is
reassigned)
- **Risk 2**: Consumer resource leak (long-running jobs)
- **Risk 3**: Log pollution (debug output)
**Scope of Impact**:
- **Direct Impact**: Resource usage, duplicate message consumption
- **Indirect Impact**: Data accuracy
- **Affected Area**: Multi-table + split reassignment scenarios
**Severity**: Medium
**Improvement Suggestions**:
```java
// Added: Track consumer
private final Map<String, DefaultConsumer> activeConsumers = new
ConcurrentHashMap<>();
@Override
public void addSplits(List<RabbitmqSplit> splits) {
for (RabbitmqSplit split : splits) {
log.info("Received split for queue: {}", split.splitId()); // log instead of println
try {
// Check if consumer already exists
if (activeConsumers.containsKey(split.splitId())) {
log.warn("Consumer for queue {} already exists, skipping",
split.splitId());
continue;
}
DefaultConsumer consumer =
rabbitMQClient.getQueueingConsumer(queue, split.splitId());
channel.basicConsume(split.splitId(), autoAck, consumer);
activeConsumers.put(split.splitId(), consumer);
sourceSplits.add(split);
log.info("Started consuming from queue: {}", split.splitId());
} catch (IOException e) {
throw new
RabbitmqConnectorException(CREATE_RABBITMQ_CLIENT_FAILED, e);
}
}
}
// Added: Clean up consumer (if needed)
private void removeSplit(RabbitmqSplit split) {
try {
DefaultConsumer consumer = activeConsumers.remove(split.splitId());
if (consumer != null) {
channel.basicCancel(consumer.getConsumerTag());
}
sourceSplits.remove(split);
} catch (IOException e) {
log.error("Failed to cancel consumer for queue {}", split.splitId(),
e);
}
}
@Override
public void close() throws IOException {
// Clean up all consumers
for (Map.Entry<String, DefaultConsumer> entry :
activeConsumers.entrySet()) {
try {
channel.basicCancel(entry.getValue().getConsumerTag());
} catch (IOException e) {
log.error("Failed to cancel consumer for queue {}",
entry.getKey(), e);
}
}
activeConsumers.clear();
if (rabbitMQClient != null) {
rabbitMQClient.close();
}
}
```
**Rationale**:
- Track active consumers to avoid duplicate creation
- Provide cleanup mechanism to prevent resource leaks
- Remove debug code, use logging framework
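One detail worth noting about the sketch above: `containsKey` followed by `put` is a check-then-act pair, which is not atomic if `addSplits` can ever race with a cleanup path. `Map.putIfAbsent` closes that window in a single call. A standalone illustration (the tag values are made up):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConsumerRegistryDemo {
    public static void main(String[] args) {
        Map<String, String> activeConsumers = new ConcurrentHashMap<>();

        // putIfAbsent makes registration idempotent and atomic:
        // a reassigned split cannot register a second consumer.
        String first = activeConsumers.putIfAbsent("queue-a", "consumer-tag-1");
        String second = activeConsumers.putIfAbsent("queue-a", "consumer-tag-2");

        System.out.println(first == null);                    // true: newly registered
        System.out.println("consumer-tag-1".equals(second));  // true: duplicate skipped
        System.out.println(activeConsumers.size());           // 1
    }
}
```

In the reader, a `null` return would mean "proceed with `basicConsume`", and a non-null return would mean "split already active, skip".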
---
## Issue 4: System.out.println debug code
**Location**: `RabbitmqSourceReader.java:176-179`
```java
System.out.println(
"\u001B[32m [READER DEBUG] Received split for queue: "
+ split.splitId()
+ "\u001B[0m");
```
**Problem Description**:
Production code contains debug output, violating Apache code standards.
**Potential Risks**:
- **Risk 1**: Pollutes standard output, interferes with log collection
- **Risk 2**: ANSI escape codes display abnormally in some terminals
- **Risk 3**: Cannot be controlled through log levels
**Scope of Impact**:
- **Direct Impact**: Code quality
- **Affected Area**: Code standards
**Severity**: Low
**Improvement Suggestions**:
```java
// Remove debug code, use logging
log.info("Received split for queue: {}", split.splitId());
// Or use DEBUG level
log.debug("Received split for queue: {}", split.splitId());
```
---
## Issue 5: Configuration validation timing
**Location**: `RabbitmqSource.java:58-80`
```java
private List<CatalogTable> initializeCatalogTables(ReadonlyConfig config) {
List<CatalogTable> tables = new ArrayList<>();
    if (config.getOptional(ConnectorCommonOptions.TABLE_CONFIGS).isPresent()) {
        // ... Handle multiple tables
    } else if (config.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
        // ... Handle single table
    } else {
throw new RabbitmqConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                "No 'schema' or 'table_configs' found. Please configure at least one table.");
}
return tables;
}
```
**Context**:
- Called in `RabbitmqSource` constructor
- Mutually exclusive option rules are already defined in `RabbitmqSourceFactory.optionRule()`
**Problem Description**:
Although `optionRule()` defines exclusive rules for `TABLE_CONFIGS` and
`QUEUE_NAME`, the validation logic in `initializeCatalogTables()` uses an
if-else chain. The following issues exist:
1. If users configure both, `TABLE_CONFIGS` takes priority (as expected), but only because of the if-else ordering
2. No explicit validation logic rejects the conflict
**Potential Risks**:
- **Risk 1**: Future code refactoring may break priority
- **Risk 2**: Does not follow best practices (should validate early)
**Scope of Impact**:
- **Direct Impact**: Configuration validation logic
- **Affected Area**: Configuration clarity
**Severity**: Low (current implementation is correct, but can be improved)
**Improvement Suggestions**:
```java
private List<CatalogTable> initializeCatalogTables(ReadonlyConfig config) {
boolean hasTableConfigs =
config.getOptional(ConnectorCommonOptions.TABLE_CONFIGS).isPresent();
boolean hasSchema =
config.getOptional(ConnectorCommonOptions.SCHEMA).isPresent();
// Explicitly validate exclusive rules
if (hasTableConfigs && hasSchema) {
throw new RabbitmqConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
            "Cannot specify both 'table_configs' and 'schema'. Please use 'table_configs' for multi-table mode or 'schema' for single-table mode.");
}
List<CatalogTable> tables = new ArrayList<>();
if (hasTableConfigs) {
// ... Multiple tables logic
} else if (hasSchema) {
// ... Single table logic
} else {
throw new RabbitmqConnectorException(...);
}
return tables;
}
```
**Rationale**:
- Explicit validation logic, does not depend on if-else order
- Provides clear error messages
- Consistent with `optionRule()` exclusive rules
---
## Issue 6: Insufficient E2E test data validation
**Location**: `RabbitmqIT.java:259-280`
```java
@TestTemplate
public void testRabbitMQMultiTableE2E(TestContainer container) throws
Exception {
// ... Create two queues, each with 10 records
sendData("multi_table_1", type1, 10);
sendData("multi_table_2", type2, 10);
Container.ExecResult execResult =
container.executeJob("/rabbitmq_multitable.conf");
Assertions.assertEquals(0, execResult.getExitCode());
// ❌ Only validate exit code, data distribution not validated
}
```
**Context**:
Configuration file uses Assert Sink:
```hocon
Assert {
rules {
row_rules = [
{ rule_type = MAX_ROW, rule_value = 10 },
{ rule_type = MIN_ROW, rule_value = 10 }
]
}
}
```
**Problem Description**:
The test only validates the total row count as 20, but does not verify:
1. Whether the row count for each table is correct (possibly one is 20 and
the other is 0)
2. Whether the tableId for each row is correct
3. Whether the schema matches
**Potential Risks**:
- **Risk 1**: Cannot detect tableId injection errors
- **Risk 2**: Cannot detect schema mismatch
**Scope of Impact**:
- **Direct Impact**: Test coverage
- **Affected Area**: Test reliability
**Severity**: Low-Medium
**Improvement Suggestions**:
```java
@TestTemplate
public void testRabbitMQMultiTableE2E(TestContainer container) throws
Exception {
// ... Send data
// Use custom sink for validation
Map<String, Integer> tableCounts = new HashMap<>();
// ... Collect data and count rows per table
Assertions.assertEquals(10, tableCounts.get("multi_table_1").intValue());
Assertions.assertEquals(10, tableCounts.get("multi_table_2").intValue());
// Validate schema
// ...
}
```
**Rationale**:
- Verify data is correctly distributed to different tables
- Verify tableId injection is correct
- Improve test confidence
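The per-table assertion can be sketched independently of the test harness (the collected tableIds below are made up; in the real test they would be read back from the job's output rows):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PerTableCountDemo {
    public static void main(String[] args) {
        // Hypothetical stand-in for rows collected from the job output:
        // each element is the tableId carried by one SeaTunnelRow.
        List<String> collectedTableIds = List.of(
                "multi_table_1", "multi_table_1", "multi_table_2");

        // Group by tableId so each table's row count is checked separately,
        // catching the "one table got 20, the other got 0" failure mode.
        Map<String, Long> counts = collectedTableIds.stream()
                .collect(Collectors.groupingBy(id -> id, Collectors.counting()));

        System.out.println(counts.get("multi_table_1")); // 2
        System.out.println(counts.get("multi_table_2")); // 1
    }
}
```

With counts per table, `Assertions.assertEquals(10, ...)` on each key detects misrouted rows that a total-count rule cannot.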
---
## Issue 7: Missing documentation
**Location**: PR description
**Problem Description**:
The PR description states: "If necessary, please update the documentation",
but:
1. The diff does not include any documentation changes
2. No migration guide from single-table to multi-table
3. No configuration examples
**Potential Risks**:
- **Risk 1**: Users do not know how to use the new feature
- **Risk 2**: Difficult to upgrade
**Scope of Impact**:
- **Affected Area**: User experience
**Severity**: Medium
**Improvement Suggestions**:
Add documentation `docs/en/connectors/source/RabbitMQ.md`, including:
1. Multi-table configuration examples
2. Migration guide from single-table to multi-table
3. Common questions (e.g., queue_name configuration priority)
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]