This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5c7cb1ecfee Pipe: Fixed multiple bugs (#15674) (#15683)
5c7cb1ecfee is described below
commit 5c7cb1ecfee26cee32d47b83a68afe326dcc5538
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 9 19:46:06 2025 +0800
Pipe: Fixed multiple bugs (#15674) (#15683)
1. Doubled the TCP connection num
2. Made the memory for wal and batch fixed
3. Changed the default batch time to 20ms
4. Disabled the batch load for wal cache
5. Separated the tsFile and insertNode client
---------
Co-authored-by: luoluoyuyu <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../client/IoTDBDataNodeAsyncClientManager.java | 5 +-
.../evolvable/batch/PipeTabletEventBatch.java | 21 ++---
.../PipeRealtimeDataRegionHybridExtractor.java | 5 ++
.../PipeDataNodeRemainingEventAndTimeOperator.java | 3 +
.../dataregion/wal/utils/WALInsertNodeCache.java | 93 +++-------------------
.../wal/utils/WALInsertNodeCacheTest.java | 48 -----------
.../iotdb/commons/client/ClientPoolFactory.java | 27 +++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 18 ++++-
.../iotdb/commons/enums/PipeRateAverage.java | 2 +
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++
.../config/constant/PipeConnectorConstant.java | 2 +-
14 files changed, 109 insertions(+), 148 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8137959a391..b2af80301e6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -151,6 +151,8 @@ public class IoTDBConfig {
private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 /
10;
+ private long allocateMemoryPerWalCache = 2 * 1024 * 1024;
+
/** Flush proportion for system */
private double flushProportion = 0.4;
@@ -2002,6 +2004,14 @@ public class IoTDBConfig {
this.writeMemoryVariationReportProportion =
writeMemoryVariationReportProportion;
}
+ public long getAllocateMemoryPerWalCache() {
+ return allocateMemoryPerWalCache;
+ }
+
+ public void setAllocateMemoryPerWalCache(final long
allocateMemoryForWalCache) {
+ this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
+ }
+
public boolean isEnablePartialInsert() {
return enablePartialInsert;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
old mode 100755
new mode 100644
index dfd854f724c..155aab7c5b5
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2458,6 +2458,12 @@ public class IoTDBDescriptor {
conf.setIotConsensusV2DeletionFileDir(
properties.getProperty(
"iot_consensus_v2_deletion_file_dir",
conf.getIotConsensusV2DeletionFileDir()));
+
+ conf.setAllocateMemoryPerWalCache(
+ Long.parseLong(
+ properties.getProperty(
+ "allocate_memory_per_wal_cache",
+ Long.toString(conf.getAllocateMemoryPerWalCache()))));
}
private void loadCQProps(TrimProperties properties) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index a1531343b04..15a044abae4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -116,7 +116,10 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
receiverAttributes,
new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+ isTSFileUsed
+ ? new ClientPoolFactory
+
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
+ : new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 91dade4b915..9a3d9f51785 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -48,7 +47,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private long firstEventProcessingTime = Long.MIN_VALUE;
protected long totalBufferSize = 0;
- private final PipeDynamicMemoryBlock allocatedMemoryBlock;
+ private final PipeModelFixedMemoryBlock allocatedMemoryBlock;
protected volatile boolean isClosed = false;
@@ -61,8 +60,10 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
// limit in buffer size
this.allocatedMemoryBlock =
-
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
- allocatedMemoryBlock.setExpandable(false);
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+ requestMaxBatchSizeInBytes, PipeMemoryBlockType.BATCH);
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
@@ -127,12 +128,8 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
throws WALPipeException, IOException;
public boolean shouldEmit() {
- final long diff = System.currentTimeMillis() - firstEventProcessingTime;
- if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
- allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double)
diff / maxDelayInMs);
- return true;
- }
- return false;
+ return totalBufferSize >= getMaxBatchSizeInBytes()
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
private long getMaxBatchSizeInBytes() {
@@ -200,9 +197,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
try {
pipeModelFixedMemoryBlock =
PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
- PipeMemoryBlockType.BATCH);
+ .forceAllocateForModelFixedMemoryBlock(0L,
PipeMemoryBlockType.BATCH);
} catch (Exception e) {
LOGGER.error("init pipe model fixed memory block failed", e);
// If the allocation fails, we still need to create a default memory
block to avoid NPE.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 52de019f741..92e9ad64f5d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -168,6 +168,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
// 5. Data inserted in the step2 is not captured by
PipeB, and if its tsfile
// epoch's state is USING_TABLET, the tsfile event
will be ignored, which
// will cause the data loss in the tsfile epoch.
+ LOGGER.info(
+ "The tsFile {}'s epoch's start time {} is smaller than
the captured insertNodes' min time {}, will regard it as data loss or
un-sequential, will extract the tsFile",
+ ((PipeTsFileInsertionEvent)
event.getEvent()).getTsFile(),
+ ((PipeTsFileInsertionEvent)
event.getEvent()).getFileStartTime(),
+ event.getTsFileEpoch().getInsertNodeMinTime());
return TsFileEpoch.State.USING_BOTH;
} else {
// All data in the tsfile epoch has been extracted in
tablet mode, so we should
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 2de7e0053f8..47dc0ff18b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -105,6 +105,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
}
double getRemainingInsertEventSmoothingCount() {
+ if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() ==
PipeRateAverage.NONE) {
+ return insertNodeEventCount.get();
+ }
if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
>=
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
{
insertNodeEventCountMeter.mark(insertNodeEventCount.get());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 7ca049698ec..46c3d962754 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -49,13 +48,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
/** This cache is used by {@link WALEntryPosition}. */
public class WALInsertNodeCache {
@@ -68,11 +65,10 @@ public class WALInsertNodeCache {
private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
- private final PipeDynamicMemoryBlock memoryBlock;
+ private final PipeModelFixedMemoryBlock memoryBlock;
// Used to adjust the memory usage of the cache
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
- private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
@@ -86,19 +82,12 @@ public class WALInsertNodeCache {
init();
}
- final long requestedAllocateSize =
- (long)
- Math.min(
- 1.0
- * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
- * CONFIG.getWalFileSizeThresholdInByte()
- / CONFIG.getDataRegionNum(),
- 0.5
- *
MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes()
- / CONFIG.getDataRegionNum());
- memoryBlock =
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
- isBatchLoadEnabled.set(
- memoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
+ final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
+
+ memoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(requestedAllocateSize,
PipeMemoryBlockType.WAL);
+
lruCache =
Caffeine.newBuilder()
.maximumWeight(requestedAllocateSize)
@@ -123,58 +112,9 @@ public class WALInsertNodeCache {
.recordStats()
.build(new WALInsertNodeCacheLoader());
- memoryBlock.setExpandable(true);
- memoryBlock.setExpand(
- memoryBlock -> {
- final long oldMemory = memoryBlock.getMemoryUsageInBytes();
-
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
- final long newMemory = memoryBlock.getMemoryUsageInBytes();
- if (newMemory > oldMemory) {
- setExpandCallback(oldMemory, newMemory, dataRegionId);
- } else if (newMemory < oldMemory) {
- shrinkCallback(oldMemory, newMemory, dataRegionId);
- }
- });
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
- private void setExpandCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(
- factor ->
- factor == 0L || newMemory == 0L || oldMemory == 0
- ? 0.0
- : factor / ((double) newMemory / oldMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded
from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- }
-
- private void shrinkCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(
- factor ->
- factor == 0L || newMemory == 0L || oldMemory == 0
- ? 0.0
- : factor * ((double) oldMemory / newMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk
from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- if (CONFIG.getWALCacheShrinkClearEnabled()) {
- try {
- lruCache.cleanUp();
- } catch (Exception e) {
- LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId, e);
- return;
- }
- LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId);
- }
- }
-
// please call this method at PipeLauncher
public static void init() {
if (walModelFixedMemory != null) {
@@ -184,9 +124,7 @@ public class WALInsertNodeCache {
// Allocate memory for the fixed memory block of WAL
walModelFixedMemory =
PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
- PipeMemoryBlockType.WAL);
+ .forceAllocateForModelFixedMemoryBlock(0L,
PipeMemoryBlockType.WAL);
} catch (Exception e) {
LOGGER.error("Failed to initialize WAL model fixed memory block", e);
walModelFixedMemory =
@@ -259,10 +197,7 @@ public class WALInsertNodeCache {
public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(final
WALEntryPosition position) {
hasPipeRunning = true;
- final Pair<ByteBuffer, InsertNode> pair =
- isBatchLoadEnabled.get()
- ? lruCache.getAll(Collections.singleton(position)).get(position)
- : lruCache.get(position);
+ final Pair<ByteBuffer, InsertNode> pair = lruCache.get(position);
if (pair == null) {
throw new IllegalStateException();
@@ -402,16 +337,6 @@ public class WALInsertNodeCache {
/////////////////////////// Test Only ///////////////////////////
- @TestOnly
- public boolean isBatchLoadEnabled() {
- return isBatchLoadEnabled.get();
- }
-
- @TestOnly
- public void setIsBatchLoadEnabled(final boolean isBatchLoadEnabled) {
- this.isBatchLoadEnabled.set(isBatchLoadEnabled);
- }
-
@TestOnly
boolean contains(WALEntryPosition position) {
return lruCache.getIfPresent(position) != null;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index 10a23d89bd3..d1e20b4b2ef 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
public class WALInsertNodeCacheTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -152,53 +151,6 @@ public class WALInsertNodeCacheTest {
assertEquals(node1, cache.getInsertNode(position));
}
- @Test
- public void testBatchLoad() throws Exception {
- // Enable batch load
- boolean oldIsBatchLoadEnabled = cache.isBatchLoadEnabled();
- cache.setIsBatchLoadEnabled(true);
- WALInsertNodeCache localC = cache;
- try {
- // write memTable1
- IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable1, logDirectory + "/" +
"fake1.tsfile");
- InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(1);
- WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(),
node1);
- WALEntryPosition position1 =
flushListener1.getWalEntryHandler().getWalEntryPosition();
- InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(2);
- WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(),
node2);
- WALEntryPosition position2 =
flushListener2.getWalEntryHandler().getWalEntryPosition();
- // write memTable2
- IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable2, logDirectory + "/" +
"fake2.tsfile");
- InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(3);
- WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(),
node3);
- WALEntryPosition position3 =
flushListener3.getWalEntryHandler().getWalEntryPosition();
- // wait until wal flushed
- walNode.rollWALFile();
- Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() &&
position3.canRead());
- // check batch load memTable1
- cache.clear();
- cache.addMemTable(memTable1.getMemTableId());
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertTrue(cache.contains(position2));
- assertFalse(cache.contains(position3));
- // check batch load none
- cache.removeMemTable(memTable1.getMemTableId());
- cache.clear();
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertFalse(cache.contains(position2));
- assertFalse(cache.contains(position3));
- } finally {
-
WALInsertNodeCache.getInstance(1).setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
- }
- }
-
private InsertRowNode getInsertRowNode(long time) throws
IllegalPathException {
TSDataType[] dataTypes =
new TSDataType[] {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index fad0efa75b8..32c6345dc27 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -298,6 +298,33 @@ public class ClientPoolFactory {
}
}
+ public static class AsyncPipeTsFileDataTransferServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
+ @Override
+ public GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+ final GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> clientPool =
+ new GenericKeyedObjectPool<>(
+ new AsyncPipeDataTransferServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+ conf.isPipeConnectorRPCThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(
+ conf.getPipeAsyncConnectorSelectorNumber())
+ .build(),
+ ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+ .build()
+ .getConfig());
+ ClientManagerMetrics.getInstance()
+ .registerClientManager(this.getClass().getSimpleName(), clientPool);
+ return clientPool;
+ }
+ }
+
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6c330991f78..839e2df2022 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -250,7 +250,9 @@ public class CommonConfig {
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
- Math.max(16, Runtime.getRuntime().availableProcessors() / 2);
+ Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
+ private int pipeAsyncConnectorMaxTsFileClientNumber =
+ Math.max(16, Runtime.getRuntime().availableProcessors());
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -1132,6 +1134,20 @@ public class CommonConfig {
"pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
}
+ public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ return pipeAsyncConnectorMaxTsFileClientNumber;
+ }
+
+ public void setPipeAsyncConnectorMaxTsFileClientNumber(
+ int pipeAsyncConnectorMaxTsFileClientNumber) {
+ if (this.pipeAsyncConnectorMaxTsFileClientNumber ==
pipeAsyncConnectorMaxTsFileClientNumber) {
+ return;
+ }
+ this.pipeAsyncConnectorMaxTsFileClientNumber =
pipeAsyncConnectorMaxTsFileClientNumber;
+ logger.info(
+ "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxTsFileClientNumber);
+ }
+
public boolean isSeperatedPipeHeartbeatEnabled() {
return isSeperatedPipeHeartbeatEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
index 2a08d2f9608..1c90df1bb07 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.enums;
import com.codahale.metrics.Meter;
public enum PipeRateAverage {
+ NONE,
ONE_MINUTE,
FIVE_MINUTES,
FIFTEEN_MINUTES,
@@ -37,6 +38,7 @@ public enum PipeRateAverage {
return meter.getFifteenMinuteRate();
case MEAN:
return meter.getMeanRate();
+ case NONE:
default:
throw new UnsupportedOperationException(
String.format("The type %s is not supported in pipe rate
average.", this));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fb05e63a666..cb822bb28df 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -211,6 +211,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
}
+ public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+ }
+
public double getPipeAllConnectorsRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -578,6 +582,9 @@ public class PipeConfig {
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
+ LOGGER.info(
+ "PipeAsyncConnectorMaxTsFileClientNumber: {}",
+ getPipeAsyncConnectorMaxTsFileClientNumber());
LOGGER.info(
"PipeAllConnectorsRateLimitBytesPerSecond: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 683819224aa..1849209d651 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -628,6 +628,16 @@ public class PipeDescriptor {
config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
}
+ value =
+ parserPipeConfig(
+ properties,
+ "pipe_sink_max_tsfile_client_number",
+ "pipe_async_connector_max_tsfile_client_number",
+ isHotModify);
+ if (value != null) {
+
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+ }
+
value = parserPipeConfig(properties,
"pipe_all_sinks_rate_limit_bytes_per_second", isHotModify);
if (value != null) {
config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index e09eadeef7a..a6c491bbea8 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -71,7 +71,7 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY =
"connector.batch.max-delay-ms";
public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY =
"sink.batch.max-delay-ms";
- public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 200;
+ public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 20;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";