This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 02114db626 [Improve][Zeta] Add sleep for Task to reduce CPU cost
(#5117)
02114db626 is described below
commit 02114db626a290502d33797dfbe9a11ee5b52b6f
Author: Jia Fan <[email protected]>
AuthorDate: Thu Jul 20 15:25:34 2023 +0800
[Improve][Zeta] Add sleep for Task to reduce CPU cost (#5117)
---
.../apache/seatunnel/engine/server/task/AbstractTask.java | 4 +---
.../engine/server/task/SeaTunnelSourceCollector.java | 12 ++++++------
.../engine/server/task/SinkAggregatedCommitterTask.java | 8 +++++++-
.../engine/server/task/flow/SourceFlowLifeCycle.java | 6 ++++--
4 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
index 7763297165..9f07428ff4 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/AbstractTask.java
@@ -81,9 +81,7 @@ public abstract class AbstractTask implements Task {
}
@NonNull @Override
- public ProgressState call() throws Exception {
- return progress.toState();
- }
+ public abstract ProgressState call() throws Exception;
public TaskLocation getTaskLocation() {
return this.taskLocation;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 5cab2dd0b2..2a77a49729 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -40,7 +40,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final Meter sourceReceivedQPS;
- private volatile long rowCountThisPollNext;
+ private volatile boolean emptyThisPollNext;
public SeaTunnelSourceCollector(
Object checkpointLock,
@@ -56,7 +56,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
- rowCountThisPollNext++;
+ emptyThisPollNext = false;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
@@ -69,12 +69,12 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
return checkpointLock;
}
- public long getRowCountThisPollNext() {
- return this.rowCountThisPollNext;
+ public boolean isEmptyThisPollNext() {
+ return emptyThisPollNext;
}
- public void resetRowCountThisPollNext() {
- this.rowCountThisPollNext = 0;
+ public void resetEmptyThisPollNext() {
+ this.emptyThisPollNext = true;
}
public void sendRecordToNext(Record<?> record) throws IOException {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
index 797033f8d2..a83f4bfb1d 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java
@@ -68,7 +68,7 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
private static final long serialVersionUID = 5906594537520393503L;
- private SeaTunnelTaskState currState;
+ private volatile SeaTunnelTaskState currState;
private final SinkAction<?, ?, CommandInfoT, AggregatedCommitInfoT> sink;
private final int maxWriterSize;
@@ -138,16 +138,22 @@ public class SinkAggregatedCommitterTask<CommandInfoT,
AggregatedCommitInfoT>
if (restoreComplete.isDone()) {
currState = READY_START;
reportTaskStatus(READY_START);
+ } else {
+ Thread.sleep(100);
}
break;
case READY_START:
if (startCalled) {
currState = STARTING;
+ } else {
+ Thread.sleep(100);
}
break;
case STARTING:
if (receivedSinkWriter) {
currState = RUNNING;
+ } else {
+ Thread.sleep(100);
}
break;
case RUNNING:
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index e4928343cf..9ca01eba32 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -133,11 +133,13 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
public void collect() throws Exception {
if (!prepareClose) {
reader.pollNext(collector);
- if (collector.getRowCountThisPollNext() == 0) {
+ if (collector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
- collector.resetRowCountThisPollNext();
+ collector.resetEmptyThisPollNext();
}
+ } else {
+ Thread.sleep(100);
}
}