This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 3020603ac6130b06d499b3da10120be5dbadc420
Author: lta <[email protected]>
AuthorDate: Mon May 20 11:12:12 2019 +0800

    Increase the function of query polling
---
 .../ClusterRpcSingleQueryManager.java              | 84 +++++++------------
 .../cluster/query/utils/ClusterRpcReaderUtils.java | 95 ++++++----------------
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  3 +-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 52 ++++++++----
 .../integration/IoTDBAggregationLargeDataIT.java   |  1 -
 5 files changed, 93 insertions(+), 142 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
index 6c4f2ad..af4db31 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcSingleQueryManager.java
@@ -37,12 +37,16 @@ import 
org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterFilterSeries
 import 
org.apache.iotdb.cluster.query.reader.coordinatornode.ClusterSelectSeriesReader;
 import org.apache.iotdb.cluster.query.utils.ClusterRpcReaderUtils;
 import org.apache.iotdb.cluster.query.utils.QueryPlanPartitionUtils;
+import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
+import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest;
+import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
+import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
+import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicQueryDataResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
-import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -140,8 +144,6 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
       String groupId = entry.getKey();
       QueryPlan queryPlan = entry.getValue();
       if (!QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
-        PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
-        queryNodes.put(groupId, randomPeer);
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         allQueryPlan.put(PathType.SELECT_PATH, queryPlan);
         List<Filter> filterList = new ArrayList<>();
@@ -150,9 +152,12 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
           allQueryPlan.put(PathType.FILTER_PATH, 
filterGroupEntity.getQueryPlan());
           filterList = filterGroupEntity.getFilters();
         }
+        /** create request **/
+        BasicRequest request = InitSeriesReaderRequest
+            .createInitialQueryRequest(groupId, taskId, 
readDataConsistencyLevel,
+                allQueryPlan, filterList);
         InitSeriesReaderResponse response = (InitSeriesReaderResponse) 
ClusterRpcReaderUtils
-            .createClusterSeriesReader(groupId, randomPeer, 
readDataConsistencyLevel,
-                allQueryPlan, taskId, filterList);
+            .createClusterSeriesReader(groupId, request, this);
         handleInitReaderResponse(groupId, allQueryPlan, response);
       } else {
         dataGroupUsage.add(groupId);
@@ -167,14 +172,15 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
     for (Entry<String, FilterGroupEntity> entry : 
filterGroupEntityMap.entrySet()) {
       String groupId = entry.getKey();
       if (!selectPathPlans.containsKey(groupId)) {
-        PeerId randomPeer = RaftUtils.getRandomPeerID(groupId);
         Map<PathType, QueryPlan> allQueryPlan = new EnumMap<>(PathType.class);
         FilterGroupEntity filterGroupEntity = 
filterGroupEntityMap.get(groupId);
         allQueryPlan.put(PathType.FILTER_PATH, 
filterGroupEntity.getQueryPlan());
         List<Filter> filterList = filterGroupEntity.getFilters();
+        BasicRequest request = InitSeriesReaderRequest
+            .createInitialQueryRequest(groupId, taskId, 
readDataConsistencyLevel,
+                allQueryPlan, filterList);
         InitSeriesReaderResponse response = (InitSeriesReaderResponse) 
ClusterRpcReaderUtils
-            .createClusterSeriesReader(groupId, randomPeer, 
readDataConsistencyLevel,
-                allQueryPlan, taskId, filterList);
+            .createClusterSeriesReader(groupId, request, this);
         handleInitReaderResponse(groupId, allQueryPlan, response);
       }
     }
@@ -230,18 +236,21 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
         }
       }
     }
-    QuerySeriesDataResponse response = ClusterRpcReaderUtils
-        .fetchBatchData(groupId, queryNodes.get(groupId), taskId, 
PathType.SELECT_PATH,
-            fetchDataSeries,
+    BasicRequest request = QuerySeriesDataRequest
+        .createFetchDataRequest(groupId, taskId, PathType.SELECT_PATH, 
fetchDataSeries,
             queryRounds++);
+    QuerySeriesDataResponse response = (QuerySeriesDataResponse) 
ClusterRpcReaderUtils
+        .handleQueryRequest(request, queryNodes.get(groupId), 0);
     handleFetchDataResponseForSelectPaths(fetchDataSeries, response);
   }
 
   @Override
   public void fetchBatchDataForFilterPaths(String groupId) throws 
