DanielCarter-stack commented on PR #10502:
URL: https://github.com/apache/seatunnel/pull/10502#issuecomment-3939843020
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10502", "part": 1,
"total": 1} -->
### Issue 1: AbstractSinkWriter Template Method Breaks Existing Subclass
Compatibility
**Location**:
`seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java:27-47`
**Related Context**:
- Subclass 1:
`seatunnel-connectors-v2/connector-slack/src/main/java/org/apache/seatunnel/connectors/seatunnel/slack/sink/SlackWriter.java:49`
- Subclass 2:
`seatunnel-connectors-v2/connector-dingtalk/src/main/java/org/apache/seatunnel/connectors/seatunnel/sink/DingTalkWriter.java:49`
- Subclass 3:
`seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java:70`
- 27+ other subclasses
**Problem Description**:
`AbstractSinkWriter` now provides an implementation of `write(T element)`,
turning it into a template method that delegates to `doWrite(T element)`.
However:
1. The `write()` method is marked neither `final` nor `abstract`
2. All 30+ existing subclasses implement `write()` directly
3. Those subclass `write()` methods override the new parent implementation
4. As a result, the dirty data collection logic is never invoked
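The failure mode can be reproduced in isolation. The following is a minimal, self-contained sketch with hypothetical class names (`BaseWriter`, `LegacyWriter` stand in for `AbstractSinkWriter` and an existing connector writer):

```java
// Hypothetical minimal reproduction of the silent-override problem.
class BaseWriter {
    int dirtyChecks = 0; // counts how often the dirty-data wrapper ran

    // Template method: NOT final, so subclasses can (and do) override it.
    public void write(String element) {
        dirtyChecks++; // dirty data collection logic
        doWrite(element);
    }

    protected void doWrite(String element) {}
}

// Models an existing connector that predates the template method:
// it overrides write() directly, like the 30+ subclasses in this PR.
class LegacyWriter extends BaseWriter {
    @Override
    public void write(String element) {
        // actual write logic; the parent's wrapper is never invoked
    }
}

public class OverrideDemo {
    public static int checksAfterWrite() {
        LegacyWriter w = new LegacyWriter();
        w.write("record");
        return w.dirtyChecks; // stays 0: the dirty-data wrapper was skipped
    }
}
```

The compiler accepts this silently, which is exactly why making `write()` `final` (turning the existing overrides into compile errors) is the safer design.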
**Potential Risks**:
- **Risk 1**: All Connectors inheriting from `AbstractSinkWriter` cannot use
the dirty data collection feature
- **Risk 2**: Subclasses calling `super.write()` may cause recursion
- **Risk 3**: No compile-time errors, only runtime behavior anomalies
**Impact Scope**:
- **Direct Impact**: Connectors such as Slack, DingTalk, Email, Assert,
HugeGraph, Druid, Console, Redis, RocketMQ, Qdrant, TDengine, etc.
- **Indirect Impact**: User-configured dirty data collectors do not take
effect in these Connectors
- **Impact Area**: 30+ Connectors, requiring adaptation one by one
**Severity**: BLOCKER
**Improvement Suggestions**:
```java
// AbstractSinkWriter.java
public abstract class AbstractSinkWriter<T, StateT> implements SinkWriter<T,
Void, StateT> {
protected SinkWriter.Context context;
protected AbstractSinkWriter() {}
protected AbstractSinkWriter(SinkWriter.Context context) {
this.context = context;
}
/**
* Template method that wraps {@link #doWrite(Object)} with dirty data
collection.
*
* <p><b>IMPORTANT:</b> Subclasses must override {@link
#doWrite(Object)} instead of this method.
* If you override this method directly, dirty data collection will not
work.
*
* @throws IOException if write fails and the record cannot be collected
as dirty data
*/
@Override
public final void write(T element) throws IOException { // Add final
if (validateDirtyRecord(element)) {
return;
}
try {
doWrite(element);
} catch (Exception e) {
if (!tryCollectDirtyRecord(element, e)) {
if (e instanceof IOException) {
throw (IOException) e;
}
throw new IOException(e);
}
}
}
/**
* Subclasses implement this method for actual write logic.
*
* @param element the data to write
* @throws IOException if write fails
*/
protected abstract void doWrite(T element) throws IOException; //
Change to abstract
// ... other methods
}
```
All subclasses must also be audited during code review and their `write()`
implementations renamed to `doWrite()`.
---
### Issue 2: Improper getPluginConfig() Interface Design Causes
Configuration Passing to Fail
**Location**:
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/common/SeaTunnelPluginLifeCycle.java:50-53`
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DirtyCollectorConfigProcessor.java:99-114`
**Related Context**:
- Caller: `DirtyCollectorConfigProcessor.initializeCollector(SeaTunnelSink,
CatalogTable[])`
- Implementation Class:
`seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java:62-65`
- Other Sinks: JDBC, Kafka, Hudi, Paimon, etc. (this method not implemented)
**Problem Description**:
1. The `SeaTunnelPluginLifeCycle` interface gained a `getPluginConfig()`
method whose default implementation throws `UnsupportedOperationException`
2. `DirtyCollectorConfigProcessor` calls this method when initializing the
collector
3. Only `ConsoleSink` overrides it
4. Every other Sink throws, the configuration cannot be obtained, and the
dirty data collector silently downgrades to `NoOpDirtyRecordCollector`
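The downgrade path can be modeled in a few lines. This is a simplified sketch with hypothetical names (`PluginLifeCycle`, `NonOverridingSink`, `resolveCollector`), not the actual framework code:

```java
// Simplified model of the default-throws pattern and the silent downgrade
// it produces.
interface PluginLifeCycle {
    // Mirrors SeaTunnelPluginLifeCycle: the default implementation throws.
    default String getPluginConfig() {
        throw new UnsupportedOperationException("getPluginConfig not implemented");
    }
}

// Models a Sink (e.g. JDBC, Kafka) that never overrides the method.
class NonOverridingSink implements PluginLifeCycle {}

public class DowngradeDemo {
    // Mirrors what DirtyCollectorConfigProcessor effectively does: catch the
    // exception and fall back to a no-op collector, surfacing no error.
    public static String resolveCollector(PluginLifeCycle sink) {
        try {
            return "configured:" + sink.getPluginConfig();
        } catch (UnsupportedOperationException e) {
            return "NoOpDirtyRecordCollector"; // silent, no warning logged
        }
    }
}
```

The user-visible symptom is that every non-overriding Sink takes the catch branch, which is why the failure is so hard to troubleshoot.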
**Potential Risks**:
- **Risk 1**: No Sink except ConsoleSink can use per-sink dirty data
configuration
- **Risk 2**: Users who configure `sink.dirty.collector` see no effect and
receive no error message
- **Risk 3**: Violates the principle of minimal interfaces (a lifecycle
interface should not expose implementation details such as configuration)
**Impact Scope**:
- **Direct Impact**: All SeaTunnelSinks that do not implement
`getPluginConfig()`
- **Indirect Impact**: User configurations do not take effect, difficult to
troubleshoot
- **Impact Area**: Core framework, affecting all Connectors
**Severity**: CRITICAL
**Improvement Suggestions**:
```java
// Solution 1: Remove interface method, pass configuration in SinkAction
instead
// MultipleTableJobConfigParser.java
Config envConfig = seaTunnelJobConfig.hasPath("env")
? seaTunnelJobConfig.getConfig("env") : null;
Config mergedSinkConfig =
DirtyCollectorConfigProcessor.getMergedSinkConfigForDirty(
envConfig, sinkConfig);
// Create collector directly here instead of in Sink
DirtyRecordCollector collector = DirtyCollectorConfigProcessor.processConfig(
envConfig, sinkConfig, catalogTable);
sinkAction.setDirtyRecordCollector(collector);
// Solution 2: If the interface must be kept, provide a reasonable default
implementation
// SeaTunnelPluginLifeCycle.java
default Config getPluginConfig() {
return ConfigFactory.empty(); // Return empty configuration instead of
throwing exception
}
```
Solution 1 is recommended: configuration is a concern of the job-parsing
phase, and Sink implementation classes should not be responsible for tracing
it back.
---
### Issue 3: Dirty Data Counters Cannot Aggregate Across Distributed
Executors in Spark Environment
**Location**:
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/LogDirtyRecordCollector.java:41`
-
`seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java:68-71`
-
`seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSinkInjector.java:102-107`
**Related Context**:
- Serialization: `SparkSinkInjector.inject()` method serializes collector
through `SerializationUtils.objectToString()`
- Deserialization: `SparkDataSourceWriter` or `SparkDataWriterFactory`
deserializes collector
- Counter: `AtomicLong dirtyRecordCount` is independent in each Executor
instance
**Problem Description**:
1. The collector is created in the Driver and contains `AtomicLong
dirtyRecordCount = new AtomicLong(0)`
2. It is serialized and shipped to each Executor
3. Each Executor deserializes its own independent `AtomicLong` instance
(value reset to 0)
4. The threshold check `dirtyRecordCount >= threshold` therefore only
applies locally
5. **Result**: with `threshold=10`, each Executor collects 10 dirty records
before its local threshold trips
**Example Scenario**:
```
Config: threshold=10, parallelism=4
Expected: the job fails after 10 dirty records are collected globally
Actual: Executor1 collects 10, Executor2 collects 10, Executor3 collects 10,
Executor4 collects 10
40 records globally, yet each Executor only sees its own 10
```
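The reset-on-copy behavior can be reproduced with plain `java.io` serialization, independent of Spark. The class below is a hypothetical stand-in for the collector, not the real `LogDirtyRecordCollector`:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Demonstrates that each deserialized copy carries an independent counter.
public class CounterCopyDemo implements Serializable {
    private final AtomicLong dirtyRecordCount = new AtomicLong(0);

    public long collect() { return dirtyRecordCount.incrementAndGet(); }

    public long count() { return dirtyRecordCount.get(); }

    // Round-trips the object through serialization, as SparkSinkInjector
    // effectively does when shipping the collector from Driver to Executors.
    public static CounterCopyDemo copy(CounterCopyDemo original) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (CounterCopyDemo) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

Two "Executor" copies each count from zero while the "Driver" original never advances, which is the behavior described above.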
**Potential Risks**:
- **Risk 1**: Threshold configuration becomes ineffective, user expectations
differ from actual behavior
- **Risk 2**: Statistical data is inaccurate, unable to obtain global dirty
data total count
- **Risk 3**: In high-concurrency scenarios, far more dirty data than
expected may be collected
**Impact Scope**:
- **Direct Impact**: All Sinks under Spark engine (versions 2.4 and 3.3)
- **Indirect Impact**: Flink engine may also have similar issues (need to
verify Flink's serialization mechanism)
- **Impact Area**: Distributed execution engines
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Solution 1: Use Accumulator mechanism (recommended)
// SparkDirtyRecordCollector.java
public class SparkDirtyRecordCollector implements DirtyRecordCollector {
private LongAccumulator dirtyRecordAccumulator;
@Override
public void collect(...) {
if (dirtyRecordAccumulator != null) {
dirtyRecordAccumulator.add(1);
}
// ... logging logic
}
@Override
public void checkThreshold() throws Exception {
if (dirtyRecordAccumulator != null && threshold > 0) {
long globalCount = dirtyRecordAccumulator.value();
if (globalCount >= threshold) {
// fail
}
}
}
}
// Initialize in SparkDataWriterFactory
dirtyRecordAccumulator = sparkContext.longAccumulator("dirtyRecordCount");
// Solution 2: Remove threshold feature, only provide counting and logging
// Clearly state in documentation: threshold feature not supported in
distributed environments
```
---
### Issue 4: Dirty Data Collection May Leak Sensitive Information
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/LogDirtyRecordCollector.java:84-93`
**Related Context**:
- Log Output: Contains complete `dirtyRecord.toString()`
- Configuration Item: No desensitization configuration
**Problem Description**:
```java
String logMessage = String.format(
"Dirty record collected (exception) - SubTask: %d, %s, Count: %d,
Record: %s, Error: %s",
subTaskIndex,
tableInfo,
currentCount,
dirtyRecord != null ? dirtyRecord.toString() : "null", // Log completely
errorMessage != null ? errorMessage : "");
```
If dirty data contains sensitive fields (such as passwords, ID numbers,
credit card numbers), they will be recorded in logs in plaintext.
**Potential Risks**:
- **Risk 1**: Violates data security compliance requirements (GDPR, Personal
Information Protection Law)
- **Risk 2**: Log files may be accessed by unauthorized personnel
- **Risk 3**: Log collection systems (ELK, Splunk) will store sensitive data
**Impact Scope**:
- **Direct Impact**: All users using `LogDirtyRecordCollector`
- **Indirect Impact**: Data security compliance risks
- **Impact Area**: Production environment security
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Solution 1: Add desensitization configuration
env {
dirty.collector = {
type = "log"
mask_fields = ["password", "ssn", "credit_card"] // Fields that need
desensitization
mask_pattern = "***" // Desensitization character
}
}
// LogDirtyRecordCollector.java
@Override
public void init(Config config) {
// ...
if (config.hasPath("mask_fields")) {
this.maskFields = config.getStringList("mask_fields");
}
}
private String maskSensitiveFields(SeaTunnelRow row) {
if (maskFields == null || maskFields.isEmpty()) {
return row.toString();
}
// Implement field desensitization logic
}
// Solution 2: Only log row number and error information, not complete data
String logMessage = String.format(
"Dirty record collected - SubTask: %d, Table: %s, Count: %d, Error: %s",
subTaskIndex, tableInfo, currentCount, errorMessage);
// Does not include dirtyRecord content
// Solution 3: Add security warning in documentation
```
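The masking step of Solution 1 can be sketched independently of the SeaTunnel APIs. The class below is hypothetical and operates on a field-name-to-value map, since the real `SeaTunnelRow` does not carry field names without its schema:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the masking logic behind mask_fields/mask_pattern.
public class FieldMasker {
    private final Set<String> maskFields;
    private final String maskPattern;

    public FieldMasker(Set<String> maskFields, String maskPattern) {
        this.maskFields = maskFields;
        this.maskPattern = maskPattern;
    }

    // Returns a copy of the record with configured fields replaced by the
    // mask pattern; all other fields are passed through unchanged.
    public Map<String, Object> mask(Map<String, Object> record) {
        Map<String, Object> masked = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : record.entrySet()) {
            masked.put(e.getKey(),
                    maskFields.contains(e.getKey()) ? maskPattern : e.getValue());
        }
        return masked;
    }
}
```

In the real collector this would run before `dirtyRecord.toString()` is formatted into the log message, so the plaintext value never reaches the log pipeline.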
---
### Issue 5: DirtyRecordCollector's Serializable Semantics Cause State Loss
in Distributed Environments
**Location**:
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/DirtyRecordCollector.java:31`
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/LogDirtyRecordCollector.java:38-41`
**Related Context**:
- Spark Serialization: `SparkSinkInjector.inject()` serializes through
`SerializationUtils.objectToString()`
- Flink Serialization: FlinkKafkaSink, etc., may transmit over the network
**Problem Description**:
1. `DirtyRecordCollector` extends `Serializable`
2. `LogDirtyRecordCollector` uses an `AtomicLong dirtyRecordCount`
3. Java serialization preserves the `AtomicLong` value, but object state may
be inconsistent after deserialization
4. **In Flink's checkpoint/recovery scenarios**:
   - The collector is serialized into the checkpoint
   - After a task failure, it is restored from the checkpoint
   - The value of `dirtyRecordCount` is restored from the checkpoint
   - But `NoOpDirtyRecordCollector.INSTANCE` is a singleton, and
deserialization breaks its singleton semantics
**Potential Risks**:
- **Risk 1**: Inaccurate counting after task recovery
- **Risk 2**: `NoOpDirtyRecordCollector.INSTANCE` is deserialized multiple
times, breaking the singleton
- **Risk 3**: The delegate and validator of `ValidatingDirtyRecordCollector`
may lose state
**Impact Scope**:
- **Direct Impact**: Stateful tasks under Spark/Flink engines
- **Indirect Impact**: Exactly-once semantics may be affected
- **Impact Area**: Distributed execution engines
**Severity**: MAJOR
**Improvement Suggestions**:
```java
// Solution 1: Not Serializable, create locally each time
// DirtyCollectorConfigProcessor.java
public static DirtyRecordCollector createLocalCollector(Config config) {
// Create locally on each Executor, do not transmit over network
}
// Solution 2: Implement CheckpointedData interface (Flink)
public interface CheckpointedDirtyRecordCollector extends
DirtyRecordCollector {
byte[] snapshotState();
void restoreState(byte[] state);
}
// Solution 3: Use a separate state backend
// Dirty data count is not placed in Collector, but in Metrics system
```
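Risk 2 in particular is standard Java serialization behavior with a standard fix: `readResolve()`. A self-contained sketch (hypothetical class name) of the pattern `NoOpDirtyRecordCollector` could adopt:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// A Serializable "singleton" that survives deserialization intact.
public class SingletonDemo implements Serializable {
    public static final SingletonDemo INSTANCE = new SingletonDemo();

    private SingletonDemo() {}

    // Without this method, deserialization constructs a fresh object and
    // `deserialized == INSTANCE` is false, breaking singleton semantics.
    private Object readResolve() {
        return INSTANCE;
    }

    // Serializes and deserializes the instance, as a checkpoint would.
    public static SingletonDemo roundTrip(SingletonDemo s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(s);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
                return (SingletonDemo) in.readObject();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

With `readResolve()` in place the round trip returns the canonical instance; removing it makes the identity check fail after every checkpoint restore.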
---
### Issue 6: Missing SPI Registration Files, AutoService Annotation May Not
Take Effect
**Location**:
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/LogDirtyRecordCollectorProvider.java:25`
-
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/AlwaysDirtyDataValidator.java:31`
**Related Context**:
- Using `@AutoService(Factory.class)` annotation
- Need to generate
`META-INF/services/org.apache.seatunnel.api.table.factory.Factory` file
**Problem Description**:
1. The code uses the `@AutoService(Factory.class)` annotation
2. However, the PR's changed file list does not include
`META-INF/services/...` files
3. `@AutoService` requires a compile-time processor to generate SPI files
4. **If the build configuration is incorrect, SPI files will not be
generated**
5. Result: `LogDirtyRecordCollector` and `AlwaysDirtyDataValidator` cannot
be discovered by SPI
**Potential Risks**:
- **Risk 1**: E2E tests for custom Collector/Validator SPI will fail
- **Risk 2**: User-extended SPI will also not work
- **Risk 3**: Can only be instantiated directly through class names, SPI
mechanism becomes meaningless
**Impact Scope**:
- **Direct Impact**: SPI extension mechanism
- **Indirect Impact**: E2E test `DirtyDataCustomExtensionIT` may fail
- **Impact Area**: Core functionality
**Severity**: CRITICAL
**Improvement Suggestions**:
```bash
# Check whether the generated SPI file exists (use -path: find -name only
# matches the file's base name, not a path fragment)
find seatunnel-api -path \
    "*/META-INF/services/org.apache.seatunnel.api.table.factory.Factory"
# If it does not exist:
# 1. Confirm that pom.xml configures com.google.auto.service:auto-service
# 2. Or create the SPI file manually:
mkdir -p seatunnel-api/src/main/resources/META-INF/services
cat > seatunnel-api/src/main/resources/META-INF/services/org.apache.seatunnel.api.table.factory.Factory << EOF
org.apache.seatunnel.api.sink.LogDirtyRecordCollectorProvider
org.apache.seatunnel.api.sink.CountingDirtyRecordCollectorProvider
org.apache.seatunnel.api.sink.AlwaysDirtyDataValidator
EOF
```
---
### Issue 7: Non-SeaTunnelRow Data Cannot Be Collected as Dirty Data in
AbstractSinkWriter
**Location**:
`seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/sink/AbstractSinkWriter.java:104-113`
**Related Context**:
- `AbstractSinkWriter` is a generic class `<T, StateT>`
- `tryCollectDirtyRecord()` only handles `SeaTunnelRow` types
**Problem Description**:
```java
protected boolean tryCollectDirtyRecord(T element, Exception e) {
// ...
if (element instanceof SeaTunnelRow) {
collector.collect(...);
return true;
}
return false; // Return false directly for other types
}
```
Although most Connectors use `SeaTunnelRow`, some may:
1. Use custom types (such as Avro `GenericRecord` or Protobuf `Message`)
2. Use primitive types (such as `String` or `byte[]`)
Dirty data from those Connectors cannot be collected; the original exception
is rethrown instead.
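The gate can be modeled in a few lines (hypothetical types; `RowLike` stands in for `SeaTunnelRow`):

```java
// Minimal model of the instanceof gate: any element that is not a
// SeaTunnelRow falls through and is never collected.
class RowLike {} // stand-in for SeaTunnelRow

public class InstanceofGateDemo {
    static int collected = 0;

    static boolean tryCollectDirtyRecord(Object element) {
        if (element instanceof RowLike) {
            collected++;
            return true;  // collected as dirty data
        }
        return false;     // String, byte[], GenericRecord, ...: rethrown
    }
}
```

A `String` or `byte[]` element takes the `false` branch, so a connector using such element types gets no dirty-data handling at all.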
**Potential Risks**:
- **Risk 1**: Dirty data collection feature is incomplete
- **Risk 2**: Inconsistent behavior across different Connectors
- **Risk 3**: May encounter this issue when extending Connectors in the
future
**Impact Scope**:
- **Direct Impact**: Connectors using non-`SeaTunnelRow` types
- **Indirect Impact**: Functional consistency
- **Impact Area**: Some Connectors (need to confirm if such Connectors exist)
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Solution 1: Support dirty data collection of any type
protected boolean tryCollectDirtyRecord(T element, Exception e) {
// ...
collector.collect(
context.getIndexOfSubtask(),
element, // Pass original types directly
e,
"Write failed: " + e.getMessage());
return true;
}
// DirtyRecordCollector interface also needs to support generics
public interface DirtyRecordCollector {
<T> void collect(int subTaskIndex, T dirtyRecord, Throwable exception,
...);
}
```
Or:
```java
// Solution 2: Only log exception information, not the data itself
if (!(element instanceof SeaTunnelRow)) {
log.warn("Cannot collect dirty record of type: {}", element.getClass());
// Still return true, swallow the exception
return true;
}
```
---
### Issue 8: Missing Documentation and Migration Guide
**Location**: Entire PR
**Problem Description**:
This PR introduces major functional changes, but lacks the following
documentation:
1. **User Documentation**: How to configure `env.dirty.collector` and
`env.dirty.validator`
2. **Connector Developer Documentation**: How to implement
`getPluginConfig()`
3. **SPI Extension Guide**: How to write custom Collectors and Validators
4. **Migration Guide**: How existing Connectors can migrate from `write()`
to `doWrite()`
5. **Incompatible Change Notes**: `incompatible-changes.md` not updated
6. **Performance Impact Description**: Performance overhead after enabling
dirty data collection
**Potential Risks**:
- **Risk 1**: Users don't know how to use the new feature
- **Risk 2**: Connector developers don't know how to adapt
- **Risk 3**: Behavior changes after upgrade, confusing users
**Impact Scope**:
- **Direct Impact**: User and developer experience
- **Indirect Impact**: Low feature adoption rate
- **Impact Area**: Entire community
**Severity**: MAJOR
**Improvement Suggestions**:
```markdown
# Documentation that needs to be supplemented
## 1. User Documentation (docs/en/feature/dirty-data-collection.md)
- Feature overview
- Configuration examples (env and sink level)
- Built-in Collector types (log, noop)
- Built-in Validator examples
- Threshold configuration notes
- Security notes (masking of sensitive information)
## 2. Connector Developer Guide (docs/en/contribution/connector-guide.md)
New sections:
- How to implement getPluginConfig()
- How to use the dirty record collector
- AbstractSinkWriter migration guide
## 3. SPI Extension Development
- How to implement DirtyRecordCollectorProvider
- How to implement DirtyDataValidator
- Registration and packaging instructions
- Example project
## 4. incompatible-changes.md
New entries:
- SinkWriter.Context adds a getDirtyRecordCollector() method
- SeaTunnelPluginLifeCycle adds a getPluginConfig() method
- AbstractSinkWriter behavior change (write() -> doWrite())
## 5. RELEASE NOTES
Describe the new feature, configuration options, and known limitations
```
---
### Issue 9: Missing Unit Tests for Concurrent Scenarios
**Location**: `seatunnel-api/src/test/java/`
**Problem Description**:
Current tests mainly cover:
- ✅ Configuration parsing
- ✅ SPI discovery
- ✅ Basic collection functionality
But missing:
- ❌ Multi-threaded concurrent counting tests (correctness of `AtomicLong`)
- ❌ Serialization/deserialization tests
- ❌ Checkpoint recovery tests
- ❌ Threshold trigger boundary condition tests
**Potential Risks**:
- **Risk 1**: Counting errors in concurrent scenarios not discovered
- **Risk 2**: State inconsistency in distributed environments
**Severity**: MINOR
**Improvement Suggestions**:
```java
// DirtyRecordCollectorConcurrencyTest.java
@Test
public void testConcurrentCollect() throws Exception {
DirtyRecordCollector collector = new LogDirtyRecordCollector();
collector.init(ConfigFactory.parseString("threshold = 10000"));
int threadCount = 10;
int recordsPerThread = 1000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
for (int j = 0; j < recordsPerThread; j++) {
collector.collect(0, testRow, new Exception("test"),
"error", null);
}
latch.countDown();
});
}
latch.await();
assertEquals(threadCount * recordsPerThread,
collector.getDirtyRecordCount());
}
@Test
public void testSerialization() throws Exception {
LogDirtyRecordCollector original = new LogDirtyRecordCollector();
original.init(config);
original.collect(0, testRow, new Exception("test"), "error", null);
byte[] serialized = SerializationUtils.serialize(original);
LogDirtyRecordCollector deserialized =
SerializationUtils.deserialize(serialized);
// Verify that state is correctly restored
assertEquals(original.getDirtyRecordCount(),
deserialized.getDirtyRecordCount());
}
```
---
### Issue 10: Inconsistent Behavior When ValidatingDirtyRecordCollector's
Validator is null
**Location**:
`seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/ValidatingDirtyRecordCollector.java:67-69`
**Problem Description**:
```java
public boolean validateAndCollectIfDirty(int subTaskIndex, SeaTunnelRow
record, CatalogTable catalogTable) {
if (validator == null) {
return false; // Return false when there is no validator
}
// ...
}
```
But the constructor does not validate the `validator` parameter:
```java
public ValidatingDirtyRecordCollector(DirtyRecordCollector delegate,
DirtyDataValidator validator) {
this.delegate = delegate;
this.validator = validator; // Allow null
}
```
**Potential Risks**:
- **Risk 1**: If `validator` is null, `validateAndCollectIfDirty()` always
returns false
- **Risk 2**: `validator.close()` in the `close()` method has NPE risk
- **Risk 3**: Inconsistent with design intent (creating
ValidatingDirtyRecordCollector is for validation)
**Severity**: MINOR
**Improvement Suggestions**:
```java
// Solution 1: Validate in constructor
public ValidatingDirtyRecordCollector(DirtyRecordCollector delegate,
DirtyDataValidator validator) {
Preconditions.checkNotNull(delegate, "delegate cannot be null");
Preconditions.checkNotNull(validator, "validator cannot be null");
this.delegate = delegate;
this.validator = validator;
}
// Solution 2: Use Optional
private final Optional<DirtyDataValidator> validator;
public boolean validateAndCollectIfDirty(...) {
return validator.map(v -> {
ValidationResult result = v.validate(record, catalogTable);
// ...
}).orElse(false);
}
```
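For reference, a minimal fail-fast variant of Solution 1 using only the JDK (`Objects.requireNonNull`) rather than Guava's `Preconditions`; `Collector` and `Validator` are hypothetical stand-in types:

```java
import java.util.Objects;

// Hypothetical minimal types standing in for the real interfaces.
interface Collector {}
interface Validator {}

// Fail-fast construction: a null delegate or validator is rejected
// immediately instead of surfacing later as a silent false or an NPE
// in close().
public class ValidatingCollector {
    private final Collector delegate;
    private final Validator validator;

    public ValidatingCollector(Collector delegate, Validator validator) {
        this.delegate = Objects.requireNonNull(delegate, "delegate cannot be null");
        this.validator = Objects.requireNonNull(validator, "validator cannot be null");
    }
}
```

This keeps `seatunnel-api` free of an extra Guava usage while preserving the same fail-fast behavior as Solution 1.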
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]