DanielCarter-stack commented on PR #10580:
URL: https://github.com/apache/seatunnel/pull/10580#issuecomment-4021660601

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10580", "part": 1, "total": 1} -->
   ## Issue 1: Incomplete Resource Cleanup Leading to Potential Leaks
   
   **Location**: `CoordinatorService.java:559-567`
   
   ```java
   } catch (Exception e) {
       isActive = false;
       logger.severe("check new active master error, will retry later.", e);
       try {
           clearCoordinatorService();
       } catch (Exception ex) {
           logger.warning("clear coordinator service failed.", ex);
       }
   }
   ```
   
   **Related Context**:
   - `initCoordinatorService()`: line 404-447
     - Initialize `jobHistoryService` (line 415-429)
     - Initialize `eventProcessor` (line 430-434)
     - Initialize `connectorPackageService` (line 437-440)
     - Start `restoreAllJobFromMasterNodeSwitchFuture` (line 443-446)
   - `clearCoordinatorService()`: line 570-604
     - Only clean up `runningJobMasterMap`, `pendingJobQueue`, 
`executorService`, `resourceManager`, `eventProcessor`
     - **Does NOT clean up `jobHistoryService` and `connectorPackageService`**
   
   **Problem Description**:
   Depending on where `initCoordinatorService()` throws:
   - Scenario A: `jobHistoryService` is created successfully, but `eventProcessor` creation fails → `jobHistoryService` is never cleaned up
   - Scenario B: `restoreAllJobFromMasterNodeSwitchFuture` has already been submitted, but subsequent code fails → async tasks may still be running
   - Scenario C: `connectorPackageService` is created successfully, but subsequent steps fail → the service is never cleaned up
   
   This leads to:
   1. `jobHistoryService` holds stale IMap references (such as 
`runningJobStateIMap`, `finishedJobStateIMap`), potentially occupying memory
   2. `connectorPackageService` may hold unreleased resources
   3. `restoreAllJobFromMasterNodeSwitchFuture` reference itself is not 
cleared, but the executor has been shut down
   
   **Potential Risks**:
   - **Risk 1**: Memory leak. Each retry may create new `jobHistoryService`, 
but old ones are not cleaned up
   - **Risk 2**: Resource leak. `connectorPackageService` may hold file handles 
or network connections
   - **Risk 3**: State inconsistency. If retry succeeds, old 
`jobHistoryService` may still be using stale IMap references
   
   **Impact Scope**:
   - **Direct Impact**: Lifecycle management of `CoordinatorService`
   - **Indirect Impact**: Continuous failures lead to continuously growing 
memory usage
   - **Affected Area**: Core framework, affecting all users using cluster mode
   
   **Severity**: **MAJOR**
   
   **Improvement Suggestions**:
   ```java
   // Added in clearCoordinatorService()
   public synchronized void clearCoordinatorService() {
       // interrupt all JobMaster
       runningJobMasterMap.values().forEach(JobMaster::interrupt);
       if (isWaitStrategy) {
           pendingJobQueue
                   .getJobIdMap()
                   .values()
                   .forEach(
                           pendingJobInfo -> {
                               JobMaster jobMaster = 
pendingJobInfo.getJobMaster();
                               jobMaster.interrupt();
                           });
           pendingJobQueue.clear();
       }
       executorService.shutdownNow();
       runningJobMasterMap.clear();
   
       try {
           executorService.awaitTermination(20, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
           throw new SeaTunnelEngineException("wait clean executor service error", e);
       }
   
       // Added: cleanup jobHistoryService
       jobHistoryService = null;
   
       // Added: cleanup connectorPackageService
       if (connectorPackageService != null) {
           // If ConnectorPackageService exposes a close()/shutdown() method, call it
           // here and log (rather than rethrow) any failure, e.g.:
           // try { connectorPackageService.close(); } catch (Exception e) { logger.warning("Failed to close connector package service", e); }
           connectorPackageService = null;
       }
   
       // Added: cleanup restoreAllJobFromMasterNodeSwitchFuture
       restoreAllJobFromMasterNodeSwitchFuture = null;
   
       if (resourceManager != null) {
           resourceManager.close();
       }
   
       try {
           if (eventProcessor != null) {
               eventProcessor.close();
               eventProcessor = null;  // Added: clear references
           }
       } catch (Exception e) {
           throw new SeaTunnelEngineException("close event processor error", e);
       }
   }
   ```
   
   **Rationale**: 
   - Ensure all resources initialized in `initCoordinatorService()` are cleaned 
up
   - Prevent memory leaks and resource leaks
   - Ensure a clean initial state for the next retry
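   As a standalone illustration of the symmetry point (the class and field names here are stand-ins, not SeaTunnel's actual API): a `clear()` that mirrors `init()` must close and null out every field `init()` may have assigned before failing, otherwise each retry leaks the previous attempt's objects.

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   public class SymmetricCleanup {
       // Stand-in for jobHistoryService / connectorPackageService (hypothetical).
       static class Service {
           static final AtomicInteger OPEN = new AtomicInteger();
           Service() { OPEN.incrementAndGet(); }
           void close() { OPEN.decrementAndGet(); }
       }

       Service historyService;
       Service packageService;

       // init() that fails partway: historyService is created, packageService is not.
       void init() {
           historyService = new Service();
           throw new IllegalStateException("simulated failure after first resource");
       }

       // Symmetric clear(): close and null out everything init() may have created.
       void clear() {
           if (historyService != null) { historyService.close(); historyService = null; }
           if (packageService != null) { packageService.close(); packageService = null; }
       }

       public static void main(String[] args) {
           SymmetricCleanup c = new SymmetricCleanup();
           try {
               c.init();
           } catch (IllegalStateException expected) {
               c.clear(); // the retry path starts from a clean state
           }
           System.out.println("open resources after clear: " + Service.OPEN.get()); // 0
       }
   }
   ```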
   
   ---
   
   ## Issue 2: Interruption Risk of Async Recovery Tasks
   
   **Location**: `CoordinatorService.java:443-446, 559-567`
   
   ```java
   // In initCoordinatorService()
   restoreAllJobFromMasterNodeSwitchFuture =
           new PassiveCompletableFuture(
                   CompletableFuture.runAsync(
                           this::restoreAllRunningJobFromMasterNodeSwitch, 
executorService));
   
   // In checkNewActiveMaster() exception handling
   } catch (Exception e) {
       isActive = false;
       logger.severe("check new active master error, will retry later.", e);
       try {
           clearCoordinatorService();  // will call executorService.shutdownNow()
       } catch (Exception ex) {
           logger.warning("clear coordinator service failed.", ex);
       }
   }
   ```
   
   **Related Context**:
   - `waitForJobComplete()` (line 733-759) depends on 
`restoreAllJobFromMasterNodeSwitchFuture.join()`
   - `restoreAllRunningJobFromMasterNodeSwitch()` (line 449-504) recovers 
running jobs
   - `restoreJobFromMasterActiveSwitch()` (line 506-537) creates new JobMaster 
and adds to pending queue
   
   **Problem Description**:
   
   This is a **serious concurrency timing issue**:
   
   1. `initCoordinatorService()` submits async recovery tasks to 
`executorService`
   2. Async tasks start executing, possibly performing the following operations:
      - Reading job information from `runningJobInfoIMap`
      - Creating new `JobMaster` instances
      - Adding jobs to `pendingJobQueue`
   3. If subsequent code in `initCoordinatorService()` throws an exception 
during async task execution
   4. `clearCoordinatorService()` calls `executorService.shutdownNow()`
   5. **Recovery tasks in progress are forcibly interrupted**
   6. This may lead to:
      - Some jobs have already been added to `pendingJobQueue`
      - Some jobs have already been added to `runningJobMasterMap`
      - But the `restoreAllJobFromMasterNodeSwitchFuture` reference has been cleared (if fixed per Issue 1's suggestion)
      - The next call to `waitForJobComplete()` will make `join()` fail or block
   
   **Potential Risks**:
   - **Risk 1**: Job state inconsistency. Some jobs are marked as recovered, 
but recovery process is incomplete
   - **Risk 2**: `waitForJobComplete()` may permanently block or fail, as it 
depends on `restoreAllJobFromMasterNodeSwitchFuture.join()`
   - **Risk 3**: Incomplete job information may exist in `pendingJobQueue` and 
`runningJobMasterMap`
   
   **Impact Scope**:
   - **Direct Impact**: Job recovery process during Master node switching
   - **Indirect Impact**: Users querying job status through 
`waitForJobComplete()` may fail
   - **Affected Area**: Core framework, all scenarios with running jobs during 
Master switch
   
   **Severity**: **CRITICAL**
   
   **Improvement Suggestions**:
   
   ```java
   private void initCoordinatorService() {
       runningJobInfoIMap =
               nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
       runningJobStateIMap =
               nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_STATE);
       runningJobStateTimestampsIMap =
               nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_STATE_TIMESTAMPS);
       ownedSlotProfilesIMap =
               nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_OWNED_SLOT_PROFILES);
       metricsImap = nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS);

       jobHistoryService =
               new JobHistoryService(
                       nodeEngine,
                       runningJobStateIMap,
                       logger,
                       pendingJobQueue.getJobIdMap(),
                       runningJobMasterMap,
                       nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE),
                       nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                       nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
                       engineConfig.getHistoryJobExpireMinutes());
       eventProcessor =
               createJobEventProcessor(
                       engineConfig.getEventReportHttpApi(),
                       engineConfig.getEventReportHttpHeaders(),
                       nodeEngine);
   
       // If the user has configured the connector package service, create it on the master node.
       ConnectorJarStorageConfig connectorJarStorageConfig =
               engineConfig.getConnectorJarStorageConfig();
       if (connectorJarStorageConfig.getEnable()) {
           connectorPackageService = new ConnectorPackageService(seaTunnelServer);
       }
   
       // Modified: create the async restore task last, after all preceding
       // initializations have succeeded
       restoreAllJobFromMasterNodeSwitchFuture =
               new PassiveCompletableFuture(
                       CompletableFuture.runAsync(
                               this::restoreAllRunningJobFromMasterNodeSwitch, executorService));
   }
   
   // In the exception handling of checkNewActiveMaster()
   } catch (Exception e) {
       isActive = false;
       logger.severe("check new active master error, will retry later.", e);
       try {
           // Added: wait for restore task to complete or cancel before cleanup
           if (restoreAllJobFromMasterNodeSwitchFuture != null) {
               try {
                   // Brief wait to avoid blocking too long
                   restoreAllJobFromMasterNodeSwitchFuture.get(1, 
TimeUnit.SECONDS);
               } catch (TimeoutException ex) {
                   logger.warning("Restore task timeout during cleanup, will be interrupted", ex);
               } catch (ExecutionException | InterruptedException ex) {
                   logger.warning("Restore task failed during cleanup", ex);
               }
           }
           
           clearCoordinatorService();
       } catch (Exception ex) {
           logger.warning("clear coordinator service failed.", ex);
       }
   }
   ```
   
   **Rationale**: 
   - Ensure `restoreAllJobFromMasterNodeSwitchFuture` is created only after all 
other initializations succeed
   - Give recovery tasks a brief opportunity to complete (1 second) before 
cleanup, avoiding frequent interruptions
   - If the timeout does expire, proceed with cleanup and accept that the in-flight recovery is interrupted
   - Alternatively, split `initCoordinatorService()` into phases that can each be rolled back independently
   
   **Alternative**: Move the creation of `restoreAllJobFromMasterNodeSwitchFuture` out of `initCoordinatorService()` and create it only after `isActive` has been set to `true`.
   
   ---
   
   ## Issue 3: Lack of Retry Backoff Strategy and Circuit Breaker Mechanism
   
   **Location**: `CoordinatorService.java:218-219, 559-567`
   
   ```java
   // In constructor
   masterActiveListener.scheduleAtFixedRate(
           this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
   
   // In exception handling
   } catch (Exception e) {
       isActive = false;
       logger.severe("check new active master error, will retry later.", e);
       try {
           clearCoordinatorService();
       } catch (Exception ex) {
           logger.warning("clear coordinator service failed.", ex);
       }
   }
   ```
   
   **Related Context**:
   - Operations that throw Hazelcast's `RetryableHazelcastException` typically succeed only after several retries
   - If it's a configuration error or permanent failure, continuous retry is 
meaningless
   - Frequent retries consume CPU and generate large amounts of logs
   
   **Problem Description**:
   
   Current implementation is **unlimited fixed-interval retry**:
   - Retries every 100ms with no maximum retry limit
   - No backoff strategy
   - No distinction between transient errors and permanent errors
   
   This leads to the following problems:
   1. **Log flooding**: If error persists, 10 SEVERE logs will be generated per 
second
   2. **Resource waste**: Frequent `clearCoordinatorService()` and 
`initCoordinatorService()` calls consume CPU
   3. **Lack of observability**: Cannot know from outside how many retries have 
occurred
   4. **Permanent errors cannot be alerted**: Configuration errors lead to 
permanent retries
   
   **Potential Risks**:
   - **Risk 1**: Disk filled with logs (if error persists)
   - **Risk 2**: Excessive CPU usage (frequent initialization and cleanup)
   - **Risk 3**: Operations personnel cannot promptly detect real failures 
(logs drowned out)
   
   **Impact Scope**:
   - **Direct Impact**: Health monitoring of `CoordinatorService`
   - **Indirect Impact**: Overall cluster performance and stability
   - **Affected Area**: Core framework
   
   **Severity**: **MAJOR**
   
   **Improvement Suggestions**:
   
   ```java
   // Add fields in CoordinatorService
   private final AtomicLong initFailureCount = new AtomicLong(0);
   private static final long MAX_INIT_FAILURES_BEFORE_BACKOFF = 10;
   private static final long BACKOFF_INTERVAL_MS = 5000; // 5 seconds
   
   private void checkNewActiveMaster() {
       try {
           if (!isActive && this.seaTunnelServer.isMasterNode()) {
               logger.info(
                       "This node become a new active master node, begin init coordinator service");
               if (this.executorService.isShutdown()) {
                   this.executorService =
                           Executors.newCachedThreadPool(
                                   new ThreadFactoryBuilder()
                                           .setNameFormat("seatunnel-coordinator-service-%d")
                                           .build());
               }
               initCoordinatorService();
               isActive = true;
               initFailureCount.set(0); // Reset failure count
           } else if (isActive && !this.seaTunnelServer.isMasterNode()) {
               isActive = false;
               logger.info(
                       "This node become leave active master node, begin clear coordinator service");
               clearCoordinatorService();
           }
       } catch (Exception e) {
           isActive = false;
           long failureCount = initFailureCount.incrementAndGet();
           
           // Determine if backoff is needed
           if (failureCount >= MAX_INIT_FAILURES_BEFORE_BACKOFF) {
               logger.severe(String.format(
                   "CoordinatorService initialization failed after %d attempts (error: %s). " +
                   "Entering backoff mode, will retry in %dms. Node: %s, Master: %s",
                   failureCount,
                   e.getClass().getSimpleName(),
                   BACKOFF_INTERVAL_MS,
                   nodeEngine.getThisAddress(),
                   nodeEngine.getMasterAddress()
               ), e);
               
               // Can send alert here
               // alertService.sendAlert("CoordinatorService init failed");
               
               // Back off before the next scheduled attempt (note: this sleep blocks the scheduler thread)
               try {
                   Thread.sleep(BACKOFF_INTERVAL_MS);
               } catch (InterruptedException ie) {
                   Thread.currentThread().interrupt();
                   logger.warning("Backoff sleep interrupted", ie);
               }
           } else {
               logger.severe(String.format(
                   "CoordinatorService initialization failed (attempt %d/%d): %s. Will retry in %dms",
                   failureCount,
                   MAX_INIT_FAILURES_BEFORE_BACKOFF,
                   e.getMessage(),
                   100
               ), e);
           }
           
           try {
               clearCoordinatorService();
           } catch (Exception ex) {
               logger.warning("clear coordinator service failed.", ex);
           }
       }
   }
   
   // Add Metrics methods for monitoring
   public long getInitFailureCount() {
       return initFailureCount.get();
   }
   ```
   
   **Rationale**: 
   - Avoid log flooding and resource waste
   - Provide better observability
   - Distinguish between transient errors and permanent errors
   - Follow distributed-systems best practice for retry loops (the sketch uses a fixed backoff; exponential backoff is a natural extension)
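   As an extension of the fixed 5-second backoff sketched above, a capped exponential schedule could be computed like this (standalone sketch; the constants are illustrative):

   ```java
   public class Backoff {
       static final long BASE_MS = 100;     // matches the scheduler's 100ms tick
       static final long MAX_MS = 30_000;   // cap so delays stay bounded

       // Delay before the Nth consecutive retry (attempt >= 1):
       // 100ms, 200ms, 400ms, ... capped at 30s.
       static long delayMs(long attempt) {
           long shift = Math.min(attempt - 1, 20); // clamp the shift to avoid overflow
           return Math.min(BASE_MS << shift, MAX_MS);
       }

       public static void main(String[] args) {
           for (long a = 1; a <= 10; a++) {
               System.out.println("attempt " + a + " -> " + delayMs(a) + "ms");
           }
       }
   }
   ```

   The cap keeps a recovered cluster from waiting tens of minutes for the next attempt, while early attempts stay cheap.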
   
   ---
   
   ## Issue 4: Insufficient Log Information, Lacking Context
   
   **Location**: `CoordinatorService.java:561`
   
   ```java
   logger.severe("check new active master error, will retry later.", e);
   ```
   
   **Related Context**:
   - Operations personnel need to quickly identify whether it's a network 
issue, configuration issue, or Hazelcast issue
   - Need to know current node status and cluster status
   - Need to know which retry attempt this is
   
   **Problem Description**:
   
   Current log information lacks key context:
   1. No node address information
   2. No Master address information
   3. No retry count
   4. No concise description of exception type
   
   **Potential Risks**:
   - **Risk 1**: Difficult problem localization, need to check full stack trace 
to understand exception type
   - **Risk 2**: Cannot quickly determine if it's a widespread issue or 
specific to a node
   - **Risk 3**: Cannot assess retry trends
   
   **Impact Scope**:
   - **Direct Impact**: Observability and problem diagnosis efficiency
   - **Affected Area**: Operations and development personnel
   
   **Severity**: **MINOR**
   
   **Improvement Suggestions**:
   
   ```java
   } catch (Exception e) {
       isActive = false;
       long failureCount = initFailureCount.incrementAndGet();
       
       Throwable rootCause = ExceptionUtils.getRootCause(e);
       if (rootCause == null) {
           rootCause = e; // some getRootCause implementations return null when there is no cause chain
       }
       String errorType = rootCause.getClass().getSimpleName();
       String errorMsg = rootCause.getMessage();
       
       logger.severe(String.format(
           "CoordinatorService initialization failed (attempt #%d). " +
           "Node: %s, Cluster Master: %s, Error Type: %s, Error: %s. " +
           "Will retry in %dms.",
           failureCount,
           nodeEngine.getThisAddress(),
           nodeEngine.getMasterAddress(),
           errorType,
           errorMsg != null && errorMsg.length() > 100 ? errorMsg.substring(0, 
100) + "..." : errorMsg,
           100
       ), e);
       
       try {
           clearCoordinatorService();
       } catch (Exception ex) {
           logger.warning(String.format(
               "Failed to clear CoordinatorService after initialization failure (node: %s)",
               nodeEngine.getThisAddress()
           ), ex);
       }
   }
   ```
   
   **Rationale**: 
   - Provide richer context information
   - Help quickly locate problems
   - Follow production environment logging best practices
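   The message-truncation logic in the snippet above can be factored into a small reusable helper; this standalone sketch (hypothetical helper name) also guards the null-message case:

   ```java
   public class LogMessages {
       // Truncate a possibly-null exception message for single-line log output.
       static String truncate(String msg, int max) {
           if (msg == null) {
               return "<no message>";
           }
           return msg.length() > max ? msg.substring(0, max) + "..." : msg;
       }

       public static void main(String[] args) {
           System.out.println(truncate(null, 100));
           System.out.println(truncate("short", 100));
           System.out.println(truncate("a very long diagnostic message", 10));
       }
   }
   ```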
   
   ---
   
   ## Issue 5: Missing Cleanup of jobHistoryService in clearCoordinatorService()
   
   **Location**: `CoordinatorService.java:570-604`
   
   ```java
   public synchronized void clearCoordinatorService() {
       // interrupt all JobMaster
       runningJobMasterMap.values().forEach(JobMaster::interrupt);
       // ...
       executorService.shutdownNow();
       runningJobMasterMap.clear();
   
       try {
           executorService.awaitTermination(20, TimeUnit.SECONDS);
       } catch (InterruptedException e) {
           throw new SeaTunnelEngineException("wait clean executor service error", e);
       }
   
       if (resourceManager != null) {
           resourceManager.close();
       }
   
       try {
           if (eventProcessor != null) {
               eventProcessor.close();
           }
       } catch (Exception e) {
           throw new SeaTunnelEngineException("close event processor error", e);
       }
       // Missing cleanup for jobHistoryService, connectorPackageService,
       // and restoreAllJobFromMasterNodeSwitchFuture
   }
   ```
   
   **Related Context**:
   - `initCoordinatorService()` creates `jobHistoryService` at line 415-429
   - `initCoordinatorService()` creates `connectorPackageService` at line 
437-440
   - `initCoordinatorService()` creates 
`restoreAllJobFromMasterNodeSwitchFuture` at line 443-446
   
   **Problem Description**:
   
   `clearCoordinatorService()` does not clean up all resources initialized in 
`initCoordinatorService()`. This is related to Issue 1, but emphasizes the 
**symmetry** issue:
   - If `init` creates resources, `clear` should clean them up
   - Current code is asymmetric, leading to partial resource leaks
   
   **Potential Risks**:
   - **Risk 1**: Memory leak
   - **Risk 2**: `jobHistoryService` may hold references to closed IMap, 
causing errors on next use
   
   **Impact Scope**:
   - **Direct Impact**: Resource management
   - **Affected Area**: Core framework
   
   **Severity**: **MAJOR** (together with Issue 1, this forms one complete problem)
   
   **Improvement Suggestions**: See Issue 1's improvement suggestions
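   A general way to make init/clear symmetric by construction (a pattern sketch, not SeaTunnel's current code) is to register each resource as it is created and release them in reverse order of creation:

   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;

   public class ReverseCleanup {
       final Deque<AutoCloseable> resources = new ArrayDeque<>();

       // Register a resource as soon as it is created, so a failure anywhere
       // in init leaves nothing untracked.
       <T extends AutoCloseable> T track(T resource) {
           resources.push(resource);
           return resource;
       }

       // Release in reverse creation order; one failing close() does not
       // prevent the remaining resources from being released.
       void closeAll() {
           while (!resources.isEmpty()) {
               try {
                   resources.pop().close();
               } catch (Exception ignored) {
               }
           }
       }

       public static void main(String[] args) {
           ReverseCleanup rc = new ReverseCleanup();
           StringBuilder order = new StringBuilder();
           rc.track(() -> order.append("A")); // created first, closed last
           rc.track(() -> order.append("B")); // created second, closed first
           rc.closeAll();
           System.out.println(order); // prints "BA"
       }
   }
   ```

   With this shape, adding a new resource in `init` cannot be "forgotten" in `clear`, because registration and cleanup go through the same structure.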
   
   ---

