This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e26006d2aa [Improve][Engine] Adjust the sleep mode of flink and spark
engine to be consistent with zeta (#5698)
e26006d2aa is described below
commit e26006d2aa47df0f92740cf9843dca862db82232
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Oct 31 15:23:27 2023 +0800
[Improve][Engine] Adjust the sleep mode of flink and spark engine to be
consistent with zeta (#5698)
---
.../java/org/apache/seatunnel/api/source/Collector.java | 6 ++++++
.../engine/server/task/SeaTunnelSourceCollector.java | 2 ++
.../seatunnel/translation/source/CoordinatedSource.java | 15 ++++++++++++++-
.../seatunnel/translation/source/ParallelSource.java | 14 +++++++++++---
.../seatunnel/translation/flink/source/RowCollector.java | 12 ++++++++++++
.../reader/batch/CoordinatedBatchPartitionReader.java | 15 ++++++++++++++-
.../micro/CoordinatedMicroBatchPartitionReader.java | 15 ++++++++++++++-
.../partition/batch/CoordinatedBatchPartitionReader.java | 15 ++++++++++++++-
.../micro/CoordinatedMicroBatchPartitionReader.java | 15 ++++++++++++++-
.../spark/serialization/InternalRowCollector.java | 12 ++++++++++++
10 files changed, 113 insertions(+), 8 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
index 85435880c6..51ace474e5 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/Collector.java
@@ -40,4 +40,10 @@ public interface Collector<T> {
* @return The object to use as the lock
*/
Object getCheckpointLock();
+
+ default boolean isEmptyThisPollNext() {
+ return false;
+ }
+
+ default void resetEmptyThisPollNext() {}
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 6e44089e8f..aecb64ebfc 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -192,10 +192,12 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
return checkpointLock;
}
+ @Override
public boolean isEmptyThisPollNext() {
return emptyThisPollNext;
}
+ @Override
public void resetEmptyThisPollNext() {
this.emptyThisPollNext = true;
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
index 6cf6f5d51d..553bb34507 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java
@@ -165,7 +165,20 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
while (flag.get()) {
try {
reader.pollNext(collector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (collector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ collector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current
+ * thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint
+ * thread for a long time. It is mentioned in
+ * this
+ * https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
} catch (Exception e) {
running = false;
flag.set(false);
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
index d8c59e4cc4..c6934877a0 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/ParallelSource.java
@@ -38,8 +38,6 @@ import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-import static org.apache.seatunnel.translation.source.CoordinatedSource.SLEEP_TIME_INTERVAL;
-
public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
implements BaseSourceFunction<T> {
private static final Logger LOG = LoggerFactory.getLogger(ParallelSource.class);
@@ -134,7 +132,17 @@ public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serial
future.get();
}
reader.pollNext(collector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (collector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ collector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint thread for a long time. It is mentioned
+ * in this https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
}
LOG.debug("Parallel source runs complete.");
}
diff --git a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
index 23573c31ec..daf615efd8 100644
--- a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/RowCollector.java
@@ -36,6 +36,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
protected final Object checkpointLock;
private FlowControlGate flowControlGate;
+ private volatile boolean emptyThisPollNext;
public RowCollector(
SourceFunction.SourceContext<Row> internalCollector,
@@ -60,6 +61,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
}
}
internalCollector.collect(rowSerialization.convert(sourceRecord));
+ emptyThisPollNext = false;
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -69,4 +71,14 @@ public class RowCollector implements Collector<SeaTunnelRow> {
public Object getCheckpointLock() {
return this.checkpointLock;
}
+
+ @Override
+ public boolean isEmptyThisPollNext() {
+ return emptyThisPollNext;
+ }
+
+ @Override
+ public void resetEmptyThisPollNext() {
+ this.emptyThisPollNext = true;
+ }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
index f901c99ddf..436e371acc 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/batch/CoordinatedBatchPartitionReader.java
@@ -87,7 +87,20 @@ public class CoordinatedBatchPartitionReader extends ParallelBatchPartitionReade
while (flag.get()) {
try {
reader.pollNext(rowCollector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (rowCollector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ rowCollector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current
+ * thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint
+ * thread for a long time. It is mentioned
+ * in this
+ * https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
index 0aa4edacc8..0cec65e613 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/reader/micro/CoordinatedMicroBatchPartitionReader.java
@@ -155,7 +155,20 @@ public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPart
while (flag.get()) {
try {
reader.pollNext(rowCollector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (rowCollector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ rowCollector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current
+ * thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint
+ * thread for a long time. It is mentioned
+ * in this
+ * https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
index 9302b9f089..62ff5030f3 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/batch/CoordinatedBatchPartitionReader.java
@@ -87,7 +87,20 @@ public class CoordinatedBatchPartitionReader extends ParallelBatchPartitionReade
while (flag.get()) {
try {
reader.pollNext(rowCollector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (rowCollector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ rowCollector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current
+ * thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint
+ * thread for a long time. It is mentioned
+ * in this
+ * https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
index ac020d3053..77b427d6cf 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/partition/micro/CoordinatedMicroBatchPartitionReader.java
@@ -154,7 +154,20 @@ public class CoordinatedMicroBatchPartitionReader extends ParallelMicroBatchPart
while (flag.get()) {
try {
reader.pollNext(rowCollector);
- Thread.sleep(SLEEP_TIME_INTERVAL);
+ if (rowCollector.isEmptyThisPollNext()) {
+ Thread.sleep(100);
+ } else {
+ rowCollector.resetEmptyThisPollNext();
+ /**
+ * sleep(0) is used to prevent the current
+ * thread from occupying CPU resources for a
+ * long time, thus blocking the checkpoint
+ * thread for a long time. It is mentioned
+ * in this
+ * https://github.com/apache/seatunnel/issues/5694
+ */
+ Thread.sleep(0L);
+ }
} catch (Exception e) {
this.running = false;
flag.set(false);
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
index db6c518d2a..39782f174c 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
@@ -38,6 +38,7 @@ public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final AtomicLong collectTotalCount;
private Map<String, Object> envOptions;
private FlowControlGate flowControlGate;
+ private volatile boolean emptyThisPollNext;
public InternalRowCollector(
Handover<InternalRow> handover,
@@ -65,6 +66,7 @@ public class InternalRowCollector implements Collector<SeaTunnelRow> {
handover.produce(rowSerialization.convert(record));
}
collectTotalCount.incrementAndGet();
+ emptyThisPollNext = false;
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -78,4 +80,14 @@ public class InternalRowCollector implements Collector<SeaTunnelRow> {
public Object getCheckpointLock() {
return this.checkpointLock;
}
+
+ @Override
+ public boolean isEmptyThisPollNext() {
+ return emptyThisPollNext;
+ }
+
+ @Override
+ public void resetEmptyThisPollNext() {
+ this.emptyThisPollNext = true;
+ }
}