This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a1d13b9157 [Bugfix][Zeta] Fix the checkpoint be blocked with long time (#5695)
a1d13b9157 is described below

commit a1d13b9157794c3c1c5769f564599b944d8fee76
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Oct 24 20:46:02 2023 +0800

    [Bugfix][Zeta] Fix the checkpoint be blocked with long time (#5695)
---
 .../seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java    | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 572836fe51..f5964badee 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -152,6 +152,14 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
                 Thread.sleep(100);
             } else {
                 collector.resetEmptyThisPollNext();
+                /**
+                 * The current thread obtain a checkpoint lock in the method {@link
+                 * SourceReader#pollNext(Collector)}. When trigger the checkpoint or savepoint,
+                 * other threads try to obtain the lock in the method {@link
+                 * SourceFlowLifeCycle#triggerBarrier(Barrier)}. When high CPU load, checkpoint
+                 * process may be blocked as long time. So we need sleep to free the CPU.
+                 */
+                Thread.sleep(0L);
             }
 
             if (collector.captureSchemaChangeBeforeCheckpointSignal()) {
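
The comment added in this commit describes contention on a checkpoint lock between the polling thread and the thread that triggers a barrier. The following is a minimal, standalone sketch of that pattern, not the SeaTunnel implementation: the class CheckpointLockSketch, its fields, and its methods are hypothetical stand-ins, and only the Thread.sleep(0L) call is taken from the diff. It assumes the sleep happens after the lock has been released, as the comment implies. How much Thread.sleep(0L) actually helps depends on the JVM and the OS scheduler, so the sketch measures the wait rather than promising a particular result.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Illustrative only: every name in this class is hypothetical, not a SeaTunnel class. */
final class CheckpointLockSketch {
    // Stand-in for the checkpoint lock shared by the reader and the barrier trigger.
    private final Object checkpointLock = new Object();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicLong polled = new AtomicLong();

    /** Stand-in for the reader loop: take the lock, do one unit of work, release, then yield. */
    void pollLoop() {
        try {
            while (running.get()) {
                synchronized (checkpointLock) {
                    polled.incrementAndGet();
                }
                // Same call as in the commit, made while the lock is not held, so that a
                // waiting thread gets a chance to be scheduled and acquire the lock.
                Thread.sleep(0L);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stand-in for the barrier trigger: returns the nanoseconds spent waiting for the lock. */
    long triggerBarrier() {
        long start = System.nanoTime();
        synchronized (checkpointLock) {
            return System.nanoTime() - start;
        }
    }

    /** Runs one reader and one barrier trigger; returns {waitedNanos, pollCount}. */
    static long[] runOnce() {
        CheckpointLockSketch sketch = new CheckpointLockSketch();
        Thread reader = new Thread(sketch::pollLoop, "reader");
        reader.setDaemon(true);
        reader.start();
        try {
            // Wait until the reader is actually polling before asking for the lock.
            while (sketch.polled.get() == 0) {
                Thread.sleep(1L);
            }
            long waitedNanos = sketch.triggerBarrier();
            sketch.running.set(false);
            reader.join();
            return new long[] {waitedNanos, sketch.polled.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
    }

    public static void main(String[] args) {
        long[] result = runOnce();
        System.out.println("barrier waited " + result[0] + " ns after " + result[1] + " polls");
    }
}
```

Running main prints how long the barrier thread waited for the lock. Removing the Thread.sleep(0L) line and comparing the numbers on a loaded machine is one way to probe the effect the commit comment describes, though results will vary by platform.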
