This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b0dfcd1736 Pipe: Enable realtime first strategy in data transfer 
(#12559)
3b0dfcd1736 is described below

commit 3b0dfcd17365dac3fb244bf4e10a9664d204636a
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 21 17:22:38 2024 +0800

    Pipe: Enable realtime first strategy in data transfer (#12559)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    |  17 ++-
 .../event/common/heartbeat/PipeHeartbeatEvent.java |  34 +-----
 .../PipeRealtimeDataRegionHybridExtractor.java     |   9 +-
 .../PipeDataNodeRemainingEventAndTimeMetrics.java  |  12 --
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  17 ---
 .../iotdb/db/pipe/metric/PipeProcessorMetrics.java |  39 ------
 .../pipe/task/connection/PipeEventCollector.java   |  96 ++-------------
 .../db/pipe/task/stage/PipeTaskConnectorStage.java |   4 +-
 .../db/pipe/task/stage/PipeTaskProcessorStage.java |   4 +-
 .../subtask/connector/PipeConnectorSubtask.java    |   6 +-
 .../connector/PipeConnectorSubtaskLifeCycle.java   |   8 +-
 .../connector/PipeConnectorSubtaskManager.java     |  20 +--
 .../PipeRealtimePriorityBlockingQueue.java         | 136 +++++++++++++++++++++
 .../subtask/processor/PipeProcessorSubtask.java    |  31 +----
 .../db/subscription/broker/SubscriptionBroker.java |   4 +-
 .../broker/SubscriptionPrefetchingQueue.java       |   6 +-
 .../task/stage/SubscriptionTaskConnectorStage.java |   4 +-
 .../task/subtask/SubscriptionConnectorSubtask.java |   6 +-
 .../SubscriptionConnectorSubtaskLifeCycle.java     |   4 +-
 .../SubscriptionConnectorSubtaskManager.java       |  20 +--
 .../PipeConnectorSubtaskExecutorTest.java          |   4 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  11 --
 .../iotdb/commons/conf/CommonDescriptor.java       |   7 --
 .../iotdb/commons/pipe/config/PipeConfig.java      |   5 -
 .../config/constant/PipeConnectorConstant.java     |   4 +
 25 files changed, 216 insertions(+), 292 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 7e18ad9b876..d3bfe710aca 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -36,13 +36,14 @@ import org.junit.runner.RunWith;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
   @Test
-  public void testThriftConnector() throws Exception {
+  public void testThriftConnectorWithRealtimeFirst() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -50,6 +51,15 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+
+      // Do not fail if the failure has nothing to do with pipe
+      // Because the failures will randomly generate due to resource limitation
+      if (!TestUtils.tryExecuteNonQueriesWithRetry(
+          senderEnv,
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", 
"flush"))) {
+        return;
+      }
+
       final Map<String, String> extractorAttributes = new HashMap<>();
       final Map<String, String> processorAttributes = new HashMap<>();
       final Map<String, String> connectorAttributes = new HashMap<>();
@@ -60,6 +70,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
       connectorAttributes.put("connector.batch.enable", "false");
       connectorAttributes.put("connector.ip", receiverIp);
       connectorAttributes.put("connector.port", 
Integer.toString(receiverPort));
+      connectorAttributes.put("connector.realtime-first", "true");
 
       final TSStatus status =
           client.createPipe(
@@ -76,7 +87,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
       // Because the failures will randomly generate due to resource limitation
       if (!TestUtils.tryExecuteNonQueriesWithRetry(
           senderEnv,
-          Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", 
"flush"))) {
+          Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", 
"flush"))) {
         return;
       }
 
@@ -84,7 +95,7 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
           receiverEnv,
           "select * from root.**",
           "Time,root.vehicle.d0.s1,",
-          Collections.singleton("0,1.0,"));
+          Collections.unmodifiableSet(new HashSet<>(Arrays.asList("0,1.0,", 
"1,1.0,"))));
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index c675ff5542c..c3d0798c7e9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,13 +23,11 @@ import 
org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.pattern.PipePattern;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
 import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
-import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
 import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -58,10 +56,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
   private int extractorQueueTsFileSize;
   private int extractorQueueSize;
 
