DanielCarter-stack commented on PR #10551:
URL: https://github.com/apache/seatunnel/pull/10551#issuecomment-3980337294
### Issue 1: NPE risk when currExecutionState may be null
**Location**: `PhysicalVertex.java:363`
```java
if (stateEntryMissing) {
    log.warn(...);
    current = currExecutionState; // currExecutionState may be null
}
```
**Related Context**:
- Constructor: `PhysicalVertex.java:162` - `this.currExecutionState =
(ExecutionState) runningJobStateIMap.get(taskGroupLocation);`
- Caller 1: `PhysicalVertex.java:576` - `stateProcess()` →
`updateTaskState(ExecutionState.RUNNING)`
- Caller 2: `PhysicalVertex.java:580` - `stateProcess()` →
`updateTaskState(ExecutionState.FAILED)`
- Caller 3: `PhysicalVertex.java:623` - `makeTaskGroupFailing()` →
`updateTaskState(ExecutionState.FAILING)`
**Issue Description**:
Although the PR handles the case where `runningJobStateIMap.get()` returns
null, the fallback `currExecutionState` itself may also be null. Looking at
line 162 of the constructor, `currExecutionState` initialization depends on
`runningJobStateIMap`. In the following scenarios:
1. When constructing `PhysicalVertex`, there is no corresponding entry in
`runningJobStateIMap`
2. Although lines 153-160 of the constructor will try to create an entry,
under race conditions (such as parallel initialization of multiple
PhysicalVertex), `currExecutionState` may still be null
When `currExecutionState` is null and `stateEntryMissing=true`, `current`
stays null. The `current != null` check keeps the subsequent
`current.equals(targetState)` (line 370) from executing, but any part of the
state transition flow that depends on the value of `current` then runs against
a null state and may behave unexpectedly.
**Potential Risks**:
- **Risk 1**: If `currExecutionState` is null, `current` will also be null,
causing subsequent state transition logic to be based on incorrect assumptions
- **Risk 2**: The log will record the `null` state value, which may cause
log parsing errors
- **Risk 3**: The switch statement in the `stateProcess()` method depends on
the return value of `getExecutionState()`. If it returns null, a
NullPointerException will be thrown
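Risk 3 rests on a plain Java rule: a classic `switch` over an enum throws `NullPointerException` when the selector is null. A minimal, self-contained sketch of that rule follows. `DemoState` and `NullSwitchDemo` are hypothetical stand-ins for illustration only and are not part of SeaTunnel.

```java
/** Hypothetical stand-in for an execution state enum; not the SeaTunnel type. */
enum DemoState { CREATED, RUNNING, FAILED }

final class NullSwitchDemo {
    /** A classic switch over an enum; a null selector makes it throw NullPointerException. */
    static String describe(DemoState state) {
        switch (state) {
            case CREATED:
                return "created";
            case RUNNING:
                return "running";
            default:
                return "other";
        }
    }

    /** Returns true when switching on the given state throws NullPointerException. */
    static boolean switchThrowsNpe(DemoState state) {
        try {
            describe(state);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(switchThrowsNpe(null)); // prints true
        System.out.println(switchThrowsNpe(DemoState.RUNNING)); // prints false
    }
}
```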
**Impact Scope**:
- **Direct Impact**: `PhysicalVertex.updateTaskState()` method
- **Indirect Impact**: All paths that call `updateTaskState`, including
`stateProcess()`, `makeTaskGroupFailing()`, `updateStateByExecutionService()`
- **Impact Area**: Core framework (Engine Server)
**Severity**: **MAJOR**
**Improvement Suggestions**:
```java
// Add an additional null check in the updateTaskState method
if (stateEntryMissing) {
    log.warn(
            "{} current state is null (possibly due to node removal during scaling down), "
                    + "continue local transition to {}. Task execution location: {}",
            taskFullName,
            targetState,
            taskGroupLocation);
    current = currExecutionState;
    // Added: if the local state is also null, fall back to the default initial state
    if (current == null) {
        log.error(
                "{} Both distributed state and local state are null, assuming CREATED as fallback. "
                        + "Task execution location: {}",
                taskFullName,
                taskGroupLocation);
        current = ExecutionState.CREATED;
    }
}
```
**Rationale**:
1. Defensive programming: can continue processing even if
`currExecutionState` is null
2. Use `CREATED` as a reasonable default value, consistent with the initial
state of the state machine
3. Add error logging to facilitate detection of such exceptional situations
4. No need to modify the `PhysicalVertex` constructor, maintaining backward
compatibility
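The fallback order suggested above (distributed entry first, then the local state, then `CREATED`) can be checked in isolation. This is only a sketch under assumptions, not the `PhysicalVertex` implementation: `SketchState`, `StateFallbackSketch`, `resolveCurrent`, and the plain `Map` standing in for `runningJobStateIMap` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for an execution state enum; not the SeaTunnel type. */
enum SketchState { CREATED, RUNNING, FAILING, FAILED }

final class StateFallbackSketch {
    /** Resolves the current state: distributed entry, then local copy, then CREATED. */
    static SketchState resolveCurrent(
            Map<String, SketchState> distributed, String key, SketchState local) {
        SketchState current = distributed.get(key);
        if (current == null) {
            // Distributed entry is missing: fall back to the local copy.
            current = local;
        }
        if (current == null) {
            // Local copy is missing too: assume the initial state.
            current = SketchState.CREATED;
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, SketchState> distributed = new HashMap<>();
        distributed.put("task-1", SketchState.RUNNING);
        System.out.println(resolveCurrent(distributed, "task-1", null)); // prints RUNNING
        System.out.println(resolveCurrent(distributed, "task-2", SketchState.FAILING)); // prints FAILING
        System.out.println(resolveCurrent(distributed, "task-2", null)); // prints CREATED
    }
}
```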
---
### Issue 2: Not updating distributed Map when state entry is missing may cause state inconsistency
**Location**: `PhysicalVertex.java:386-398`
```java
// now do the actual state transition
if (!stateEntryMissing) {
    RetryUtils.retryWithException(
            () -> {
                updateStateTimestamps(targetState);
                runningJobStateIMap.set(taskGroupLocation, targetState);
                return null;
            },
            new RetryUtils.RetryMaterial(
                    Constant.OPERATION_RETRY_TIME,
                    true,
                    ExceptionUtil::isOperationNeedRetryException,
                    Constant.OPERATION_RETRY_SLEEP));
}
this.currExecutionState = targetState;
```
**Related Context**:
- `initStateFuture()`: `PhysicalVertex.java:182-204` - restore state from
`runningJobStateIMap`
- JobMaster failover: `JobMaster.java` - new Master restores job status from
IMap
- Monitoring system: may read task status from `runningJobStateIMap` for
display
**Issue Description**:
When `stateEntryMissing=true`, the PR chooses to only update the local state
`currExecutionState`, not the distributed Map `runningJobStateIMap`. This leads
to the following problems:
1. **State inconsistency**: Local state has been updated (e.g., FAILED), but
the distributed Map still has no entry for this task
2. **Failover failure**: If JobMaster fails over, the new Master cannot
restore this task's state from `runningJobStateIMap`
3. **Monitoring blind spot**: Monitoring systems that depend on
`runningJobStateIMap` cannot see the latest state of this task
**Potential Risks**:
- **Risk 1**: After JobMaster failover, the new Master may think the task
does not exist, leading to duplicate execution or state confusion
- **Risk 2**: When operations personnel view task status from Hazelcast
IMap, they will get incomplete information
- **Risk 3**: If `currExecutionState` has been updated but there is no
record in IMap, it may cause errors in certain logic that depends on IMap (such
as counting completed tasks)
**Impact Scope**:
- **Direct Impact**: State persistence of `PhysicalVertex`
- **Indirect Impact**: JobMaster failover recovery, monitoring system, task
statistics logic
- **Impact Area**: Core framework (state management and failover recovery)
**Severity**: **MAJOR**
**Improvement Suggestions**:
There are two possible improvement approaches:
**Approach 1: Still try to write to distributed Map** (recommended)
```java
// now do the actual state transition
if (!stateEntryMissing) {
    RetryUtils.retryWithException(
            () -> {
                updateStateTimestamps(targetState);
                runningJobStateIMap.set(taskGroupLocation, targetState);
                return null;
            },
            new RetryUtils.RetryMaterial(
                    Constant.OPERATION_RETRY_TIME,
                    true,
                    ExceptionUtil::isOperationNeedRetryException,
                    Constant.OPERATION_RETRY_SLEEP));
} else {
    // Added: try to recreate the entry even though it was originally missing
    try {
        updateStateTimestamps(targetState);
        runningJobStateIMap.put(taskGroupLocation, targetState);
        log.info(
                "{} Recreated state entry in distributed map for state transition to {}",
                taskFullName,
                targetState);
    } catch (Exception e) {
        // If the write fails, log it but don't block the local state transition
        log.warn(
                "{} Failed to recreate state entry in distributed map: {}",
                taskFullName,
                ExceptionUtils.getMessage(e));
    }
}
this.currExecutionState = targetState;
```
**Approach 2: Accept state inconsistency, but add documentation**
```java
/**
 * Update task state. When the state entry is missing from the distributed map
 * (e.g., due to node removal during scaling down), only the local state is updated.
 * This is a known trade-off to allow task state progression even when the
 * distributed state is unavailable.
 *
 * Note: This may cause temporary inconsistency between local and distributed state.
 * The local state is the source of truth for task lifecycle management.
 */
public synchronized void updateTaskState(@NonNull ExecutionState targetState) {
    // ... existing implementation ...
}
```
**Rationale**:
**Advantages of Approach 1**:
1. Maintain state consistency: even if the entry was originally missing, try
to recreate it
2. Support failover recovery: new Master can restore complete state from
distributed Map
3. Monitoring visibility: monitoring systems can see the latest state
**Advantages of Approach 2**:
1. If recreating the entry may cause other problems (such as entry ID
conflicts), then approach 2 is safer
2. Clearly inform users that this is a known trade-off
3. Leave room for future optimization
**Recommend Approach 1**, because:
- `runningJobStateIMap.put()` should be an idempotent operation
- If the entry truly cannot be created (e.g., Hazelcast cluster failure),
the catch block will catch and log the exception
- Local state transition will still proceed, maintaining fault tolerance
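The behavior recommended here (attempt the distributed write, log on failure, always advance the local state) can be sketched without Hazelcast. Everything in the sketch is assumed for illustration: `BestEffortStateWriter` and `failingMap` are hypothetical, a `java.util.Map` stands in for `runningJobStateIMap`, and states are plain strings.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a java.util.Map stands in for the distributed IMap. */
final class BestEffortStateWriter {
    private final Map<String, String> distributed;
    private String localState;

    BestEffortStateWriter(Map<String, String> distributed, String initialState) {
        this.distributed = distributed;
        this.localState = initialState;
    }

    /** Tries the distributed write, then always advances the local state. */
    synchronized boolean transition(String key, String targetState) {
        boolean written;
        try {
            distributed.put(key, targetState);
            written = true;
        } catch (RuntimeException e) {
            // A failed write is reported but must not block the local transition.
            System.err.println("Failed to recreate state entry: " + e.getMessage());
            written = false;
        }
        localState = targetState;
        return written;
    }

    synchronized String localState() {
        return localState;
    }

    /** A map whose writes always fail, simulating an unreachable cluster. */
    static Map<String, String> failingMap() {
        return new ConcurrentHashMap<String, String>() {
            @Override
            public String put(String key, String value) {
                throw new IllegalStateException("simulated cluster failure");
            }
        };
    }

    public static void main(String[] args) {
        Map<String, String> healthy = new ConcurrentHashMap<>();
        BestEffortStateWriter ok = new BestEffortStateWriter(healthy, "RUNNING");
        System.out.println(ok.transition("task-1", "FAILED") + " " + healthy.get("task-1"));

        BestEffortStateWriter degraded = new BestEffortStateWriter(failingMap(), "RUNNING");
        System.out.println(degraded.transition("task-1", "FAILED") + " " + degraded.localState());
    }
}
```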
---
### Issue 3: Test cases do not cover the scenario where currExecutionState is null
**Location**: `TaskTest.java:312-397`
```java
@Test
@SetEnvironmentVariable(key = SKIP_CHECK_JAR, value = "true")
public void testUpdateTaskStateWhenStateEntryMissing() throws MalformedURLException {
    // ... test code ...
    runningJobState.remove(physicalVertex.getTaskGroupLocation());
    physicalVertex.makeTaskGroupFailing(new RuntimeException("test missing state entry"));
    Assertions.assertTrue(stateFuture.isDone());
    Assertions.assertEquals(ExecutionState.FAILED, physicalVertex.getExecutionState());
    Assertions.assertEquals(ExecutionState.FAILED, stateFuture.join().getExecutionState());
}
```
**Related Context**:
- `PhysicalVertex` constructor: `PhysicalVertex.java:127-180` -
`currExecutionState` initialization logic
- Other test cases: other test methods in `TaskTest.java`
**Issue Description**:
The current test only exercises one sequence:
1. Create `PhysicalVertex` and initialize it (at this point `currExecutionState`
is set correctly)
2. Delete the entry in `runningJobState`
3. Call `makeTaskGroupFailing` to trigger the state transition
It does not cover the following scenarios:
1. **`currExecutionState` itself is null**: `currExecutionState` is already
null before `runningJobState.remove()` is called
2. **Concurrent calls**: multiple threads call `updateTaskState` at the same
time while the state entry is missing
3. **Consecutive state transitions**: whether the state still advances
correctly when the entry is missing across several transitions in a row
**Potential Risks**:
- **Risk 1**: The scenario where `currExecutionState` is null mentioned in
Issue 1 is not covered by tests
- **Risk 2**: Boundary conditions may lead to unexpected behavior in
production environment
- **Risk 3**: Insufficient test coverage may hide potential state machine
bugs
**Impact Scope**:
- **Direct Impact**: Test coverage and code quality
- **Indirect Impact**: Production environment stability
- **Impact Area**: Single test case
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
@Test
@SetEnvironmentVariable(key = SKIP_CHECK_JAR, value = "true")
public void testUpdateTaskStateWhenStateEntryMissingAndLocalStateNull() throws Exception {
    IdGenerator idGenerator = new IdGenerator();
    // ... setup LogicalDag and PhysicalPlan (same as existing tests) ...
    PhysicalVertex physicalVertex =
            physicalPlan.getPipelineList().get(0).getPhysicalVertexList().get(0);
    PassiveCompletableFuture<TaskExecutionState> stateFuture = physicalVertex.initStateFuture();
    physicalVertex.startPhysicalVertex();
    // Delete the state entry
    runningJobState.remove(physicalVertex.getTaskGroupLocation());
    // Use reflection to set currExecutionState to null, simulating the extreme scenario.
    // getDeclaredField/set throw checked exceptions, hence "throws Exception" above.
    java.lang.reflect.Field field = PhysicalVertex.class.getDeclaredField("currExecutionState");
    field.setAccessible(true);
    field.set(physicalVertex, null);
    // Trigger the state transition
    physicalVertex.makeTaskGroupFailing(new RuntimeException("test with null local state"));
    // Verify: even if the local state is null, the task should still reach a terminal state
    Assertions.assertTrue(stateFuture.isDone());
    Assertions.assertEquals(ExecutionState.FAILED, physicalVertex.getExecutionState());
    Assertions.assertEquals(ExecutionState.FAILED, stateFuture.join().getExecutionState());
}
@Test
@SetEnvironmentVariable(key = SKIP_CHECK_JAR, value = "true")
public void testConcurrentStateUpdateWhenEntryMissing() throws Exception {
    IdGenerator idGenerator = new IdGenerator();
    // ... setup LogicalDag and PhysicalPlan ...
    PhysicalVertex physicalVertex =
            physicalPlan.getPipelineList().get(0).getPhysicalVertexList().get(0);
    physicalVertex.startPhysicalVertex();
    // Delete the state entry
    runningJobState.remove(physicalVertex.getTaskGroupLocation());
    // Use CountDownLatch to simulate concurrent calls
    java.util.concurrent.CountDownLatch startLatch = new java.util.concurrent.CountDownLatch(1);
    java.util.concurrent.CountDownLatch doneLatch = new java.util.concurrent.CountDownLatch(2);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
        // Thread 1: try to update to FAILING
        executor.submit(() -> {
            try {
                startLatch.await();
                physicalVertex.makeTaskGroupFailing(new RuntimeException("thread 1"));
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                doneLatch.countDown();
            }
        });
        // Thread 2: try to update to CANCELING
        executor.submit(() -> {
            try {
                startLatch.await();
                physicalVertex.cancel();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                doneLatch.countDown();
            }
        });
        startLatch.countDown(); // Start both threads simultaneously
        // Fail on timeout instead of silently ignoring the await result
        Assertions.assertTrue(
                doneLatch.await(5, TimeUnit.SECONDS),
                "Both threads should finish within 5 seconds (possible deadlock)");
    } finally {
        executor.shutdownNow();
    }
    // Verify: should reach some terminal state without throwing or deadlocking
    ExecutionState finalState = physicalVertex.getExecutionState();
    Assertions.assertTrue(
            finalState.isEndState(),
            "Final state should be terminal, but was: " + finalState);
}
```
**Rationale**:
1. **First test**: verifies the extreme scenario when `currExecutionState`
is null, ensuring code robustness
2. **Second test**: verifies thread safety under concurrent calls, checking
that the `synchronized` modifier on `updateTaskState` serializes the competing
transitions
3. Improves test coverage and reduces the risk of unexpected situations in
production environment
4. Test code is clear, easy to understand and maintain
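The latch-based pattern in the second test can be factored into a small helper that checks the `await` result and always shuts the executor down. This is a minimal sketch; `LatchRaceHarness` and `race` are hypothetical names, not existing SeaTunnel test utilities.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that releases two actions at the same moment. */
final class LatchRaceHarness {
    /** Runs both actions concurrently; returns true only if both finished before the timeout. */
    static boolean race(Runnable first, Runnable second, long timeoutSeconds) {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(2);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            for (Runnable action : new Runnable[] {first, second}) {
                executor.submit(() -> {
                    try {
                        start.await(); // block until both workers are queued
                        action.run();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();
                    }
                });
            }
            start.countDown(); // release both workers together
            return done.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        boolean finished = race(counter::incrementAndGet, counter::incrementAndGet, 5);
        System.out.println(finished + " " + counter.get());
    }
}
```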
---
### Issue 4: Missing JavaDoc documentation updates
**Location**: `PhysicalVertex.java:351`
**Related Context**:
- JavaDoc of `PhysicalVertex` class: `PhysicalVertex.java:67-72`
- JavaDoc of other methods: such as `resetExecutionState()`,
`updateStateByExecutionService()`
**Issue Description**:
The `updateTaskState` method adds important fault-tolerance logic (handling
`stateEntryMissing`), but no JavaDoc was added or updated to explain the new
behavior. As a result:
1. Other developers may not understand why `runningJobStateIMap` is not
updated in certain cases
2. Maintainers may mistake the skipped update for a bug and try to "fix" it
3. Callers of this API have no description of its fault-tolerance behavior
**Potential Risks**:
- **Risk 1**: Reduced code maintainability, subsequent developers may
misunderstand the design intent
- **Risk 2**: May lead to inappropriate "fixes" that break the original
fault tolerance logic
- **Risk 3**: Violates Apache project's high standards for code documentation
**Impact Scope**:
- **Direct Impact**: Code maintainability
- **Indirect Impact**: Future maintainers' understanding cost
- **Impact Area**: Single method
**Severity**: **MINOR**
**Improvement Suggestions**:
```java
/**
 * Update the task state in both the distributed state map and the local state.
 *
 * <p>This method handles the scenario where the task state entry is missing from
 * the distributed map (e.g., due to node removal during scaling down). In such cases:
 * <ul>
 *   <li>The local state ({@code currExecutionState}) is used as a fallback for the
 *       current state</li>
 *   <li>The distributed map is not updated if the entry is missing (to avoid
 *       creating orphaned entries)</li>
 *   <li>The local state is always updated to ensure the task can progress to a
 *       terminal state</li>
 * </ul>
 *
 * <p>This design ensures that task state transitions can complete even when the
 * distributed state is temporarily unavailable, preventing the task from hanging
 * in a non-terminal state.
 *
 * @param targetState the target state to transition to, must not be null
 * @see ExecutionState
 * @see #currExecutionState
 * @see #stateProcess()
 */
public synchronized void updateTaskState(@NonNull ExecutionState targetState) {
    // ... method implementation ...
}
```
**Rationale**:
1. Clearly explains the method's behavior and fault tolerance design
2. Explains why the distributed Map is not updated when `stateEntryMissing`
3. Provides background information (node removal scenario), helping to
understand the design intent
4. Uses standard JavaDoc format, including `@param`, `@see` and other tags
5. Complies with Apache project documentation standards
---