This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new c6f5c31  [To rel/0.12][IOTDB-1846]Fix the error when count the total number of devices in cluster mode (#4173)
c6f5c31 is described below

commit c6f5c314862492423468a3e06122d670fa0a7e1f
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Oct 18 12:40:11 2021 +0800

    [To rel/0.12][IOTDB-1846]Fix the error when count the total number of devices in cluster mode (#4173)
    
    [To rel/0.12][IOTDB-1846]Fix the error when count the total number of devices in cluster mode
---
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 139 +++++++++++++++++++++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 2 files changed, 140 insertions(+), 1 deletion(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 34e9904..83a6f06 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -125,6 +125,145 @@ public class ClusterPlanExecutor extends PlanExecutor {
   }
 
   @Override
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
+    // Cluster-aware device count: resolve the storage groups matched by the path, then sum the
+    // per-storage-group counts. Consistency-check failures are rethrown as MetadataException.
+
+    // make sure this node knows all storage groups
+    try {
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
+    } catch (CheckConsistencyException e) {
+      throw new MetadataException(e);
+    }
+
+    Map<String, String> storageGroupToPath = IoTDB.metaManager.determineStorageGroup(path);
+    if (storageGroupToPath.isEmpty()) {
+      throw new PathNotExistException(path.getFullPath());
+    }
+    logger.debug("The storage groups of path {} are {}", path, storageGroupToPath.keySet());
+
+    try {
+      int deviceCount = getDeviceCount(storageGroupToPath);
+      logger.debug("The number of devices satisfying {} is {}", path, deviceCount);
+      return deviceCount;
+    } catch (CheckConsistencyException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  /**
+   * Sums the device counts of the given storage groups. Storage groups whose data group contains
+   * this node are counted locally (after syncing with that group's leader); the remaining paths
+   * are batched per data group and counted through remote queries submitted to a thread pool.
+   *
+   * @param sgPathMap storage group name -> path to count under that storage group
+   * @return the local counts plus the counts of every remote query that succeeded; a remote query
+   *     that fails with a MetadataException is logged and contributes nothing
+   */
+  private int getDeviceCount(Map<String, String> sgPathMap)
+      throws CheckConsistencyException, MetadataException {
+    AtomicInteger totalCount = new AtomicInteger();
+    // paths that have to be queried remotely, keyed by the data group they belong to
+    Map<PartitionGroup, List<String>> remotePathsByGroup = new HashMap<>();
+    for (Entry<String, String> entry : sgPathMap.entrySet()) {
+      PartialPath pathUnderSG = new PartialPath(entry.getValue());
+      // find the data group that should hold the device schemas of the storage group
+      PartitionGroup ownerGroup = metaGroupMember.getPartitionTable().route(entry.getKey(), 0);
+      if (!ownerGroup.contains(metaGroupMember.getThisNode())) {
+        // batch the queries of the same group to reduce communication
+        remotePathsByGroup
+            .computeIfAbsent(ownerGroup, g -> new ArrayList<>())
+            .add(pathUnderSG.getFullPath());
+        continue;
+      }
+      // this node is a member of the group, perform a local query after synchronizing with the
+      // leader
+      metaGroupMember
+          .getLocalDataMember(ownerGroup.getHeader())
+          .syncLeaderWithConsistencyCheck(false);
+      int localCount = getLocalDeviceCount(pathUnderSG);
+      logger.debug(
+          "{}: get device count of {} locally, result {}",
+          metaGroupMember.getName(),
+          ownerGroup,
+          localCount);
+      totalCount.addAndGet(localCount);
+    }
+
+    if (!remotePathsByGroup.isEmpty()) {
+      ExecutorService pool = Executors.newFixedThreadPool(remotePathsByGroup.size());
+      List<Future<Void>> pending = new ArrayList<>();
+      // query each data group separately
+      remotePathsByGroup.forEach(
+          (group, paths) ->
+              pending.add(
+                  pool.submit(
+                      () -> {
+                        try {
+                          totalCount.addAndGet(getRemoteDeviceCount(group, paths));
+                        } catch (MetadataException e) {
+                          logger.warn(
+                              "Cannot get remote device count of {} from {}", paths, group, e);
+                        }
+                        return null;
+                      })));
+      waitForThreadPool(pending, pool, "getDeviceCount()");
+    }
+
+    return totalCount.get();
+  }
+
+  /**
+   * Counts the devices matching {@code path} by delegating to {@code IoTDB.metaManager}.
+   *
+   * @param path the path whose devices are counted
+   * @return the count reported by {@code IoTDB.metaManager.getDevicesNum}
+   */
+  private int getLocalDeviceCount(PartialPath path) throws MetadataException {
+    final int localDeviceCount = IoTDB.metaManager.getDevicesNum(path);
+    return localDeviceCount;
+  }
+
+  /**
+   * Asks the nodes of a remote data group, in the order preferred by the QueryCoordinator, for the
+   * devices matching {@code pathsToCount} and returns how many there are.
+   *
+   * <p>An I/O or Thrift failure on one node no longer aborts the whole query: the failure is
+   * logged and the next node of the group is tried. A null device collection from a node is
+   * likewise treated as "no answer" instead of being dereferenced.
+   *
+   * @param partitionGroup the data group that holds the schemas of the paths
+   * @param pathsToCount full paths whose devices should be counted
+   * @return the number of devices reported by the first node that answers, or 0 if every node
+   *     answered without a result
+   * @throws MetadataException if the thread is interrupted, or if no node produced a result and at
+   *     least one of them failed with an I/O or Thrift error (the last such error is the cause)
+   */
+  private int getRemoteDeviceCount(PartitionGroup partitionGroup, List<String> pathsToCount)
+      throws MetadataException {
+    // choose the node with lowest latency or highest throughput
+    List<Node> coordinatedNodes = QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
+    // most recent communication failure, reported only if no node can answer
+    Exception lastFailure = null;
+    for (Node node : coordinatedNodes) {
+      try {
+        // fully qualified so that no new import is needed in this file
+        java.util.Collection<?> devices;
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncDataClient client =
+              metaGroupMember
+                  .getClientProvider()
+                  .getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
+          client.setTimeout(RaftServer.getReadOperationTimeoutMS());
+          devices =
+              SyncClientAdaptor.getAllDevices(client, partitionGroup.getHeader(), pathsToCount);
+        } else {
+          try (SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
+            try {
+              syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
+              devices = syncDataClient.getAllDevices(partitionGroup.getHeader(), pathsToCount);
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
+          }
+        }
+        // check the collection itself: a boxed size() is never null, so checking the count
+        // afterwards could not detect a missing answer
+        Integer count = devices == null ? null : devices.size();
+        logger.debug(
+            "{}: get device count of {} from {}, result {}",
+            metaGroupMember.getName(),
+            partitionGroup,
+            node,
+            count);
+        if (count != null) {
+          return count;
+        }
+      } catch (IOException | TException e) {
+        // this node is unreachable or failed; fall through to the next candidate
+        lastFailure = e;
+        logger.warn(
+            "Cannot get device count of {} from node {}, will try the next node",
+            pathsToCount,
+            node,
+            e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new MetadataException(e);
+      }
+    }
+    if (lastFailure != null) {
+      // every candidate was tried and at least one failed: surface the error to the caller
+      throw new MetadataException(lastFailure);
+    }
+    logger.warn("Cannot get devices count of {} from {}", pathsToCount, partitionGroup);
+    return 0;
+  }
+
+  @Override
   protected int getPathsNum(PartialPath path) throws MetadataException {
+    // Delegates to getNodesNumInGivenLevel.
+    // NOTE(review): the level argument -1 presumably means "no level restriction" - confirm
+    // against getNodesNumInGivenLevel.
     return getNodesNumInGivenLevel(path, -1);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index ad2940b..7b6e5b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -671,7 +671,7 @@ public class PlanExecutor implements IPlanExecutor {
     return singleDataSet;
   }
 
-  private int getDevicesNum(PartialPath path) throws MetadataException {
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
+    // Visibility widened from private to protected so that ClusterPlanExecutor, which extends
+    // PlanExecutor, can override this method; this base version asks IoTDB.metaManager directly.
     return IoTDB.metaManager.getDevicesNum(path);
   }
 

Reply via email to