RaftConnectionException {
-    QuerySeriesDataResponse response = ClusterRpcReaderUtils
-        .fetchBatchData(groupId, queryNodes.get(groupId), taskId, 
PathType.FILTER_PATH, null,
-            queryRounds++);
+    BasicRequest request = QuerySeriesDataRequest
+        .createFetchDataRequest(groupId, taskId, PathType.FILTER_PATH, null, 
queryRounds++);
+    QuerySeriesDataResponse response = (QuerySeriesDataResponse) 
ClusterRpcReaderUtils
+        .handleQueryRequest(request, queryNodes.get(groupId), 0);
+
     handleFetchDataResponseForFilterPaths(groupId, response);
   }
 
@@ -253,9 +262,10 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
       String groupId = entry.getKey();
       List<String> fetchDataFilterSeries = new ArrayList<>();
       entry.getValue().forEach(path -> 
fetchDataFilterSeries.add(path.getFullPath()));
-      QuerySeriesDataByTimestampResponse response = ClusterRpcReaderUtils
-          .fetchBatchDataByTimestamp(groupId, queryNodes.get(groupId), taskId, 
queryRounds++,
-              batchTimestamp, fetchDataFilterSeries);
+      BasicRequest request = QuerySeriesDataByTimestampRequest
+          .createRequest(groupId, queryRounds++, taskId, batchTimestamp, 
fetchDataFilterSeries);
+      QuerySeriesDataByTimestampResponse response = 
(QuerySeriesDataByTimestampResponse) ClusterRpcReaderUtils
+          .handleQueryRequest(request, queryNodes.get(groupId), 0);
       handleFetchDataByTimestampResponseForSelectPaths(fetchDataFilterSeries, 
response);
     }
   }
@@ -332,7 +342,8 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
     for (Entry<String, PeerId> entry : queryNodes.entrySet()) {
       String groupId = entry.getKey();
       PeerId queryNode = entry.getValue();
-      ClusterRpcReaderUtils.releaseRemoteQueryResource(groupId, queryNode, 
taskId);
+      BasicRequest request = 
CloseSeriesReaderRequest.createReleaseResourceRequest(groupId, taskId);
+      ClusterRpcReaderUtils.handleQueryRequest(request, queryNode, 0);
     }
   }
 
@@ -356,60 +367,27 @@ public class ClusterRpcSingleQueryManager implements 
IClusterRpcSingleQueryManag
     return queryRounds;
   }
 
-  public void setQueryRounds(long queryRounds) {
-    this.queryRounds = queryRounds;
-  }
-
   public QueryPlan getOriginQueryPlan() {
     return originQueryPlan;
   }
 