-  private int bufferQueueTabletSize;
-  private int bufferQueueTsFileSize;
-  private int bufferQueueSize;
-
   private int connectorQueueTabletSize;
   private int connectorQueueTsFileSize;
   private int connectorQueueSize;
@@ -195,20 +189,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     }
   }
 
-  public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
-    if (shouldPrintMessage) {
-      bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
-      bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
-      bufferQueueSize = bufferQueue.size();
-    }
-
-    if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
-      ((PipeRealtimeDataRegionHybridExtractor) extractor)
-          
.informProcessorEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
-    }
-  }
-
-  public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event> 
pendingQueue) {
+  public void recordConnectorQueueSize(UnboundedBlockingPendingQueue<Event> 
pendingQueue) {
     if (shouldPrintMessage) {
       connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
       connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
@@ -271,13 +252,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
     final String extractorQueueSizeMessage =
         timeAssigned != 0 ? Integer.toString(extractorQueueSize) : 
unknownMessage;
 
-    final String bufferQueueTabletSizeMessage =
-        timeProcessed != 0 ? Integer.toString(bufferQueueTabletSize) : 
unknownMessage;
-    final String bufferQueueTsFileSizeMessage =
-        timeProcessed != 0 ? Integer.toString(bufferQueueTsFileSize) : 
unknownMessage;
-    final String bufferQueueSizeMessage =
-        timeProcessed != 0 ? Integer.toString(bufferQueueSize) : 
unknownMessage;
-
     final String connectorQueueTabletSizeMessage =
         timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) : 
unknownMessage;
     final String connectorQueueTsFileSizeMessage =
@@ -308,12 +282,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
         + extractorQueueTsFileSizeMessage
         + ", extractorQueueSize="
         + extractorQueueSizeMessage
-        + ", bufferQueueTabletSize="
-        + bufferQueueTabletSizeMessage
-        + ", bufferQueueTsFileSize="
-        + bufferQueueTsFileSizeMessage
-        + ", bufferQueueSize="
-        + bufferQueueSizeMessage
         + ", connectorQueueTabletSize="
         + connectorQueueTabletSizeMessage
         + ", connectorQueueTsFileSize="
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 210c5258120..381b8d73858 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -47,7 +47,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   private static final Logger LOGGER =
       LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
 
-  private final AtomicInteger processorEventCollectorQueueTsFileSize = new 
AtomicInteger(0);
   private final AtomicInteger connectorInputPendingQueueTsFileSize = new 
AtomicInteger(0);
 
   @Override
@@ -239,9 +238,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
   }
 
   private boolean isRealtimeTsFileEventCountExceededLimit() {
-    return pendingQueue.getTsFileInsertionEventCount()
-            + processorEventCollectorQueueTsFileSize.get()
-            + connectorInputPendingQueueTsFileSize.get()
+    return pendingQueue.getTsFileInsertionEventCount() + 
connectorInputPendingQueueTsFileSize.get()
         >= 
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
   }
 
@@ -250,10 +247,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
         >= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
   }
 
-  public void informProcessorEventCollectorQueueTsFileSize(final int 
queueSize) {
-    processorEventCollectorQueueTsFileSize.set(queueSize);
-  }
-
   public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
     connectorInputPendingQueueTsFileSize.set(queueSize);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 2bfe9106c01..5173d64e04f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
 import 
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
-import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
 import org.apache.iotdb.db.schemaengine.SchemaEngine;
 import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.metrics.AbstractMetricService;
@@ -128,17 +127,6 @@ public class PipeDataNodeRemainingEventAndTimeMetrics 
implements IMetricSet {
     }
   }
 
