DanielCarter-stack commented on PR #10562:
URL: https://github.com/apache/seatunnel/pull/10562#issuecomment-4003611773

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10562", "part": 1, 
"total": 1} -->
   ### Issue 1: Missing unit test coverage
   
   **Location**: 
   - 
`seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/utils/ExceptionUtilTest.java`
   - 
`seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/CoordinatorServiceTest.java`
   
   **Related Context**:
   - Modified class: `ExceptionUtil.java:155-161`
   - Modified class: `CoordinatorService.java:508-526`
   - Callers: `JobMaster.java:490-494, 686-690, 732-733` and many other places
   
   **Problem Description**:
   The PR modifies core exception judgment logic and job recovery process, but 
does not add any unit tests. Specifically missing:
   
   1. `ExceptionUtil.isOperationNeedRetryException` has no test cases to verify 
it can correctly identify `RetryableHazelcastException`
   2. `CoordinatorService.restoreJobFromMasterActiveSwitch` has no tests 
simulating IMap loading scenarios
   3. No tests to verify retry mechanism is triggered correctly
   4. No tests for behavior after retry count is exhausted
   
   **Potential Risks**:
   - **Risk 1**: Future modifications may accidentally break this logic, 
causing regressions
   - **Risk 2**: Cannot verify whether the fix is truly effective
   - **Risk 3**: Edge conditions (e.g., still failing after 30 retries) are not 
verified
   
   **Impact Scope**:
   - **Direct Impact**: `ExceptionUtil` and `CoordinatorService` classes
   - **Indirect Impact**: All scenarios that depend on master switch job 
recovery functionality
   - **Impact Area**: Core framework
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   ```java
   // Added in ExceptionUtilTest.java
   @Test
   void testIsOperationNeedRetryException_withRetryableHazelcastException() {
       RetryableHazelcastException exception = new 
RetryableHazelcastException("Test");
       assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
   }
   
   @Test
   void 
testIsOperationNeedRetryException_withWrappedRetryableHazelcastException() {
       Throwable exception = new Exception(
           new RuntimeException(new RetryableHazelcastException("Test"))
       );
       assertTrue(ExceptionUtil.isOperationNeedRetryException(exception));
   }
   
   @Test
   void testIsOperationNeedRetryException_withNonRetryableException() {
       Exception exception = new Exception("Non-retryable");
       assertFalse(ExceptionUtil.isOperationNeedRetryException(exception));
   }
   ```
   
   ```java
   // Added in CoordinatorServiceTest.java (need to mock Hazelcast IMap to 
throw RetryableHazelcastException)
   @Test
   public void testRestoreJobFromMasterActiveSwitch_withIMapLoadingRetry() {
       // Use Mockito to mock runningJobStateIMap
       // First 3 calls throw RetryableHazelcastException
       // 4th call returns normal jobState
       // Verify retry logic takes effect, job successfully recovered
   }
   
   @Test
   public void testRestoreJobFromMasterActiveSwitch_retryExhausted() {
       // Mock to always throw RetryableHazelcastException
       // Verify SeaTunnelEngineException is thrown after 30 retries
   }
   ```
   
   **Rationale**: Testing is critical to ensuring code quality and preventing 
