This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e26006d2aa [Improve][Engine] Adjust the sleep mode of the Flink and Spark engines to be consistent with Zeta (#5698)
e26006d2aa is described below

commit e26006d2aa47df0f92740cf9843dca862db82232
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Oct 31 15:23:27 2023 +0800

    [Improve][Engine] Adjust the sleep mode of the Flink and Spark engines to be consistent with Zeta (#5698)
---
 .../java/org/apache/seatunnel/api/source/Collector.java   |  6 ++++++
 .../engine/server/task/SeaTunnelSourceCollector.java      |  2 ++
 .../seatunnel/translation/source/CoordinatedSource.java   | 15 ++++++++++++++-
 .../seatunnel/translation/source/ParallelSource.java      | 14 +++++++++++---
 .../seatunnel/translation/flink/source/RowCollector.java  | 12 ++++++++++++
 .../reader/batch/CoordinatedBatchPartitionReader.java     | 15 ++++++++++++++-
 .../micro/CoordinatedMicroBatchPartitionReader.java       | 15 ++++++++++++++-
 .../partition/batch/CoordinatedBatchPartitionReader.java  | 15 ++++++++++++++-
 .../micro/CoordinatedMicroBatchPartitionReader.java       | 15 ++++++++++++++-
 .../spark/serialization/InternalRowCollector.java         | 12 ++++++++++++
 10 files changed, 113 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index 85435880c6..51ace474e5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -40,4 +40,10 @@ public interface Collector<T> {
      * @return The object to use as the lock
      */
     Object getCheckpointLock();
+
+    default boolean isEmptyThisPollNext() {
+        return false;
+    }
+
+    default void resetEmptyThisPollNext() {}
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 6e44089e8f..aecb64ebfc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -192,10 +192,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
         return checkpointLock;
     }
 
+    @Override
     public boolean isEmptyThisPollNext() {
         return emptyThisPollNext;
     }
 
+    @Override
     public void resetEmptyThisPollNext() {
         this.emptyThisPollNext = true;
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 6cf6f5d51d..553bb34507 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -165,7 +165,20 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
                                         while (flag.get()) {
                                             try {
                                                 reader.pollNext(collector);
-                                                
Thread.sleep(SLEEP_TIME_INTERVAL);
+                                                if 
(collector.isEmptyThisPollNext()) {
+                                                    Thread.sleep(100);
+                                                } else {
+                                                    
collector.resetEmptyThisPollNext();
+                                                    /**
+                                                     * sleep(0) is used to 
prevent the current
+                                                     * thread from occupying 
CPU resources for a
+                                                     * long time, thus 
blocking the checkpoint
+                                                     * thread for a long time. 
It is mentioned in
+                                                     * this
+                                                     * 
https://github.com/apache/seatunnel/issues/5694
+                                                     */
+                                                    Thread.sleep(0L);
+                                                }
                                             } catch (Exception e) {
                                                 running = false;
                                                 flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index d8c59e4cc4..c6934877a0 100644
--- 
a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -38,8 +38,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
-import static 
org.apache.seatunnel.translation.source.CoordinatedSource.SLEEP_TIME_INTERVAL;
-
 public class ParallelSource<T, SplitT extends SourceSplit, StateT extends 
Serializable>
         implements BaseSourceFunction<T> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ParallelSource.class);
@@ -134,7 +132,17 @@ public class ParallelSource<T, SplitT extends SourceSplit, 
StateT extends Serial
                 future.get();
             }
             reader.pollNext(collector);
-            Thread.sleep(SLEEP_TIME_INTERVAL);
+            if (collector.isEmptyThisPollNext()) {
+                Thread.sleep(100);
+            } else {
+                collector.resetEmptyThisPollNext();
+                /**
+                 * sleep(0) is used to prevent the current thread from 
occupying CPU resources for a
+                 * long time, thus blocking the checkpoint thread for a long 
time. It is mentioned
+                 * in this https://github.com/apache/seatunnel/issues/5694
+                 */
+                Thread.sleep(0L);
+            }
         }
         LOG.debug("Parallel source runs complete.");
     }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
index 23573c31ec..daf615efd8 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
@@ -36,6 +36,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
     protected final Object checkpointLock;
 
     private FlowControlGate flowControlGate;
+    private volatile boolean emptyThisPollNext;
 
     public RowCollector(
             SourceFunction.SourceContext<Row> internalCollector,
@@ -60,6 +61,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
                 }
             }
             internalCollector.collect(rowSerialization.convert(sourceRecord));
