DanielCarter-stack commented on PR #10275:
URL: https://github.com/apache/seatunnel/pull/10275#issuecomment-3796510047
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10275", "part": 1,
"total": 1} -->
### Issue 1: Thread Safety Issue with currentUnassignedSplitSize()
**Location**: `FakeSourceSplitEnumerator.java:82-84`
```java
@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size(); // HashMap.size() is not an atomic
operation
}
```
**Related Context**:
- Caller: Possibly SeaTunnel Engine's monitoring or scheduling code
- Concurrent scenario: Other threads may be modifying `pendingSplits`
**Problem Description**:
`pendingSplits` is a regular `HashMap` (line 40: `private final Map<Integer,
Set<FakeSourceSplit>> pendingSplits;`), and its `size()` method is not an
atomic operation. If another thread is modifying the map while reading the
size, it could lead to:
1. `ConcurrentModificationException` (if there's an iteration at this time)
2. Inaccurate size being returned (if elements are being added/removed)
Although both `addSplitChangeToPendingAssignments()` and
`assignPendingSplits()` use `synchronized (lock)` for protection,
`currentUnassignedSplitSize()` does not.
**Potential Risks**:
- Risk 1: Monitoring data displays incorrect number of pending splits
- Risk 2: If Engine relies on this method for scheduling decisions, it could
lead to incorrect scheduling
**Impact Scope**:
- Direct impact: `FakeSourceSplitEnumerator.currentUnassignedSplitSize()`
- Indirect impact: Any monitoring or scheduling logic that calls this method
- Affected area: Single Connector (connector-fake)
**Severity**: MINOR
**Suggested Improvement**:
```java
@Override
public int currentUnassignedSplitSize() {
synchronized (lock) {
return pendingSplits.size();
}
}
```
**Rationale**:
1. Maintain consistent concurrent protection with other methods that access
`pendingSplits`
2. Ensure accurate size is returned
3. Performance impact is acceptable (this method should not be called
frequently)
---
### Issue 2: Insufficient Memory Visibility Guarantee for splitsDiscovered
Flag
**Location**: `FakeSourceSplitEnumerator.java:47`,
`FakeSourceSplitEnumerator.java:65`, `FakeSourceSplitEnumerator.java:95`
```java
private volatile boolean splitsDiscovered = false; // line 47
public void run() throws Exception {
discoverySplits();
splitsDiscovered = true; // line 65 - write operation
assignPendingSplits();
}
public void registerReader(int subtaskId) {
if (splitsDiscovered) { // line 95 - read operation
assignPendingSplits(subtaskId);
}
}
```
**Related Context**:
- `SourceSplitEnumerator` interface documentation (line 82-86) explicitly
states that `snapshotState()` may execute concurrently with `run()`
- Modifications to `assignedSplits` need to be protected through
`synchronized (lock)`
**Problem Description**:
Although `splitsDiscovered` is declared as `volatile`, guaranteeing its own
visibility, consider the following scenario:
1. **Thread A (run())**:
```java
discoverySplits(); // Modified assignedSplits (partially outside
synchronized)
assignedSplits.forEach(allSplit::remove); // Inside synchronized
splitsDiscovered = true; // volatile write
```
2. **Thread B(registerReader())**:
```java
if (splitsDiscovered) { // volatile read
assignPendingSplits(subtaskId); // Will access assignedSplits
}
```
问题在于:`splitsDiscovered = true` 的 happens-before 关系不能保证 `assignedSplits`
的所有修改对 Thread B 可见,因为:
- `discoverySplits()` 中 `assignedSplits.forEach(allSplit::remove)` 在
`synchronized` 块内
- 但 `allSplit` 的修改(`assignedSplits.forEach`)涉及 `assignedSplits` 的迭代
查看 `FakeSourceSplitEnumerator.java:135-137`:
```java
synchronized (lock) {
assignedSplits.forEach(allSplit::remove); // allSplit is HashSet
}
```
`allSplit` 是局部变量,不存在并发问题。但 `assignedSplits` 是实例字段。
** Potential Risk**:
- 风险 1:Thread B 可能读到 `splitsDiscovered = true`,但看到过时的 `assignedSplits` 内容
- 风险 2:极端情况下,可能导致 `assignPendingSplits()` 重复分配已分配的 splits
** Impact Scope**:
- 直接影响:`registerReader()`, `handleSplitRequest()`, `addSplitsBack()` 的逻辑
- 间接影响:可能导致 splits 重复分配
- 影响面:单个 Connector
** Severity**:** MINOR (Low actual occurrence probability, because
`registerReader()` and other methods' `assignPendingSplits()` are protected by
`synchronized`)
** Improvement Suggestion**:
当前实现在 `assignPendingSplits(int)` 内部有 `synchronized
(lock)`,已经建立了足够的内存屏障。但为了更清晰和保险,可以:
```java
public void run() throws Exception {
discoverySplits();
synchronized (lock) {
splitsDiscovered = true;
}
assignPendingSplits();
}
```
或者更简单,保持当前实现(因为后续的 `assignPendingSplits()` 会进入 synchronized 块)。
** Rationale**:
1. 虽然 `volatile` 提供了一定的可见性保证,但将状态变更与 `synchronized` 关联更清晰
2. 符合 "establishes happens-before relationship" 的最佳实践
3. 性能影响可忽略
---
# ## Issue 3: Conditional assignment in handleSplitRequest() and
registerReader() may cause Reader starvation
** Location**:** `FakeSourceSplitEnumerator.java:87-91`,
`FakeSourceSplitEnumerator.java:94-98`
```java
@Override
public void handleSplitRequest(int subtaskId) {
if (splitsDiscovered) {
assignPendingSplits(subtaskId);
}
}
@Override
public void registerReader(int subtaskId) {
if (splitsDiscovered) {
assignPendingSplits(subtaskId);
}
}
```
** Related Context**:
- `SourceSplitEnumerator` 接口文档(line 44-50)保证调用顺序:`open()` →
`addSplitsBack()` → `registerReader()` → `run()`
- `handleSplitRequest()` 由 Reader 主动调用,可能发生在任何时候
** Issue Description**:
考虑以下时序:
1. `open()` 被调用
2. `addSplitsBack()` 被调用(如果有恢复的 splits)
3. `registerReader(0)` 被调用 → `splitsDiscovered = false` → **什么都不做**
4. `registerReader(1)` 被调用 → `splitsDiscovered = false` → **什么都不做**
5. `run()` 被调用:
```java
discoverySplits(); // Generate all splits
splitsDiscovered = true;
assignPendingSplits(); // Assign to all registered Readers (0, 1)
```
这个流程是正确的。
但是考虑场景:如果有 Reader 在 `run()` **之前**调用 `handleSplitRequest()`(虽然接口文档说 `run()`
在 `registerReader()` 之后,但 `handleSplitRequest()` 的时序没有明确定义):
```java
@Override
public void handleSplitRequest(int subtaskId) {
if (splitsDiscovered) { // false
assignPendingSplits(subtaskId);
}
// Do nothing, Reader continues waiting
}
```
然后 `run()` 执行:
```java
assignPendingSplits(); // Will assign to all registeredReaders
```
如果 Reader 已经在 `registeredReaders()` 中,会收到分配。所以这个问题实际不存在。
** However**, checking the interface documentation, `handleSplitRequest()`
is "called by reader when it needs more splits", which should be after `run()`.
** Actual Issue**: Current implementation has no problem, but logic depends
on call order.
** Potential Risk**:
- 风险:如果 Engine 的实现与接口文档不一致,可能导致 Reader 没有收到 splits
** Impact Scope**:
- 直接影响:`handleSplitRequest()`, `registerReader()`
- 间接影响:Source 协调逻辑
- 影响面:单个 Connector
** Severity**:** MINOR (Depends on Engine correctly implementing interface
contract)
** Improvement Suggestion**:
添加日志和断言帮助调试:
```java
@Override
public void handleSplitRequest(int subtaskId) {
log.debug("Received split request from reader {}, splitsDiscovered={}",
subtaskId, splitsDiscovered);
if (splitsDiscovered) {
assignPendingSplits(subtaskId);
} else {
log.warn("Received split request from reader {} before splits are
discovered. " +
"This is unexpected according to API contract.", subtaskId);
}
}
```
** Rationale**:
1. 帮助诊断潜在的 Engine bug
2. 提供更清晰的日志追踪
3. 不改变现有行为
---
# ## Issue 4: CI configuration change (backend.yml) is unrelated to PR
content
** Location**:** `.github/workflows/backend.yml:442`
```yaml
- timeout-minutes: 150
+ timeout-minutes: 180
```
** Related Context**:
- PR 标题和描述都是关于 connector-fake 的 bug 修复
- CI timeout 修改应该是独立的 PR
** Issue Description**:
将 backend.yml 的 timeout 从 150 分钟增加到 180 分钟与这个 PR 的修复内容无关。这个变更应该:
1. 在独立的 PR 中提交
2. 或者在 PR 描述中明确说明为什么需要这个变更
** Potential Risk**:
- 风险:不相关的变更会被一起 merge,增加 code review 难度
- 风险:如果需要 revert bug 修复,CI timeout 变更也会被 revert
** Impact Scope**:
- 直接影响:GitHub Actions 的 backend job
- 间接影响:所有需要 backend CI 的 PR
- 影响面:整个项目
** Severity**:** MINOR (Process issue, does not affect code quality)
** Improvement Suggestion**:
1. 将 CI 配置变更放在独立的 PR 中
2. 或者在 PR 描述中添加说明:
```
Also increased CI timeout for backend job to address flaky tests.
This is necessary for the new tests to pass reliably.
```
** Rationale**:
1. 保持 PR 的单一职责
2. 便于 code review 和 revert
3. 符合开源项目最佳实践
---
# ## Issue 5: Incomplete test coverage
** Location**:** `FakeSourceSplitEnumeratorTest.java`
** Issue Description**:
新增的测试用例覆盖了核心场景,但缺少以下测试:
1. **addSplitsBack() 的测试**:
- 当前测试没有覆盖 `addSplitsBack()` 的行为
- 这个方法在恢复场景下很重要
2. **handleSplitRequest() 的测试**:
- 修改后的 `handleSplitRequest()` 有实际逻辑,需要测试
- 应该测试在 `splitsDiscovered = true/false` 两种情况下的行为
3. **并发场景的测试**:
- 没有测试 `run()` 和 `snapshotState()` 并发执行
- 没有测试 `registerReader()` 和 `assignPendingSplits()` 的并发
4. **边界条件测试**:
- 0 个 splits 的情况
- 0 个 readers 的情况
- splits 数量小于 readers 数量的情况
5. **currentUnassignedSplitSize() 的测试**:
- 没有验证这个方法返回正确的大小
** Potential Risk**:
- 风险:某些边界情况下的 bug 未被发现
- 风险:并发问题未被发现
** Impact Scope**:
- 直接影响:测试覆盖率和代码质量保证
- 间接影响:潜在的 bug 没有被测试捕获
- 影响面:单个 Connector
** Severity**:** MINOR (Existing tests already cover core scenarios)
** Improvement Suggestion**:
```java
@Test
void addSplitsBackAfterDiscovery() throws Exception {
MultipleTableFakeSourceConfig sourceConfig =
loadSingleTableFakeSourceConfig();
TestingEnumeratorContext context = new TestingEnumeratorContext(2, new
HashSet<>(Arrays.asList(0, 1)));
FakeSourceSplitEnumerator enumerator =
new FakeSourceSplitEnumerator(context, sourceConfig,
Collections.emptySet());
enumerator.run();
// Simulate Reader failure, splits returned
List<FakeSourceSplit> splitsToReturn = new
ArrayList<>(context.getAllAssignedSplits().subList(0, 2));
enumerator.addSplitsBack(splitsToReturn, 0);
// Verify splits are reassigned
Assertions.assertFalse(context.getAllAssignedSplits().isEmpty());
}
@Test
void handleSplitRequestBeforeAndAfterDiscovery() throws Exception {
// Test cases where splitsDiscovered = true/false
}
@Test
void concurrentSnapshotStateAndRun() throws Exception {
// Test concurrent scenarios
}
```
** Rationale**:
1. 提高测试覆盖率
2. 验证并发场景的正确性
3. 防止未来引入 regression
---
# ## Issue 6: Missing JavaDoc and comments
** Location**:** `FakeSourceSplitEnumerator.java:47`,
`FakeSourceSplitEnumerator.java:95`, `FakeSourceSplitEnumerator.java:172`
** Issue Description**:
新增的 `splitsDiscovered` 字段和修改的方法没有 JavaDoc 或注释说明。
** Potential Risk**:
- 风险:其他开发者不理解 `splitsDiscovered` 的作用和生命周期
- 风险:未来维护时可能引入 bug
** Impact Scope**:
- 直接影响:代码可读性和可维护性
- 间接影响:未来的 code review 和维护
- 影响面:单个 Connector
** Severity**:** MINOR
** Improvement Suggestion**:
```java
/**
* Flag indicating whether split discovery has completed.
* Set to true after the first invocation of {@link #run()}.
* Used to ensure readers receive signalNoMoreSplits even when
* no splits are assigned to them (e.g., after restore/restart).
*/
private volatile boolean splitsDiscovered = false;
```
对于 `assignPendingSplits(int)`:
```java
/**
* Assigns any pending splits for a specific reader and signals no more
splits
* if discovery is complete.
*
* <p>This method ensures that readers receive a signalNoMoreSplits callback
* even when no splits are assigned to them, which is critical for
restore/restart
* scenarios where all splits may already be assigned.
*
* @param pendingReader the reader subtask id
*/
private void assignPendingSplits(int pendingReader) {
// ...
}
```
** Rationale**:
1. 解释设计意图
2. 帮助未来维护者理解代码
3. 符合 Apache 项目的文档标准
---
# ## Issue 7: Return value of currentUnassignedSplitSize() may be inaccurate
** Location**:** `FakeSourceSplitEnumerator.java:82-84`
```java
@Override
public int currentUnassignedSplitSize() {
return pendingSplits.size(); // Returns size of Map<Integer,
Set<FakeSourceSplit>>
}
```
** Related Context**:
- 查看方法签名,应该返回"未分配的 splits 数量"
- `pendingSplits` 的结构是 `Map<Integer, Set<FakeSourceSplit>>`,key 是 reader
ID,value 是该 reader 待分配的 splits
** Issue Description**:
当前实现返回 `pendingSplits.size()`,这是**有待分配 splits 的 readers 数量**,而不是**未分配的
splits 总数**。
例如:
- Reader 0 有 3 个待分配 splits
- Reader 1 有 5 个待分配 splits
- `pendingSplits.size()` = 2(有两个 reader 有待分配 splits)
- 实际未分配 splits 数 = 3 + 5 = 8
** Potential Risk**:
- 风险:监控数据不准确
- 风险:如果 Engine 依赖此方法做调度决策,可能导致错误
** Impact Scope**:
- 直接影响:`FakeSourceSplitEnumerator.currentUnassignedSplitSize()`
- 间接影响:任何调用此方法的监控或调度逻辑
- 影响面:单个 Connector
** Severity**:** MAJOR (Semantic error)
** Improvement Suggestion**:
```java
@Override
public int currentUnassignedSplitSize() {
synchronized (lock) {
return pendingSplits.values().stream()
.mapToInt(Set::size)
.sum();
}
}
```
**理由**:
1. 方法名 `currentUnassignedSplitSize` 暗示返回 splits 数量,而不是 readers 数量
2. 这可能是原有 bug,不是 PR 引入的,但应该在修复 PR 中一起修正
3. 需要添加 `synchronized` 保证线程安全(见问题 1)
**验证**:
查看其他 Connector 的实现:
- `KafkaSourceSplitEnumerator` 没有实现此方法(可能使用默认实现)
- `JdbcSourceSplitEnumerator` need to be checked
The fix for this issue should be made as an independent bug fix or together
in this PR.
---
---
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]