This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new c6f5c31 [To rel/0.12][IOTDB-1846]Fix the error when count the total
number of devices in cluster mode (#4173)
c6f5c31 is described below
commit c6f5c314862492423468a3e06122d670fa0a7e1f
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Oct 18 12:40:11 2021 +0800
[To rel/0.12][IOTDB-1846]Fix the error when count the total number of
devices in cluster mode (#4173)
[To rel/0.12][IOTDB-1846]Fix the error when count the total number of
devices in cluster mode
---
.../iotdb/cluster/query/ClusterPlanExecutor.java | 139 +++++++++++++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
2 files changed, 140 insertions(+), 1 deletion(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 34e9904..83a6f06 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -125,6 +125,145 @@ public class ClusterPlanExecutor extends PlanExecutor {
}
@Override
+ protected int getDevicesNum(PartialPath path) throws MetadataException { // cluster override: counts devices matching path across every storage group the path resolves to
+ // make sure this node knows all storage groups
+ try {
+ metaGroupMember.syncLeaderWithConsistencyCheck(false);
+ } catch (CheckConsistencyException e) {
+ throw new MetadataException(e); // rewrap: MetadataException is the only checked type this method declares
+ }
+ Map<String, String> sgPathMap = // storage group name -> path string to count under it (consumed by getDeviceCount)
IoTDB.metaManager.determineStorageGroup(path);
+ if (sgPathMap.isEmpty()) { // nothing resolved for the path, so report the path as non-existent
+ throw new PathNotExistException(path.getFullPath());
+ }
+ logger.debug("The storage groups of path {} are {}", path,
sgPathMap.keySet());
+ int ret;
+ try {
+ ret = getDeviceCount(sgPathMap); // adds up the per-storage-group counts, local and remote
+ } catch (CheckConsistencyException e) {
+ throw new MetadataException(e); // same rewrap as above, for the data-group sync done inside getDeviceCount
+ }
+ logger.debug("The number of devices satisfying {} is {}", path, ret);
+ return ret;
+ }
+
+ private int getDeviceCount(Map<String, String> sgPathMap) // sums device counts: groups containing this node are counted inline, the rest are queried in parallel
+ throws CheckConsistencyException, MetadataException {
+ AtomicInteger result = new AtomicInteger(); // atomic because the remote-query tasks below add to it from pool threads
+ // split the paths by the data group they belong to
+ Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>(); // only groups that do NOT contain this node end up here
+ for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
+ String storageGroupName = sgPathEntry.getKey();
+ PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
+ // find the data group that should hold the device schemas of the
storage group
+ PartitionGroup partitionGroup =
+ metaGroupMember.getPartitionTable().route(storageGroupName, 0); // NOTE(review): second argument 0 is presumably a timestamp/time slot — confirm against route()
+ if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+ // this node is a member of the group, perform a local query after
synchronizing with the
+ // leader
+ metaGroupMember
+ .getLocalDataMember(partitionGroup.getHeader())
+ .syncLeaderWithConsistencyCheck(false);
+ int localResult = getLocalDeviceCount(pathUnderSG);
+ logger.debug(
+ "{}: get device count of {} locally, result {}",
+ metaGroupMember.getName(),
+ partitionGroup,
+ localResult);
+ result.addAndGet(localResult);
+ } else {
+ // batch the queries of the same group to reduce communication
+ groupPathMap
+ .computeIfAbsent(partitionGroup, p -> new ArrayList<>())
+ .add(pathUnderSG.getFullPath());
+ }
+ }
+ if (groupPathMap.isEmpty()) { // everything was counted locally; also avoids newFixedThreadPool(0), which throws IllegalArgumentException
+ return result.get();
+ }
+
+ ExecutorService remoteQueryThreadPool = // one thread per remote group so all groups can be queried concurrently
Executors.newFixedThreadPool(groupPathMap.size());
+ List<Future<Void>> remoteFutures = new ArrayList<>();
+ // query each data group separately
+ for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry :
groupPathMap.entrySet()) {
+ PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
+ List<String> pathsToQuery = partitionGroupPathEntry.getValue();
+ remoteFutures.add(
+ remoteQueryThreadPool.submit(
+ () -> {
+ try {
+ result.addAndGet(getRemoteDeviceCount(partitionGroup,
pathsToQuery));
+ } catch (MetadataException e) { // NOTE(review): the failure is only logged, so this group's devices are silently missing from the total
+ logger.warn(
+ "Cannot get remote device count of {} from {}",
+ pathsToQuery,
+ partitionGroup,
+ e);
+ }
+ return null; // the task is typed as producing Void (see remoteFutures), so it must return a value
+ }));
+ }
+ waitForThreadPool(remoteFutures, remoteQueryThreadPool, // helper not shown in this diff; presumably blocks until every future completes — confirm
"getDeviceCount()");
+
+ return result.get();
+ }
+
+ private int getLocalDeviceCount(PartialPath path) throws MetadataException { // local branch of getDeviceCount: no RPC involved
+ return IoTDB.metaManager.getDevicesNum(path); // same call the non-cluster PlanExecutor.getDevicesNum makes
+ }
+
+ private int getRemoteDeviceCount(PartitionGroup partitionGroup, List<String>
pathsToCount)
+ throws MetadataException { // asks a node of partitionGroup for the devices under pathsToCount and returns how many came back
+ // choose the node with lowest latency or highest throughput
+ List<Node> coordinatedNodes =
QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
+ for (Node node : coordinatedNodes) {
+ try {
+ Integer count;
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { // async and sync servers need different client types
+ AsyncDataClient client =
+ metaGroupMember
+ .getClientProvider()
+ .getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
+ client.setTimeout(RaftServer.getReadOperationTimeoutMS());
+ count = // the full device set is fetched only to take its size
+ SyncClientAdaptor.getAllDevices(client,
partitionGroup.getHeader(), pathsToCount)
+ .size(); // NOTE(review): would throw NullPointerException if the adaptor can return null (e.g. on timeout) — confirm
+ } else {
+ try (SyncDataClient syncDataClient = // try-with-resources closes the client on every exit path
+ metaGroupMember
+ .getClientProvider()
+ .getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS())) {
+ try {
+
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
+ count = syncDataClient.getAllDevices(partitionGroup.getHeader(),
pathsToCount).size();
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being
reused
+ syncDataClient.getInputProtocol().getTransport().close();
+ throw e; // handled by the outer catch, which rewraps it as MetadataException
+ }
+ }
+ }
+ logger.debug(
+ "{}: get device count of {} from {}, result {}",
+ metaGroupMember.getName(),
+ partitionGroup,
+ node,
+ count);
+ if (count != null) { // NOTE(review): count is always boxed from size() above, so this check cannot fail as written
+ return count;
+ }
+ } catch (IOException | TException e) {
+ throw new MetadataException(e); // NOTE(review): thrown from inside the loop, so a failing node aborts the call instead of falling through to the next node
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before converting to MetadataException
+ throw new MetadataException(e);
+ }
+ }
+ logger.warn("Cannot get devices count of {} from {}", pathsToCount, // given the returns/throws above, reached only when no node yielded a count (e.g. empty node list)
partitionGroup);
+ return 0;
+ }
+
+ @Override
protected int getPathsNum(PartialPath path) throws MetadataException { // counts paths by delegating to the level-based node counter
return getNodesNumInGivenLevel(path, -1); // NOTE(review): level -1 presumably means "no level restriction" — confirm in getNodesNumInGivenLevel
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index ad2940b..7b6e5b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -671,7 +671,7 @@ public class PlanExecutor implements IPlanExecutor {
return singleDataSet;
}
- private int getDevicesNum(PartialPath path) throws MetadataException {
+ protected int getDevicesNum(PartialPath path) throws MetadataException { // widened from private to protected so ClusterPlanExecutor can override it
return IoTDB.metaManager.getDevicesNum(path); // single-node behaviour: count straight from the metadata manager
}