+            emptyThisPollNext = false;
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -69,4 +71,14 @@ public class RowCollector implements Collector<SeaTunnelRow> 
{
     public Object getCheckpointLock() {
         return this.checkpointLock;
     }
+
+    @Override
+    public boolean isEmptyThisPollNext() {
+        return emptyThisPollNext;
+    }
+
+    @Override
+    public void resetEmptyThisPollNext() {
+        this.emptyThisPollNext = true;
+    }
 }
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
index f901c99ddf..436e371acc 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
@@ -87,7 +87,20 @@ public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReade
                                             while (flag.get()) {
                                                 try {
                                                     
reader.pollNext(rowCollector);
-                                                    
Thread.sleep(SLEEP_TIME_INTERVAL);
+                                                    if 
(rowCollector.isEmptyThisPollNext()) {
+                                                        Thread.sleep(100);
+                                                    } else {
+                                                        
rowCollector.resetEmptyThisPollNext();
+                                                        /**
+                                                         * sleep(0) is used to 
prevent the current
+                                                         * thread from 
occupying CPU resources for a
+                                                         * long time, thus 
blocking the checkpoint
+                                                         * thread for a long 
time. It is mentioned
+                                                         * in this
+                                                         * 
https://github.com/apache/seatunnel/issues/5694
+                                                         */
+                                                        Thread.sleep(0L);
+                                                    }
                                                 } catch (Exception e) {
                                                     this.running = false;
                                                     flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
index 0aa4edacc8..0cec65e613 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
@@ -155,7 +155,20 @@ public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPart
                                             while (flag.get()) {
                                                 try {
                                                     
reader.pollNext(rowCollector);
-                                                    
Thread.sleep(SLEEP_TIME_INTERVAL);
+                                                    if 
(rowCollector.isEmptyThisPollNext()) {
+                                                        Thread.sleep(100);
+                                                    } else {
+                                                        
rowCollector.resetEmptyThisPollNext();
+                                                        /**
+                                                         * sleep(0) is used to 
prevent the current
+                                                         * thread from 
occupying CPU resources for a
+                                                         * long time, thus 
blocking the checkpoint
+                                                         * thread for a long 
time. It is mentioned
+                                                         * in this
+                                                         * 
https://github.com/apache/seatunnel/issues/5694
+                                                         */
+                                                        Thread.sleep(0L);
+                                                    }
                                                 } catch (Exception e) {
                                                     this.running = false;
                                                     flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
index 9302b9f089..62ff5030f3 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
@@ -87,7 +87,20 @@ public class CoordinatedBatchPartitionReader extends 
ParallelBatchPartitionReade
                                             while (flag.get()) {
                                                 try {
                                                     
reader.pollNext(rowCollector);
-                                                    
Thread.sleep(SLEEP_TIME_INTERVAL);
+                                                    if 
(rowCollector.isEmptyThisPollNext()) {
+                                                        Thread.sleep(100);
+                                                    } else {
+                                                        
rowCollector.resetEmptyThisPollNext();
+                                                        /**
+                                                         * sleep(0) is used to 
prevent the current
+                                                         * thread from 
occupying CPU resources for a
+                                                         * long time, thus 
blocking the checkpoint
+                                                         * thread for a long 
time. It is mentioned
+                                                         * in this
+                                                         * 
https://github.com/apache/seatunnel/issues/5694
+                                                         */
+                                                        Thread.sleep(0L);
+                                                    }
                                                 } catch (Exception e) {
                                                     this.running = false;
                                                     flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
index ac020d3053..77b427d6cf 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
@@ -154,7 +154,20 @@ public class CoordinatedMicroBatchPartitionReader extends 
ParallelMicroBatchPart
                                             while (flag.get()) {
                                                 try {
                                                     
reader.pollNext(rowCollector);
-                                                    
Thread.sleep(SLEEP_TIME_INTERVAL);
+                                                    if 
(rowCollector.isEmptyThisPollNext()) {
+                                                        Thread.sleep(100);
+                                                    } else {
+                                                        
rowCollector.resetEmptyThisPollNext();
+                                                        /**
+                                                         * sleep(0) is used to 
prevent the current
+                                                         * thread from 
occupying CPU resources for a
+                                                         * long time, thus 
blocking the checkpoint
+                                                         * thread for a long 
time. It is mentioned
+                                                         * in this
+                                                         * 
https://github.com/apache/seatunnel/issues/5694
+                                                         */
+                                                        Thread.sleep(0L);
+                                                    }
                                                 } catch (Exception e) {
                                                     this.running = false;
                                                     flag.set(false);
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
index db6c518d2a..39782f174c 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
@@ -38,6 +38,7 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
     private final AtomicLong collectTotalCount;
     private Map<String, Object> envOptions;
     private FlowControlGate flowControlGate;
+    private volatile boolean emptyThisPollNext;
 
     public InternalRowCollector(
             Handover<InternalRow> handover,
@@ -65,6 +66,7 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
                 handover.produce(rowSerialization.convert(record));
             }
             collectTotalCount.incrementAndGet();
+            emptyThisPollNext = false;
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -78,4 +80,14 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
     public Object getCheckpointLock() {
         return this.checkpointLock;
     }
+
+    @Override
+    public boolean isEmptyThisPollNext() {
+        return emptyThisPollNext;
+    }
+
+    @Override
+    public void resetEmptyThisPollNext() {
+        this.emptyThisPollNext = true;
+    }
 }

Reply via email to