This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 0ff85cdf899 Pipe: Fix pipe executor stuck by unlimited file event 
memory allocation retries & Pipe: Fix IO triggered in disruptor & Pipe: Fix 
forceAllocateIfSufficient & Load: Remove RM detect logic during the second 
phase (#15085)
0ff85cdf899 is described below

commit 0ff85cdf8993d4b4a4ab89d73866aaf9073196ab
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Mar 14 09:53:11 2025 +0800

    Pipe: Fix pipe executor stuck by unlimited file event memory allocation 
retries & Pipe: Fix IO triggered in disruptor & Pipe: Fix 
forceAllocateIfSufficient & Load: Remove RM detect logic during the second 
phase (#15085)
    
    Co-authored-by: VGalaxies <[email protected]>
---
 .../IoTDBDefaultPullConsumerDataSetIT.java         | 28 ++++++++---
 .../async/handler/PipeTransferTsFileHandler.java   | 15 +++++-
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +-
 .../db/pipe/resource/memory/PipeMemoryManager.java | 29 ++++-------
 .../pipe/resource/tsfile/PipeTsFileResource.java   |  6 +--
 .../impl/DataNodeInternalRPCServiceImpl.java       | 58 +++-------------------
 .../plan/analyze/ClusterPartitionFetcher.java      |  5 --
 .../plan/analyze/IPartitionFetcher.java            |  4 --
 .../scheduler/load/LoadTsFileDispatcherImpl.java   | 27 +++++-----
 .../plan/scheduler/load/LoadTsFileScheduler.java   | 24 +--------
 .../dataregion/wal/utils/WALEntryHandler.java      |  4 +-
 .../dataregion/wal/utils/WALInsertNodeCache.java   |  8 +--
 .../plan/analyze/FakePartitionFetcherImpl.java     |  5 --
 .../plan/planner/distribution/Util.java            |  5 --
 .../plan/planner/distribution/Util2.java           |  5 --
 .../src/main/thrift/datanode.thrift                |  1 -
 16 files changed, 79 insertions(+), 149 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
index 9b2994696cc..0b36deed5eb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
@@ -41,6 +41,8 @@ import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2SubscriptionRegressionMisc.class})
 public class IoTDBDefaultPullConsumerDataSetIT extends 