-  public void register(final PipeProcessorSubtask processorSubtask) {
-    // The metric is global thus the regionId is omitted
-    final String pipeID = processorSubtask.getPipeName() + "_" + 
processorSubtask.getCreationTime();
-    remainingEventAndTimeOperatorMap
-        .computeIfAbsent(pipeID, k -> new 
PipeDataNodeRemainingEventAndTimeOperator())
-        .register(processorSubtask);
-    if (Objects.nonNull(metricService)) {
-      createMetrics(pipeID);
-    }
-  }
-
   public void register(
       final PipeConnectorSubtask connectorSubtask, final String pipeName, 
final long creationTime) {
     // The metric is global thus the regionId is omitted
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index ca311824a65..a4b2fd3e7b7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -72,10 +72,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
             .map(IoTDBDataRegionExtractor::getEventCount)
             .reduce(Integer::sum)
             .orElse(0)
-        + dataRegionProcessors.keySet().stream()
-            .map(PipeProcessorSubtask::getEventCount)
-            .reduce(Integer::sum)
-            .orElse(0)
         + dataRegionConnectors.keySet().stream()
             .map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
             .reduce(Integer::sum)
@@ -103,10 +99,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
                 .map(IoTDBDataRegionExtractor::getEventCount)
                 .reduce(Integer::sum)
                 .orElse(0)
-            + dataRegionProcessors.keySet().stream()
-                .map(PipeProcessorSubtask::getEventCount)
-                .reduce(Integer::sum)
-                .orElse(0)
             + dataRegionConnectors.keySet().stream()
                 .map(connectorSubtask -> 
connectorSubtask.getEventCount(pipeName))
                 .reduce(Integer::sum)
@@ -115,10 +107,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
                 .map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
                 .reduce(Integer::sum)
                 .orElse(0)
-            - dataRegionProcessors.keySet().stream()
-                .map(PipeProcessorSubtask::getPipeHeartbeatEventCount)
-                .reduce(Integer::sum)
-                .orElse(0)
             - dataRegionConnectors.keySet().stream()
                 .map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
                 .reduce(Integer::sum)
@@ -174,11 +162,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
     dataRegionExtractors.put(extractor, extractor);
   }
 