regressions. Especially for bugs in distributed scenarios, manual testing is 
costly and difficult to cover all cases.
   
   ---
   
   ### Issue 2: Missing JavaDoc comments
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java:155`
   
   **Related Context**:
   - Method: `ExceptionUtil.isOperationNeedRetryException`
   - Callers: Retry logic in 9 different classes
   
   **Problem Description**:
   The `isOperationNeedRetryException` method lacks JavaDoc comments to explain:
   1. What is the purpose of this method
   2. Which exception types are considered to need retry
   3. Why these exceptions need retry
   4. How callers should use this method
   
   **Potential Risks**:
   - **Risk 1**: New developers may not understand the importance of this 
method and modify it incorrectly
   - **Risk 2**: May miss adding new retryable exception types
   - **Risk 3**: Difficult to maintain, unclear about specific scenarios for 
each retryable exception
   
   **Impact Scope**:
   - **Direct Impact**: Maintainability of `ExceptionUtil` class
   - **Indirect Impact**: All developers who use this method
   - **Impact Area**: Core framework
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   /**
    * Check if an exception indicates an operation that should be retried.
    *
    * <p>This method is used by {@link RetryUtils} to determine if an operation
    * should be retried based on the exception thrown. It extracts the root 
cause
    * of the exception chain and checks if it matches known retryable exception 
types.
    *
    * <p>The following exception types are considered retryable:
    * <ul>
    *   <li>{@link HazelcastInstanceNotActiveException} - Hazelcast instance is 
not active</li>
    *   <li>{@link InterruptedException} - Operation was interrupted</li>
    *   <li>{@link OperationTimeoutException} - Operation timed out</li>
    *   <li>{@link RetryableHazelcastException} - Hazelcast explicitly marks 
this as retryable,
    *       e.g., when IMap is still loading from external storage (S3) in 
EAGER mode</li>
    * </ul>
    *
    * <p><b>Important:</b> When adding new retryable exception types, ensure 
that:
    * <ol>
    *   <li>The exception is truly transient and retrying may succeed</li>
    *   <li>Retrying does not cause duplicate operations or data 
inconsistency</li>
    *   <li>All call sites can handle the retry delay appropriately</li>
    * </ol>
    *
    * @param e the exception to check (may be wrapped in other exceptions)
    * @return {@code true} if the exception (or its root cause) is retryable; 
{@code false} otherwise
    * @see RetryUtils#retryWithException
    * @see org.apache.seatunnel.common.utils.ExceptionUtils#getRootException
    */
   public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
       Throwable exception = ExceptionUtils.getRootException(e);
       return exception instanceof HazelcastInstanceNotActiveException
               || exception instanceof InterruptedException
               || exception instanceof OperationTimeoutException
               || exception instanceof RetryableHazelcastException;
   }
   ```
   
   **Rationale**: Comprehensive JavaDoc is crucial for public API methods, 
especially for core utility methods called from multiple places.
   
   ---
   
   ### Issue 3: PR description mentions "putIfAbsent" but it's not reflected in 
the code
   
   **Location**: PR description
   
   **Related Context**:
   - PR description: "Add RetryableHazelcastException to 
ExceptionUtil.isOperationNeedRetryException so SeaTunnel's RetryUtils retries 
putIfAbsent calls..."
   - Actual code: Only `runningJobStateIMap.get()` call is modified
   
   **Problem Description**:
   The PR description mentions that the fix will affect "putIfAbsent calls", 
but no `putIfAbsent` usage is seen in the actual code changes. This may mean:
   
   1. The PR description is inaccurate or outdated
   2. The "putIfAbsent" issue exists elsewhere (not included in this PR)
   3. The description confuses different fix points
   
   **Potential Risks**:
   - **Risk 1**: Code reviewers may be confused about the actual fix scope
   - **Risk 2**: If `putIfAbsent` has the same issue but is not fixed, the bug 
still exists
   - **Risk 3**: Future maintainers may misunderstand the actual impact of the 
PR
   
   **Impact Scope**:
   - **Direct Impact**: Understandability of the PR
   - **Indirect Impact**: May lead to incomplete fixes
   - **Impact Area**: Documentation and communication
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   1. **Clarify PR description**: 
      - If the `putIfAbsent` issue indeed exists but is not within the scope of 
this PR, it should be explicitly stated in the PR description
      - If the description is outdated, it should be updated to accurately 
