This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8905524ce [Improve][Seatunnel-Engine]change queue to disruptor (#3847)
8905524ce is described below

commit 8905524ce51b3d4efb694a80e63a6275da88bb92
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Jan 31 17:59:03 2023 +0800

    [Improve][Seatunnel-Engine]change queue to disruptor (#3847)
    
    * improve change queue to disruptor
---
 docs/en/seatunnel-engine/deployment.md             | 12 ++++
 release-note.md                                    |  1 +
 .../src/test/resources/seatunnel.yaml              |  1 +
 .../engine/common/config/EngineConfig.java         |  9 +++
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  5 ++
 .../engine/common/config/server/QueueType.java}    | 16 +-----
 .../common/config/server/ServerConfigOptions.java  |  3 +
 .../engine/server/TaskExecutionService.java        |  7 ++-
 .../server/dag/physical/PhysicalPlanGenerator.java | 26 +++++++--
 .../engine/server/dag/physical/PlanUtils.java      | 21 ++++---
 .../seatunnel/engine/server/master/JobMaster.java  |  3 +-
 .../engine/server/task/SeaTunnelTask.java          |  8 +--
 .../server/task/flow/AbstractFlowLifeCycle.java    |  6 ++
 .../task/flow/IntermediateQueueFlowLifeCycle.java  | 50 +++++------------
 .../server/task/flow/OneOutputFlowLifeCycle.java   |  1 -
 .../PartitionTransformSourceFlowLifeCycle.java     |  1 +
 .../server/task/flow/SourceFlowLifeCycle.java      |  2 +-
 .../AbstractTaskGroupWithIntermediateQueue.java}   | 22 ++++----
 ...=> TaskGroupWithIntermediateBlockingQueue.java} | 14 +++--
 .../group/TaskGroupWithIntermediateDisruptor.java  | 65 ++++++++++++++++++++++
 .../queue/AbstractIntermediateQueue.java}          | 40 ++++++++-----
 .../queue/IntermediateBlockingQueue.java}          | 32 +++++------
 .../task/group/queue/IntermediateDisruptor.java    | 60 ++++++++++++++++++++
 .../queue/disruptor/RecordEvent.java}              | 17 ++----
 .../queue/disruptor/RecordEventFactory.java}       | 19 +++----
 .../group/queue/disruptor/RecordEventHandler.java  | 65 ++++++++++++++++++++++
 .../group/queue/disruptor/RecordEventProducer.java | 52 +++++++++++++++++
 .../server/checkpoint/CheckpointPlanTest.java      |  4 +-
 .../seatunnel/engine/server/dag/TaskTest.java      |  4 +-
 29 files changed, 422 insertions(+), 144 deletions(-)

diff --git a/docs/en/seatunnel-engine/deployment.md 
b/docs/en/seatunnel-engine/deployment.md
index 1e37dfd02..9b0484ebd 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -102,6 +102,18 @@ seatunnel:
 
 About the checkpoint storage, you can see [checkpoint 
storage](checkpoint-storage.md)
 
+### 4.4 Intermediate Queue Type
+
+Task internal exchange queue type. There are currently two types of 
`disruptor` and `blockingqueue`.
+
+```
+seatunnel:
+    engine:
+        queue_type: disruptor
+        # other config
+```
+
+
 ## 5. Config SeaTunnel Engine Server
 
 All SeaTunnel Engine Server config in `hazelcast.yaml` file.
diff --git a/release-note.md b/release-note.md
index b1d2531d7..2bba2d4e3 100644
--- a/release-note.md
+++ b/release-note.md
@@ -37,6 +37,7 @@
 - [Core] Improve job restart of all node down #3784
 - [Checkpoint] Cancel CheckpointCoordinator First Before Cancel Task #3838
 - [Storage] Remove seatunnel-api from engine storage. #3834
+- [Core] change queue to disruptor. #3847
 
 ## Bug Fixes
 ### Connectors
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 51ffc5214..df4c62583 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -18,6 +18,7 @@
 seatunnel:
   engine:
     backup-count: 2
+    queue-type: disruptor
     print-execution-info-interval: 10
     slot-service:
       dynamic-slot: true
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 0037d0df8..f4d1d066c 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -18,9 +18,11 @@
 package org.apache.seatunnel.engine.common.config;
 
 import static com.hazelcast.internal.util.Preconditions.checkBackupCount;
