This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e1a81ac167 [Hotfix][Zeta] Fix job deadlock when schema change (#6389)
e1a81ac167 is described below
commit e1a81ac1677166e07c21311966705e0029e79b7b
Author: hailin0 <[email protected]>
AuthorDate: Wed Mar 6 16:56:36 2024 +0800
[Hotfix][Zeta] Fix job deadlock when schema change (#6389)
---
.../server/task/flow/SourceFlowLifeCycle.java | 41 +++++++++++++---------
1 file changed, 24 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index f5964badee..95e54980b4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -168,8 +168,8 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
"previous schema changes in progress,
schemaChangePhase: "
+ schemaChangePhase.get());
}
- runningTask.triggerSchemaChangeBeforeCheckpoint().get();
schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
+ runningTask.triggerSchemaChangeBeforeCheckpoint().get();
log.info("triggered schema-change-before checkpoint, stopping
collect data");
} else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
if (schemaChangePhase.get() != null) {
@@ -177,8 +177,8 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
"previous schema changes in progress,
schemaChangePhase: "
+ schemaChangePhase.get());
}
- runningTask.triggerSchemaChangeAfterCheckpoint().get();
schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
+ runningTask.triggerSchemaChangeAfterCheckpoint().get();
log.info("triggered schema-change-after checkpoint, stopping
collect data");
}
} else {
@@ -284,25 +284,32 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
currentTaskLocation);
CheckpointType checkpointType = ((CheckpointBarrier)
barrier).getCheckpointType();
- if (schemaChanging() && checkpointType.isSchemaChangeCheckpoint()) {
- if (checkpointType.isSchemaChangeBeforeCheckpoint()
- && schemaChangePhase.get().isBeforePhase()) {
- schemaChangePhase.get().setCheckpointId(barrier.getId());
- } else if (checkpointType.isSchemaChangeAfterCheckpoint()
- && schemaChangePhase.get().isAfterPhase()) {
- schemaChangePhase.get().setCheckpointId(barrier.getId());
+ if (checkpointType.isSchemaChangeCheckpoint()) {
+ if (schemaChanging()) {
+ if (checkpointType.isSchemaChangeBeforeCheckpoint()
+ && schemaChangePhase.get().isBeforePhase()) {
+ schemaChangePhase.get().setCheckpointId(barrier.getId());
+ } else if (checkpointType.isSchemaChangeAfterCheckpoint()
+ && schemaChangePhase.get().isAfterPhase()) {
+ schemaChangePhase.get().setCheckpointId(barrier.getId());
+ } else {
+ throw new IllegalStateException(
+ String.format(
+ "schema-change checkpoint[%s,%s] and
phase[%s] is not matched",
+ barrier.getId(),
+ checkpointType,
+ schemaChangePhase.get().getPhase()));
+ }
+ log.info(
+ "lock checkpoint[{}] waiting for complete..., phase:
[{}]",
+ barrier.getId(),
+ schemaChangePhase.get().getPhase());
} else {
throw new IllegalStateException(
String.format(
- "schema-change checkpoint[%s,%s] and phase[%s]
is not matched",
- barrier.getId(),
- checkpointType,
- schemaChangePhase.get().getPhase()));
+ "schema-change checkpoint[%s] and phase[%s] is
not matched",
+ barrier.getId(), checkpointType));
}
- log.info(
- "lock checkpoint[{}] waiting for complete..., phase: [{}]",
- barrier.getId(),
- schemaChangePhase.get().getPhase());
}
}