reflect the actual modifications
      
   2. **Code search verification**:
      ```bash
      # Search all usages of putIfAbsent in the project
      grep -rn "putIfAbsent" seatunnel-engine/
      ```
      
   3. **可能的描述更新**:
      ```
      - Add RetryableHazelcastException to 
ExceptionUtil.isOperationNeedRetryException
        so that all retry operations (including IMap access like get/put) can 
properly
        handle RetryableHazelcastException
      - Wrap runningJobStateIMap.get() in RetryUtils in 
restoreJobFromMasterActiveSwitch
        to handle RetryableHazelcastException during master switch
      ```
   
   ** Reason**: Accurate PR description is important for code review and future 
maintenance. If `putIfAbsent` has the same issue, it should be fixed together 
or a new Issue should be created to track it.
   
   ---
   
   # ## Issue 4: Error handling can be more refined
   
   ** Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:519-522`
   
   ** Related context**:
   - 方法: `restoreJobFromMasterActiveSwitch`
   - 调用方: `restoreAllRunningJobFromMasterNodeSwitch` (line 483-485)
   
   ** Issue description**:
   当前的错误处理比较粗糙:
   ```java
   catch (Exception e) {
       throw new SeaTunnelEngineException(
           String.format("Job id %s restore failed, can not get job state", 
jobId), e);
   }
   ```
   
   这种处理方式有几个问题:
   
   1. **不区分异常类型**: 无论是 `RetryableHazelcastException` 重试耗尽,还是其他严重异常,都抛出相同的错误
   2. **丢失重试信息**: 没有记录实际重试了多少次
   3. **缺少降级策略**: 对于某些可恢复的错误,可能有更好的处理方式
   
   ** Potential risks**:
   - **风险 1**: 难以诊断问题,日志中看不到重试的细节
   - **风险 2**: 无法针对不同的失败原因采取不同的处理策略
   - **风险 3**: 运维人员无法快速定位问题根因
   
   ** Scope of impact**:
   - **直接影响**: `restoreJobFromMasterActiveSwitch` 的错误处理
   - **间接影响**: 作业恢复失败的可诊断性
   - **影响面**: 单个模块
   
   ** Severity**: MINOR
   
   ** Improvement suggestions**:
   ```java
   Object jobState;
   try {
       jobState = RetryUtils.retryWithException(
           () -> runningJobStateIMap.get(jobId),
           new RetryUtils.RetryMaterial(
               Constant.OPERATION_RETRY_TIME,
               true,
               ExceptionUtil::isOperationNeedRetryException,
               Constant.OPERATION_RETRY_SLEEP));
       // Add success log (if retries occurred)
       logger.info(String.format("Successfully retrieved job state for job %s", 
jobId));
   } catch (Exception e) {
       // Check if it's caused by RetryableHazelcastException
       Throwable rootCause = ExceptionUtils.getRootException(e);
       if (rootCause instanceof RetryableHazelcastException) {
           logger.severe(String.format(
               "Job %s restore failed after %d retries due to IMap still 
loading from S3. " +
               "This may indicate a configuration issue or S3 connectivity 
problem. " +
               "Consider increasing IMap load timeout or checking S3 
connectivity.",
               jobId, Constant.OPERATION_RETRY_TIME));
       } else {
           logger.severe(String.format(
               "Job %s restore failed due to unexpected error", jobId), e);
       }
       throw new SeaTunnelEngineException(
           String.format("Job id %s restore failed, can not get job state", 
jobId), e);
   }
   ```
   
   或者,如果需要更精细的控制:
   ```java
   // Customize RetryUtils to support retry callbacks
   Object jobState;
   int[] retryCount = {0};
   try {
       jobState = RetryUtils.retryWithException(
           () -> {
               retryCount[0]++;
               return runningJobStateIMap.get(jobId);
           },
           new RetryUtils.RetryMaterial(
               Constant.OPERATION_RETRY_TIME,
               true,
               ExceptionUtil::isOperationNeedRetryException,
               Constant.OPERATION_RETRY_SLEEP));
       
       if (retryCount[0] > 1) {
           logger.info(String.format(
               "Job state retrieved after %d retries for job %s (IMap was 
loading)",
               retryCount[0], jobId));
       }
   } catch (Exception e) {
       // ... error handling
   }
   ```
   
   ** Reason**: Better logging and error handling can significantly improve 
maintainability in production environments.
   
   ---
   
   # ## Issue 5: No consideration for permanent IMap unavailability
   
   ** Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:508-526`
   
   ** Related context**:
   - 方法: `restoreJobFromMasterActiveSwitch`
   - 依赖: `runningJobStateIMap`, `runningJobInfoIMap`
   
   ** Issue description**:
   当前实现假设 IMap 最终会加载完成,但如果:
   1. S3 连接断开
   2. S3 存储损坏
   3. IMap 配置错误
   
   即使重试 30 次(60 秒),IMap 可能仍然不可用。此时:
   - 作业恢复会抛出异常
   - `runningJobInfoIMap` 中的条目被保留(如果在这一步之前就失败)
   - 作业处于"悬空"状态:既不在 `runningJobMasterMap` 中,也不在 `pendingJobQueue` 中
   
   ** Potential risks**:
   - **风险 1**: 作业卡在不可恢复的状态
   - **风险 2**: 需要手动清理 `runningJobInfoIMap` 和 `runningJobStateIMap`
   - **风险 3**: 没有告警机制通知运维人员
   
   ** Scope of impact**:
   - **直接影响**: 作业恢复的可靠性
   - **间接影响**: 集群的自我恢复能力
   - **影响面**: 核心框架
   
   ** Severity**: MAJOR
   
   ** Improvement suggestions**:
   
   可以考虑添加一个"标记机制",将无法恢复的作业标记为需要人工介入:
   ```java
   private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull 
JobInfo jobInfo) {
       Object jobState;
       try {
           jobState = RetryUtils.retryWithException(
               () -> runningJobStateIMap.get(jobId),
               new RetryUtils.RetryMaterial(
                   Constant.OPERATION_RETRY_TIME,
                   true,
                   ExceptionUtil::isOperationNeedRetryException,
                   Constant.OPERATION_RETRY_SLEEP));
       } catch (Exception e) {
           Throwable rootCause = ExceptionUtils.getRootException(e);
           
           // Check if it's RetryableHazelcastException (IMap loading issue)
           if (rootCause instanceof RetryableHazelcastException) {
               // Log to dedicated failure queue for subsequent processing or 
alerting
               logger.severe(String.format(
                   "Job %s restore failed due to IMap loading timeout after %d 
retries. " +
                   "Marking job as failed. JobInfo: %s",
                   jobId, Constant.OPERATION_RETRY_TIME, jobInfo));
               
               // Try to update job status to FAILED (if there's 
finishedJobStateIMap)
               try {
                   // Update finishedJobStateIMap, mark job as FAILED
                   // This way the job is visible in UI instead of completely 
disappearing
               } catch (Exception ex) {
                   logger.warning("Failed to mark job as failed", ex);
               }
           }
           
           throw new SeaTunnelEngineException(
               String.format("Job id %s restore failed, can not get job state", 
jobId), e);
       }
       
       if (jobState == null) {
           runningJobInfoIMap.remove(jobId);
           return;
       }
       
       // ... normal recovery logic
   }
   ```
   
   或者,可以添加一个配置选项,控制作业恢复失败时的行为:
   ```java
   // Add in EngineConfig
   public enum JobRestoreFailureStrategy {
       FAIL_FAST,           // Fail immediately (current behavior)
       MARK_AS_FAILED,      // Mark as FAILED, keep in history
       RETRY_INDEFINITELY   // Retry indefinitely (not recommended)
   }
   ```
   
   ** Reason**: Although this is an edge case, all failure scenarios need to be 