+import static com.hazelcast.internal.util.Preconditions.checkNotNull;
 import static com.hazelcast.internal.util.Preconditions.checkPositive;
 
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 
@@ -38,6 +40,8 @@ public class EngineConfig {
 
     private CheckpointConfig checkpointConfig = 
ServerConfigOptions.CHECKPOINT.defaultValue();
 
+    private QueueType queueType = 
ServerConfigOptions.QUEUE_TYPE.defaultValue();
+
     public void setBackupCount(int newBackupCount) {
         checkBackupCount(newBackupCount, 0);
         this.backupCount = newBackupCount;
@@ -52,4 +56,9 @@ public class EngineConfig {
         checkPositive(printExecutionInfoInterval, 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
         this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
     }
+
+    public void setQueueType(QueueType queueType) {
+        checkNotNull(queueType);
+        this.queueType = queueType;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 229d40f1c..346b377e7 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -24,6 +24,7 @@ import static 
com.hazelcast.internal.config.DomConfigHelper.getIntegerValue;
 
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import 
org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 
@@ -34,6 +35,7 @@ import com.hazelcast.logging.Logger;
 import org.w3c.dom.Node;
 
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor {
@@ -95,6 +97,8 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
                 engineConfig.setBackupCount(
                     getIntegerValue(ServerConfigOptions.BACKUP_COUNT.key(), 
getTextContent(node))
                 );
+            } else if (ServerConfigOptions.QUEUE_TYPE.key().equals(name)) {
+                
engineConfig.setQueueType(QueueType.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
             } else if 
(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
                 
engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
                     getTextContent(node)));
@@ -159,6 +163,7 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
 
     /**
      * Parse checkpoint plugin config.
+     *
      * @param checkpointPluginConfigNode checkpoint plugin config node
      * @return checkpoint plugin config
      */
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
similarity index 66%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
index 1fe583b2f..2a6ca85c5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/QueueType.java
@@ -15,18 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
-
-import org.apache.seatunnel.api.transform.Collector;
-
-/**
- * A processing component that sends a piece of data from within the engine to 
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
-    void collect(Collector<T> collector) throws Exception;
+package org.apache.seatunnel.engine.common.config.server;
 
+public enum QueueType {
+    DISRUPTOR, BLOCKINGQUEUE
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 5706f5341..61c1e5073 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -49,6 +49,9 @@ public class ServerConfigOptions {
 
     public static final Option<Integer> CHECKPOINT_STORAGE_MAX_RETAINED = 
Options.key("max-retained").intType().defaultValue(1).withDescription("The 
maximum number of retained checkpoints.");
 
+    public static final Option<QueueType> QUEUE_TYPE = 
Options.key("queue-type").type(new TypeReference<QueueType>() {
+    }).defaultValue(QueueType.BLOCKINGQUEUE).withDescription("The internal 
data cache queue type.");
+
     public static final Option<CheckpointStorageConfig> CHECKPOINT_STORAGE = 
Options.key("storage").type(new TypeReference<CheckpointStorageConfig>() {
     }).defaultValue(new CheckpointStorageConfig()).withDescription("The 
checkpoint storage configuration.");
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index cbe3f6006..0f65aea28 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -123,6 +123,9 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     }
 
     public TaskGroupContext getExecutionContext(TaskGroupLocation 
taskGroupLocation) {
+        if (executionContexts.get(taskGroupLocation) == null) {
+            return finishedExecutionContexts.get(taskGroupLocation);
+        }
         return executionContexts.get(taskGroupLocation);
     }
 
@@ -257,7 +260,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
-    private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, 
TaskExecutionState taskExecutionState){
+    private void notifyTaskStatusToMaster(TaskGroupLocation taskGroupLocation, 
TaskExecutionState taskExecutionState) {
         long sleepTime = 1000;
         boolean notifyStateSuccess = false;
         while (isRunning && !notifyStateSuccess) {
@@ -303,7 +306,7 @@ public class TaskExecutionService implements 
DynamicMetricsProvider {
 
     }
 
-    public void notifyCleanTaskGroupContext(TaskGroupLocation 
taskGroupLocation){
+    public void notifyCleanTaskGroupContext(TaskGroupLocation 
taskGroupLocation) {
         finishedExecutionContexts.remove(taskGroupLocation);
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 2169e319f..1d6c751f6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -17,7 +17,10 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import static 
org.apache.seatunnel.engine.common.config.server.QueueType.BLOCKINGQUEUE;
+
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -49,7 +52,8 @@ import 
org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask;
 import org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask;
-import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQueue;
+import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateBlockingQueue;
+import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateDisruptor;
 
 import com.google.common.collect.Lists;
 import com.hazelcast.flakeidgen.FlakeIdGenerator;
@@ -121,6 +125,8 @@ public class PhysicalPlanGenerator {
 
     private final IMap<Object, Object> runningJobStateTimestampsIMap;
 
+    private final QueueType queueType;
+
     public PhysicalPlanGenerator(@NonNull ExecutionPlan executionPlan,
                                  @NonNull NodeEngine nodeEngine,
                                  @NonNull JobImmutableInformation 
jobImmutableInformation,
@@ -128,7 +134,8 @@ public class PhysicalPlanGenerator {
                                  @NonNull ExecutorService executorService,
                                  @NonNull FlakeIdGenerator flakeIdGenerator,
                                  @NonNull IMap runningJobStateIMap,
-                                 @NonNull IMap runningJobStateTimestampsIMap) {
+                                 @NonNull IMap runningJobStateTimestampsIMap,
+                                 @NonNull QueueType queueType) {
         this.pipelines = executionPlan.getPipelines();
         this.nodeEngine = nodeEngine;
         this.jobImmutableInformation = jobImmutableInformation;
@@ -141,6 +148,7 @@ public class PhysicalPlanGenerator {
         this.subtaskActions = new HashMap<>();
         this.runningJobStateIMap = runningJobStateIMap;
         this.runningJobStateTimestampsIMap = runningJobStateTimestampsIMap;
+        this.queueType = queueType;
     }
 
     public Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> generate() {
@@ -378,13 +386,21 @@ public class PhysicalPlanGenerator {
 
                     if 
(taskList.stream().anyMatch(TransformSeaTunnelTask.class::isInstance)) {
                         // contains IntermediateExecutionFlow in task group
+                        TaskGroupDefaultImpl taskGroup;
+                        if (queueType.equals(BLOCKINGQUEUE)) {
+                            taskGroup = new 
TaskGroupWithIntermediateBlockingQueue(taskGroupLocation, 
flow.getAction().getName() +
+                                "-SourceTask",
+                                taskList.stream().map(task -> (Task) 
task).collect(Collectors.toList()));
+                        } else {
+                            taskGroup = new 
TaskGroupWithIntermediateDisruptor(taskGroupLocation, 
flow.getAction().getName() +
+                                "-SourceTask",
+                                taskList.stream().map(task -> (Task) 
task).collect(Collectors.toList()));
+                        }
                         t.add(new PhysicalVertex(
                             i,
                             executorService,
                             flow.getAction().getParallelism(),
-                            new 
TaskGroupWithIntermediateQueue(taskGroupLocation, flow.getAction().getName() +
-                                "-SourceTask",
-                                taskList.stream().map(task -> (Task) 
task).collect(Collectors.toList())),
+                            taskGroup,
                             flakeIdGenerator,
                             pipelineIndex,
                             totalPipelineNum,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
index 976ba3daf..efced807a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PlanUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointPlan;
@@ -34,14 +35,15 @@ import java.util.concurrent.ExecutorService;
 public class PlanUtils {
 
     public static Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> 
fromLogicalDAG(@NonNull LogicalDag logicalDag,
-                                                                      @NonNull 
NodeEngine nodeEngine,
-                                                                      @NonNull
-                                                                      
JobImmutableInformation jobImmutableInformation,
-                                                                      long 
initializationTimestamp,
-                                                                      @NonNull 
ExecutorService executorService,
-                                                                      @NonNull 
FlakeIdGenerator flakeIdGenerator,
-                                                                      @NonNull 
IMap runningJobStateIMap,
-                                                                      @NonNull 
IMap runningJobStateTimestampsIMap) {
+                                                                               
     @NonNull NodeEngine nodeEngine,
+                                                                               
     @NonNull
+                                                                               
         JobImmutableInformation jobImmutableInformation,
+                                                                               
     long initializationTimestamp,
+                                                                               
     @NonNull ExecutorService executorService,
+                                                                               
     @NonNull FlakeIdGenerator flakeIdGenerator,
+                                                                               
     @NonNull IMap runningJobStateIMap,
+                                                                               
     @NonNull IMap runningJobStateTimestampsIMap,
+                                                                               
     @NonNull QueueType queueType) {
         return new PhysicalPlanGenerator(
             new ExecutionPlanGenerator(logicalDag, 
jobImmutableInformation).generate(),
             nodeEngine,
@@ -50,6 +52,7 @@ public class PlanUtils {
             executorService,
             flakeIdGenerator,
             runningJobStateIMap,
-            runningJobStateTimestampsIMap).generate();
+            runningJobStateTimestampsIMap,
+            queueType).generate();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d8e40366f..f8008fd5b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -169,7 +169,8 @@ public class JobMaster {
             executorService,
             flakeIdGenerator,
             runningJobStateIMap,
-            runningJobStateTimestampsIMap);
+            runningJobStateTimestampsIMap,
+            engineConfig.getQueueType());
         this.physicalPlan = planTuple.f0();
         this.physicalPlan.setJobMaster(this);
         this.checkpointManager = new CheckpointManager(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index b8c944e59..cf8ac987e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -60,7 +60,7 @@ import 
org.apache.seatunnel.engine.server.task.flow.PartitionTransformSourceFlow
 import org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle;
 import org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle;
-import 
org.apache.seatunnel.engine.server.task.group.TaskGroupWithIntermediateQueue;
+import 
org.apache.seatunnel.engine.server.task.group.AbstractTaskGroupWithIntermediateQueue;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 
@@ -226,8 +226,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
             IntermediateQueueConfig config =
                 ((IntermediateExecutionFlow<IntermediateQueueConfig>) 
flow).getConfig();
             lifeCycle = new IntermediateQueueFlowLifeCycle(this, 
completableFuture,
-                ((TaskGroupWithIntermediateQueue) taskBelongGroup)
-                    .getBlockingQueueCache(config.getQueueID()));
+                ((AbstractTaskGroupWithIntermediateQueue) taskBelongGroup)
+                    .getQueueCache(config.getQueueID()));
             outputs = flowLifeCycles;
         } else {
             throw new UnknownFlowException(flow);
@@ -305,7 +305,7 @@ public abstract class SeaTunnelTask extends AbstractTask {
         tryClose(checkpointId);
     }
 
-    public void 
notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer){
+    public void 
notifyAllAction(ConsumerWithException<InternalCheckpointListener> consumer) {
         allCycles.stream()
             .filter(cycle -> cycle instanceof InternalCheckpointListener)
             .map(cycle -> (InternalCheckpointListener) cycle)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
index 716de8a16..606374049 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
@@ -19,15 +19,21 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 
+import lombok.Getter;
+import lombok.Setter;
+
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 
 public class AbstractFlowLifeCycle implements FlowLifeCycle {
 
+    @Getter
     protected final SeaTunnelTask runningTask;
 
     protected final CompletableFuture<Void> completableFuture;
 
+    @Getter
+    @Setter
     protected Boolean prepareClose;
 
     public AbstractFlowLifeCycle(SeaTunnelTask runningTask,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
index 3bae93e41..141d5ca0f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
@@ -19,62 +19,40 @@ package org.apache.seatunnel.engine.server.task.flow;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
-import org.apache.seatunnel.common.utils.function.ConsumerWithException;
-import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
-import org.apache.seatunnel.engine.server.task.record.Barrier;
+import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 
-import java.util.concurrent.BlockingQueue;
+import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
-public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle 
implements OneInputFlowLifeCycle<Record<?>>,
-        OneOutputFlowLifeCycle<Record<?>> {
+public class IntermediateQueueFlowLifeCycle<T extends 
AbstractIntermediateQueue<?>> extends AbstractFlowLifeCycle implements 
OneInputFlowLifeCycle<Record<?>>,
+    OneOutputFlowLifeCycle<Record<?>> {
 
-    private final BlockingQueue<Record<?>> queue;
+    private final AbstractIntermediateQueue<?> queue;
 
     public IntermediateQueueFlowLifeCycle(SeaTunnelTask runningTask,
                                           CompletableFuture<Void> 
completableFuture,
-                                          BlockingQueue<Record<?>> queue) {
+                                          AbstractIntermediateQueue<?> queue) {
         super(runningTask, completableFuture);
         this.queue = queue;
+        queue.setIntermediateQueueFlowLifeCycle(this);
+        queue.setRunningTask(runningTask);
     }
 
     @Override
     public void received(Record<?> record) {
-        try {
-            handleRecord(record, queue::put);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        queue.received(record);
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @Override
     public void collect(Collector<Record<?>> collector) throws Exception {
-        while (true) {
-            Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
-            if (record != null) {
-                handleRecord(record, collector::collect);
-            } else {
-                break;
-            }
-        }
+        queue.collect(collector);
     }
 
-    private void handleRecord(Record<?> record, 
ConsumerWithException<Record<?>> consumer) throws Exception {
-        if (record.getData() instanceof Barrier) {
-            CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
-            runningTask.ack(barrier);
-            if (barrier.prepareClose()) {
-                prepareClose = true;
-            }
-            consumer.accept(record);
-        } else {
-            if (prepareClose) {
-                return;
-            }
-            consumer.accept(record);
-        }
+    @Override
+    public void close() throws IOException {
+        queue.close();
+        super.close();
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
index 1fe583b2f..dffa6ab97 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
@@ -28,5 +28,4 @@ import org.apache.seatunnel.api.transform.Collector;
 public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
 
     void collect(Collector<T> collector) throws Exception;
-
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
index 522f331d0..d30396f8f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/PartitionTransformSourceFlowLifeCycle.java
@@ -38,6 +38,7 @@ public class PartitionTransformSourceFlowLifeCycle<T> extends 
AbstractFlowLifeCy
     private long currentCheckpointId = Long.MAX_VALUE;
 
     private int alignedBarriersCounter = 0;
+
     public PartitionTransformSourceFlowLifeCycle(SeaTunnelTask runningTask, 
CompletableFuture<Void> completableFuture) {
         super(runningTask, completableFuture);
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index b9c898425..4c60aa58c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -90,7 +90,7 @@ public class SourceFlowLifeCycle<T, SplitT extends 
SourceSplit> extends ActionFl
     public void init() throws Exception {
         this.splitSerializer = sourceAction.getSource().getSplitSerializer();
         this.reader = sourceAction.getSource()
-                .createReader(new SourceReaderContext(indexID, 
sourceAction.getSource().getBoundedness(), this));
+            .createReader(new SourceReaderContext(indexID, 
sourceAction.getSource().getBoundedness(), this));
         this.enumeratorTaskAddress = getEnumeratorTaskAddress();
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
similarity index 52%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
index 1fe583b2f..d520b8c74 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/AbstractTaskGroupWithIntermediateQueue.java
@@ -15,18 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group;
 
-import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
 
-/**
- * A processing component that sends a piece of data from within the engine to 
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
+import java.util.Collection;
+
+public abstract class AbstractTaskGroupWithIntermediateQueue extends 
TaskGroupDefaultImpl {
+    public AbstractTaskGroupWithIntermediateQueue(TaskGroupLocation 
taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+        super(taskGroupLocation, taskGroupName, tasks);
+    }
 
-    void collect(Collector<T> collector) throws Exception;
+    public abstract AbstractIntermediateQueue<?> getQueueCache(long id);
 
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
similarity index 73%
rename from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
rename to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
index bcb0786be..478a61c1b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateQueue.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateBlockingQueue.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.engine.server.task.group;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.engine.server.execution.Task;
-import org.apache.seatunnel.engine.server.execution.TaskGroupDefaultImpl;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
+import 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue;
 
 import java.util.Collection;
 import java.util.Map;
@@ -29,11 +30,11 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class TaskGroupWithIntermediateQueue extends TaskGroupDefaultImpl {
+public class TaskGroupWithIntermediateBlockingQueue extends 
AbstractTaskGroupWithIntermediateQueue {
 
     public static final int QUEUE_SIZE = 100000;
 
-    public TaskGroupWithIntermediateQueue(TaskGroupLocation taskGroupLocation, 
String taskGroupName, Collection<Task> tasks) {
+    public TaskGroupWithIntermediateBlockingQueue(TaskGroupLocation 
taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
         super(taskGroupLocation, taskGroupName, tasks);
     }
 
@@ -43,12 +44,13 @@ public class TaskGroupWithIntermediateQueue extends 
TaskGroupDefaultImpl {
     public void init() {
         blockingQueueCache = new ConcurrentHashMap<>();
         getTasks().stream().filter(SeaTunnelTask.class::isInstance)
-                .map(s -> (SeaTunnelTask) s).forEach(s -> 
s.setTaskGroup(this));
+            .map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
     }
 
-    public BlockingQueue<Record<?>> getBlockingQueueCache(long id) {
+    @Override
+    public AbstractIntermediateQueue<?> getQueueCache(long id) {
         blockingQueueCache.computeIfAbsent(id, i -> new 
ArrayBlockingQueue<>(QUEUE_SIZE));
-        return blockingQueueCache.get(id);
+        return new IntermediateBlockingQueue(blockingQueueCache.get(id));
     }
 
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
new file mode 100644
index 000000000..0a6bfce1f
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/TaskGroupWithIntermediateDisruptor.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group;
+
+import org.apache.seatunnel.engine.server.execution.Task;
+import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import 
org.apache.seatunnel.engine.server.task.group.queue.AbstractIntermediateQueue;
+import 
org.apache.seatunnel.engine.server.task.group.queue.IntermediateDisruptor;
+import 
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEvent;
+import 
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventFactory;
+
+import com.lmax.disruptor.EventFactory;
+import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+import com.lmax.disruptor.util.DaemonThreadFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class TaskGroupWithIntermediateDisruptor extends 
AbstractTaskGroupWithIntermediateQueue {
+
+    public static final int RING_BUFFER_SIZE = 1024;
+
+    public TaskGroupWithIntermediateDisruptor(TaskGroupLocation 
taskGroupLocation, String taskGroupName, Collection<Task> tasks) {
+        super(taskGroupLocation, taskGroupName, tasks);
+    }
+
+    private Map<Long, Disruptor<RecordEvent>> disruptor = null;
+
+    @Override
+    public void init() {
+        disruptor = new ConcurrentHashMap<>();
+        getTasks().stream().filter(SeaTunnelTask.class::isInstance)
+            .map(s -> (SeaTunnelTask) s).forEach(s -> s.setTaskGroup(this));
+    }
+
+    @Override
+    public AbstractIntermediateQueue<?> getQueueCache(long id) {
+        EventFactory<RecordEvent> eventFactory = new RecordEventFactory();
+        Disruptor<RecordEvent> disruptor = new Disruptor<>(eventFactory, 
RING_BUFFER_SIZE, DaemonThreadFactory.INSTANCE,
+            ProducerType.SINGLE, new YieldingWaitStrategy());
+
+        this.disruptor.putIfAbsent(id, disruptor);
+        return new IntermediateDisruptor(this.disruptor.get(id));
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
similarity index 51%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
index 716de8a16..f8598fa84 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/AbstractFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/AbstractIntermediateQueue.java
@@ -15,30 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue;
 
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+
+import lombok.Getter;
+import lombok.Setter;
 
 import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
 
-public class AbstractFlowLifeCycle implements FlowLifeCycle {
+public abstract class AbstractIntermediateQueue<T> {
 
-    protected final SeaTunnelTask runningTask;
+    @Getter
+    @Setter
+    private SeaTunnelTask runningTask;
 
-    protected final CompletableFuture<Void> completableFuture;
+    @Getter
+    @Setter
+    private IntermediateQueueFlowLifeCycle<?> intermediateQueueFlowLifeCycle;
 
-    protected Boolean prepareClose;
+    private final T queue;
 
-    public AbstractFlowLifeCycle(SeaTunnelTask runningTask,
-                                 CompletableFuture<Void> completableFuture) {
-        this.runningTask = runningTask;
-        this.completableFuture = completableFuture;
-        this.prepareClose = false;
+    public AbstractIntermediateQueue(T queue) {
+        this.queue = queue;
     }
 
-    @Override
-    public void close() throws IOException {
-        completableFuture.complete(null);
+    public T getIntermediateQueue() {
+        return queue;
     }
+
+    public abstract void received(Record<?> record);
+
+    public abstract void collect(Collector<Record<?>> collector) throws 
Exception;
+
+    public abstract void close() throws IOException;
+
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
similarity index 70%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
index 3bae93e41..6699a1a20 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/IntermediateQueueFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateBlockingQueue.java
@@ -15,35 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue;
 
 import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.transform.Collector;
 import org.apache.seatunnel.common.utils.function.ConsumerWithException;
 import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
-import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-public class IntermediateQueueFlowLifeCycle extends AbstractFlowLifeCycle 
implements OneInputFlowLifeCycle<Record<?>>,
-        OneOutputFlowLifeCycle<Record<?>> {
+public class IntermediateBlockingQueue extends 
AbstractIntermediateQueue<BlockingQueue<Record<?>>> {
 
-    private final BlockingQueue<Record<?>> queue;
-
-    public IntermediateQueueFlowLifeCycle(SeaTunnelTask runningTask,
-                                          CompletableFuture<Void> 
completableFuture,
-                                          BlockingQueue<Record<?>> queue) {
-        super(runningTask, completableFuture);
-        this.queue = queue;
+    public IntermediateBlockingQueue(BlockingQueue<Record<?>> queue) {
+        super(queue);
     }
 
     @Override
     public void received(Record<?> record) {
         try {
-            handleRecord(record, queue::put);
+            handleRecord(record, getIntermediateQueue()::put);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -53,7 +46,7 @@ public class IntermediateQueueFlowLifeCycle extends 
AbstractFlowLifeCycle implem
     @Override
     public void collect(Collector<Record<?>> collector) throws Exception {
         while (true) {
-            Record<?> record = queue.poll(100, TimeUnit.MILLISECONDS);
+            Record<?> record = getIntermediateQueue().poll(100, 
TimeUnit.MILLISECONDS);
             if (record != null) {
                 handleRecord(record, collector::collect);
             } else {
@@ -62,16 +55,21 @@ public class IntermediateQueueFlowLifeCycle extends 
AbstractFlowLifeCycle implem
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        //nothing
+    }
+
     private void handleRecord(Record<?> record, 
ConsumerWithException<Record<?>> consumer) throws Exception {
         if (record.getData() instanceof Barrier) {
             CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
-            runningTask.ack(barrier);
+            getRunningTask().ack(barrier);
             if (barrier.prepareClose()) {
-                prepareClose = true;
+                getIntermediateQueueFlowLifeCycle().setPrepareClose(true);
             }
             consumer.accept(record);
         } else {
-            if (prepareClose) {
+            if (getIntermediateQueueFlowLifeCycle().getPrepareClose()) {
                 return;
             }
             consumer.accept(record);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
new file mode 100644
index 000000000..c163c55ed
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/IntermediateDisruptor.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import 
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEvent;
+import 
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventHandler;
+import 
org.apache.seatunnel.engine.server.task.group.queue.disruptor.RecordEventProducer;
+
+import com.lmax.disruptor.dsl.Disruptor;
+
+import java.io.IOException;
+
+public class IntermediateDisruptor extends 
AbstractIntermediateQueue<Disruptor<RecordEvent>> {
+
+    public IntermediateDisruptor(Disruptor<RecordEvent> queue) {
+        super(queue);
+    }
+
+    private volatile boolean isExecuted;
+
+    @Override
+    public void received(Record<?> record) {
+        getIntermediateQueue().getRingBuffer();
+        RecordEventProducer.onData(record, 
getIntermediateQueue().getRingBuffer(), getIntermediateQueueFlowLifeCycle());
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public void collect(Collector<Record<?>> collector) throws Exception {
+        if (!isExecuted) {
+            getIntermediateQueue().handleEventsWith(new 
RecordEventHandler(getRunningTask(), collector, 
getIntermediateQueueFlowLifeCycle()));
+            getIntermediateQueue().start();
+            isExecuted = true;
+        } else {
+            Thread.sleep(100);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        getIntermediateQueue().shutdown();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
similarity index 66%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
index 1fe583b2f..07c7afb38 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEvent.java
@@ -15,18 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
 
-import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.api.table.type.Record;
 
-/**
- * A processing component that sends a piece of data from within the engine to 
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
-    void collect(Collector<T> collector) throws Exception;
+import lombok.Data;
 
+@Data
+public class RecordEvent {
+    private Record<?> record;
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
similarity index 66%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
index 1fe583b2f..be861d62f 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/OneOutputFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventFactory.java
@@ -15,18 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.engine.server.task.flow;
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
 
-import org.apache.seatunnel.api.transform.Collector;
-
-/**
- * A processing component that sends a piece of data from within the engine to 
other components at a time
- *
- * @see OneInputFlowLifeCycle
- * @see SourceFlowLifeCycle
- */
-public interface OneOutputFlowLifeCycle<T> extends FlowLifeCycle {
-
-    void collect(Collector<T> collector) throws Exception;
+import com.lmax.disruptor.EventFactory;
 
+public class RecordEventFactory implements EventFactory<RecordEvent> {
+    @Override
+    public RecordEvent newInstance() {
+        return new RecordEvent();
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
new file mode 100644
index 000000000..ad64fd595
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.transform.Collector;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
+import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.lmax.disruptor.EventHandler;
+
+public class RecordEventHandler implements EventHandler<RecordEvent> {
+
+    private final SeaTunnelTask runningTask;
+
+    private final Collector<Record<?>> collector;
+
+    private final IntermediateQueueFlowLifeCycle 
intermediateQueueFlowLifeCycle;
+
+    public RecordEventHandler(SeaTunnelTask runningTask, Collector<Record<?>> 
collector, IntermediateQueueFlowLifeCycle intermediateQueueFlowLifeCycle) {
+        this.runningTask = runningTask;
+        this.collector = collector;
+        this.intermediateQueueFlowLifeCycle = intermediateQueueFlowLifeCycle;
+    }
+
+    @Override
+    public void onEvent(RecordEvent recordEvent, long sequence, boolean 
endOfBatch) throws Exception {
+        handleRecord(recordEvent.getRecord(), collector);
+    }
+
+    private void handleRecord(Record<?> record, Collector<Record<?>> 
collector) throws Exception {
+        if (record != null) {
+            if (record.getData() instanceof Barrier) {
+                CheckpointBarrier barrier = (CheckpointBarrier) 
record.getData();
+                runningTask.ack(barrier);
+                if (barrier.prepareClose()) {
+                    this.intermediateQueueFlowLifeCycle.setPrepareClose(true);
+                }
+            } else {
+                if (this.intermediateQueueFlowLifeCycle.getPrepareClose()) {
+                    return;
+                }
+            }
+            collector.collect(record);
+        }
+    }
+
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
new file mode 100644
index 000000000..7862fb5c2
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/group/queue/disruptor/RecordEventProducer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.group.queue.disruptor;
+
+import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
+import 
org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle;
+import org.apache.seatunnel.engine.server.task.record.Barrier;
+
+import com.lmax.disruptor.RingBuffer;
+
+public class RecordEventProducer {
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    public static void onData(Record<?> record, RingBuffer<RecordEvent> 
ringBuffer, IntermediateQueueFlowLifeCycle intermediateQueueFlowLifeCycle) {
+
+        if (record.getData() instanceof Barrier) {
+            CheckpointBarrier barrier = (CheckpointBarrier) record.getData();
+            intermediateQueueFlowLifeCycle.getRunningTask().ack(barrier);
+            if (barrier.prepareClose()) {
+                intermediateQueueFlowLifeCycle.setPrepareClose(true);
+            }
+        } else {
+            if (intermediateQueueFlowLifeCycle.getPrepareClose()) {
+                return;
+            }
+        }
+
+        long sequence = ringBuffer.next();
+        try {
+            RecordEvent recordEvent = ringBuffer.get(sequence);
+            recordEvent.setRecord(record);
+        } finally {
+            ringBuffer.publish(sequence);
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
index d7c23b6e7..77da5b8f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointPlanTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -67,7 +68,8 @@ public class CheckpointPlanTest extends 
AbstractSeaTunnelServerTest {
             Executors.newCachedThreadPool(),
             instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
-            runningJobStateTimestamp).f1();
+            runningJobStateTimestamp,
+            QueueType.BLOCKINGQUEUE).f1();
         Assertions.assertNotNull(checkpointPlans);
         Assertions.assertEquals(2, checkpointPlans.size());
         // enum(1) + reader(2) + writer(2)
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
index d12c2dac3..7bec42e2b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/dag/TaskTest.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink;
 import org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.utils.IdGenerator;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.dag.actions.Action;
@@ -108,7 +109,8 @@ public class TaskTest extends AbstractSeaTunnelServerTest {
             Executors.newCachedThreadPool(),
             instance.getFlakeIdGenerator(Constant.SEATUNNEL_ID_GENERATOR_NAME),
             runningJobState,
-            runningJobStateTimestamp).f0();
+            runningJobStateTimestamp,
+            QueueType.BLOCKINGQUEUE).f0();
 
         Assertions.assertEquals(physicalPlan.getPipelineList().size(), 1);
         
Assertions.assertEquals(physicalPlan.getPipelineList().get(0).getCoordinatorVertexList().size(),
 1);

Reply via email to