This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c7cb1ecfee Pipe: Fixed multiple bugs (#15674) (#15683)
5c7cb1ecfee is described below

commit 5c7cb1ecfee26cee32d47b83a68afe326dcc5538
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 9 19:46:06 2025 +0800

    Pipe: Fixed multiple bugs (#15674) (#15683)
    
    1. Doubled the TCP connection number
    2. Made the memory allocated for the WAL cache and the batch fixed-size
    3. Changed the default batch max delay from 200 ms to 20 ms
    4. Disabled batch loading for the WAL cache
    5. Separated the tsFile and insertNode client pools
    
    ---------
    
    Co-authored-by: luoluoyuyu <[email protected]>
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 +++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../client/IoTDBDataNodeAsyncClientManager.java    |  5 +-
 .../evolvable/batch/PipeTabletEventBatch.java      | 21 ++---
 .../PipeRealtimeDataRegionHybridExtractor.java     |  5 ++
 .../PipeDataNodeRemainingEventAndTimeOperator.java |  3 +
 .../dataregion/wal/utils/WALInsertNodeCache.java   | 93 +++-------------------
 .../wal/utils/WALInsertNodeCacheTest.java          | 48 -----------
 .../iotdb/commons/client/ClientPoolFactory.java    | 27 +++++++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 18 ++++-
 .../iotdb/commons/enums/PipeRateAverage.java       |  2 +
 .../iotdb/commons/pipe/config/PipeConfig.java      |  7 ++
 .../iotdb/commons/pipe/config/PipeDescriptor.java  | 10 +++
 .../config/constant/PipeConnectorConstant.java     |  2 +-
 14 files changed, 109 insertions(+), 148 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8137959a391..b2af80301e6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -151,6 +151,8 @@ public class IoTDBConfig {
 
   private long allocateMemoryForRead = Runtime.getRuntime().maxMemory() * 3 / 
10;
 
+  private long allocateMemoryPerWalCache = 2 * 1024 * 1024;
+
   /** Flush proportion for system */
   private double flushProportion = 0.4;
 
@@ -2002,6 +2004,14 @@ public class IoTDBConfig {
     this.writeMemoryVariationReportProportion = 
writeMemoryVariationReportProportion;
   }
 
+  public long getAllocateMemoryPerWalCache() {
+    return allocateMemoryPerWalCache;
+  }
+
+  public void setAllocateMemoryPerWalCache(final long 
allocateMemoryForWalCache) {
+    this.allocateMemoryPerWalCache = allocateMemoryForWalCache;
+  }
+
   public boolean isEnablePartialInsert() {
     return enablePartialInsert;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
old mode 100755
new mode 100644
index dfd854f724c..155aab7c5b5
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2458,6 +2458,12 @@ public class IoTDBDescriptor {
     conf.setIotConsensusV2DeletionFileDir(
         properties.getProperty(
             "iot_consensus_v2_deletion_file_dir", 
conf.getIotConsensusV2DeletionFileDir()));
+
+    conf.setAllocateMemoryPerWalCache(
+        Long.parseLong(
+            properties.getProperty(
+                "allocate_memory_per_wal_cache",
+                Long.toString(conf.getAllocateMemoryPerWalCache()))));
   }
 
   private void loadCQProps(TrimProperties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
index a1531343b04..15a044abae4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/client/IoTDBDataNodeAsyncClientManager.java
@@ -116,7 +116,10 @@ public class IoTDBDataNodeAsyncClientManager extends 
IoTDBClientManager
             receiverAttributes,
             new IClientManager.Factory<TEndPoint, 
AsyncPipeDataTransferServiceClient>()
                 .createClientManager(
-                    new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
+                    isTSFileUsed
+                        ? new ClientPoolFactory
+                            
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
+                        : new 
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
       }
       endPoint2Client = 
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
index 91dade4b915..9a3d9f51785 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventBatch.java
@@ -22,7 +22,6 @@ package 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
 import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -48,7 +47,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
   private long firstEventProcessingTime = Long.MIN_VALUE;
 
   protected long totalBufferSize = 0;
-  private final PipeDynamicMemoryBlock allocatedMemoryBlock;
+  private final PipeModelFixedMemoryBlock allocatedMemoryBlock;
 
   protected volatile boolean isClosed = false;
 
@@ -61,8 +60,10 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
 
     // limit in buffer size
     this.allocatedMemoryBlock =
-        
pipeModelFixedMemoryBlock.registerPipeBatchMemoryBlock(requestMaxBatchSizeInBytes);
-    allocatedMemoryBlock.setExpandable(false);
+        pipeModelFixedMemoryBlock =
+            PipeDataNodeResourceManager.memory()
+                .forceAllocateForModelFixedMemoryBlock(
+                    requestMaxBatchSizeInBytes, PipeMemoryBlockType.BATCH);
 
     if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) {
       LOGGER.info(
@@ -127,12 +128,8 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
       throws WALPipeException, IOException;
 
   public boolean shouldEmit() {
-    final long diff = System.currentTimeMillis() - firstEventProcessingTime;
-    if (totalBufferSize >= getMaxBatchSizeInBytes() || diff >= maxDelayInMs) {
-      allocatedMemoryBlock.updateCurrentMemoryEfficiencyAdjustMem((double) 
diff / maxDelayInMs);
-      return true;
-    }
-    return false;
+    return totalBufferSize >= getMaxBatchSizeInBytes()
+        || System.currentTimeMillis() - firstEventProcessingTime >= 
maxDelayInMs;
   }
 
   private long getMaxBatchSizeInBytes() {
@@ -200,9 +197,7 @@ public abstract class PipeTabletEventBatch implements 
AutoCloseable {
     try {
       pipeModelFixedMemoryBlock =
           PipeDataNodeResourceManager.memory()
-              .forceAllocateForModelFixedMemoryBlock(
-                  
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfBatch(),
-                  PipeMemoryBlockType.BATCH);
+              .forceAllocateForModelFixedMemoryBlock(0L, 
PipeMemoryBlockType.BATCH);
     } catch (Exception e) {
       LOGGER.error("init pipe model fixed memory block failed", e);
       // If the allocation fails, we still need to create a default memory 
block to avoid NPE.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index 52de019f741..92e9ad64f5d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -168,6 +168,11 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
                     //  5. Data inserted in the step2 is not captured by 
PipeB, and if its tsfile
                     //     epoch's state is USING_TABLET, the tsfile event 
will be ignored, which
                     //     will cause the data loss in the tsfile epoch.
+                    LOGGER.info(
+                        "The tsFile {}'s epoch's start time {} is smaller than 
the captured insertNodes' min time {}, will regard it as data loss or 
un-sequential, will extract the tsFile",
+                        ((PipeTsFileInsertionEvent) 
event.getEvent()).getTsFile(),
+                        ((PipeTsFileInsertionEvent) 
event.getEvent()).getFileStartTime(),
+                        event.getTsFileEpoch().getInsertNodeMinTime());
                     return TsFileEpoch.State.USING_BOTH;
                   } else {
                     // All data in the tsfile epoch has been extracted in 
tablet mode, so we should
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
index 2de7e0053f8..47dc0ff18b9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeDataNodeRemainingEventAndTimeOperator.java
@@ -105,6 +105,9 @@ class PipeDataNodeRemainingEventAndTimeOperator extends 
PipeRemainingOperator {
   }
 
   double getRemainingInsertEventSmoothingCount() {
+    if (PipeConfig.getInstance().getPipeRemainingInsertNodeCountAverage() == 
PipeRateAverage.NONE) {
+      return insertNodeEventCount.get();
+    }
     if (System.currentTimeMillis() - lastInsertNodeEventCountSmoothingTime
         >= 
PipeConfig.getInstance().getPipeRemainingInsertEventCountSmoothingIntervalSeconds())
 {
       insertNodeEventCountMeter.mark(insertNodeEventCount.get());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 7ca049698ec..46c3d962754 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.metric.overview.PipeWALInsertNodeCacheMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
-import org.apache.iotdb.db.pipe.resource.memory.PipeDynamicMemoryBlock;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlockType;
 import org.apache.iotdb.db.pipe.resource.memory.PipeModelFixedMemoryBlock;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
@@ -49,13 +48,11 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /** This cache is used by {@link WALEntryPosition}. */
 public class WALInsertNodeCache {
@@ -68,11 +65,10 @@ public class WALInsertNodeCache {
 
   private static PipeModelFixedMemoryBlock walModelFixedMemory = null;
 
-  private final PipeDynamicMemoryBlock memoryBlock;
+  private final PipeModelFixedMemoryBlock memoryBlock;
 
   // Used to adjust the memory usage of the cache
   private final AtomicDouble memoryUsageCheatFactor = new AtomicDouble(1);
-  private final AtomicBoolean isBatchLoadEnabled = new AtomicBoolean(true);
   // LRU cache, find Pair<ByteBuffer, InsertNode> by WALEntryPosition
   private final LoadingCache<WALEntryPosition, Pair<ByteBuffer, InsertNode>> 
lruCache;
 
@@ -86,19 +82,12 @@ public class WALInsertNodeCache {
       init();
     }
 
-    final long requestedAllocateSize =
-        (long)
-            Math.min(
-                1.0
-                    * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
-                    * CONFIG.getWalFileSizeThresholdInByte()
-                    / CONFIG.getDataRegionNum(),
-                0.5
-                    * 
MEMORY_CONFIG.getPipeMemoryManager().getTotalMemorySizeInBytes()
-                    / CONFIG.getDataRegionNum());
-    memoryBlock = 
walModelFixedMemory.registerPipeBatchMemoryBlock(requestedAllocateSize);
-    isBatchLoadEnabled.set(
-        memoryBlock.getMemoryUsageInBytes() >= 
CONFIG.getWalFileSizeThresholdInByte());
+    final long requestedAllocateSize = CONFIG.getAllocateMemoryPerWalCache();
+
+    memoryBlock =
+        PipeDataNodeResourceManager.memory()
+            .forceAllocateForModelFixedMemoryBlock(requestedAllocateSize, 
PipeMemoryBlockType.WAL);
+
     lruCache =
         Caffeine.newBuilder()
             .maximumWeight(requestedAllocateSize)
@@ -123,58 +112,9 @@ public class WALInsertNodeCache {
             .recordStats()
             .build(new WALInsertNodeCacheLoader());
 
-    memoryBlock.setExpandable(true);
-    memoryBlock.setExpand(
-        memoryBlock -> {
-          final long oldMemory = memoryBlock.getMemoryUsageInBytes();
-          
memoryBlock.updateCurrentMemoryEfficiencyAdjustMem(lruCache.stats().hitRate());
-          final long newMemory = memoryBlock.getMemoryUsageInBytes();
-          if (newMemory > oldMemory) {
-            setExpandCallback(oldMemory, newMemory, dataRegionId);
-          } else if (newMemory < oldMemory) {
-            shrinkCallback(oldMemory, newMemory, dataRegionId);
-          }
-        });
     PipeWALInsertNodeCacheMetrics.getInstance().register(this, dataRegionId);
   }
 
-  private void setExpandCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
-    memoryUsageCheatFactor.updateAndGet(
-        factor ->
-            factor == 0L || newMemory == 0L || oldMemory == 0
-                ? 0.0
-                : factor / ((double) newMemory / oldMemory));
-    isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
-    LOGGER.info(
-        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has expanded 
from {} to {}.",
-        dataRegionId,
-        oldMemory,
-        newMemory);
-  }
-
-  private void shrinkCallback(long oldMemory, long newMemory, Integer 
dataRegionId) {
-    memoryUsageCheatFactor.updateAndGet(
-        factor ->
-            factor == 0L || newMemory == 0L || oldMemory == 0
-                ? 0.0
-                : factor * ((double) oldMemory / newMemory));
-    isBatchLoadEnabled.set(newMemory >= 
CONFIG.getWalFileSizeThresholdInByte());
-    LOGGER.info(
-        "WALInsertNodeCache.allocatedMemoryBlock of dataRegion {} has shrunk 
from {} to {}.",
-        dataRegionId,
-        oldMemory,
-        newMemory);
-    if (CONFIG.getWALCacheShrinkClearEnabled()) {
-      try {
-        lruCache.cleanUp();
-      } catch (Exception e) {
-        LOGGER.warn("Failed to clear WALInsertNodeCache for dataRegion ID: 
{}.", dataRegionId, e);
-        return;
-      }
-      LOGGER.info("Successfully cleared WALInsertNodeCache for dataRegion ID: 
{}.", dataRegionId);
-    }
-  }
-
   // please call this method at PipeLauncher
   public static void init() {
     if (walModelFixedMemory != null) {
@@ -184,9 +124,7 @@ public class WALInsertNodeCache {
       // Allocate memory for the fixed memory block of WAL
       walModelFixedMemory =
           PipeDataNodeResourceManager.memory()
-              .forceAllocateForModelFixedMemoryBlock(
-                  
PipeDataNodeResourceManager.memory().getAllocatedMemorySizeInBytesOfWAL(),
-                  PipeMemoryBlockType.WAL);
+              .forceAllocateForModelFixedMemoryBlock(0L, 
PipeMemoryBlockType.WAL);
     } catch (Exception e) {
       LOGGER.error("Failed to initialize WAL model fixed memory block", e);
       walModelFixedMemory =
@@ -259,10 +197,7 @@ public class WALInsertNodeCache {
   public Pair<ByteBuffer, InsertNode> getByteBufferOrInsertNode(final 
WALEntryPosition position) {
     hasPipeRunning = true;
 
-    final Pair<ByteBuffer, InsertNode> pair =
-        isBatchLoadEnabled.get()
-            ? lruCache.getAll(Collections.singleton(position)).get(position)
-            : lruCache.get(position);
+    final Pair<ByteBuffer, InsertNode> pair = lruCache.get(position);
 
     if (pair == null) {
       throw new IllegalStateException();
@@ -402,16 +337,6 @@ public class WALInsertNodeCache {
 
   /////////////////////////// Test Only ///////////////////////////
 
-  @TestOnly
-  public boolean isBatchLoadEnabled() {
-    return isBatchLoadEnabled.get();
-  }
-
-  @TestOnly
-  public void setIsBatchLoadEnabled(final boolean isBatchLoadEnabled) {
-    this.isBatchLoadEnabled.set(isBatchLoadEnabled);
-  }
-
   @TestOnly
   boolean contains(WALEntryPosition position) {
     return lruCache.getIfPresent(position) != null;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
index 10a23d89bd3..d1e20b4b2ef 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCacheTest.java
@@ -46,7 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
 
 public class WALInsertNodeCacheTest {
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -152,53 +151,6 @@ public class WALInsertNodeCacheTest {
     assertEquals(node1, cache.getInsertNode(position));
   }
 
-  @Test
-  public void testBatchLoad() throws Exception {
-    // Enable batch load
-    boolean oldIsBatchLoadEnabled = cache.isBatchLoadEnabled();
-    cache.setIsBatchLoadEnabled(true);
-    WALInsertNodeCache localC = cache;
-    try {
-      // write memTable1
-      IMemTable memTable1 = new PrimitiveMemTable(databasePath, dataRegionId);
-      walNode.onMemTableCreated(memTable1, logDirectory + "/" + 
"fake1.tsfile");
-      InsertRowNode node1 = getInsertRowNode(System.currentTimeMillis());
-      node1.setSearchIndex(1);
-      WALFlushListener flushListener1 = walNode.log(memTable1.getMemTableId(), 
node1);
-      WALEntryPosition position1 = 
flushListener1.getWalEntryHandler().getWalEntryPosition();
-      InsertRowNode node2 = getInsertRowNode(System.currentTimeMillis());
-      node1.setSearchIndex(2);
-      WALFlushListener flushListener2 = walNode.log(memTable1.getMemTableId(), 
node2);
-      WALEntryPosition position2 = 
flushListener2.getWalEntryHandler().getWalEntryPosition();
-      // write memTable2
-      IMemTable memTable2 = new PrimitiveMemTable(databasePath, dataRegionId);
-      walNode.onMemTableCreated(memTable2, logDirectory + "/" + 
"fake2.tsfile");
-      InsertRowNode node3 = getInsertRowNode(System.currentTimeMillis());
-      node1.setSearchIndex(3);
-      WALFlushListener flushListener3 = walNode.log(memTable2.getMemTableId(), 
node3);
-      WALEntryPosition position3 = 
flushListener3.getWalEntryHandler().getWalEntryPosition();
-      // wait until wal flushed
-      walNode.rollWALFile();
-      Awaitility.await().until(() -> walNode.isAllWALEntriesConsumed() && 
position3.canRead());
-      // check batch load memTable1
-      cache.clear();
-      cache.addMemTable(memTable1.getMemTableId());
-      assertEquals(node1, cache.getInsertNode(position1));
-      assertTrue(cache.contains(position1));
-      assertTrue(cache.contains(position2));
-      assertFalse(cache.contains(position3));
-      // check batch load none
-      cache.removeMemTable(memTable1.getMemTableId());
-      cache.clear();
-      assertEquals(node1, cache.getInsertNode(position1));
-      assertTrue(cache.contains(position1));
-      assertFalse(cache.contains(position2));
-      assertFalse(cache.contains(position3));
-    } finally {
-      
WALInsertNodeCache.getInstance(1).setIsBatchLoadEnabled(oldIsBatchLoadEnabled);
-    }
-  }
-
   private InsertRowNode getInsertRowNode(long time) throws 
IllegalPathException {
     TSDataType[] dataTypes =
         new TSDataType[] {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index fad0efa75b8..32c6345dc27 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -298,6 +298,33 @@ public class ClientPoolFactory {
     }
   }
 
+  public static class AsyncPipeTsFileDataTransferServiceClientPoolFactory
+      implements IClientPoolFactory<TEndPoint, 
AsyncPipeDataTransferServiceClient> {
+    @Override
+    public GenericKeyedObjectPool<TEndPoint, 
AsyncPipeDataTransferServiceClient> createClientPool(
+        ClientManager<TEndPoint, AsyncPipeDataTransferServiceClient> manager) {
+      final GenericKeyedObjectPool<TEndPoint, 
AsyncPipeDataTransferServiceClient> clientPool =
+          new GenericKeyedObjectPool<>(
+              new AsyncPipeDataTransferServiceClient.Factory(
+                  manager,
+                  new ThriftClientProperty.Builder()
+                      
.setConnectionTimeoutMs(conf.getPipeConnectorTransferTimeoutMs())
+                      .setRpcThriftCompressionEnabled(
+                          conf.isPipeConnectorRPCThriftCompressionEnabled())
+                      .setSelectorNumOfAsyncClientManager(
+                          conf.getPipeAsyncConnectorSelectorNumber())
+                      .build(),
+                  ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()),
+              new 
ClientPoolProperty.Builder<AsyncPipeDataTransferServiceClient>()
+                  
.setMaxClientNumForEachNode(conf.getPipeAsyncConnectorMaxTsFileClientNumber())
+                  .build()
+                  .getConfig());
+      ClientManagerMetrics.getInstance()
+          .registerClientManager(this.getClass().getSimpleName(), clientPool);
+      return clientPool;
+    }
+  }
+
   public static class AsyncAINodeHeartbeatServiceClientPoolFactory
       implements IClientPoolFactory<TEndPoint, AsyncAINodeServiceClient> {
     @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 6c330991f78..839e2df2022 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -250,7 +250,9 @@ public class CommonConfig {
   private int pipeAsyncConnectorSelectorNumber =
       Math.max(4, Runtime.getRuntime().availableProcessors() / 2);
   private int pipeAsyncConnectorMaxClientNumber =
-      Math.max(16, Runtime.getRuntime().availableProcessors() / 2);
+      Math.max(32, Runtime.getRuntime().availableProcessors() * 2);
+  private int pipeAsyncConnectorMaxTsFileClientNumber =
+      Math.max(16, Runtime.getRuntime().availableProcessors());
 
   private double pipeAllSinksRateLimitBytesPerSecond = -1;
   private int rateLimiterHotReloadCheckIntervalMs = 1000;
@@ -1132,6 +1134,20 @@ public class CommonConfig {
         "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxClientNumber);
   }
 
+  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+    return pipeAsyncConnectorMaxTsFileClientNumber;
+  }
+
+  public void setPipeAsyncConnectorMaxTsFileClientNumber(
+      int pipeAsyncConnectorMaxTsFileClientNumber) {
+    if (this.pipeAsyncConnectorMaxTsFileClientNumber == 
pipeAsyncConnectorMaxTsFileClientNumber) {
+      return;
+    }
+    this.pipeAsyncConnectorMaxTsFileClientNumber = 
pipeAsyncConnectorMaxTsFileClientNumber;
+    logger.info(
+        "pipeAsyncConnectorMaxClientNumber is set to {}.", 
pipeAsyncConnectorMaxTsFileClientNumber);
+  }
+
   public boolean isSeperatedPipeHeartbeatEnabled() {
     return isSeperatedPipeHeartbeatEnabled;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
index 2a08d2f9608..1c90df1bb07 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/enums/PipeRateAverage.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.enums;
 import com.codahale.metrics.Meter;
 
 public enum PipeRateAverage {
+  NONE,
   ONE_MINUTE,
   FIVE_MINUTES,
   FIFTEEN_MINUTES,
@@ -37,6 +38,7 @@ public enum PipeRateAverage {
         return meter.getFifteenMinuteRate();
       case MEAN:
         return meter.getMeanRate();
+      case NONE:
       default:
         throw new UnsupportedOperationException(
             String.format("The type %s is not supported in pipe rate 
average.", this));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index fb05e63a666..cb822bb28df 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -211,6 +211,10 @@ public class PipeConfig {
     return COMMON_CONFIG.getPipeAsyncConnectorMaxClientNumber();
   }
 
+  public int getPipeAsyncConnectorMaxTsFileClientNumber() {
+    return COMMON_CONFIG.getPipeAsyncConnectorMaxTsFileClientNumber();
+  }
+
   public double getPipeAllConnectorsRateLimitBytesPerSecond() {
     return COMMON_CONFIG.getPipeAllSinksRateLimitBytesPerSecond();
   }
@@ -578,6 +582,9 @@ public class PipeConfig {
         getPipeAsyncConnectorMaxRetryExecutionTimeMsPerCall());
     LOGGER.info("PipeAsyncConnectorSelectorNumber: {}", 
getPipeAsyncConnectorSelectorNumber());
     LOGGER.info("PipeAsyncConnectorMaxClientNumber: {}", 
getPipeAsyncConnectorMaxClientNumber());
+    LOGGER.info(
+        "PipeAsyncConnectorMaxTsFileClientNumber: {}",
+        getPipeAsyncConnectorMaxTsFileClientNumber());
 
     LOGGER.info(
         "PipeAllConnectorsRateLimitBytesPerSecond: {}",
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index 683819224aa..1849209d651 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -628,6 +628,16 @@ public class PipeDescriptor {
       config.setPipeAsyncConnectorMaxClientNumber(Integer.parseInt(value));
     }
 
+    value =
+        parserPipeConfig(
+            properties,
+            "pipe_sink_max_tsfile_client_number",
+            "pipe_async_connector_max_tsfile_client_number",
+            isHotModify);
+    if (value != null) {
+      
config.setPipeAsyncConnectorMaxTsFileClientNumber(Integer.parseInt(value));
+    }
+
     value = parserPipeConfig(properties, 
"pipe_all_sinks_rate_limit_bytes_per_second", isHotModify);
     if (value != null) {
       config.setPipeAllSinksRateLimitBytesPerSecond(Double.parseDouble(value));
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
index e09eadeef7a..a6c491bbea8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java
@@ -71,7 +71,7 @@ public class PipeConnectorConstant {
 
   public static final String CONNECTOR_IOTDB_BATCH_DELAY_MS_KEY = 
"connector.batch.max-delay-ms";
   public static final String SINK_IOTDB_BATCH_DELAY_MS_KEY = 
"sink.batch.max-delay-ms";
-  public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 200;
+  public static final int CONNECTOR_IOTDB_BATCH_DELAY_MS_DEFAULT_VALUE = 20;
 
   public static final String CONNECTOR_IOTDB_BATCH_SIZE_KEY = 
"connector.batch.size-bytes";
   public static final String SINK_IOTDB_BATCH_SIZE_KEY = 
"sink.batch.size-bytes";

Reply via email to