considered in production environments to ensure the system doesn't enter an 
inconsistent state.
   
   ---
   
   # ## Issue 6: Potential performance issue - undifferentiated 2-second retry 
interval
   
   ** Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java:42`
 and `CoordinatorService.java:518`
   
   ** Related context**:
   - 常量定义: `Constant.OPERATION_RETRY_SLEEP = 2000` (2 秒)
   - 使用位置: `CoordinatorService`, `JobMaster` 等多处
   
   ** Issue description**:
   当前重试策略使用固定的 2 秒间隔,这在某些场景下可能不是最优的:
   
   1. **IMap 快速加载完成**: 如果 IMap 在 500ms 内就加载完成,浪费了 1.5 秒
   2. **IMap 加载很慢**: 如果 IMap 需要加载 10 秒,30 次重试可能不够
   3. **无指数退避**: 在 `RetryMaterial` 构造中没有启用 `sleepTimeIncrease`,导致没有指数退避
   
   ** Potential risks**:
   - **风险 1**: 在 IMap 快速加载完成时,不必要的延迟
   - **风险 2**: 在 IMap 加载缓慢时,可能重试次数不够
   - **风险 3**: 固定间隔可能加剧集群负载(如果在同一时间多个作业都在重试)
   
   ** Scope of impact**:
   - **直接影响**: 作业恢复的速度和成功率
   - **间接影响**: master switch 的整体恢复时间
   - **影响面**: 核心框架
   
   ** Severity**: MINOR
   
   ** Improvement suggestions**:
   
   可以考虑启用指数退避:
   ```java
   jobState = RetryUtils.retryWithException(
       () -> runningJobStateIMap.get(jobId),
       new RetryUtils.RetryMaterial(
           Constant.OPERATION_RETRY_TIME,    // 30
           true,
           ExceptionUtil::isOperationNeedRetryException,
           Constant.OPERATION_RETRY_SLEEP,   // 2000ms as base interval
           true));  // Enable sleepTimeIncrease (exponential backoff)
   ```
   
   这样重试间隔将是:
   - 第 1 次: 2 秒
   - 第 2 次: 4 秒
   - 第 3 次: 8 秒
   - 第 4 次: 16 秒
   - ...
   - 第 30 次: 最大 20 秒(受 `MAX_RETRY_TIME_MS` 限制)
   
   总等待时间会更合理,且在 IMap 快速加载完成时不会浪费太多时间。
   
   ** Or**, a different retry strategy can be used for this specific scenario:
   ```java
   // Add in Constant
   public static final int IMAP_LOADING_RETRY_TIME = 30;
   public static final int IMAP_LOADING_RETRY_SLEEP_INITIAL = 500;  // 500ms
   public static final int IMAP_LOADING_RETRY_SLEEP_MAX = 5000;     // 5s
   
   // In CoordinatorService
   jobState = RetryUtils.retryWithException(
       () -> runningJobStateIMap.get(jobId),
       new RetryUtils.RetryMaterial(
           Constant.IMAP_LOADING_RETRY_TIME,
           true,
           ExceptionUtil::isOperationNeedRetryException,
           Constant.IMAP_LOADING_RETRY_SLEEP_INITIAL,
           true));  // Enable exponential backoff
   ```
   
   ** Reason**: Exponential backoff is a best practice for handling transient 
failures, balancing recovery speed with system load.
   
   ---
   
   # ## Issue 7: Potential race condition - duplicate recovery check not robust 
enough
   
   ** Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:481-485`
   
   ** Related context**:
   - 方法: `restoreAllRunningJobFromMasterNodeSwitch`
   - 调用链: `checkNewActiveMaster` -> `initCoordinatorService` -> 