-  void register(final PipeProcessorSubtask processorSubtask) {
-    setNameAndCreationTime(processorSubtask.getPipeName(), 
processorSubtask.getCreationTime());
-    dataRegionProcessors.put(processorSubtask, processorSubtask);
-  }
-
   void register(
       final PipeConnectorSubtask connectorSubtask, final String pipeName, 
final long creationTime) {
     setNameAndCreationTime(pipeName, creationTime);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 05a267c6bae..8cc2383c847 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -64,48 +64,9 @@ public class PipeProcessorMetrics implements IMetricSet {
   }
 
   private void createMetrics(String taskID) {
-    createAutoGauge(taskID);
     createRate(taskID);
   }
 
-  private void createAutoGauge(String taskID) {
-    PipeProcessorSubtask processor = processorMap.get(taskID);
-    // pending event count
-    metricService.createAutoGauge(
-        Metric.BUFFERED_TABLET_COUNT.toString(),
-        MetricLevel.IMPORTANT,
-        processor,
-        PipeProcessorSubtask::getTabletInsertionEventCount,
-        Tag.NAME.toString(),
-        processor.getPipeName(),
-        Tag.REGION.toString(),
-        String.valueOf(processor.getRegionId()),
-        Tag.CREATION_TIME.toString(),
-        String.valueOf(processor.getCreationTime()));
-    metricService.createAutoGauge(
-        Metric.BUFFERED_TSFILE_COUNT.toString(),
-        MetricLevel.IMPORTANT,
-        processor,
-        PipeProcessorSubtask::getTsFileInsertionEventCount,
-        Tag.NAME.toString(),
-        processor.getPipeName(),
-        Tag.REGION.toString(),
-        String.valueOf(processor.getRegionId()),
-        Tag.CREATION_TIME.toString(),
-        String.valueOf(processor.getCreationTime()));
-    metricService.createAutoGauge(
-        Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
-        MetricLevel.IMPORTANT,
-        processor,
-        PipeProcessorSubtask::getPipeHeartbeatEventCount,
-        Tag.NAME.toString(),
-        processor.getPipeName(),
-        Tag.REGION.toString(),
-        String.valueOf(processor.getRegionId()),
-        Tag.CREATION_TIME.toString(),
-        String.valueOf(processor.getCreationTime()));
-  }
-
   private void createRate(String taskID) {
     PipeProcessorSubtask processor = processorMap.get(taskID);
     // process event rate
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 30a16f3c2a7..cdd73598571 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.connection;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -35,34 +35,27 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class PipeEventCollector implements EventCollector, AutoCloseable {
+public class PipeEventCollector implements EventCollector {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeEventCollector.class);
 
-  private final BoundedBlockingPendingQueue<Event> pendingQueue;
-
-  private final EnrichedDeque<Event> bufferQueue;
+  private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
   private final long creationTime;
 
   private final int regionId;
 
-  private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
   private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
 
   public PipeEventCollector(
-      final BoundedBlockingPendingQueue<Event> pendingQueue,
+      final UnboundedBlockingPendingQueue<Event> pendingQueue,
       final long creationTime,
       final int regionId) {
     this.pendingQueue = pendingQueue;
     this.creationTime = creationTime;
     this.regionId = regionId;
-    bufferQueue = new EnrichedDeque<>(new LinkedList<>());
   }
 
   @Override
@@ -128,7 +121,7 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
     }
   }
 
-  private synchronized void collectEvent(final Event event) {
+  private void collectEvent(final Event event) {
     collectInvocationCount.incrementAndGet();
 
     if (event instanceof EnrichedEvent) {
@@ -138,32 +131,12 @@ public class PipeEventCollector implements 
EventCollector, AutoCloseable {
       PipeEventCommitManager.getInstance()
           .enrichWithCommitterKeyAndCommitId((EnrichedEvent) event, 
creationTime, regionId);
     }
+
     if (event instanceof PipeHeartbeatEvent) {
-      ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
       ((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
     }
 
-    while (!isClosed.get() && !bufferQueue.isEmpty()) {
-      final Event bufferedEvent = bufferQueue.peek();
-      // Try to put already buffered events into pending queue, if pending 
queue is full, wait for
-      // pending queue to be available with timeout.
-      if (pendingQueue.waitedOffer(bufferedEvent)) {
-        bufferQueue.poll();
-      } else {
-        // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because 
they may cause OOM.
-        if (event instanceof PipeHeartbeatEvent
-            && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
-          ((EnrichedEvent) 
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
-        } else {
-          bufferQueue.offer(event);
-        }
-        return;
-      }
-    }
-
-    if (!pendingQueue.waitedOffer(event)) {
-      bufferQueue.offer(event);
-    }
+    pendingQueue.directOffer(event);
   }
 
   public void resetCollectInvocationCount() {
@@ -173,59 +146,4 @@ public class PipeEventCollector implements EventCollector, 
AutoCloseable {
   public boolean hasNoCollectInvocationAfterReset() {
     return collectInvocationCount.get() == 0;
   }
-
-  public boolean isBufferQueueEmpty() {
-    return bufferQueue.isEmpty();
-  }
-
-  /**
-   * Try to collect buffered events into {@link 
PipeEventCollector#pendingQueue}.
-   *
-   * @return {@code true} if there are still buffered events after this 
operation, {@code false}
-   *     otherwise.
-   */
-  public synchronized boolean tryCollectBufferedEvents() {
-    while (!isClosed.get() && !bufferQueue.isEmpty()) {
-      final Event bufferedEvent = bufferQueue.peek();
-      if (pendingQueue.waitedOffer(bufferedEvent)) {
-        bufferQueue.poll();
-      } else {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  public void close() {
-    isClosed.set(true);
-    doClose();
-  }
-
-  private synchronized void doClose() {
-    bufferQueue.forEach(
-        event -> {
-          if (event instanceof EnrichedEvent) {
-            ((EnrichedEvent) 
event).clearReferenceCount(PipeEventCollector.class.getName());
-          }
-        });
-    bufferQueue.clear();
-  }
-
-  //////////////////////////// APIs provided for metric framework 
////////////////////////////
-
-  public int getTabletInsertionEventCount() {
-    return bufferQueue.getTabletInsertionEventCount();
-  }
-
-  public int getTsFileInsertionEventCount() {
-    return bufferQueue.getTsFileInsertionEventCount();
-  }
-
-  public int getPipeHeartbeatEventCount() {
-    return bufferQueue.getPipeHeartbeatEventCount();
-  }
-
-  public int getEventCount() {
-    return bufferQueue.size();
-  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 001978ebe90..a5ec4fed78b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.task.stage;
 
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
@@ -83,7 +83,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return 
PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index d88bfb0530c..c7ceaf1f978 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -25,7 +25,7 @@ import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.EventSupplier;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -65,7 +65,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
       PipeParameters pipeProcessorParameters,
       int regionId,
       EventSupplier pipeExtractorInputEventSupplier,
-      BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
+      UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
       PipeProcessorSubtaskExecutor executor,
       PipeTaskMeta pipeTaskMeta) {
     final PipeProcessorRuntimeConfiguration runtimeConfiguration =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 300369ae9ed..22e87c7e5c8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -50,7 +50,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConnectorSubtask.class);
 
   // For input
-  protected final BoundedBlockingPendingQueue<Event> inputPendingQueue;
+  protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
 
   // Record these variables to provide corresponding value to tag key of 
monitoring metrics
   private final String attributeSortedString;
@@ -71,7 +71,7 @@ public class PipeConnectorSubtask extends 
PipeAbstractConnectorSubtask {
       final long creationTime,
       final String attributeSortedString,
       final int connectorIndex,
-      final BoundedBlockingPendingQueue<Event> inputPendingQueue,
+      final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
       final PipeConnector outputPipeConnector) {
     super(taskID, creationTime, outputPipeConnector);
     this.attributeSortedString = attributeSortedString;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 8391f4c7c81..a28a0289b08 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask.connector;
 
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -32,7 +32,7 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
 
   protected final PipeConnectorSubtaskExecutor executor;
   protected final PipeConnectorSubtask subtask;
-  private final BoundedBlockingPendingQueue<Event> pendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> pendingQueue;
 
   private int runningTaskCount;
   private int registeredTaskCount;
@@ -40,7 +40,7 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
   public PipeConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor,
       PipeConnectorSubtask subtask,
-      BoundedBlockingPendingQueue<Event> pendingQueue) {
+      UnboundedBlockingPendingQueue<Event> pendingQueue) {
     this.executor = executor;
     this.subtask = subtask;
     this.pendingQueue = pendingQueue;
@@ -53,7 +53,7 @@ public class PipeConnectorSubtaskLifeCycle implements 
AutoCloseable {
     return subtask;
   }
 
-  public BoundedBlockingPendingQueue<Event> getPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getPendingQueue() {
     return pendingQueue;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index b28206bf306..d85f04b68ea 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -20,14 +20,13 @@
 package org.apache.iotdb.db.pipe.task.subtask.connector;
 
 import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
@@ -84,6 +83,7 @@ public class PipeConnectorSubtaskManager {
             .contains(new DataRegionId(environment.getRegionId()));
 
     final int connectorNum;
+    boolean realTimeFirst = false;
     String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
     if (isDataRegionConnector) {
       connectorNum =
@@ -92,6 +92,12 @@ public class PipeConnectorSubtaskManager {
                   PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
                   PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
               
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+      realTimeFirst =
+          pipeConnectorParameters.getBooleanOrDefault(
+              Arrays.asList(
+                  PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+                  PipeConnectorConstant.SINK_REALTIME_FIRST_KEY),
+              PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
       attributeSortedString = "data_" + attributeSortedString;
     } else {
       // Do not allow parallel tasks for schema region connectors
@@ -105,10 +111,10 @@ public class PipeConnectorSubtaskManager {
           new ArrayList<>(connectorNum);
 
       // Shared pending queue for all subtasks
-      final BoundedBlockingPendingQueue<Event> pendingQueue =
-          new BoundedBlockingPendingQueue<>(
-              PipeConfig.getInstance().getPipeConnectorPendingQueueSize(),
-              new PipeDataRegionEventCounter());
+      final UnboundedBlockingPendingQueue<Event> pendingQueue =
+          realTimeFirst
+              ? new PipeRealtimePriorityBlockingQueue()
+              : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
       for (int connectorIndex = 0; connectorIndex < connectorNum; 
connectorIndex++) {
         final PipeConnector pipeConnector =
@@ -210,7 +216,7 @@ public class PipeConnectorSubtaskManager {
     }
   }
 
-  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
new file mode 100644
index 00000000000..82b09346322
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.subtask.connector;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
+
+public class PipeRealtimePriorityBlockingQueue extends 
UnboundedBlockingPendingQueue<Event> {
+
+  private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
+      new LinkedBlockingDeque<>();
+
+  public PipeRealtimePriorityBlockingQueue() {
+    super(new PipeDataRegionEventCounter());
+  }
+
+  @Override
+  public boolean directOffer(final Event event) {
+    if (event instanceof TsFileInsertionEvent) {
+      tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
+      return true;
+    }
+
+    if (event instanceof PipeHeartbeatEvent && super.peekLast() instanceof 
PipeHeartbeatEvent) {
+      // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because 
they may cause OOM.
+      ((EnrichedEvent) 
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
+    } else {
+      super.directOffer(event);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean waitedOffer(final Event event) {
+    return directOffer(event);
+  }
+
+  @Override
+  public boolean put(final Event event) {
+    directOffer(event);
+    return true;
+  }
+
+  @Override
+  public Event directPoll() {
+    Event event = super.directPoll();
+    if (Objects.isNull(event)) {
+      event = tsfileInsertEventDeque.pollLast();
+    }
+    return event;
+  }
+
+  /**
+   * Try to poll the freshest insertion event from the queue. First, try to 
poll the first offered
+   * non-TsFileInsertionEvent. If no such event is available, poll the last 
offered
+   * TsFileInsertionEvent. If no event is available, block until an event is 
available.
+   *
+   * @return the freshest insertion event. can be null if no event is 
available.
+   */
+  @Override
+  public Event waitedPoll() {
+    Event event = null;
+
+    if (!super.isEmpty()) {
+      // Sequentially poll the first offered non-TsFileInsertionEvent
+      event = super.directPoll();
+    } else if (!tsfileInsertEventDeque.isEmpty()) {
+      // Always poll the last offered event
+      event = tsfileInsertEventDeque.pollLast();
+    }
+
+    // If no event is available, block until an event is available
+    if (Objects.isNull(event)) {
+      event = super.waitedPoll();
+      if (Objects.isNull(event)) {
+        event = tsfileInsertEventDeque.pollLast();
+      }
+    }
+
+    return event;
+  }
+
+  @Override
+  public void clear() {
+    super.clear();
+    tsfileInsertEventDeque.clear();
+  }
+
+  @Override
+  public void forEach(final Consumer<? super Event> action) {
+    super.forEach(action);
+    tsfileInsertEventDeque.forEach(action);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return super.isEmpty() && tsfileInsertEventDeque.isEmpty();
+  }
+
+  @Override
+  public int size() {
+    return super.size() + tsfileInsertEventDeque.size();
+  }
+
+  @Override
+  public int getTsFileInsertionEventCount() {
+    return tsfileInsertEventDeque.size();
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 6839dc1c828..2b48becaed2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -30,7 +30,6 @@ import 
org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
-import 
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
 import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
 import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -88,7 +87,6 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     if (StorageEngine.getInstance().getAllDataRegionIds().contains(new 
DataRegionId(regionId))) {
       PipeProcessorMetrics.getInstance().register(this);
     }
-    PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
   }
 
   @Override
@@ -124,15 +122,8 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     // Record the last event for retry when exception occurs
     setLastEvent(event);
 
-    if (
-    // Though there is no event to process, there may still be some buffered 
events
-    // in the outputEventCollector. Return true if there are still buffered 
events,
-    // false otherwise.
-    event == null
-        // If there are still buffered events, process them first, the newly 
supplied
-        // event will be processed in the next round.
-        || !outputEventCollector.isBufferQueueEmpty()) {
-      return outputEventCollector.tryCollectBufferedEvents();
+    if (Objects.isNull(event)) {
+      return false;
     }
 
     outputEventCollector.resetCollectInvocationCount();
@@ -216,8 +207,6 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
           ErrorHandlingUtils.getRootCause(e).getMessage(),
           e);
     } finally {
-      outputEventCollector.close();
-
       // should be called after pipeProcessor.close()
       super.close();
     }
@@ -255,22 +244,6 @@ public class PipeProcessorSubtask extends 
PipeReportableSubtask {
     return regionId;
   }
 
-  public int getTabletInsertionEventCount() {
-    return outputEventCollector.getTabletInsertionEventCount();
-  }
-
-  public int getTsFileInsertionEventCount() {
-    return outputEventCollector.getTsFileInsertionEventCount();
-  }
-
-  public int getPipeHeartbeatEventCount() {
-    return outputEventCollector.getPipeHeartbeatEventCount();
-  }
-
-  public int getEventCount() {
-    return outputEventCollector.getEventCount();
-  }
-
   //////////////////////////// Error report ////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index d8da569051a..d0e900e0a5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.subscription.broker;
 
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
 import org.apache.iotdb.pipe.api.event.Event;
 
@@ -89,7 +89,7 @@ public class SubscriptionBroker {
   /////////////////////////////// prefetching queue 
///////////////////////////////
 
   public void bindPrefetchingQueue(
-      String topicName, BoundedBlockingPendingQueue<Event> inputPendingQueue) {
+      String topicName, UnboundedBlockingPendingQueue<Event> 
inputPendingQueue) {
     final SubscriptionPrefetchingQueue prefetchingQueue =
         topicNameToPrefetchingQueue.get(topicName);
     if (Objects.nonNull(prefetchingQueue)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 9a75e464475..c9a6a4fbab2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.subscription.broker;
 
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -53,7 +53,7 @@ public class SubscriptionPrefetchingQueue {
 
   private final String brokerId; // consumer group id
   private final String topicName;
-  private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
+  private final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
 
   private final Map<String, SerializedEnrichedEvent> uncommittedEvents;
   private final LinkedBlockingQueue<SerializedEnrichedEvent> prefetchingQueue;
@@ -61,7 +61,7 @@ public class SubscriptionPrefetchingQueue {
   private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
 
   public SubscriptionPrefetchingQueue(
-      String brokerId, String topicName, BoundedBlockingPendingQueue<Event> 
inputPendingQueue) {
+      String brokerId, String topicName, UnboundedBlockingPendingQueue<Event> 
inputPendingQueue) {
     this.brokerId = brokerId;
     this.topicName = topicName;
     this.inputPendingQueue = inputPendingQueue;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
index e5f965f7176..040eccced45 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.subscription.task.stage;
 
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
 import 
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtaskManager;
@@ -70,7 +70,7 @@ public class SubscriptionTaskConnectorStage extends 
PipeTaskConnectorStage {
         .deregister(pipeName, creationTime, regionId, connectorSubtaskId);
   }
 
-  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
     return SubscriptionConnectorSubtaskManager.instance()
         .getPipeConnectorPendingQueue(connectorSubtaskId);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 4ea4dfe96d0..bb4838b903e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.subscription.task.subtask;
 
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
 import org.apache.iotdb.pipe.api.PipeConnector;
@@ -35,7 +35,7 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
       long creationTime,
       String attributeSortedString,
       int connectorIndex,
-      BoundedBlockingPendingQueue<Event> inputPendingQueue,
+      UnboundedBlockingPendingQueue<Event> inputPendingQueue,
       PipeConnector outputPipeConnector,
       String topicName,
       String consumerGroupId) {
@@ -69,7 +69,7 @@ public class SubscriptionConnectorSubtask extends 
PipeConnectorSubtask {
     return consumerGroupId;
   }
 
-  public BoundedBlockingPendingQueue<Event> getInputPendingQueue() {
+  public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {
     return inputPendingQueue;
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 8192d13e811..14633dfafef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.subscription.task.subtask;
 
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
@@ -40,7 +40,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends 
PipeConnectorSubtaskL
   public SubscriptionConnectorSubtaskLifeCycle(
       PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
       PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
-      BoundedBlockingPendingQueue<Event> pendingQueue) {
+      UnboundedBlockingPendingQueue<Event> pendingQueue) {
     super(executor, subtask, pendingQueue);
 
     runningTaskCount = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 8f2cbd613d5..35a4a3ab4ac 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -19,19 +19,19 @@
 
 package org.apache.iotdb.db.subscription.task.subtask;
 
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
 import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
 import 
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
 import 
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
 import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
+import 
org.apache.iotdb.db.pipe.task.subtask.connector.PipeRealtimePriorityBlockingQueue;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -86,14 +86,20 @@ public class SubscriptionConnectorSubtaskManager {
             environment.getRegionId(),
             connectorKey);
 
+    boolean realTimeFirst =
+        pipeConnectorParameters.getBooleanOrDefault(
+            Arrays.asList(
+                PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+                PipeConnectorConstant.SINK_REALTIME_FIRST_KEY),
+            PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
     String attributeSortedString = 
generateAttributeSortedString(pipeConnectorParameters);
     attributeSortedString = "__subscription_" + attributeSortedString;
 
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
-      final BoundedBlockingPendingQueue<Event> pendingQueue =
-          new BoundedBlockingPendingQueue<>(
-              PipeConfig.getInstance().getPipeConnectorPendingQueueSize(),
-              new PipeDataRegionEventCounter());
+      final UnboundedBlockingPendingQueue<Event> pendingQueue =
+          realTimeFirst
+              ? new PipeRealtimePriorityBlockingQueue()
+              : new UnboundedBlockingPendingQueue<>(new 
PipeDataRegionEventCounter());
 
       final PipeConnector pipeConnector =
           
PipeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
@@ -193,7 +199,7 @@ public class SubscriptionConnectorSubtaskManager {
     lifeCycle.stop();
   }
 
-  public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+  public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
       final String attributeSortedString) {
     if 
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString)) 
{
       throw new PipeException(
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
index e1f5063eacc..e83c8500a48 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution;
 
-import 
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
 import org.apache.iotdb.pipe.api.PipeConnector;
 
@@ -41,7 +41,7 @@ public class PipeConnectorSubtaskExecutorTest extends 
PipeSubtaskExecutorTest {
                 System.currentTimeMillis(),
                 "TestAttributeSortedString",
                 0,
-                mock(BoundedBlockingPendingQueue.class),
+                mock(UnboundedBlockingPendingQueue.class),
                 mock(PipeConnector.class)));
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 12ef440b81b..fc437a3de97 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -189,9 +189,6 @@ public class CommonConfig {
   private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
   private int pipeConnectorReadFileBufferSize = 8388608;
   private long pipeConnectorRetryIntervalMs = 1000L;
-  // recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
-  // pipeAsyncConnectorCoreClientNumber
-  private int pipeConnectorPendingQueueSize = 256;
   private boolean pipeConnectorRPCThriftCompressionEnabled = false;
 
   private int pipeAsyncConnectorSelectorNumber = 4;
@@ -758,14 +755,6 @@ public class CommonConfig {
     this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
   }
 
-  public int getPipeConnectorPendingQueueSize() {
-    return pipeConnectorPendingQueueSize;
-  }
-
-  public void setPipeConnectorPendingQueueSize(int 
pipeConnectorPendingQueueSize) {
-    this.pipeConnectorPendingQueueSize = pipeConnectorPendingQueueSize;
-  }
-
   public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
     return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e1ec6c03dbb..efb3420b62c 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -354,13 +354,6 @@ public class CommonDescriptor {
                     properties.getProperty(
                         "pipe_connector_retry_interval_ms",
                         
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
-    config.setPipeConnectorPendingQueueSize(
-        Integer.parseInt(
-            
Optional.ofNullable(properties.getProperty("pipe_sink_pending_queue_size"))
-                .orElse(
-                    properties.getProperty(
-                        "pipe_connector_pending_queue_size",
-                        
String.valueOf(config.getPipeConnectorPendingQueueSize())))));
     config.setPipeConnectorRPCThriftCompressionEnabled(
         Boolean.parseBoolean(
             
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1821fe74a7c..41331a822dd 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -111,10 +111,6 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
   }
 
-  public int getPipeConnectorPendingQueueSize() {
-    return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
-  }
-
   public boolean isPipeConnectorRPCThriftCompressionEnabled() {
     return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
   }
@@ -312,7 +308,6 @@ public class PipeConfig {
     LOGGER.info("PipeConnectorTransferTimeoutMs: {}", 
getPipeConnectorTransferTimeoutMs());
     LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
     LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
-    LOGGER.info("PipeConnectorPendingQueueSize: {}", 
getPipeConnectorPendingQueueSize());
     LOGGER.info(
         "PipeConnectorRPCThriftCompressionEnabled: {}",
         isPipeConnectorRPCThriftCompressionEnabled());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 331c4d79a2f..c15fc4db189 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -53,6 +53,10 @@ public class PipeConnectorConstant {
   public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
       PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
 
+  public static final String CONNECTOR_REALTIME_FIRST_KEY = 
"connector.realtime-first";
+  public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
+  public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = false;
+
   public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY = 
"connector.batch.enable";
   public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY = 
"sink.batch.enable";
   public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE 
= true;

Reply via email to