This is an automated email from the ASF dual-hosted git repository.

wangchao316 pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new e3d02f4  [To rel/0.12][IOTDB-1846]Optimize when count the total number of devices in cluster mode (#4188)
e3d02f4 is described below

commit e3d02f452418f5ad39b0e435c7d13657cba1124f
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Oct 20 14:45:43 2021 +0800

    [To rel/0.12][IOTDB-1846]Optimize when count the total number of devices in cluster mode (#4188)
    
    [To rel/0.12][IOTDB-1846]Optimize when count the total number of devices in cluster mode (#4188)
---
 .../iotdb/cluster/client/sync/SyncClientAdaptor.java      | 10 ++++++++++
 .../apache/iotdb/cluster/query/ClusterPlanExecutor.java   |  5 ++---
 .../apache/iotdb/cluster/query/LocalQueryExecutor.java    | 11 +++++++++++
 .../apache/iotdb/cluster/server/DataClusterServer.java    | 15 +++++++++++++++
 .../iotdb/cluster/server/service/DataAsyncService.java    | 12 ++++++++++++
 .../iotdb/cluster/server/service/DataSyncService.java     |  9 +++++++++
 thrift-cluster/src/main/thrift/cluster.thrift             |  2 ++
 7 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 6d73265..31bd41b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -368,6 +368,16 @@ public class SyncClientAdaptor {
     return handler.getResult(RaftServer.getReadOperationTimeoutMS());
   }
 
+  public static Integer getDeviceCount(
+      AsyncDataClient client, Node header, List<String> pathsToQuery)
+      throws InterruptedException, TException {
+    AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
+    GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), 
remoteResult);
+
+    client.getDeviceCount(header, pathsToQuery, handler);
+    return handler.getResult(RaftServer.getReadOperationTimeoutMS());
+  }
+
   public static Set<String> getAllDevices(
       AsyncDataClient client, Node header, List<String> pathsToQuery)
       throws InterruptedException, TException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 83a6f06..551247d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -226,8 +226,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
                   .getAsyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS());
           client.setTimeout(RaftServer.getReadOperationTimeoutMS());
           count =
-              SyncClientAdaptor.getAllDevices(client, 
partitionGroup.getHeader(), pathsToCount)
-                  .size();
+              SyncClientAdaptor.getDeviceCount(client, 
partitionGroup.getHeader(), pathsToCount);
         } else {
           try (SyncDataClient syncDataClient =
               metaGroupMember
@@ -235,7 +234,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
                   .getSyncDataClient(node, 
RaftServer.getReadOperationTimeoutMS())) {
             try {
               
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
-              count = syncDataClient.getAllDevices(partitionGroup.getHeader(), 
pathsToCount).size();
+              count = 
syncDataClient.getDeviceCount(partitionGroup.getHeader(), pathsToCount);
             } catch (TException e) {
               // the connection may be broken, close it to avoid it being reused
               syncDataClient.getInputProtocol().getTransport().close();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index bd5c376..74a2721 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -909,6 +909,17 @@ public class LocalQueryExecutor {
     return count;
   }
 
+  public int getDeviceCount(List<String> pathsToQuery)
+      throws CheckConsistencyException, MetadataException {
+    dataGroupMember.syncLeaderWithConsistencyCheck(false);
+
+    int count = 0;
+    for (String s : pathsToQuery) {
+      count += getCMManager().getDevicesNum(new PartialPath(s));
+    }
+    return count;
+  }
+
   @SuppressWarnings("java:S1135") // ignore todos
   public ByteBuffer last(LastQueryRequest request)
       throws CheckConsistencyException, QueryProcessException, IOException, 
StorageEngineException,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 858459b..1ab3ca0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -760,6 +760,16 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public void getDeviceCount(
+      Node header, List<String> pathsToQuery, AsyncMethodCallback<Integer> 
resultHandler)
+      throws TException {
+    DataAsyncService service = getDataAsyncService(header, resultHandler, 
"count device");
+    if (service != null) {
+      service.getDeviceCount(header, pathsToQuery, resultHandler);
+    }
+  }
+
+  @Override
   public void onSnapshotApplied(
       Node header, List<Integer> slots, AsyncMethodCallback<Boolean> 
resultHandler) {
     DataAsyncService service = getDataAsyncService(header, resultHandler, 
"Snapshot applied");
@@ -897,6 +907,11 @@ public class DataClusterServer extends RaftServer
   }
 
   @Override
+  public int getDeviceCount(Node header, List<String> pathsToQuery) throws 
TException {
+    return getDataSyncService(header).getDeviceCount(header, pathsToQuery);
+  }
+
+  @Override
   public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
     return getDataSyncService(request.getHeader()).sendHeartbeat(request);
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index a592599..c6b392e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -432,6 +432,18 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
   }
 
   @Override
+  public void getDeviceCount(
+      Node header, List<String> pathsToQuery, AsyncMethodCallback<Integer> 
resultHandler)
+      throws TException {
+    try {
+      resultHandler.onComplete(
+          
dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery));
+    } catch (CheckConsistencyException | MetadataException e) {
+      resultHandler.onError(e);
+    }
+  }
+
+  @Override
   public void onSnapshotApplied(
       Node header, List<Integer> slots, AsyncMethodCallback<Boolean> 
resultHandler) {
     resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 062dcc7..8ff3c35 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -410,6 +410,15 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
   }
 
   @Override
+  public int getDeviceCount(Node header, List<String> pathsToQuery) throws 
TException {
+    try {
+      return 
dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery);
+    } catch (CheckConsistencyException | MetadataException e) {
+      throw new TException(e);
+    }
+  }
+
+  @Override
   public ByteBuffer peekNextNotNullValue(Node header, long executorId, long 
startTime, long endTime)
       throws TException {
     try {
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index e9332a5..d838b2c 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -463,6 +463,8 @@ service TSDataService extends RaftService {
 
   int getPathCount(1: Node header 2: list<string> pathsToQuery 3: int level)
 
+  int getDeviceCount(1: Node header, 2: list<string> pathsToQuery)
+
   /**
   * During slot transfer, when a member has pulled snapshot from a group, the member will use this
   * method to inform the group that one replica of such slots has been pulled.

Reply via email to