This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8905524ce [Improve][Seatunnel-Engine]change queue to disruptor (#3847)
8905524ce is described below
commit 8905524ce51b3d4efb694a80e63a6275da88bb92
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jan 31 17:59:03 2023 +0800
[Improve][Seatunnel-Engine]change queue to disruptor (#3847)
* improve change queue to disruptor
---
docs/en/seatunnel-engine/deployment.md | 12 ++++
release-note.md | 1 +
.../src/test/resources/seatunnel.yaml | 1 +
.../engine/common/config/EngineConfig.java | 9 +++
.../config/YamlSeaTunnelDomConfigProcessor.java | 5 ++
.../engine/common/config/server/QueueType.java} | 16 +-----
.../common/config/server/ServerConfigOptions.java | 3 +
.../engine/server/TaskExecutionService.java | 7 ++-
.../server/dag/physical/PhysicalPlanGenerator.java | 26 +++++++--
.../engine/server/dag/physical/PlanUtils.java | 21 ++++---
.../seatunnel/engine/server/master/JobMaster.java | 3 +-
.../engine/server/task/SeaTunnelTask.java | 8 +--
.../server/task/flow/AbstractFlowLifeCycle.java | 6 ++
.../task/flow/IntermediateQueueFlowLifeCycle.java | 50 +++++------------
.../server/task/flow/OneOutputFlowLifeCycle.java | 1 -
.../PartitionTransformSourceFlowLifeCycle.java | 1 +
.../server/task/flow/SourceFlowLifeCycle.java | 2 +-
.../AbstractTaskGroupWithIntermediateQueue.java} | 22 ++++----
...=> TaskGroupWithIntermediateBlockingQueue.java} | 14 +++--
.../group/TaskGroupWithIntermediateDisruptor.java | 65 ++++++++++++++++++++++
.../queue/AbstractIntermediateQueue.java} | 40 ++++++++-----
.../queue/IntermediateBlockingQueue.java} | 32 +++++------
.../task/group/queue/IntermediateDisruptor.java | 60 ++++++++++++++++++++
.../queue/disruptor/RecordEvent.java} | 17 ++----
.../queue/disruptor/RecordEventFactory.java} | 19 +++----
.../group/queue/disruptor/RecordEventHandler.java | 65 ++++++++++++++++++++++
.../group/queue/disruptor/RecordEventProducer.java | 52 +++++++++++++++++
.../server/checkpoint/CheckpointPlanTest.java | 4 +-
.../seatunnel/engine/server/dag/TaskTest.java | 4 +-
29 files changed, 422 insertions(+), 144 deletions(-)
diff --git a/docs/en/seatunnel-engine/deployment.md
b/docs/en/seatunnel-engine/deployment.md
index 1e37dfd02..9b0484ebd 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -102,6 +102,18 @@ seatunnel:
About the checkpoint storage, you can see [checkpoint
storage](checkpoint-storage.md)
+### 4.4 Intermediate Queue Type
+
+The type of queue used for internal data exchange within a task. Two types are currently supported:
`disruptor` and `blockingqueue` (the default).
+
+```
+seatunnel:
+ engine:
+ queue-type: disruptor
+ # other config
+```
+
+
## 5. Config SeaTunnel Engine Server
All SeaTunnel Engine Server config in `hazelcast.yaml` file.
diff --git a/release-note.md b/release-note.md
index b1d2531d7..2bba2d4e3 100644
--- a/release-note.md
+++ b/release-note.md
@@ -37,6 +37,7 @@
- [Core] Improve job restart of all node down #3784
- [Checkpoint] Cancel CheckpointCoordinator First Before Cancel Task #3838
- [Storage] Remove seatunnel-api from engine storage. #3834
+- [Core] Support disruptor as a configurable intermediate queue type. #3847
## Bug Fixes
### Connectors
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 51ffc5214..df4c62583 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -18,6 +18,7 @@
seatunnel:
engine:
backup-count: 2
+ queue-type: disruptor
print-execution-info-interval: 10
slot-service:
dynamic-slot: true
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 0037d0df8..f4d1d066c 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -18,9 +18,11 @@
package org.apache.seatunnel.engine.common.config;
import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+import static com.hazelcast.internal.util.Preconditions.checkNotNull;
import static com.hazelcast.internal.util.Preconditions.checkPositive;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
@@ -38,6 +40,8 @@ public class EngineConfig {
private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
+ private QueueType queueType =
ServerConfigOptions.QUEUE_TYPE.defaultValue();
+
public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
this.backupCount = newBackupCount;
@@ -52,4 +56,9 @@ public class EngineConfig {
checkPositive(printExecutionInfoInterval,
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
}
+
+ public void setQueueType(QueueType queueType) {
+ checkNotNull(queueType);
+ this.queueType = queueType;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 229d40f1c..346b377e7 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -24,6 +24,7 @@ import static
com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
@@ -34,6 +35,7 @@ import com.hazelcast.logging.Logger;
import org.w3c.dom.Node;
import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor {
@@ -95,6 +97,8 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
engineConfig.setBackupCount(
getIntegerValue(ServerConfigOptions.BACKUP_COUNT.key(),
getTextContent(node))
);
+ } else if (ServerConfigOptions.QUEUE_TYPE.key().equals(name)) {
+
engineConfig.setQueueType(QueueType.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
} else if
(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
getTextContent(node)));
@@ -159,6 +163,7 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
/**
* Parse checkpoint plugin config.
+ *
* @param checkpointPluginConfigNode checkpoint plugin config node
* @return checkpoint plugin config
*/
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
similarity index 66%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
index 1fe583b2f..2a6ca85c5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
@@ -15,18 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.transform.Collector;
-
-/**
- * A processing component that sends a piece of data from within the engine to
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
- void collect(Collector<T> collector) throws Exception;
+package org.apache.seatunnel.engine.common.config.server;
+public enum QueueType {
+ DISRUPTOR, BLOCKINGQUEUE
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 5706f5341..61c1e5073 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -49,6 +49,9 @@ public class ServerConfigOptions {
public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED =
Options.key("max-retained").intType().defaultValue(1).withDescription("The
maximum number of retained checkpoints.");
+ public static final Option<QueueType> QUEUE_TYPE =
Options.key("queue-type").type(new TypeReference<QueueType>() {
+ }).defaultValue(QueueType.BLOCKINGQUEUE).withDescription("The internal
data cache queue type.");
+
public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE =
Options.key("storage").type(new TypeReference<CheckpointStorageConfig>() {
}).defaultValue(new CheckpointStorageConfig()).withDescription("The
checkpoint storage configuration.");
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index cbe3f6006..0f65aea28 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -123,6 +123,9 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
public TaskGroupContext getExecutionContext(TaskGroupLocation
taskGroupLocation) {
+ if (executionContexts.get(taskGroupLocation) == null) {
+ return finishedExecutionContexts.get(taskGroupLocation);
+ }
return executionContexts.get(taskGroupLocation);
}
@@ -257,7 +260,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
@SuppressWarnings("checkstyle:MagicNumber")
- private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation,
TaskExecutionState taskExecutionState){
+ private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation,
TaskExecutionState taskExecutionState) {
long sleepTime = 1000;
boolean notifyStateSuccess = false;
while (isRunning && !notifyStateSuccess) {
@@ -303,7 +306,7 @@ public class TaskExecutionService implements
DynamicMetricsProvider {
}
- public void notifyCleanTaskGroupContext(TaskGroupLocation
taskGroupLocation){
+ public void notifyCleanTaskGroupContext(TaskGroupLocation
taskGroupLocation) {
finishedExecutionContexts.remove(taskGroupLocation);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 2169e319f..1d6c751f6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -17,7 +17,10 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import static
org.apache.seatunnel.engine.common.config.server.QueueType.BLOCKINGQUEUE;
+
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -49,7 +52,8 @@ import
org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask;
-import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQueue;
+import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
+import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;
import com.google.common.collect.Lists;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -121,6 +125,8 @@ public class PhysicalPlanGenerator {
private final IMap<Object, Object> runningJobStateTimestampsIMap;
+ private final QueueType queueType;
+
public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
@NonNull NodeEngine nodeEngine,
@NonNull JobImmutableInformation
jobImmutableInformation,
@@ -128,7 +134,8 @@ public class PhysicalPlanGenerator {
@NonNull ExecutorService executorService,
@NonNull FlakeIdGenerator flakeIdGenerator,
@NonNull IMap runningJobStateIMap,
- @NonNull IMap runningJobStateTimestampsIMap) {
+ @NonNull IMap runningJobStateTimestampsIMap,
+ @NonNull QueueType queueType) {
this.pipelines = executionPlan.getPipelines();
this.nodeEngine = nodeEngine;
this.jobImmutableInformation = jobImmutableInformation;
@@ -141,6 +148,7 @@ public class PhysicalPlanGenerator {
this.subtaskActions = new HashMap<>();
this.runningJobStateIMap = runningJobStateIMap;
this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+ this.queueType = queueType;
}
public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
@@ -378,13 +386,21 @@ public class PhysicalPlanGenerator {
if
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
// contains IntermediateExecutionFlow in task group
+ TaskGroupDefaultImpl taskGroup;
+ if (queueType.equals(BLOCKINGQUEUE)) {
+ taskGroup = new
TaskGroupWithIntermediateBlockingQueue(taskGroupLocation,
flow.getAction().getName() +
+ "-SourceTask",
+ taskList.stream().map(task -> (Task)
task).collect(Collectors.toList()));
+ } else {
+ taskGroup = new
TaskGroupWithIntermediateDisruptor(taskGroupLocation,
flow.getAction().getName() +
+ "-SourceTask",
+ taskList.stream().map(task -> (Task)
task).collect(Collectors.toList()));
+ }
t.add(new PhysicalVertex(
i,
executorService,
flow.getAction().getParallelism(),
- new
TaskGroupWithIntermediateQueue(taskGroupLocation, flow.getAction().getName() +
- "-SourceTask",
- taskList.stream().map(task -> (Task)
task).collect(Collectors.toList())),
+ taskGroup,
flakeIdGenerator,
pipelineIndex,
totalPipelineNum,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index 976ba3daf..efced807a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -34,14 +35,15 @@ import java.util.concurrent.ExecutorService;
public class PlanUtils {
public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>>
fromLogicalDAG(@NonNull LogicalDag logicalDag,
- @NonNull
NodeEngine nodeEngine,
- @NonNull
-
JobImmutableInformation jobImmutableInformation,
- long
initializationTimestamp,
- @NonNull
ExecutorService executorService,
- @NonNull
FlakeIdGenerator flakeIdGenerator,
- @NonNull
IMap runningJobStateIMap,
- @NonNull
IMap runningJobStateTimestampsIMap) {
+
@NonNull NodeEngine nodeEngine,
+
@NonNull
+
JobImmutableInformation jobImmutableInformation,
+
long initializationTimestamp,
+
@NonNull ExecutorService executorService,
+
@NonNull FlakeIdGenerator flakeIdGenerator,
+
@NonNull IMap runningJobStateIMap,
+
@NonNull IMap runningJobStateTimestampsIMap,
+
@NonNull QueueType queueType) {
return new PhysicalPlanGenerator(
new ExecutionPlanGenerator(logicalDag,
jobImmutableInformation).generate(),
nodeEngine,
@@ -50,6 +52,7 @@ public class PlanUtils {
executorService,
flakeIdGenerator,
runningJobStateIMap,
- runningJobStateTimestampsIMap).generate();
+ runningJobStateTimestampsIMap,
+ queueType).generate();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d8e40366f..f8008fd5b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -169,7 +169,8 @@ public class JobMaster {
executorService,
flakeIdGenerator,
runningJobStateIMap,
- runningJobStateTimestampsIMap);
+ runningJobStateTimestampsIMap,
+ engineConfig.getQueueType());
this.physicalPlan = planTuple.f0();
this.physicalPlan.setJobMaster(this);
this.checkpointManager = new CheckpointManager(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b8c944e59..cf8ac987e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -60,7 +60,7 @@ import
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSourceFlow
import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
-import
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQueue;
+import
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithIntermediateQueue;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
@@ -226,8 +226,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
IntermediateQueueConfig config =
((IntermediateExecutionFlow<IntermediateQueueConfig>)
flow).getConfig();
lifeCycle = new IntermediateQueueFlowLifeCycle(this,
completableFuture,
- ((TaskGroupWithIntermediateQueue) taskBelongGroup)
- .getBlockingQueueCache(config.getQueueID()));
+ ((AbstractTaskGroupWithIntermediateQueue) taskBelongGroup)
+ .getQueueCache(config.getQueueID()));
outputs = flowLifeCycles;
} else {
throw new UnknownFlowException(flow);
@@ -305,7 +305,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
tryClose(checkpointId);
}
- public void
notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer){
+ public void
notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer) {
allCycles.stream()
.filter(cycle -> cycle instanceof InternalCheckpointListener)
.map(cycle -> (InternalCheckpointListener) cycle)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
index 716de8a16..606374049 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
@@ -19,15 +19,21 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import lombok.Getter;
+import lombok.Setter;
+
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
public class AbstractFlowLifeCycle implements FlowLifeCycle {
+ @Getter
protected final SeaTunnelTask runningTask;
protected final CompletableFuture<Void> completableFuture;
+ @Getter
+ @Setter
protected Boolean prepareClose;
public AbstractFlowLifeCycle(SeaTunnelTask runningTask,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index 3bae93e41..141d5ca0f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -19,62 +19,40 @@ package org.apache.seatunnel.engine.server.task.flow;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.common.utils.function.ConsumerWithException;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
+import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
-import java.util.concurrent.BlockingQueue;
+import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle
implements OneInputFlowLifeCycle<Record<?>>,
- OneOutputFlowLifeCycle<Record<?>> {
+public class IntermediateQueueFlowLifeCycle<T extends
AbstractIntermediateQueue<?>> extends AbstractFlowLifeCycle implements
OneInputFlowLifeCycle<Record<?>>,
+ OneOutputFlowLifeCycle<Record<?>> {
- private final BlockingQueue<Record<?>> queue;
+ private final AbstractIntermediateQueue<?> queue;
public IntermediateQueueFlowLifeCycle(SeaTunnelTask runningTask,
CompletableFuture<Void>
completableFuture,
- BlockingQueue<Record<?>> queue) {
+ AbstractIntermediateQueue<?> queue) {
super(runningTask, completableFuture);
this.queue = queue;
+ queue.setIntermediateQueueFlowLifeCycle(this);
+ queue.setRunningTask(runningTask);
}
@Override
public void received(Record<?> record) {
- try {
- handleRecord(record, queue::put);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ queue.received(record);
}
@SuppressWarnings("checkstyle:MagicNumber")
@Override
public void collect(Collector<Record<?>> collector) throws Exception {
- while (true) {
- Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
- if (record != null) {
- handleRecord(record, collector::collect);
- } else {
- break;
- }
- }
+ queue.collect(collector);
}
- private void handleRecord(Record<?> record,
ConsumerWithException<Record<?>> consumer) throws Exception {
- if (record.getData() instanceof Barrier) {
- CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
- runningTask.ack(barrier);
- if (barrier.prepareClose()) {
- prepareClose = true;
- }
- consumer.accept(record);
- } else {
- if (prepareClose) {
- return;
- }
- consumer.accept(record);
- }
+ @Override
+ public void close() throws IOException {
+ queue.close();
+ super.close();
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
index 1fe583b2f..dffa6ab97 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
@@ -28,5 +28,4 @@ import org.apache.seatunnel.api.transform.Collector;
public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
void collect(Collector<T> collector) throws Exception;
-
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
index 522f331d0..d30396f8f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
@@ -38,6 +38,7 @@ public class PartitionTransformSourceFlowLifeCycle<T> extends
AbstractFlowLifeCy
private long currentCheckpointId = Long.MAX_VALUE;
private int alignedBarriersCounter = 0;
+
public PartitionTransformSourceFlowLifeCycle(SeaTunnelTask runningTask,
CompletableFuture<Void> completableFuture) {
super(runningTask, completableFuture);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b9c898425..4c60aa58c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -90,7 +90,7 @@ public class SourceFlowLifeCycle<T, SplitT extends
SourceSplit> extends ActionFl
public void init() throws Exception {
this.splitSerializer = sourceAction.getSource().getSplitSerializer();
this.reader = sourceAction.getSource()
- .createReader(new SourceReaderContext(indexID,
sourceAction.getSource().getBoundedness(), this));
+ .createReader(new SourceReaderContext(indexID,
sourceAction.getSource().getBoundedness(), this));
this.enumeratorTaskAddress = getEnumeratorTaskAddress();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
similarity index 52%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
index 1fe583b2f..d520b8c74 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
@@ -15,18 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group;
-import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
-/**
- * A processing component that sends a piece of data from within the engine to
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
+import java.util.Collection;
+
+public abstract class AbstractTaskGroupWithIntermediateQueue extends
TaskGroupDefaultImpl {
+ public AbstractTaskGroupWithIntermediateQueue(TaskGroupLocation
taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+ super(taskGroupLocation, taskGroupName, tasks);
+ }
- void collect(Collector<T> collector) throws Exception;
+ public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
similarity index 73%
rename from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
rename to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index bcb0786be..478a61c1b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.engine.server.task.group;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
+import
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
import java.util.Collection;
import java.util.Map;
@@ -29,11 +30,11 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
-public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
+public class TaskGroupWithIntermediateBlockingQueue extends
AbstractTaskGroupWithIntermediateQueue {
public static final int QUEUE_SIZE = 100000;
- public TaskGroupWithIntermediateQueue(TaskGroupLocation taskGroupLocation,
String taskGroupName, Collection<Task> tasks) {
+ public TaskGroupWithIntermediateBlockingQueue(TaskGroupLocation
taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
super(taskGroupLocation, taskGroupName, tasks);
}
@@ -43,12 +44,13 @@ public class TaskGroupWithIntermediateQueue extends
TaskGroupDefaultImpl {
public void init() {
blockingQueueCache = new ConcurrentHashMap<>();
getTasks().stream().filter(SeaTunnelTask.class::isInstance)
- .map(s -> (SeaTunnelTask) s).forEach(s ->
s.setTaskGroup(this));
+ .map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
}
- public BlockingQueue<Record<?>> getBlockingQueueCache(long id) {
+ @Override
+ public AbstractIntermediateQueue<?> getQueueCache(long id) {
blockingQueueCache.computeIfAbsent(id, i -> new
ArrayBlockingQueue<>(QUEUE_SIZE));
- return blockingQueueCache.get(id);
+ return new IntermediateBlockingQueue(blockingQueueCache.get(id));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
new file mode 100644
index 000000000..0a6bfce1f
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group;
+
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
+import
org.apache.seatunnel.engine.server.task.group.queue.IntermediateDisruptor;
+import
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEvent;
+import
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Task group that backs its intermediate queues with LMAX Disruptor ring buffers.
+ *
+ * <p>One {@link Disruptor} is kept per queue id. Every call to {@link #getQueueCache(long)} returns a
+ * new {@link IntermediateDisruptor} wrapper around the disruptor registered for that id.
+ */
+public class TaskGroupWithIntermediateDisruptor extends AbstractTaskGroupWithIntermediateQueue {
+
+    /** Slot count of every ring buffer created by this group (the Disruptor requires a power of two). */
+    public static final int RING_BUFFER_SIZE = 1024;
+
+    public TaskGroupWithIntermediateDisruptor(TaskGroupLocation taskGroupLocation, String taskGroupName,
+                                              Collection<Task> tasks) {
+        super(taskGroupLocation, taskGroupName, tasks);
+    }
+
+    /** Disruptors keyed by queue id; stays {@code null} until {@link #init()} has run. */
+    private Map<Long, Disruptor<RecordEvent>> disruptor = null;
+
+    /**
+     * Creates the (empty) disruptor cache and registers this group on every {@link SeaTunnelTask}
+     * it contains.
+     */
+    @Override
+    public void init() {
+        disruptor = new ConcurrentHashMap<>();
+        getTasks().stream().filter(SeaTunnelTask.class::isInstance)
+            .map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
+    }
+
+    /**
+     * Returns a queue wrapper around the disruptor registered for {@code id}, creating and caching
+     * that disruptor on the first request.
+     *
+     * @param id identifier of the intermediate queue
+     * @return a new {@link IntermediateDisruptor} sharing the cached disruptor for {@code id}
+     */
+    @Override
+    public AbstractIntermediateQueue<?> getQueueCache(long id) {
+        // computeIfAbsent builds a Disruptor (and its ring buffer) only when the id is not cached yet,
+        // instead of constructing one on every call and discarding it on a cache hit.
+        Disruptor<RecordEvent> cached = this.disruptor.computeIfAbsent(id, key -> newDisruptor());
+        return new IntermediateDisruptor(cached);
+    }
+
+    /** Builds a single-producer disruptor with daemon consumer threads and a yielding wait strategy. */
+    private static Disruptor<RecordEvent> newDisruptor() {
+        EventFactory<RecordEvent> eventFactory = new RecordEventFactory();
+        return new Disruptor<>(eventFactory, RING_BUFFER_SIZE, DaemonThreadFactory.INSTANCE,
+            ProducerType.SINGLE, new YieldingWaitStrategy());
+    }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
similarity index 51%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
index 716de8a16..f8598fa84 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
@@ -15,30 +15,42 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue;
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+
+import lombok.Getter;
+import lombok.Setter;
import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-public class AbstractFlowLifeCycle implements FlowLifeCycle {
+public abstract class AbstractIntermediateQueue<T> {
- protected final SeaTunnelTask runningTask;
+ @Getter
+ @Setter
+ private SeaTunnelTask runningTask;
- protected final CompletableFuture<Void> completableFuture;
+ @Getter
+ @Setter
+ private IntermediateQueueFlowLifeCycle<?> intermediateQueueFlowLifeCycle;
- protected Boolean prepareClose;
+ private final T queue;
- public AbstractFlowLifeCycle(SeaTunnelTask runningTask,
- CompletableFuture<Void> completableFuture) {
- this.runningTask = runningTask;
- this.completableFuture = completableFuture;
- this.prepareClose = false;
+ public AbstractIntermediateQueue(T queue) {
+ this.queue = queue;
}
- @Override
- public void close() throws IOException {
- completableFuture.complete(null);
+ public T getIntermediateQueue() {
+ return queue;
}
+
+ public abstract void received(Record<?> record);
+
+ public abstract void collect(Collector<Record<?>> collector) throws
Exception;
+
+ public abstract void close() throws IOException;
+
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
similarity index 70%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index 3bae93e41..6699a1a20 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -15,35 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
+import java.io.IOException;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle
implements OneInputFlowLifeCycle<Record<?>>,
- OneOutputFlowLifeCycle<Record<?>> {
+public class IntermediateBlockingQueue extends
AbstractIntermediateQueue<BlockingQueue<Record<?>>> {
- private final BlockingQueue<Record<?>> queue;
-
- public IntermediateQueueFlowLifeCycle(SeaTunnelTask runningTask,
- CompletableFuture<Void>
completableFuture,
- BlockingQueue<Record<?>> queue) {
- super(runningTask, completableFuture);
- this.queue = queue;
+ public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
+ super(queue);
}
@Override
public void received(Record<?> record) {
try {
- handleRecord(record, queue::put);
+ handleRecord(record, getIntermediateQueue()::put);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -53,7 +46,7 @@ public class IntermediateQueueFlowLifeCycle extends
AbstractFlowLifeCycle implem
@Override
public void collect(Collector<Record<?>> collector) throws Exception {
while (true) {
- Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
+ Record<?> record = getIntermediateQueue().poll(100,
TimeUnit.MILLISECONDS);
if (record != null) {
handleRecord(record, collector::collect);
} else {
@@ -62,16 +55,21 @@ public class IntermediateQueueFlowLifeCycle extends
AbstractFlowLifeCycle implem
}
}
+ @Override
+ public void close() throws IOException {
+ //nothing
+ }
+
private void handleRecord(Record<?> record,
ConsumerWithException<Record<?>> consumer) throws Exception {
if (record.getData() instanceof Barrier) {
CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
- runningTask.ack(barrier);
+ getRunningTask().ack(barrier);
if (barrier.prepareClose()) {
- prepareClose = true;
+ getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
}
consumer.accept(record);
} else {
- if (prepareClose) {
+ if (getIntermediateQueueFlowLifeCycle().getPrepareClose()) {
return;
}
consumer.accept(record);
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
new file mode 100644
index 000000000..c163c55ed
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEvent;
+import
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventHandler;
+import
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventProducer;
+
+import com.lmax.disruptor.dsl.Disruptor;
+
+import java.io.IOException;
+
+/**
+ * {@link AbstractIntermediateQueue} implementation that moves records through an LMAX
+ * {@link Disruptor} ring buffer.
+ */
+public class IntermediateDisruptor extends AbstractIntermediateQueue<Disruptor<RecordEvent>> {
+
+    public IntermediateDisruptor(Disruptor<RecordEvent> queue) {
+        super(queue);
+    }
+
+    /** Set once the event handler has been registered and the disruptor started by {@link #collect}. */
+    private volatile boolean isExecuted;
+
+    /**
+     * Hands the record to {@link RecordEventProducer}, which publishes it on the ring buffer.
+     *
+     * @param record record to enqueue
+     */
+    @Override
+    public void received(Record<?> record) {
+        RecordEventProducer.onData(record, getIntermediateQueue().getRingBuffer(),
+            getIntermediateQueueFlowLifeCycle());
+    }
+
+    /**
+     * On the first call, registers a {@link RecordEventHandler} that forwards events to
+     * {@code collector} and starts the disruptor. On every later call it only sleeps for 100 ms,
+     * since the handler registered on the first call does the forwarding.
+     *
+     * @param collector sink that receives the records consumed from the ring buffer
+     * @throws Exception if the sleep is interrupted or the disruptor cannot be started
+     */
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public void collect(Collector<Record<?>> collector) throws Exception {
+        if (!isExecuted) {
+            getIntermediateQueue().handleEventsWith(
+                new RecordEventHandler(getRunningTask(), collector, getIntermediateQueueFlowLifeCycle()));
+            getIntermediateQueue().start();
+            isExecuted = true;
+        } else {
+            Thread.sleep(100);
+        }
+    }
+
+    /** Shuts down the underlying disruptor. */
+    @Override
+    public void close() throws IOException {
+        getIntermediateQueue().shutdown();
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
similarity index 66%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
index 1fe583b2f..07c7afb38 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
@@ -15,18 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
-import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.api.table.type.Record;
-/**
- * A processing component that sends a piece of data from within the engine to
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
- void collect(Collector<T> collector) throws Exception;
+import lombok.Data;
+@Data
+public class RecordEvent {
+ private Record<?> record;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
similarity index 66%
copy from
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
index 1fe583b2f..be861d62f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
@@ -15,18 +15,13 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
-import org.apache.seatunnel.api.transform.Collector;
-
-/**
- * A processing component that sends a piece of data from within the engine to
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
- void collect(Collector<T> collector) throws Exception;
+import com.lmax.disruptor.EventFactory;
+public class RecordEventFactory implements EventFactory<RecordEvent> {
+ @Override
+ public RecordEvent newInstance() {
+ return new RecordEvent();
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
new file mode 100644
index 000000000..ad64fd595
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.lmax.disruptor.EventHandler;
+
+/**
+ * Disruptor consumer that forwards the record carried by each {@link RecordEvent} to a
+ * {@link Collector}.
+ */
+public class RecordEventHandler implements EventHandler<RecordEvent> {
+
+    private final SeaTunnelTask runningTask;
+
+    private final Collector<Record<?>> collector;
+
+    private final IntermediateQueueFlowLifeCycle<?> intermediateQueueFlowLifeCycle;
+
+    /**
+     * @param runningTask task that acknowledges checkpoint barriers seen by this handler
+     * @param collector sink that receives every forwarded record
+     * @param intermediateQueueFlowLifeCycle life cycle whose prepare-close flag is read and set here
+     */
+    public RecordEventHandler(SeaTunnelTask runningTask, Collector<Record<?>> collector,
+                              IntermediateQueueFlowLifeCycle<?> intermediateQueueFlowLifeCycle) {
+        this.runningTask = runningTask;
+        this.collector = collector;
+        this.intermediateQueueFlowLifeCycle = intermediateQueueFlowLifeCycle;
+    }
+
+    @Override
+    public void onEvent(RecordEvent recordEvent, long sequence, boolean endOfBatch) throws Exception {
+        handleRecord(recordEvent.getRecord(), collector);
+    }
+
+    /**
+     * Forwards {@code record} to {@code collector}. Barriers are acknowledged on the running task
+     * first and always forwarded; any other record is dropped once prepare-close has been flagged.
+     * A {@code null} record is ignored.
+     */
+    private void handleRecord(Record<?> record, Collector<Record<?>> collector) throws Exception {
+        if (record == null) {
+            return;
+        }
+        if (record.getData() instanceof Barrier) {
+            // NOTE(review): assumes every Barrier is a CheckpointBarrier - confirm against the type hierarchy.
+            CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
+            runningTask.ack(barrier);
+            if (barrier.prepareClose()) {
+                this.intermediateQueueFlowLifeCycle.setPrepareClose(true);
+            }
+        } else if (this.intermediateQueueFlowLifeCycle.getPrepareClose()) {
+            return;
+        }
+        collector.collect(record);
+    }
+
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
new file mode 100644
index 000000000..7862fb5c2
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.lmax.disruptor.RingBuffer;
+
+/**
+ * Publishes records onto a Disruptor {@link RingBuffer} of {@link RecordEvent}s.
+ */
+public class RecordEventProducer {
+
+    /**
+     * Publishes {@code record} on {@code ringBuffer}. Barriers are acknowledged on the life cycle's
+     * running task first and always published; any other record is dropped once prepare-close has
+     * been flagged.
+     *
+     * @param record record to publish
+     * @param ringBuffer ring buffer that receives the record
+     * @param intermediateQueueFlowLifeCycle life cycle providing the running task and the
+     *        prepare-close flag
+     */
+    public static void onData(Record<?> record, RingBuffer<RecordEvent> ringBuffer,
+                              IntermediateQueueFlowLifeCycle<?> intermediateQueueFlowLifeCycle) {
+
+        if (record.getData() instanceof Barrier) {
+            // NOTE(review): assumes every Barrier is a CheckpointBarrier - confirm against the type hierarchy.
+            CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
+            intermediateQueueFlowLifeCycle.getRunningTask().ack(barrier);
+            if (barrier.prepareClose()) {
+                intermediateQueueFlowLifeCycle.setPrepareClose(true);
+            }
+        } else if (intermediateQueueFlowLifeCycle.getPrepareClose()) {
+            return;
+        }
+
+        long sequence = ringBuffer.next();
+        try {
+            RecordEvent recordEvent = ringBuffer.get(sequence);
+            recordEvent.setRecord(record);
+        } finally {
+            // A claimed sequence must always be published, otherwise consumers stall on that slot.
+            ringBuffer.publish(sequence);
+        }
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index d7c23b6e7..77da5b8f5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -67,7 +68,8 @@ public class CheckpointPlanTest extends
AbstractSeaTunnelServerTest {
Executors.newCachedThreadPool(),
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
- runningJobStateTimestamp).f1();
+ runningJobStateTimestamp,
+ QueueType.BLOCKINGQUEUE).f1();
Assertions.assertNotNull(checkpointPlans);
Assertions.assertEquals(2, checkpointPlans.size());
// enum(1) + reader(2) + writer(2)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index d12c2dac3..7bec42e2b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,6 +23,7 @@ import
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -108,7 +109,8 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
Executors.newCachedThreadPool(),
instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
runningJobState,
- runningJobStateTimestamp).f0();
+ runningJobStateTimestamp,
+ QueueType.BLOCKINGQUEUE).f0();
Assertions.assertEquals(physicalPlan.getPipelineList().size(), 1);
Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
1);