DanielCarter-stack commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3832461773

   <!-- code-pr-reviewer -->
   <!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10430", "part": 1, 
"total": 1} -->
   ### Issue 1: PeekBlockingQueue.moveToTail can only move the head element
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
   
   ```java
   public boolean moveToTail(Long jobId) {
       lock.lock();
       try {
           E element = jobIdMap.get(jobId);
           if (element == null) {
               return false;
           }
           E head = queue.peek();
           if (head == null || !head.equals(element)) {  // ⚠️ Issue
               return false;
           }
           if (!queue.remove(element)) {
               return false;
           }
           queue.put(element);
           notEmpty.signalAll();
           return true;
       } catch (InterruptedException e) {
           log.error("Move element to tail failed. {}", 
ExceptionUtils.getMessage(e));
           Thread.currentThread().interrupt();
           return false;
       } finally {
           lock.unlock();
       }
   }
   ```
   
   **Related Context**:
   - Caller: `PendingJobScheduleContext.moveHeadToTail` (line 31-33)
   - Caller: `WaitReschedulePolicy.onResourcesNotEnough` (line 71)
   
   **Problem Description**:
   
   The method name is `moveToTail` (move any element to tail), but the actual 
implementation is:
   ```java
   E head = queue.peek();
   if (head == null || !head.equals(element)) {
       return false;  // If it's not the head of the queue, return false directly
   }
   ```
   
   This means the method can only move the head element to the tail. Although 
current usage scenarios indeed only require moving the head element, the 
mismatch between the method name and implementation can easily cause 
misunderstanding.
   
   **Potential Risks**:
   - Risk 1: If other developers call this method to move non-head elements in 
the future, it will fail silently (return false), which may lead to logic errors
   - Risk 2: Reduced code readability, maintainers may mistakenly believe any 
element can be moved
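   
   The silent failure in Risk 1 can be reproduced with a small stand-in. This is only a sketch: `HeadOnlyQueueDemo` is a hypothetical, single-threaded simplification (no lock, no blocking) that copies the head-only check from the snippet above, not the real `PeekBlockingQueue`.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for illustration; not the real PeekBlockingQueue.
   final class HeadOnlyQueueDemo {
       private final Deque<String> queue = new ArrayDeque<>();
       private final Map<Long, String> jobIdMap = new HashMap<>();

       void put(long jobId, String element) {
           jobIdMap.put(jobId, element);
           queue.addLast(element);
       }

       // Same head-only rule as the quoted moveToTail.
       boolean moveToTail(long jobId) {
           String element = jobIdMap.get(jobId);
           if (element == null) {
               return false;
           }
           String head = queue.peekFirst();
           if (head == null || !head.equals(element)) {
               return false; // non-head elements are rejected without any signal
           }
           queue.removeFirst();
           queue.addLast(element);
           return true;
       }

       // Queue contents from head to tail, comma separated.
       String order() {
           return String.join(",", queue);
       }

       public static void main(String[] args) {
           HeadOnlyQueueDemo q = new HeadOnlyQueueDemo();
           q.put(1L, "job-1");
           q.put(2L, "job-2");
           q.put(3L, "job-3");
           System.out.println(q.moveToTail(2L) + " " + q.order()); // false job-1,job-2,job-3
           System.out.println(q.moveToTail(1L) + " " + q.order()); // true job-2,job-3,job-1
       }
   }
   ```
   
   A caller that ignores the return value cannot tell the first call from the second, which is why the name matters.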
   
   **Impact Scope**:
   - Direct impact: `PeekBlockingQueue.moveToTail`
   - Indirect impact: All callers (currently only 
`PendingJobScheduleContext.moveHeadToTail`)
   - Impact area: Core scheduling module
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   Solution 1: Rename the method to clarify its purpose
   ```java
   /**
    * Move the head element to the tail of the queue if it matches the given 
jobId.
    * @param jobId the job id of the head element
    * @return true if the head element was moved to tail; false otherwise
    */
   public boolean moveHeadToTailIfMatch(Long jobId) {
       // ... keep existing implementation
   }
   ```
   
   Solution 2: Implement true "move any element to tail" functionality
   ```java
   /**
    * Move the element with the given jobId to the tail of the queue.
    * @param jobId the job id to move
    * @return true if the element was moved; false if it is not in the queue
    */
   public boolean moveToTail(Long jobId) {
       lock.lock();
       try {
           E element = jobIdMap.get(jobId);
           if (element == null) {
               return false;
           }
           
           // Guard against an empty queue (this does not detect "already at tail")
           if (queue.isEmpty()) {
               return false;
           }
           
           // Remove the element (wherever it is)
           if (!queue.remove(element)) {
               return false;
           }
           
           // Add to tail
           queue.put(element);
           notEmpty.signalAll();
           return true;
       } catch (InterruptedException e) {
           log.error("Move element to tail failed. {}", 
ExceptionUtils.getMessage(e));
           Thread.currentThread().interrupt();
           return false;
       } finally {
           lock.unlock();
       }
   }
   ```
   
   **Rationale**:
   - Solution 1 better fits current usage scenarios with minimal risk
   - Solution 2 is more general-purpose but may increase complexity
   - Recommendation: Prioritize Solution 1; if moving non-head elements is 
needed in the future, adopt Solution 2
   
   ---
   
   ### Issue 2: WaitReschedulePolicy's rescheduling condition logic has potential issues
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
   
   ```java
   private static class WaitReschedulePolicy implements 
PendingJobSchedulePolicy {
       @Override
       public void onResourcesNotEnough(PendingJobScheduleContext context)
               throws InterruptedException {
           WaitRescheduleConfig config =
                   context.getEngineConfig()
                           .getScheduleStrategyConfig(
                                   ScheduleStrategy.WAIT_RESCHEDULE, 
WaitRescheduleConfig.class);
           if (config == null) {
               config = new WaitRescheduleConfig();
           }
           int maxRetryTimes = config.getMaxRetryTimes();
           int checkTimes = context.getPendingJobInfo().getCheckTimes();
           if (maxRetryTimes > 0
                   && context.getPendingJobQueue().size() > 1
                   && checkTimes > 0
                   && checkTimes % maxRetryTimes == 0) {  // ⚠️ Potential Issue
               context.moveHeadToTail();
           }
           context.sleep(config.getSleepIntervalMillis());
       }
   }
   ```
   
   **Related Context**:
   - `PendingJobInfo.checkTimes` initial value is 0
   - `PendingJobInfo.recordSnapshot` will call `checkTimes.incrementAndGet()`
   - After first call, `checkTimes` becomes 1
   
   **Problem Description**:
   
   At first glance, the rescheduling condition `checkTimes % maxRetryTimes == 0` looks like it could fire too early.
   
   Assuming `maxRetryTimes = 3` and looking at the modulo alone:
   - `checkTimes = 0`: `0 % 3 == 0` → **true**
   - `checkTimes = 1`: `1 % 3 == 0` → false
   - `checkTimes = 2`: `2 % 3 == 0` → false
   - `checkTimes = 3`: `3 % 3 == 0` → **true**
   
   The concern is the `checkTimes = 0` case: if a job has just entered the queue, moving it to the tail on its first failed resource check would be unreasonable, because the job would "jump" to the back before it has waited at all.
   
   The `checkTimes > 0` guard blocks that case. However, `checkTimes` is incremented in `recordSnapshot`, not at the start of the scheduling loop, so whether the guard is sufficient depends on where `recordSnapshot` is called.
   
   **Verify calling location of `recordSnapshot`**:
   
   By searching the code, `recordSnapshot` is called in 
`CoordinatorService.pendingJobSchedule`:
   ```java
   // When resources are insufficient
   PendingJobDiagnostic diagnostic = 
PendingDiagnosticsCollector.collectJobDiagnostic(...);
   pendingJobInfo.recordSnapshot(diagnostic);
   ```
   
   This means `checkTimes` is indeed incremented after each insufficient 
resource check.
   
   Walking through a job that has just entered the pending queue with `checkTimes = 0` while resources are insufficient:
   1. `recordSnapshot` is called, `checkTimes` becomes 1
   2. Next scheduling loop, `checkTimes = 1`
   3. Condition `1 % 3 == 0` is false, no rescheduling triggered
   
   So in practice, in the `maxRetryTimes = 3` case, rescheduling will be triggered on the 3rd, 6th, 9th... checks, which is reasonable.
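   
   The trigger schedule can be checked by evaluating the condition in isolation. A minimal sketch: `RescheduleConditionDemo`, `shouldReschedule`, and `triggers` are hypothetical names, and only the boolean expression is taken from the quoted `WaitReschedulePolicy`.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;

   final class RescheduleConditionDemo {
       // Same boolean expression as the quoted condition.
       static boolean shouldReschedule(int maxRetryTimes, int queueSize, int checkTimes) {
           return maxRetryTimes > 0
                   && queueSize > 1
                   && checkTimes > 0
                   && checkTimes % maxRetryTimes == 0;
       }

       // Collects every checkTimes value in [0, upTo] for which the condition holds.
       static List<Integer> triggers(int maxRetryTimes, int queueSize, int upTo) {
           List<Integer> hits = new ArrayList<>();
           for (int checkTimes = 0; checkTimes <= upTo; checkTimes++) {
               if (shouldReschedule(maxRetryTimes, queueSize, checkTimes)) {
                   hits.add(checkTimes);
               }
           }
           return hits;
       }

       public static void main(String[] args) {
           System.out.println(triggers(3, 2, 9)); // [3, 6, 9]
           System.out.println(triggers(1, 2, 4)); // [1, 2, 3, 4]
           System.out.println(triggers(0, 2, 9)); // [] (short-circuit avoids division by zero)
           System.out.println(triggers(3, 1, 9)); // [] (a single queued job is never moved)
       }
   }
   ```
   
   With `maxRetryTimes = 1` the condition holds on every check, which is the continuous rotation case.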
   
   **Potential Risks**:
   - Risk 1: If `maxRetryTimes = 1`, rescheduling will be triggered on the 1st, 
2nd, 3rd... every time, causing continuous queue rotation
   - Risk 2: Doesn't consider the scenario where a job just entered the queue 
(although the `checkTimes` mechanism has already handled this)
   - Risk 3: The increment of `checkTimes` and the judgment of rescheduling 
condition are separated, which may have timing issues
   
   **Impact Scope**:
   - Direct impact: `WaitReschedulePolicy.onResourcesNotEnough`
   - Indirect impact: All jobs using WAIT_RESCHEDULE strategy
   - Impact area: Core scheduling logic
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   private static class WaitReschedulePolicy implements 
PendingJobSchedulePolicy {
       @Override
       public void onResourcesNotEnough(PendingJobScheduleContext context)
               throws InterruptedException {
           WaitRescheduleConfig config =
                   context.getEngineConfig()
                           .getScheduleStrategyConfig(
                                   ScheduleStrategy.WAIT_RESCHEDULE, 
WaitRescheduleConfig.class);
           if (config == null) {
               config = new WaitRescheduleConfig();
           }
           int maxRetryTimes = config.getMaxRetryTimes();
           
           // Validate config
           if (maxRetryTimes < 1) {
               maxRetryTimes = 3;  // fallback to default
           }
           
           int checkTimes = context.getPendingJobInfo().getCheckTimes();
           
           // Only reschedule if:
           // 1. maxRetryTimes > 0 (enabled)
           // 2. queue has more than 1 job (meaningful to reschedule)
           // 3. checkTimes >= maxRetryTimes (give the job enough chances)
           // 4. checkTimes is a multiple of maxRetryTimes (periodic reschedule)
           if (maxRetryTimes > 0
                   && context.getPendingJobQueue().size() > 1
                   && checkTimes >= maxRetryTimes
                   && checkTimes % maxRetryTimes == 0) {
               boolean moved = context.moveHeadToTail();
               if (moved) {
                   // Optional: log the reschedule action
               }
           }
           context.sleep(config.getSleepIntervalMillis());
       }
   }
   ```
   
   **Rationale**:
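   - `checkTimes >= maxRetryTimes` states explicitly that the job must have waited for a full round before it is moved. Combined with the modulo check it accepts exactly the same `checkTimes` values as `checkTimes > 0`, so it documents intent rather than changing behavior
   - When `maxRetryTimes = 1`, every check from the 1st onward (`checkTimes = 1`) will trigger, which is reasonable (rotate every time)
   - When `maxRetryTimes = 3`, the 3rd, 6th, 9th checks will trigger, meeting expectations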
   
   **Or a simpler solution**: Modify configuration validation to prohibit 
`maxRetryTimes = 1`
   
   ```java
   // ScheduleStrategy.java
   WAIT_RESCHEDULE(
       new String[] {PENDING_JOB_RESCHEDULE.key()},
       WaitRescheduleConfig::new,
       config -> {
           WaitRescheduleConfig waitRescheduleConfig = (WaitRescheduleConfig) 
config;
           if (waitRescheduleConfig.getMaxRetryTimes() < 2) {  // Change to < 2
               throw new InvalidConfigurationException(
                   "pending-job-reschedule.max-retry-times must be >= 2");
           }
           // ...
       }),
   ```
   
   ---
   
   ### Issue 3: PendingJobScheduleContext lacks JavaDoc and contract documentation
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobScheduleContext.java:1-47`
   
   ```java
   @Value
   public class PendingJobScheduleContext {
       PendingJobInfo pendingJobInfo;
       PeekBlockingQueue<PendingJobInfo> pendingJobQueue;
       EngineConfig engineConfig;
       long jobId;
       Runnable failJobAction;
   
       public boolean moveHeadToTail() {
           return pendingJobQueue.moveToTail(jobId);
       }
   
       public void failJob() {
           failJobAction.run();
       }
   
       public void sleep(long sleepMillis) throws InterruptedException {
           if (sleepMillis > 0) {
               Thread.sleep(sleepMillis);
           }
       }
   }
   ```
   
   **Related Context**:
   - Interface: `PendingJobSchedulePolicy` (line 18-21)
   - Implementation classes: `WaitPolicy`, `WaitReschedulePolicy`, 
`RejectPolicy`
   
   **Problem Description**:
   
   As a context object for scheduling strategies, this class lacks necessary 
documentation:
   1. No class-level JavaDoc explaining its purpose
   2. No explanation of the meaning and lifecycle of each field
   3. No explanation of the execution timing and precautions for `failJobAction`
   4. No explanation of method contracts (especially the meaning of the return 
value of `moveHeadToTail`)
   
   **Potential Risks**:
   - Risk 1: Other developers may not be aware that `failJobAction` is executed 
from an external thread, which may lead to thread safety issues
   - Risk 2: Unclear about the behavior when `moveHeadToTail` returns false 
(should processing continue?)
   - Risk 3: Unclear whether the relationship between `jobId` and 
`pendingJobInfo.getJobId()` is consistent
   
   **Impact Scope**:
   - Direct impact: All implementers of `PendingJobSchedulePolicy`
   - Indirect impact: Future maintainers
   - Impact area: Scheduling strategy extension point
   
   **Severity**: MINOR
   
   **Improvement Suggestions**:
   
   ```java
   /**
    * Context object for pending job schedule policies.
    * 
    * <p>This context provides all necessary information and actions for a 
schedule policy
    * to decide what to do when resources are not enough for a pending job.
    * 
    * <p><b>Thread Safety:</b> This object is typically created 
per-schedule-cycle and
    * should not be shared across threads. The {@code failJobAction} is 
executed immediately
    * when {@link #failJob()} is called, which may be from a different thread.
    * 
    * <p><b>Lifecycle:</b> A new context is created for each iteration of the 
pending job
    * schedule loop and discarded after use.
    */
   @Value
   public class PendingJobScheduleContext {
       /**
        * The pending job info being scheduled. Contains job state and metadata.
        */
       PendingJobInfo pendingJobInfo;
       
       /**
        * The pending job queue containing all waiting jobs.
        */
       PeekBlockingQueue<PendingJobInfo> pendingJobQueue;
       
       /**
        * The engine configuration containing schedule strategy settings.
        */
       EngineConfig engineConfig;
       
       /**
        * The job ID being scheduled. Should match {@code 
pendingJobInfo.getJobId()}.
        */
       long jobId;
       
       /**
        * Action to fail the current job. This action will:
        * <ul>
        *   <li>Mark the job as FAILED</li>
        *   <li>Remove it from the pending queue</li>
        *   <li>Notify the client</li>
        * </ul>
        * 
        * <p><b>Important:</b> This action should only be called once. Multiple 
calls
        * may result in unexpected behavior.
        */
       Runnable failJobAction;
   
       /**
        * Attempt to move the current job (which should be at the head of the 
queue) to the tail.
        * 
        * <p>This is a "best effort" operation. It will only succeed if:
        * <ul>
        *   <li>The job exists in the queue</li>
        *   <li>The job is currently at the head of the queue</li>
        *   <li>No concurrent operation has removed the job</li>
        * </ul>
        * 
        * @return {@code true} if the job was successfully moved to the tail;
        *         {@code false} otherwise (e.g., job not at head, already 
removed, etc.)
        */
       public boolean moveHeadToTail() {
           return pendingJobQueue.moveToTail(jobId);
       }
   
       /**
        * Fail the current job immediately.
        * 
        * <p>This will execute the {@code failJobAction} provided at 
construction time.
        * After calling this method, the job should not be accessed anymore.
        */
       public void failJob() {
           failJobAction.run();
       }
   
       /**
        * Sleep for the specified duration.
        * 
        * @param sleepMillis sleep duration in milliseconds; if <= 0, returns 
immediately
        * @throws InterruptedException if the thread is interrupted while 
sleeping
        */
       public void sleep(long sleepMillis) throws InterruptedException {
           if (sleepMillis > 0) {
               Thread.sleep(sleepMillis);
           }
       }
   }
   ```
   
   **Rationale**:
   - Complete JavaDoc can help other developers correctly understand and use 
this class
   - Explaining thread safety and lifecycle can prevent misuse
   - Clarifying method contracts and return value meanings can improve code 
maintainability
   
   ---
   
   ### Issue 4: YamlSeaTunnelDomConfigProcessor's configuration parsing logic is complex and hard to maintain
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java:289-378`
   
   ```java
   private void parseJobScheduleStrategyNode(
           Node jobScheduleStrategyNode,
           EngineConfig engineConfig,
           EnumSet<ScheduleStrategy> configuredScheduleStrategyConfigs) {
       // ... approximately 50 lines of complex logic
   }
   
   private ScheduleStrategyConfig parseScheduleStrategyConfig(
           Node scheduleStrategyConfigNode,
           ScheduleStrategy scheduleStrategy,
           String sectionName) {
       // ...
   }
   
   private void setScheduleStrategyConfigValue(
           ScheduleStrategyConfig config, String key, String value) {
       // Use reflection to invoke setter
       String propertyName = toCamelCase(key);
       String setterName = "set" + capitalize(propertyName);
       Method setter = Arrays.stream(config.getClass().getMethods())
               .filter(m -> m.getName().equals(setterName) && 
m.getParameterCount() == 1)
               .findFirst()
               .orElseThrow(() -> new 
InvalidConfigurationException("Unrecognized element: " + key));
       // ...
   }
   ```
   
   **Related Context**:
   - Caller: `parseEngineConfig` (line 146-287)
   - Configuration classes: `WaitConfig`, `WaitRescheduleConfig`
   - Enum: `ScheduleStrategy`
   
   **Problem Description**:
   
   1. **Safety of reflection usage**:
      - Setters are looked up by name through reflection; although the value is type-checked, risks remain
      - If the configuration class gains a public setter that is not meant to be user-configurable (such as `setInternalState`), a matching key would invoke it
      - `getMethods()` returns only public methods but includes inherited ones, so the lookup is not limited to setters declared on the config class itself
   
   2. **Error handling is not detailed enough**:
      ```java
      } catch (Exception e) {
          throw new InvalidConfigurationException(
              "Set config failed for key: " + key + ", value: " + value, e);
      }
      ```
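      All exceptions are caught and wrapped, but the original exception message may not be clear enough.
   
   3. **Configuration parsing paths are complex**:
      - Two configuration formats are supported (simple form and nested form)
      - The two parsing paths may conflict in edge cases
      - Both `parseJobScheduleStrategyNode` and the main parsing logic contain code that handles the configuration section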
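   
   The reflection concern in point 1 can be seen in a small experiment. This is a sketch under assumptions: `BaseConfig` and `RetryConfig` are hypothetical classes, not SeaTunnel types. It shows that `Class.getMethods()` also returns public setters inherited from a parent class, and that filtering on the declaring class excludes them.
   
   ```java
   import java.lang.reflect.Method;
   import java.util.Arrays;
   import java.util.List;
   import java.util.stream.Collectors;

   final class SetterLookupDemo {
       // Hypothetical parent with a setter that users should not be able to reach.
       public static class BaseConfig {
           public void setInternalState(String state) {}
       }

       // Hypothetical config class with one user-facing setter.
       public static class RetryConfig extends BaseConfig {
           public void setMaxRetryTimes(int times) {}
       }

       // Names of public single-argument setters found through getMethods(),
       // optionally restricted to those declared directly on the given class.
       static List<String> setters(Class<?> type, boolean declaredOnly) {
           return Arrays.stream(type.getMethods())
                   .filter(m -> m.getName().startsWith("set") && m.getParameterCount() == 1)
                   .filter(m -> !declaredOnly || m.getDeclaringClass().equals(type))
                   .map(Method::getName)
                   .sorted()
                   .collect(Collectors.toList());
       }

       public static void main(String[] args) {
           System.out.println(setters(RetryConfig.class, false)); // [setInternalState, setMaxRetryTimes]
           System.out.println(setters(RetryConfig.class, true));  // [setMaxRetryTimes]
       }
   }
   ```
   
   Note that a declaring-class filter also hides setters that a config class intentionally inherits, so an explicit allow-list of keys may be a safer choice if the config classes form a hierarchy.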
   
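   **Potential Risks**:
   - Risk 1: Reflection may invoke methods that should not be called (such as methods inherited from a parent class)
   - Risk 2: When the configuration is wrong, the error message may be unclear
   - Risk 3: Adding a new configuration item requires changes in several places at once
   
   **Impact Scope**:
   - Direct impact: All users who configure ScheduleStrategy
   - Indirect impact: Future configuration extensions
   - Impact area: Configuration loading module
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   Solution 1: Improve the reflection logic and add safety checks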
   ```java
   private void setScheduleStrategyConfigValue(
           ScheduleStrategyConfig config, String key, String value) {
       String propertyName = toCamelCase(key);
       String setterName = "set" + capitalize(propertyName);
       
       // Find the setter method
       Method setter = Arrays.stream(config.getClass().getMethods())
               .filter(m -> m.getName().equals(setterName) 
                       && m.getParameterCount() == 1
                       // Only find methods declared on the current class
                       && m.getDeclaringClass().equals(config.getClass()))
               .findFirst()
               .orElseThrow(() -> 
                   new InvalidConfigurationException(
                       "Unrecognized config key: " + key + 
                       ". Valid keys are: " + getValidConfigKeys(config)));
       
       Class<?> paramType = setter.getParameterTypes()[0];
       Object convertedValue = convertValue(key, value, paramType);
       
       try {
           setter.invoke(config, convertedValue);
       } catch (IllegalAccessException e) {
           throw new InvalidConfigurationException(
               "Cannot access setter for key: " + key, e);
       } catch (IllegalArgumentException e) {
           throw new InvalidConfigurationException(
               String.format("Invalid value '%s' for key '%s'. Expected type: %s",
                   value, key, paramType.getSimpleName()), e);
       } catch (InvocationTargetException e) {
           throw new InvalidConfigurationException(
               "Failed to set config for key: " + key, e.getCause());
       }
   }
   
   private String getValidConfigKeys(ScheduleStrategyConfig config) {
       return Arrays.stream(config.getClass().getMethods())
               .filter(m -> m.getName().startsWith("set") 
                       && m.getParameterCount() == 1
                       && m.getDeclaringClass().equals(config.getClass()))
               .map(m -> decapitalize(m.getName().substring(3)))
               .collect(Collectors.joining(", "));
   }
   ```
   
   Solution 2: Consider a more mature configuration binding approach (such as Jackson or MapStruct)
   ```java
   // Use Jackson's ObjectMapper
   ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
   // Convert Node to configuration object
   WaitRescheduleConfig config = mapper.treeToValue(configNode, 
WaitRescheduleConfig.class);
   ```
   
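   However, since Hazelcast already has its own configuration handling mechanism, Solution 1 is the better fit.
   
   **Rationale**:
   - Solution 1 keeps the existing architecture while improving safety and error handling
   - Solution 2 requires larger changes and may introduce new dependencies
   - Recommendation: Prioritize Solution 1, and add unit tests covering the configuration parsing logic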
   
   ---
   
   ### Issue 5: Lack of logging and monitoring metrics for rescheduling operations
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
   
   ```java
   private static class WaitReschedulePolicy implements 
PendingJobSchedulePolicy {
       @Override
       public void onResourcesNotEnough(PendingJobScheduleContext context)
               throws InterruptedException {
           // ...
           if (maxRetryTimes > 0
                   && context.getPendingJobQueue().size() > 1
                   && checkTimes > 0
                   && checkTimes % maxRetryTimes == 0) {
               context.moveHeadToTail();  // ⚠️ No logging
           }
           context.sleep(config.getSleepIntervalMillis());
       }
   }
   ```
   
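   **Related Context**:
   - `CoordinatorService` has logging, but it does not record rescheduling operations
   - There are no corresponding metrics
   
   **Problem Description**:
   
   When a rescheduling operation occurs (a job is moved from the head of the queue to the tail), nothing is logged and no metric is recorded. As a result:
   1. A job's scheduling history cannot be traced through the logs
   2. There is no way to verify that the rescheduling strategy works as expected
   3. Rescheduling frequency is not visible in the monitoring system
   4. Scheduling problems are hard to debug
   
   **Potential Risks**:
   - Risk 1: When scheduling problems occur in production, they cannot be investigated through the logs
   - Risk 2: The impact of rescheduling on system performance cannot be quantified
   - Risk 3: Configuration mistakes cannot be detected (such as a `maxRetryTimes` that is too small and causes frequent rescheduling)
   
   **Impact Scope**:
   - Direct impact: Operations and troubleshooting
   - Indirect impact: System observability
   - Impact area: Maintainability in production
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   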
   ```java
   private static class WaitReschedulePolicy implements 
PendingJobSchedulePolicy {
       @Override
       public void onResourcesNotEnough(PendingJobScheduleContext context)
               throws InterruptedException {
           WaitRescheduleConfig config =
                   context.getEngineConfig()
                           .getScheduleStrategyConfig(
                                   ScheduleStrategy.WAIT_RESCHEDULE, 
WaitRescheduleConfig.class);
           if (config == null) {
               config = new WaitRescheduleConfig();
           }
           int maxRetryTimes = config.getMaxRetryTimes();
           int checkTimes = context.getPendingJobInfo().getCheckTimes();
           
           boolean rescheduled = false;
           if (maxRetryTimes > 0
                   && context.getPendingJobQueue().size() > 1
                   && checkTimes > 0
                   && checkTimes % maxRetryTimes == 0) {
               boolean moved = context.moveHeadToTail();
               if (moved) {
                   rescheduled = true;
                   // Log
                   log.info(
                       "Rescheduled job {} to tail of pending queue after {} checks (maxRetryTimes={})",
                       context.getJobId(), checkTimes, maxRetryTimes);
                   
                   // Optional: Record Metrics
                   // metricsRegistry.counter("job.reschedule.count").increment();
                   
                   // Optional: Record events to IMap for REST API query
                   // jobRescheduleEvents.put(context.getJobId(), new RescheduleEvent(...));
               }
           }
           
           context.sleep(config.getSleepIntervalMillis());
       }
   }
   ```
   
   In addition, add logging in `PeekBlockingQueue.moveToTail`:
   ```java
   public boolean moveToTail(Long jobId) {
       lock.lock();
       try {
           E element = jobIdMap.get(jobId);
           if (element == null) {
               log.warn("Failed to move job {} to tail: job not found in queue", jobId);
               return false;
           }
           E head = queue.peek();
           if (head == null || !head.equals(element)) {
               log.debug("Job {} is not at head of queue, skipping move to tail", jobId);
               return false;
           }
           if (!queue.remove(element)) {
               log.warn("Failed to move job {} to tail: remove operation failed", jobId);
               return false;
           }
           queue.put(element);
           notEmpty.signalAll();
           log.info("Successfully moved job {} from head to tail of pending queue", jobId);
           return true;
       } catch (InterruptedException e) {
           log.error("Move element to tail failed. {}", 
ExceptionUtils.getMessage(e));
           Thread.currentThread().interrupt();
           return false;
       } finally {
           lock.unlock();
       }
   }
   ```
   
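   **Rationale**:
   - Logging makes rescheduling operations traceable, which simplifies troubleshooting
   - Metrics quantify rescheduling frequency, which helps with performance tuning
   - Using different log levels (INFO/WARN/DEBUG) keeps log volume down
   - Logs include the key information (jobId, checkTimes, maxRetryTimes) needed for analysis
   
   **Optional Further Improvement**: Add a REST API to query job rescheduling history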
   
   ---
   
   ### Issue 6: Improper handling of InterruptedException, interrupt signal is swallowed
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:303-305`
   
   ```java
   try {
       pendingJobSchedulePolicy.onResourcesNotEnough(context);
   } catch (InterruptedException e) {
       logger.severe(ExceptionUtils.getMessage(e));  // ⚠️ Issue
   }
   ```
   
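   **Related Context**:
   - Caller: the `pendingJobSchedule` method
   - Thread: the background thread created by `startPendingJobScheduleThread`
   
   **Problem Description**:
   
   When `InterruptedException` is caught, the code only logs it and does not:
   1. Restore the thread's interrupt status (`Thread.currentThread().interrupt()`)
   2. Rethrow the exception
   3. Exit the loop or perform cleanup
   
   According to Java best practice, code that catches `InterruptedException` should do one of the following:
   - Restore the interrupt status
   - Propagate the exception upwards
   - Exit the thread
   
   The current handling "swallows" the interrupt signal: the thread keeps running and may not respond to external interrupt requests (such as JVM shutdown).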
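   
   The reason the signal is lost is that the interrupt flag is cleared when `InterruptedException` is thrown. The following self-contained sketch (the class name `InterruptFlagDemo` is hypothetical) interrupts the current thread itself, so the behavior can be observed without a second thread.
   
   ```java
   final class InterruptFlagDemo {
       // Returns {flag right after the catch, flag after restoring it}.
       static boolean[] observeFlags() {
           boolean afterCatch = false;
           boolean afterRestore = false;
           Thread.currentThread().interrupt();       // simulate an external interrupt request
           try {
               Thread.sleep(1_000);                  // throws at once because the flag is already set
               throw new IllegalStateException("sleep should have been interrupted");
           } catch (InterruptedException e) {
               // The flag was cleared when the exception was thrown.
               afterCatch = Thread.currentThread().isInterrupted();
               // Restoring it lets code further up the stack see the request.
               Thread.currentThread().interrupt();
               afterRestore = Thread.currentThread().isInterrupted();
           }
           Thread.interrupted();                     // clear the flag so the demo has no side effect
           return new boolean[] {afterCatch, afterRestore};
       }

       public static void main(String[] args) {
           boolean[] flags = observeFlags();
           // prints "after catch: false, after restore: true"
           System.out.println("after catch: " + flags[0] + ", after restore: " + flags[1]);
       }
   }
   ```
   
   A handler that only logs therefore leaves the flag at `false`, and a loop that checks `isInterrupted()` will never see the request.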
   
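   **Potential Risks**:
   - Risk 1: When something external tries to interrupt the scheduling thread (for example during service shutdown), the thread may not respond promptly
   - Risk 2: The real cause of the interrupt may be masked
   - Risk 3: It violates Java concurrency best practice
   
   **Impact Scope**:
   - Direct impact: Lifecycle management of the scheduling thread
   - Indirect impact: Resource cleanup during service shutdown
   - Impact area: Service maintainability
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   Solution 1: Restore the interrupt status and exit the loop (recommended)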
   ```java
   try {
       pendingJobSchedulePolicy.onResourcesNotEnough(context);
   } catch (InterruptedException e) {
       logger.warning("Pending job schedule thread interrupted, exiting");
       Thread.currentThread().interrupt();  // Restore interrupt status
       throw new RuntimeException("Schedule thread interrupted", e);  // Rethrow
   }
   ```
   
   Solution 2: If the thread should keep running (not recommended)
   ```java
   try {
       pendingJobSchedulePolicy.onResourcesNotEnough(context);
   } catch (InterruptedException e) {
       logger.warning("Pending job schedule interrupted, continuing");
       Thread.currentThread().interrupt();  // At least restore interrupt status
       return;  // This loop interrupted, continue next loop
   }
   ```
   
   Solution 3: Check the interrupt flag (most elegant)
   ```java
   // Check at method start
   private void pendingJobSchedule() throws InterruptedException {
       if (Thread.currentThread().isInterrupted()) {
           throw new InterruptedException("Schedule thread was interrupted");
       }
       
       PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
       // ...
       
       try {
           pendingJobSchedulePolicy.onResourcesNotEnough(context);
       } catch (InterruptedException e) {
           logger.info("Pending job schedule thread interrupted");
           Thread.currentThread().interrupt();
           throw e;  // Propagate up
       }
   }
   ```
   
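   **Rationale**:
   - Solution 1 follows best practice and ensures the interrupt signal is not lost
   - Solution 2 at least restores the interrupt status, but may leave the thread in a "half-dead" state
   - Solution 3 is the most elegant, but requires changing the method signature
   
   Solution 1 is recommended because:
   1. The outer caller (`startPendingJobScheduleThread`) already handles RuntimeException
   2. It makes explicit that the thread was interrupted and needs to exit
   3. It follows Java concurrency best practice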
   
   ---
   
   ### Issue 7: Insufficient test coverage, missing edge case and concurrent testing
   
   **Location**: 
`seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java:126-136`
   
   ```java
   @Test
   public void testMoveToTail() throws InterruptedException {
       queue.put("1");
       queue.put("2");
       queue.put("3");
       Assertions.assertTrue(queue.moveToTail(1L));
       Assertions.assertEquals("2", queue.peekBlocking());
       Assertions.assertEquals("2", queue.take());
       Assertions.assertEquals("3", queue.take());
       Assertions.assertEquals("1", queue.take());
   }
   ```
   
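   **Related Context**:
   - Class under test: `PeekBlockingQueue.moveToTail`
   - Related test: `SeaTunnelEngineClusterRoleTest.pendingQueueRescheduleAllowsLaterJobRunMultiNode`
   
   **Problem Description**:
   
   The current test only covers the normal `moveToTail` path and is missing the following scenarios:
   
   1. **Edge cases**:
      - Empty queue
      - Single-element queue
      - Moving a jobId that does not exist
      - Moving an element that is already at the tail
   
   2. **Concurrency scenarios**:
      - Another thread calls take while the move is in progress
      - Another thread calls removeById while the move is in progress
      - Multiple threads call moveToTail at the same time
   
   3. **Configuration**:
      - Different maxRetryTimes values (0, 1, 2, 5, 10)
      - Different sleepIntervalMillis values
      - Configuration validation tests (negative values, 0, etc.)
   
   4. **Policies**:
      - Behavioral differences between WaitPolicy and WaitReschedulePolicy
      - Whether the REJECT policy fails the job correctly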
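   
   For the "multiple threads call moveToTail at the same time" scenario, a start gate makes the race more reliable than staggered sleeps. The sketch below is an assumption-laden illustration: `StandInQueue` is a hypothetical lock-guarded stand-in with the same head-only semantics, not the real `PeekBlockingQueue`, and `raceMoves` is an invented helper.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.locks.ReentrantLock;

   final class ConcurrentMoveSketch {
       // Hypothetical stand-in: a lock-guarded deque that only moves the head.
       static final class StandInQueue {
           private final ReentrantLock lock = new ReentrantLock();
           private final Deque<Long> jobs = new ArrayDeque<>();

           void put(long jobId) {
               lock.lock();
               try {
                   jobs.addLast(jobId);
               } finally {
                   lock.unlock();
               }
           }

           boolean moveToTail(long jobId) {
               lock.lock();
               try {
                   Long head = jobs.peekFirst();
                   if (head == null || head != jobId) {
                       return false;
                   }
                   jobs.addLast(jobs.removeFirst());
                   return true;
               } finally {
                   lock.unlock();
               }
           }
       }

       // Releases all workers at once and counts how many moves of job 1 succeed.
       static int raceMoves(int threads) {
           StandInQueue queue = new StandInQueue();
           queue.put(1L);
           queue.put(2L);
           queue.put(3L);
           CountDownLatch startGate = new CountDownLatch(1);
           CountDownLatch done = new CountDownLatch(threads);
           AtomicInteger successes = new AtomicInteger();
           for (int i = 0; i < threads; i++) {
               new Thread(() -> {
                   try {
                       startGate.await();
                       if (queue.moveToTail(1L)) {
                           successes.incrementAndGet();
                       }
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   } finally {
                       done.countDown();
                   }
               }).start();
           }
           startGate.countDown();
           try {
               done.await();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException("interrupted while waiting for workers", e);
           }
           return successes.get();
       }

       public static void main(String[] args) {
           System.out.println(raceMoves(8)); // 1: after the first move, job 1 is no longer the head
       }
   }
   ```
   
   The same gate pattern could be applied in `PeekBlockingQueueTest` against the real queue, asserting on the success count and the final order.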
   
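   **Potential Risks**:
   - Risk 1: Bugs in edge cases may surface in production
   - Risk 2: Concurrency problems may be exposed under high load
   - Risk 3: Configuration errors may only be discovered at runtime
   
   **Impact Scope**:
   - Direct impact: Code quality and stability
   - Indirect impact: Production reliability
   - Impact area: All users of this feature
   
   **Severity**: MAJOR
   
   **Improvement Suggestions**:
   
   Add the following test cases: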
   ```java
   // PeekBlockingQueueTest.java
   
   @Test
   public void testMoveToTail_EmptyQueue() throws InterruptedException {
       // Empty queue, moving non-existent element
       Assertions.assertFalse(queue.moveToTail(999L));
       Assertions.assertEquals(0, queue.size());
   }
   
   @Test
   public void testMoveToTail_SingleElementQueue() throws InterruptedException {
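       // Single element queue: moveToTail as quoted in Issue 1 has no size guard
       // (the size() > 1 check lives in WaitReschedulePolicy), so the head is
       // removed and re-added and the call returns true
       queue.put("1");
       Assertions.assertTrue(queue.moveToTail(1L));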
       Assertions.assertEquals("1", queue.take());
   }
   
   @Test
   public void testMoveToTail_NonExistentJob() throws InterruptedException {
       // Move non-existent jobId
       queue.put("1");
       queue.put("2");
       Assertions.assertFalse(queue.moveToTail(999L));
       Assertions.assertEquals(2, queue.size());
   }
   
   @Test
   public void testMoveToTail_NonHeadElement() throws InterruptedException {
       // Move non-head element (should fail)
       queue.put("1");
       queue.put("2");
       queue.put("3");
       Assertions.assertFalse(queue.moveToTail(2L));  // 2 is not at head
       Assertions.assertEquals("1", queue.take());
       Assertions.assertEquals("2", queue.take());
       Assertions.assertEquals("3", queue.take());
   }
   
   @Test
   public void testMoveToTail_ConcurrentWithTake() throws InterruptedException {
       // Concurrent test: moving while another thread is taking
       queue.put("1");
       queue.put("2");
       queue.put("3");
       
       AtomicBoolean moved = new AtomicBoolean(false);
       Thread t1 = new Thread(() -> {
           try {
               moved.set(queue.moveToTail(1L));
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
       
       Thread t2 = new Thread(() -> {
           try {
               Thread.sleep(10);  // Give moveToTail a head start (sleep does not guarantee ordering)
               queue.take();
           } catch (Exception e) {
               e.printStackTrace();
           }
       });
       
       t1.start();
       t2.start();
       t1.join();
       t2.join();
       
       // Verify queue state is consistent
       // ...
   }
   
   @Test
   public void testMoveToTail_ConcurrentWithRemoveById() throws InterruptedException {
       // Concurrent test: moving while another thread is removing by ID
       queue.put("1");
       queue.put("2");
       queue.put("3");
       
       AtomicBoolean moved = new AtomicBoolean(false);
       AtomicBoolean removed = new AtomicBoolean(false);
       
       Thread t1 = new Thread(() -> {
           moved.set(queue.moveToTail(1L));
       });
       
       Thread t2 = new Thread(() -> {
           removed.set(queue.removeById(1L));
       });
       
       t1.start();
       t2.start();
       t1.join();
       t2.join();
       
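       // Verify: removeById succeeds in either order, because a successful
       // moveToTail re-inserts the element; moveToTail succeeds only if it ran first
       Assertions.assertTrue(removed.get());
       Assertions.assertEquals(2, queue.size());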
   }
   
   // Configuration tests
   
   @Test
   public void testWaitRescheduleConfig_Validation() {
       WaitRescheduleConfig config = new WaitRescheduleConfig();
       
       // Test default values
       Assertions.assertEquals(3, config.getMaxRetryTimes());
       Assertions.assertEquals(3000, config.getSleepIntervalMillis());
       
       // Test boundary values
       config.setMaxRetryTimes(1);
       Assertions.assertEquals(1, config.getMaxRetryTimes());
       
       config.setSleepIntervalMillis(100);
       Assertions.assertEquals(100, config.getSleepIntervalMillis());
   }
   
   // Policy tests
   
   @Test
   public void testWaitReschedulePolicy_WithZeroCheckTimes() {
       // Test checkTimes = 0 scenario
       PendingJobInfo mockInfo = mock(PendingJobInfo.class);
       when(mockInfo.getCheckTimes()).thenReturn(0);
       
       PendingJobScheduleContext context = new PendingJobScheduleContext(
           mockInfo, mockQueue, mockConfig, jobId, mockAction
       );
       
       WaitReschedulePolicy policy = new WaitReschedulePolicy();
       policy.onResourcesNotEnough(context);
       
       // Verify: when checkTimes = 0, rescheduling is not triggered
       verify(mockQueue, never()).moveToTail(anyLong());
   }
   ```
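   
   The suggested tests above do not include the "multiple threads call `moveToTail` at the same time" scenario. A minimal sketch of that case follows. `HeadOnlyQueueSketch` is a hypothetical stand-in written for this comment, not the real `PeekBlockingQueue`; it assumes the head-only move semantics quoted in Issue 1 and a single lock around every operation.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.locks.ReentrantLock;
   
   // Hypothetical stand-in, NOT the real PeekBlockingQueue: it only mirrors the
   // head-only move semantics quoted in this review.
   class HeadOnlyQueueSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private final Deque<Long> queue = new ArrayDeque<>();
   
       void put(long jobId) {
           lock.lock();
           try {
               queue.addLast(jobId);
           } finally {
               lock.unlock();
           }
       }
   
       // Moves jobId to the tail only if it is currently the head.
       boolean moveToTail(long jobId) {
           lock.lock();
           try {
               Long head = queue.peekFirst();
               if (head == null || head.longValue() != jobId) {
                   return false;
               }
               queue.addLast(queue.pollFirst());
               return true;
           } finally {
               lock.unlock();
           }
       }
   
       // Copy of the current queue order, head first.
       List<Long> snapshot() {
           lock.lock();
           try {
               return new ArrayList<>(queue);
           } finally {
               lock.unlock();
           }
       }
   
       // Starts `threads` workers that call moveToTail(jobId) at the same time
       // and returns how many of them reported success.
       static int raceMoveToTail(HeadOnlyQueueSketch q, long jobId, int threads) {
           CountDownLatch start = new CountDownLatch(1);
           AtomicInteger successes = new AtomicInteger();
           List<Thread> workers = new ArrayList<>();
           for (int i = 0; i < threads; i++) {
               Thread t = new Thread(() -> {
                   try {
                       start.await(); // release all workers together
                       if (q.moveToTail(jobId)) {
                           successes.incrementAndGet();
                       }
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               workers.add(t);
               t.start();
           }
           start.countDown();
           for (Thread t : workers) {
               try {
                   t.join();
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException(e);
               }
           }
           return successes.get();
       }
   }
   ```
   
   With a queue holding jobs 1, 2, 3 and several threads racing on job 1, exactly one call should succeed under these assumptions, because job 1 stops being the head after the first move. A real test would make the same assertion against `PeekBlockingQueue` itself.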
   
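   **Rationale**:
   - Boundary tests can catch common bugs such as null pointers and out-of-bounds access
   - Concurrency tests can catch race conditions and deadlocks
   - Configuration tests can detect configuration errors early
   - Policy tests can verify the behavioral differences between policies
   - These tests add development time, but they significantly improve code quality and reliability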
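   
   As an illustration of the "configuration validation (negative values, 0)" item, here is a minimal sketch of fail-fast validation. `RetrySettingsSketch` is a hypothetical class, not the real `WaitRescheduleConfig`, and the rules it enforces (0 retries allowed, sleep interval strictly positive) are assumptions; the PR may define different limits.
   
   ```java
   // Hypothetical stand-in, NOT the real WaitRescheduleConfig.
   final class RetrySettingsSketch {
       private final int maxRetryTimes;
       private final long sleepIntervalMillis;
   
       RetrySettingsSketch(int maxRetryTimes, long sleepIntervalMillis) {
           // Assumed rules: 0 retries is allowed, a non-positive sleep interval is not.
           if (maxRetryTimes < 0) {
               throw new IllegalArgumentException(
                       "maxRetryTimes must be >= 0, got " + maxRetryTimes);
           }
           if (sleepIntervalMillis <= 0) {
               throw new IllegalArgumentException(
                       "sleepIntervalMillis must be > 0, got " + sleepIntervalMillis);
           }
           this.maxRetryTimes = maxRetryTimes;
           this.sleepIntervalMillis = sleepIntervalMillis;
       }
   
       int getMaxRetryTimes() {
           return maxRetryTimes;
       }
   
       long getSleepIntervalMillis() {
           return sleepIntervalMillis;
       }
   
       // Helper for tests: true if the constructor rejects the given values.
       static boolean isRejected(int maxRetryTimes, long sleepIntervalMillis) {
           try {
               new RetrySettingsSketch(maxRetryTimes, sleepIntervalMillis);
               return false;
           } catch (IllegalArgumentException e) {
               return true;
           }
       }
   }
   ```
   
   Validating in the constructor means a bad value fails when the configuration is loaded rather than when the first job is rescheduled, which is the risk described in Risk 3.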
   
   ---
   
   ### Issue 8: PeekBlockingQueue.moveToTail condition check is too strict
   
   **Location**: `seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
   
   **Issue Description**:
   ```java
   E head = queue.peek();
   if (head == null || !head.equals(element)) {
       return false;  // ⚠️ Issue
   }
   ```
   
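   This check ensures that only the head element can be moved to the tail. In a multi-threaded environment, however, the following sequence may occur:
   
   1. Thread A calls `moveToTail(jobId)`
   2. Thread A executes `E head = queue.peek()`; assume head = job1
   3. Thread B calls `take()` and removes job1
   4. Thread A continues with `!head.equals(element)`, finds a mismatch, and returns false
   5. In fact, job1 has already been taken, so it should be removed from the queue and must not be moved again
   
   More importantly, if the goal is "fair scheduling", the move should also be possible in the following cases:
   - The job is in the middle of the queue (not at the head)
   - The job has just been displaced by a short job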
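   
   To make the move-versus-take interleaving concrete, here is a minimal sketch. It assumes that `take()` acquires the same lock as `moveToTail`, which this review does not show. `MoveVersusTakeSketch` is a hypothetical stand-in, not the real `PeekBlockingQueue`, and `poll()` is a non-blocking substitute for `take()`. Under that assumption only two outcomes are possible: the move wins and the taker receives the next job, or the taker wins and the move returns false.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicLong;
   import java.util.concurrent.locks.ReentrantLock;
   
   // Hypothetical stand-in, NOT the real PeekBlockingQueue.
   class MoveVersusTakeSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private final Deque<Long> queue = new ArrayDeque<>();
   
       void put(long jobId) {
           lock.lock();
           try {
               queue.addLast(jobId);
           } finally {
               lock.unlock();
           }
       }
   
       // Non-blocking stand-in for take(): removes the head, or returns -1 if empty.
       long poll() {
           lock.lock();
           try {
               Long head = queue.pollFirst();
               return head == null ? -1L : head;
           } finally {
               lock.unlock();
           }
       }
   
       // Head-only move: peek, compare, and re-insert all happen under one lock.
       boolean moveToTail(long jobId) {
           lock.lock();
           try {
               Long head = queue.peekFirst();
               if (head == null || head.longValue() != jobId) {
                   return false;
               }
               queue.addLast(queue.pollFirst());
               return true;
           } finally {
               lock.unlock();
           }
       }
   
       // Runs moveToTail(jobId) and poll() on two threads.
       // Returns {1 if moved else 0, the job id that poll() returned}.
       static long[] race(MoveVersusTakeSketch q, long jobId) {
           AtomicBoolean moved = new AtomicBoolean();
           AtomicLong taken = new AtomicLong(-1L);
           Thread mover = new Thread(() -> moved.set(q.moveToTail(jobId)));
           Thread taker = new Thread(() -> taken.set(q.poll()));
           mover.start();
           taker.start();
           try {
               mover.join();
               taker.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return new long[] {moved.get() ? 1L : 0L, taken.get()};
       }
   }
   ```
   
   If the real `take()` does not share the lock, the interleaving in steps 2 to 4 becomes possible and the invariant checked here no longer holds, so a concurrency test against the real class is still worth adding.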
   
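   **Potential Risk**:
   - Under certain race conditions, a job may fail to be rescheduled correctly
   - Limits future extensibility (for example, implementing more flexible scheduling policies)
   
   **Impact Scope**:
   - Direct impact: behavior of `moveToTail`
   - Indirect impact: effectiveness of the rescheduling policy
   - Affected area: scheduling module
   
   **Severity**: MINOR
   
   **Improvement Suggestion**:
   
   This issue is essentially an extension of Issue 1. If option 2 of Issue 1 (implementing a true moveToTail) is adopted, this issue is resolved automatically.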
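   
   For reference, a "true" `moveToTail` that accepts an element at any position could look roughly like the sketch below. `AnyPositionQueueSketch` is a hypothetical stand-in that stores bare job ids. The real class keeps a `jobIdMap` and a condition variable, which are left out here.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.Deque;
   import java.util.List;
   import java.util.concurrent.locks.ReentrantLock;
   
   // Hypothetical stand-in, NOT the real PeekBlockingQueue.
   class AnyPositionQueueSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private final Deque<Long> queue = new ArrayDeque<>();
   
       void put(long jobId) {
           lock.lock();
           try {
               queue.addLast(jobId);
           } finally {
               lock.unlock();
           }
       }
   
       // Moves jobId to the tail from any position; false if it is not queued.
       boolean moveToTail(long jobId) {
           lock.lock();
           try {
               // removeFirstOccurrence returns false when the job is absent.
               if (!queue.removeFirstOccurrence(Long.valueOf(jobId))) {
                   return false;
               }
               queue.addLast(jobId);
               return true;
           } finally {
               lock.unlock();
           }
       }
   
       // Copy of the current queue order, head first.
       List<Long> snapshot() {
           lock.lock();
           try {
               return new ArrayList<>(queue);
           } finally {
               lock.unlock();
           }
       }
   }
   ```
   
   In this variant, moving an element that is already at the tail succeeds and leaves the order unchanged. Removal from the middle is O(n), which is likely acceptable for a pending-job queue but is a trade-off compared with the head-only version.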
   
   If the current design (moving only the head) is kept, the suggestion is:
   ```java
   /**
    * Move the head element to the tail of the queue if it matches the given jobId.
    *
    * <p>This is a specialized method for job rescheduling. It will only succeed if:
    * <ul>
    *   <li>The job exists in the queue</li>
    *   <li>The job is currently at the head of the queue</li>
    * </ul>
    *
    * <p>If the job is not at the head (e.g., due to concurrent operations),
    * this method will return false and the caller should retry in the next cycle.
    *
    * @param jobId the job id that should be at the head
    * @return true if the head job was moved to tail; false otherwise
    */
   public boolean moveHeadToTailIfAtHead(Long jobId) {
       lock.lock();
       try {
           E element = jobIdMap.get(jobId);
           if (element == null) {
               return false;
           }
           
           E head = queue.peek();
           if (head == null || !head.equals(element)) {
               return false;
           }
           
           if (!queue.remove(element)) {
               return false;
           }
           
           queue.put(element);
           notEmpty.signalAll();
           return true;
       } catch (InterruptedException e) {
           log.error("Move element to tail failed. {}", ExceptionUtils.getMessage(e));
           Thread.currentThread().interrupt();
           return false;
       } finally {
           lock.unlock();
       }
   }
   ```
   
   The key is to improve the JavaDoc, clearly documenting the method's contract 
and behavior.
   
   ---