`restoreAllRunningJobFromMasterNodeSwitch`
   
   ** Issue description**:
   在 `restoreAllRunningJobFromMasterNodeSwitch` 中有这样的检查:
   ```java
   // skip the job new submit
   if (!runningJobMasterMap.containsKey(entry.getKey())) {
       restoreJobFromMasterActiveSwitch(entry.getKey(), entry.getValue());
   }
   ```
   
   这个检查是为了避免重复恢复同一个作业,但它是在 `CompletableFuture.runAsync` 内部执行的,存在时序问题:
   
   1. **时序**: 线程 A 检查 `!runningJobMasterMap.containsKey(jobId)` 为 true,准备恢复
   2. **并发**: 同时线程 B 可能正在提交同一个新作业(通过 `submitJob`)
   3. **问题**: 两个线程可能同时处理同一个 jobId,导致冲突
   
   ** Potential risks**:
   - **风险 1**: 同一个作业可能被恢复两次
   - **风险 2**: 新提交的作业可能被意外覆盖
   - **风险 3**: 虽然概率较低,但在高并发场景下可能发生
   
   ** Scope of impact**:
   - **直接影响**: `restoreJobFromMasterActiveSwitch` 和 `submitJob` 的并发安全性
   - **间接影响**: 作业的一致性
   - **影响面**: 单个模块
   
   ** Severity**: MAJOR
   
   ** Improvement suggestions**:
   
   方案 1: 使用 `runningJobInfoIMap` 作为同步点
   ```java
   private void restoreJobFromMasterActiveSwitch(@NonNull Long jobId, @NonNull 
JobInfo jobInfo) {
       // Double-check: confirm again that the job hasn't been restored yet
       if (runningJobMasterMap.containsKey(jobId)) {
           logger.info(String.format("Job %s already restored, skipping", 
jobId));
           return;
       }
       
       // ... continue recovery logic
   }
   ```
   
   方案 2: 使用 `ConcurrentHashMap.putIfAbsent`
   ```java
   private void restoreAllRunningJobFromMasterNodeSwitch() {
       // ...
       List<CompletableFuture<Void>> collect =
           needRestoreFromMasterNodeSwitchJobs.stream()
               .filter(entry -> 
!runningJobMasterMap.containsKey(entry.getKey()))
               .map(entry -> {
                   // Try to mark as "restoring"
                   JobMaster existing = runningJobMasterMap.putIfAbsent(
                       entry.getKey(), 
                       RESTORE_MARKER  // A special marker object
                   );
                   
                   if (existing != null) {
                       // Already handled by another thread
                       return CompletableFuture.completedFuture(null);
                   }
                   
                   return CompletableFuture.runAsync(() -> {
                       try {
                           logger.info(String.format("begin restore job 
(%s)...", entry.getKey()));
                           restoreJobFromMasterActiveSwitch(entry.getKey(), 
entry.getValue());
                       } finally {
                           // After recovery completes, RESTORE_MARKER will be 
replaced by the actual JobMaster
                           // If recovery fails, need to clean up the marker
                       }
                   }, executorService);
               })
               .collect(Collectors.toList());
       // ...
   }
   ```
   
   方案 3: 同步恢复(简单但可能影响性能)
   ```java
   private void restoreAllRunningJobFromMasterNodeSwitch() {
       // ...
       for (Map.Entry<Long, JobInfo> entry : 
needRestoreFromMasterNodeSwitchJobs) {
           if (!runningJobMasterMap.containsKey(entry.getKey())) {
               try {
                   restoreJobFromMasterActiveSwitch(entry.getKey(), 
entry.getValue());
               } catch (Exception e) {
                   logger.severe(e);
               }
           }
       }
       // ...
   }
   ```
   
   ** Reason**: Although the probability of this race condition is low, in 
