This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 803b88d  [IOTDB-1846]Fix the error when count the total number of devices in cluster mode (#4171)
803b88d is described below

commit 803b88d77b889a4022b1b5e3f47160b64ad57105
Author: ZhangHongYin <[email protected]>
AuthorDate: Mon Oct 18 09:28:21 2021 +0800

    [IOTDB-1846]Fix the error when count the total number of devices in cluster mode (#4171)
    
    * [IOTDB-1846] fix count total number of devices
    
    * [IOTDB-1846] add count devices cases
    
    * [IOTDB-1846] fix hard code
---
 .../iotdb/cluster/query/ClusterPlanExecutor.java   | 139 +++++++++++++++++++++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 .../test/java/org/apache/iotdb/db/sql/Cases.java   |   9 ++
 3 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index d36b9aa..ba2a928 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -122,6 +122,145 @@ public class ClusterPlanExecutor extends PlanExecutor {
   }
 
   @Override
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
+    // make sure this node knows all storage groups
+    try {
+      metaGroupMember.syncLeaderWithConsistencyCheck(false);
+    } catch (CheckConsistencyException e) {
+      throw new MetadataException(e);
+    }
+    Map<String, String> sgPathMap = 
IoTDB.metaManager.groupPathByStorageGroup(path);
+    if (sgPathMap.isEmpty()) {
+      throw new PathNotExistException(path.getFullPath());
+    }
+    logger.debug("The storage groups of path {} are {}", path, 
sgPathMap.keySet());
+    int ret;
+    try {
+      ret = getDeviceCount(sgPathMap);
+    } catch (CheckConsistencyException e) {
+      throw new MetadataException(e);
+    }
+    logger.debug("The number of devices satisfying {} is {}", path, ret);
+    return ret;
+  }
+
+  private int getDeviceCount(Map<String, String> sgPathMap)
+      throws CheckConsistencyException, MetadataException {
+    AtomicInteger result = new AtomicInteger();
+    // split the paths by the data group they belong to
+    Map<PartitionGroup, List<String>> groupPathMap = new HashMap<>();
+    for (Entry<String, String> sgPathEntry : sgPathMap.entrySet()) {
+      String storageGroupName = sgPathEntry.getKey();
+      PartialPath pathUnderSG = new PartialPath(sgPathEntry.getValue());
+      // find the data group that should hold the device schemas of the storage group
+      PartitionGroup partitionGroup =
+          metaGroupMember.getPartitionTable().route(storageGroupName, 0);
+      if (partitionGroup.contains(metaGroupMember.getThisNode())) {
+        // this node is a member of the group, perform a local query after synchronizing with the
+        // leader
+        metaGroupMember
+            .getLocalDataMember(partitionGroup.getHeader(), 
partitionGroup.getId())
+            .syncLeaderWithConsistencyCheck(false);
+        int localResult = getLocalDeviceCount(pathUnderSG);
+        logger.debug(
+            "{}: get device count of {} locally, result {}",
+            metaGroupMember.getName(),
+            partitionGroup,
+            localResult);
+        result.addAndGet(localResult);
+      } else {
+        // batch the queries of the same group to reduce communication
+        groupPathMap
+            .computeIfAbsent(partitionGroup, p -> new ArrayList<>())
+            .add(pathUnderSG.getFullPath());
+      }
+    }
+    if (groupPathMap.isEmpty()) {
+      return result.get();
+    }
+
+    ExecutorService remoteQueryThreadPool = 
Executors.newFixedThreadPool(groupPathMap.size());
+    List<Future<Void>> remoteFutures = new ArrayList<>();
+    // query each data group separately
+    for (Entry<PartitionGroup, List<String>> partitionGroupPathEntry : 
groupPathMap.entrySet()) {
+      PartitionGroup partitionGroup = partitionGroupPathEntry.getKey();
+      List<String> pathsToQuery = partitionGroupPathEntry.getValue();
+      remoteFutures.add(
+          remoteQueryThreadPool.submit(
+              () -> {
+                try {
+                  result.addAndGet(getRemoteDeviceCount(partitionGroup, 
pathsToQuery));
+                } catch (MetadataException e) {
+                  logger.warn(
+                      "Cannot get remote device count of {} from {}",
+                      pathsToQuery,
+                      partitionGroup,
+                      e);
+                }
+                return null;
+              }));
+    }
+    waitForThreadPool(remoteFutures, remoteQueryThreadPool, 
"getDeviceCount()");
+
+    return result.get();
+  }
+
+  private int getLocalDeviceCount(PartialPath path) throws MetadataException {
+    return IoTDB.metaManager.getDevicesNum(path);
+  }
+
+  private int getRemoteDeviceCount(PartitionGroup partitionGroup, List<String> 
pathsToCount)
+      throws MetadataException {
+    // choose the node with lowest latency or highest throughput
+    List<Node> coordinatedNodes = 
QueryCoordinator.getINSTANCE().reorderNodes(partitionGroup);
+    for (Node node : coordinatedNodes) {
+      try {
+        Integer count;
+        if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+          AsyncDataClient client =
+              metaGroupMember
+                  .getClientProvider()
+                  .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
+          client.setTimeout(RaftServer.getReadOperationTimeoutMS());
+          count =
+              SyncClientAdaptor.getAllDevices(client, 
partitionGroup.getHeader(), pathsToCount)
+                  .size();
+        } else {
+          try (SyncDataClient syncDataClient =
+              metaGroupMember
+                  .getClientProvider()
+                  .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS())) {
+            try {
+              
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
+              count = syncDataClient.getAllDevices(partitionGroup.getHeader(), 
pathsToCount).size();
+            } catch (TException e) {
+              // the connection may be broken, close it to avoid it being reused
+              syncDataClient.getInputProtocol().getTransport().close();
+              throw e;
+            }
+          }
+        }
+        logger.debug(
+            "{}: get device count of {} from {}, result {}",
+            metaGroupMember.getName(),
+            partitionGroup,
+            node,
+            count);
+        if (count != null) {
+          return count;
+        }
+      } catch (IOException | TException e) {
+        throw new MetadataException(e);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new MetadataException(e);
+      }
+    }
+    logger.warn("Cannot get devices count of {} from {}", pathsToCount, 
partitionGroup);
+    return 0;
+  }
+
+  @Override
   protected int getPathsNum(PartialPath path) throws MetadataException {
     return getNodesNumInGivenLevel(path, -1);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 9ac7c29..8076e9b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -697,7 +697,7 @@ public class PlanExecutor implements IPlanExecutor {
     return singleDataSet;
   }
 
-  private int getDevicesNum(PartialPath path) throws MetadataException {
+  protected int getDevicesNum(PartialPath path) throws MetadataException {
     return IoTDB.metaManager.getDevicesNum(path);
   }
 
diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
index f5d4cc9..5d5eb6c 100644
--- a/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
+++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/Cases.java
@@ -54,6 +54,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 public abstract class Cases {
@@ -286,6 +287,14 @@ public abstract class Cases {
       Assert.assertEquals(n, cnt);
       resultSet.close();
     }
+
+    // try to get devices on each node;
+    for (Statement readStatement : readStatements) {
+      ResultSet resultSet = readStatement.executeQuery("COUNT DEVICES");
+      while (resultSet.next()) {
+        assertEquals(n, resultSet.getInt(1));
+      }
+    }
   }
 
   @Test

Reply via email to