This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1309c9e0ab5 Pipe: Fixed multiple bugs (#15674)
1309c9e0ab5 is described below
commit 1309c9e0ab52467ff48c1ea9cd70ec8e1c6e03ef
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 9 19:35:17 2025 +0800
Pipe: Fixed multiple bugs (#15674)
1. Doubled the default minimum number of async connector clients (TCP connections) from 16 to 32
2. Made the memory blocks for the WAL cache and the tablet batch fixed-size instead of dynamic
3. Changed the default batch max delay from 200 ms to 20 ms
4. Disabled batch loading for the WAL cache
5. Separated the client pools used for TsFile and insertNode transfer
---------
Co-authored-by: luoluoyuyu <[email protected]>
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 5 ++
.../client/IoTDBDataNodeAsyncClientManager.java | 5 +-
.../evolvable/batch/PipeTabletEventBatch.java | 21 ++---
.../PipeRealtimeDataRegionHybridExtractor.java | 5 ++
.../PipeDataNodeRemainingEventAndTimeOperator.java | 3 +
.../dataregion/wal/utils/WALInsertNodeCache.java | 90 ++--------------------
.../wal/utils/WALInsertNodeCacheTest.java | 48 ------------
.../iotdb/commons/client/ClientPoolFactory.java | 27 +++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 18 ++++-
.../iotdb/commons/enums/PipeRateAverage.java | 2 +
.../iotdb/commons/pipe/config/PipeConfig.java | 7 ++
.../iotdb/commons/pipe/config/PipeDescriptor.java | 10 +++
.../config/constant/PipeConnectorConstant.java | 2 +-
14 files changed, 107 insertions(+), 146 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 38a4cb648aa..df3c8734967 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -162,6 +162,8 @@ public class IoTDBConfig {
/** Memory allocated for the pipe */
private long allocateMemoryForPipe = Runtime.getRuntime().maxMemory() / 10;
+ private long allocateMemoryPerWalCache = 2 * 1024 * 1024;
+
/** Ratio of memory allocated for buffered arrays */
private double bufferedArraysMemoryProportion = 0.6;
@@ -2137,6 +2139,14 @@ public class IoTDBConfig {
this.allocateMemoryForPipe = allocateMemoryForPipe;
}
+ public long getAllocateMemoryPerWalCache() {
+ return allocateMemoryPerWalCache;
+ }
+
+ public void setAllocateMemoryPerWalCache(final long
allocateMemoryForWalCache) {
+ this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
+ }
+
public long getAllocateMemoryForFree() {
return Runtime.getRuntime().maxMemory()
- allocateMemoryForStorageEngine
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 50fccad85b8..7daa0e726c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2169,6 +2169,11 @@ public class IoTDBDescriptor {
}
}
}
+ conf.setAllocateMemoryPerWalCache(
+ Long.parseLong(
+ properties.getProperty(
+ "allocate_memory_per_wal_cache",
+ Long.toString(conf.getAllocateMemoryPerWalCache()))));
LOGGER.info("initial allocateMemoryForRead = {}",
conf.getAllocateMemoryForRead());
LOGGER.info("initial allocateMemoryForWrite = {}",
conf.getAllocateMemoryForStorageEngine());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index 551b0cb2dca..9cb1b0a5cd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -116,7 +116,10 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
receiverAttributes,
new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
.createClientManager(
- new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+ isTSFileUsed
+ ? new ClientPoolFactory
+
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
+ : new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 91dade4b915..9a3d9f51785 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -48,7 +47,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
private long firstEventProcessingTime = Long.MIN_VALUE;
protected long totalBufferSize = 0;
- private final PipeDynamicMemoryBlock allocatedMemoryBlock;
+ private final PipeModelFixedMemoryBlock allocatedMemoryBlock;
protected volatile boolean isClosed = false;
@@ -61,8 +60,10 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
// limit in buffer size
this.allocatedMemoryBlock =
-
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
- allocatedMemoryBlock.setExpandable(false);
+ pipeModelFixedMemoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(
+ requestMaxBatchSizeInBytes, PipeMemoryBlockType.BATCH);
if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
LOGGER.info(
@@ -127,12 +128,8 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
throws WALPipeException, IOException;
public boolean shouldEmit() {
- final long diff = System.currentTimeMillis() - firstEventProcessingTime;
- if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
- allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double)
diff / maxDelayInMs);
- return true;
- }
- return false;
+ return totalBufferSize >= getMaxBatchSizeInBytes()
+ || System.currentTimeMillis() - firstEventProcessingTime >=
maxDelayInMs;
}
private long getMaxBatchSizeInBytes() {
@@ -200,9 +197,7 @@ public abstract class PipeTabletEventBatch implements
AutoCloseable {
try {
pipeModelFixedMemoryBlock =
PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
- PipeMemoryBlockType.BATCH);
+ .forceAllocateForModelFixedMemoryBlock(0L,
PipeMemoryBlockType.BATCH);
} catch (Exception e) {
LOGGER.error("init pipe model fixed memory block failed", e);
// If the allocation fails, we still need to create a default memory
block to avoid NPE.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 825dfd11345..c9d16163fc8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -169,6 +169,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
// 5. Data inserted in the step2 is not captured by
PipeB, and if its tsfile
// epoch's state is USING_TABLET, the tsfile event
will be ignored, which
// will cause the data loss in the tsfile epoch.
+ LOGGER.info(
+ "The tsFile {}'s epoch's start time {} is smaller than
the captured insertNodes' min time {}, will regard it as data loss or
un-sequential, will extract the tsFile",
+ ((PipeTsFileInsertionEvent)
event.getEvent()).getTsFile(),
+ ((PipeTsFileInsertionEvent)
event.getEvent()).getFileStartTime(),
+ event.getTsFileEpoch().getInsertNodeMinTime());
return TsFileEpoch.State.USING_BOTH;
} else {
// All data in the tsfile epoch has been extracted in
tablet mode, so we should
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 2de7e0053f8..47dc0ff18b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -105,6 +105,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends
PipeRemainingOperator {
}
double getRemainingInsertEventSmoothingCount() {
+ if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() ==
PipeRateAverage.NONE) {
+ return insertNodeEventCount.get();
+ }
if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
>=
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
{
insertNodeEventCountMeter.mark(insertNodeEventCount.get());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index b6ce0dc5de5..61a09ad6107 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -48,13 +47,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
/** This cache is used by {@link WALEntryPosition}. */
public class WALInsertNodeCache {
@@ -65,11 +62,10 @@ public class WALInsertNodeCache {
private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
- private final PipeDynamicMemoryBlock memoryBlock;
+ private final PipeModelFixedMemoryBlock memoryBlock;
// Used to adjust the memory usage of the cache
private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
- private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
// LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>>
lruCache;
@@ -83,17 +79,11 @@ public class WALInsertNodeCache {
init();
}
- final long requestedAllocateSize =
- (long)
- Math.min(
- 1.0
- * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
- * CONFIG.getWalFileSizeThresholdInByte()
- / CONFIG.getDataRegionNum(),
- 0.5 * CONFIG.getAllocateMemoryForPipe() /
CONFIG.getDataRegionNum());
- memoryBlock =
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
- isBatchLoadEnabled.set(
- memoryBlock.getMemoryUsageInBytes() >=
CONFIG.getWalFileSizeThresholdInByte());
+ final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
+
+ memoryBlock =
+ PipeDataNodeResourceManager.memory()
+ .forceAllocateForModelFixedMemoryBlock(requestedAllocateSize,
PipeMemoryBlockType.WAL);
lruCache =
Caffeine.newBuilder()
.maximumWeight(requestedAllocateSize)
@@ -118,58 +108,9 @@ public class WALInsertNodeCache {
.recordStats()
.build(new WALInsertNodeCacheLoader());
- memoryBlock.setExpandable(true);
- memoryBlock.setExpand(
- memoryBlock -> {
- final long oldMemory = memoryBlock.getMemoryUsageInBytes();
-
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
- final long newMemory = memoryBlock.getMemoryUsageInBytes();
- if (newMemory > oldMemory) {
- setExpandCallback(oldMemory, newMemory, dataRegionId);
- } else if (newMemory < oldMemory) {
- shrinkCallback(oldMemory, newMemory, dataRegionId);
- }
- });
PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
}
- private void setExpandCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(
- factor ->
- factor == 0L || newMemory == 0L || oldMemory == 0
- ? 0.0
- : factor / ((double) newMemory / oldMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded
from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- }
-
- private void shrinkCallback(long oldMemory, long newMemory, Integer
dataRegionId) {
- memoryUsageCheatFactor.updateAndGet(
- factor ->
- factor == 0L || newMemory == 0L || oldMemory == 0
- ? 0.0
- : factor * ((double) oldMemory / newMemory));
- isBatchLoadEnabled.set(newMemory >=
CONFIG.getWalFileSizeThresholdInByte());
- LOGGER.info(
- "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk
from {} to {}.",
- dataRegionId,
- oldMemory,
- newMemory);
- if (CONFIG.getWALCacheShrinkClearEnabled()) {
- try {
- lruCache.cleanUp();
- } catch (Exception e) {
- LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId, e);
- return;
- }
- LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID:
{}.", dataRegionId);
- }
- }
-
// please call this method at PipeLauncher
public static void init() {
if (walModelFixedMemory != null) {
@@ -179,9 +120,7 @@ public class WALInsertNodeCache {
// Allocate memory for the fixed memory block of WAL
walModelFixedMemory =
PipeDataNodeResourceManager.memory()
- .forceAllocateForModelFixedMemoryBlock(
-
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
- PipeMemoryBlockType.WAL);
+ .forceAllocateForModelFixedMemoryBlock(0L,
PipeMemoryBlockType.WAL);
} catch (Exception e) {
LOGGER.error("Failed to initialize WAL model fixed memory block", e);
walModelFixedMemory =
@@ -254,10 +193,7 @@ public class WALInsertNodeCache {
public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(final
WALEntryPosition position) {
hasPipeRunning = true;
- final Pair<ByteBuffer, InsertNode> pair =
- isBatchLoadEnabled.get()
- ? lruCache.getAll(Collections.singleton(position)).get(position)
- : lruCache.get(position);
+ final Pair<ByteBuffer, InsertNode> pair = lruCache.get(position);
if (pair == null) {
throw new IllegalStateException();
@@ -397,16 +333,6 @@ public class WALInsertNodeCache {
/////////////////////////// Test Only ///////////////////////////
- @TestOnly
- public boolean isBatchLoadEnabled() {
- return isBatchLoadEnabled.get();
- }
-
- @TestOnly
- public void setIsBatchLoadEnabled(final boolean isBatchLoadEnabled) {
- this.isBatchLoadEnabled.set(isBatchLoadEnabled);
- }
-
@TestOnly
boolean contains(WALEntryPosition position) {
return lruCache.getIfPresent(position) != null;
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index 10a23d89bd3..d1e20b4b2ef 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
public class WALInsertNodeCacheTest {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
@@ -152,53 +151,6 @@ public class WALInsertNodeCacheTest {
assertEquals(node1, cache.getInsertNode(position));
}
- @Test
- public void testBatchLoad() throws Exception {
- // Enable batch load
- boolean oldIsBatchLoadEnabled = cache.isBatchLoadEnabled();
- cache.setIsBatchLoadEnabled(true);
- WALInsertNodeCache localC = cache;
- try {
- // write memTable1
- IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable1, logDirectory + "/" +
"fake1.tsfile");
- InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(1);
- WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(),
node1);
- WALEntryPosition position1 =
flushListener1.getWalEntryHandler().getWalEntryPosition();
- InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(2);
- WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(),
node2);
- WALEntryPosition position2 =
flushListener2.getWalEntryHandler().getWalEntryPosition();
- // write memTable2
- IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
- walNode.onMemTableCreated(memTable2, logDirectory + "/" +
"fake2.tsfile");
- InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
- node1.setSearchIndex(3);
- WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(),
node3);
- WALEntryPosition position3 =
flushListener3.getWalEntryHandler().getWalEntryPosition();
- // wait until wal flushed
- walNode.rollWALFile();
- Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() &&
position3.canRead());
- // check batch load memTable1
- cache.clear();
- cache.addMemTable(memTable1.getMemTableId());
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertTrue(cache.contains(position2));
- assertFalse(cache.contains(position3));
- // check batch load none
- cache.removeMemTable(memTable1.getMemTableId());
- cache.clear();
- assertEquals(node1, cache.getInsertNode(position1));
- assertTrue(cache.contains(position1));
- assertFalse(cache.contains(position2));
- assertFalse(cache.contains(position3));
- } finally {
-
WALInsertNodeCache.getInstance(1).setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
- }
- }
-
private InsertRowNode getInsertRowNode(long time) throws
IllegalPathException {
TSDataType[] dataTypes =
new TSDataType[] {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 169c07d59d9..5d9a1e6b921 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -297,6 +297,33 @@ public class ClientPoolFactory {
}
}
+ public static class AsyncPipeTsFileDataTransferServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint,
AsyncPipeDataTransferServiceClient> {
+ @Override
+ public GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+ final GenericKeyedObjectPool<TEndPoint,
AsyncPipeDataTransferServiceClient> clientPool =
+ new GenericKeyedObjectPool<>(
+ new AsyncPipeDataTransferServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
+ .setRpcThriftCompressionEnabled(
+ conf.isPipeConnectorRPCThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(
+ conf.getPipeAsyncConnectorSelectorNumber())
+ .build(),
+ ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+ new
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+ .build()
+ .getConfig());
+ ClientManagerMetrics.getInstance()
+ .registerClientManager(this.getClass().getSimpleName(), clientPool);
+ return clientPool;
+ }
+ }
+
public static class AsyncAINodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6c330991f78..839e2df2022 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -250,7 +250,9 @@ public class CommonConfig {
private int pipeAsyncConnectorSelectorNumber =
Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
private int pipeAsyncConnectorMaxClientNumber =
- Math.max(16, Runtime.getRuntime().availableProcessors() / 2);
+ Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
+ private int pipeAsyncConnectorMaxTsFileClientNumber =
+ Math.max(16, Runtime.getRuntime().availableProcessors());
private double pipeAllSinksRateLimitBytesPerSecond = -1;
private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -1132,6 +1134,20 @@ public class CommonConfig {
"pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxClientNumber);
}
+ public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ return pipeAsyncConnectorMaxTsFileClientNumber;
+ }
+
+ public void setPipeAsyncConnectorMaxTsFileClientNumber(
+ int pipeAsyncConnectorMaxTsFileClientNumber) {
+ if (this.pipeAsyncConnectorMaxTsFileClientNumber ==
pipeAsyncConnectorMaxTsFileClientNumber) {
+ return;
+ }
+ this.pipeAsyncConnectorMaxTsFileClientNumber =
pipeAsyncConnectorMaxTsFileClientNumber;
+ logger.info(
+ "pipeAsyncConnectorMaxClientNumber is set to {}.",
pipeAsyncConnectorMaxTsFileClientNumber);
+ }
+
public boolean isSeperatedPipeHeartbeatEnabled() {
return isSeperatedPipeHeartbeatEnabled;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
index 2a08d2f9608..1c90df1bb07 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.enums;
import com.codahale.metrics.Meter;
public enum PipeRateAverage {
+ NONE,
ONE_MINUTE,
FIVE_MINUTES,
FIFTEEN_MINUTES,
@@ -37,6 +38,7 @@ public enum PipeRateAverage {
return meter.getFifteenMinuteRate();
case MEAN:
return meter.getMeanRate();
+ case NONE:
default:
throw new UnsupportedOperationException(
String.format("The type %s is not supported in pipe rate
average.", this));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index 05f1397b071..a616d80b64e 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -211,6 +211,10 @@ public class PipeConfig {
return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
}
+ public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+ return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+ }
+
public double getPipeAllConnectorsRateLimitBytesPerSecond() {
return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
}
@@ -579,6 +583,9 @@ public class PipeConfig {
getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
LOGGER.info("PipeAsyncConnectorSelectorNumber: {}",
getPipeAsyncConnectorSelectorNumber());
LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}",
getPipeAsyncConnectorMaxClientNumber());
+ LOGGER.info(
+ "PipeAsyncConnectorMaxTsFileClientNumber: {}",
+ getPipeAsyncConnectorMaxTsFileClientNumber());
LOGGER.info(
"PipeAllConnectorsRateLimitBytesPerSecond: {}",
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 683819224aa..1849209d651 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -628,6 +628,16 @@ public class PipeDescriptor {
config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
}
+ value =
+ parserPipeConfig(
+ properties,
+ "pipe_sink_max_tsfile_client_number",
+ "pipe_async_connector_max_tsfile_client_number",
+ isHotModify);
+ if (value != null) {
+
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+ }
+
value = parserPipeConfig(properties,
"pipe_all_sinks_rate_limit_bytes_per_second", isHotModify);
if (value != null) {
config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value));
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index 26841e73a70..94cdd935c98 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -70,7 +70,7 @@ public class PipeConnectorConstant {
public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY =
"connector.batch.max-delay-ms";
public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY =
"sink.batch.max-delay-ms";
- public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 200;
+ public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 20;
public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY =
"connector.batch.size-bytes";
public static final String SINK_IOTDB_BATCH_SIZE_KEY =
"sink.batch.size-bytes";