distributed systems edge cases always happen and should be prevented in advance.
   
   ---
   
   # ## Issue 8: Log level and content can be improved
   
   ** Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:475-493`
   
   ** Related context**:
   - 方法: `restoreAllRunningJobFromMasterNodeSwitch`
   - 相关日志: line 475, 477, 488, 490-493
   
   ** Issue description**:
   当前的日志记录存在以下问题:
   
   1. **缺少关键信息**: 日志中没有记录重试次数、实际等待时间等关键指标
   2. **级别不一致**: 使用 `logger.severe(e)` 但没有区分不同严重程度的错误
   3. **缺少诊断信息**: 当 IMap 加载失败时,没有记录 IMap 的状态、S3 连接状态等
   
   ** Potential risks**:
   - **风险 1**: 难以诊断生产环境问题
   - **风险 2**: 无法监控作业恢复的健康状况
   - **风险 3**: 缺少可观测性数据
   
   ** Scope of impact**:
   - **直接影响**: 运维和调试体验
   - **间接影响**: 问题诊断效率
   - **影响面**: 单个模块
   
   ** Severity**: MINOR
   
   ** Improvement suggestions**:
   ```java
   private void restoreAllRunningJobFromMasterNodeSwitch() {
       long startTime = System.currentTimeMillis();
       List<Map.Entry<Long, JobInfo>> needRestoreFromMasterNodeSwitchJobs =
               runningJobInfoIMap.entrySet().stream()
                       .filter(entry -> 
!runningJobMasterMap.containsKey(entry.getKey()))
                       .collect(Collectors.toList());
       
       if (needRestoreFromMasterNodeSwitchJobs.isEmpty()) {
           logger.info("No jobs to restore after master switch");
           return;
       }
       
       logger.info(String.format(
           "Found %d jobs to restore after master switch", 
           needRestoreFromMasterNodeSwitchJobs.size()));
       
       // ... worker wait logic ...
       
       List<CompletableFuture<Void>> collect =
               needRestoreFromMasterNodeSwitchJobs.stream()
                       .map(
                               entry ->
                                       CompletableFuture.runAsync(
                                               () -> {
                                                   long jobRestoreStartTime = 
System.currentTimeMillis();
                                                   logger.info(
                                                           String.format(
                                                                   "Begin 
restore job %s from master active switch",
                                                                   
entry.getKey()));
                                                   try {
                                                       if 
(!runningJobMasterMap.containsKey(
                                                                       
entry.getKey())) {
                                                           
restoreJobFromMasterActiveSwitch(
                                                                   
entry.getKey(),
                                                                   
entry.getValue());
                                                           long duration = 
System.currentTimeMillis() - jobRestoreStartTime;
                                                           
logger.info(String.format(
                                                               "Job %s restored 
successfully in %d ms",
                                                               entry.getKey(), 
duration));
                                                       } else {
                                                           
logger.info(String.format(
                                                               "Job %s already 
restored by another thread, skipping",
                                                               entry.getKey()));
                                                       }
                                                   } catch (Exception e) {
                                                       long duration = 
System.currentTimeMillis() - jobRestoreStartTime;
                                                       
logger.severe(String.format(
                                                           "Job %s restore 
failed after %d ms: %s",
                                                           entry.getKey(), 
duration, 
                                                           
ExceptionUtils.getMessage(e)), e);
                                                   }
                                               },
                                               
MDCTracer.tracing(entry.getKey(), executorService)))
                       .collect(Collectors.toList());
   
       try {
           CompletableFuture<Void> voidCompletableFuture =
                   CompletableFuture.allOf(collect.toArray(new 
CompletableFuture[0]));
           voidCompletableFuture.get();
           long totalDuration = System.currentTimeMillis() - startTime;
           logger.info(String.format(
               "All job restores completed in %d ms. Success: %d, Failed: %d",
               totalDuration,
               needRestoreFromMasterNodeSwitchJobs.size(),
               // TODO: can add failure count
               0));
       } catch (Exception e) {
           long totalDuration = System.currentTimeMillis() - startTime;
           logger.severe(String.format(
               "Job restore process failed after %d ms: %s",
               totalDuration, ExceptionUtils.getMessage(e)), e);
           throw new SeaTunnelEngineException(e);
       }
   }
   ```
   
   **Rationale**: Better logging is the foundation of observability in 
distributed systems and is crucial for quickly locating and resolving issues.
   
   ---
   ---


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to