DanielCarter-stack commented on PR #10430:
URL: https://github.com/apache/seatunnel/pull/10430#issuecomment-3832461773
<!-- code-pr-reviewer -->
<!-- cpr:pr_reply_v2_parts {"group": "apache/seatunnel#10430", "part": 1,
"total": 1} -->
### Issue 1: PeekBlockingQueue.moveToTail can only move the head element
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
```java
public boolean moveToTail(Long jobId) {
lock.lock();
try {
E element = jobIdMap.get(jobId);
if (element == null) {
return false;
}
E head = queue.peek();
if (head == null || !head.equals(element)) { // ⚠️ Issue
return false;
}
if (!queue.remove(element)) {
return false;
}
queue.put(element);
notEmpty.signalAll();
return true;
} catch (InterruptedException e) {
log.error("Move element to tail failed. {}",
ExceptionUtils.getMessage(e));
Thread.currentThread().interrupt();
return false;
} finally {
lock.unlock();
}
}
```
**Related Context**:
- Caller: `PendingJobScheduleContext.moveHeadToTail` (line 31-33)
- Caller: `WaitReschedulePolicy.onResourcesNotEnough` (line 71)
**Problem Description**:
The method name is `moveToTail` (move any element to tail), but the actual
implementation is:
```java
E head = queue.peek();
if (head == null || !head.equals(element)) {
        return false; // if it's not the head of the queue, return false directly
}
```
This means the method can only move the head element to the tail. Although
current usage scenarios indeed only require moving the head element, the
mismatch between the method name and implementation can easily cause
misunderstanding.
**Potential Risks**:
- Risk 1: If other developers call this method to move non-head elements in
the future, it will fail silently (return false), which may lead to logic errors
- Risk 2: Reduced code readability, maintainers may mistakenly believe any
element can be moved
**Impact Scope**:
- Direct impact: `PeekBlockingQueue.moveToTail`
- Indirect impact: All callers (currently only
`PendingJobScheduleContext.moveHeadToTail`)
- Impact area: Core scheduling module
**Severity**: MINOR
**Improvement Suggestions**:
Solution 1: Rename the method to clarify its purpose
```java
/**
 * Move the head element to the tail of the queue if it matches the given jobId.
* @param jobId the job id of the head element
* @return true if the head element was moved to tail; false otherwise
*/
public boolean moveHeadToTailIfMatch(Long jobId) {
// ... keep existing implementation
}
```
Solution 2: Implement true "move any element to tail" functionality
```java
/**
* Move the element with the given jobId to the tail of the queue.
* @param jobId the job id to move
 * @return true if the element was moved to the tail; false if it was not found
*/
public boolean moveToTail(Long jobId) {
lock.lock();
try {
E element = jobIdMap.get(jobId);
if (element == null) {
return false;
}
        // Defensive check: an empty queue has nothing to move
if (queue.isEmpty()) {
return false;
}
// Remove the element (wherever it is)
if (!queue.remove(element)) {
return false;
}
// Add to tail
queue.put(element);
notEmpty.signalAll();
return true;
} catch (InterruptedException e) {
log.error("Move element to tail failed. {}",
ExceptionUtils.getMessage(e));
Thread.currentThread().interrupt();
return false;
} finally {
lock.unlock();
}
}
```
**Rationale**:
- Solution 1 better fits current usage scenarios with minimal risk
- Solution 2 is more general-purpose but may increase complexity
- Recommendation: Prioritize Solution 1; if moving non-head elements is
needed in the future, adopt Solution 2
---
### Issue 2: WaitReschedulePolicy's rescheduling condition logic has
potential issues
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
```java
private static class WaitReschedulePolicy implements
PendingJobSchedulePolicy {
@Override
public void onResourcesNotEnough(PendingJobScheduleContext context)
throws InterruptedException {
WaitRescheduleConfig config =
context.getEngineConfig()
.getScheduleStrategyConfig(
ScheduleStrategy.WAIT_RESCHEDULE,
WaitRescheduleConfig.class);
if (config == null) {
config = new WaitRescheduleConfig();
}
int maxRetryTimes = config.getMaxRetryTimes();
int checkTimes = context.getPendingJobInfo().getCheckTimes();
if (maxRetryTimes > 0
&& context.getPendingJobQueue().size() > 1
&& checkTimes > 0
&& checkTimes % maxRetryTimes == 0) { // ⚠️ Potential Issue
context.moveHeadToTail();
}
context.sleep(config.getSleepIntervalMillis());
}
}
```
**Related Context**:
- `PendingJobInfo.checkTimes` initial value is 0
- `PendingJobInfo.recordSnapshot` will call `checkTimes.incrementAndGet()`
- After first call, `checkTimes` becomes 1
**Problem Description**:
The rescheduling condition `checkTimes % maxRetryTimes == 0` may not meet
expectations in the following scenarios:
Assuming `maxRetryTimes = 3` (and ignoring the `checkTimes > 0` guard for a moment):
- 1st scheduling loop: `checkTimes = 0`, condition is `0 % 3 == 0` →
**true**, triggers rescheduling
- 2nd scheduling loop: `checkTimes = 1`, condition is `1 % 3 == 0` → false,
no trigger
- 3rd scheduling loop: `checkTimes = 2`, condition is `2 % 3 == 0` → false,
no trigger
- 4th scheduling loop: `checkTimes = 3`, condition is `3 % 3 == 0` →
**true**, triggers rescheduling
The problem lies in the 1st scheduling: if a job just entered the queue,
it's unreasonable to move it to the tail on the first check due to insufficient
resources. This would cause the job to "jump" in the queue.
Although there is `checkTimes > 0` protection, `checkTimes` is incremented
in `recordSnapshot`, not at the start of the scheduling loop. Need to confirm
where `recordSnapshot` is called.
**Verify calling location of `recordSnapshot`**:
By searching the code, `recordSnapshot` is called in
`CoordinatorService.pendingJobSchedule`:
```java
// When resources are insufficient
PendingJobDiagnostic diagnostic =
PendingDiagnosticsCollector.collectJobDiagnostic(...);
pendingJobInfo.recordSnapshot(diagnostic);
```
This means `checkTimes` is indeed incremented after each insufficient
resource check.
**However**, when first entering the pending queue, `checkTimes = 0`. If
resources are insufficient at this time:
1. `recordSnapshot` is called, `checkTimes` becomes 1
2. Next scheduling loop, `checkTimes = 1`
3. Condition `1 % 3 == 0` is false, no rescheduling triggered
So in practice, in the `maxRetryTimes = 3` case, rescheduling will be
triggered on the 3rd, 6th, 9th... checks, which is reasonable.
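The trigger pattern above can be checked in isolation with a small standalone sketch; the `wouldReschedule` helper below is hypothetical and simply mirrors the condition in the policy, it is not part of the PR:

```java
public class RescheduleTriggerDemo {
    // Mirrors the reschedule condition in WaitReschedulePolicy.onResourcesNotEnough:
    // enabled, more than one pending job, and checkTimes a positive multiple of
    // maxRetryTimes.
    static boolean wouldReschedule(int checkTimes, int maxRetryTimes, int queueSize) {
        return maxRetryTimes > 0
                && queueSize > 1
                && checkTimes > 0
                && checkTimes % maxRetryTimes == 0;
    }

    public static void main(String[] args) {
        // maxRetryTimes = 3: fires on checks 3, 6, 9, ...
        for (int checkTimes = 0; checkTimes <= 6; checkTimes++) {
            System.out.println(checkTimes + " -> " + wouldReschedule(checkTimes, 3, 2));
        }
        // maxRetryTimes = 1: fires on every check from the 1st onward
        System.out.println("maxRetryTimes=1, checkTimes=1 -> " + wouldReschedule(1, 1, 2));
    }
}
```

Running this confirms the analysis: with `maxRetryTimes = 3` the condition first fires at `checkTimes = 3`, while `maxRetryTimes = 1` rotates the queue on every check.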
**Potential Risks**:
- Risk 1: If `maxRetryTimes = 1`, rescheduling will be triggered on the 1st,
2nd, 3rd... every time, causing continuous queue rotation
- Risk 2: Doesn't consider the scenario where a job just entered the queue
(although the `checkTimes` mechanism has already handled this)
- Risk 3: The increment of `checkTimes` and the judgment of rescheduling
condition are separated, which may have timing issues
**Impact Scope**:
- Direct impact: `WaitReschedulePolicy.onResourcesNotEnough`
- Indirect impact: All jobs using WAIT_RESCHEDULE strategy
- Impact area: Core scheduling logic
**Severity**: MINOR
**Improvement Suggestions**:
```java
private static class WaitReschedulePolicy implements
PendingJobSchedulePolicy {
@Override
public void onResourcesNotEnough(PendingJobScheduleContext context)
throws InterruptedException {
WaitRescheduleConfig config =
context.getEngineConfig()
.getScheduleStrategyConfig(
ScheduleStrategy.WAIT_RESCHEDULE,
WaitRescheduleConfig.class);
if (config == null) {
config = new WaitRescheduleConfig();
}
int maxRetryTimes = config.getMaxRetryTimes();
// Validate config
if (maxRetryTimes < 1) {
maxRetryTimes = 3; // fallback to default
}
int checkTimes = context.getPendingJobInfo().getCheckTimes();
// Only reschedule if:
// 1. maxRetryTimes > 0 (enabled)
// 2. queue has more than 1 job (meaningful to reschedule)
// 3. checkTimes >= maxRetryTimes (give the job enough chances)
// 4. checkTimes is a multiple of maxRetryTimes (periodic reschedule)
if (maxRetryTimes > 0
&& context.getPendingJobQueue().size() > 1
&& checkTimes >= maxRetryTimes
&& checkTimes % maxRetryTimes == 0) {
boolean moved = context.moveHeadToTail();
if (moved) {
// Optional: log the reschedule action
}
}
context.sleep(config.getSleepIntervalMillis());
}
}
```
**Rationale**:
- Adding `checkTimes >= maxRetryTimes` ensures the job has waited for
sufficient rounds
- When `maxRetryTimes = 1`, the 1st check (checkTimes=1) will trigger, which
is reasonable (rotate every time)
- When `maxRetryTimes = 3`, the 3rd, 6th, 9th checks will trigger, meeting
expectations
**Or a simpler solution**: Modify configuration validation to prohibit
`maxRetryTimes = 1`
```java
// ScheduleStrategy.java
WAIT_RESCHEDULE(
new String[] {PENDING_JOB_RESCHEDULE.key()},
WaitRescheduleConfig::new,
config -> {
WaitRescheduleConfig waitRescheduleConfig = (WaitRescheduleConfig)
config;
if (waitRescheduleConfig.getMaxRetryTimes() < 2) { // Change to < 2
throw new InvalidConfigurationException(
"pending-job-reschedule.max-retry-times must be >= 2");
}
// ...
}),
```
---
### Issue 3: PendingJobScheduleContext lacks JavaDoc and contract
documentation
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobScheduleContext.java:1-47`
```java
@Value
public class PendingJobScheduleContext {
PendingJobInfo pendingJobInfo;
PeekBlockingQueue<PendingJobInfo> pendingJobQueue;
EngineConfig engineConfig;
long jobId;
Runnable failJobAction;
public boolean moveHeadToTail() {
return pendingJobQueue.moveToTail(jobId);
}
public void failJob() {
failJobAction.run();
}
public void sleep(long sleepMillis) throws InterruptedException {
if (sleepMillis > 0) {
Thread.sleep(sleepMillis);
}
}
}
```
**Related Context**:
- Interface: `PendingJobSchedulePolicy` (line 18-21)
- Implementation classes: `WaitPolicy`, `WaitReschedulePolicy`,
`RejectPolicy`
**Problem Description**:
As a context object for scheduling strategies, this class lacks necessary
documentation:
1. No class-level JavaDoc explaining its purpose
2. No explanation of the meaning and lifecycle of each field
3. No explanation of the execution timing and precautions for `failJobAction`
4. No explanation of method contracts (especially the meaning of the return
value of `moveHeadToTail`)
**Potential Risks**:
- Risk 1: Other developers may not be aware that `failJobAction` is executed
from an external thread, which may lead to thread safety issues
- Risk 2: Unclear about the behavior when `moveHeadToTail` returns false
(should processing continue?)
- Risk 3: Unclear whether the relationship between `jobId` and
`pendingJobInfo.getJobId()` is consistent
**Impact Scope**:
- Direct impact: All implementers of `PendingJobSchedulePolicy`
- Indirect impact: Future maintainers
- Impact area: Scheduling strategy extension point
**Severity**: MINOR
**Improvement Suggestions**:
```java
/**
 * Context object for pending job schedule policies.
 *
 * <p>This context provides all necessary information and actions for a schedule policy
 * to decide what to do when resources are not enough for a pending job.
 *
 * <p><b>Thread Safety:</b> This object is typically created per-schedule-cycle and
 * should not be shared across threads. The {@code failJobAction} is executed immediately
 * when {@link #failJob()} is called, which may be from a different thread.
 *
 * <p><b>Lifecycle:</b> A new context is created for each iteration of the pending job
 * schedule loop and discarded after use.
*/
@Value
public class PendingJobScheduleContext {
/**
* The pending job info being scheduled. Contains job state and metadata.
*/
PendingJobInfo pendingJobInfo;
/**
* The pending job queue containing all waiting jobs.
*/
PeekBlockingQueue<PendingJobInfo> pendingJobQueue;
/**
* The engine configuration containing schedule strategy settings.
*/
EngineConfig engineConfig;
/**
 * The job ID being scheduled. Should match {@code pendingJobInfo.getJobId()}.
*/
long jobId;
/**
* Action to fail the current job. This action will:
* <ul>
* <li>Mark the job as FAILED</li>
* <li>Remove it from the pending queue</li>
* <li>Notify the client</li>
* </ul>
*
 * <p><b>Important:</b> This action should only be called once. Multiple calls
* may result in unexpected behavior.
*/
Runnable failJobAction;
/**
 * Attempt to move the current job (which should be at the head of the queue) to the tail.
*
* <p>This is a "best effort" operation. It will only succeed if:
* <ul>
* <li>The job exists in the queue</li>
* <li>The job is currently at the head of the queue</li>
* <li>No concurrent operation has removed the job</li>
* </ul>
*
 * @return {@code true} if the job was successfully moved to the tail;
 *         {@code false} otherwise (e.g., job not at head, already removed, etc.)
*/
public boolean moveHeadToTail() {
return pendingJobQueue.moveToTail(jobId);
}
/**
* Fail the current job immediately.
*
 * <p>This will execute the {@code failJobAction} provided at construction time.
* After calling this method, the job should not be accessed anymore.
*/
public void failJob() {
failJobAction.run();
}
/**
* Sleep for the specified duration.
*
 * @param sleepMillis sleep duration in milliseconds; if <= 0, returns immediately
 * @throws InterruptedException if the thread is interrupted while sleeping
*/
public void sleep(long sleepMillis) throws InterruptedException {
if (sleepMillis > 0) {
Thread.sleep(sleepMillis);
}
}
}
```
**Rationale**:
- Complete JavaDoc can help other developers correctly understand and use
this class
- Explaining thread safety and lifecycle can prevent misuse
- Clarifying method contracts and return value meanings can improve code
maintainability
---
### Issue 4: YamlSeaTunnelDomConfigProcessor's configuration parsing logic
is complex and hard to maintain
**Location**:
`seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java:289-378`
```java
private void parseJobScheduleStrategyNode(
Node jobScheduleStrategyNode,
EngineConfig engineConfig,
EnumSet<ScheduleStrategy> configuredScheduleStrategyConfigs) {
// ... approximately 50 lines of complex logic
}
private ScheduleStrategyConfig parseScheduleStrategyConfig(
Node scheduleStrategyConfigNode,
ScheduleStrategy scheduleStrategy,
String sectionName) {
// ...
}
private void setScheduleStrategyConfigValue(
ScheduleStrategyConfig config, String key, String value) {
// Use reflection to invoke setter
String propertyName = toCamelCase(key);
String setterName = "set" + capitalize(propertyName);
Method setter = Arrays.stream(config.getClass().getMethods())
.filter(m -> m.getName().equals(setterName) &&
m.getParameterCount() == 1)
.findFirst()
.orElseThrow(() -> new
InvalidConfigurationException("Unrecognized element: " + key));
// ...
}
```
**Related Context**:
- Caller: `parseEngineConfig` (line 146-287)
- Configuration classes: `WaitConfig`, `WaitRescheduleConfig`
- Enum: `ScheduleStrategy`
**Problem Description**:
1. **Safety of reflection usage**:
- Using reflection to call setter methods, although there is type
checking, risks still exist
- If the configuration class adds setters not requiring user
configuration (such as `setInternalState`), they may be called incorrectly
- No restriction on accessible methods (such as public/private)
2. **Error handling is not detailed enough**:
```java
} catch (Exception e) {
throw new InvalidConfigurationException(
"Set config failed for key: " + key + ", value: " + value, e);
}
```
All exceptions are caught and wrapped, but the original exception message may not be clear enough.
3. **Configuration parsing paths are complex**:
- Two configuration formats are supported (a simple form and a nested form)
- The two parsing paths may conflict in edge cases
- Both `parseJobScheduleStrategyNode` and the main parsing logic contain code that handles configuration sections
**Potential Risks**:
- Risk 1: Reflection may invoke methods that should not be called (such as methods inherited from a parent class)
- Risk 2: When the configuration is wrong, the error message may be unclear
- Risk 3: Adding a new configuration item requires changing multiple places at once
**Impact Scope**:
- Direct impact: all users of ScheduleStrategy configuration
- Indirect impact: future configuration extensions
- Impact area: configuration loading module
**Severity**: MAJOR
**Improvement Suggestions**:
Solution 1: Harden the reflection logic with additional safety checks
```java
private void setScheduleStrategyConfigValue(
ScheduleStrategyConfig config, String key, String value) {
String propertyName = toCamelCase(key);
String setterName = "set" + capitalize(propertyName);
// Find the setter method
Method setter = Arrays.stream(config.getClass().getMethods())
.filter(m -> m.getName().equals(setterName)
&& m.getParameterCount() == 1
                    && m.getDeclaringClass().equals(config.getClass())) // only methods declared on the config class itself
.findFirst()
.orElseThrow(() ->
new InvalidConfigurationException(
"Unrecognized config key: " + key +
". Valid keys are: " + getValidConfigKeys(config)));
Class<?> paramType = setter.getParameterTypes()[0];
Object convertedValue = convertValue(key, value, paramType);
try {
setter.invoke(config, convertedValue);
} catch (IllegalAccessException e) {
throw new InvalidConfigurationException(
"Cannot access setter for key: " + key, e);
} catch (IllegalArgumentException e) {
throw new InvalidConfigurationException(
            String.format(
                "Invalid value '%s' for key '%s'. Expected type: %s",
                value, key, paramType.getSimpleName()), e);
} catch (InvocationTargetException e) {
throw new InvalidConfigurationException(
"Failed to set config for key: " + key, e.getCause());
}
}
private String getValidConfigKeys(ScheduleStrategyConfig config) {
return Arrays.stream(config.getClass().getMethods())
.filter(m -> m.getName().startsWith("set")
&& m.getParameterCount() == 1
&& m.getDeclaringClass().equals(config.getClass()))
.map(m -> decapitalize(m.getName().substring(3)))
.collect(Collectors.joining(", "));
}
```
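Note that Solution 1 calls a `convertValue` helper that is not shown in the PR. A minimal sketch, assuming only the primitive types used by the existing config classes (int, long, boolean, String) need support; the real code would throw `InvalidConfigurationException` rather than the `IllegalArgumentException` used here to keep the sketch self-contained:

```java
public class ConfigValueConverter {
    // Hypothetical helper assumed by Solution 1: converts the raw YAML string
    // value to the setter's parameter type. Anything outside the supported
    // primitive set is rejected explicitly instead of failing via reflection.
    static Object convertValue(String key, String value, Class<?> paramType) {
        try {
            if (paramType == int.class || paramType == Integer.class) {
                return Integer.parseInt(value);
            }
            if (paramType == long.class || paramType == Long.class) {
                return Long.parseLong(value);
            }
            if (paramType == boolean.class || paramType == Boolean.class) {
                return Boolean.parseBoolean(value);
            }
            if (paramType == String.class) {
                return value;
            }
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Invalid value '" + value + "' for key '" + key + "'", e);
        }
        throw new IllegalArgumentException(
                "Unsupported config type " + paramType.getName() + " for key '" + key + "'");
    }
}
```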
Solution 2: Consider a more mature configuration-binding approach (such as Jackson or MapStruct)
```java
// Use Jackson's ObjectMapper
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
// Convert Node to configuration object
WaitRescheduleConfig config = mapper.treeToValue(configNode,
WaitRescheduleConfig.class);
```
However, since Hazelcast already has its own configuration-handling mechanism, Solution 1 is the better fit.
**Rationale**:
- Solution 1 keeps the existing architecture while improving safety and error handling
- Solution 2 requires larger changes and may introduce new dependencies
- Recommendation: adopt Solution 1 first, and add unit tests covering the configuration parsing logic
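For reference, this is the kebab-case-to-setter mapping Solution 1 relies on. The helpers below are hypothetical reconstructions of the `toCamelCase`/`capitalize` logic referenced above; the actual implementations in `YamlSeaTunnelDomConfigProcessor` are not shown in this review:

```java
public class ConfigKeyMapper {
    // Converts a kebab-case (or snake_case) YAML key such as
    // "max-retry-times" to the camelCase property name "maxRetryTimes".
    static String toCamelCase(String key) {
        StringBuilder sb = new StringBuilder();
        boolean upperNext = false;
        for (char c : key.toCharArray()) {
            if (c == '-' || c == '_') {
                upperNext = true;
            } else {
                sb.append(upperNext ? Character.toUpperCase(c) : c);
                upperNext = false;
            }
        }
        return sb.toString();
    }

    // Derives the setter name looked up via reflection, e.g.
    // "max-retry-times" -> "setMaxRetryTimes".
    static String setterName(String key) {
        String property = toCamelCase(key);
        return "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
    }
}
```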
---
### Issue 5: Lack of logging and monitoring metrics for rescheduling operations
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PendingJobSchedulePolicyFactory.java:54-74`
```java
private static class WaitReschedulePolicy implements
PendingJobSchedulePolicy {
@Override
public void onResourcesNotEnough(PendingJobScheduleContext context)
throws InterruptedException {
// ...
if (maxRetryTimes > 0
&& context.getPendingJobQueue().size() > 1
&& checkTimes > 0
&& checkTimes % maxRetryTimes == 0) {
context.moveHeadToTail(); // ⚠️ No logging
}
context.sleep(config.getSleepIntervalMillis());
}
}
```
**Related Context**:
- `CoordinatorService` logs other events, but does not log reschedule operations
- There is no corresponding metrics counter
**Problem Description**:
When a reschedule happens (a job is moved from the head of the queue to the tail), nothing is logged and no metric is recorded. As a result:
1. The job's scheduling history cannot be traced through the logs
2. There is no way to verify that the reschedule policy works as expected
3. The reschedule frequency is invisible to monitoring systems
4. Debugging scheduling problems is difficult
**Potential Risks**:
- Risk 1: Scheduling problems in production cannot be diagnosed from the logs
- Risk 2: The performance impact of rescheduling cannot be quantified
- Risk 3: Configuration mistakes (such as a maxRetryTimes that is too small, causing constant rescheduling) go undetected
**Impact Scope**:
- Direct impact: operations and troubleshooting
- Indirect impact: system observability
- Impact area: maintainability in production
**Severity**: MAJOR
**Improvement Suggestions**:
```java
private static class WaitReschedulePolicy implements
PendingJobSchedulePolicy {
@Override
public void onResourcesNotEnough(PendingJobScheduleContext context)
throws InterruptedException {
WaitRescheduleConfig config =
context.getEngineConfig()
.getScheduleStrategyConfig(
ScheduleStrategy.WAIT_RESCHEDULE,
WaitRescheduleConfig.class);
if (config == null) {
config = new WaitRescheduleConfig();
}
int maxRetryTimes = config.getMaxRetryTimes();
int checkTimes = context.getPendingJobInfo().getCheckTimes();
boolean rescheduled = false;
if (maxRetryTimes > 0
&& context.getPendingJobQueue().size() > 1
&& checkTimes > 0
&& checkTimes % maxRetryTimes == 0) {
boolean moved = context.moveHeadToTail();
if (moved) {
rescheduled = true;
// Log
log.info(
                    "Rescheduled job {} to tail of pending queue after {} checks (maxRetryTimes={})",
context.getJobId(), checkTimes, maxRetryTimes);
// Optional: Record Metrics
            // metricsRegistry.counter("job.reschedule.count").increment();
            // Optional: Record events to IMap for REST API query
            // jobRescheduleEvents.put(context.getJobId(), new RescheduleEvent(...));
}
}
context.sleep(config.getSleepIntervalMillis());
}
}
```
Also add logging in `PeekBlockingQueue.moveToTail`:
```java
public boolean moveToTail(Long jobId) {
lock.lock();
try {
E element = jobIdMap.get(jobId);
if (element == null) {
            log.warn("Failed to move job {} to tail: job not found in queue", jobId);
return false;
}
E head = queue.peek();
if (head == null || !head.equals(element)) {
            log.debug("Job {} is not at head of queue, skipping move to tail", jobId);
return false;
}
if (!queue.remove(element)) {
            log.warn("Failed to move job {} to tail: remove operation failed", jobId);
return false;
}
queue.put(element);
notEmpty.signalAll();
        log.info("Successfully moved job {} from head to tail of pending queue", jobId);
return true;
} catch (InterruptedException e) {
log.error("Move element to tail failed. {}",
ExceptionUtils.getMessage(e));
Thread.currentThread().interrupt();
return false;
} finally {
lock.unlock();
}
}
```
**Rationale**:
- Logging makes reschedule operations traceable, which helps troubleshooting
- Metrics quantify the reschedule frequency, which helps performance tuning
- Using different log levels (INFO/WARN/DEBUG) keeps the log volume manageable
- Including the key fields (jobId, checkTimes, maxRetryTimes) makes the logs easy to analyze
**Optional Further Improvement**: Add a REST API to query a job's rescheduling history
---
### Issue 6: Improper handling of InterruptedException, interrupt signal is swallowed
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java:303-305`
```java
try {
pendingJobSchedulePolicy.onResourcesNotEnough(context);
} catch (InterruptedException e) {
logger.severe(ExceptionUtils.getMessage(e)); // ⚠️ Issue
}
```
**Related Context**:
- Caller: the `pendingJobSchedule` method
- Thread: the background thread created by `startPendingJobScheduleThread`
**Problem Description**:
When `InterruptedException` is caught, the code only logs it; it does not:
1. Restore the thread's interrupt status (`Thread.currentThread().interrupt()`)
2. Rethrow the exception
3. Exit the loop or perform cleanup
Per Java best practice, code that catches `InterruptedException` should either:
- Restore the interrupt status
- Propagate the exception upward
- Or exit the thread
The current handling "swallows" the interrupt signal: the thread keeps running and may fail to respond to external interrupt requests (such as JVM shutdown).
**Potential Risks**:
- Risk 1: When something external tries to interrupt the schedule thread (e.g., during service shutdown), the thread may not respond promptly
- Risk 2: The real cause of the interruption may be masked
- Risk 3: It violates Java concurrency best practices
**Impact Scope**:
- Direct impact: lifecycle management of the schedule thread
- Indirect impact: resource cleanup during service shutdown
- Impact area: service maintainability
**Severity**: MAJOR
**Improvement Suggestions**:
Solution 1: Restore the interrupt status and exit the loop (recommended)
```java
try {
pendingJobSchedulePolicy.onResourcesNotEnough(context);
} catch (InterruptedException e) {
logger.warning("Pending job schedule thread interrupted, exiting");
Thread.currentThread().interrupt(); // Restore interrupt status
throw new RuntimeException("Schedule thread interrupted", e); // Rethrow
}
```
Solution 2: If the thread should keep running (not recommended)
```java
try {
pendingJobSchedulePolicy.onResourcesNotEnough(context);
} catch (InterruptedException e) {
logger.warning("Pending job schedule interrupted, continuing");
Thread.currentThread().interrupt(); // At least restore interrupt status
    return; // end this scheduling iteration; the outer loop can then observe the interrupt flag
}
```
Solution 3: Check the interrupt flag (most elegant)
```java
// Check at method start
private void pendingJobSchedule() throws InterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Schedule thread was interrupted");
}
PendingJobInfo pendingJobInfo = pendingJobQueue.peekBlocking();
// ...
try {
pendingJobSchedulePolicy.onResourcesNotEnough(context);
} catch (InterruptedException e) {
logger.info("Pending job schedule thread interrupted");
Thread.currentThread().interrupt();
throw e; // Propagate up
}
}
```
**Rationale**:
- Solution 1 follows best practice and ensures the interrupt signal is not lost
- Solution 2 at least restores the interrupt status, but may leave the thread half-alive
- Solution 3 is the most elegant, but requires changing the method signature
Solution 1 is recommended because:
1. The external caller (`startPendingJobScheduleThread`) already handles RuntimeException
2. It makes explicit that the thread was interrupted and must exit
3. It follows Java concurrency best practices
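With Solution 1 in place, the surrounding loop can exit cleanly once the interrupt status is restored. A self-contained sketch; the class and member names here are illustrative, not the actual `CoordinatorService` members:

```java
public class ScheduleLoopSketch {
    private volatile boolean running = true;

    // Hypothetical schedule loop: exits when interrupted instead of
    // swallowing the signal.
    public void runLoop() {
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                scheduleOnce(); // may throw InterruptedException while sleeping
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore interrupt status
                break; // exit the loop rather than continuing half-alive
            }
        }
    }

    // Stands in for pendingJobSchedulePolicy.onResourcesNotEnough(context),
    // which sleeps between checks.
    private void scheduleOnce() throws InterruptedException {
        Thread.sleep(10);
    }
}
```

Interrupting a thread running `runLoop` now terminates it promptly, which is exactly the behavior a service shutdown needs.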
---
### Issue 7: Insufficient test coverage, missing edge case and concurrent testing
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueueTest.java:126-136`
```java
@Test
public void testMoveToTail() throws InterruptedException {
queue.put("1");
queue.put("2");
queue.put("3");
Assertions.assertTrue(queue.moveToTail(1L));
Assertions.assertEquals("2", queue.peekBlocking());
Assertions.assertEquals("2", queue.take());
Assertions.assertEquals("3", queue.take());
Assertions.assertEquals("1", queue.take());
}
```
**Related Context**:
- Class under test: `PeekBlockingQueue.moveToTail`
- Related test: `SeaTunnelEngineClusterRoleTest.pendingQueueRescheduleAllowsLaterJobRunMultiNode`
**Problem Description**:
The current test only covers the happy path of `moveToTail`. The following scenarios are missing:
1. **Edge cases**:
   - Empty queue
   - Single-element queue
   - Moving a non-existent jobId
   - Moving an element that is already at the tail
2. **Concurrency**:
   - Moving while another thread is calling `take`
   - Moving while another thread is calling `removeById`
   - Multiple threads calling `moveToTail` at the same time
3. **Configuration**:
   - Different `maxRetryTimes` values (0, 1, 2, 5, 10)
   - Different `sleepIntervalMillis` values
   - Configuration validation (negative values, 0, etc.)
4. **Policies**:
   - Behavioral differences between WaitPolicy and WaitReschedulePolicy
   - Whether the REJECT policy fails the job correctly
**Potential Risks**:
- Risk 1: Bugs at edge conditions may surface in production
- Risk 2: Concurrency problems may only show up under high load
- Risk 3: Configuration errors may only be discovered at runtime
**Impact Scope**:
- Direct impact: code quality and stability
- Indirect impact: production reliability
- Impact area: all users of this feature
**Severity**: MAJOR
**Improvement Suggestions**:
Add the following test cases:
```java
// PeekBlockingQueueTest.java
@Test
public void testMoveToTail_EmptyQueue() throws InterruptedException {
// Empty queue, moving non-existent element
Assertions.assertFalse(queue.moveToTail(999L));
Assertions.assertEquals(0, queue.size());
}
@Test
public void testMoveToTail_SingleElementQueue() throws InterruptedException {
// Single element queue
queue.put("1");
    // With the implementation shown above this returns true: the only element
    // is the head, so it is removed and re-added (a positional no-op)
    Assertions.assertTrue(queue.moveToTail(1L));
Assertions.assertEquals("1", queue.take());
}
@Test
public void testMoveToTail_NonExistentJob() throws InterruptedException {
// Move non-existent jobId
queue.put("1");
queue.put("2");
Assertions.assertFalse(queue.moveToTail(999L));
Assertions.assertEquals(2, queue.size());
}
@Test
public void testMoveToTail_NonHeadElement() throws InterruptedException {
// Move non-head element (should fail)
queue.put("1");
queue.put("2");
queue.put("3");
Assertions.assertFalse(queue.moveToTail(2L)); // 2 is not at head
Assertions.assertEquals("1", queue.take());
Assertions.assertEquals("2", queue.take());
Assertions.assertEquals("3", queue.take());
}
@Test
public void testMoveToTail_ConcurrentWithTake() throws InterruptedException {
// Concurrent test: moving while another thread is taking
queue.put("1");
queue.put("2");
queue.put("3");
AtomicBoolean moved = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
try {
moved.set(queue.moveToTail(1L));
} catch (Exception e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
Thread.sleep(10); // Ensure moveToTail executes first
queue.take();
} catch (Exception e) {
e.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
// Verify queue state is consistent
// ...
}
@Test
public void testMoveToTail_ConcurrentWithRemoveById() throws InterruptedException {
// Concurrent test: moving while another thread is removing by ID
queue.put("1");
queue.put("2");
queue.put("3");
AtomicBoolean moved = new AtomicBoolean(false);
AtomicBoolean removed = new AtomicBoolean(false);
Thread t1 = new Thread(() -> {
moved.set(queue.moveToTail(1L));
});
Thread t2 = new Thread(() -> {
removed.set(queue.removeById(1L));
});
t1.start();
t2.start();
t1.join();
t2.join();
    // Verify: removeById succeeds in either interleaving; moveToTail succeeds
    // only if it ran first. Both can succeed, so do not assert XOR.
    Assertions.assertTrue(removed.get());
    Assertions.assertEquals(2, queue.size());
}
// Configuration tests
@Test
public void testWaitRescheduleConfig_Validation() {
WaitRescheduleConfig config = new WaitRescheduleConfig();
// Test default values
Assertions.assertEquals(3, config.getMaxRetryTimes());
Assertions.assertEquals(3000, config.getSleepIntervalMillis());
// Test boundary values
config.setMaxRetryTimes(1);
Assertions.assertEquals(1, config.getMaxRetryTimes());
config.setSleepIntervalMillis(100);
Assertions.assertEquals(100, config.getSleepIntervalMillis());
}
// Policy tests
@Test
public void testWaitReschedulePolicy_WithZeroCheckTimes() throws InterruptedException {
// Test checkTimes = 0 scenario
PendingJobInfo mockInfo = mock(PendingJobInfo.class);
when(mockInfo.getCheckTimes()).thenReturn(0);
PendingJobScheduleContext context = new PendingJobScheduleContext(
mockInfo, mockQueue, mockConfig, jobId, mockAction
);
WaitReschedulePolicy policy = new WaitReschedulePolicy();
policy.onResourcesNotEnough(context);
// Verify: when checkTimes = 0, rescheduling is not triggered
verify(mockQueue, never()).moveToTail(anyLong());
}
```
**Rationale**:
- Edge-case tests catch common bugs such as null pointers and out-of-bounds access
- Concurrency tests catch race conditions and deadlocks
- Configuration tests surface configuration errors early
- Policy tests verify the behavioral differences between strategies
- These tests add development time, but significantly improve code quality and reliability
---
### Issue 8: PeekBlockingQueue.moveToTail condition check is too strict
**Location**:
`seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/utils/PeekBlockingQueue.java:127-143`
**Problem Description**:
```java
E head = queue.peek();
if (head == null || !head.equals(element)) {
return false; // ⚠️ Issue
}
```
This check ensures that only the head element can be moved to the tail. In a multi-threaded environment, however, the following sequence is possible:
1. Thread A calls `moveToTail(jobId)`
2. Thread A executes `E head = queue.peek()`; suppose head = job1
3. Thread B calls `take()` and removes job1
4. Thread A continues with `!head.equals(element)`, sees a mismatch, and returns false
5. But job1 has already been taken; it should be treated as gone from the queue rather than as a candidate for moving
More importantly, if the goal is fair scheduling, moving should also be possible when:
- The job sits in the middle of the queue (not at the head)
- The job's position was just taken over by a short job
**Potential Risks**:
- Under certain race conditions, a job may not be rescheduled correctly
- It limits future extensibility (e.g., more flexible scheduling policies)
**Impact Scope**:
- Direct impact: the behavior of `moveToTail`
- Indirect impact: the effectiveness of the reschedule policy
- Impact area: scheduling module
**Severity**: MINOR
**Improvement Suggestions**:
This issue is really an extension of Issue 1: adopting Solution 2 there (a true "move any element" implementation) resolves it automatically.
If the current design (move the head only) is kept, then:
```java
/**
 * Move the head element to the tail of the queue if it matches the given jobId.
*
 * <p>This is a specialized method for job rescheduling. It will only succeed if:
* <ul>
* <li>The job exists in the queue</li>
* <li>The job is currently at the head of the queue</li>
* </ul>
*
 * <p>If the job is not at the head (e.g., due to concurrent operations),
 * this method will return false and the caller should retry in the next cycle.
*
* @param jobId the job id that should be at the head
* @return true if the head job was moved to tail; false otherwise
*/
public boolean moveHeadToTailIfAtHead(Long jobId) {
lock.lock();
try {
E element = jobIdMap.get(jobId);
if (element == null) {
return false;
}
E head = queue.peek();
if (head == null || !head.equals(element)) {
return false;
}
if (!queue.remove(element)) {
return false;
}
queue.put(element);
notEmpty.signalAll();
return true;
} catch (InterruptedException e) {
log.error("Move element to tail failed. {}",
ExceptionUtils.getMessage(e));
Thread.currentThread().interrupt();
return false;
} finally {
lock.unlock();
}
}
```
The key is to improve the JavaDoc, clearly documenting the method's contract
and behavior.
---