AbstractSubscriptionRegressionIT {
@@ -117,10 +119,15 @@ public class IoTDBDefaultPullConsumerDataSetIT extends 
AbstractSubscriptionRegre
     String sql = "select count(s_0) from " + databasePrefix + "0.d_0";
     System.out.println(FORMAT.format(new Date()) + " src: " + 
getCount(session_src, sql));
     // Consumption data
-    consume_data(consumer, session_dest);
-    for (int i = 0; i < deviceCount; i++) {
-      check_count(10, "select count(s_0) from " + devices.get(i), i + 
":Consumption Data:s_0");
-    }
+    AWAIT.untilAsserted(
+        () -> {
+          session_src.executeNonQueryStatement("flush");
+          consume_data(consumer, session_dest);
+          for (int i = 0; i < deviceCount; i++) {
+            check_count(
+                10, "select count(s_0) from " + devices.get(i), i + 
":Consumption Data:s_0");
+          }
+        });
     // Unsubscribe
     consumer.unsubscribe(topicName);
     // Unsubscribe and then write data
@@ -133,9 +140,14 @@ public class IoTDBDefaultPullConsumerDataSetIT extends 
AbstractSubscriptionRegre
     System.out.println(FORMAT.format(new Date()) + " src: " + 
getCount(session_src, sql));
     // Consumption data: Progress is not retained when re-subscribing after 
cancellation. Full
     // synchronization.
-    consume_data(consumer, session_dest);
-    for (int i = 0; i < deviceCount; i++) {
-      check_count(15, "select count(s_0) from " + devices.get(i), i + 
":consume data again:s_0");
-    }
+    AWAIT.untilAsserted(
+        () -> {
+          session_src.executeNonQueryStatement("flush");
+          consume_data(consumer, session_dest);
+          for (int i = 0; i < deviceCount; i++) {
+            check_count(
+                15, "select count(s_0) from " + devices.get(i), i + ":consume 
data again:s_0");
+          }
+        });
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
index 3423d980079..5de2ef38948 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/handler/PipeTransferTsFileHandler.java
@@ -113,7 +113,13 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     this.transferMod = transferMod;
     currentFile = transferMod ? modFile : tsFile;
 
-    waitForResourceEnough4Slicing(Integer.MAX_VALUE);
+    // NOTE: Waiting indefinitely for enough memory for slicing here may cause a deadlock!
+    // TsFile events are produced and consumed at the same time, and the memory held by a
+    // TsFile event is not released until the TsFile is sealed. So if the memory is not
+    // enough for slicing, the TsFile event is blocked, waiting for memory to be released.
+    // Meanwhile, the memory held by that blocked TsFile event is itself not released, so
+    // the memory never becomes enough for slicing. This causes a deadlock.
+    waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); 
// 20 - 40 seconds
     readFileBufferSize =
         (int)
             Math.min(
@@ -395,7 +401,14 @@ public class PipeTransferTsFileHandler extends 
PipeTransferTrackableHandler {
     memoryBlock.close();
   }
 
+  /**
+   * @param timeoutMs max wait in milliseconds; MUST NOT be unlimited, or a deadlock may occur.
+   */
   private void waitForResourceEnough4Slicing(final long timeoutMs) throws 
InterruptedException {
+    if 
(!PipeConfig.getInstance().isPipeConnectorReadFileBufferMemoryControlEnabled()) 
{
+      return;
+    }
+
     final PipeMemoryManager memoryManager = 
PipeDataNodeResourceManager.memory();
     if (memoryManager.isEnough4TsFileSlicing()) {
       return;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 15378fc6482..4f9823709a3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -402,7 +402,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   @Override
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws 
PipeException {
-    return toTabletInsertionEvents(Long.MAX_VALUE);
+    // Wait for 20 - 40 seconds at most.
+    // The timeout must not be unlimited, otherwise a deadlock may occur.
+    return toTabletInsertionEvents((long) ((1 + Math.random()) * 20 * 1000));
   }
 
   public Iterable<TabletInsertionEvent> toTabletInsertionEvents(final long 
timeoutMs)
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
index 6a2f63858f6..c2ac5cb70c4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java
@@ -242,7 +242,7 @@ public class PipeMemoryManager {
       }
 
       try {
-        tryShrink4Allocate(sizeInBytes);
+        tryShrinkUntilFreeMemorySatisfy(sizeInBytes);
         this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -312,7 +312,7 @@ public class PipeMemoryManager {
       }
 
       try {
-        tryShrink4Allocate(sizeInBytes);
+        tryShrinkUntilFreeMemorySatisfy(sizeInBytes);
         this.wait(MEMORY_ALLOCATE_RETRY_INTERVAL_IN_MS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -332,13 +332,13 @@ public class PipeMemoryManager {
   }
 
   /**
-   * Allocate a {@link PipeMemoryBlock} for pipe only if memory already used 
is less than the
-   * specified threshold.
+   * Allocate a {@link PipeMemoryBlock} for pipe only if memory used after 
allocation is less than
+   * the specified threshold.
    *
    * @param sizeInBytes size of memory needed to allocate
    * @param usedThreshold proportion of memory used, ranged from 0.0 to 1.0
-   * @return {@code null} if the proportion of memory already used exceeds 
{@code usedThreshold}.
-   *     Will return a memory block otherwise.
+   * @return {@code null} if the proportion of memory used after allocation 
exceeds {@code
+   *     usedThreshold}. Will return a memory block otherwise.
    */
   public synchronized PipeMemoryBlock forceAllocateIfSufficient(
       long sizeInBytes, float usedThreshold) {
@@ -354,18 +354,11 @@ public class PipeMemoryManager {
       return registerMemoryBlock(0);
     }
 
-    if (TOTAL_MEMORY_SIZE_IN_BYTES - usedMemorySizeInBytes >= sizeInBytes
-        && (float) usedMemorySizeInBytes / TOTAL_MEMORY_SIZE_IN_BYTES < 
usedThreshold) {
+    if ((float) (usedMemorySizeInBytes + sizeInBytes)
+        <= TOTAL_MEMORY_SIZE_IN_BYTES * usedThreshold) {
       return forceAllocate(sizeInBytes);
-    } else {
-      long memoryToShrink =
-          Math.max(
-              usedMemorySizeInBytes - (long) (TOTAL_MEMORY_SIZE_IN_BYTES * 
usedThreshold),
-              sizeInBytes);
-      if (tryShrink4Allocate(memoryToShrink)) {
-        return forceAllocate(sizeInBytes);
-      }
     }
+
     return null;
   }
 
@@ -404,7 +397,7 @@ public class PipeMemoryManager {
               MEMORY_ALLOCATE_MIN_SIZE_IN_BYTES);
     }
 
-    if (tryShrink4Allocate(sizeToAllocateInBytes)) {
+    if (tryShrinkUntilFreeMemorySatisfy(sizeToAllocateInBytes)) {
       LOGGER.info(
           "tryAllocate: allocated memory, "
               + "total memory size {} bytes, used memory size {} bytes, "
@@ -477,7 +470,7 @@ public class PipeMemoryManager {
     return returnedMemoryBlock;
   }
 
-  private boolean tryShrink4Allocate(long sizeInBytes) {
+  private boolean tryShrinkUntilFreeMemorySatisfy(long sizeInBytes) {
     final List<PipeMemoryBlock> shuffledBlocks = new 
ArrayList<>(allocatedBlocks);
     Collections.shuffle(shuffledBlocks);
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
index 1c2a46e9b59..2cb3a59c399 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResource.java
@@ -49,7 +49,7 @@ public class PipeTsFileResource implements AutoCloseable {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTsFileResource.class);
 
   public static final long TSFILE_MIN_TIME_TO_LIVE_IN_MS = 1000L * 20;
-  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.5f;
+  private static final float MEMORY_SUFFICIENT_THRESHOLD = 0.7f;
 
   private final File hardlinkOrCopiedFile;
   private final boolean isTsFile;
@@ -207,7 +207,7 @@ public class PipeTsFileResource implements AutoCloseable {
                 MEMORY_SUFFICIENT_THRESHOLD);
     if (allocatedMemoryBlock == null) {
       LOGGER.info(
-          "PipeTsFileResource: Failed to create TsFileSequenceReader for 
tsfile {} in cache, because memory usage is high",
+          "Failed to cacheDeviceIsAlignedMapIfAbsent for tsfile {}, because 
memory usage is high",
           hardlinkOrCopiedFile.getPath());
       return false;
     }
@@ -271,7 +271,7 @@ public class PipeTsFileResource implements AutoCloseable {
                 MEMORY_SUFFICIENT_THRESHOLD);
     if (allocatedMemoryBlock == null) {
       LOGGER.info(
-          "PipeTsFileResource: Failed to create TsFileSequenceReader for 
tsfile {} in cache, because memory usage is high",
+          "Failed to cacheObjectsIfAbsent for tsfile {}, because memory usage 
is high",
           hardlinkOrCopiedFile.getPath());
       return false;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5deebe5b3ab..e40e0996e23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -21,13 +21,11 @@ package org.apache.iotdb.db.protocol.thrift.impl;
 
 import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.common.rpc.thrift.TLoadSample;
 import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSender;
 import org.apache.iotdb.common.rpc.thrift.TServiceType;
@@ -273,7 +271,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -460,20 +457,7 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   }
 
   @Override
-  public TLoadResp sendLoadCommand(final TLoadCommandReq req) {
-    final List<Integer> regionIds = req.getRegionIds();
-    final Map<Integer, TRegionReplicaSet> id2replicaSetBeforeExecution =
-        req.isSetRegionIds()
-                && req.getCommandType() == 
LoadTsFileScheduler.LoadCommand.EXECUTE.ordinal()
-            ? regionIds.stream()
-                .collect(
-                    Collectors.toMap(
-                        regionId -> regionId,
-                        regionId ->
-                            partitionFetcher.getRegionReplicaSet(
-                                new 
TConsensusGroupId(TConsensusGroupType.DataRegion, regionId))))
-            : Collections.emptyMap();
-
+  public TLoadResp sendLoadCommand(TLoadCommandReq req) {
     final ProgressIndex progressIndex;
     if (req.isSetProgressIndex()) {
       progressIndex = 
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
@@ -484,39 +468,13 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           "Use local generated load progress index {} for uuid {}.", 
progressIndex, req.uuid);
     }
 
-    final TLoadResp resp =
-        createTLoadResp(
-            StorageEngine.getInstance()
-                .executeLoadCommand(
-                    LoadTsFileScheduler.LoadCommand.values()[req.commandType],
-                    req.uuid,
-                    req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
-                    progressIndex));
-
-    if (!id2replicaSetBeforeExecution.isEmpty()) {
-      for (Map.Entry<Integer, TRegionReplicaSet> entryBefore :
-          id2replicaSetBeforeExecution.entrySet()) {
-        final TRegionReplicaSet replicaSetAfterExecution =
-            partitionFetcher.getRegionReplicaSet(
-                new TConsensusGroupId(TConsensusGroupType.DataRegion, 
entryBefore.getKey()));
-        LOGGER.warn(
-            "Load request {} for region {} executed with replica set changed 
from {} to {}",
-            req.uuid,
-            entryBefore.getKey(),
-            entryBefore.getValue(),
-            replicaSetAfterExecution);
-        if (!Objects.equals(entryBefore.getValue(), replicaSetAfterExecution)) 
{
-          return createTLoadResp(
-              RpcUtils.getStatus(
-                  TSStatusCode.LOAD_FILE_ERROR,
-                  String.format(
-                      "Region %d replica set changed from %s to %s",
-                      entryBefore.getKey(), entryBefore.getValue(), 
replicaSetAfterExecution)));
-        }
-      }
-    }
-
-    return resp;
+    return createTLoadResp(
+        StorageEngine.getInstance()
+            .executeLoadCommand(
+                LoadTsFileScheduler.LoadCommand.values()[req.commandType],
+                req.uuid,
+                req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
+                progressIndex));
   }
 
   private TLoadResp createTLoadResp(TSStatus resultStatus) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
index d9c5e0dbac6..e76973c516b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ClusterPartitionFetcher.java
@@ -287,11 +287,6 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     return partitionCache.updateGroupIdToReplicaSetMap(req.getTimestamp(), 
req.getRegionRouteMap());
   }
 
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
-    return partitionCache.getRegionReplicaSet(id);
-  }
-
   @Override
   public void invalidAllCache() {
     partitionCache.invalidAllCache();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
index 227c5df1e4e..ecd88a9c647 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/IPartitionFetcher.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.queryengine.plan.analyze;
 
-import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
-import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.DataPartition;
 import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
@@ -90,8 +88,6 @@ public interface IPartitionFetcher {
   /** Update region cache in partition cache when receive request from config 
node */
   boolean updateRegionCache(TRegionRouteReq req);
 
-  TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id);
-
   /** Invalid all partition cache */
   void invalidAllCache();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index bab730c8af2..5227f230007 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -56,10 +56,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -107,7 +105,7 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
           for (FragmentInstance instance : instances) {
             try (SetThreadName threadName =
                 new SetThreadName(
-                    LoadTsFileScheduler.class.getName() + 
instance.getId().getFullId())) {
+                    "load-dispatcher" + "-" + instance.getId().getFullId() + 
"-" + uuid)) {
               dispatchOneInstance(instance);
             } catch (FragmentInstanceDispatchException e) {
               return new FragInstanceDispatchResult(e.getFailureStatus());
@@ -222,24 +220,25 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
 
   public Future<FragInstanceDispatchResult> dispatchCommand(
       TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
-    Map<TEndPoint, List<Integer>> endPoint2RegionIdsMap = new HashMap<>();
+    Set<TEndPoint> allEndPoint = new HashSet<>();
     for (TRegionReplicaSet replicaSet : replicaSets) {
       for (TDataNodeLocation dataNodeLocation : 
replicaSet.getDataNodeLocations()) {
-        endPoint2RegionIdsMap
-            .computeIfAbsent(dataNodeLocation.getInternalEndPoint(), o -> new 
ArrayList<>())
-            .add(replicaSet.getRegionId().getId());
+        allEndPoint.add(dataNodeLocation.getInternalEndPoint());
       }
     }
 
-    for (final Map.Entry<TEndPoint, List<Integer>> entry : 
endPoint2RegionIdsMap.entrySet()) {
-      try (final SetThreadName threadName =
+    for (TEndPoint endPoint : allEndPoint) {
+      try (SetThreadName threadName =
           new SetThreadName(
-              LoadTsFileScheduler.class.getName() + "-" + 
loadCommandReq.commandType)) {
-        loadCommandReq.setRegionIds(entry.getValue());
-        if (isDispatchedToLocal(entry.getKey())) {
+              "load-dispatcher"
+                  + "-"
+                  + 
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
+                  + "-"
+                  + loadCommandReq.uuid)) {
+        if (isDispatchedToLocal(endPoint)) {
           dispatchLocally(loadCommandReq);
         } else {
-          dispatchRemote(loadCommandReq, entry.getKey());
+          dispatchRemote(loadCommandReq, endPoint);
         }
       } catch (FragmentInstanceDispatchException e) {
         LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", 
loadCommandReq, e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index e7d6a719ef0..04d38f25f89 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -407,8 +407,6 @@ public class LoadTsFileScheduler implements IScheduler {
         stateMachine.transitionToFailed(status);
         return false;
       }
-
-      checkAllReplicaSetsConsistency();
     } catch (IOException e) {
       LOGGER.warn(
           "Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, 
tsFile: {}",
@@ -425,8 +423,7 @@ public class LoadTsFileScheduler implements IScheduler {
       stateMachine.transitionToFailed(e);
       return false;
     } catch (Exception e) {
-      LOGGER.warn(
-          String.format("Exception occurred during second phase of loading 
TsFile %s.", tsFile), e);
+      LOGGER.warn("Exception occurred during second phase of loading TsFile 
{}.", tsFile, e);
       stateMachine.transitionToFailed(e);
       return false;
     }
@@ -443,25 +440,6 @@ public class LoadTsFileScheduler implements IScheduler {
     }
   }
 
-  public void checkAllReplicaSetsConsistency() throws 
RegionReplicaSetChangedException {
-    for (final TRegionReplicaSet replicaSet : allReplicaSets) {
-      final TConsensusGroupId regionId = replicaSet.getRegionId();
-      if (regionId == null) {
-        LOGGER.info(
-            "region id is null during region consistency check, will skip this 
region: {}",
-            replicaSet);
-        continue;
-      }
-
-      final TRegionReplicaSet currentReplicaSet =
-          partitionFetcher.fetcher.getRegionReplicaSet(regionId);
-      if (!Objects.equals(replicaSet, currentReplicaSet)) {
-        LOGGER.warn("Region replica set changed from {} to {}", replicaSet, 
currentReplicaSet);
-        throw new RegionReplicaSetChangedException(replicaSet, 
currentReplicaSet);
-      }
-    }
-  }
-
   private boolean loadLocally(LoadSingleTsFileNode node) throws IoTDBException 
{
     LOGGER.info("Start load TsFile {} locally.", 
node.getTsFileResource().getTsFile().getPath());
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
index de2379c380e..333842e38ef 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALEntryHandler.java
@@ -90,9 +90,7 @@ public class WALEntryHandler {
   public InsertNode getInsertNodeViaCacheIfPossible() {
     try {
       final WALEntryValue finalValue = value;
-      return finalValue instanceof InsertNode
-          ? (InsertNode) finalValue
-          : 
walEntryPosition.readByteBufferOrInsertNodeViaCacheDirectly().getRight();
+      return finalValue instanceof InsertNode ? (InsertNode) finalValue : null;
     } catch (Exception e) {
       logger.warn("Fail to get insert node via cache. {}", this, e);
       throw e;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
index 0541ff96188..efba8703072 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/utils/WALInsertNodeCache.java
@@ -77,9 +77,11 @@ public class WALInsertNodeCache {
     final long requestedAllocateSize =
         (long)
             Math.min(
-                (double) PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
-                    * CONFIG.getWalFileSizeThresholdInByte(),
-                CONFIG.getAllocateMemoryForPipe() * 0.45);
+                1.0
+                    * PIPE_CONFIG.getPipeMaxAllowedPinnedMemTableCount()
+                    * CONFIG.getWalFileSizeThresholdInByte()
+                    / CONFIG.getDataRegionNum(),
+                0.5 * CONFIG.getAllocateMemoryForPipe() / 
CONFIG.getDataRegionNum());
     allocatedMemoryBlock =
         PipeDataNodeResourceManager.memory()
             .tryAllocate(requestedAllocateSize)
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
index 1afde0bfd1e..e33cc2d322c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/analyze/FakePartitionFetcherImpl.java
@@ -296,11 +296,6 @@ public class FakePartitionFetcherImpl implements 
IPartitionFetcher {
     return true;
   }
 
-  @Override
-  public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
-    return null;
-  }
-
   @Override
   public void invalidAllCache() {}
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
index 93bfb6c7170..e08bbb074ea 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util.java
@@ -416,11 +416,6 @@ public class Util {
         return false;
       }
 
-      @Override
-      public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
-        return null;
-      }
-
       @Override
       public void invalidAllCache() {}
     };
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
index fee4910628f..3d5e66aea49 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/Util2.java
@@ -310,11 +310,6 @@ public class Util2 {
         return false;
       }
 
-      @Override
-      public TRegionReplicaSet getRegionReplicaSet(TConsensusGroupId id) {
-        return null;
-      }
-
       @Override
       public void invalidAllCache() {}
     };
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index bbc9057bff9..4b25e08d1f6 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -336,7 +336,6 @@ struct TLoadCommandReq {
     2: required string uuid
     3: optional bool isGeneratedByPipe
     4: optional binary progressIndex
-    5: optional list<i32> regionIds
 }
 
 struct TLoadResp {

Reply via email to