This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7a81fd375 [Hotfix][Zeta] Fix cpu load problem (#4828)
7a81fd375 is described below
commit 7a81fd3751048492df5d3b38ce4bb5a5d7a26986
Author: Eric <[email protected]>
AuthorDate: Fri May 26 20:52:26 2023 +0800
[Hotfix][Zeta] Fix cpu load problem (#4828)
---
.../engine/server/task/SeaTunnelSourceCollector.java | 11 +++++++++++
.../engine/server/task/flow/SourceFlowLifeCycle.java | 5 +++++
2 files changed, 16 insertions(+)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 7ebcdf0d8..5cab2dd0b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -40,6 +40,8 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
private final Meter sourceReceivedQPS;
+ private volatile long rowCountThisPollNext; // rows counted by collect(T) since the last reset. NOTE(review): volatile gives visibility only; the ++ in collect(T) is not atomic - confirm there is a single writer thread
+
public SeaTunnelSourceCollector(
Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs,
@@ -54,6 +56,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
+ rowCountThisPollNext++;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
@@ -66,6 +69,14 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
return checkpointLock;
}
+ public long getRowCountThisPollNext() { // returns the number of rows counted by collect(T) since the counter was last reset
+ return this.rowCountThisPollNext;
+ }
+
+ public void resetRowCountThisPollNext() { // sets the row counter back to zero
+ this.rowCountThisPollNext = 0;
+ }
+
public void sendRecordToNext(Record<?> record) throws IOException {
synchronized (checkpointLock) {
for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 16adec49e..8430f6898 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -133,6 +133,11 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
public void collect() throws Exception { // runs one reader.pollNext round and backs off when that round counted no rows
if (!prepareClose) { // nothing is polled once prepareClose is set
reader.pollNext(collector);
+ if (collector.getRowCountThisPollNext() == 0) { // counter still zero: no row was counted by the collector
+ Thread.sleep(100); // pause 100 ms rather than returning immediately (commit title: "Fix cpu load problem"). NOTE(review): this blocks the calling thread; an InterruptedException propagates through "throws Exception"
+ } else {
+ collector.resetRowCountThisPollNext(); // rows were counted: clear the counter so the next round starts from zero
+ }
}
}