This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3b0dfcd1736 Pipe: Enable realtime first strategy in data transfer
(#12559)
3b0dfcd1736 is described below
commit 3b0dfcd17365dac3fb244bf4e10a9664d204636a
Author: Caideyipi <[email protected]>
AuthorDate: Tue May 21 17:22:38 2024 +0800
Pipe: Enable realtime first strategy in data transfer (#12559)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 17 ++-
.../event/common/heartbeat/PipeHeartbeatEvent.java | 34 +-----
.../PipeRealtimeDataRegionHybridExtractor.java | 9 +-
.../PipeDataNodeRemainingEventAndTimeMetrics.java | 12 --
.../PipeDataNodeRemainingEventAndTimeOperator.java | 17 ---
.../iotdb/db/pipe/metric/PipeProcessorMetrics.java | 39 ------
.../pipe/task/connection/PipeEventCollector.java | 96 ++-------------
.../db/pipe/task/stage/PipeTaskConnectorStage.java | 4 +-
.../db/pipe/task/stage/PipeTaskProcessorStage.java | 4 +-
.../subtask/connector/PipeConnectorSubtask.java | 6 +-
.../connector/PipeConnectorSubtaskLifeCycle.java | 8 +-
.../connector/PipeConnectorSubtaskManager.java | 20 +--
.../PipeRealtimePriorityBlockingQueue.java | 136 +++++++++++++++++++++
.../subtask/processor/PipeProcessorSubtask.java | 31 +----
.../db/subscription/broker/SubscriptionBroker.java | 4 +-
.../broker/SubscriptionPrefetchingQueue.java | 6 +-
.../task/stage/SubscriptionTaskConnectorStage.java | 4 +-
.../task/subtask/SubscriptionConnectorSubtask.java | 6 +-
.../SubscriptionConnectorSubtaskLifeCycle.java | 4 +-
.../SubscriptionConnectorSubtaskManager.java | 20 +--
.../PipeConnectorSubtaskExecutorTest.java | 4 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 11 --
.../iotdb/commons/conf/CommonDescriptor.java | 7 --
.../iotdb/commons/pipe/config/PipeConfig.java | 5 -
.../config/constant/PipeConnectorConstant.java | 4 +
25 files changed, 216 insertions(+), 292 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 7e18ad9b876..d3bfe710aca 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -36,13 +36,14 @@ import org.junit.runner.RunWith;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
@Test
- public void testThriftConnector() throws Exception {
+ public void testThriftConnectorWithRealtimeFirst() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -50,6 +51,15 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+
+ // Do not fail if the failure has nothing to do with pipe
+ // Because the failures will randomly generate due to resource limitation
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)",
"flush"))) {
+ return;
+ }
+
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();
@@ -60,6 +70,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
connectorAttributes.put("connector.batch.enable", "false");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
+ connectorAttributes.put("connector.realtime-first", "true");
final TSStatus status =
client.createPipe(
@@ -76,7 +87,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
// Because the failures will randomly generate due to resource limitation
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
- Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)",
"flush"))) {
+ Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)",
"flush"))) {
return;
}
@@ -84,7 +95,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
receiverEnv,
"select * from root.**",
"Time,root.vehicle.d0.s1,",
- Collections.singleton("0,1.0,"));
+ Collections.unmodifiableSet(new HashSet<>(Arrays.asList("0,1.0,",
"1,1.0,"))));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index c675ff5542c..c3d0798c7e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,13 +23,11 @@ import
org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.pattern.PipePattern;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.metric.PipeHeartbeatEventMetrics;
-import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
import org.apache.iotdb.db.utils.DateTimeUtils;
import org.apache.iotdb.pipe.api.event.Event;
@@ -58,10 +56,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private int extractorQueueTsFileSize;
private int extractorQueueSize;
- private int bufferQueueTabletSize;
- private int bufferQueueTsFileSize;
- private int bufferQueueSize;
-
private int connectorQueueTabletSize;
private int connectorQueueTsFileSize;
private int connectorQueueSize;
@@ -195,20 +189,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
}
}
- public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
- if (shouldPrintMessage) {
- bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
- bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
- bufferQueueSize = bufferQueue.size();
- }
-
- if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
- ((PipeRealtimeDataRegionHybridExtractor) extractor)
-
.informProcessorEventCollectorQueueTsFileSize(bufferQueue.getTsFileInsertionEventCount());
- }
- }
-
- public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event>
pendingQueue) {
+ public void recordConnectorQueueSize(UnboundedBlockingPendingQueue<Event>
pendingQueue) {
if (shouldPrintMessage) {
connectorQueueTabletSize = pendingQueue.getTabletInsertionEventCount();
connectorQueueTsFileSize = pendingQueue.getTsFileInsertionEventCount();
@@ -271,13 +252,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
final String extractorQueueSizeMessage =
timeAssigned != 0 ? Integer.toString(extractorQueueSize) :
unknownMessage;
- final String bufferQueueTabletSizeMessage =
- timeProcessed != 0 ? Integer.toString(bufferQueueTabletSize) :
unknownMessage;
- final String bufferQueueTsFileSizeMessage =
- timeProcessed != 0 ? Integer.toString(bufferQueueTsFileSize) :
unknownMessage;
- final String bufferQueueSizeMessage =
- timeProcessed != 0 ? Integer.toString(bufferQueueSize) :
unknownMessage;
-
final String connectorQueueTabletSizeMessage =
timeProcessed != 0 ? Integer.toString(connectorQueueTabletSize) :
unknownMessage;
final String connectorQueueTsFileSizeMessage =
@@ -308,12 +282,6 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
+ extractorQueueTsFileSizeMessage
+ ", extractorQueueSize="
+ extractorQueueSizeMessage
- + ", bufferQueueTabletSize="
- + bufferQueueTabletSizeMessage
- + ", bufferQueueTsFileSize="
- + bufferQueueTsFileSizeMessage
- + ", bufferQueueSize="
- + bufferQueueSizeMessage
+ ", connectorQueueTabletSize="
+ connectorQueueTabletSizeMessage
+ ", connectorQueueTsFileSize="
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 210c5258120..381b8d73858 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -47,7 +47,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
- private final AtomicInteger processorEventCollectorQueueTsFileSize = new
AtomicInteger(0);
private final AtomicInteger connectorInputPendingQueueTsFileSize = new
AtomicInteger(0);
@Override
@@ -239,9 +238,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private boolean isRealtimeTsFileEventCountExceededLimit() {
- return pendingQueue.getTsFileInsertionEventCount()
- + processorEventCollectorQueueTsFileSize.get()
- + connectorInputPendingQueueTsFileSize.get()
+ return pendingQueue.getTsFileInsertionEventCount() +
connectorInputPendingQueueTsFileSize.get()
>=
PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
@@ -250,10 +247,6 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
>= PipeConfig.getInstance().getPipeMaxAllowedLinkedTsFileCount();
}
- public void informProcessorEventCollectorQueueTsFileSize(final int
queueSize) {
- processorEventCollectorQueueTsFileSize.set(queueSize);
- }
-
public void informConnectorInputPendingQueueTsFileSize(final int queueSize) {
connectorInputPendingQueueTsFileSize.set(queueSize);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
index 2bfe9106c01..5173d64e04f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeMetrics.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
import
org.apache.iotdb.db.pipe.extractor.schemaregion.IoTDBSchemaRegionExtractor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
-import org.apache.iotdb.db.pipe.task.subtask.processor.PipeProcessorSubtask;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.metrics.AbstractMetricService;
@@ -128,17 +127,6 @@ public class PipeDataNodeRemainingEventAndTimeMetrics
implements IMetricSet {
}
}
- public void register(final PipeProcessorSubtask processorSubtask) {
- // The metric is global thus the regionId is omitted
- final String pipeID = processorSubtask.getPipeName() + "_" +
processorSubtask.getCreationTime();
- remainingEventAndTimeOperatorMap
- .computeIfAbsent(pipeID, k -> new
PipeDataNodeRemainingEventAndTimeOperator())
- .register(processorSubtask);
- if (Objects.nonNull(metricService)) {
- createMetrics(pipeID);
- }
- }
-
public void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName,
final long creationTime) {
// The metric is global thus the regionId is omitted
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
index ca311824a65..a4b2fd3e7b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -72,10 +72,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
- + dataRegionProcessors.keySet().stream()
- .map(PipeProcessorSubtask::getEventCount)
- .reduce(Integer::sum)
- .orElse(0)
+ dataRegionConnectors.keySet().stream()
.map(connectorSubtask -> connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
@@ -103,10 +99,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
.map(IoTDBDataRegionExtractor::getEventCount)
.reduce(Integer::sum)
.orElse(0)
- + dataRegionProcessors.keySet().stream()
- .map(PipeProcessorSubtask::getEventCount)
- .reduce(Integer::sum)
- .orElse(0)
+ dataRegionConnectors.keySet().stream()
.map(connectorSubtask ->
connectorSubtask.getEventCount(pipeName))
.reduce(Integer::sum)
@@ -115,10 +107,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
.map(IoTDBDataRegionExtractor::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
.orElse(0)
- - dataRegionProcessors.keySet().stream()
- .map(PipeProcessorSubtask::getPipeHeartbeatEventCount)
- .reduce(Integer::sum)
- .orElse(0)
- dataRegionConnectors.keySet().stream()
.map(PipeConnectorSubtask::getPipeHeartbeatEventCount)
.reduce(Integer::sum)
@@ -174,11 +162,6 @@ class PipeDataNodeRemainingEventAndTimeOperator {
dataRegionExtractors.put(extractor, extractor);
}
- void register(final PipeProcessorSubtask processorSubtask) {
- setNameAndCreationTime(processorSubtask.getPipeName(),
processorSubtask.getCreationTime());
- dataRegionProcessors.put(processorSubtask, processorSubtask);
- }
-
void register(
final PipeConnectorSubtask connectorSubtask, final String pipeName,
final long creationTime) {
setNameAndCreationTime(pipeName, creationTime);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
index 05a267c6bae..8cc2383c847 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/PipeProcessorMetrics.java
@@ -64,48 +64,9 @@ public class PipeProcessorMetrics implements IMetricSet {
}
private void createMetrics(String taskID) {
- createAutoGauge(taskID);
createRate(taskID);
}
- private void createAutoGauge(String taskID) {
- PipeProcessorSubtask processor = processorMap.get(taskID);
- // pending event count
- metricService.createAutoGauge(
- Metric.BUFFERED_TABLET_COUNT.toString(),
- MetricLevel.IMPORTANT,
- processor,
- PipeProcessorSubtask::getTabletInsertionEventCount,
- Tag.NAME.toString(),
- processor.getPipeName(),
- Tag.REGION.toString(),
- String.valueOf(processor.getRegionId()),
- Tag.CREATION_TIME.toString(),
- String.valueOf(processor.getCreationTime()));
- metricService.createAutoGauge(
- Metric.BUFFERED_TSFILE_COUNT.toString(),
- MetricLevel.IMPORTANT,
- processor,
- PipeProcessorSubtask::getTsFileInsertionEventCount,
- Tag.NAME.toString(),
- processor.getPipeName(),
- Tag.REGION.toString(),
- String.valueOf(processor.getRegionId()),
- Tag.CREATION_TIME.toString(),
- String.valueOf(processor.getCreationTime()));
- metricService.createAutoGauge(
- Metric.BUFFERED_HEARTBEAT_COUNT.toString(),
- MetricLevel.IMPORTANT,
- processor,
- PipeProcessorSubtask::getPipeHeartbeatEventCount,
- Tag.NAME.toString(),
- processor.getPipeName(),
- Tag.REGION.toString(),
- String.valueOf(processor.getRegionId()),
- Tag.CREATION_TIME.toString(),
- String.valueOf(processor.getCreationTime()));
- }
-
private void createRate(String taskID) {
PipeProcessorSubtask processor = processorMap.get(taskID);
// process event rate
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 30a16f3c2a7..cdd73598571 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -35,34 +35,27 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-public class PipeEventCollector implements EventCollector, AutoCloseable {
+public class PipeEventCollector implements EventCollector {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeEventCollector.class);
- private final BoundedBlockingPendingQueue<Event> pendingQueue;
-
- private final EnrichedDeque<Event> bufferQueue;
+ private final UnboundedBlockingPendingQueue<Event> pendingQueue;
private final long creationTime;
private final int regionId;
- private final AtomicBoolean isClosed = new AtomicBoolean(false);
-
private final AtomicInteger collectInvocationCount = new AtomicInteger(0);
public PipeEventCollector(
- final BoundedBlockingPendingQueue<Event> pendingQueue,
+ final UnboundedBlockingPendingQueue<Event> pendingQueue,
final long creationTime,
final int regionId) {
this.pendingQueue = pendingQueue;
this.creationTime = creationTime;
this.regionId = regionId;
- bufferQueue = new EnrichedDeque<>(new LinkedList<>());
}
@Override
@@ -128,7 +121,7 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
}
- private synchronized void collectEvent(final Event event) {
+ private void collectEvent(final Event event) {
collectInvocationCount.incrementAndGet();
if (event instanceof EnrichedEvent) {
@@ -138,32 +131,12 @@ public class PipeEventCollector implements
EventCollector, AutoCloseable {
PipeEventCommitManager.getInstance()
.enrichWithCommitterKeyAndCommitId((EnrichedEvent) event,
creationTime, regionId);
}
+
if (event instanceof PipeHeartbeatEvent) {
- ((PipeHeartbeatEvent) event).recordBufferQueueSize(bufferQueue);
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
}
- while (!isClosed.get() && !bufferQueue.isEmpty()) {
- final Event bufferedEvent = bufferQueue.peek();
- // Try to put already buffered events into pending queue, if pending
queue is full, wait for
- // pending queue to be available with timeout.
- if (pendingQueue.waitedOffer(bufferedEvent)) {
- bufferQueue.poll();
- } else {
- // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because
they may cause OOM.
- if (event instanceof PipeHeartbeatEvent
- && bufferQueue.peekLast() instanceof PipeHeartbeatEvent) {
- ((EnrichedEvent)
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
- } else {
- bufferQueue.offer(event);
- }
- return;
- }
- }
-
- if (!pendingQueue.waitedOffer(event)) {
- bufferQueue.offer(event);
- }
+ pendingQueue.directOffer(event);
}
public void resetCollectInvocationCount() {
@@ -173,59 +146,4 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
public boolean hasNoCollectInvocationAfterReset() {
return collectInvocationCount.get() == 0;
}
-
- public boolean isBufferQueueEmpty() {
- return bufferQueue.isEmpty();
- }
-
- /**
- * Try to collect buffered events into {@link
PipeEventCollector#pendingQueue}.
- *
- * @return {@code true} if there are still buffered events after this
operation, {@code false}
- * otherwise.
- */
- public synchronized boolean tryCollectBufferedEvents() {
- while (!isClosed.get() && !bufferQueue.isEmpty()) {
- final Event bufferedEvent = bufferQueue.peek();
- if (pendingQueue.waitedOffer(bufferedEvent)) {
- bufferQueue.poll();
- } else {
- return true;
- }
- }
- return false;
- }
-
- public void close() {
- isClosed.set(true);
- doClose();
- }
-
- private synchronized void doClose() {
- bufferQueue.forEach(
- event -> {
- if (event instanceof EnrichedEvent) {
- ((EnrichedEvent)
event).clearReferenceCount(PipeEventCollector.class.getName());
- }
- });
- bufferQueue.clear();
- }
-
- //////////////////////////// APIs provided for metric framework
////////////////////////////
-
- public int getTabletInsertionEventCount() {
- return bufferQueue.getTabletInsertionEventCount();
- }
-
- public int getTsFileInsertionEventCount() {
- return bufferQueue.getTsFileInsertionEventCount();
- }
-
- public int getPipeHeartbeatEventCount() {
- return bufferQueue.getPipeHeartbeatEventCount();
- }
-
- public int getEventCount() {
- return bufferQueue.size();
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
index 001978ebe90..a5ec4fed78b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskConnectorStage.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.task.stage;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskManager;
@@ -83,7 +83,7 @@ public class PipeTaskConnectorStage extends PipeTaskStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
return
PipeConnectorSubtaskManager.instance().getPipeConnectorPendingQueue(connectorSubtaskId);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
index d88bfb0530c..c7ceaf1f978 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/stage/PipeTaskProcessorStage.java
@@ -25,7 +25,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeC
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.task.EventSupplier;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.task.stage.PipeTaskStage;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -65,7 +65,7 @@ public class PipeTaskProcessorStage extends PipeTaskStage {
PipeParameters pipeProcessorParameters,
int regionId,
EventSupplier pipeExtractorInputEventSupplier,
- BoundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
+ UnboundedBlockingPendingQueue<Event> pipeConnectorOutputPendingQueue,
PipeProcessorSubtaskExecutor executor,
PipeTaskMeta pipeTaskMeta) {
final PipeProcessorRuntimeConfiguration runtimeConfiguration =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
index 300369ae9ed..22e87c7e5c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtask.java
@@ -22,7 +22,7 @@ package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.pipe.task.subtask.PipeAbstractConnectorSubtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
@@ -50,7 +50,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConnectorSubtask.class);
// For input
- protected final BoundedBlockingPendingQueue<Event> inputPendingQueue;
+ protected final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
// Record these variables to provide corresponding value to tag key of
monitoring metrics
private final String attributeSortedString;
@@ -71,7 +71,7 @@ public class PipeConnectorSubtask extends
PipeAbstractConnectorSubtask {
final long creationTime,
final String attributeSortedString,
final int connectorIndex,
- final BoundedBlockingPendingQueue<Event> inputPendingQueue,
+ final UnboundedBlockingPendingQueue<Event> inputPendingQueue,
final PipeConnector outputPipeConnector) {
super(taskID, creationTime, outputPipeConnector);
this.attributeSortedString = attributeSortedString;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
index 8391f4c7c81..a28a0289b08 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskLifeCycle.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.task.subtask.connector;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.pipe.api.event.Event;
@@ -32,7 +32,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
protected final PipeConnectorSubtaskExecutor executor;
protected final PipeConnectorSubtask subtask;
- private final BoundedBlockingPendingQueue<Event> pendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> pendingQueue;
private int runningTaskCount;
private int registeredTaskCount;
@@ -40,7 +40,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
public PipeConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor,
PipeConnectorSubtask subtask,
- BoundedBlockingPendingQueue<Event> pendingQueue) {
+ UnboundedBlockingPendingQueue<Event> pendingQueue) {
this.executor = executor;
this.subtask = subtask;
this.pendingQueue = pendingQueue;
@@ -53,7 +53,7 @@ public class PipeConnectorSubtaskLifeCycle implements
AutoCloseable {
return subtask;
}
- public BoundedBlockingPendingQueue<Event> getPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getPendingQueue() {
return pendingQueue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
index b28206bf306..d85f04b68ea 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -20,14 +20,13 @@
package org.apache.iotdb.db.pipe.task.subtask.connector;
import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
@@ -84,6 +83,7 @@ public class PipeConnectorSubtaskManager {
.contains(new DataRegionId(environment.getRegionId()));
final int connectorNum;
+ boolean realTimeFirst = false;
String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
if (isDataRegionConnector) {
connectorNum =
@@ -92,6 +92,12 @@ public class PipeConnectorSubtaskManager {
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_KEY,
PipeConnectorConstant.SINK_IOTDB_PARALLEL_TASKS_KEY),
PipeConnectorConstant.CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE);
+ realTimeFirst =
+ pipeConnectorParameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+ PipeConnectorConstant.SINK_REALTIME_FIRST_KEY),
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
attributeSortedString = "data_" + attributeSortedString;
} else {
// Do not allow parallel tasks for schema region connectors
@@ -105,10 +111,10 @@ public class PipeConnectorSubtaskManager {
new ArrayList<>(connectorNum);
// Shared pending queue for all subtasks
- final BoundedBlockingPendingQueue<Event> pendingQueue =
- new BoundedBlockingPendingQueue<>(
- PipeConfig.getInstance().getPipeConnectorPendingQueueSize(),
- new PipeDataRegionEventCounter());
+ final UnboundedBlockingPendingQueue<Event> pendingQueue =
+ realTimeFirst
+ ? new PipeRealtimePriorityBlockingQueue()
+ : new UnboundedBlockingPendingQueue<>(new
PipeDataRegionEventCounter());
for (int connectorIndex = 0; connectorIndex < connectorNum;
connectorIndex++) {
final PipeConnector pipeConnector =
@@ -210,7 +216,7 @@ public class PipeConnectorSubtaskManager {
}
}
- public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
new file mode 100644
index 00000000000..82b09346322
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/connector/PipeRealtimePriorityBlockingQueue.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.task.subtask.connector;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
+import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+
+import java.util.Objects;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.function.Consumer;
+
+public class PipeRealtimePriorityBlockingQueue extends
UnboundedBlockingPendingQueue<Event> {
+
+ private final BlockingDeque<TsFileInsertionEvent> tsfileInsertEventDeque =
+ new LinkedBlockingDeque<>();
+
+ public PipeRealtimePriorityBlockingQueue() {
+ super(new PipeDataRegionEventCounter());
+ }
+
+ @Override
+ public boolean directOffer(final Event event) {
+ if (event instanceof TsFileInsertionEvent) {
+ tsfileInsertEventDeque.add((TsFileInsertionEvent) event);
+ return true;
+ }
+
+ if (event instanceof PipeHeartbeatEvent && super.peekLast() instanceof
PipeHeartbeatEvent) {
+ // We can NOT keep too many PipeHeartbeatEvent in bufferQueue because
they may cause OOM.
+ ((EnrichedEvent)
event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
+ } else {
+ super.directOffer(event);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean waitedOffer(final Event event) {
+ return directOffer(event);
+ }
+
+ @Override
+ public boolean put(final Event event) {
+ directOffer(event);
+ return true;
+ }
+
+ @Override
+ public Event directPoll() {
+ Event event = super.directPoll();
+ if (Objects.isNull(event)) {
+ event = tsfileInsertEventDeque.pollLast();
+ }
+ return event;
+ }
+
+ /**
+ * Try to poll the freshest insertion event from the queue. First, try to
poll the first offered
+ * non-TsFileInsertionEvent. If no such event is available, poll the last
offered
+ * TsFileInsertionEvent. If no event is available, block until an event is
available.
+ *
+ * @return the freshest insertion event. can be null if no event is
available.
+ */
+ @Override
+ public Event waitedPoll() {
+ Event event = null;
+
+ if (!super.isEmpty()) {
+ // Sequentially poll the first offered non-TsFileInsertionEvent
+ event = super.directPoll();
+ } else if (!tsfileInsertEventDeque.isEmpty()) {
+ // Always poll the last offered event
+ event = tsfileInsertEventDeque.pollLast();
+ }
+
+ // If no event is available, block until an event is available
+ if (Objects.isNull(event)) {
+ event = super.waitedPoll();
+ if (Objects.isNull(event)) {
+ event = tsfileInsertEventDeque.pollLast();
+ }
+ }
+
+ return event;
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ tsfileInsertEventDeque.clear();
+ }
+
+ @Override
+ public void forEach(final Consumer<? super Event> action) {
+ super.forEach(action);
+ tsfileInsertEventDeque.forEach(action);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return super.isEmpty() && tsfileInsertEventDeque.isEmpty();
+ }
+
+ @Override
+ public int size() {
+ return super.size() + tsfileInsertEventDeque.size();
+ }
+
+ @Override
+ public int getTsFileInsertionEventCount() {
+ return tsfileInsertEventDeque.size();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 6839dc1c828..2b48becaed2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.commons.pipe.task.subtask.PipeReportableSubtask;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
-import
org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.PipeProcessorMetrics;
import org.apache.iotdb.db.pipe.task.connection.PipeEventCollector;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -88,7 +87,6 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
if (StorageEngine.getInstance().getAllDataRegionIds().contains(new
DataRegionId(regionId))) {
PipeProcessorMetrics.getInstance().register(this);
}
- PipeDataNodeRemainingEventAndTimeMetrics.getInstance().register(this);
}
@Override
@@ -124,15 +122,8 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
// Record the last event for retry when exception occurs
setLastEvent(event);
- if (
- // Though there is no event to process, there may still be some buffered
events
- // in the outputEventCollector. Return true if there are still buffered
events,
- // false otherwise.
- event == null
- // If there are still buffered events, process them first, the newly
supplied
- // event will be processed in the next round.
- || !outputEventCollector.isBufferQueueEmpty()) {
- return outputEventCollector.tryCollectBufferedEvents();
+ if (Objects.isNull(event)) {
+ return false;
}
outputEventCollector.resetCollectInvocationCount();
@@ -216,8 +207,6 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
ErrorHandlingUtils.getRootCause(e).getMessage(),
e);
} finally {
- outputEventCollector.close();
-
// should be called after pipeProcessor.close()
super.close();
}
@@ -255,22 +244,6 @@ public class PipeProcessorSubtask extends
PipeReportableSubtask {
return regionId;
}
- public int getTabletInsertionEventCount() {
- return outputEventCollector.getTabletInsertionEventCount();
- }
-
- public int getTsFileInsertionEventCount() {
- return outputEventCollector.getTsFileInsertionEventCount();
- }
-
- public int getPipeHeartbeatEventCount() {
- return outputEventCollector.getPipeHeartbeatEventCount();
- }
-
- public int getEventCount() {
- return outputEventCollector.getEventCount();
- }
-
//////////////////////////// Error report ////////////////////////////
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index d8da569051a..d0e900e0a5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.subscription.broker;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.pipe.api.event.Event;
@@ -89,7 +89,7 @@ public class SubscriptionBroker {
/////////////////////////////// prefetching queue
///////////////////////////////
public void bindPrefetchingQueue(
- String topicName, BoundedBlockingPendingQueue<Event> inputPendingQueue) {
+ String topicName, UnboundedBlockingPendingQueue<Event>
inputPendingQueue) {
final SubscriptionPrefetchingQueue prefetchingQueue =
topicNameToPrefetchingQueue.get(topicName);
if (Objects.nonNull(prefetchingQueue)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 9a75e464475..c9a6a4fbab2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.subscription.broker;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -53,7 +53,7 @@ public class SubscriptionPrefetchingQueue {
private final String brokerId; // consumer group id
private final String topicName;
- private final BoundedBlockingPendingQueue<Event> inputPendingQueue;
+ private final UnboundedBlockingPendingQueue<Event> inputPendingQueue;
private final Map<String, SerializedEnrichedEvent> uncommittedEvents;
private final LinkedBlockingQueue<SerializedEnrichedEvent> prefetchingQueue;
@@ -61,7 +61,7 @@ public class SubscriptionPrefetchingQueue {
private final AtomicLong subscriptionCommitIdGenerator = new AtomicLong(0);
public SubscriptionPrefetchingQueue(
- String brokerId, String topicName, BoundedBlockingPendingQueue<Event>
inputPendingQueue) {
+ String brokerId, String topicName, UnboundedBlockingPendingQueue<Event>
inputPendingQueue) {
this.brokerId = brokerId;
this.topicName = topicName;
this.inputPendingQueue = inputPendingQueue;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
index e5f965f7176..040eccced45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/stage/SubscriptionTaskConnectorStage.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.subscription.task.stage;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.stage.PipeTaskConnectorStage;
import
org.apache.iotdb.db.subscription.task.subtask.SubscriptionConnectorSubtaskManager;
@@ -70,7 +70,7 @@ public class SubscriptionTaskConnectorStage extends
PipeTaskConnectorStage {
.deregister(pipeName, creationTime, regionId, connectorSubtaskId);
}
- public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue() {
return SubscriptionConnectorSubtaskManager.instance()
.getPipeConnectorPendingQueue(connectorSubtaskId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
index 4ea4dfe96d0..bb4838b903e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtask.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.subscription.task.subtask;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -35,7 +35,7 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
long creationTime,
String attributeSortedString,
int connectorIndex,
- BoundedBlockingPendingQueue<Event> inputPendingQueue,
+ UnboundedBlockingPendingQueue<Event> inputPendingQueue,
PipeConnector outputPipeConnector,
String topicName,
String consumerGroupId) {
@@ -69,7 +69,7 @@ public class SubscriptionConnectorSubtask extends
PipeConnectorSubtask {
return consumerGroupId;
}
- public BoundedBlockingPendingQueue<Event> getInputPendingQueue() {
+ public UnboundedBlockingPendingQueue<Event> getInputPendingQueue() {
return inputPendingQueue;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 8192d13e811..14633dfafef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.subscription.task.subtask;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
@@ -40,7 +40,7 @@ public class SubscriptionConnectorSubtaskLifeCycle extends
PipeConnectorSubtaskL
public SubscriptionConnectorSubtaskLifeCycle(
PipeConnectorSubtaskExecutor executor, // SubscriptionSubtaskExecutor
PipeConnectorSubtask subtask, // SubscriptionConnectorSubtask
- BoundedBlockingPendingQueue<Event> pendingQueue) {
+ UnboundedBlockingPendingQueue<Event> pendingQueue) {
super(executor, subtask, pendingQueue);
runningTaskCount = 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index 8f2cbd613d5..35a4a3ab4ac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -19,19 +19,19 @@
package org.apache.iotdb.db.subscription.task.subtask;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.progress.PipeEventCommitManager;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.execution.PipeConnectorSubtaskExecutor;
import org.apache.iotdb.db.pipe.metric.PipeDataRegionEventCounter;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import
org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtaskLifeCycle;
+import
org.apache.iotdb.db.pipe.task.subtask.connector.PipeRealtimePriorityBlockingQueue;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
@@ -86,14 +86,20 @@ public class SubscriptionConnectorSubtaskManager {
environment.getRegionId(),
connectorKey);
+ boolean realTimeFirst =
+ pipeConnectorParameters.getBooleanOrDefault(
+ Arrays.asList(
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_KEY,
+ PipeConnectorConstant.SINK_REALTIME_FIRST_KEY),
+ PipeConnectorConstant.CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE);
String attributeSortedString =
generateAttributeSortedString(pipeConnectorParameters);
attributeSortedString = "__subscription_" + attributeSortedString;
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
- final BoundedBlockingPendingQueue<Event> pendingQueue =
- new BoundedBlockingPendingQueue<>(
- PipeConfig.getInstance().getPipeConnectorPendingQueueSize(),
- new PipeDataRegionEventCounter());
+ final UnboundedBlockingPendingQueue<Event> pendingQueue =
+ realTimeFirst
+ ? new PipeRealtimePriorityBlockingQueue()
+ : new UnboundedBlockingPendingQueue<>(new
PipeDataRegionEventCounter());
final PipeConnector pipeConnector =
PipeAgent.plugin().dataRegion().reflectConnector(pipeConnectorParameters);
@@ -193,7 +199,7 @@ public class SubscriptionConnectorSubtaskManager {
lifeCycle.stop();
}
- public BoundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
+ public UnboundedBlockingPendingQueue<Event> getPipeConnectorPendingQueue(
final String attributeSortedString) {
if
(!attributeSortedString2SubtaskLifeCycleMap.containsKey(attributeSortedString))
{
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
index e1f5063eacc..e83c8500a48 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/execution/PipeConnectorSubtaskExecutorTest.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.execution;
-import
org.apache.iotdb.commons.pipe.task.connection.BoundedBlockingPendingQueue;
+import
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.subtask.connector.PipeConnectorSubtask;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -41,7 +41,7 @@ public class PipeConnectorSubtaskExecutorTest extends
PipeSubtaskExecutorTest {
System.currentTimeMillis(),
"TestAttributeSortedString",
0,
- mock(BoundedBlockingPendingQueue.class),
+ mock(UnboundedBlockingPendingQueue.class),
mock(PipeConnector.class)));
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 12ef440b81b..fc437a3de97 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -189,9 +189,6 @@ public class CommonConfig {
private int pipeConnectorTransferTimeoutMs = 15 * 60 * 1000; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
private long pipeConnectorRetryIntervalMs = 1000L;
- // recommend to set this value to 3 * pipeSubtaskExecutorMaxThreadNum *
- // pipeAsyncConnectorCoreClientNumber
- private int pipeConnectorPendingQueueSize = 256;
private boolean pipeConnectorRPCThriftCompressionEnabled = false;
private int pipeAsyncConnectorSelectorNumber = 4;
@@ -758,14 +755,6 @@ public class CommonConfig {
this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
}
- public int getPipeConnectorPendingQueueSize() {
- return pipeConnectorPendingQueueSize;
- }
-
- public void setPipeConnectorPendingQueueSize(int
pipeConnectorPendingQueueSize) {
- this.pipeConnectorPendingQueueSize = pipeConnectorPendingQueueSize;
- }
-
public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index e1ec6c03dbb..efb3420b62c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -354,13 +354,6 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_connector_retry_interval_ms",
String.valueOf(config.getPipeConnectorRetryIntervalMs())))));
- config.setPipeConnectorPendingQueueSize(
- Integer.parseInt(
-
Optional.ofNullable(properties.getProperty("pipe_sink_pending_queue_size"))
- .orElse(
- properties.getProperty(
- "pipe_connector_pending_queue_size",
-
String.valueOf(config.getPipeConnectorPendingQueueSize())))));
config.setPipeConnectorRPCThriftCompressionEnabled(
Boolean.parseBoolean(
Optional.ofNullable(properties.getProperty("pipe_sink_rpc_thrift_compression_enabled"))
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 1821fe74a7c..41331a822dd 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -111,10 +111,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
}
- public int getPipeConnectorPendingQueueSize() {
- return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
- }
-
public boolean isPipeConnectorRPCThriftCompressionEnabled() {
return COMMON_CONFIG.isPipeConnectorRPCThriftCompressionEnabled();
}
@@ -312,7 +308,6 @@ public class PipeConfig {
LOGGER.info("PipeConnectorTransferTimeoutMs: {}",
getPipeConnectorTransferTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
- LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());
LOGGER.info(
"PipeConnectorRPCThriftCompressionEnabled: {}",
isPipeConnectorRPCThriftCompressionEnabled());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 331c4d79a2f..c15fc4db189 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -53,6 +53,10 @@ public class PipeConnectorConstant {
public static final int CONNECTOR_IOTDB_PARALLEL_TASKS_DEFAULT_VALUE =
PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum();
+ public static final String CONNECTOR_REALTIME_FIRST_KEY =
"connector.realtime-first";
+ public static final String SINK_REALTIME_FIRST_KEY = "sink.realtime-first";
+ public static final boolean CONNECTOR_REALTIME_FIRST_DEFAULT_VALUE = false;
+
public static final String CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY =
"connector.batch.enable";
public static final String SINK_IOTDB_BATCH_MODE_ENABLE_KEY =
"sink.batch.enable";
public static final boolean CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE
= true;