This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch rc/1.0.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.0.1 by this push:
new 5adf3db778 [To rc/1.0.1] Cherry-pick a37da33、0b65f4d、8af48e3 to
rc/1.0.1 (#9049)
5adf3db778 is described below
commit 5adf3db778d7662c616f7bdd677af7f6678155ab
Author: Beyyes <[email protected]>
AuthorDate: Mon Feb 13 18:41:36 2023 +0800
[To rc/1.0.1] Cherry-pick a37da33、0b65f4d、8af48e3 to rc/1.0.1 (#9049)
---
.../java/org/apache/iotdb/isession/ISession.java | 7 ++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/mpp/common/FragmentInstanceId.java | 6 ++
.../mpp/execution/exchange/SharedTsBlockQueue.java | 29 +++++++--
.../db/mpp/execution/exchange/SinkHandle.java | 22 +++++--
.../db/mpp/execution/exchange/SourceHandle.java | 24 +++++--
.../iotdb/db/mpp/execution/memory/MemoryPool.java | 35 +++++++---
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 6 +-
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 76 ++++++++++++++++++----
.../distribution/DistributionPlanContext.java | 14 ++++
.../plan/planner/distribution/SourceRewriter.java | 42 ++++++++----
.../execution/exchange/LocalSinkHandleTest.java | 14 +++-
.../db/mpp/execution/exchange/SinkHandleTest.java | 28 +++++---
.../mpp/execution/exchange/SourceHandleTest.java | 11 +++-
.../iotdb/db/mpp/plan/analyze/AnalyzeTest.java | 6 +-
.../java/org/apache/iotdb/session/Session.java | 31 ++++++++-
.../org/apache/iotdb/session/pool/SessionPool.java | 16 ++++-
17 files changed, 296 insertions(+), 73 deletions(-)
diff --git a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 922e78d0bb..4ab78f128f 100644
--- a/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.isession;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.template.Template;
import org.apache.iotdb.isession.util.SystemStatus;
import org.apache.iotdb.isession.util.Version;
@@ -54,6 +55,12 @@ public interface ISession extends AutoCloseable {
void open(boolean enableRPCCompression, int connectionTimeoutInMs)
throws IoTDBConnectionException;
+ void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException;
+
void close() throws IoTDBConnectionException;
String getTimeZone();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8664cf8ddc..269cf0a01e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -904,7 +904,7 @@ public class IoTDBConfig {
* series partition
*/
private String seriesPartitionExecutorClass =
- "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+ "org.apache.iotdb.commons.partition.executor.hash.BKDRHashExecutor";
/** The number of series partitions in a database */
private int seriesPartitionSlotNum = 10000;
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
index e75d264700..070f7524bb 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/common/FragmentInstanceId.java
@@ -109,4 +109,10 @@ public class FragmentInstanceId {
public static String createFullId(String queryId, int fragmentId, String
instanceId) {
return String.format("%s.%d.%s", queryId, fragmentId, instanceId);
}
+
+ public static String createFragmentInstanceIdFromTFragmentInstanceId(
+ TFragmentInstanceId tFragmentInstanceId) {
+ return String.format(
+ "%d.%s", tFragmentInstanceId.getFragmentId(),
tFragmentInstanceId.getInstanceId());
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
index 95e64b1828..1eca7b7bd9 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SharedTsBlockQueue.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
@@ -48,6 +49,8 @@ public class SharedTsBlockQueue {
private final TFragmentInstanceId localFragmentInstanceId;
private final String localPlanNodeId;
+
+ private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private boolean noMoreTsBlocks = false;
@@ -80,6 +83,8 @@ public class SharedTsBlockQueue {
LocalMemoryManager localMemoryManager) {
this.localFragmentInstanceId =
Validate.notNull(fragmentInstanceId, "fragment instance ID cannot be
null");
+ this.fullFragmentInstanceId =
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(planNodeId, "PlanNode ID cannot be
null");
this.localMemoryManager =
Validate.notNull(localMemoryManager, "local memory manager cannot be
null");
@@ -154,7 +159,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes());
bufferRetainedSizeInBytes -= tsBlock.getRetainedSizeInBytes();
@@ -181,7 +186,7 @@ public class SharedTsBlockQueue {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
tsBlock.getRetainedSizeInBytes(),
maxBytesCanReserve);
@@ -228,11 +233,16 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in
abnormal case */
@@ -253,11 +263,16 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
}
/** Destroy the queue and cancel the future. Should only be called in
abnormal case */
@@ -278,10 +293,14 @@ public class SharedTsBlockQueue {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 50a3f2e1e8..626c1b5698 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -65,6 +66,7 @@ public class SinkHandle implements ISinkHandle {
private final String localPlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
+ private final String fullFragmentInstanceId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
private final TsBlockSerde serde;
@@ -116,6 +118,8 @@ public class SinkHandle implements ISinkHandle {
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
this.serde = Validate.notNull(serde);
@@ -132,7 +136,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES) // actually we only know
maxBytesCanReserve after
@@ -171,7 +175,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSizeInBytes,
maxBytesCanReserve)
@@ -211,11 +215,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
sinkHandleListener.onAborted(this);
logger.debug("[EndAbortSinkHandle]");
}
@@ -231,11 +239,15 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
sinkHandleListener.onFinish(this);
logger.debug("[EndCloseSinkHandle]");
}
@@ -315,7 +327,7 @@ public class SinkHandle implements ISinkHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
freedBytes);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
index cda072ff0f..edfa85afcd 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandle.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.utils.SetThreadName;
@@ -61,6 +62,7 @@ public class SourceHandle implements ISourceHandle {
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
+ private final String fullFragmentInstanceId;
private final String localPlanNodeId;
private final LocalMemoryManager localMemoryManager;
private final ExecutorService executorService;
@@ -114,6 +116,8 @@ public class SourceHandle implements ISourceHandle {
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
+ this.fullFragmentInstanceId =
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
this.localMemoryManager = Validate.notNull(localMemoryManager);
this.executorService = Validate.notNull(executorService);
@@ -156,7 +160,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
retainedSize);
@@ -195,7 +199,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.reserve(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bytesToReserve,
maxBytesCanReserve);
@@ -297,11 +301,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
aborted = true;
sourceHandleListener.onAborted(this);
}
@@ -330,11 +338,15 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
bufferRetainedSizeInBytes);
bufferRetainedSizeInBytes = 0;
}
+ localMemoryManager
+ .getQueryPool()
+ .clearMemoryReservationMap(
+ localFragmentInstanceId.getQueryId(), fullFragmentInstanceId,
localPlanNodeId);
closed = true;
currSequenceId = lastSequenceId + 1;
sourceHandleListener.onFinished(this);
@@ -398,7 +410,7 @@ public class SourceHandle implements ISourceHandle {
"Query[%s]-[%s-%s-SourceHandle-%s]",
localFragmentInstanceId.getQueryId(),
localFragmentInstanceId.getFragmentId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId);
}
@@ -500,7 +512,7 @@ public class SourceHandle implements ISourceHandle {
.getQueryPool()
.free(
localFragmentInstanceId.getQueryId(),
- localFragmentInstanceId.getInstanceId(),
+ fullFragmentInstanceId,
localPlanNodeId,
reservedBytes);
sourceHandleListener.onFailure(SourceHandle.this, t);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
index b3d2fc2ff8..b2970bb22e 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/memory/MemoryPool.java
@@ -291,14 +291,11 @@ public class MemoryPool {
Validate.isTrue(bytes <= queryReservedBytes);
queryReservedBytes -= bytes;
- if (queryReservedBytes == 0) {
-
queryMemoryReservations.get(queryId).get(fragmentInstanceId).remove(planNodeId);
- } else {
- queryMemoryReservations
- .get(queryId)
- .get(fragmentInstanceId)
- .put(planNodeId, queryReservedBytes);
- }
+ queryMemoryReservations
+ .get(queryId)
+ .get(fragmentInstanceId)
+ .put(planNodeId, queryReservedBytes);
+
reservedBytes -= bytes;
if (memoryReservationFutures.isEmpty()) {
@@ -349,7 +346,7 @@ public class MemoryPool {
future.set(null);
} catch (Throwable t) {
// ignore it, because we still need to notify other future
- LOGGER.error("error happened while trying to free memory: ", t);
+ LOGGER.warn("error happened while trying to free memory: ", t);
}
}
}
@@ -368,4 +365,24 @@ public class MemoryPool {
public long getReservedBytes() {
return reservedBytes;
}
+
+ public synchronized void clearMemoryReservationMap(
+ String queryId, String fragmentInstanceId, String planNodeId) {
+ if (queryMemoryReservations.get(queryId) == null
+ || queryMemoryReservations.get(queryId).get(fragmentInstanceId) ==
null) {
+ return;
+ }
+ Map<String, Long> planNodeIdToBytesReserved =
+ queryMemoryReservations.get(queryId).get(fragmentInstanceId);
+ if (planNodeIdToBytesReserved.get(planNodeId) == null
+ || planNodeIdToBytesReserved.get(planNodeId) <= 0) {
+ planNodeIdToBytesReserved.remove(planNodeId);
+ if (planNodeIdToBytesReserved.isEmpty()) {
+ queryMemoryReservations.get(queryId).remove(fragmentInstanceId);
+ }
+ if (queryMemoryReservations.get(queryId).isEmpty()) {
+ queryMemoryReservations.remove(queryId);
+ }
+ }
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 81910ef650..dbffbb4be3 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -1682,7 +1682,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
for (InsertRowStatement insertRowStatement :
insertRowsStatement.getInsertRowStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
dataPartitionQueryParamMap.computeIfAbsent(
- insertRowStatement.getDevicePath().getFullPath(), k -> new
HashSet());
+ insertRowStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
timePartitionSlotSet.addAll(insertRowStatement.getTimePartitionSlots());
}
@@ -1707,7 +1707,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
insertMultiTabletsStatement.getInsertTabletStatementList()) {
Set<TTimePartitionSlot> timePartitionSlotSet =
dataPartitionQueryParamMap.computeIfAbsent(
- insertTabletStatement.getDevicePath().getFullPath(), k -> new
HashSet());
+ insertTabletStatement.getDevicePath().getFullPath(), k -> new
HashSet<>());
timePartitionSlotSet.addAll(insertTabletStatement.getTimePartitionSlots());
}
@@ -2372,7 +2372,7 @@ public class AnalyzeVisitor extends
StatementVisitor<Analysis, MPPQueryContext>
context.setQueryType(QueryType.WRITE);
List<List<String>> measurementsList =
createTemplateStatement.getMeasurements();
- for (List measurements : measurementsList) {
+ for (List<String> measurements : measurementsList) {
Set<String> measurementsSet = new HashSet<>(measurements);
if (measurementsSet.size() < measurements.size()) {
throw new SemanticException(
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..78103835a5 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -58,9 +58,11 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class ClusterPartitionFetcher implements IPartitionFetcher {
@@ -181,7 +183,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
-
client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+
client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
dataPartition = parseDataPartitionResp(dataPartitionTableResp);
@@ -208,7 +210,7 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataPartitionTableResp dataPartitionTableResp =
-
client.getDataPartitionTable(constructDataPartitionReq(sgNameToQueryParamsMap));
+
client.getDataPartitionTable(constructDataPartitionReqForQuery(sgNameToQueryParamsMap));
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return parseDataPartitionResp(dataPartitionTableResp);
@@ -261,9 +263,8 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
if (null == dataPartition) {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
- TDataPartitionTableResp dataPartitionTableResp =
- client.getOrCreateDataPartitionTable(
- constructDataPartitionReq(splitDataPartitionQueryParams));
+ TDataPartitionReq req =
constructDataPartitionReq(splitDataPartitionQueryParams);
+ TDataPartitionTableResp dataPartitionTableResp =
client.getOrCreateDataPartitionTable(req);
if (dataPartitionTableResp.getStatus().getCode()
== TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -350,6 +351,22 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
}
}
+ private static class ComplexTimeSlotList {
+ Set<TTimePartitionSlot> timeSlotList;
+ boolean needLeftAll;
+ boolean needRightAll;
+
+ private ComplexTimeSlotList(boolean needLeftAll, boolean needRightAll) {
+ timeSlotList = new HashSet<>();
+ this.needLeftAll = needLeftAll;
+ this.needRightAll = needRightAll;
+ }
+
+ private void putTimeSlot(List<TTimePartitionSlot> slotList) {
+ timeSlotList.addAll(slotList);
+ }
+ }
+
private TDataPartitionReq constructDataPartitionReq(
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
new HashMap<>();
@@ -357,15 +374,50 @@ public class ClusterPartitionFetcher implements
IPartitionFetcher {
sgNameToQueryParamsMap.entrySet()) {
// for each sg
Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new
HashMap<>();
+
+ Map<TSeriesPartitionSlot, ComplexTimeSlotList>
seriesSlotTimePartitionMap = new HashMap<>();
+
+ for (DataPartitionQueryParam queryParam : entry.getValue()) {
+ seriesSlotTimePartitionMap
+ .computeIfAbsent(
+
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
+ k ->
+ new ComplexTimeSlotList(
+ queryParam.isNeedLeftAll(),
queryParam.isNeedRightAll()))
+ .putTimeSlot(queryParam.getTimePartitionSlotList());
+ }
+ seriesSlotTimePartitionMap.forEach(
+ (k, v) ->
+ deviceToTimePartitionMap.put(
+ k,
+ new TTimeSlotList(
+ new ArrayList<>(v.timeSlotList), v.needLeftAll,
v.needRightAll)));
+ partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
+ }
+ return new TDataPartitionReq(partitionSlotsMap);
+ }
+
+ /** For query, DataPartitionQueryParam is shared by each device */
+ private TDataPartitionReq constructDataPartitionReqForQuery(
+ Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap) {
+ Map<String, Map<TSeriesPartitionSlot, TTimeSlotList>> partitionSlotsMap =
new HashMap<>();
+ TTimeSlotList sharedTTimeSlotList = null;
+ for (Map.Entry<String, List<DataPartitionQueryParam>> entry :
+ sgNameToQueryParamsMap.entrySet()) {
+ // for each sg
+ Map<TSeriesPartitionSlot, TTimeSlotList> deviceToTimePartitionMap = new
HashMap<>();
+
for (DataPartitionQueryParam queryParam : entry.getValue()) {
- TTimeSlotList timePartitionSlotList =
- new TTimeSlotList(
- queryParam.getTimePartitionSlotList(),
- queryParam.isNeedLeftAll(),
- queryParam.isNeedRightAll());
- deviceToTimePartitionMap.put(
+ if (sharedTTimeSlotList == null) {
+ sharedTTimeSlotList =
+ new TTimeSlotList(
+ queryParam.getTimePartitionSlotList(),
+ queryParam.isNeedLeftAll(),
+ queryParam.isNeedRightAll());
+ }
+ deviceToTimePartitionMap.putIfAbsent(
partitionExecutor.getSeriesPartitionSlot(queryParam.getDevicePath()),
- timePartitionSlotList);
+ sharedTTimeSlotList);
}
partitionSlotsMap.put(entry.getKey(), deviceToTimePartitionMap);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
index 32de442e65..c35c4a72ac 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanContext.java
@@ -20,6 +20,9 @@
package org.apache.iotdb.db.mpp.plan.planner.distribution;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
+import org.apache.iotdb.db.mpp.plan.expression.Expression;
+
+import java.util.Map;
public class DistributionPlanContext {
protected boolean isRoot;
@@ -32,6 +35,9 @@ public class DistributionPlanContext {
// DataRegions
protected boolean queryMultiRegion;
+ // used by group by level
+ private Map<String, Expression> columnNameToExpression;
+
protected DistributionPlanContext(MPPQueryContext queryContext) {
this.isRoot = true;
this.queryContext = queryContext;
@@ -62,4 +68,12 @@ public class DistributionPlanContext {
public void setQueryMultiRegion(boolean queryMultiRegion) {
this.queryMultiRegion = queryMultiRegion;
}
+
+ public Map<String, Expression> getColumnNameToExpression() {
+ return columnNameToExpression;
+ }
+
+ public void setColumnNameToExpression(Map<String, Expression>
columnNameToExpression) {
+ this.columnNameToExpression = columnNameToExpression;
+ }
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 9cb276384e..aa11eb80c6 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -749,6 +749,18 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
: groupSourcesForGroupByLevel(root, sourceGroup, context);
// Then, we calculate the attributes for GroupByLevelNode in each level
+ Map<String, Expression> columnNameToExpression = new HashMap<>();
+ for (CrossSeriesAggregationDescriptor originalDescriptor :
+ newRoot.getGroupByLevelDescriptors()) {
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ columnNameToExpression.put(exp.getExpressionString(), exp);
+ }
+ columnNameToExpression.put(
+ originalDescriptor.getOutputExpression().getExpressionString(),
+ originalDescriptor.getOutputExpression());
+ }
+
+ context.setColumnNameToExpression(columnNameToExpression);
calculateGroupByLevelNodeAttributes(newRoot, 0, context);
return newRoot;
}
@@ -884,22 +896,30 @@ public class SourceRewriter extends
SimplePlanNodeRewriter<DistributionPlanConte
// Check every OutputColumn of GroupByLevelNode and set the Expression
of corresponding
// AggregationDescriptor
List<CrossSeriesAggregationDescriptor> descriptorList = new
ArrayList<>();
+ Map<String, Expression> columnNameToExpression =
context.getColumnNameToExpression();
+ Set<Expression> childrenExpressionSet = new HashSet<>();
+ for (String childColumn : childrenOutputColumns) {
+ Expression childExpression =
+ columnNameToExpression.get(
+ childColumn.substring(childColumn.indexOf("(") + 1,
childColumn.lastIndexOf(")")));
+ childrenExpressionSet.add(childExpression);
+ }
+
for (CrossSeriesAggregationDescriptor originalDescriptor :
handle.getGroupByLevelDescriptors()) {
Set<Expression> descriptorExpressions = new HashSet<>();
- for (String childColumn : childrenOutputColumns) {
- // If this condition matched, the childColumn should come from
GroupByLevelNode
- if (isAggColumnMatchExpression(childColumn,
originalDescriptor.getOutputExpression())) {
-
descriptorExpressions.add(originalDescriptor.getOutputExpression());
- continue;
- }
- for (Expression exp : originalDescriptor.getInputExpressions()) {
- if (isAggColumnMatchExpression(childColumn, exp)) {
- descriptorExpressions.add(exp);
- }
+
+ if
(childrenExpressionSet.contains(originalDescriptor.getOutputExpression())) {
+ descriptorExpressions.add(originalDescriptor.getOutputExpression());
+ }
+
+ for (Expression exp : originalDescriptor.getInputExpressions()) {
+ if (childrenExpressionSet.contains(exp)) {
+ descriptorExpressions.add(exp);
}
}
- if (descriptorExpressions.size() == 0) {
+
+ if (descriptorExpressions.isEmpty()) {
continue;
}
CrossSeriesAggregationDescriptor descriptor =
originalDescriptor.deepClone();
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
index 37055a6a24..bac907fa4a 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/LocalSinkHandleTest.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -91,7 +92,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
@@ -107,7 +109,12 @@ public class LocalSinkHandleTest {
Assert.assertFalse(localSinkHandle.isFinished());
Assert.assertEquals(0L, localSinkHandle.getBufferRetainedSizeInBytes());
Mockito.verify(spyMemoryPool, Mockito.times(11))
- .free(queryId, localFragmentInstanceId.getInstanceId(),
remotePlanNodeId, mockTsBlockSize);
+ .free(
+ queryId,
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
+ remotePlanNodeId,
+ mockTsBlockSize);
// Set no-more-TsBlocks.
localSinkHandle.setNoMoreTsBlocks();
@@ -179,7 +186,8 @@ public class LocalSinkHandleTest {
Mockito.verify(spyMemoryPool, Mockito.times(11))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ remoteFragmentInstanceId),
remotePlanNodeId,
mockTsBlockSize,
Long.MAX_VALUE);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index 0c8ebf9f73..8c033e2b02 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -116,7 +117,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -164,7 +166,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener,
Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -201,7 +204,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -261,7 +265,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -302,7 +307,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
@@ -318,7 +324,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(3))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
@@ -378,7 +385,8 @@ public class SinkHandleTest {
Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(2))
.free(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock * mockTsBlockSize);
Mockito.verify(mockSinkHandleListener,
Mockito.timeout(10_0000).times(1)).onFinish(sinkHandle);
@@ -401,7 +409,8 @@ public class SinkHandleTest {
MemoryPool mockMemoryPool =
Utils.createMockBlockedMemoryPool(
queryId,
- localFragmentInstanceId.getInstanceId(),
+ FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
numOfMockTsBlock,
mockTsBlockSize);
@@ -462,7 +471,8 @@ public class SinkHandleTest {
// Mockito.verify(mockMemoryPool, Mockito.timeout(10_0000).times(1))
// .reserve(
// queryId,
- // localFragmentInstanceId.getInstanceId(),
+ //
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ // remoteFragmentInstanceId),
// localPlanNodeId,
// mockTsBlockSize * numOfMockTsBlock,
// Long.MAX_VALUE);
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index aa9adbace6..ae2c2c3f6d 100644
---
a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++
b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import
org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import
org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -233,7 +234,8 @@ public class SourceHandleTest {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_000).times(6))
.reserve(
queryId,
- localFragmentInstanceId.getInstanceId(),
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
localPlanNodeId,
mockTsBlockSize,
5 * mockTsBlockSize);
@@ -263,7 +265,12 @@ public class SourceHandleTest {
// The local fragment instance consumes the data blocks.
for (int i = 0; i < numOfMockTsBlock; i++) {
Mockito.verify(spyMemoryPool, Mockito.timeout(10_0000).times(i))
- .free(queryId, localFragmentInstanceId.getInstanceId(),
localPlanNodeId, mockTsBlockSize);
+ .free(
+ queryId,
+
FragmentInstanceId.createFragmentInstanceIdFromTFragmentInstanceId(
+ localFragmentInstanceId),
+ localPlanNodeId,
+ mockTsBlockSize);
sourceHandle.receive();
try {
if (i < 5) {
diff --git
a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
index d15cdd3ce4..3fbf8f44c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeTest.java
@@ -581,13 +581,13 @@ public class AnalyzeTest {
public void testDataPartitionAnalyze() {
Analysis analysis = analyzeSQL("insert into root.sg.d1(timestamp,s)
values(1,10),(86401,11)");
Assert.assertEquals(
+ 1,
analysis
.getDataPartitionInfo()
.getDataPartitionMap()
.get("root.sg")
- .get(new TSeriesPartitionSlot(8923))
- .size(),
- 1);
+ .get(new TSeriesPartitionSlot(1107))
+ .size());
}
@Test
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java
b/session/src/main/java/org/apache/iotdb/session/Session.java
index 9638238c3d..0298785756 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -404,6 +404,28 @@ public class Session implements ISession {
}
}
+ @Override
+ public synchronized void open(
+ boolean enableRPCCompression,
+ int connectionTimeoutInMs,
+ Map<String, TEndPoint> deviceIdToEndpoint)
+ throws IoTDBConnectionException {
+ if (!isClosed) {
+ return;
+ }
+
+ this.enableRPCCompression = enableRPCCompression;
+ this.connectionTimeoutInMs = connectionTimeoutInMs;
+ defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
+ defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ isClosed = false;
+ if (enableRedirection || enableQueryRedirection) {
+ this.deviceIdToEndpoint = deviceIdToEndpoint;
+ endPointToSessionConnection = new ConcurrentHashMap<>();
+ endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ }
+ }
+
@Override
public synchronized void close() throws IoTDBConnectionException {
if (isClosed) {
@@ -920,7 +942,8 @@ public class Session implements ISession {
TEndPoint endPoint;
if (enableRedirection
&& !deviceIdToEndpoint.isEmpty()
- && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) {
+ && (endPoint = deviceIdToEndpoint.get(deviceId)) != null
+ && endPointToSessionConnection.containsKey(endPoint)) {
return endPointToSessionConnection.get(endPoint);
} else {
return defaultSessionConnection;
@@ -965,7 +988,10 @@ public class Session implements ISession {
return;
}
AtomicReference<IoTDBConnectionException> exceptionReference = new
AtomicReference<>();
- deviceIdToEndpoint.put(deviceId, endpoint);
+ if (!deviceIdToEndpoint.containsKey(deviceId)
+ || !deviceIdToEndpoint.get(deviceId).equals(endpoint)) {
+ deviceIdToEndpoint.put(deviceId, endpoint);
+ }
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
endpoint,
@@ -3259,6 +3285,7 @@ public class Session implements ISession {
completableFuture.join();
} catch (CompletionException completionException) {
Throwable cause = completionException.getCause();
+ logger.error("Meet error when async insert!", cause);
if (cause instanceof IoTDBConnectionException) {
throw (IoTDBConnectionException) cause;
} else {
diff --git
a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 12661ec21d..ed89b085bf 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.session.pool;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.ISessionDataSet;
import org.apache.iotdb.isession.SessionConfig;
@@ -93,6 +94,8 @@ public class SessionPool implements ISessionPool {
private boolean enableRedirection;
private boolean enableQueryRedirection = false;
+ private Map<String, TEndPoint> deviceIdToEndpoint;
+
private int thriftDefaultBufferSize;
private int thriftMaxFrameSize;
@@ -299,6 +302,9 @@ public class SessionPool implements ISessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -330,6 +336,9 @@ public class SessionPool implements ISessionPool {
this.enableCompression = enableCompression;
this.zoneId = zoneId;
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.version = version;
this.thriftDefaultBufferSize = thriftDefaultBufferSize;
@@ -448,7 +457,7 @@ public class SessionPool implements ISessionPool {
session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs,
deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -548,7 +557,7 @@ public class SessionPool implements ISessionPool {
private void tryConstructNewSession() {
Session session = constructNewSession();
try {
- session.open(enableCompression, connectionTimeoutInMs);
+ session.open(enableCompression, connectionTimeoutInMs,
deviceIdToEndpoint);
// avoid someone has called close() the session pool
synchronized (this) {
if (closed) {
@@ -2639,6 +2648,9 @@ public class SessionPool implements ISessionPool {
@Override
public void setEnableRedirection(boolean enableRedirection) {
this.enableRedirection = enableRedirection;
+ if (this.enableRedirection) {
+ deviceIdToEndpoint = new ConcurrentHashMap<>();
+ }
for (ISession session : queue) {
session.setEnableRedirection(enableRedirection);
}