This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3250550f9a6 [IOTDB-6154] Pipe: better algorithm for hybrid mode to
switch log/tsfile extraction in iotdb-extractor (#11142)
3250550f9a6 is described below
commit 3250550f9a6176859bde1c4dc34020825efbe577
Author: Itami Sho <[email protected]>
AuthorDate: Mon Sep 25 11:00:33 2023 +0800
[IOTDB-6154] Pipe: better algorithm for hybrid mode to switch log/tsfile
extraction in iotdb-extractor (#11142)
In the following three cases, we should not extract any more tablet events;
all the data represented by those tablet events should instead be carried by
the subsequent TsFile event:
1. The historical extractor has not yet consumed all the data.
2. The hybrid extractor first tries to extract in log mode and then chooses
log or TsFile mode to continue extracting. However, if the estimated total WAL
size may exceed the maximum WAL size allowed (the throttle threshold), write
operations will be throttled, so we should not extract any more tablet events.
3. The number of TsFile events in the pending queue has reached the limit.
---------
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../apache/iotdb/confignode/manager/IManager.java | 2 +-
.../iotdb/db/pipe/agent/task/PipeTaskAgent.java | 4 ++
.../PipeRealtimeDataRegionHybridExtractor.java | 35 ++++++++++---
.../apache/iotdb/db/pipe/task/PipeTaskManager.java | 30 ++++++++++-
.../connection/UnboundedBlockingPendingQueue.java | 60 +++++++++++++++++++++-
.../apache/iotdb/commons/conf/CommonConfig.java | 19 ++-----
.../iotdb/commons/conf/CommonDescriptor.java | 11 ++--
.../iotdb/commons/pipe/config/PipeConfig.java | 11 ++--
8 files changed, 130 insertions(+), 42 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 97cb2dc0ec5..877ddb0255c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -604,7 +604,7 @@ public interface IManager {
TGetAllPipeInfoResp getAllPipeInfo();
/**
- * Get RegionId。used for Show cluster slots information in
+ * Get RegionId. used for Show cluster slots information in
* docs/zh/UserGuide/Cluster/Cluster-Maintenance.md.
*
* @return TGetRegionIdResp.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
index 34e99e09fb5..a0cc1350949 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeTaskAgent.java
@@ -117,6 +117,10 @@ public class PipeTaskAgent {
////////////////////////// Pipe Task Management Entry
//////////////////////////
+ public int getLeaderDataRegionCount() {
+ return pipeTaskManager.getLeaderDataRegionCount();
+ }
+
public synchronized TPushPipeMetaRespExceptionMessage
handleSinglePipeMetaChanges(
PipeMeta pipeMetaFromConfigNode) {
acquireWriteLock();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
index c39f58a8874..269e9ae1d6c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.pipe.extractor.realtime;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
@@ -37,6 +39,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeDataRegionHybridExtractor.class);
+ private volatile boolean isStartedToSupply = false;
+
@Override
protected void doExtract(PipeRealtimeEvent event) {
final Event eventToExtract = event.getEvent();
@@ -66,10 +70,17 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
}
private void extractTabletInsertion(PipeRealtimeEvent event) {
- if (isApproachingCapacity()) {
- // if the pending queue is approaching capacity, we should not extract
any more tablet events.
- // all the data represented by the tablet events should be carried by
the following tsfile
- // event.
+ if (!isStartedToSupply
+ || mayWalSizeReachThrottleThreshold()
+ || isTsFileEventCountInQueueExceededLimit()) {
+ // In the following 3 cases, we should not extract any more tablet
events. all the data
+ // represented by the tablet events should be carried by the following
tsfile event:
+ // 1. The historical extractor has not consumed all the data.
+ // 2. HybridExtractor will first try to do extraction in log mode, and
then choose log or
+ // tsfile mode to continue extracting, but if (leader data regions num
* Wal size) > (maximum
+ // size of wal buffer), the write operation will be throttled, so we
should not extract any
+ // more tablet events.
+ // 3. The number of tsfile events in the pending queue has exceeded the
limit.
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
}
@@ -183,13 +194,23 @@ public class PipeRealtimeDataRegionHybridExtractor
extends PipeRealtimeDataRegio
}
}
- private boolean isApproachingCapacity() {
- return pendingQueue.size()
- >= PipeConfig.getInstance().getPipeExtractorPendingQueueTabletLimit();
+ private boolean mayWalSizeReachThrottleThreshold() {
+ final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // Assume that the max data replica factor in common config is 3.
+ // This can be changed in the future.
+ return 3L * PipeAgent.task().getLeaderDataRegionCount() *
config.getWalBufferSize()
+ > config.getThrottleThreshold();
+ }
+
+ private boolean isTsFileEventCountInQueueExceededLimit() {
+ return pendingQueue.getTsfileInsertionEventCount()
+ >= PipeConfig.getInstance().getPipeExtractorPendingQueueTsFileLimit();
}
@Override
public Event supply() {
+ isStartedToSupply = true;
+
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
while (realtimeEvent != null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
index 016b2537f9a..6b91ae46efe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/PipeTaskManager.java
@@ -29,16 +29,33 @@ public class PipeTaskManager {
private final Map<PipeStaticMeta, Map<TConsensusGroupId, PipeTask>> pipeMap
= new HashMap<>();
+ /**
+ * Leader data region count in this data node. We simply update it when
adding pipe task but not
+ * remove it when removing pipe task. So it may be larger than the actual
leader data region count
+ * in this data node.
+ */
+ private volatile int leaderDataRegionCount = 0;
+
/** Add pipe task by pipe static meta and consensus group id. */
public synchronized void addPipeTask(
PipeStaticMeta pipeStaticMeta, TConsensusGroupId consensusGroupId,
PipeTask pipeTask) {
- pipeMap.computeIfAbsent(pipeStaticMeta, k -> new
HashMap<>()).put(consensusGroupId, pipeTask);
+ final Map<TConsensusGroupId, PipeTask> dataRegionId2PipeTask =
+ pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>());
+ dataRegionId2PipeTask.put(consensusGroupId, pipeTask);
+
+ // update leader data region count
+ leaderDataRegionCount = Math.max(leaderDataRegionCount,
dataRegionId2PipeTask.size());
}
/** Add pipe tasks by pipe static meta. */
public synchronized void addPipeTasks(
PipeStaticMeta pipeStaticMeta, Map<TConsensusGroupId, PipeTask>
pipeTasks) {
- pipeMap.computeIfAbsent(pipeStaticMeta, k -> new
HashMap<>()).putAll(pipeTasks);
+ final Map<TConsensusGroupId, PipeTask> dataRegionId2PipeTask =
+ pipeMap.computeIfAbsent(pipeStaticMeta, k -> new HashMap<>());
+ dataRegionId2PipeTask.putAll(pipeTasks);
+
+ // update leader data region count
+ leaderDataRegionCount = Math.max(leaderDataRegionCount,
dataRegionId2PipeTask.size());
}
/**
@@ -93,4 +110,13 @@ public class PipeTaskManager {
public synchronized Map<TConsensusGroupId, PipeTask>
getPipeTasks(PipeStaticMeta pipeStaticMeta) {
return pipeMap.get(pipeStaticMeta);
}
+
+ /**
+ * Get leader data region count in this data node.
+ *
+ * @return leader data region count
+ */
+ public int getLeaderDataRegionCount() {
+ return leaderDataRegionCount;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
index b3bdd7b9324..7a1dce27548 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/task/connection/UnboundedBlockingPendingQueue.java
@@ -20,24 +20,80 @@
package org.apache.iotdb.db.pipe.task.connection;
import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.atomic.AtomicInteger;
public class UnboundedBlockingPendingQueue<E extends Event> extends
BlockingPendingQueue<E> {
private final BlockingDeque<E> pendingDeque;
+ private final AtomicInteger tsfileInsertionEventCount;
+
public UnboundedBlockingPendingQueue() {
super(new LinkedBlockingDeque<>());
pendingDeque = (BlockingDeque<E>) pendingQueue;
+ tsfileInsertionEventCount = new AtomicInteger(0);
+ }
+
+ @Override
+ public boolean waitedOffer(E event) {
+ final boolean offered = super.waitedOffer(event);
+ if (offered && event instanceof TsFileInsertionEvent) {
+ tsfileInsertionEventCount.incrementAndGet();
+ }
+ return offered;
+ }
+
+ @Override
+ public boolean directOffer(E event) {
+ final boolean offered = super.directOffer(event);
+ if (offered && event instanceof TsFileInsertionEvent) {
+ tsfileInsertionEventCount.incrementAndGet();
+ }
+ return offered;
+ }
+
+ @Override
+ public boolean put(E event) {
+ final boolean putSuccessfully = super.put(event);
+ if (putSuccessfully && event instanceof TsFileInsertionEvent) {
+ tsfileInsertionEventCount.incrementAndGet();
+ }
+ return putSuccessfully;
+ }
+
+ @Override
+ public E directPoll() {
+ final E event = super.directPoll();
+ if (event instanceof TsFileInsertionEvent) {
+ tsfileInsertionEventCount.decrementAndGet();
+ }
+ return event;
+ }
+
+ @Override
+ public E waitedPoll() {
+ final E event = super.waitedPoll();
+ if (event instanceof TsFileInsertionEvent) {
+ tsfileInsertionEventCount.decrementAndGet();
+ }
+ return event;
+ }
+
+ @Override
+ public void clear() {
+ super.clear();
+ tsfileInsertionEventCount.set(0);
}
public E peekLast() {
return pendingDeque.peekLast();
}
- public E removeLast() {
- return pendingDeque.removeLast();
+ public int getTsfileInsertionEventCount() {
+ return tsfileInsertionEventCount.get();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8f5eab4f5d3..c96a70d07df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -164,8 +164,7 @@ public class CommonConfig {
private int pipeExtractorAssignerDisruptorRingBufferSize = 65536;
private int pipeExtractorMatcherCacheSize = 1024;
- private int pipeExtractorPendingQueueCapacity = 256;
- private int pipeExtractorPendingQueueTabletLimit =
pipeExtractorPendingQueueCapacity / 2;
+ private int pipeExtractorPendingQueueTsFileLimit = 3;
private int pipeDataStructureTabletRowSize = 2048;
private long pipeConnectorTimeoutMs = 15 * 60 * 1000L; // 15 minutes
@@ -546,20 +545,12 @@ public class CommonConfig {
this.pipeExtractorMatcherCacheSize = pipeExtractorMatcherCacheSize;
}
- public int getPipeExtractorPendingQueueCapacity() {
- return pipeExtractorPendingQueueCapacity;
+ public int getPipeExtractorPendingQueueTsFileLimit() {
+ return pipeExtractorPendingQueueTsFileLimit;
}
- public void setPipeExtractorPendingQueueCapacity(int
pipeExtractorPendingQueueCapacity) {
- this.pipeExtractorPendingQueueCapacity = pipeExtractorPendingQueueCapacity;
- }
-
- public int getPipeExtractorPendingQueueTabletLimit() {
- return pipeExtractorPendingQueueTabletLimit;
- }
-
- public void setPipeExtractorPendingQueueTabletLimit(int
pipeExtractorPendingQueueTabletLimit) {
- this.pipeExtractorPendingQueueTabletLimit =
pipeExtractorPendingQueueTabletLimit;
+ public void setPipeExtractorPendingQueueTsFileLimit(int
pipeExtractorPendingQueueTsfileLimit) {
+ this.pipeExtractorPendingQueueTsFileLimit =
pipeExtractorPendingQueueTsfileLimit;
}
public long getPipeConnectorTimeoutMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 69d20269788..ab4be8eacfc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -311,16 +311,11 @@ public class CommonDescriptor {
properties.getProperty(
"pipe_extractor_matcher_cache_size",
String.valueOf(config.getPipeExtractorMatcherCacheSize()))));
- config.setPipeExtractorPendingQueueCapacity(
+ config.setPipeExtractorPendingQueueTsFileLimit(
Integer.parseInt(
properties.getProperty(
- "pipe_extractor_pending_queue_capacity",
-
String.valueOf(config.getPipeExtractorPendingQueueCapacity()))));
- config.setPipeExtractorPendingQueueTabletLimit(
- Integer.parseInt(
- properties.getProperty(
- "pipe_extractor_pending_queue_tablet_limit",
-
String.valueOf(config.getPipeExtractorPendingQueueTabletLimit()))));
+ "pipe_extractor_pending_queue_tsfile_limit",
+
String.valueOf(config.getPipeExtractorPendingQueueTsFileLimit()))));
config.setPipeConnectorTimeoutMs(
Long.parseLong(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 4b46ac5e0fa..f19f948d6cc 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -81,12 +81,8 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeExtractorMatcherCacheSize();
}
- public int getPipeExtractorPendingQueueCapacity() {
- return COMMON_CONFIG.getPipeExtractorPendingQueueCapacity();
- }
-
- public int getPipeExtractorPendingQueueTabletLimit() {
- return COMMON_CONFIG.getPipeExtractorPendingQueueTabletLimit();
+ public int getPipeExtractorPendingQueueTsFileLimit() {
+ return COMMON_CONFIG.getPipeExtractorPendingQueueTsFileLimit();
}
/////////////////////////////// Connector ///////////////////////////////
@@ -186,9 +182,8 @@ public class PipeConfig {
"PipeExtractorAssignerDisruptorRingBufferSize: {}",
getPipeExtractorAssignerDisruptorRingBufferSize());
LOGGER.info("PipeExtractorMatcherCacheSize: {}",
getPipeExtractorMatcherCacheSize());
- LOGGER.info("PipeExtractorPendingQueueCapacity: {}",
getPipeExtractorPendingQueueCapacity());
LOGGER.info(
- "PipeExtractorPendingQueueTabletLimit: {}",
getPipeExtractorPendingQueueTabletLimit());
+ "PipeExtractorPendingQueueTsFileLimit: {}",
getPipeExtractorPendingQueueTsFileLimit());
LOGGER.info("PipeConnectorTimeoutMs: {}", getPipeConnectorTimeoutMs());
LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());