-  public void setOriginQueryPlan(QueryPlan queryPlan) {
-    this.originQueryPlan = queryPlan;
-  }
-
-  public Map<String, PeerId> getQueryNodes() {
-    return queryNodes;
-  }
-
-  public void setQueryNodes(
-      Map<String, PeerId> queryNodes) {
-    this.queryNodes = queryNodes;
+  public void setQueryNode(String groupID, PeerId peerId) {
+    this.queryNodes.put(groupID, peerId);
   }
 
   public Map<String, QueryPlan> getSelectPathPlans() {
     return selectPathPlans;
   }
 
-  public void setSelectPathPlans(
-      Map<String, QueryPlan> selectPathPlans) {
-    this.selectPathPlans = selectPathPlans;
-  }
-
   public Map<String, List<Path>> getSelectSeriesByGroupId() {
     return selectSeriesByGroupId;
   }
 
-  public void setSelectSeriesByGroupId(
-      Map<String, List<Path>> selectSeriesByGroupId) {
-    this.selectSeriesByGroupId = selectSeriesByGroupId;
-  }
-
   public Map<Path, ClusterSelectSeriesReader> getSelectSeriesReaders() {
     return selectSeriesReaders;
   }
 
-  public void setSelectSeriesReaders(
-      Map<Path, ClusterSelectSeriesReader> selectSeriesReaders) {
-    this.selectSeriesReaders = selectSeriesReaders;
-  }
-
   public Map<String, FilterGroupEntity> getFilterGroupEntityMap() {
     return filterGroupEntityMap;
   }
-
-  public void setFilterGroupEntityMap(
-      Map<String, FilterGroupEntity> filterGroupEntityMap) {
-    this.filterGroupEntityMap = filterGroupEntityMap;
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index dca2d30..0247bbe 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -19,32 +19,34 @@
 package org.apache.iotdb.cluster.query.utils;
 
 import com.alipay.sofa.jraft.entity.PeerId;
-import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.qp.task.QueryTask;
 import org.apache.iotdb.cluster.query.PathType;
+import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest;
-import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
 import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
-import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
-import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.cluster.utils.hash.Router;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utils for cluster reader which needs to acquire data from remote query node.
  */
 public class ClusterRpcReaderUtils {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterRpcReaderUtils.class);
+
   /**
    * Count limit to redo a task
    */
@@ -56,72 +58,27 @@ public class ClusterRpcReaderUtils {
 
   /**
    * Create cluster series reader
-   *
-   * @param peerId query node to fetch data
-   * @param readDataConsistencyLevel consistency level of read data
-   * @param taskId task id assigned by coordinator node
    */
-  public static BasicResponse createClusterSeriesReader(String groupId, PeerId 
peerId,
-      int readDataConsistencyLevel, Map<PathType, QueryPlan> allQueryPlan, 
String taskId,
-      List<Filter> filterList) throws RaftConnectionException, IOException {
-
-    /** handle request **/
-    BasicRequest request = InitSeriesReaderRequest
-        .createInitialQueryRequest(groupId, taskId, readDataConsistencyLevel,
-            allQueryPlan, filterList);
-    return handleQueryRequest(request, peerId, 0);
-  }
-
-  /**
-   * Fetch batch data for select series in a query without value filter or 
filter series.
-   *
-   * @param groupId data group id
-   * @param peerId query node id
-   * @param taskId task id of query task
-   * @param pathType type of path
-   * @param fetchDataSeries series list which need to fetch data
-   * @param queryRounds query rounds
-   */
-  public static QuerySeriesDataResponse fetchBatchData(String groupId, PeerId 
peerId, String taskId,
-      PathType pathType, List<String> fetchDataSeries, long queryRounds)
+  public static BasicResponse createClusterSeriesReader(String groupId, 
BasicRequest request,
+      ClusterRpcSingleQueryManager manager)
       throws RaftConnectionException {
-    BasicRequest request = QuerySeriesDataRequest
-        .createFetchDataRequest(groupId, taskId, pathType, fetchDataSeries, 
queryRounds);
-    return (QuerySeriesDataResponse) handleQueryRequest(request, peerId, 0);
-  }
 
-  /**
-   * Fetch batch data corresponding to a given list of timestamp for select 
series in a query with
-   * value filter.
-   *
-   * @param groupId data group id
-   * @param peerId query node id
-   * @param taskId task id of query task
-   * @param queryRounds query rounds
-   * @param batchTimestamp list of valid timestamp
-   * @param fetchDataSeries series list which need to fetch data
-   */
-  public static QuerySeriesDataByTimestampResponse 
fetchBatchDataByTimestamp(String groupId,
-      PeerId peerId, String taskId, long queryRounds, List<Long> 
batchTimestamp,
-      List<String> fetchDataSeries)
-      throws RaftConnectionException {
-    BasicRequest request = QuerySeriesDataByTimestampRequest
-        .createRequest(groupId, queryRounds, taskId, batchTimestamp, 
fetchDataSeries);
-    return (QuerySeriesDataByTimestampResponse) handleQueryRequest(request, 
peerId, 0);
-  }
-
-  /**
-   * Release remote query resources
-   *
-   * @param groupId data group id
-   * @param peerId target query node
-   * @param taskId unique task id
-   */
-  public static void releaseRemoteQueryResource(String groupId, PeerId peerId, 
String taskId)
-      throws RaftConnectionException {
-
-    BasicRequest request = 
CloseSeriesReaderRequest.createReleaseResourceRequest(groupId, taskId);
-    handleQueryRequest(request, peerId, 0);
+    List<PeerId> peerIdList = RaftUtils
+        .getPeerIDList(groupId, Server.getInstance(), Router.getInstance());
+    int randomPeerIndex = RaftUtils.getRandomInt(peerIdList.size());
+    BasicResponse response;
+    for (int i = 0; i < peerIdList.size(); i++) {
+      PeerId peerId = peerIdList.get((i + randomPeerIndex) % 
peerIdList.size());
+      try {
+        response = handleQueryRequest(request, peerId, 0);
+        manager.setQueryNode(groupId, peerId);
+        return response;
+      } catch (RaftConnectionException e) {
+        LOGGER.error("Can not init series reader in Node<{}> of group<{}>", 
peerId, groupId, e);
+      }
+    }
+    throw new RaftConnectionException(
+        String.format("Can not init series reader in all nodes of group<%s>.", 
groupId));
   }
 
   /**
@@ -132,7 +89,7 @@ public class ClusterRpcReaderUtils {
    * @param taskRetryNum retry num of the request
    * @return Response from remote query node
    */
-  private static BasicResponse handleQueryRequest(BasicRequest request, PeerId 
peerId,
+  public static BasicResponse handleQueryRequest(BasicRequest request, PeerId 
peerId,
       int taskRetryNum)
       throws RaftConnectionException {
     if (taskRetryNum > TASK_MAX_RETRY) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index bab1536..197c7eb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -42,8 +42,7 @@ public interface NodeAsClient {
    * @param peerId leader node of the target group
    *
    */
-  QueryTask syncHandleRequest(BasicRequest request, PeerId peerId)
-      throws RaftConnectionException;
+  QueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
   /**
    * Shut down client
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index d5486fb..61fbdca 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -118,18 +118,29 @@ public class RaftUtils {
     return getRandomPeerID(groupId, server, router);
   }
 
+  /**
+   * Get random peer id
+   */
   public static PeerId getRandomPeerID(String groupId, Server server, Router 
router) {
-    PeerId randomPeerId;
+    List<PeerId> peerIdList = getPeerIDList(groupId, server, router);
+    return peerIdList.get(getRandomInt(peerIdList.size()));
+  }
+
+  /**
+   * Get peer id list by groupid
+   */
+  public static List<PeerId> getPeerIDList(String groupId, Server server, 
Router router) {
+    List<PeerId> peerIdList = new ArrayList<>();
     if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
       RaftService service = (RaftService) 
server.getMetadataHolder().getService();
-      List<PeerId> peerIdList = service.getPeerIdList();
-      randomPeerId = peerIdList.get(getRandomInt(peerIdList.size()));
+      peerIdList.addAll(service.getPeerIdList());
     } else {
       PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
-      PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
-      randomPeerId = getPeerIDFrom(node);
+      for (PhysicalNode node : physicalNodes) {
+        peerIdList.add(getPeerIDFrom(node));
+      }
     }
-    return randomPeerId;
+    return peerIdList;
   }
 
   /**
@@ -196,7 +207,7 @@ public class RaftUtils {
 
   @OnlyForTest
   public static void clearRaftGroupLeader() {
-         groupLeaderCache.clear();
+    groupLeaderCache.clear();
   }
 
   /**
@@ -339,7 +350,8 @@ public class RaftUtils {
     try {
       LOGGER.debug("Handle null-read in data group for reading.");
       final byte[] reqContext = RaftUtils.createRaftRequestContext();
-      DataPartitionRaftHolder dataPartitionRaftHolder = 
(DataPartitionRaftHolder) server.getDataPartitionHolder(groupId);
+      DataPartitionRaftHolder dataPartitionRaftHolder = 
(DataPartitionRaftHolder) server
+          .getDataPartitionHolder(groupId);
       ((RaftService) dataPartitionRaftHolder.getService()).getNode()
           .readIndex(reqContext, new ReadIndexClosure() {
             @Override
@@ -360,7 +372,7 @@ public class RaftUtils {
     }
   }
 
-  public static Status createErrorStatus(String errorMsg){
+  public static Status createErrorStatus(String errorMsg) {
     Status status = new Status();
     status.setErrorMsg(errorMsg);
     status.setCode(-1);
@@ -386,8 +398,8 @@ public class RaftUtils {
   }
 
   /**
-   * Get all node information of the data group of input storage group.
-   * The first node is the current leader
+   * Get all node information of the data group of input storage group. The 
first node is the
+   * current leader
    *
    * @param sg storage group ID. If null, return metadata group info
    */
@@ -400,7 +412,8 @@ public class RaftUtils {
     PeerId[] nodes;
     if (sg == null) {
       groupId = ClusterConfig.METADATA_GROUP_ID;
-      List<PeerId> peerIdList = ((RaftService) 
server.getMetadataHolder().getService()).getPeerIdList();
+      List<PeerId> peerIdList = ((RaftService) 
server.getMetadataHolder().getService())
+          .getPeerIdList();
       nodes = peerIdList.toArray(new PeerId[peerIdList.size()]);
     } else {
       PhysicalNode[] group = router.routeGroup(sg);
@@ -434,7 +447,8 @@ public class RaftUtils {
     return getDataPartitionOfNode(ip, port, server, router);
   }
 
-  public static Map<String[], String[]> getDataPartitionOfNode(String ip, int 
port, Server server, Router router) {
+  public static Map<String[], String[]> getDataPartitionOfNode(String ip, int 
port, Server server,
+      Router router) {
     PhysicalNode[][] groups = router.getGroupsNodes(ip, port);
     if (groups == null) {
       return null;
@@ -444,7 +458,8 @@ public class RaftUtils {
     for (int i = 0; i < groups.length; i++) {
       groupSGMap.put(generateStringKey(groups[i]), new ArrayList<>());
     }
-    Set<String> allSGList = 
((MetadataStateManchine)((RaftService)server.getMetadataHolder().getService()).getFsm()).getAllStorageGroups();
+    Set<String> allSGList = ((MetadataStateManchine) ((RaftService) 
server.getMetadataHolder()
+        .getService()).getFsm()).getAllStorageGroups();
     for (String sg : allSGList) {
       String key = generateStringKey(router.routeGroup(sg));
       if (groupSGMap.containsKey(key)) {
@@ -496,7 +511,8 @@ public class RaftUtils {
     RaftService raftService = (RaftService) 
server.getMetadataHolder().getService();
     metricMap.put(raftService.getGroupId(), 
getReplicaMetricFromRaftService(raftService, metric));
 
-    router.getAllGroupId().forEach(groupId -> metricMap.put(groupId, 
getReplicaMetric(groupId, metric)));
+    router.getAllGroupId()
+        .forEach(groupId -> metricMap.put(groupId, getReplicaMetric(groupId, 
metric)));
     return metricMap;
   }
 
@@ -505,12 +521,14 @@ public class RaftUtils {
       RaftService service = (RaftService) 
server.getDataPartitionHolder(groupId).getService();
       return getReplicaMetricFromRaftService(service, metric);
     } else {
-      LOGGER.debug("Current host does not contain group {}, all groups are 
{}.", groupId, server.getDataPartitionHolderMap().keySet());
+      LOGGER.debug("Current host does not contain group {}, all groups are 
{}.", groupId,
+          server.getDataPartitionHolderMap().keySet());
       return getReplicaMetricFromRemoteNode(groupId, metric);
     }
   }
 
-  private static Map<String, Long> getReplicaMetricFromRaftService(RaftService 
service, String metric) {
+  private static Map<String, Long> getReplicaMetricFromRaftService(RaftService 
service,
+      String metric) {
     String groupId = service.getGroupId();
     LOGGER.debug("Get replica metric {} for group {}.", metric, 
service.getGroupId());
     NodeImpl node = (NodeImpl) service.getNode();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
index 973d3e7..494029c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBAggregationLargeDataIT.java
@@ -35,7 +35,6 @@ import java.sql.Statement;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.integration.Constant;
 import org.apache.iotdb.cluster.utils.EnvironmentUtils;
 import org.apache.iotdb.cluster.utils.QPExecutorUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;

Reply via email to