This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e1a81ac167 [Hotfix][Zeta] Fix job deadlock when schema change (#6389)
e1a81ac167 is described below

commit e1a81ac1677166e07c21311966705e0029e79b7b
Author: hailin0 <[email protected]>
AuthorDate: Wed Mar 6 16:56:36 2024 +0800

    [Hotfix][Zeta] Fix job deadlock when schema change (#6389)
---
 .../server/task/flow/SourceFlowLifeCycle.java      | 41 +++++++++++++---------
 1 file changed, 24 insertions(+), 17 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index f5964badee..95e54980b4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -168,8 +168,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
                             "previous schema changes in progress, 
schemaChangePhase: "
                                     + schemaChangePhase.get());
                 }
-                runningTask.triggerSchemaChangeBeforeCheckpoint().get();
                 schemaChangePhase.set(SchemaChangePhase.createBeforePhase());
+                runningTask.triggerSchemaChangeBeforeCheckpoint().get();
                 log.info("triggered schema-change-before checkpoint, stopping 
collect data");
             } else if (collector.captureSchemaChangeAfterCheckpointSignal()) {
                 if (schemaChangePhase.get() != null) {
@@ -177,8 +177,8 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
                             "previous schema changes in progress, 
schemaChangePhase: "
                                     + schemaChangePhase.get());
                 }
-                runningTask.triggerSchemaChangeAfterCheckpoint().get();
                 schemaChangePhase.set(SchemaChangePhase.createAfterPhase());
+                runningTask.triggerSchemaChangeAfterCheckpoint().get();
                 log.info("triggered schema-change-after checkpoint, stopping 
collect data");
             }
         } else {
@@ -284,25 +284,32 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
                 currentTaskLocation);
 
         CheckpointType checkpointType = ((CheckpointBarrier) 
barrier).getCheckpointType();
-        if (schemaChanging() && checkpointType.isSchemaChangeCheckpoint()) {
-            if (checkpointType.isSchemaChangeBeforeCheckpoint()
-                    && schemaChangePhase.get().isBeforePhase()) {
-                schemaChangePhase.get().setCheckpointId(barrier.getId());
-            } else if (checkpointType.isSchemaChangeAfterCheckpoint()
-                    && schemaChangePhase.get().isAfterPhase()) {
-                schemaChangePhase.get().setCheckpointId(barrier.getId());
+        if (checkpointType.isSchemaChangeCheckpoint()) {
+            if (schemaChanging()) {
+                if (checkpointType.isSchemaChangeBeforeCheckpoint()
+                        && schemaChangePhase.get().isBeforePhase()) {
+                    schemaChangePhase.get().setCheckpointId(barrier.getId());
+                } else if (checkpointType.isSchemaChangeAfterCheckpoint()
+                        && schemaChangePhase.get().isAfterPhase()) {
+                    schemaChangePhase.get().setCheckpointId(barrier.getId());
+                } else {
+                    throw new IllegalStateException(
+                            String.format(
+                                    "schema-change checkpoint[%s,%s] and 
phase[%s] is not matched",
+                                    barrier.getId(),
+                                    checkpointType,
+                                    schemaChangePhase.get().getPhase()));
+                }
+                log.info(
+                        "lock checkpoint[{}] waiting for complete..., phase: 
[{}]",
+                        barrier.getId(),
+                        schemaChangePhase.get().getPhase());
             } else {
                 throw new IllegalStateException(
                         String.format(
-                                "schema-change checkpoint[%s,%s] and phase[%s] 
is not matched",
-                                barrier.getId(),
-                                checkpointType,
-                                schemaChangePhase.get().getPhase()));
+                                "schema-change checkpoint[%s] and phase[%s] is 
not matched",
+                                barrier.getId(), checkpointType));
             }
-            log.info(
-                    "lock checkpoint[{}] waiting for complete..., phase: [{}]",
-                    barrier.getId(),
-                    schemaChangePhase.get().getPhase());
         }
     }
 

Reply via email to