This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch rc/1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.0.1 by this push:
     new 5adf3db778 [To rc/1.0.1] Cherry-pick a37da33、0b65f4d、8af48e3 to rc/1.0.1 (#9049)
5adf3db778 is described below

commit 5adf3db778d7662c616f7bdd677af7f6678155ab
Author: Beyyes <[email protected]>
AuthorDate: Mon Feb 13 18:41:36 2023 +0800

    [To rc/1.0.1] Cherry-pick a37da33、0b65f4d、8af48e3 to rc/1.0.1 (#9049)
---
 .../java/org/apache/iotdb/isession/ISession.java   |  7 ++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/mpp/common/FragmentInstanceId.java    |  6 ++
 .../mpp/execution/exchange/SharedTsBlockQueue.java | 29 +++++++--
 .../db/mpp/execution/exchange/SinkHandle.java      | 22 +++++--
 .../db/mpp/execution/exchange/SourceHandle.java    | 24 +++++--
 .../iotdb/db/mpp/execution/memory/MemoryPool.java  | 35 +++++++---
 .../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java  |  6 +-
 .../mpp/plan/analyze/ClusterPartitionFetcher.java  | 76 ++++++++++++++++++----
 .../distribution/DistributionPlanContext.java      | 14 ++++
 .../plan/planner/distribution/SourceRewriter.java  | 42 ++++++++----
 .../execution/exchange/LocalSinkHandleTest.java    | 14 +++-
 .../db/mpp/execution/exchange/SinkHandleTest.java  | 28 +++++---
 .../mpp/execution/exchange/SourceHandleTest.java   | 11 +++-
 .../iotdb/db/mpp/plan/analyze/AnalyzeTest.java     |  6 +-
 .../java/org/apache/iotdb/session/Session.java     | 31 ++++++++-
 .../org/apache/iotdb/session/pool/SessionPool.java | 16 ++++-
 17 files changed, 296 insertions(+), 73 deletions(-)

diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.isession;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.SystemStatus;
 import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
   void open(boolean enableRPCCompression, int connectionTimeoutInMs)
       throws IoTDBConnectionException;
 
+  void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException;
+
   void close() throws IoTDBConnectionException;
 
   String getTimeZone();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8664cf8ddc..269cf0a01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -904,7 +904,7 @@ public class IoTDBConfig {
    * series partition
    */
   private String seriesPartitionExecutorClass =
-      "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+      "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
 
   /** The number of series partitions in a database */
   private int seriesPartitionSlotNum = 10000;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index e75d264700..070f7524bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -109,4 +109,10 @@ public class FragmentInstanceId {
   public static String createFullId(String queryId, int fragmentId, String 
instanceId) {
     return String.format("%s.%d.%s", queryId, fragmentId, instanceId);
   }
+
+  public static String createFragmentInstanceIdFromTFragmentInstanceId(
+      TFragmentInstanceId tFragmentInstanceId) {
+    return String.format(
+        "%d.%s", tFragmentInstanceId.getFragmentId(), 
tFragmentInstanceId.getInstanceId());
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 95e64b1828..1eca7b7bd9 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.mpp.execution.exchange;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
 import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -48,6 +49,8 @@ public class SharedTsBlockQueue {
   private final TFragmentInstanceId localFragmentInstanceId;
 
   private final String localPlanNodeId;
+
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
 
   private boolean noMoreTsBlocks = false;
@@ -80,6 +83,8 @@ public class SharedTsBlockQueue {
       LocalMemoryManager localMemoryManager) {
     this.localFragmentInstanceId =
         Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be 
null");
+    this.fullFragmentInstanceId =
+        
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be 
null");
     this.localMemoryManager =
         Validate.notNull(localMemoryManager, "local memory manager cannot be 
null");
@@ -154,7 +159,7 @@ public class SharedTsBlockQueue {
         .getQueryPool()
         .free(
             localFragmentInstanceId.getQueryId(),
-            localFragmentInstanceId.getInstanceId(),
+            fullFragmentInstanceId,
             localPlanNodeId,
             tsBlock.getRetainedSizeInBytes());
     bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -181,7 +186,7 @@ public class SharedTsBlockQueue {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 tsBlock.getRetainedSizeInBytes(),
                 maxBytesCanReserve);
@@ -228,11 +233,16 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in 
abnormal case */
@@ -253,11 +263,16 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
   }
 
   /** Destroy the queue and cancel the future. Should only be called in 
abnormal case */
@@ -278,10 +293,14 @@ public class SharedTsBlockQueue {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
   }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 50a3f2e1e8..626c1b5698 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -65,6 +66,7 @@ public class SinkHandle implements ISinkHandle {
 
   private final String localPlanNodeId;
   private final TFragmentInstanceId localFragmentInstanceId;
+  private final String fullFragmentInstanceId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
   private final TsBlockSerde serde;
@@ -116,6 +118,8 @@ public class SinkHandle implements ISinkHandle {
     this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
     this.serde = Validate.notNull(serde);
@@ -132,7 +136,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
                 DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know 
maxBytesCanReserve after
@@ -171,7 +175,7 @@ public class SinkHandle implements ISinkHandle {
             .getQueryPool()
             .reserve(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 retainedSizeInBytes,
                 maxBytesCanReserve)
@@ -211,11 +215,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
     sinkHandleListener.onAborted(this);
     logger.debug("[EndAbortSinkHandle]");
   }
@@ -231,11 +239,15 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               bufferRetainedSizeInBytes);
       bufferRetainedSizeInBytes = 0;
     }
+    localMemoryManager
+        .getQueryPool()
+        .clearMemoryReservationMap(
+            localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
     sinkHandleListener.onFinish(this);
     logger.debug("[EndCloseSinkHandle]");
   }
@@ -315,7 +327,7 @@ public class SinkHandle implements ISinkHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               freedBytes);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index cda072ff0f..edfa85afcd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import 
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import 
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.utils.SetThreadName;
@@ -61,6 +62,7 @@ public class SourceHandle implements ISourceHandle {
   private final TEndPoint remoteEndpoint;
   private final TFragmentInstanceId remoteFragmentInstanceId;
   private final TFragmentInstanceId localFragmentInstanceId;
+  private final String fullFragmentInstanceId;
   private final String localPlanNodeId;
   private final LocalMemoryManager localMemoryManager;
   private final ExecutorService executorService;
@@ -114,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
     this.remoteEndpoint = Validate.notNull(remoteEndpoint);
     this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
     this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+    this.fullFragmentInstanceId =
+        
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
     this.localPlanNodeId = Validate.notNull(localPlanNodeId);
     this.localMemoryManager = Validate.notNull(localMemoryManager);
     this.executorService = Validate.notNull(executorService);
@@ -156,7 +160,7 @@ public class SourceHandle implements ISourceHandle {
           .getQueryPool()
           .free(
               localFragmentInstanceId.getQueryId(),
-              localFragmentInstanceId.getInstanceId(),
+              fullFragmentInstanceId,
               localPlanNodeId,
               retainedSize);
 
@@ -195,7 +199,7 @@ public class SourceHandle implements ISourceHandle {
               .getQueryPool()
               .reserve(
                   localFragmentInstanceId.getQueryId(),
-                  localFragmentInstanceId.getInstanceId(),
+                  fullFragmentInstanceId,
                   localPlanNodeId,
                   bytesToReserve,
                   maxBytesCanReserve);
@@ -297,11 +301,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
       aborted = true;
       sourceHandleListener.onAborted(this);
     }
@@ -330,11 +338,15 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 bufferRetainedSizeInBytes);
         bufferRetainedSizeInBytes = 0;
       }
+      localMemoryManager
+          .getQueryPool()
+          .clearMemoryReservationMap(
+              localFragmentInstanceId.getQueryId(), fullFragmentInstanceId, 
localPlanNodeId);
       closed = true;
       currSequenceId = lastSequenceId + 1;
       sourceHandleListener.onFinished(this);
@@ -398,7 +410,7 @@ public class SourceHandle implements ISourceHandle {
         "Query[%s]-[%s-%s-SourceHandle-%s]",
         localFragmentInstanceId.getQueryId(),
         localFragmentInstanceId.getFragmentId(),
-        localFragmentInstanceId.getInstanceId(),
+        fullFragmentInstanceId,
         localPlanNodeId);
   }
 
@@ -500,7 +512,7 @@ public class SourceHandle implements ISourceHandle {
             .getQueryPool()
             .free(
                 localFragmentInstanceId.getQueryId(),
-                localFragmentInstanceId.getInstanceId(),
+                fullFragmentInstanceId,
                 localPlanNodeId,
                 reservedBytes);
         sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..b2970bb22e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -291,14 +291,11 @@ public class MemoryPool {
       Validate.isTrue(bytes <= queryReservedBytes);
 
       queryReservedBytes -= bytes;
-      if (queryReservedBytes == 0) {
-        
queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
-      } else {
-        queryMemoryReservations
-            .get(queryId)
-            .get(fragmentInstanceId)
-            .put(planNodeId, queryReservedBytes);
-      }
+      queryMemoryReservations
+          .get(queryId)
+          .get(fragmentInstanceId)
+          .put(planNodeId, queryReservedBytes);
+
       reservedBytes -= bytes;
 
       if (memoryReservationFutures.isEmpty()) {
@@ -349,7 +346,7 @@ public class MemoryPool {
         future.set(null);
       } catch (Throwable t) {
         // ignore it, because we still need to notify other future
-        LOGGER.error("error happened while trying to free memory: ", t);
+        LOGGER.warn("error happened while trying to free memory: ", t);
       }
     }
   }
@@ -368,4 +365,24 @@ public class MemoryPool {
   public long getReservedBytes() {
     return reservedBytes;
   }
+
+  public synchronized void clearMemoryReservationMap(
+      String queryId, String fragmentInstanceId, String planNodeId) {
+    if (queryMemoryReservations.get(queryId) == null
+        || queryMemoryReservations.get(queryId).get(fragmentInstanceId) == 
null) {
+      return;
+    }
+    Map<String, Long> planNodeIdToBytesReserved =
+        queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+    if (planNodeIdToBytesReserved.get(planNodeId) == null
+        || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+      planNodeIdToBytesReserved.remove(planNodeId);
+      if (planNodeIdToBytesReserved.isEmpty()) {
+        queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+      }
+      if (queryMemoryReservations.get(queryId).isEmpty()) {
+        queryMemoryReservations.remove(queryId);
+      }
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..dbffbb4be3 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
     for (InsertRowStatement insertRowStatement : 
insertRowsStatement.getInsertRowStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertRowStatement.getDevicePath().getFullPath(), k -> new 
HashSet());
+              insertRowStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
       timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
     }
 
@@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
         insertMultiTabletsStatement.getInsertTabletStatementList()) {
       Set<TTimePartitionSlot> timePartitionSlotSet =
           dataPartitionQueryParamMap.computeIfAbsent(
-              insertTabletStatement.getDevicePath().getFullPath(), k -> new 
HashSet());
+              insertTabletStatement.getDevicePath().getFullPath(), k -> new 
HashSet<>());
       
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
     }
 
@@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends 
StatementVisitor<Analysis, MPPQueryContext>
 
     context.setQueryType(QueryType.WRITE);
     List<List<String>> measurementsList = 
createTemplateStatement.getMeasurements();
-    for (List measurements : measurementsList) {
+    for (List<String> measurements : measurementsList) {
       Set<String> measurementsSet = new HashSet<>(measurements);
       if (measurementsSet.size() < measurements.size()) {
         throw new SemanticException(
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..78103835a5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 public class ClusterPartitionFetcher implements IPartitionFetcher {
 
@@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
       try (ConfigNodeClient client =
           
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         TDataPartitionTableResp dataPartitionTableResp =
-            
client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+            
client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     try (ConfigNodeClient client =
         
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataPartitionTableResp dataPartitionTableResp =
-          
client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+          
client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
       if (dataPartitionTableResp.getStatus().getCode()
           == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
         return parseDataPartitionResp(dataPartitionTableResp);
@@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     if (null == dataPartition) {
       try (ConfigNodeClient client =
           
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
-        TDataPartitionTableResp dataPartitionTableResp =
-            client.getOrCreateDataPartitionTable(
-                constructDataPartitionReq(splitDataPartitionQueryParams));
+        TDataPartitionReq req = 
constructDataPartitionReq(splitDataPartitionQueryParams);
+        TDataPartitionTableResp dataPartitionTableResp = 
client.getOrCreateDataPartitionTable(req);
 
         if (dataPartitionTableResp.getStatus().getCode()
             == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
     }
   }
 
+  private static class ComplexTimeSlotList {
+    Set<TTimePartitionSlot> timeSlotList;
+    boolean needLeftAll;
+    boolean needRightAll;
+
+    private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+      timeSlotList = new HashSet<>();
+      this.needLeftAll = needLeftAll;
+      this.needRightAll = needRightAll;
+    }
+
+    private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+      timeSlotList.addAll(slotList);
+    }
+  }
+
   private TDataPartitionReq constructDataPartitionReq(
       Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
     Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = 
new HashMap<>();
@@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements 
IPartitionFetcher {
         sgNameToQueryParamsMap.entrySet()) {
       // for each sg
       Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new 
HashMap<>();
+
+      Map<TSeriesPartitionSlot, ComplexTimeSlotList> 
seriesSlotTimePartitionMap = new HashMap<>();
+
+      for (DataPartitionQueryParam queryParam : entry.getValue()) {
+        seriesSlotTimePartitionMap
+            .computeIfAbsent(
+                
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+                k ->
+                    new ComplexTimeSlotList(
+                        queryParam.isNeedLeftAll(), 
queryParam.isNeedRightAll()))
+            .putTimeSlot(queryParam.getTimePartitionSlotList());
+      }
+      seriesSlotTimePartitionMap.forEach(
+          (k, v) ->
+              deviceToTimePartitionMap.put(
+                  k,
+                  new TTimeSlotList(
+                      new ArrayList<>(v.timeSlotList), v.needLeftAll, 
v.needRightAll)));
+      partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+    }
+    return new TDataPartitionReq(partitionSlotsMap);
+  }
+
+  /** For query, DataPartitionQueryParam is shared by each device */
+  private TDataPartitionReq constructDataPartitionReqForQuery(
+      Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+    Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap = 
new HashMap<>();
+    TTimeSlotList sharedTTimeSlotList = null;
+    for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+        sgNameToQueryParamsMap.entrySet()) {
+      // for each sg
+      Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new 
HashMap<>();
+
       for (DataPartitionQueryParam queryParam : entry.getValue()) {
-        TTimeSlotList timePartitionSlotList =
-            new TTimeSlotList(
-                queryParam.getTimePartitionSlotList(),
-                queryParam.isNeedLeftAll(),
-                queryParam.isNeedRightAll());
-        deviceToTimePartitionMap.put(
+        if (sharedTTimeSlotList == null) {
+          sharedTTimeSlotList =
+              new TTimeSlotList(
+                  queryParam.getTimePartitionSlotList(),
+                  queryParam.isNeedLeftAll(),
+                  queryParam.isNeedRightAll());
+        }
+        deviceToTimePartitionMap.putIfAbsent(
             
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
-            timePartitionSlotList);
+            sharedTTimeSlotList);
       }
       partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
 package org.apache.iotdb.db.mpp.plan.planner.distribution;
 
 import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
 
 public class DistributionPlanContext {
   protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
   // DataRegions
   protected boolean queryMultiRegion;
 
+  // used by group by level
+  private Map<String, Expression> columnNameToExpression;
+
   protected DistributionPlanContext(MPPQueryContext queryContext) {
     this.isRoot = true;
     this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
   public void setQueryMultiRegion(boolean queryMultiRegion) {
     this.queryMultiRegion = queryMultiRegion;
   }
+
+  public Map<String, Expression> getColumnNameToExpression() {
+    return columnNameToExpression;
+  }
+
+  public void setColumnNameToExpression(Map<String, Expression> 
columnNameToExpression) {
+    this.columnNameToExpression = columnNameToExpression;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9cb276384e..aa11eb80c6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends 
SimplePlanNodeRewriter<DistributionPlanConte
             : groupSourcesForGroupByLevel(root, sourceGroup, context);
 
     // Then, we calculate the attributes for GroupByLevelNode in each level
+    Map<String, Expression> columnNameToExpression = new HashMap<>();
+    for (CrossSeriesAggregationDescriptor originalDescriptor :
+        newRoot.getGroupByLevelDescriptors()) {
+      for (Expression exp : originalDescriptor.getInputExpressions()) {
+        columnNameToExpression.put(exp.getExpressionString(), exp);
+      }
+      columnNameToExpression.put(
+          originalDescriptor.getOutputExpression().getExpressionString(),
+          originalDescriptor.getOutputExpression());
+    }
+
+    context.setColumnNameToExpression(columnNameToExpression);
     calculateGroupByLevelNodeAttributes(newRoot, 0, context);
     return newRoot;
   }
@@ -884,22 +896,30 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
       // Check every OutputColumn of GroupByLevelNode and set the Expression of corresponding
       // AggregationDescriptor
       List<CrossSeriesAggregationDescriptor> descriptorList = new ArrayList<>();
+      Map<String, Expression> columnNameToExpression = context.getColumnNameToExpression();
+      Set<Expression> childrenExpressionSet = new HashSet<>();
+      for (String childColumn : childrenOutputColumns) {
+        Expression childExpression =
+            columnNameToExpression.get(
+                childColumn.substring(childColumn.indexOf("(") + 1, childColumn.lastIndexOf(")")));
+        childrenExpressionSet.add(childExpression);
+      }
+
       for (CrossSeriesAggregationDescriptor originalDescriptor :
           handle.getGroupByLevelDescriptors()) {
         Set<Expression> descriptorExpressions = new HashSet<>();
-        for (String childColumn : childrenOutputColumns) {
-          // If this condition matched, the childColumn should come from GroupByLevelNode
-          if (isAggColumnMatchExpression(childColumn, originalDescriptor.getOutputExpression())) {
-            descriptorExpressions.add(originalDescriptor.getOutputExpression());
-            continue;
-          }
-          for (Expression exp : originalDescriptor.getInputExpressions()) {
-            if (isAggColumnMatchExpression(childColumn, exp)) {
-              descriptorExpressions.add(exp);
-            }
+
+        if (childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+          descriptorExpressions.add(originalDescriptor.getOutputExpression());
+        }
+
+        for (Expression exp : originalDescriptor.getInputExpressions()) {
+          if (childrenExpressionSet.contains(exp)) {
+            descriptorExpressions.add(exp);
           }
         }
-        if (descriptorExpressions.size() == 0) {
+
+        if (descriptorExpressions.isEmpty()) {
           continue;
         }
         CrossSeriesAggregationDescriptor descriptor = originalDescriptor.deepClone();
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 37055a6a24..bac907fa4a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.mpp.execution.exchange;
 
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -91,7 +92,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
@@ -107,7 +109,12 @@ public class LocalSinkHandleTest {
     Assert.assertFalse(localSinkHandle.isFinished());
     Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
     Mockito.verify(spyMemoryPool, Mockito.times(11))
-        .free(queryId, localFragmentInstanceId.getInstanceId(), remotePlanNodeId, mockTsBlockSize);
+        .free(
+            queryId,
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
+            remotePlanNodeId,
+            mockTsBlockSize);
 
     // Set no-more-TsBlocks.
     localSinkHandle.setNoMoreTsBlocks();
@@ -179,7 +186,8 @@ public class LocalSinkHandleTest {
     Mockito.verify(spyMemoryPool, Mockito.times(11))
         .reserve(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                remoteFragmentInstanceId),
             remotePlanNodeId,
             mockTsBlockSize,
             Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..8c033e2b02 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
 
@@ -318,7 +324,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
     Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
         .free(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock * mockTsBlockSize);
     Mockito.verify(mockSinkHandleListener, Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
     MemoryPool mockMemoryPool =
         Utils.createMockBlockedMemoryPool(
             queryId,
-            localFragmentInstanceId.getInstanceId(),
+            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                localFragmentInstanceId),
             localPlanNodeId,
             numOfMockTsBlock,
             mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
     //    Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
     //        .reserve(
     //            queryId,
-    //            localFragmentInstanceId.getInstanceId(),
+    //            FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+    //            remoteFragmentInstanceId),
     //            localPlanNodeId,
     //            mockTsBlockSize * numOfMockTsBlock,
     //            Long.MAX_VALUE);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..ae2c2c3f6d 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
 import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
 import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -233,7 +234,8 @@ public class SourceHandleTest {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
           .reserve(
               queryId,
-              localFragmentInstanceId.getInstanceId(),
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
               localPlanNodeId,
               mockTsBlockSize,
               5 * mockTsBlockSize);
@@ -263,7 +265,12 @@ public class SourceHandleTest {
     // The local fragment instance consumes the data blocks.
     for (int i = 0; i < numOfMockTsBlock; i++) {
       Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
-          .free(queryId, localFragmentInstanceId.getInstanceId(), localPlanNodeId, mockTsBlockSize);
+          .free(
+              queryId,
+              FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+                  localFragmentInstanceId),
+              localPlanNodeId,
+              mockTsBlockSize);
       sourceHandle.receive();
       try {
         if (i < 5) {
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
   public void testDataPartitionAnalyze() {
     Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s) values(1,10),(86401,11)");
     Assert.assertEquals(
+        1,
         analysis
             .getDataPartitionInfo()
             .getDataPartitionMap()
             .get("root.sg")
-            .get(new TSeriesPartitionSlot(8923))
-            .size(),
-        1);
+            .get(new TSeriesPartitionSlot(1107))
+            .size());
   }
 
   @Test
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
     }
   }
 
+  @Override
+  public synchronized void open(
+      boolean enableRPCCompression,
+      int connectionTimeoutInMs,
+      Map<String, TEndPoint> deviceIdToEndpoint)
+      throws IoTDBConnectionException {
+    if (!isClosed) {
+      return;
+    }
+
+    this.enableRPCCompression = enableRPCCompression;
+    this.connectionTimeoutInMs = connectionTimeoutInMs;
+    defaultSessionConnection = constructSessionConnection(this, defaultEndPoint, zoneId);
+    defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+    isClosed = false;
+    if (enableRedirection || enableQueryRedirection) {
+      this.deviceIdToEndpoint = deviceIdToEndpoint;
+      endPointToSessionConnection = new ConcurrentHashMap<>();
+      endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection);
+    }
+  }
+
   @Override
   public synchronized void close() throws IoTDBConnectionException {
     if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
     TEndPoint endPoint;
     if (enableRedirection
         && !deviceIdToEndpoint.isEmpty()
-        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+        && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+        && endPointToSessionConnection.containsKey(endPoint)) {
       return endPointToSessionConnection.get(endPoint);
     } else {
       return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
         return;
       }
       AtomicReference<IoTDBConnectionException> exceptionReference = new AtomicReference<>();
-      deviceIdToEndpoint.put(deviceId, endpoint);
+      if (!deviceIdToEndpoint.containsKey(deviceId)
+          || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+        deviceIdToEndpoint.put(deviceId, endpoint);
+      }
       SessionConnection connection =
           endPointToSessionConnection.computeIfAbsent(
               endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
         completableFuture.join();
       } catch (CompletionException completionException) {
         Throwable cause = completionException.getCause();
+        logger.error("Meet error when async insert!", cause);
         if (cause instanceof IoTDBConnectionException) {
           throw (IoTDBConnectionException) cause;
         } else {
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.session.pool;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.isession.ISessionDataSet;
 import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
   private boolean enableRedirection;
   private boolean enableQueryRedirection = false;
 
+  private Map<String, TEndPoint> deviceIdToEndpoint;
+
   private int thriftDefaultBufferSize;
   private int thriftMaxFrameSize;
 
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
     this.enableCompression = enableCompression;
     this.zoneId = zoneId;
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     this.connectionTimeoutInMs = connectionTimeoutInMs;
     this.version = version;
     this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
       session = constructNewSession();
 
       try {
-        session.open(enableCompression, connectionTimeoutInMs);
+        session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
         // avoid someone has called close() the session pool
         synchronized (this) {
           if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
   private void tryConstructNewSession() {
     Session session = constructNewSession();
     try {
-      session.open(enableCompression, connectionTimeoutInMs);
+      session.open(enableCompression, connectionTimeoutInMs, deviceIdToEndpoint);
       // avoid someone has called close() the session pool
       synchronized (this) {
         if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
   @Override
   public void setEnableRedirection(boolean enableRedirection) {
     this.enableRedirection = enableRedirection;
+    if (this.enableRedirection) {
+      deviceIdToEndpoint = new ConcurrentHashMap<>();
+    }
     for (ISession session : queue) {
       session.setEnableRedirection(enableRedirection);
     }

Reply via email to