This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 02114db626 [Improve][Zeta] Add sleep for Task to reduce CPU cost 
(#5117)
02114db626 is described below

commit 02114db626a290502d33797dfbe9a11ee5b52b6f
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jul 20 15:25:34 2023 +0800

    [Improve][Zeta] Add sleep for Task to reduce CPU cost (#5117)
---
 .../apache/seatunnel/engine/server/task/AbstractTask.java    |  4 +---
 .../engine/server/task/SeaTunnelSourceCollector.java         | 12 ++++++------
 .../engine/server/task/SinkAggregatedCommitterTask.java      |  8 +++++++-
 .../engine/server/task/flow/SourceFlowLifeCycle.java         |  6 ++++--
 4 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 7763297165..9f07428ff4 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -81,9 +81,7 @@ public abstract class AbstractTask implements Task {
     }
 
     @NonNull @Override
-    public ProgressState call() throws Exception {
-        return progress.toState();
-    }
+    public abstract ProgressState call() throws Exception;
 
     public TaskLocation getTaskLocation() {
         return this.taskLocation;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 5cab2dd0b2..2a77a49729 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -40,7 +40,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final Meter sourceReceivedQPS;
 
-    private volatile long rowCountThisPollNext;
+    private volatile boolean emptyThisPollNext;
 
     public SeaTunnelSourceCollector(
             Object checkpointLock,
@@ -56,7 +56,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
     public void collect(T row) {
         try {
             sendRecordToNext(new Record<>(row));
-            rowCountThisPollNext++;
+            emptyThisPollNext = false;
             sourceReceivedCount.inc();
             sourceReceivedQPS.markEvent();
         } catch (IOException e) {
@@ -69,12 +69,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
         return checkpointLock;
     }
 
-    public long getRowCountThisPollNext() {
-        return this.rowCountThisPollNext;
+    public boolean isEmptyThisPollNext() {
+        return emptyThisPollNext;
     }
 
-    public void resetRowCountThisPollNext() {
-        this.rowCountThisPollNext = 0;
+    public void resetEmptyThisPollNext() {
+        this.emptyThisPollNext = true;
     }
 
     public void sendRecordToNext(Record<?> record) throws IOException {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 797033f8d2..a83f4bfb1d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -68,7 +68,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
 
     private static final long serialVersionUID = 5906594537520393503L;
 
-    private SeaTunnelTaskState currState;
+    private volatile SeaTunnelTaskState currState;
     private final SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink;
     private final int maxWriterSize;
 
@@ -138,16 +138,22 @@ public class SinkAggregatedCommitterTask<CommandInfoT, 
AggregatedCommitInfoT>
                 if (restoreComplete.isDone()) {
                     currState = READY_START;
                     reportTaskStatus(READY_START);
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case READY_START:
                 if (startCalled) {
                     currState = STARTING;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case STARTING:
                 if (receivedSinkWriter) {
                     currState = RUNNING;
+                } else {
+                    Thread.sleep(100);
                 }
                 break;
             case RUNNING:
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index e4928343cf..9ca01eba32 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -133,11 +133,13 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
     public void collect() throws Exception {
         if (!prepareClose) {
             reader.pollNext(collector);
-            if (collector.getRowCountThisPollNext() == 0) {
+            if (collector.isEmptyThisPollNext()) {
                 Thread.sleep(100);
             } else {
-                collector.resetRowCountThisPollNext();
+                collector.resetEmptyThisPollNext();
             }
+        } else {
+            Thread.sleep(100);
         }
     }
 

Reply via email to