This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2ee7675ce10 Pipe: make hybrid extractor aware of event collector's
queue size & block processor subtask's execution when events buffered in
EventCollector (#11283)
2ee7675ce10 is described below
commit 2ee7675ce101fe4a05aa6770fb34e15cc627653e
Author: Zikun Ma <[email protected]>
AuthorDate: Thu Oct 12 16:36:26 2023 +0800
Pipe: make hybrid extractor aware of event collector's queue size & block
processor subtask's execution when events buffered in EventCollector (#11283)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../event/common/heartbeat/PipeHeartbeatEvent.java | 18 ++++++++++++-
.../PipeRealtimeDataRegionHybridExtractor.java | 31 ++++++++++++++++------
.../pipe/resource/wal/PipeWALResourceManager.java | 4 +++
.../pipe/task/connection/BlockingPendingQueue.java | 4 +++
.../pipe/task/connection/PipeEventCollector.java | 4 +++
.../subtask/processor/PipeProcessorSubtask.java | 12 ++++++---
.../apache/iotdb/commons/conf/CommonConfig.java | 23 +++++++++-------
.../iotdb/commons/conf/CommonDescriptor.java | 11 ++++----
.../iotdb/commons/pipe/config/PipeConfig.java | 16 ++++++-----
9 files changed, 89 insertions(+), 34 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
index fd6724088b0..abbfdd8c24d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionExtractor;
+import org.apache.iotdb.db.pipe.extractor.realtime.PipeRealtimeDataRegionHybridExtractor;
import org.apache.iotdb.db.pipe.task.connection.BoundedBlockingPendingQueue;
import org.apache.iotdb.db.pipe.task.connection.EnrichedDeque;
import org.apache.iotdb.db.pipe.task.connection.UnboundedBlockingPendingQueue;
@@ -39,6 +41,7 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
private final String dataRegionId;
private String pipeName;
+ private PipeRealtimeDataRegionExtractor extractor = null;
private long timePublished;
private long timeAssigned;
@@ -164,9 +167,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
public void recordBufferQueueSize(EnrichedDeque<Event> bufferQueue) {
if (shouldPrintMessage) {
bufferQueueTabletSize = bufferQueue.getTabletInsertionEventCount();
- bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
bufferQueueSize = bufferQueue.size();
}
+
+ bufferQueueTsFileSize = bufferQueue.getTsFileInsertionEventCount();
+ if (extractor instanceof PipeRealtimeDataRegionHybridExtractor) {
+ ((PipeRealtimeDataRegionHybridExtractor) extractor)
+ .informEventCollectorQueueTsFileSize(bufferQueueTsFileSize);
+ }
}
public void recordConnectorQueueSize(BoundedBlockingPendingQueue<Event>
pendingQueue) {
@@ -177,6 +185,14 @@ public class PipeHeartbeatEvent extends EnrichedEvent {
}
}
+ /////////////////////////////// For Hybrid extractor ///////////////////////////////
+
+ public void bindExtractor(PipeRealtimeDataRegionExtractor extractor) {
+ this.extractor = extractor;
+ }
+
+ /////////////////////////////// Object ///////////////////////////////
+
@Override
public String toString() {
final String unknownMessage = "Unknown";
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 1ea661eb90c..a452577858f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
-import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.pipe.resource.PipeResourceManager;
+import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -34,12 +35,15 @@ import
org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegionExtractor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
private volatile boolean isStartedToSupply = false;
+ private final AtomicInteger eventCollectorQueueTsFileSize = new AtomicInteger(0);
@Override
protected void doExtract(PipeRealtimeEvent event) {
@@ -72,6 +76,7 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private void extractTabletInsertion(PipeRealtimeEvent event) {
if (!isStartedToSupply
|| mayWalSizeReachThrottleThreshold()
+ || tooManyWALPinned()
|| isTsFileEventCountInQueueExceededLimit()) {
// In the following 3 cases, we should not extract any more tablet
events. all the data
// represented by the tablet events should be carried by the following
tsfile event:
@@ -159,6 +164,9 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private void extractHeartbeat(PipeRealtimeEvent event) {
+ // Bind extractor so that the heartbeat event can later inform the extractor of queue size
+ ((PipeHeartbeatEvent) event.getEvent()).bindExtractor(this);
+
// Record the pending queue size before trying to put heartbeatEvent into
queue
((PipeHeartbeatEvent)
event.getEvent()).recordExtractorQueueSize(pendingQueue);
@@ -192,16 +200,23 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
private boolean mayWalSizeReachThrottleThreshold() {
- final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // Assume that the max data replica factor in common config is 3.
- // This can be changed in the future.
- return 3L * PipeAgent.task().getLeaderDataRegionCount() * config.getWalBufferSize()
- > config.getThrottleThreshold();
+ return 3 * WALManager.getInstance().getTotalDiskUsage()
+ > IoTDBDescriptor.getInstance().getConfig().getThrottleThreshold();
+ }
+
+ private boolean tooManyWALPinned() {
+ return PipeResourceManager.wal().getApproximatePinnedWALCount()
+ > Math.max(1, PipeAgent.task().getLeaderDataRegionCount())
+ * PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
}
private boolean isTsFileEventCountInQueueExceededLimit() {
- return pendingQueue.getTsFileInsertionEventCount()
- >= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
+ return pendingQueue.getTsFileInsertionEventCount() + eventCollectorQueueTsFileSize.get()
+ >= PipeConfig.getInstance().getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+ }
+
+ public void informEventCollectorQueueTsFileSize(int queueSize) {
+ eventCollectorQueueTsFileSize.set(queueSize);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
index afbe6931533..f8202372554 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/wal/PipeWALResourceManager.java
@@ -87,6 +87,10 @@ public abstract class PipeWALResourceManager {
TimeUnit.MILLISECONDS);
}
+ public int getApproximatePinnedWALCount() {
+ return memtableIdToPipeWALResourceMap.size();
+ }
+
public final void pin(final WALEntryHandler walEntryHandler) throws
IOException {
final long memtableId = walEntryHandler.getMemTableId();
final ReentrantLock lock = memtableIdSegmentLocks[(int) (memtableId %
SEGMENT_LOCK_COUNT)];
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
index 035d7ded94d..51d5435ac01 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/BlockingPendingQueue.java
@@ -133,6 +133,10 @@ public abstract class BlockingPendingQueue<E extends
Event> {
pendingQueue.forEach(action);
}
+ public boolean isEmpty() {
+ return pendingQueue.isEmpty();
+ }
+
public int size() {
return pendingQueue.size();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
index 428a3b38d15..a341f8f9fa0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/PipeEventCollector.java
@@ -73,6 +73,10 @@ public class PipeEventCollector implements EventCollector,
AutoCloseable {
}
}
+ public boolean isBufferQueueEmpty() {
+ return bufferQueue.isEmpty();
+ }
+
/**
* Try to collect buffered events into pending queue.
*
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
index 2814a40b5ae..b5caed5245f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/subtask/processor/PipeProcessorSubtask.java
@@ -88,10 +88,14 @@ public class PipeProcessorSubtask extends PipeSubtask {
final Event event = lastEvent != null ? lastEvent :
inputEventSupplier.supply();
// Record the last event for retry when exception occurs
setLastEvent(event);
- if (event == null) {
- // Though there is no event to process, there may still be some buffered events
- // in the outputEventCollector. Return true if there are still buffered events,
- // false otherwise.
+ if (
+ // Though there is no event to process, there may still be some buffered events
+ // in the outputEventCollector. Return true if there are still buffered events,
+ // false otherwise.
+ event == null
+ // If there are still buffered events, process them first, the newly supplied
+ // event will be processed in the next round.
+ || !outputEventCollector.isBufferQueueEmpty()) {
return outputEventCollector.tryCollectBufferedEvents();
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index c96a70d07df..ccd3e8e8c90 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -158,14 +158,14 @@ public class CommonConfig {
private int pipeSubtaskExecutorMaxThreadNum =
Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2));
+ private int pipeDataStructureTabletRowSize = 2048;
+
private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private int pipeExtractorMatcherCacheSize = 1024;
- private int pipeExtractorPendingQueueTsFileLimit = 3;
- private int pipeDataStructureTabletRowSize = 2048;
private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
private int pipeConnectorReadFileBufferSize = 8388608;
@@ -187,6 +187,8 @@ public class CommonConfig {
private boolean pipeAirGapReceiverEnabled = false;
private int pipeAirGapReceiverPort = 9780;
+ private int pipeMaxAllowedPendingTsFileEpochPerDataRegion = 3;
+
/** Whether to use persistent schema mode. */
private String schemaEngineMode = "Memory";
@@ -545,14 +547,6 @@ public class CommonConfig {
this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
}
- public int getPipeExtractorPendingQueueTsFileLimit() {
- return pipeExtractorPendingQueueTsFileLimit;
- }
-
- public void setPipeExtractorPendingQueueTsFileLimit(int pipeExtractorPendingQueueTsfileLimit) {
- this.pipeExtractorPendingQueueTsFileLimit = pipeExtractorPendingQueueTsfileLimit;
- }
-
public long getPipeConnectorTimeoutMs() {
return pipeConnectorTimeoutMs;
}
@@ -727,6 +721,15 @@ public class CommonConfig {
return pipeAirGapReceiverPort;
}
+ public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
+ return pipeMaxAllowedPendingTsFileEpochPerDataRegion;
+ }
+
+ public void setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
+ int pipeExtractorPendingQueueTsfileLimit) {
+ this.pipeMaxAllowedPendingTsFileEpochPerDataRegion = pipeExtractorPendingQueueTsfileLimit;
+ }
+
public String getSchemaEngineMode() {
return schemaEngineMode;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ab4be8eacfc..7ea9f561d0a 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -311,11 +311,6 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
- config.setPipeExtractorPendingQueueTsFileLimit(
- Integer.parseInt(
- properties.getProperty(
- "pipe_extractor_pending_queue_tsfile_limit",
- String.valueOf(config.getPipeExtractorPendingQueueTsFileLimit()))));
config.setPipeConnectorTimeoutMs(
Long.parseLong(
@@ -398,6 +393,12 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_air_gap_receiver_port",
Integer.toString(config.getPipeAirGapReceiverPort()))));
+
+ config.setPipeMaxAllowedPendingTsFileEpochPerDataRegion(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_max_allowed_pending_tsfile_epoch_per_data_region",
+ String.valueOf(config.getPipeMaxAllowedPendingTsFileEpochPerDataRegion()))));
}
public void loadGlobalConfig(TGlobalConfig globalConfig) {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index f19f948d6cc..f44dfb60bac 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -81,10 +81,6 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
}
- public int getPipeExtractorPendingQueueTsFileLimit() {
- return COMMON_CONFIG.getPipeExtractorPendingQueueTsFileLimit();
- }
-
/////////////////////////////// Connector ///////////////////////////////
public long getPipeConnectorTimeoutMs() {
@@ -155,6 +151,12 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAirGapReceiverPort();
}
+ /////////////////////////////// Hybrid Mode ///////////////////////////////
+
+ public int getPipeMaxAllowedPendingTsFileEpochPerDataRegion() {
+ return COMMON_CONFIG.getPipeMaxAllowedPendingTsFileEpochPerDataRegion();
+ }
+
/////////////////////////////// Utils ///////////////////////////////
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
@@ -182,8 +184,6 @@ public class PipeConfig {
"PipeExtractorAssignerDisruptorRingBufferSize: {}",
getPipeExtractorAssignerDisruptorRingBufferSize());
LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeExtractorMatcherCacheSize());
- LOGGER.info(
- "PipeExtractorPendingQueueTsFileLimit: {}", getPipeExtractorPendingQueueTsFileLimit());
LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
@@ -211,6 +211,10 @@ public class PipeConfig {
LOGGER.info("PipeAirGapReceiverEnabled: {}",
getPipeAirGapReceiverEnabled());
LOGGER.info("PipeAirGapReceiverPort: {}", getPipeAirGapReceiverPort());
+
+ LOGGER.info(
+ "PipeMaxAllowedPendingTsFileEpochPerDataRegion: {}",
+ getPipeMaxAllowedPendingTsFileEpochPerDataRegion());
}
/////////////////////////////// Singleton ///////////////////////////////