This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b6eb283 [IOTDB-1846] Optimize device count for rpc (#4187)
b6eb283 is described below
commit b6eb283e8a61044d3f53ce7ec7dc6c26fd6e16ff
Author: ZhangHongYin <[email protected]>
AuthorDate: Wed Oct 20 14:50:28 2021 +0800
[IOTDB-1846] Optimize device count for rpc (#4187)
[IOTDB-1846] Optimize device count for rpc (#4187)
---
.../iotdb/cluster/client/sync/SyncClientAdaptor.java | 10 ++++++++++
.../apache/iotdb/cluster/query/ClusterPlanExecutor.java | 5 ++---
.../apache/iotdb/cluster/query/LocalQueryExecutor.java | 11 +++++++++++
.../apache/iotdb/cluster/server/DataClusterServer.java | 15 +++++++++++++++
.../iotdb/cluster/server/service/DataAsyncService.java | 12 ++++++++++++
.../iotdb/cluster/server/service/DataSyncService.java | 9 +++++++++
thrift-cluster/src/main/thrift/cluster.thrift | 2 ++
7 files changed, 61 insertions(+), 3 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
index ee15d0d..c55ff46 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java
@@ -370,6 +370,16 @@ public class SyncClientAdaptor {
return handler.getResult(RaftServer.getReadOperationTimeoutMS());
}
+ public static Integer getDeviceCount(
+ AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
+ throws InterruptedException, TException {
+ AtomicReference<Integer> remoteResult = new AtomicReference<>(null);
+ GenericHandler<Integer> handler = new GenericHandler<>(client.getNode(), remoteResult);
+
+ client.getDeviceCount(header, pathsToQuery, handler);
+ return handler.getResult(RaftServer.getReadOperationTimeoutMS());
+ }
+
public static Set<String> getAllDevices(
AsyncDataClient client, RaftNode header, List<String> pathsToQuery)
throws InterruptedException, TException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index ba2a928..ee62dc8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -223,8 +223,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
.getAsyncDataClient(node, RaftServer.getReadOperationTimeoutMS());
client.setTimeout(RaftServer.getReadOperationTimeoutMS());
count =
- SyncClientAdaptor.getAllDevices(client, partitionGroup.getHeader(), pathsToCount)
- .size();
+ SyncClientAdaptor.getDeviceCount(client, partitionGroup.getHeader(), pathsToCount);
} else {
try (SyncDataClient syncDataClient =
metaGroupMember
@@ -232,7 +231,7 @@ public class ClusterPlanExecutor extends PlanExecutor {
.getSyncDataClient(node, RaftServer.getReadOperationTimeoutMS())) {
try {
syncDataClient.setTimeout(RaftServer.getReadOperationTimeoutMS());
- count = syncDataClient.getAllDevices(partitionGroup.getHeader(), pathsToCount).size();
+ count = syncDataClient.getDeviceCount(partitionGroup.getHeader(), pathsToCount);
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
syncDataClient.getInputProtocol().getTransport().close();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index 36a56e8..571d1c3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -1014,6 +1014,17 @@ public class LocalQueryExecutor {
return count;
}
+ public int getDeviceCount(List<String> pathsToQuery)
+ throws CheckConsistencyException, MetadataException {
+ dataGroupMember.syncLeaderWithConsistencyCheck(false);
+
+ int count = 0;
+ for (String s : pathsToQuery) {
+ count += getCMManager().getDevicesNum(new PartialPath(s));
+ }
+ return count;
+ }
+
@SuppressWarnings("java:S1135") // ignore todos
public ByteBuffer last(LastQueryRequest request)
throws CheckConsistencyException, QueryProcessException, IOException, StorageEngineException,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
index e6be01a..35a8fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java
@@ -867,6 +867,16 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public void getDeviceCount(
+ RaftNode header, List<String> pathsToQuery, AsyncMethodCallback<Integer> resultHandler)
+ throws TException {
+ DataAsyncService service = getDataAsyncService(header, resultHandler, "count device");
+ if (service != null) {
+ service.getDeviceCount(header, pathsToQuery, resultHandler);
+ }
+ }
+
+ @Override
public void onSnapshotApplied(
RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
DataAsyncService service = getDataAsyncService(header, resultHandler, "Snapshot applied");
@@ -1002,6 +1012,11 @@ public class DataClusterServer extends RaftServer
}
@Override
+ public int getDeviceCount(RaftNode header, List<String> pathsToQuery) throws TException {
+ return getDataSyncService(header).getDeviceCount(header, pathsToQuery);
+ }
+
+ @Override
public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
return getDataSyncService(header).onSnapshotApplied(header, slots);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
index eab4334..a360768 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataAsyncService.java
@@ -458,6 +458,18 @@ public class DataAsyncService extends BaseAsyncService implements TSDataService.
}
@Override
+ public void getDeviceCount(
+ RaftNode header, List<String> pathsToQuery, AsyncMethodCallback<Integer> resultHandler)
+ throws TException {
+ try {
+ resultHandler.onComplete(
+ dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery));
+ } catch (CheckConsistencyException | MetadataException e) {
+ resultHandler.onError(e);
+ }
+ }
+
+ @Override
public void onSnapshotApplied(
RaftNode header, List<Integer> slots, AsyncMethodCallback<Boolean> resultHandler) {
resultHandler.onComplete(dataGroupMember.onSnapshotInstalled(slots));
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
index c43b617..a3320bb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/DataSyncService.java
@@ -423,6 +423,15 @@ public class DataSyncService extends BaseSyncService implements TSDataService.If
}
@Override
+ public int getDeviceCount(RaftNode header, List<String> pathsToQuery) throws TException {
+ try {
+ return dataGroupMember.getLocalQueryExecutor().getDeviceCount(pathsToQuery);
+ } catch (CheckConsistencyException | MetadataException e) {
+ throw new TException(e);
+ }
+ }
+
+ @Override
public boolean onSnapshotApplied(RaftNode header, List<Integer> slots) {
return dataGroupMember.onSnapshotInstalled(slots);
}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 61ae467..1cba433 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -463,6 +463,8 @@ service TSDataService extends RaftService {
int getPathCount(1: RaftNode header, 2: list<string> pathsToQuery, 3: int level)
+ int getDeviceCount(1: RaftNode header, 2: list<string> pathsToQuery)
+
/**
* During slot transfer, when a member has pulled snapshot from a group, the member will use this
* method to inform the group that one replica of such slots has been pulled.