This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new e3d02f4 [To rel/0.12][IOTDB-1846]Optimize when count the total number
of devices in cluster mode (#4188)
e3d02f4 is described below
commit e3d02f452418f5ad39b0e435c7d13657cba1124f
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Oct 20 14:45:43 2021 +0800
[To rel/0.12][IOTDB-1846]Optimize when count the total number of devices in
cluster mode (#4188)
[To rel/0.12][IOTDB-1846]Optimize when count the total number of devices in
cluster mode (#4188)
---
.../iotdb/cluster/client/sync/SyncClientAdaptor.java | 10 ++++++++++
.../apache/iotdb/cluster/query/ClusterPlanExecutor.java | 5 ++---
.../apache/iotdb/cluster/query/LocalQueryExecutor.java | 11 +++++++++++
.../apache/iotdb/cluster/server/DataClusterServer.java | 15 +++++++++++++++
.../iotdb/cluster/server/service/DataAsyncService.java | 12 ++++++++++++
.../iotdb/cluster/server/service/DataSyncService.java | 9 +++++++++
thrift-cluster/src/main/thrift/cluster.thrift | 2 ++
7 files changed, 61 insertions(+), 3 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index 6d73265..31bd41b 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -368,6 +368,16 @@ public class SyncClientAdaptor {
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
+ public static Integer getDeviceCount(
+ AsyncDataClient client, Node header, List<String> pathsToQuery)
+ throws InterruptedException, TException {
+ AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
+ GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(),
remoteResult);
+
+ client.getDeviceCount(header, pathsToQuery, handler);
+ return handler.getResult(RaftServer.getReadOperationTimeoutMS());
+ }
+
public static Set<String> getAllDevices(
AsyncDataClient client, Node header, List<String> pathsToQuery)
throws InterruptedException, TException {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 83a6f06..551247d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -226,8 +226,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
.getAsyncDataClient(node,
RaftServer.getReadOperationTimeoutMS());
client.setTimeout(RaftServer.getReadOperationTimeoutMS());
count =
- SyncClientAdaptor.getAllDevices(client,
partitionGroup.getHeader(), pathsToCount)
- .size();
+ SyncClientAdaptor.getDeviceCount(client,
partitionGroup.getHeader(), pathsToCount);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
@@ -235,7 +234,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
.getSyncDataClient(node,
RaftServer.getReadOperationTimeoutMS())) {
try {
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
- count = syncDataClient.getAllDevices(partitionGroup.getHeader(),
pathsToCount).size();
+ count =
syncDataClient.getDeviceCount(partitionGroup.getHeader(), pathsToCount);
} catch (TException e) {
// the connection may be broken, close it to avoid it being
reused
syncDataClient.getInputProtocol().getTransport().close();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index bd5c376..74a2721 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -909,6 +909,17 @@ public class LocalQueryExecutor {
return count;
}
+ public int getDeviceCount(List<String> pathsToQuery)
+ throws CheckConsistencyException, MetadataException {
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
+
+ int count = 0;
+ for (String s : pathsToQuery) {
+ count += getCMManager().getDevicesNum(new PartialPath(s));
+ }
+ return count;
+ }
+
@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException,
StorageEngineException,
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index 858459b..1ab3ca0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -760,6 +760,16 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public void getDeviceCount(
+ Node header, List<String> pathsToQuery, AsyncMethodCallback<Integer>
resultHandler)
+ throws TException {
+ DataAsyncService service = getDataAsyncService(header, resultHandler,
"count device");
+ if (service != null) {
+ service.getDeviceCount(header, pathsToQuery, resultHandler);
+ }
+ }
+
+ @Override
public void onSnapshotApplied(
Node header, List<Integer> slots, AsyncMethodCallback<Boolean>
resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler,
"Snapshot applied");
@@ -897,6 +907,11 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public int getDeviceCount(Node header, List<String> pathsToQuery) throws
TException {
+ return getDataSyncService(header).getDeviceCount(header, pathsToQuery);
+ }
+
+ @Override
public HeartBeatResponse sendHeartbeat(HeartBeatRequest request) {
return getDataSyncService(request.getHeader()).sendHeartbeat(request);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index a592599..c6b392e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -432,6 +432,18 @@ public class DataAsyncService extends BaseAsyncService
implements TSDataService.
}
@Override
+ public void getDeviceCount(
+ Node header, List<String> pathsToQuery, AsyncMethodCallback<Integer>
resultHandler)
+ throws TException {
+ try {
+ resultHandler.onComplete(
+
dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery));
+ } catch (CheckConsistencyException | MetadataException e) {
+ resultHandler.onError(e);
+ }
+ }
+
+ @Override
public void onSnapshotApplied(
Node header, List<Integer> slots, AsyncMethodCallback<Boolean>
resultHandler) {
resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots));
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index 062dcc7..8ff3c35 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -410,6 +410,15 @@ public class DataSyncService extends BaseSyncService
implements TSDataService.If
}
@Override
+ public int getDeviceCount(Node header, List<String> pathsToQuery) throws
TException {
+ try {
+ return
dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery);
+ } catch (CheckConsistencyException | MetadataException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
public ByteBuffer peekNextNotNullValue(Node header, long executorId, long
startTime, long endTime)
throws TException {
try {
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift
b/thrift-cluster/src/main/thrift/cluster.thrift
index e9332a5..d838b2c 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -463,6 +463,8 @@ service TSDataService extends RaftService {
int getPathCount(1: Node header 2: list<string> pathsToQuery 3: int level)
+ int getDeviceCount(1: Node header, 2: list<string> pathsToQuery)
+
/**
* During slot transfer, when a member has pulled snapshot from a group, the
member will use this
* method to inform the group that one replica of such slots has been pulled.