This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new eae32bfb84 [Core][Flink] Resolve streaming source high CPU usage (#8354)
eae32bfb84 is described below
commit eae32bfb840572e6509486541d2176cab1763c06
Author: Jast <[email protected]>
AuthorDate: Tue Dec 24 19:19:38 2024 +0800
[Core][Flink] Resolve streaming source high CPU usage (#8354)
---
.../flink/source/FlinkRowCollector.java | 14 +++++++
.../flink/source/FlinkSourceReader.java | 44 +++++++++++++++++++++-
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index 2ea584029e..c7eadfcfb7 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -46,6 +46,8 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
private final Meter sourceReadQPS;
+ private boolean emptyThisPollNext = true;
+
public FlinkRowCollector(Config envConfig, MetricsContext metricsContext) {
this.flowControlGate =
FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
this.sourceReadCount =
metricsContext.counter(MetricNames.SOURCE_RECEIVED_COUNT);
@@ -61,6 +63,7 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
sourceReadCount.inc();
sourceReadBytes.inc(record.getBytesSize());
sourceReadQPS.markEvent();
+ emptyThisPollNext = false;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -71,8 +74,19 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
return this;
}
+ @Override
+ public boolean isEmptyThisPollNext() {
+ return emptyThisPollNext;
+ }
+
+ @Override
+ public void resetEmptyThisPollNext() {
+ this.emptyThisPollNext = true;
+ }
+
public FlinkRowCollector withReaderOutput(ReaderOutput<SeaTunnelRow>
readerOutput) {
this.readerOutput = readerOutput;
+ this.emptyThisPollNext = true;
return this;
}
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index fb1dc85174..4c2a7b6d2e 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.translation.flink.source;
+import
org.apache.seatunnel.shade.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.SourceSplit;
@@ -34,6 +35,9 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@@ -55,10 +59,25 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;
+ private volatile CompletableFuture<Void> availabilityFuture;
+
+ private static final long DEFAULT_WAIT_TIME_MILLIS = 1000L;
+
+ private final ScheduledExecutorService scheduledExecutor;
+
public FlinkSourceReader(
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT>
sourceReader,
org.apache.seatunnel.api.source.SourceReader.Context context,
Config envConfig) {
+ this.scheduledExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat(
+ String.format(
+ "source-reader-scheduler-%d",
+ context.getIndexOfSubtask()))
+ .build());
this.sourceReader = sourceReader;
this.context = context;
this.flinkRowCollector = new FlinkRowCollector(envConfig,
context.getMetricsContext());
@@ -78,9 +97,19 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
public InputStatus pollNext(ReaderOutput<SeaTunnelRow> output) throws
Exception {
if (!((FlinkSourceReaderContext) context).isSendNoMoreElementEvent()) {
sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
+ if (flinkRowCollector.isEmptyThisPollNext()) {
+ synchronized (this) {
+ if (availabilityFuture == null ||
availabilityFuture.isDone()) {
+ availabilityFuture = new CompletableFuture<>();
+ scheduleComplete(availabilityFuture);
+ LOGGER.debug("No data available, wait for next poll.");
+ }
+ }
+ return InputStatus.NOTHING_AVAILABLE;
+ }
} else {
// reduce CPU idle
- Thread.sleep(1000L);
+ Thread.sleep(DEFAULT_WAIT_TIME_MILLIS);
}
return inputStatus;
}
@@ -97,7 +126,8 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
@Override
public CompletableFuture<Void> isAvailable() {
- return CompletableFuture.completedFuture(null);
+ CompletableFuture<Void> future = availabilityFuture;
+ return future != null ? future :
CompletableFuture.completedFuture(null);
}
@Override
@@ -123,8 +153,13 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
@Override
public void close() throws Exception {
+ CompletableFuture<Void> future = availabilityFuture;
+ if (future != null && !future.isDone()) {
+ future.complete(null);
+ }
sourceReader.close();
context.getEventListener().onEvent(new ReaderCloseEvent());
+ scheduledExecutor.shutdown();
}
@Override
@@ -136,4 +171,9 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
public void notifyCheckpointAborted(long checkpointId) throws Exception {
sourceReader.notifyCheckpointAborted(checkpointId);
}
+
+ private void scheduleComplete(CompletableFuture<Void> future) {
+ scheduledExecutor.schedule(
+ () -> future.complete(null), DEFAULT_WAIT_TIME_MILLIS,
TimeUnit.MILLISECONDS);
+ }
}