yzeng1618 commented on PR #10404:
URL: https://github.com/apache/seatunnel/pull/10404#issuecomment-3808569340
> ### Issue 1: `close()` does not clear `emittedFinishedSplitId`
> **Location**: `IncrementalSourceSplitReader.java:112-118`
>
> ```java
> @Override
> public void close() throws Exception {
>     if (currentFetcher != null) {
>         log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
>         currentFetcher.close();
>         currentSplitId = null; // Cleared currentSplitId
>     }
>     // ⚠️ Missing: emittedFinishedSplitId = null;
> }
> ```
>
> **Related Context**:
>
> * Callers: `SourceReaderBase.close()` → `IncrementalSourceReader.close()` → `IncrementalSourceSplitReader.close()`
> * Related fields: `currentSplitId`, `emittedFinishedSplitId`
>
> **Problem Description**: `close()` clears `currentSplitId` but not `emittedFinishedSplitId`. This causes no bug in the current usage, because a Reader is not used after it is closed, but it leaves state cleanup incomplete. If a Reader were reused (which the current design does not support), the stale value could pollute its state.
>
> **Potential Risks**:
>
> * Risk 1: If the code later supports reusing a Reader, stale state may cause errors
> * Risk 2: Reviewers may see the missing reset as an oversight, which reduces confidence in the code
> * Risk 3: Unit tests that reuse a Reader instance may fail
>
> **Impact Scope**:
>
> * Direct impact: close logic of `IncrementalSourceSplitReader`
> * Indirect impact: None (Reader is not reused in current design)
> * Impact surface: Single Connector (CDC Base)
>
> **Severity**: MINOR
>
> **Improvement Suggestion**:
>
> ```java
> @Override
> public void close() throws Exception {
>     if (currentFetcher != null) {
>         log.info("Close current fetcher {}", currentFetcher.getClass().getCanonicalName());
>         currentFetcher.close();
>     }
>     // Reset per-split state unconditionally so cleanup does not depend on the fetcher
>     currentSplitId = null;
>     emittedFinishedSplitId = null; // Add this line
> }
> ```
>
> **Rationale**:
>
> 1. Complete state cleanup: all state fields should be reset in `close()`
> 2. Guard against future changes: if Reader reuse is ever supported, no state leaks into the next use
> 3. Clear intent: shows that the lifecycle of these fields is bound to the Reader
> 4. Performance impact: none (just a null assignment)
>
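
A minimal sketch of the cleanup pattern suggested in Issue 1, written against a hypothetical stand-in class: `SplitReaderStateSketch` is not a SeaTunnel class, it only mirrors the three fields named above, and `AutoCloseable` stands in for the real fetcher type. The assumption is that every per-split field should be reset on `close()` whether or not a fetcher exists, and that a `finally` block should keep the reset even if the fetcher's own `close()` throws.

```java
/**
 * Hypothetical stand-in for IncrementalSourceSplitReader that keeps only the
 * fields discussed in Issue 1. Not the real SeaTunnel class.
 */
class SplitReaderStateSketch implements AutoCloseable {
    AutoCloseable currentFetcher; // stands in for the real Fetcher type
    String currentSplitId;
    String emittedFinishedSplitId;

    @Override
    public void close() throws Exception {
        try {
            if (currentFetcher != null) {
                currentFetcher.close();
            }
        } finally {
            // Reset all per-split state even if the fetcher is absent or its close() fails.
            currentFetcher = null;
            currentSplitId = null;
            emittedFinishedSplitId = null;
        }
    }
}
```

Because `currentFetcher` is cleared as well, a second `close()` call is a no-op, which keeps the method idempotent.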
> ### Issue 2: Concurrency safety not explicitly documented
> **Location**: `IncrementalSourceSplitReader.java:46-200`
>
> ```java
> @Slf4j
> public class IncrementalSourceSplitReader<C extends SourceConfig>
>         implements SplitReader<SourceRecords, SourceSplitBase> {
>     // Field declarations (no volatile, no synchronized)
>     private String currentSplitId;
>     private String emittedFinishedSplitId;
>     // ...
> }
> ```
>
> **Related Context**:
>
> * Caller: `SourceReaderBase` invokes `IncrementalSourceSplitReader.fetch()` from a single thread
> * Caller: `close()` may be called from a different thread than `fetch()` (needs verification)
>
> **Problem Description**: The fields `currentSplitId` and `emittedFinishedSplitId` of `IncrementalSourceSplitReader` have no concurrency control (neither `volatile` nor `synchronized`). The current design assumes `fetch()` is called from a single thread, but the code neither enforces nor documents this. If concurrent calls are introduced later (for example, an asynchronous close), memory-visibility issues may follow.
>
> **Potential Risks**:
>
> * Risk 1: If `close()` and `fetch()` run concurrently, the write `currentSplitId = null` may not be visible to `fetch()`
> * Risk 2: Writes to `emittedFinishedSplitId` may be observed out of order because of instruction reordering
> * Risk 3: Future maintainers may assume the class is thread-safe and introduce concurrent calls
>
> **Impact Scope**:
>
> * Direct impact: All methods of `IncrementalSourceSplitReader`
> * Indirect impact: All tasks using CDC Connector
> * Impact surface: Multiple Connectors (all CDC Connectors)
>
> **Severity**: MAJOR
>
> **Improvement Suggestion**: **Option 1** (Recommended): Add class-level JavaDoc that documents thread safety
>
> ```java
> /**
>  * Split reader for incremental source (snapshot + incremental phase).
>  *
>  * <p><b>Thread safety:</b> This class is NOT thread-safe and should only be called from a
>  * single thread. The {@link #fetch()} method is expected to be called sequentially without
>  * concurrent access. The {@link #close()} method should also be called from the same thread
>  * or after all fetch calls have completed.
>  *
>  * @param <C> The type of source configuration.
>  */
> @Slf4j
> public class IncrementalSourceSplitReader<C extends SourceConfig>
>         implements SplitReader<SourceRecords, SourceSplitBase> {
>     // ...
> }
> ```
>
> **Option 2** (Safer): Use `volatile` to ensure memory visibility
>
> ```java
> private volatile String currentSplitId;
> private volatile String emittedFinishedSplitId;
> ```
>
> **Rationale**:
>
> 1. Prefer Option 1: the current architecture does not involve concurrency, so documenting the assumption costs less than adding `volatile`
> 2. Explicit assumptions: future maintainers can see the premise the class relies on for thread safety
> 3. Prevent misuse: anyone attempting concurrent calls will see the warning
> 4. Option 2 as a fallback: if verification shows that `close()` can indeed run concurrently with `fetch()`, use `volatile`
>
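
If Option 2 is chosen, note that `volatile` makes each individual read and write visible across threads but does not make a check-then-use sequence atomic: reading `currentSplitId` once for the null check and again for its use can observe two different values if `close()` runs in between. The sketch below uses a hypothetical `VolatileSplitStateSketch` class with made-up `startSplit` and `finishSnapshotSplit` methods (not the real reader API) and reads the field once into a local variable, similar to the `splitId` local in the Issue 3 snippet.

```java
import java.io.IOException;

/** Hypothetical stand-in that models only the split-ID handling discussed in Issue 2. */
class VolatileSplitStateSketch {
    private volatile String currentSplitId;
    private volatile String emittedFinishedSplitId;

    void startSplit(String splitId) {
        currentSplitId = splitId;
    }

    /** Marks the current split as finished and returns its ID. */
    String finishSnapshotSplit() throws IOException {
        // Read the volatile field exactly once so the null check and the use below
        // see the same value, even if close() nulls the field concurrently.
        String splitId = currentSplitId;
        if (splitId == null) {
            throw new IOException(
                    String.format(
                            "currentSplitId is null when finishing snapshot split. "
                                    + "emittedFinishedSplitId=%s",
                            emittedFinishedSplitId));
        }
        emittedFinishedSplitId = splitId;
        return splitId;
    }

    String emittedFinishedSplitId() {
        return emittedFinishedSplitId;
    }

    void close() {
        currentSplitId = null;
        emittedFinishedSplitId = null;
    }
}
```

The local copy removes the null-check window for this one method, but `finishSnapshotSplit()` as a whole is still not atomic with respect to `close()`; if real concurrency is ever allowed, a lock or strict confinement to one thread is the more robust choice.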
> ### Issue 3: Exception message lacks context information
> **Location**: `IncrementalSourceSplitReader.java:172-174`
>
> ```java
> if (splitId == null) {
>     throw new IOException("currentSplitId is null when finishing snapshot split");
> }
> ```
>
> **Related Context**:
>
> * Call chain: `fetch()` → `finishedSnapshotSplit()` → `throw IOException`
> * Related state: `emittedFinishedSplitId`, `currentFetcher`
>
> **Problem Description**: When `currentSplitId` is found to be null, the exception message carries no context, which makes diagnosis difficult. If this exception occurs in production, operators cannot tell from the logs:
>
> * Which splits have already been processed?
> * What is the value of `emittedFinishedSplitId`?
> * What is the state of `currentFetcher`?
> * How many times has `fetch()` been called?
>
> **Potential Risks**:
>
> * Risk 1: Production issues are hard to troubleshoot and require additional logging to find the root cause
> * Risk 2: Initialization problems cannot be distinguished from state-management problems
> * Risk 3: Reproducing the issue may require restarting the task with DEBUG logging enabled
>
> **Impact Scope**:
>
> * Direct impact: Readability of exception information
> * Indirect impact: Maintainability of production environment
> * Impact surface: Single Connector (CDC Base)
>
> **Severity**: MINOR
>
> **Improvement Suggestion**:
>
> ```java
> if (splitId == null) {
>     // Build the state description once so the log entry and the exception carry the same details
>     String message =
>             String.format(
>                     "currentSplitId is null when finishing snapshot split. "
>                             + "emittedFinishedSplitId=%s, currentFetcher=%s, isFinished=%s",
>                     emittedFinishedSplitId,
>                     currentFetcher != null ? currentFetcher.getClass().getSimpleName() : "null",
>                     currentFetcher != null && currentFetcher.isFinished());
>     log.warn("Invalid state: {}", message);
>     throw new IOException(message);
> }
> ```
>
> **Rationale**:
>
> 1. Better debuggability: include all relevant state information
> 2. Logging: log a WARN message before throwing the exception to make post-mortem analysis easier
> 3. Formatted output: use `String.format()` instead of string concatenation for better readability
> 4. Performance impact: negligible (only executed on the exception path)
>
> ### Issue 4: Missing unit tests for concurrent scenarios
> **Location**: `IncrementalSourceSplitReaderTest.java:36-127`
>
> **Related Context**:
>
> * Test file: `IncrementalSourceSplitReaderTest.java`
> * Class under test: `IncrementalSourceSplitReader`
>
> **Problem Description**: The new unit tests cover only single-threaded scenarios and do not exercise concurrent calls to `fetch()` and `close()`. Issue 2 notes that the design assumes single-threaded calls, but without a test that verifies it, future changes could break this assumption unnoticed.
>
> **Potential Risks**:
>
> * Risk 1: If `close()` is later called asynchronously, race conditions may appear
> * Risk 2: Refactoring may break the single-threaded assumption and introduce concurrency bugs
> * Risk 3: CI/CD cannot catch potential concurrency issues
>
> **Impact Scope**:
>
> * Direct impact: Test coverage
> * Indirect impact: Long-term stability of code
> * Impact surface: Single Connector (CDC Base)
>
> **Severity**: MINOR
>
> **Improvement Suggestion**:
>
> ```java
> @Test
> @SuppressWarnings("unchecked")
> void testFetchConcurrentWithCloseShouldNotThrowNPE() throws Exception {
>     DataSourceDialect<SourceConfig> dialect = Mockito.mock(DataSourceDialect.class);
>     SourceConfig config = Mockito.mock(SourceConfig.class);
>     SchemaChangeResolver resolver = Mockito.mock(SchemaChangeResolver.class);
>
>     IncrementalSourceSplitReader<SourceConfig> reader =
>             new IncrementalSourceSplitReader<SourceConfig>(0, dialect, config, resolver) {
>                 @Override
>                 protected void checkSplitOrStartNext() {}
>             };
>
>     Fetcher<SourceRecords, SourceSplitBase> fetcher = Mockito.mock(Fetcher.class);
>     Mockito.when(fetcher.pollSplitRecords()).thenReturn(null);
>     Mockito.when(fetcher.isFinished()).thenReturn(true);
>
>     setField(reader, "currentFetcher", fetcher);
>     setField(reader, "currentSplitId", "split-1");
>
>     // Simulate concurrent close and fetch
>     ExecutorService executor = Executors.newFixedThreadPool(2);
>     try {
>         executor.submit(
>                 () -> {
>                     try {
>                         reader.close();
>                     } catch (Exception e) {
>                         // Exceptions from close() are not what this test checks
>                     }
>                 });
>         Future<RecordsWithSplitIds<SourceRecords>> fetchFuture =
>                 executor.submit(() -> reader.fetch());
>
>         try {
>             fetchFuture.get(1, TimeUnit.SECONDS);
>         } catch (ExecutionException e) {
>             // Other exceptions are acceptable because the reader may already be closed;
>             // only a NullPointerException fails the test
>             Assertions.assertFalse(
>                     e.getCause() instanceof NullPointerException,
>                     "fetch() threw NPE while racing with close()");
>         } catch (TimeoutException e) {
>             // Timeout is acceptable
>         }
>     } finally {
>         executor.shutdownNow();
>     }
> }
> ```
>
> **Rationale**:
>
> 1. Verify concurrency safety: even though the current design is not concurrent, a test can check the assumption
> 2. Prevent regression: if concurrent calls are introduced in the future, the test is likely (though, as a race, not guaranteed) to fail
> 3. Document assumptions: the test itself documents the single-threaded assumption
> 4. Low cost: this simple concurrency test does not noticeably increase test time
>
> **Note**: This test may need adjustment depending on how `close()` and `fetch()` are actually called concurrently. It is recommended to verify the real usage scenarios first.
>
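
The test above submits both tasks without coordinating their start, so in many runs `close()` may finish before `fetch()` even begins. A hedged sketch of a tighter harness, using only JDK classes and a hypothetical `ConcurrentRaceHelper.runConcurrently` helper (not part of SeaTunnel, JUnit, or Mockito): a `CountDownLatch` releases both tasks together, and the race is repeated `rounds` times, which raises (but cannot guarantee) the chance of hitting a bad interleaving. Exceptions from both tasks are collected so the caller decides which are acceptable, for example failing only on `NullPointerException` as the test name suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical test helper; not part of SeaTunnel, JUnit, or Mockito. */
final class ConcurrentRaceHelper {
    private ConcurrentRaceHelper() {}

    /**
     * Starts both tasks together via a latch, {@code rounds} times, and returns every
     * exception either task threw (an empty list means no task failed).
     */
    static List<Throwable> runConcurrently(Callable<?> first, Callable<?> second, int rounds)
            throws InterruptedException {
        List<Throwable> failures = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < rounds; i++) {
                CountDownLatch start = new CountDownLatch(1);
                Future<?> a = executor.submit(() -> { start.await(); return first.call(); });
                Future<?> b = executor.submit(() -> { start.await(); return second.call(); });
                start.countDown(); // Release both waiting tasks at (roughly) the same moment
                for (Future<?> future : Arrays.asList(a, b)) {
                    try {
                        future.get(5, TimeUnit.SECONDS);
                    } catch (ExecutionException e) {
                        failures.add(e.getCause()); // The exception thrown by the task itself
                    } catch (TimeoutException e) {
                        future.cancel(true);
                        failures.add(e);
                    }
                }
            }
        } finally {
            executor.shutdownNow();
        }
        return failures;
    }
}
```

In `testFetchConcurrentWithCloseShouldNotThrowNPE`, the two callables would wrap `reader.close()` (returning `null`) and `reader.fetch()`. Because the reader stays closed after the first round, a fresh reader per call with `rounds = 1` inside a loop is probably the simpler setup.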
> ### Issue 5: Exception behavior of `ChangeEventRecords.forFinishedSplit()` is not documented in JavaDoc
> **Location**: `ChangeEventRecords.java:77-82`
>
> ```java
> public static ChangeEventRecords forFinishedSplit(final String splitId) {
>     if (splitId == null) {
>         throw new IllegalArgumentException("splitId must not be null");
>     }
>     return new ChangeEventRecords(null, null, Collections.singleton(splitId));
> }
> ```
>
> **Related Context**:
>
> * Caller: `IncrementalSourceSplitReader.finishedSnapshotSplit()`
> * Interface contract: a static factory method that previously had no null check
>
> **Problem Description**: The PR makes `forFinishedSplit()` throw an exception when `splitId` is null. This is a behavior change (even though it fixes a bug), yet the JavaDoc does not mention it, so the method's contract is left undocumented.
>
> **Potential Risks**:
>
> * Risk 1: External callers (unlikely as they are) that pass null will receive an unexpected exception
> * Risk 2: Reviewers and maintainers cannot learn about the behavior change from the JavaDoc
> * Risk 3: Violates the Principle of Least Surprise
>
> **Impact Scope**:
>
> * Direct impact: API contract of `ChangeEventRecords`
> * Indirect impact: All code calling `forFinishedSplit()`
> * Impact surface: Multiple Connectors (all CDC Connectors)
>
> **Severity**: MINOR
>
> **Improvement Suggestion**:
>
> ```java
> /**
>  * Creates a {@link ChangeEventRecords} that only indicates a split is finished.
>  *
>  * @param splitId the ID of the finished split, must not be null
>  * @return a new {@link ChangeEventRecords} instance
>  * @throws IllegalArgumentException if splitId is null
>  */
> public static ChangeEventRecords forFinishedSplit(final String splitId) {
>     if (splitId == null) {
>         throw new IllegalArgumentException("splitId must not be null");
>     }
>     return new ChangeEventRecords(null, null, Collections.singleton(splitId));
> }
> ```
>
> **Rationale**:
>
> 1. Clear API contract: the JavaDoc states the parameter constraint and the exception behavior
> 2. Follows Java conventions: `@throws` can also document unchecked exceptions, such as `IllegalArgumentException`, that are part of a method's contract
> 3. Facilitates static checking: IDEs and tools can surface the constraint to callers based on the JavaDoc
> 4. Backward compatibility documentation: even when a change is a bug fix, behavior changes should be documented

Revisions have been made in accordance with the suggestions.