DanielCarter-stack commented on PR #10448:
URL: https://github.com/apache/seatunnel/pull/10448#issuecomment-3847464836

   ### Issue 1: InterruptedException Handling Does Not Follow Best Practices
   
   **Location**: `CheckpointCoordinator.java:685-690`
   
   ```java
   } catch (InterruptedException e) {
       handleCoordinatorError(
               "triggering checkpoint barrier has been interrupted",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       throw new RuntimeException(e);  // ⚠️ Problem here
   }
   ```
   
   **Problem Description**:
   After catching `InterruptedException`, code that does not rethrow it should **restore the interrupt status** (`Thread.currentThread().interrupt()`) before throwing anything else. The current code wraps the exception in a `RuntimeException` without restoring the flag, which leads to:
   1. Callers cannot distinguish between a real interruption and a business exception
   2. The interrupt status is lost, so upper-level code cannot respond to the interruption
   3. Violates Java concurrent programming best practices
   
   **Related Context**:
   - Other places in the same class don't have `InterruptedException` handling 
for comparison
   - Hazelcast's `InvocationFuture.get()` will throw `InterruptedException`
   - Upper-level caller: `startTriggerPendingCheckpoint()` executes 
asynchronously in executorService
   
   **Potential Risks**:
   - Risk 1: If the thread pool is shut down, lost interrupt status will cause 
graceful shutdown to fail
   - Risk 2: May lead to "phantom interruption" issues (exception is thrown but 
interrupt status is not restored)
   
   **Scope of Impact**:
   - Direct impact: `startTriggerPendingCheckpoint()` method
   - Indirect impact: Thread pool management of the entire checkpoint 
coordinator
   - Impact area: Core framework (all jobs using checkpoint)
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   ```java
   } catch (InterruptedException e) {
       handleCoordinatorError(
               "triggering checkpoint barrier has been interrupted",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       Thread.currentThread().interrupt();  // Restore interrupted state
       throw new RuntimeException(e);
   }
   ```
   
   **Rationale**:
   1. Follows Java concurrent programming best practices (see "Java Concurrency 
in Practice")
   2. Ensures interrupt status is not lost
   3. Allows upper-level code to properly handle interruption
   4. Consistent with `TaskCallTimer.java:123-125` handling pattern
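
   To make the lost-flag problem concrete, here is a minimal standalone sketch (not SeaTunnel code; `InterruptStatusDemo` and `flagVisibleToCaller` are hypothetical names). It simulates an interrupted blocking call, wraps the exception as the catch block does, and checks whether the caller can still observe the interruption:

   ```java
   /** Hypothetical demo class; not part of SeaTunnel. */
   final class InterruptStatusDemo {

       /**
        * Simulates an interrupted blocking call, wraps the exception the way the
        * catch block does, and reports whether the interrupt flag is still set
        * once the wrapped exception has reached the caller.
        */
       static boolean flagVisibleToCaller(boolean restoreFlag) {
           // Set the flag up front so the next blocking call throws immediately.
           Thread.currentThread().interrupt();
           try {
               try {
                   Thread.sleep(10); // throws InterruptedException and clears the flag
               } catch (InterruptedException e) {
                   if (restoreFlag) {
                       Thread.currentThread().interrupt(); // restore interrupted state
                   }
                   throw new RuntimeException(e);
               }
           } catch (RuntimeException wrapped) {
               // The caller only sees a RuntimeException here.
           }
           // Thread.interrupted() reads and clears the flag, leaving the thread clean.
           return Thread.interrupted();
       }

       public static void main(String[] args) {
           System.out.println("without restore: " + flagVisibleToCaller(false));
           System.out.println("with restore:    " + flagVisibleToCaller(true));
       }
   }
   ```

   Without the restore call the method returns `false`, so a thread pool that checks the flag during shutdown would not see the interruption; with the restore call it returns `true`.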
   
   ---
   
   ### Issue 2: InterruptedException Branch May Cause Duplicate Error Handling
   
   **Location**: `CheckpointCoordinator.java:685-690`
   
   ```java
   } catch (InterruptedException e) {
       handleCoordinatorError(...);  // First processing
       throw new RuntimeException(e);  // Throw exception
   }
   ```
   
   **Problem Description**:
   In the `InterruptedException` branch, `handleCoordinatorError()` is called 
first (which sets status to FAILED and cleans up resources), then 
`RuntimeException` is thrown. This leads to:
   1. The exception is caught by `whenCompleteAsync()` in `startTriggerPendingCheckpoint()` (line 645)
   2. This may trigger a second `handleCoordinatorError()` call (line 648)
   3. An `isDone()` check prevents duplicate processing, but the control flow is hard to follow
   
   **Related Context**:
   ```java
   // Lines 645-651
   completableFuture.whenCompleteAsync(
       (completedCheckpoint, error) -> {
           if (error != null) {
               handleCoordinatorError(  // Possibly second call
                       "trigger checkpoint failed",
                       error,
                       CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
   ```
   
   **Potential Risks**:
   - Risk 1: Code logic is confusing, difficult for maintainers to understand
   - Risk 2: If `isDone()` check fails, it will cause duplicate resource cleanup
   - Risk 3: Error messages may be inconsistent ("interrupted" vs "trigger 
checkpoint failed")
   
   **Scope of Impact**:
   - Direct impact: Exception handling chain of 
`startTriggerPendingCheckpoint()` method
   - Indirect impact: Correctness of Checkpoint state machine
   - Impact area: Core framework
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   } catch (InterruptedException e) {
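       Thread.currentThread().interrupt();
       // Do not call handleCoordinatorError in catch block
       // Let exception propagate to whenCompleteAsync for unified handling
       // (requires the enclosing method to declare `throws InterruptedException`;
       // otherwise wrap it: throw new RuntimeException(e))
       throw e;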
   } catch (Exception e) {
       handleCoordinatorError(
               "triggering checkpoint barrier failed",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       return;
   }
   ```
   
   Or:
   ```java
   } catch (InterruptedException e) {
       handleCoordinatorError(
               "triggering checkpoint barrier has been interrupted",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       Thread.currentThread().interrupt();
       return;  // Do not throw exception
   } catch (Exception e) {
       handleCoordinatorError(
               "triggering checkpoint barrier failed",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       return;
   }
   ```
   
   **Rationale**:
   1. Avoid duplicate error handling
   2. Code logic is clearer
   3. Ensure error message consistency
   4. Reduce dependency on `isDone()` check
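
   The double-handling path can be reproduced in isolation. The sketch below is an assumption-laden stand-in, not the coordinator's code: `SingleErrorHandlerDemo` is hypothetical, an `AtomicBoolean` plays the role of the `isDone()` guard, and a plain `CompletableFuture` replaces the real checkpoint future. It shows that an exception thrown after the first handler call still reaches the completion callback, and that only the guard keeps cleanup from running twice:

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   /** Hypothetical demo class; not part of SeaTunnel. */
   final class SingleErrorHandlerDemo {
       // Stand-in for the isDone() check mentioned above (assumption).
       private final AtomicBoolean failed = new AtomicBoolean(false);
       final AtomicInteger cleanups = new AtomicInteger();

       void handleError(String message, Throwable cause) {
           if (!failed.compareAndSet(false, true)) {
               return; // already handled: the second call is a no-op
           }
           cleanups.incrementAndGet(); // resource cleanup would happen here
       }

       int run() {
           CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
               RuntimeException e = new RuntimeException("barrier trigger failed");
               handleError("inside catch block", e); // first call
               throw e;                              // still completes the future exceptionally
           });
           future.whenComplete((ok, error) -> {
                       if (error != null) {
                           handleError("inside completion callback", error); // second call
                       }
                   })
                   .exceptionally(t -> null) // swallow the failure so join() returns normally
                   .join();
           return cleanups.get();
       }

       public static void main(String[] args) {
           System.out.println("cleanups: " + new SingleErrorHandlerDemo().run());
       }
   }
   ```

   Cleanup runs once here only because of the guard. Handling the error in a single place, as either suggestion above does, removes the need to rely on it.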
   
   ---
   
   ### Issue 3: Direct Failure for Transient Network Faults May Be Too Aggressive
   
   **Location**: `CheckpointCoordinator.java:691-696`
   
   ```java
   } catch (Exception e) {
       handleCoordinatorError(
               "triggering checkpoint barrier failed",
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       return;  // Fail immediately, no retry
   }
   ```
   
   **Problem Description**:
   The current implementation causes the job to fail immediately upon any 
checkpoint barrier trigger failure. However, in distributed systems, network 
jitter or transient failures are common:
   1. May be a temporary network partition that recovers in a few seconds
   2. May be a brief timeout caused by GC
   3. May be a Hazelcast transient failure
   
   Direct failure leads to:
   - Users need to manually restart the job
   - May lose data being processed
   - Reduces system availability
   
   **Related Context**:
   - Checkpoint configuration has `checkpoint.timeout` (default 60 seconds)
   - Checkpoint configuration has no retry-related configuration
   - Other checkpoint errors (such as timeout) also have no retry mechanism
   
   **Potential Risks**:
   - Risk 1: Reduces availability in production environments
   - Risk 2: Users may mistakenly think this is system instability
   - Risk 3: Inconsistent with `checkpoint.timeout` semantics (timeout is 
waiting, here it's immediate failure)
   
   **Scope of Impact**:
   - Direct impact: All streaming jobs with checkpoint enabled
   - Indirect impact: Operations complexity in production environments
   - Impact area: All users using Zeta engine
   
   **Severity**: MINOR (but may be MAJOR if production environment network is 
unstable)
   
   **Improvement Suggestions**:
   This is a design trade-off issue with several possible approaches:
   
   **Option A**: Add retry mechanism (recommended for long-term improvement)
   ```java
   // Add in CheckpointConfig
   private int checkpointBarrierTriggerRetryTimes = 3;  // Retry 3 times by default
   
   // Add retry logic in triggerCheckpoint
   public InvocationFuture<?>[] triggerCheckpoint(CheckpointBarrier checkpointBarrier)
           throws InterruptedException {
       int maxAttempts = coordinatorConfig.getCheckpointBarrierTriggerRetryTimes();
       // The loop exits only by returning the futures or rethrowing the last failure
       for (int attempt = 1; ; attempt++) {
           try {
               return plan.getStartingSubtasks().stream()
                   .map(taskLocation -> new CheckpointBarrierTriggerOperation(checkpointBarrier, taskLocation))
                   .map(checkpointManager::sendOperationToMemberNode)
                   .toArray(InvocationFuture[]::new);
           } catch (Exception e) {
               if (attempt >= maxAttempts) {
                   throw e;
               }
               LOG.warn("Retry triggering checkpoint barrier, attempt {}", attempt);
               Thread.sleep(1000L * attempt);  // Linear backoff: 1s, 2s, ...
           }
       }
   }
   ```
   
   **Option B**: Configure whether to fail immediately
   ```java
   // Add in CheckpointConfig
   private boolean failImmediatelyOnBarrierTriggerError = false;  // Do not fail immediately by default
   
   // Modify catch block
   } catch (Exception e) {
       if (coordinatorConfig.isFailImmediatelyOnBarrierTriggerError()) {
           handleCoordinatorError(...);
           return;
       } else {
           LOG.error("triggering checkpoint barrier failed, but job will continue", e);
           return;  // Only return, do not call handleCoordinatorError
       }
   }
   ```
   
   **Option C**: Keep current implementation (most conservative)
   ```java
   // No modifications, but document in notes:
   // "If checkpoint barrier trigger fails, the job will fail immediately.
   //  This is to ensure data consistency. Consider increasing checkpoint timeout
   //  if you encounter transient network issues."
   ```
   
   **Rationale**:
   1. Improve system availability
   2. Consistent with best practices of other distributed systems (e.g., 
Flink's checkpoint retry mechanism)
   3. Give users more control
   4. Balance data consistency and availability
   
   **Note**: This is a larger improvement, recommended for a follow-up PR. The 
current PR can be merged first to fix the "silent failure" bug.
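
   For reference, a bounded retry with exponential backoff could be sketched as a standalone helper like the one below. Everything in it is an assumption for illustration: `BackoffRetry`, its parameters, and the choice to retry on any `RuntimeException` are hypothetical, and a real change would need to decide which failures are actually transient.

   ```java
   import java.util.function.Supplier;

   /** Hypothetical helper; names and defaults are illustrative only. */
   final class BackoffRetry {

       /** Runs the action up to maxAttempts times, doubling the delay after each failure. */
       static <T> T call(Supplier<T> action, int maxAttempts, long baseDelayMillis) {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be >= 1");
           }
           for (int attempt = 1; ; attempt++) {
               try {
                   return action.get();
               } catch (RuntimeException e) {
                   if (attempt >= maxAttempts) {
                       throw e; // attempts exhausted: surface the last failure
                   }
                   // Delay grows as base, 2 * base, 4 * base, ...
                   sleepBeforeRetry(baseDelayMillis << (attempt - 1));
               }
           }
       }

       private static void sleepBeforeRetry(long millis) {
           try {
               Thread.sleep(millis);
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt(); // keep the interruption visible
               throw new IllegalStateException("interrupted while waiting to retry", e);
           }
       }
   }
   ```

   An interruption during the wait aborts the retry loop instead of being retried, which keeps the helper consistent with the interrupt handling discussed in Issue 1.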
   
   ---
   
   ### Issue 4: Error Messages Not Specific Enough, Hard to Diagnose Problems
   
   **Location**: `CheckpointCoordinator.java:686-687, 693-694`
   
   ```java
   handleCoordinatorError(
           "triggering checkpoint barrier has been interrupted",  // ⚠️ Missing key information
           e,
           CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
   ```
   
   **Problem Description**:
   Current error messages lack key diagnostic information, such as:
   1. What is the Checkpoint ID?
   2. Which Task node failed?
   3. How many Tasks succeeded/failed?
   4. What is the specific reason for failure?
   
   This requires users and operations personnel to view full stack traces and 
logs when troubleshooting, reducing diagnosability.
   
   **Related Context**:
   - `pendingCheckpoint.getInfo()` can get detailed checkpoint information
   - `plan.getStartingSubtasks()` can get all Task information
   - `InvocationFuture` can get execution result of each Task
   
   **Potential Risks**:
   - Risk 1: Increase troubleshooting time
   - Risk 2: Users may not be able to quickly locate the problem
   - Risk 3: Monitoring alerts cannot provide useful information
   
   **Scope of Impact**:
   - Direct impact: Quality of error logs
   - Indirect impact: Troubleshooting efficiency
   - Impact area: All users using checkpoint
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```java
   } catch (InterruptedException e) {
       handleCoordinatorError(
               String.format(
                       "Triggering checkpoint barrier %s has been interrupted. Pending tasks: %d",
                       pendingCheckpoint.getInfo(),
                       plan.getStartingSubtasks().size()),
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       Thread.currentThread().interrupt();
       throw new RuntimeException(e);
   } catch (Exception e) {
       handleCoordinatorError(
               String.format(
                       "Failed to trigger checkpoint barrier %s. Checkpoint type: %s, Pending tasks: %d",
                       pendingCheckpoint.getInfo(),
                       pendingCheckpoint.getCheckpointType(),
                       plan.getStartingSubtasks().size()),
               e,
               CheckpointCloseReason.CHECKPOINT_INSIDE_ERROR);
       return;
   }
   ```
   
   **Rationale**:
   1. Provide more detailed context information
   2. Convenient for quickly locating problems
   3. Consistent with style of other error messages (see lines 649, 657)
   4. Does not affect performance (string construction only on exception)
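
   To also answer "which task failed" and "how many succeeded", the message could be built from the per-task futures. The sketch below is only an illustration under stated assumptions: `BarrierFailureSummary` is a hypothetical class, task names are plain strings, and `CompletableFuture` stands in for Hazelcast's `InvocationFuture`, whose API is not used here.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical helper; CompletableFuture stands in for the real per-task future. */
   final class BarrierFailureSummary {

       /** Builds a one-line diagnostic listing the tasks whose future failed. */
       static String summarize(long checkpointId, Map<String, CompletableFuture<?>> results) {
           List<String> failedTasks = new ArrayList<>();
           for (Map.Entry<String, CompletableFuture<?>> entry : results.entrySet()) {
               if (entry.getValue().isCompletedExceptionally()) {
                   failedTasks.add(entry.getKey());
               }
           }
           return String.format(
                   "Failed to trigger barrier for checkpoint %d: %d of %d tasks failed %s",
                   checkpointId, failedTasks.size(), results.size(), failedTasks);
       }
   }
   ```

   Like the suggestion above, this only does work on the failure path, so it adds no cost to successful checkpoints.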
   
   ---
   
   ### Issue 6: Test Code Uses Incorrect jobId, Test Cannot Properly Validate
   
   **Location**: `CheckpointBarrierTriggerErrorTest.java:33-34, 40`
   
   ```java
   long jobId = System.currentTimeMillis();  // Line 33: Define variable
   startJob(System.currentTimeMillis(), CONF_PATH);  // Line 34: Pass new timestamp!
   
   // ...
   
   Assertions.assertEquals(
           server.getCoordinatorService().getJobStatus(jobId),  // Line 40: Use jobId variable
           JobStatus.RUNNING);
   ```
   
   **Problem Description**:
   The test defines the `jobId` variable at line 33, but the `startJob()` call at line 34 passes a fresh `System.currentTimeMillis()` value. This leads to:
   1. The submitted job ID differs from the `jobId` variable whenever the two calls return different millisecond values
   2. In that case, lines 40 and 54 use `jobId` to query the status of a job that does not exist
   3. The test then times out; it passes only when both calls happen to return the same millisecond value
   
   **Confidence**: 100% (this is a clear bug)
   
   **Related Context**:
   - `startJob()` method uses the passed jobid to submit the job
   - `getJobStatus(jobId)` queries status of the specified jobId
   - Other tests (such as `CheckpointErrorRestoreEndTest.java:42`) correctly 
use the same jobId
   
   **Potential Risks**:
   - Risk 1: The test fails in CI whenever the two timestamps differ
   - Risk 2: When the timestamps happen to match, the test passes by coincidence, which gives the team a false sense of security
   - Risk 3: Waste of CI resources and developer time
   
   **Scope of Impact**:
   - Direct impact: `CheckpointBarrierTriggerErrorTest` test
   - Indirect impact: CI/CD process
   - Impact area: Single test case
   
   **Severity**: BLOCKER (test must be fixed before merging)
   
   **Improvement Suggestions**:
   ```java
   @Test
   public void testCheckpointBarrierTriggerError() throws NoSuchFieldException, IllegalAccessException {
       long jobId = System.currentTimeMillis();
       startJob(jobId, CONF_PATH);  // Fix: Use jobId variable instead of getting timestamp again
   
       await().atMost(120000, TimeUnit.MILLISECONDS)
               .untilAsserted(
                       () ->
                               Assertions.assertEquals(
                                       JobStatus.RUNNING,  // expected value first, actual second
                                       server.getCoordinatorService().getJobStatus(jobId)));
   
       // ... rest of code unchanged
   }
   ```
   
   **Rationale**:
   1. Fix obvious bug so test can run correctly
   2. Consistent with practice of other tests
   3. Ensures test can validate PR's fix effect
   
   ---
   
   ### Issue 8: Checkpoint Configuration in Test Configuration File May Cause Test Instability
   
   **Location**: `stream_fake_to_console_checkpoint_barrier_trigger_error.conf:24-25`
   
   ```hocon
   checkpoint.interval = 1000  # 1 second
   checkpoint.timeout = 60000  # 60 seconds
   ```
   
   **Problem Description**:
   `checkpoint.interval = 1000` (1 second) in test configuration means:
   1. After job starts, checkpoints will trigger frequently (once per second)
   2. Test makes the first trigger fail through Mockito
   3. But checkpoints will continue to trigger afterward
   4. If job doesn't fail in time, multiple checkpoints may be running
   
   This may lead to:
   - Test instability (timing issues)
   - Test timeout (360 seconds may not be enough)
   - Multiple threads operating simultaneously, hard to predict behavior
   
   **Related Context**:
   - Other tests (such as `CheckpointErrorRestoreEndTest`) use longer intervals
   - Test timeout is set to 360 seconds (6 minutes), already quite long
   
   **Potential Risks**:
   - Risk 1: Test may be unstable (sometimes passes, sometimes fails)
   - Risk 2: Extend CI time
   - Risk 3: May mask other concurrent bugs
   
   **Scope of Impact**:
   - Direct impact: `CheckpointBarrierTriggerErrorTest` test
   - Indirect impact: CI/CD process
   - Impact area: Single test case
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   ```hocon
   env {
     parallelism = 1
     job.mode = "STREAMING"
     checkpoint.interval = 10000  # Changed: set to 10 seconds to reduce trigger frequency
     checkpoint.timeout = 60000
   }
   ```
   
   **Rationale**:
   1. Reduce test timing sensitivity
   2. Reduce unnecessary checkpoint triggers
   3. Improve test stability
   4. Shorten test execution time
   
   ---

