This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 51f91b6b99a8b86c68790bf7af308ee9380b31e6
Merge: 411ea77 41a49ad
Author: lta <[email protected]>
AuthorDate: Thu May 23 15:27:05 2019 +0800

    fix lots of conflicts with cluster_framework

 .../iotdb/cluster/concurrent/ThreadName.java       |   5 +
 .../concurrent/pool/NodeAsClientThreadManager.java |  64 ++++++++
 .../concurrent/pool/QPTaskThreadManager.java       |  64 ++++++++
 .../concurrent/pool/QueryTimerThreadManager.java   |  74 ++++++++++
 .../cluster/concurrent/pool/ThreadPoolManager.java |  15 +-
 .../apache/iotdb/cluster/config/ClusterConfig.java |  34 ++---
 .../iotdb/cluster/config/ClusterConstant.java      |   2 +-
 .../iotdb/cluster/config/ClusterDescriptor.java    |  17 ++-
 .../org/apache/iotdb/cluster/entity/Server.java    |  23 ++-
 .../cluster/entity/raft/DataStateMachine.java      |  18 ++-
 .../cluster/qp/executor/AbstractQPExecutor.java    |  23 ++-
 .../cluster/qp/executor/NonQueryExecutor.java      |  72 ++++-----
 .../cluster/qp/executor/QueryMetadataExecutor.java | 142 +++++++++++-------
 .../apache/iotdb/cluster/qp/task/BatchQPTask.java  |  72 ++++-----
 .../iotdb/cluster/qp/task/DataQueryTask.java       |  30 +---
 .../org/apache/iotdb/cluster/qp/task/QPTask.java   |  51 ++++++-
 .../apache/iotdb/cluster/qp/task/SingleQPTask.java |   2 +-
 .../coordinatornode/ClusterRpcQueryManager.java    |  12 ++
 .../coordinatornode/IClusterRpcQueryManager.java   |   5 +
 .../querynode/ClusterLocalQueryManager.java        |  13 ++
 .../querynode/IClusterLocalQueryManager.java       |   5 +
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  22 ++-
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  14 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 161 ++++++++-------------
 .../raft/processor/QueryMetricAsyncProcessor.java  |  44 ++++++
 .../nonquery/DataGroupNonQueryAsyncProcessor.java  |   5 +-
 .../querymetadata/QueryMetadataAsyncProcessor.java |   2 +-
 .../raft/request/QueryMetricRequest.java}          |  27 ++--
 .../raft/response/QueryMetricResponse.java}        |  35 ++---
 .../nonquery/DataGroupNonQueryResponse.java        |  12 ++
 .../cluster/service/TSServiceClusterImpl.java      |  51 ++++---
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  54 +++----
 .../iotdb/cluster/utils/hash/PhysicalNode.java     |   5 +
 .../apache/iotdb/cluster/utils/hash/Router.java    |   8 +-
 .../iotdb/cluster/utils/hash/VirtualNode.java      |  18 +--
 .../concurrent/pool/QPTaskThreadManagerTest.java   |  85 +++++++++++
 .../cluster/config/ClusterDescriptorTest.java      |  14 +-
 .../integration/IoTDBMetadataFetchAbstract.java    |  63 ++++----
 .../integration/IoTDBMetadataFetchLocallyIT.java   |   1 +
 .../apache/iotdb/cluster/utils/RaftUtilsTest.java  |  19 +--
 .../java/org/apache/iotdb/cluster/utils/Utils.java |   1 -
 .../iotdb/cluster/utils/hash/MD5HashTest.java      |   8 +-
 .../iotdb/cluster/utils/hash/PhysicalNodeTest.java |   4 +-
 .../iotdb/cluster/utils/hash/RouterTest.java       |  11 +-
 .../UserGuideV0.7.0/7-Tools-NodeTool.md            |   2 +-
 iotdb/iotdb/conf/iotdb-cluster.properties          |  15 +-
 .../iotdb/db/qp/executor/QueryProcessExecutor.java |   2 +-
 .../db/query/executor/AggregateEngineExecutor.java |   2 -
 .../iotdb/db/query/executor/EngineQueryRouter.java |   1 -
 .../db/query/executor/IEngineQueryRouter.java      |  78 ++++++++++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  19 ++-
 service-rpc/src/main/thrift/rpc.thrift             |   2 +-
 52 files changed, 1015 insertions(+), 513 deletions(-)

diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
index 828cc1a,72bec94..60e8a75
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/concurrent/pool/ThreadPoolManager.java
@@@ -38,7 -42,7 +42,7 @@@ public abstract class ThreadPoolManage
    /**
     * Init pool manager
     */
--  public void init(){
++  public void init() {
      pool = IoTDBThreadPoolFactory.newFixedThreadPool(getThreadPoolSize(), 
getThreadName());
    }
  
@@@ -53,14 -57,14 +57,13 @@@
    public void close(boolean block, long timeout) throws ProcessorException {
      if (pool != null) {
        try {
--        pool.shutdown();
++        pool.shutdownNow();
          if (block) {
            try {
              if (!pool.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
-               throw new ProcessorException(
 -              LOGGER.debug(
--                  String
--                      .format("%s thread pool doesn't exit after %d ms", 
getManagerName(),
--                          timeout));
++              LOGGER
++                  .debug(String.format("%s thread pool doesn't exit after %d 
ms", getManagerName(),
++                      timeout));
              }
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 627f561,1ab6eda..230afcb
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@@ -110,15 -110,15 +110,15 @@@ public class ClusterConfig 
    private int numOfVirtualNodes = 2;
  
    /**
-    * Maximum number of @NodeAsClient usage
 -   * Maximum number of inner rpc client thread. When this value <= 0, use CPU 
core number * 10
++   * Maximum number of inner rpc client threads. When this value <= 0, use CPU 
core number * 5
     */
-   private int maxNumOfInnerRpcClient = 500;
 -  private int concurrentInnerRpcClientThread = 
Runtime.getRuntime().availableProcessors() * 10;
++  private int concurrentInnerRpcClientThread = 
Runtime.getRuntime().availableProcessors() * 5;
  
    /**
-    * Maximum number of queue length to use @NodeAsClient, the request which 
exceed to this number
-    * will be rejected.
+    * Maximum length of the queue of qp tasks which are waiting to be 
executed. If the number of
+    * waiting qp tasks exceeds this number, new qp tasks will be rejected.
     */
-   private int maxQueueNumOfInnerRpcClient = 500;
+   private int maxQueueNumOfQPTask = 500;
  
    /**
     * ReadMetadataConsistencyLevel: 1 Strong consistency, 2 Weak consistency
@@@ -135,11 -135,11 +135,11 @@@
     * client request corresponds to a QP Task. A QP task may be divided into 
several sub-tasks. So
     * this value is the sum of all sub-tasks. When this value <= 0, use CPU 
core number * 10
     */
--  private int concurrentQPSubTaskThread = 
Runtime.getRuntime().availableProcessors() * 10;
++  private int concurrentQPSubTaskThread = 
Runtime.getRuntime().availableProcessors() * 5;
  
    /**
-    * Batch data size read from remote query node once while reading, default 
value is 10000.
-    * The smaller the parameter, the more communication times and the more 
time-consuming it is.
+    * Batch data size read from remote query node once while reading, default 
value is 10000. The
+    * smaller the parameter, the more communication times and the more 
time-consuming it is.
     */
    private int batchReadSize = 10000;
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a84e389,85ab80d..d12f78f
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@@ -31,28 -31,19 +31,32 @@@ import org.apache.iotdb.cluster.entity.
  import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
  import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
  import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
++import org.apache.iotdb.cluster.exception.RaftConnectionException;
++import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
++import 
org.apache.iotdb.cluster.query.manager.querynode.ClusterLocalQueryManager;
  import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
+ import org.apache.iotdb.cluster.rpc.raft.processor.QueryMetricAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querydata.CloseSeriesReaderSyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querydata.InitSeriesReaderSyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querydata.QuerySeriesDataByTimestampSyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querydata.QuerySeriesDataSyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataInStringAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor;
  import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
- import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 +import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryStatusAsyncProcessor;
 +import org.apache.iotdb.cluster.service.ClusterMonitor;
  import org.apache.iotdb.cluster.utils.RaftUtils;
  import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
  import org.apache.iotdb.cluster.utils.hash.Router;
++import org.apache.iotdb.db.exception.FileNodeManagerException;
  import org.apache.iotdb.db.exception.ProcessorException;
 +import org.apache.iotdb.db.exception.StartupException;
  import org.apache.iotdb.db.service.IoTDB;
  import org.apache.iotdb.db.service.RegisterManager;
  import org.slf4j.Logger;
@@@ -92,12 -83,12 +96,15 @@@ public class Server 
  
    private RegisterManager registerManager = new RegisterManager();
  
-   public static void main(String[] args) throws ProcessorException, 
InterruptedException {
 -  public static void main(String[] args) {
++  public static void main(String[] args)
++      throws ProcessorException, InterruptedException, 
RaftConnectionException, FileNodeManagerException {
      Server server = Server.getInstance();
      server.start();
    }
  
-   public void start() throws ProcessorException, InterruptedException {
 -  public void start() {
++  public void start()
++      throws ProcessorException, InterruptedException, 
RaftConnectionException, FileNodeManagerException {
++
      /** Stand-alone version of IoTDB, be careful to replace the internal JDBC 
Server with a cluster version **/
      iotdb = new IoTDB();
      iotdb.active();
@@@ -157,24 -142,17 +166,26 @@@
      rpcServer.registerUserProcessor(new QueryPathsAsyncProcessor());
    }
  
 -  /**
 -   * for nodetool
 -   */
 +  private void registerQueryDataProcessor(RpcServer rpcServer) {
 +    rpcServer.registerUserProcessor(new InitSeriesReaderSyncProcessor());
 +    rpcServer.registerUserProcessor(new QuerySeriesDataSyncProcessor());
 +    rpcServer.registerUserProcessor(new 
QuerySeriesDataByTimestampSyncProcessor());
 +    rpcServer.registerUserProcessor(new CloseSeriesReaderSyncProcessor());
 +  }
 +
    private void registerQueryMetricProcessor(RpcServer rpcServer) {
      rpcServer.registerUserProcessor(new QueryMetricAsyncProcessor());
 +    rpcServer.registerUserProcessor(new QueryJobNumAsyncProcessor());
 +    rpcServer.registerUserProcessor(new QueryStatusAsyncProcessor());
 +    rpcServer.registerUserProcessor(new QueryLeaderAsyncProcessor());
    }
  
-   public void stop() throws ProcessorException, InterruptedException {
-     QPTaskManager.getInstance().close(true, 
ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
 -  public void stop() throws ProcessorException {
 -    QPTaskThreadManager.getInstance().close(true, 
ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
--    iotdb.deactivate();
++  public void stop() throws ProcessorException, RaftConnectionException, 
FileNodeManagerException {
++    QPTaskManager.getInstance().close(true, 
ClusterConstant.CLOSE_THREAD_POOL_BLOCK_TIMEOUT);
++    ClusterRpcQueryManager.getInstance().close();
++    ClusterLocalQueryManager.getInstance().close();
      CLIENT_MANAGER.shutdown();
++    iotdb.deactivate();
      metadataHolder.stop();
      for (DataPartitionHolder dataPartitionHolder : 
dataPartitionHolderMap.values()) {
        dataPartitionHolder.stop();
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
index 9841781,96f150f..faacfff
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/AbstractQPExecutor.java
@@@ -87,7 -87,7 +87,6 @@@ public abstract class AbstractQPExecuto
     * Async handle QPTask by QPTask and leader id
     *
     * @param task request QPTask
--   * @param leader leader of the target raft group
     * @param taskRetryNum Number of QPTask retries due to timeout and 
redirected.
     * @return basic response
     */
@@@ -127,7 -123,7 +122,10 @@@
      task.await();
      PeerId leader;
      if (task.getTaskState() != TaskState.FINISH) {
--      if (task.getTaskState() == TaskState.REDIRECT) {
++      if (task.getTaskState() == TaskState.RAFT_CONNECTION_EXCEPTION) {
++        throw new RaftConnectionException(
++            String.format("Can not connect to remote node : %s", 
task.getTargetNode()));
++      } else if (task.getTaskState() == TaskState.REDIRECT) {
          /** redirect to the right leader **/
          leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
          LOGGER.debug("Redirect leader: {}, group id = {}", leader, 
task.getRequest().getGroupID());
@@@ -136,10 -132,11 +134,11 @@@
          String groupId = task.getRequest().getGroupID();
          RaftUtils.removeCachedRaftGroupLeader(groupId);
          LOGGER.debug("Remove cached raft group leader of {}", groupId);
 -        leader = RaftUtils.getLeaderPeerID(groupId);
 +        leader = RaftUtils.getLocalLeaderPeerID(groupId);
        }
+       task.setTargetNode(leader);
        task.resetTask();
-       return asyncHandleNonQuerySingleTaskGetRes(task, leader, taskRetryNum + 
1);
+       return syncHandleNonQuerySingleTaskGetRes(task, taskRetryNum + 1);
      }
      return task.getResponse();
    }
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 515fb6a,f62a83f..7ba9ef7
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@@ -133,7 -131,7 +132,8 @@@ public class NonQueryExecutor extends A
     */
    private void classifyPhysicalPlanByGroupId(PhysicalPlan[] physicalPlans, 
BatchResult batchResult,
        Map<String, List<PhysicalPlan>> physicalPlansMap, Map<String, 
List<Integer>> planIndexMap) {
-     int[] result = batchResult.getResult();
++
+     int[] result = batchResult.getResultArray();
      for (int i = 0; i < result.length; i++) {
        /** Check if the request has failed. If it has failed, ignore it. **/
        if (result[i] != Statement.EXECUTE_FAILED) {
@@@ -141,19 -139,16 +141,17 @@@
          try {
            String groupId = getGroupIdFromPhysicalPlan(plan);
            if (groupId.equals(ClusterConfig.METADATA_GROUP_ID)) {
++
+             // this is for set storage group statement and role/user 
management statement.
              LOGGER.debug("Execute metadata group task");
              boolean executeResult = handleNonQueryRequest(groupId, plan);
-             nullReaderEnable = true;
-             result[i] =  executeResult ? Statement.SUCCESS_NO_INFO
+             emptyTaskEnable = true;
+             result[i] = executeResult ? Statement.SUCCESS_NO_INFO
                  : Statement.EXECUTE_FAILED;
              batchResult.setAllSuccessful(executeResult);
-           }else {
-             if (!physicalPlansMap.containsKey(groupId)) {
-               physicalPlansMap.put(groupId, new ArrayList<>());
-               planIndexMap.put(groupId, new ArrayList<>());
-             }
-             physicalPlansMap.get(groupId).add(plan);
-             planIndexMap.get(groupId).add(i);
+           } else {
+             physicalPlansMap.computeIfAbsent(groupId, l -> new 
ArrayList<>()).add(plan);
+             planIndexMap.computeIfAbsent(groupId, l -> new 
ArrayList<>()).add(i);
            }
          } catch (PathErrorException | ProcessorException | IOException | 
RaftConnectionException | InterruptedException e) {
            result[i] = Statement.EXECUTE_FAILED;
@@@ -327,42 -323,9 +326,44 @@@
      if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
        return handleNonQueryRequestLocally(groupId, qpTask);
      } else {
 -      PeerId leader = RaftUtils.getLeaderPeerID(groupId);
 +      PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
 +      boolean res = false;
+       qpTask.setTargetNode(leader);
 -      return syncHandleNonQueryTask(qpTask);
 +      try {
-          res = asyncHandleNonQueryTask(qpTask, leader);
++         res = syncHandleNonQueryTask(qpTask);
 +      } catch (RaftConnectionException ex) {
 +        boolean success = false;
 +        PeerId nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +        PeerId firstNode = nextNode;
 +        boolean first = true;
 +        while (!success) {
 +          try {
 +            if (!first) {
 +              nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +              if (firstNode.equals(nextNode)) {
 +                break;
 +              }
 +            }
 +            first = false;
 +            LOGGER.debug("Previous task fail, then send non-query task for 
group {} to node {}.", groupId, nextNode);
 +            qpTask.resetTask();
++            qpTask.setTargetNode(nextNode);
 +            qpTask.setTaskState(TaskState.INITIAL);
 +            currentTask.set(qpTask);
-             res = asyncHandleNonQueryTask(qpTask, nextNode);
++            res = syncHandleNonQueryTask(qpTask);
 +            LOGGER.debug("Non-query task for group {} to node {} succeed.", 
groupId, nextNode);
 +            success = true;
 +            RaftUtils.updateRaftGroupLeader(groupId, nextNode);
 +          } catch (RaftConnectionException e1) {
 +            LOGGER.debug("Non-query task for group {} to node {} fail.", 
groupId, nextNode);
 +          }
 +        }
 +        LOGGER.debug("The final result for non-query task is {}", success);
 +        if (!success) {
 +          throw ex;
 +        }
 +      }
 +      return res;
      }
    }
  
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 6043ce6,a258d7f..625269e
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@@ -26,13 -26,12 +26,13 @@@ import java.util.List
  import java.util.Map;
  import java.util.Map.Entry;
  import java.util.Set;
- import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
--import org.apache.iotdb.cluster.qp.task.SingleQPTask;
  import org.apache.iotdb.cluster.config.ClusterConfig;
  import org.apache.iotdb.cluster.config.ClusterConstant;
  import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
  import org.apache.iotdb.cluster.entity.raft.RaftService;
  import org.apache.iotdb.cluster.exception.RaftConnectionException;
++import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
++import org.apache.iotdb.cluster.qp.task.SingleQPTask;
  import 
org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataInStringRequest;
  import 
org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryMetadataRequest;
  import 
org.apache.iotdb.cluster.rpc.raft.request.querymetadata.QueryPathsRequest;
@@@ -106,19 -105,19 +106,19 @@@ public class QueryMetadataExecutor exte
          StringBuilder path = new StringBuilder();
          String[] storageGroupNodes = storageGroup.split(DOUB_SEPARATOR);
          String[] queryPathNodes = queryPath.split(DOUB_SEPARATOR);
--        for(int  i = 0 ; i < queryPathNodes.length ; i++){
--          if(i >= storageGroupNodes.length){
++        for (int i = 0; i < queryPathNodes.length; i++) {
++          if (i >= storageGroupNodes.length) {
              path.append(queryPathNodes[i]).append(SINGLE_SEPARATOR);
            } else {
              path.append(storageGroupNodes[i]).append(SINGLE_SEPARATOR);
            }
          }
--        paths.add(path.deleteCharAt(path.length()-1).toString());
++        paths.add(path.deleteCharAt(path.length() - 1).toString());
        }
      }
      return paths;
    }
--  
++
    /**
     * Handle query timeseries in one data group
     *
@@@ -134,38 -133,16 +134,47 @@@
      PeerId holder;
      /** Check if the plan can be executed locally. **/
      if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
--      LOGGER.debug("Execute show timeseries {} statement locally for group {} 
by sending request to local node.", pathList, groupId);
++      LOGGER.debug(
++          "Execute show timeseries {} statement locally for group {} by 
sending request to local node.",
++          pathList, groupId);
        holder = this.server.getServerId();
      } else {
 -      holder = RaftUtils.getRandomPeerID(groupId);
 +      holder = RaftUtils.getPeerIDInOrder(groupId);
      }
+     task.setTargetNode(holder);
      try {
-       LOGGER.debug("Send show timeseries {} task for group {} to node {}.", 
pathList, groupId, holder);
-       res.addAll(queryTimeSeries(task, holder));
++      LOGGER.debug("Send show timeseries {} task for group {} to node {}.", 
pathList, groupId,
++          holder);
+       res.addAll(queryTimeSeries(task));
      } catch (RaftConnectionException e) {
 -      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +      boolean success = false;
 +      while (!success) {
 +        PeerId nextNode = null;
 +        try {
 +          nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +          if (holder.equals(nextNode)) {
 +            break;
 +          }
-           LOGGER.debug("Previous task fail, then send show timeseries {} task 
for group {} to node {}.", pathList, groupId, nextNode);
++          LOGGER.debug(
++              "Previous task fail, then send show timeseries {} task for 
group {} to node {}.",
++              pathList, groupId, nextNode);
 +          task.resetTask();
++          task.setTargetNode(holder);
 +          task.setTaskState(TaskState.INITIAL);
-           res.addAll(queryTimeSeries(task, nextNode));
-           LOGGER.debug("Show timeseries {} task for group {} to node {} 
succeed.", pathList, groupId, nextNode);
++          res.addAll(queryTimeSeries(task));
++          LOGGER
++              .debug("Show timeseries {} task for group {} to node {} 
succeed.", pathList, groupId,
++                  nextNode);
 +          success = true;
 +        } catch (RaftConnectionException e1) {
-           LOGGER.debug("Show timeseries {} task for group {} to node {} 
fail.", pathList, groupId, nextNode);
-           continue;
++          LOGGER.debug("Show timeseries {} task for group {} to node {} 
fail.", pathList, groupId,
++              nextNode);
 +        }
 +      }
 +      LOGGER.debug("The final result for show timeseries {} task is {}", 
pathList, success);
 +      if (!success) {
 +        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +      }
      }
    }
  
@@@ -185,38 -162,16 +194,45 @@@
        PeerId holder;
        /** Check if the plan can be executed locally. **/
        if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
--        LOGGER.debug("Execute show metadata in string statement locally for 
group {} by sending request to local node.", groupId);
++        LOGGER.debug(
++            "Execute show metadata in string statement locally for group {} 
by sending request to local node.",
++            groupId);
          holder = this.server.getServerId();
        } else {
 -        holder = RaftUtils.getRandomPeerID(groupId);
 +        holder = RaftUtils.getPeerIDInOrder(groupId);
        }
+       task.setTargetNode(holder);
        try {
 +        LOGGER.debug("Send show metadata in string task for group {} to node 
{}.", groupId, holder);
-         asyncSendNonQuerySingleTask(task, holder, 0);
+         asyncSendNonQuerySingleTask(task, 0);
        } catch (RaftConnectionException e) {
 -        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        boolean success = false;
 +        while (!success) {
 +          PeerId nextNode = null;
 +          try {
 +            nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +            if (holder.equals(nextNode)) {
 +              break;
 +            }
-             LOGGER.debug("Previous task fail, then send show metadata in 
string task for group {} to node {}.", groupId, nextNode);
++            LOGGER.debug(
++                "Previous task fail, then send show metadata in string task 
for group {} to node {}.",
++                groupId, nextNode);
 +            task.resetTask();
++            task.setTargetNode(nextNode);
 +            task.setTaskState(TaskState.INITIAL);
-             asyncSendNonQuerySingleTask(task, nextNode, 0);
-             LOGGER.debug("Show metadata in string task for group {} to node 
{} succeed.", groupId, nextNode);
++            asyncSendNonQuerySingleTask(task, 0);
++            LOGGER.debug("Show metadata in string task for group {} to node 
{} succeed.", groupId,
++                nextNode);
 +            success = true;
 +          } catch (RaftConnectionException e1) {
-             LOGGER.debug("Show metadata in string task for group {} to node 
{} fail.", groupId, nextNode);
-             continue;
++            LOGGER.debug("Show metadata in string task for group {} to node 
{} fail.", groupId,
++                nextNode);
 +          }
 +        }
 +        LOGGER.debug("The final result for show metadata in string task is 
{}", success);
 +        if (!success) {
 +          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        }
        }
      }
      for (int i = 0; i < taskList.size(); i++) {
@@@ -224,13 -179,9 +240,14 @@@
        task.await();
        BasicResponse response = task.getResponse();
        if (response == null || !response.isSuccess()) {
 -        throw new ProcessorException();
 +        String errorMessage = "response is null";
 +        if (response != null && response.getErrorMsg() != null) {
 +          errorMessage = response.getErrorMsg();
 +        }
-         throw new ProcessorException("Execute show metadata in string 
statement fail because " + errorMessage);
++        throw new ProcessorException(
++            "Execute show metadata in string statement fail because " + 
errorMessage);
        }
--      
metadataList.add(((QueryMetadataInStringResponse)response).getMetadata());
++      metadataList.add(((QueryMetadataInStringResponse) 
response).getMetadata());
      }
      return combineMetadataInStringList(metadataList);
    }
@@@ -251,38 -202,16 +268,44 @@@
        PeerId holder;
        /** Check if the plan can be executed locally. **/
        if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
--        LOGGER.debug("Execute query metadata statement locally for group {} 
by sending request to local node.", groupId);
++        LOGGER.debug(
++            "Execute query metadata statement locally for group {} by sending 
request to local node.",
++            groupId);
          holder = this.server.getServerId();
        } else {
 -        holder = RaftUtils.getRandomPeerID(groupId);
 +        holder = RaftUtils.getPeerIDInOrder(groupId);
        }
+       task.setTargetNode(holder);
        try {
 +        LOGGER.debug("Send query metadata task for group {} to node {}.", 
groupId, holder);
-         asyncSendNonQuerySingleTask(task, holder, 0);
+         asyncSendNonQuerySingleTask(task, 0);
        } catch (RaftConnectionException e) {
 -        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        boolean success = false;
 +        while (!success) {
 +          PeerId nextNode = null;
 +          try {
 +            nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +            if (holder.equals(nextNode)) {
 +              break;
 +            }
-             LOGGER.debug("Previous task fail, then send query metadata task 
for group {} to node {}.", groupId, nextNode);
++            LOGGER
++                .debug("Previous task fail, then send query metadata task for 
group {} to node {}.",
++                    groupId, nextNode);
 +            task.resetTask();
++            task.setTargetNode(nextNode);
 +            task.setTaskState(TaskState.INITIAL);
-             asyncSendNonQuerySingleTask(task, nextNode, 0);
++            asyncSendNonQuerySingleTask(task, 0);
 +            LOGGER.debug("Query metadata task for group {} to node {} 
succeed.", groupId, nextNode);
 +            success = true;
 +          } catch (RaftConnectionException e1) {
 +            LOGGER.debug("Query metadata task for group {} to node {} fail.", 
groupId, nextNode);
 +            continue;
 +          }
 +        }
 +        LOGGER.debug("The final result for query metadata task is {}", 
success);
 +        if (!success) {
 +          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        }
        }
      }
      for (int i = 0; i < taskList.size(); i++) {
@@@ -294,9 -223,9 +317,10 @@@
          if (response != null && response.getErrorMsg() != null) {
            errorMessage = response.getErrorMsg();
          }
-         throw new ProcessorException("Execute query metadata statement fail 
because " + errorMessage);
 -        throw new ProcessorException("Execute query metadata statement false 
because " + errorMessage);
++        throw new ProcessorException(
++            "Execute query metadata statement fail because " + errorMessage);
        }
--      metadatas[i] = ((QueryMetadataResponse)response).getMetadata();
++      metadatas[i] = ((QueryMetadataResponse) response).getMetadata();
      }
      return Metadata.combineMetadatas(metadatas);
    }
@@@ -317,38 -246,16 +341,47 @@@
        PeerId holder;
        /** Check if the plan can be executed locally. **/
        if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
--        LOGGER.debug("Execute get series type for {} statement locally for 
group {} by sending request to local node.", path, groupId);
++        LOGGER.debug(
++            "Execute get series type for {} statement locally for group {} by 
sending request to local node.",
++            path, groupId);
          holder = this.server.getServerId();
        } else {
 -        holder = RaftUtils.getRandomPeerID(groupId);
 +        holder = RaftUtils.getPeerIDInOrder(groupId);
        }
+       task.setTargetNode(holder);
        try {
-         LOGGER.debug("Send get series type for {} task for group {} to node 
{}.", path, groupId, holder);
-         dataType = querySeriesType(task, holder);
++        LOGGER.debug("Send get series type for {} task for group {} to node 
{}.", path, groupId,
++            holder);
+         dataType = querySeriesType(task);
        } catch (RaftConnectionException e) {
 -        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        boolean success = false;
 +        while (!success) {
 +          PeerId nextNode = null;
 +          try {
 +            nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +            if (holder.equals(nextNode)) {
 +              break;
 +            }
-             LOGGER.debug("Previous task fail, then send get series type for 
{} task for group {} to node {}.", path, groupId, nextNode);
++            LOGGER.debug(
++                "Previous task fail, then send get series type for {} task 
for group {} to node {}.",
++                path, groupId, nextNode);
 +            task.resetTask();
++            task.setTargetNode(nextNode);
 +            task.setTaskState(TaskState.INITIAL);
-             dataType = querySeriesType(task, nextNode);
-             LOGGER.debug("Get series type for {} task for group {} to node {} 
succeed.", path, groupId, nextNode);
++            dataType = querySeriesType(task);
++            LOGGER.debug("Get series type for {} task for group {} to node {} 
succeed.", path,
++                groupId, nextNode);
 +            success = true;
 +          } catch (RaftConnectionException e1) {
-             LOGGER.debug("Get series type for {} task for group {} to node {} 
fail.", path, groupId, nextNode);
++            LOGGER.debug("Get series type for {} task for group {} to node {} 
fail.", path, groupId,
++                nextNode);
 +            continue;
 +          }
 +        }
 +        LOGGER.debug("The final result for get series type for {} task is 
{}", path, success);
 +        if (!success) {
 +          throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +        }
        }
      }
      return dataType;
@@@ -389,37 -296,16 +422,46 @@@
      PeerId holder;
      /** Check if the plan can be executed locally. **/
      if (QPExecutorUtils.canHandleQueryByGroupId(groupId)) {
--      LOGGER.debug("Execute get paths for {} statement locally for group {} 
by sending request to local node.", pathList, groupId);
++      LOGGER.debug(
++          "Execute get paths for {} statement locally for group {} by sending 
request to local node.",
++          pathList, groupId);
        holder = this.server.getServerId();
      } else {
 -      holder = RaftUtils.getRandomPeerID(groupId);
 +      holder = RaftUtils.getPeerIDInOrder(groupId);
      }
+     task.setTargetNode(holder);
      try {
-       LOGGER.debug("Send get paths for {} task for group {} to node {}.", 
pathList, groupId, holder);
-       res.addAll(queryPaths(task, holder));
++      LOGGER
++          .debug("Send get paths for {} task for group {} to node {}.", 
pathList, groupId, holder);
+       res.addAll(queryPaths(task));
      } catch (RaftConnectionException e) {
 -      throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +      boolean success = false;
 +      while (!success) {
 +        PeerId nextNode = null;
 +        try {
 +          nextNode = RaftUtils.getPeerIDInOrder(groupId);
 +          if (holder.equals(nextNode)) {
 +            break;
 +          }
-           LOGGER.debug("Previous task fail, then send get paths for {} task 
for group {} to node {}.", pathList, groupId, nextNode);
++          LOGGER
++              .debug("Previous task fail, then send get paths for {} task for 
group {} to node {}.",
++                  pathList, groupId, nextNode);
++          task.setTargetNode(nextNode);
 +          task.resetTask();
 +          task.setTaskState(TaskState.INITIAL);
-           res.addAll(queryPaths(task, nextNode));
-           LOGGER.debug("Get paths for {} task for group {} to node {} 
succeed.", pathList, groupId, nextNode);
++          res.addAll(queryPaths(task));
++          LOGGER.debug("Get paths for {} task for group {} to node {} 
succeed.", pathList, groupId,
++              nextNode);
 +          success = true;
 +        } catch (RaftConnectionException e1) {
-           LOGGER.debug("Get paths for {} task for group {} to node {} fail.", 
pathList, groupId, nextNode);
++          LOGGER.debug("Get paths for {} task for group {} to node {} fail.", 
pathList, groupId,
++              nextNode);
 +        }
 +      }
 +      LOGGER.debug("The final result for get paths for {} task is {}", 
pathList, success);
 +      if (!success) {
 +        throw new ProcessorException(RAFT_CONNECTION_ERROR, e);
 +      }
      }
    }
  
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
index 4562d77,f06fa4b..b881549
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java
@@@ -73,13 -68,12 +68,11 @@@ public class BatchQPTask extends MultiQ
  
    private NonQueryExecutor executor;
  
--
-   public BatchQPTask(int taskNum, BatchResult batchResult, Map<String, 
SingleQPTask> taskMap,
+   public BatchQPTask(int taskNum, BatchResult result, Map<String, 
SingleQPTask> taskMap,
        Map<String, List<Integer>> planIndexMap) {
      super(false, taskNum, TaskType.BATCH);
-     this.batchResult = batchResult.getResult();
-     this.isAllSuccessful = batchResult.isAllSuccessful();
-     this.batchErrorMessage = batchResult.getBatchErrorMessage();
+     this.resultArray = result.getResultArray();
+     this.batchResult = result;
      this.taskMap = taskMap;
      this.planIndexMap = planIndexMap;
      this.taskThreadMap = new HashMap<>();
@@@ -123,12 -123,13 +122,13 @@@
        SingleQPTask subTask = entry.getValue();
        Future<?> taskThread;
        if (QPExecutorUtils.canHandleNonQueryByGroupId(groupId)) {
-         taskThread = QPTaskManager.getInstance()
+         taskThread = QPTaskThreadManager.getInstance()
              .submit(() -> executeLocalSubTask(subTask, groupId));
        } else {
 -        PeerId leader = RaftUtils.getLeaderPeerID(groupId);
 +        PeerId leader = RaftUtils.getLocalLeaderPeerID(groupId);
-         taskThread = QPTaskManager.getInstance()
-             .submit(() -> executeRpcSubTask(subTask, leader, groupId));
+         subTask.setTargetNode(leader);
+         taskThread = QPTaskThreadManager.getInstance()
+             .submit(() -> executeRpcSubTask(subTask, groupId));
        }
        taskThreadMap.put(groupId, taskThread);
      }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
index 3b905d8,3b905d8..f861f55
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
@@@ -18,32 -18,32 +18,12 @@@
   */
  package org.apache.iotdb.cluster.qp.task;
  
--import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
--import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
++import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
  
--public class DataQueryTask {
--  private BasicResponse basicResponse;
--  private TaskState state;
++public class DataQueryTask extends SingleQPTask {
  
--  public DataQueryTask(BasicResponse basicResponse,
--      TaskState state) {
--    this.basicResponse = basicResponse;
--    this.state = state;
--  }
--
--  public BasicResponse getBasicResponse() {
--    return basicResponse;
--  }
--
--  public void setBasicResponse(BasicResponse basicResponse) {
--    this.basicResponse = basicResponse;
--  }
--
--  public TaskState getState() {
--    return state;
--  }
--
--  public void setState(TaskState state) {
--    this.state = state;
++  public DataQueryTask(boolean isSyncTask,
++      BasicRequest request) {
++    super(isSyncTask, request);
    }
  }
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
index 6653601,0000000..f57c538
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/ClusterRpcQueryManager.java
@@@ -1,111 -1,0 +1,123 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster.query.manager.coordinatornode;
 +
 +import com.alipay.sofa.jraft.util.OnlyForTest;
 +import java.util.HashMap;
++import java.util.Iterator;
 +import java.util.Map;
++import java.util.Map.Entry;
 +import java.util.concurrent.ConcurrentHashMap;
 +import org.apache.iotdb.cluster.config.ClusterConfig;
 +import org.apache.iotdb.cluster.config.ClusterDescriptor;
 +import org.apache.iotdb.cluster.exception.RaftConnectionException;
 +import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 +
 +public class ClusterRpcQueryManager implements IClusterRpcQueryManager {
 +
 +  /**
 +   * Key is job id, value is task id.
 +   */
 +  private static final ConcurrentHashMap<Long, String> JOB_ID_MAP_TASK_ID = 
new ConcurrentHashMap<>();
 +
 +  /**
 +   * Key is task id, value is manager of a client query.
 +   */
 +  private static final ConcurrentHashMap<String, 
ClusterRpcSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new 
ConcurrentHashMap<>();
 +
 +  private static final ClusterConfig CLUSTER_CONFIG = 
ClusterDescriptor.getInstance().getConfig();
 +
 +  /**
 +   * Local address
 +   */
 +  private static final String LOCAL_ADDR = String
 +      .format("%s:%d", CLUSTER_CONFIG.getIp(), CLUSTER_CONFIG.getPort());
 +
 +  @Override
 +  public void addSingleQuery(long jobId, QueryPlan physicalPlan) {
 +    String taskId = createTaskId(jobId);
 +    JOB_ID_MAP_TASK_ID.put(jobId, taskId);
 +    SINGLE_QUERY_MANAGER_MAP.put(taskId, new 
ClusterRpcSingleQueryManager(taskId, physicalPlan));
 +  }
 +
 +  @Override
 +  public String createTaskId(long jobId) {
 +    return String.format("%s:%d", LOCAL_ADDR, jobId);
 +  }
 +
 +  @Override
 +  public ClusterRpcSingleQueryManager getSingleQuery(long jobId) {
 +    return SINGLE_QUERY_MANAGER_MAP.get(JOB_ID_MAP_TASK_ID.get(jobId));
 +  }
 +
 +  @Override
 +  public ClusterRpcSingleQueryManager getSingleQuery(String taskId) {
 +    return SINGLE_QUERY_MANAGER_MAP.get(taskId);
 +  }
 +
 +  @Override
 +  public void releaseQueryResource(long jobId) throws RaftConnectionException 
{
 +    if (JOB_ID_MAP_TASK_ID.containsKey(jobId)) {
 +      
SINGLE_QUERY_MANAGER_MAP.remove(JOB_ID_MAP_TASK_ID.remove(jobId)).releaseQueryResource();
 +    }
 +  }
 +
 +  @Override
 +  public Map<String, Integer> getAllReadUsage() {
 +    Map<String, Integer> readerUsageMap = new HashMap<>();
 +    SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
 +      for (String groupId : singleQueryManager.getDataGroupUsage()) {
 +        readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 
1);
 +      }
 +    });
 +    return readerUsageMap;
 +  }
 +
++  @Override
++  public void close() throws RaftConnectionException {
++    Iterator<Map.Entry<String, ClusterRpcSingleQueryManager>> iterator = 
SINGLE_QUERY_MANAGER_MAP.entrySet().iterator();
++    while(iterator.hasNext()){
++      Entry<String, ClusterRpcSingleQueryManager> entry = iterator.next();
++      entry.getValue().releaseQueryResource();
++      iterator.remove();
++    }
++  }
++
 +  @OnlyForTest
 +  public static ConcurrentHashMap<Long, String> getJobIdMapTaskId() {
 +    return JOB_ID_MAP_TASK_ID;
 +  }
 +
 +  private ClusterRpcQueryManager() {
 +  }
 +
 +  public static final ClusterRpcQueryManager getInstance() {
 +    return ClusterRpcQueryManagerHolder.INSTANCE;
 +  }
 +
 +  private static class ClusterRpcQueryManagerHolder {
 +
 +    private static final ClusterRpcQueryManager INSTANCE = new 
ClusterRpcQueryManager();
 +
 +    private ClusterRpcQueryManagerHolder() {
 +
 +    }
 +  }
 +
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
index b8e4f5d,0000000..0917631
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/coordinatornode/IClusterRpcQueryManager.java
@@@ -1,69 -1,0 +1,74 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster.query.manager.coordinatornode;
 +
 +import java.util.Map;
 +import org.apache.iotdb.cluster.exception.RaftConnectionException;
 +import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
 +
 +/**
 + * Manage all query series reader resources which fetch data from remote query nodes in coordinator
 + * node
 + */
 +public interface IClusterRpcQueryManager {
 +
 +  /**
 +   * Add a query
 +   *
 +   * @param jobId job id assigned by QueryResourceManager
 +   * @param physicalPlan physical plan
 +   */
 +  void addSingleQuery(long jobId, QueryPlan physicalPlan);
 +
 +  /**
 +   * Get full task id (local address + job id)
 +   */
 +  String createTaskId(long jobId);
 +
 +  /**
 +   * Get query manager by jobId
 +   *
 +   * @param jobId job id assigned by QueryResourceManager
 +   */
 +  ClusterRpcSingleQueryManager getSingleQuery(long jobId);
 +
 +  /**
 +   * Get query manager by taskId
 +   *
 +   * @param taskId task id assigned by getAndIncreaTaskId() method
 +   */
 +  ClusterRpcSingleQueryManager getSingleQuery(String taskId);
 +
 +  /**
 +   * Release query resource
 +   *
 +   * @param jobId job id
 +   */
 +  void releaseQueryResource(long jobId) throws RaftConnectionException;
 +
 +  /**
 +   * Get all read usage count group by data group id, key is group id, value is usage count
 +   */
 +  Map<String, Integer> getAllReadUsage();
++
++  /**
++   * Close manager
++   */
++  void close() throws RaftConnectionException;
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
index 4e09af8,0000000..a602c84
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/ClusterLocalQueryManager.java
@@@ -1,125 -1,0 +1,138 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster.query.manager.querynode;
 +
 +import com.alipay.sofa.jraft.util.OnlyForTest;
 +import java.io.IOException;
 +import java.util.HashMap;
++import java.util.Iterator;
 +import java.util.Map;
++import java.util.Map.Entry;
 +import java.util.concurrent.ConcurrentHashMap;
++import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
 +import org.apache.iotdb.db.exception.FileNodeManagerException;
 +import org.apache.iotdb.db.exception.PathErrorException;
 +import org.apache.iotdb.db.exception.ProcessorException;
 +import org.apache.iotdb.db.query.control.QueryResourceManager;
 +import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 +
 +public class ClusterLocalQueryManager implements IClusterLocalQueryManager {
 +
 +  /**
 +   * Key is task id which is assigned by coordinator node, value is job id which is assigned by
 +   * query node(local).
 +   */
 +  private static final ConcurrentHashMap<String, Long> TASK_ID_MAP_JOB_ID = 
new ConcurrentHashMap<>();
 +
 +  /**
 +   * Key is job id, value is manager of a client query.
 +   */
 +  private static final ConcurrentHashMap<Long, 
ClusterLocalSingleQueryManager> SINGLE_QUERY_MANAGER_MAP = new 
ConcurrentHashMap<>();
 +
 +  private ClusterLocalQueryManager() {
 +  }
 +
 +  @Override
 +  public InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest 
request)
 +      throws IOException, FileNodeManagerException, PathErrorException, 
ProcessorException, QueryFilterOptimizationException, ClassNotFoundException {
 +    long jobId = QueryResourceManager.getInstance().assignJobId();
 +    String taskId = request.getTaskId();
 +    TASK_ID_MAP_JOB_ID.put(taskId, jobId);
 +    ClusterLocalSingleQueryManager localQueryManager = new 
ClusterLocalSingleQueryManager(jobId);
 +    SINGLE_QUERY_MANAGER_MAP.put(jobId, localQueryManager);
 +    return localQueryManager.createSeriesReader(request);
 +  }
 +
 +  @Override
 +  public QuerySeriesDataResponse readBatchData(QuerySeriesDataRequest request)
 +      throws IOException {
 +    long jobId = TASK_ID_MAP_JOB_ID.get(request.getTaskId());
 +    return SINGLE_QUERY_MANAGER_MAP.get(jobId).readBatchData(request);
 +  }
 +
 +  @Override
 +  public QuerySeriesDataByTimestampResponse readBatchDataByTimestamp(
 +      QuerySeriesDataByTimestampRequest request)
 +      throws IOException {
 +    long jobId = TASK_ID_MAP_JOB_ID.get(request.getTaskId());
 +    return 
SINGLE_QUERY_MANAGER_MAP.get(jobId).readBatchDataByTimestamp(request);
 +  }
 +
 +  @Override
 +  public void close(String taskId) throws FileNodeManagerException {
 +    if (TASK_ID_MAP_JOB_ID.containsKey(taskId)) {
 +      
SINGLE_QUERY_MANAGER_MAP.remove(TASK_ID_MAP_JOB_ID.remove(taskId)).close();
 +    }
 +  }
 +
 +  @Override
 +  public ClusterLocalSingleQueryManager getSingleQuery(String taskId) {
 +    long jobId = TASK_ID_MAP_JOB_ID.get(taskId);
 +    return SINGLE_QUERY_MANAGER_MAP.get(jobId);
 +  }
 +
 +  public static final ClusterLocalQueryManager getInstance() {
 +    return ClusterLocalQueryManager.ClusterLocalQueryManagerHolder.INSTANCE;
 +  }
 +
 +  private static class ClusterLocalQueryManagerHolder {
 +
 +    private static final ClusterLocalQueryManager INSTANCE = new 
ClusterLocalQueryManager();
 +
 +    private ClusterLocalQueryManagerHolder() {
 +
 +    }
 +  }
 +
 +  @Override
 +  public Map<String, Integer> getAllReadUsage() {
 +    Map<String, Integer> readerUsageMap = new HashMap<>();
 +    SINGLE_QUERY_MANAGER_MAP.values().forEach(singleQueryManager -> {
 +      String groupId = singleQueryManager.getGroupId();
 +      readerUsageMap.put(groupId, readerUsageMap.getOrDefault(groupId, 0) + 
1);
 +    });
 +    return readerUsageMap;
 +  }
 +
++  @Override
++  public void close() throws FileNodeManagerException {
++    Iterator<Entry<Long, ClusterLocalSingleQueryManager>> iterator = 
SINGLE_QUERY_MANAGER_MAP.entrySet().iterator();
++    while(iterator.hasNext()){
++      Entry<Long, ClusterLocalSingleQueryManager> entry = iterator.next();
++      entry.getValue().close();
++      iterator.remove();
++    }
++  }
++
 +  @OnlyForTest
 +  public static ConcurrentHashMap<String, Long> getTaskIdMapJobId() {
 +    return TASK_ID_MAP_JOB_ID;
 +  }
 +
 +  @OnlyForTest
 +  public static ConcurrentHashMap<Long, ClusterLocalSingleQueryManager> 
getSingleQueryManagerMap() {
 +    return SINGLE_QUERY_MANAGER_MAP;
 +  }
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
index 1105bb2,0000000..42374d5
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/manager/querynode/IClusterLocalQueryManager.java
@@@ -1,82 -1,0 +1,87 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster.query.manager.querynode;
 +
 +import java.io.IOException;
 +import java.util.Map;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.InitSeriesReaderRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.InitSeriesReaderResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse;
 +import org.apache.iotdb.db.exception.FileNodeManagerException;
 +import org.apache.iotdb.db.exception.PathErrorException;
 +import org.apache.iotdb.db.exception.ProcessorException;
 +import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 +
 +/**
 + * Manage all local query resources which provide data for coordinator node in cluster query node.
 + */
 +public interface IClusterLocalQueryManager {
 +
 +  /**
 +   * Initially create query data set for coordinator node.
 +   *
 +   * @param request request for query data from coordinator node
 +   */
 +  InitSeriesReaderResponse createQueryDataSet(InitSeriesReaderRequest request)
 +      throws IOException, FileNodeManagerException, PathErrorException, 
ProcessorException, QueryFilterOptimizationException, ClassNotFoundException;
 +
 +  /**
 +   * Read batch data of all querying series in request and set response.
 +   *
 +   * @param request request of querying series
 +   */
 +  QuerySeriesDataResponse readBatchData(QuerySeriesDataRequest request)
 +      throws IOException;
 +
 +  /**
 +   * Read batch data of select series by batch timestamp which is used in query with value filter
 +   *
 +   * @param request request of querying select paths
 +   */
 +  QuerySeriesDataByTimestampResponse readBatchDataByTimestamp(
 +      QuerySeriesDataByTimestampRequest request) throws IOException;
 +
 +  /**
 +   * Close query resource of a task
 +   *
 +   * @param taskId task id of local single query manager
 +   */
 +  void close(String taskId) throws FileNodeManagerException;
 +
 +
 +  /**
 +   * Get query manager by taskId
 +   *
 +   * @param taskId task id assigned by ClusterRpcQueryManager
 +   */
 +  ClusterLocalSingleQueryManager getSingleQuery(String taskId);
 +
 +  /**
 +   * Get all read usage count group by data group id, key is group id, value is usage count
 +   */
 +  Map<String, Integer> getAllReadUsage();
++
++  /**
++   * Close manager
++   */
++  void close() throws FileNodeManagerException;
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 75c2381,0000000..bd61375
mode 100644,000000..100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@@ -1,103 -1,0 +1,113 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.iotdb.cluster.query.utils;
 +
 +import com.alipay.sofa.jraft.entity.PeerId;
 +import java.util.List;
 +import org.apache.iotdb.cluster.config.ClusterDescriptor;
 +import org.apache.iotdb.cluster.entity.Server;
 +import org.apache.iotdb.cluster.exception.RaftConnectionException;
- import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 +import org.apache.iotdb.cluster.qp.task.DataQueryTask;
++import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 +import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
- import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
++import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
 +import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 +import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 +import org.apache.iotdb.cluster.utils.RaftUtils;
 +import org.apache.iotdb.cluster.utils.hash.Router;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * Utils for cluster reader which needs to acquire data from remote query node.
 + */
 +public class ClusterRpcReaderUtils {
 +
 +  private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterRpcReaderUtils.class);
 +
 +  /**
 +   * Count limit to redo a task
 +   */
 +  private static final int TASK_MAX_RETRY = 
ClusterDescriptor.getInstance().getConfig()
 +      .getQpTaskRedoCount();
 +
 +  private ClusterRpcReaderUtils() {
 +  }
 +
 +  /**
 +   * Create cluster series reader
 +   */
 +  public static BasicResponse createClusterSeriesReader(String groupId, 
BasicRequest request,
 +      ClusterRpcSingleQueryManager manager)
 +      throws RaftConnectionException {
 +
 +    List<PeerId> peerIdList = RaftUtils
 +        .getPeerIDList(groupId, Server.getInstance(), Router.getInstance());
 +    int randomPeerIndex = RaftUtils.getRandomInt(peerIdList.size());
 +    BasicResponse response;
 +    for (int i = 0; i < peerIdList.size(); i++) {
 +      PeerId peerId = peerIdList.get((i + randomPeerIndex) % 
peerIdList.size());
 +      try {
 +        response = handleQueryRequest(request, peerId, 0);
 +        manager.setQueryNode(groupId, peerId);
 +        LOGGER.debug("Init series reader in Node<{}> of group<{}> success.", 
peerId, groupId);
 +        return response;
 +      } catch (RaftConnectionException e) {
 +        LOGGER.debug("Can not init series reader in Node<{}> of group<{}>", 
peerId, groupId, e);
 +      }
 +    }
 +    throw new RaftConnectionException(
 +        String.format("Can not init series reader in all nodes of 
group<%s>.", groupId));
 +  }
 +
 +  /**
 +   * Send query request to remote node and return response
 +   *
 +   * @param request query request
 +   * @param peerId target remote query node
 +   * @param taskRetryNum retry num of the request
 +   * @return Response from remote query node
 +   */
 +  public static BasicResponse handleQueryRequest(BasicRequest request, PeerId 
peerId,
 +      int taskRetryNum)
 +      throws RaftConnectionException {
 +    if (taskRetryNum > TASK_MAX_RETRY) {
 +      throw new RaftConnectionException(
 +          String.format("Query request retries reach the upper bound %s",
 +              TASK_MAX_RETRY));
 +    }
-     NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
-     DataQueryTask dataQueryTask = nodeAsClient.syncHandleRequest(request, 
peerId);
-     if (dataQueryTask.getState() == TaskState.FINISH) {
-       return dataQueryTask.getBasicResponse();
++    DataQueryTask dataQueryTask = new DataQueryTask(true, request);
++    dataQueryTask.setTargetNode(peerId);
++    RaftNodeAsClientManager.getInstance().produceQPTask(dataQueryTask);
++    try {
++      dataQueryTask.await();
++    } catch (InterruptedException e) {
++      throw new RaftConnectionException(
++          String.format("Can not connect to remote node {%s} for query", 
peerId));
++    }
++    if (dataQueryTask.getTaskState() == TaskState.RAFT_CONNECTION_EXCEPTION) {
++      throw new RaftConnectionException(
++          String.format("Can not connect to remote node {%s} for query", 
peerId));
++    } else if (dataQueryTask.getTaskState() == TaskState.FINISH) {
++      return dataQueryTask.getResponse();
 +    } else {
 +      return handleQueryRequest(request, peerId, taskRetryNum + 1);
 +    }
 +  }
 +}
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 994fc07,6257fbd..d0690cd
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@@ -31,20 -31,18 +31,10 @@@ public interface NodeAsClient 
  
    /**
     * Asynchronous processing requests
-    *
-    * @param leader leader node of the target group
     * @param qpTask single QPTask to be executed
     */
-   void asyncHandleRequest(BasicRequest request, PeerId leader,
-       SingleQPTask qpTask) throws RaftConnectionException;
- 
-   /**
-    * Synchronous processing requests
-    *
-    * @param peerId leader node of the target group
-    */
-   DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
+   void asyncHandleRequest(SingleQPTask qpTask) throws RaftConnectionException;
  
 -//  /**
 -//   * Synchronous processing requests
 -//   * @param peerId leader node of the target group
 -//   *
 -//   */
 -//  DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId)
 -//      throws RaftConnectionException;
 -
    /**
     * Shut down client
     */
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 1d32fd5,e96da99..351eece
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@@ -101,6 -90,18 +90,19 @@@ public class RaftNodeAsClientManager 
  
    public void init() {
      isShuttingDown = false;
++    taskQueue.clear();
+     for (int i = 0; i < CLUSTER_CONFIG.getConcurrentInnerRpcClientThread(); 
i++) {
+       THREAD_POOL_MANAGER.execute(() -> {
+         RaftNodeAsClient client = new RaftNodeAsClient();
+         while (true) {
+           consumeQPTask(client);
+           if (Thread.currentThread().isInterrupted()) {
+             break;
+           }
+         }
+         client.shutdown();
+       });
+     }
    }
  
    /**
@@@ -135,46 -124,30 +125,33 @@@
      }
    }
  
-   private void checkShuttingDown() throws RaftConnectionException {
-     if (isShuttingDown) {
-       throw new RaftConnectionException(
-           "Reject to provide RaftNodeAsClient client because cluster system 
is shutting down");
-     }
-   }
- 
-   /**
-    * No-safe method, get client
-    */
-   private RaftNodeAsClient getClient() {
-     if (clientList.isEmpty()) {
-       return new RaftNodeAsClient();
-     } else {
-       return clientList.removeFirst();
-     }
-   }
 +
    /**
-    * Release usage of a client
+    * Consume qp task
     */
-   public void releaseClient(RaftNodeAsClient client) {
+   private void consumeQPTask(RaftNodeAsClient client) {
      resourceLock.lock();
      try {
-       clientNumInUse.decrementAndGet();
-       resourceCondition.signalAll();
-       clientList.addLast(client);
+       while (taskQueue.isEmpty()) {
+         if (Thread.currentThread().isInterrupted()) {
+           return;
+         }
+         resourceCondition.await();
+       }
+       client.asyncHandleRequest(taskQueue.removeFirst());
+     } catch (InterruptedException e) {
 -      LOGGER.error("An error occurred when await for ResourceContidion", e);
++      Thread.currentThread().interrupt();
++      LOGGER.debug("Occur interruption when await for ResourceContidion", e);
      } finally {
        resourceLock.unlock();
      }
    }
  
-   public void shutdown() throws InterruptedException {
-     isShuttingDown = true;
-     while (clientNumInUse.get() != 0 && queueClientNum != 0) {
-       // wait until releasing all usage of clients.
-       resourceCondition.await();
-     }
-     while (!clientList.isEmpty()) {
-       clientList.removeFirst().shutdown();
++
+   private void checkShuttingDown() throws RaftConnectionException {
+     if (isShuttingDown) {
+       throw new RaftConnectionException(
+           "Reject to execute QPTask because cluster system is shutting down");
      }
    }
  
diff --cc 
cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
index c331714,44f42a7..291da32
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/processor/nonquery/DataGroupNonQueryAsyncProcessor.java
@@@ -58,7 -58,7 +58,8 @@@ public class DataGroupNonQueryAsyncProc
      } else {
        LOGGER.debug("Apply task to raft node");
  
-       /** Apply Task to Raft Node **/
++
+       /* Apply Task to Raft Node */
        BasicResponse response = 
DataGroupNonQueryResponse.createEmptyResponse(groupId);
        RaftService service = (RaftService) 
dataPartitionRaftHolder.getService();
        RaftUtils.executeRaftTaskForRpcProcessor(service, asyncContext, 
request, response);
diff --cc cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 601b902,aa2b46a..673fd12
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@@ -54,23 -54,16 +54,22 @@@ import org.apache.iotdb.cluster.excepti
  import org.apache.iotdb.cluster.qp.task.QPTask;
  import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
  import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 -import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 +import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcQueryManager;
- import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
  import org.apache.iotdb.cluster.rpc.raft.closure.ResponseClosure;
  import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
  import org.apache.iotdb.cluster.rpc.raft.request.BasicNonQueryRequest;
  import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 -import org.apache.iotdb.cluster.rpc.raft.request.QueryMetricRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryJobNumRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryLeaderRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryMetricRequest;
 +import 
org.apache.iotdb.cluster.rpc.raft.request.querymetric.QueryStatusRequest;
  import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 -import org.apache.iotdb.cluster.rpc.raft.response.QueryMetricResponse;
+ import 
org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
+ import 
org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryJobNumResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryLeaderResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryMetricResponse;
- import 
org.apache.iotdb.cluster.rpc.raft.response.nonquery.DataGroupNonQueryResponse;
- import 
org.apache.iotdb.cluster.rpc.raft.response.nonquery.MetaGroupNonQueryResponse;
 +import 
org.apache.iotdb.cluster.rpc.raft.response.querymetric.QueryStatusResponse;
  import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
  import org.apache.iotdb.cluster.utils.hash.Router;
  import org.apache.iotdb.cluster.utils.hash.VirtualNode;
@@@ -94,9 -85,9 +93,9 @@@ public class RaftUtils 
  
    /**
     * The cache will be updated in three cases: 1. When @onLeaderStart() method of 
state machine is
-    * called, the cache will be update. 2. When @getLocalLeaderPeerID() in 
this class is called and cache
 -   * called, the cache will be update. 2. When @getLeaderPeerID() in this 
class is called and cache
--   * don't have the key, it's will get random peer and update. 3. When 
@redirected of BasicRequest
--   * is true, the task will be retry and the cache will update.
++   * called, the cache will be updated. 2. When @getLocalLeaderPeerID() in 
this class is called and
++   * the cache doesn't have the key, it will get a random peer and update. 3. When 
@redirected of
++   * BasicRequest is true, the task will be retried and the cache will be updated.
     */
    private static final ConcurrentHashMap<String, PeerId> groupLeaderCache = 
new ConcurrentHashMap<>();
  
@@@ -144,44 -100,8 +143,43 @@@
     *
     * @return leader id
     */
 -  public static PeerId getLeaderPeerID(String groupId) {
 -    return groupLeaderCache.computeIfAbsent(groupId, 
RaftUtils::getRandomPeerID);
 +  public static PeerId getLocalLeaderPeerID(String groupId) {
 +    if (!groupLeaderCache.containsKey(groupId)) {
 +      PeerId randomPeerId = getRandomPeerID(groupId);
 +      groupLeaderCache.put(groupId, randomPeerId);
 +    }
 +    PeerId leader = groupLeaderCache.get(groupId);
 +    LOGGER.debug("Get local cached leader {} of group {}.", leader, groupId);
 +    return leader;
 +  }
 +
 +  /**
 +   * Ask the given remote node for the leader of the group by sending it a
 +   * QueryLeaderRequest and waiting for the task to complete.
 +   *
 +   * @return leader id, or null if the request fails, does not finish, or has no response
 +   */
 +  public static PeerId getLeaderPeerIDFromRemoteNode(PeerId peerId, String 
groupId) {
 +    QueryLeaderRequest request = new QueryLeaderRequest(groupId);
 +    SingleQPTask task = new SingleQPTask(false, request);
- 
++    task.setTargetNode(peerId);
 +    LOGGER.debug("Execute get leader of group {} from node {}.", groupId, 
peerId);
 +    try {
-       NodeAsClient client = 
RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
-       /** Call async method **/
-       client.asyncHandleRequest(task.getRequest(), peerId, task);
++      CLIENT_MANAGER.produceQPTask(task);
 +
 +      task.await();
 +      PeerId leader = null;
 +      if (task.getTaskState() == TaskState.FINISH) {
 +        BasicResponse response = task.getResponse();
 +        leader = response == null ? null : ((QueryLeaderResponse) 
response).getLeader();
 +      }
 +      LOGGER.debug("Get leader {} of group {} from node {}.", leader, 
groupId, peerId);
 +      return leader;
 +    } catch (RaftConnectionException | InterruptedException e) {
-       LOGGER.error("Fail to get leader of group {} from remote node {} 
because of {}.", groupId, peerId, e.getMessage());
++      LOGGER.error("Fail to get leader of group {} from remote node {} 
because of {}.", groupId,
++          peerId, e.getMessage());
 +      return null;
 +    }
    }
  
    /**
@@@ -493,23 -402,7 +491,24 @@@
        groupId = router.getGroupID(group);
        nodes = getPeerIdArrayFrom(group);
      }
 -    PeerId leader = RaftUtils.getLeaderPeerID(groupId);
 +
 +    PeerId leader = null;
 +    for (PeerId node : nodes) {
 +      LOGGER.debug("Try to get leader of group {} from node {}.", groupId, 
node);
 +      leader = getLeaderPeerIDFromRemoteNode(node, groupId);
 +      LOGGER.debug("Get leader {} of group {} from node {}.", leader, 
groupId, node);
 +      if (leader != null) {
 +        break;
 +      }
 +    }
 +
 +    if (leader == null) {
-       LOGGER.debug("Fail to get leader of group {} from all remote nodes, get 
it locally.", groupId);
++      LOGGER
++          .debug("Fail to get leader of group {} from all remote nodes, get 
it locally.", groupId);
 +      leader = RaftUtils.getLocalLeaderPeerID(groupId);
 +      LOGGER.debug("Get leader {} of group {} locally.", leader, groupId);
 +    }
 +
      for (int i = 0; i < nodes.length; i++) {
        if (leader.equals(nodes[i])) {
          PeerId t = nodes[i];
@@@ -657,12 -551,11 +656,11 @@@
      SingleQPTask task = new SingleQPTask(false, request);
  
      LOGGER.debug("Execute get metric for {} statement for group {}.", metric, 
groupId);
 -    PeerId holder = RaftUtils.getLeaderPeerID(groupId);
 -    task.setTargetNode(holder);
 +    PeerId holder = RaftUtils.getLocalLeaderPeerID(groupId);
      LOGGER.debug("Get metric from node {}.", holder);
++    task.setTargetNode(holder);
      try {
-       NodeAsClient client = 
RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
-       /** Call async method **/
-       client.asyncHandleRequest(task.getRequest(), holder, task);
+       CLIENT_MANAGER.produceQPTask(task);
  
        task.await();
        Map<String, Long> value = null;
@@@ -676,100 -569,4 +674,82 @@@
        return null;
      }
    }
 +
 +  /**
 +   * Get query job number running on each data partition for all nodes
 +   *
 +   * @return outer key: ip, inner key: groupId, value: number of query jobs
 +   */
 +  public static Map<String, Map<String, Integer>> 
getQueryJobNumMapForCluster() {
 +    PeerId[] peerIds = 
RaftUtils.convertStringArrayToPeerIdArray(config.getNodes());
 +    Map<String, Map<String, Integer>> res = new HashMap<>();
 +    for (int i = 0; i < peerIds.length; i++) {
 +      PeerId peerId = peerIds[i];
 +      res.put(peerId.getIp(), getQueryJobNumMapFromRemoteNode(peerId));
 +    }
 +
 +    return res;
 +  }
 +
 +  public static Map<String, Integer> getLocalQueryJobNumMap() {
 +    return ClusterRpcQueryManager.getInstance().getAllReadUsage();
 +  }
 +
 +  private static Map<String, Integer> getQueryJobNumMapFromRemoteNode(PeerId 
peerId) {
 +    QueryJobNumRequest request = new QueryJobNumRequest("");
 +    SingleQPTask task = new SingleQPTask(false, request);
- 
++    task.setTargetNode(peerId);
 +    LOGGER.debug("Execute get query job num map for node {}.", peerId);
 +    try {
-       NodeAsClient client = 
RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
-       /** Call async method **/
-       client.asyncHandleRequest(task.getRequest(), peerId, task);
++      CLIENT_MANAGER.produceQPTask(task);
 +
 +      task.await();
 +      Map<String, Integer> value = null;
 +      if (task.getTaskState() == TaskState.FINISH) {
 +        BasicResponse response = task.getResponse();
 +        value = response == null ? null : ((QueryJobNumResponse) 
response).getValue();
 +      }
 +      return value;
 +    } catch (RaftConnectionException | InterruptedException e) {
 +      LOGGER.error("Fail to get query job num map from remote node {} because 
of {}.", peerId, e);
 +      return null;
 +    }
 +  }
 +
 +  /**
 +   * Get status of each node in cluster
 +   *
 +   * @return key: node ip, value: live or not
 +   */
 +  public static Map<String, Boolean> getStatusMapForCluster() {
 +    PeerId[] peerIds = 
RaftUtils.convertStringArrayToPeerIdArray(config.getNodes());
 +    Map<String, Boolean> res = new HashMap<>();
 +    for (int i = 0; i < peerIds.length; i++) {
 +      PeerId peerId = peerIds[i];
 +      res.put(peerId.getIp(), getStatusOfNode(peerId));
 +    }
 +
 +    return res;
 +  }
 +
 +  private static boolean getStatusOfNode(PeerId peerId) {
 +    QueryStatusRequest request = new QueryStatusRequest("");
 +    SingleQPTask task = new SingleQPTask(false, request);
- 
++    task.setTargetNode(peerId);
 +    LOGGER.debug("Execute get status for node {}.", peerId);
 +    try {
-       NodeAsClient client = 
RaftNodeAsClientManager.getInstance().getRaftNodeAsClient();
-       /** Call async method **/
-       client.asyncHandleRequest(task.getRequest(), peerId, task);
++      CLIENT_MANAGER.produceQPTask(task);
 +
 +      task.await();
 +      boolean status = false;
 +      if (task.getTaskState() == TaskState.FINISH) {
 +        BasicResponse response = task.getResponse();
 +        status = response == null ? null : ((QueryStatusResponse) 
response).getStatus();
 +      }
 +      return status;
 +    } catch (RaftConnectionException | InterruptedException e) {
 +      LOGGER.error("Fail to get status from remote node {} because of {}.", 
peerId, e);
 +      return false;
 +    }
 +  }
- 
-   /**
-    * try to get raft rpc client
-    */
-   public static NodeAsClient getRaftNodeAsClient() throws 
RaftConnectionException {
-     NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
-     if (client == null) {
-       throw new RaftConnectionException(String
-           .format("Raft inner rpc clients have reached the max numbers %s",
-               CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
-                   .getMaxQueueNumOfInnerRpcClient()));
-     }
-     return client;
-   }
  }
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchAbstract.java
index d0f371f,be24b4d..4c4c78b
--- 
a/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchAbstract.java
+++ 
b/cluster/src/test/java/org/apache/iotdb/cluster/integration/IoTDBMetadataFetchAbstract.java
@@@ -110,8 -108,8 +108,6 @@@ public abstract class IoTDBMetadataFetc
          "show timeseries root.ln.*.wt01.*", // seriesPath with stars
          "show timeseries root.a.b", // nonexistent timeseries, thus returning 
""
          "show timeseries root.ln,root.ln",
--        // SHOW TIMESERIES <PATH> only accept single seriesPath, thus
--        // returning ""
      };
      String[] standards = new String[]{
          "root.ln.wf01.wt01.status,root.ln.wf01,BOOLEAN,PLAIN,\n",
@@@ -355,7 -358,8 +356,9 @@@
              + "root.ln.wf02,\n"
              + "root.ln.wf01,\n"
              + "root.ln.wf05,\n";
-     ResultSet resultSet = 
databaseMetaData.getColumns(Constant.CATALOG_STORAGE_GROUP, null, null, null);
++
+     ResultSet resultSet = databaseMetaData
+         .getColumns(Constant.CATALOG_STORAGE_GROUP, null, null, null);
      checkCorrectness(resultSet, standard);
    }
  
diff --cc cluster/src/test/java/org/apache/iotdb/cluster/utils/Utils.java
index 0bb3c08,f080a9d..8800415
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/Utils.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/Utils.java
@@@ -26,11 -26,6 +26,10 @@@ import java.sql.SQLException
  import java.sql.Statement;
  
  public class Utils {
 +
 +  private Utils() {
 +
 +  }
- 
    public static String getCurrentPath(String... command) throws IOException {
      ProcessBuilder builder = new ProcessBuilder(command);
      builder.redirectErrorStream(true);
diff --cc 
cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/MD5HashTest.java
index cd488d1,5af917b..129d1ac
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/MD5HashTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/MD5HashTest.java
@@@ -18,10 -18,9 +18,10 @@@
   */
  package org.apache.iotdb.cluster.utils.hash;
  
- import static org.junit.Assert.*;
++
+ import static org.junit.Assert.assertEquals;
  
  import java.util.concurrent.CountDownLatch;
- import org.apache.iotdb.cluster.utils.hash.MD5Hash;
  import org.junit.After;
  import org.junit.Before;
  import org.junit.Test;
diff --cc docs/Documentation/UserGuideV0.7.0/7-Tools-NodeTool.md
index a442bf5,a7113b6..011271d
--- a/docs/Documentation/UserGuideV0.7.0/7-Tools-NodeTool.md
+++ b/docs/Documentation/UserGuideV0.7.0/7-Tools-NodeTool.md
@@@ -285,50 -285,8 +285,50 @@@ The Windows system startup commands ar
  After using the command, the successful output will be as follows: 
        
  ```
 -data-group-0  ->      1
 -data-group-3  ->      3
 -Total ->      4
 +192.168.130.14:
 +  data-group-0  ->   1
 +  data-group-1  ->   3
 +192.168.130.16:
 +  data-group-2  ->   2
 +  data-group-1  ->   0
 +192.168.130.18:
 +  data-group-0  ->   0
 +  data-group-2  ->   1
 +Total  ->   7
  ```
 -The above output indicates that node 192.168.130.14 contains 2 data 
partitions and 4 query tasks are running on it, wherein 1 query tasks is 
running on data partition data-group-0, and 3 query tasks are running on data 
partition data-group-1.
 +The above output indicates that 7 query tasks are running on cluster. 
Moreover, node 192.168.130.14 contains 2 data partitions and 4 query tasks are 
running on it, wherein 1 query task is running on data partition data-group-0, 
and 3 query tasks are running on data partition data-group-1; node 
192.168.130.16 contains 2 data partitions and 2 query tasks are running on it, 
wherein 2 query tasks are running on data partition data-group-2, and no query 
task is running on data partition data- [...]
 +
 +### Query Status of Nodes in Cluster (status)
 +
 +IoTDB Cluster contains multiple nodes. For any node, there is a possibility 
that the service cannot be provided normally due to problems of network and 
hardware. With this command, users are able to know the current status of all 
nodes in the cluster.
 +
 +#### Input
 +
 +The command to query status of nodes is `status`, no additional parameters 
are needed.
 +
 +#### Output
 +
 +The output is multiple string lines, each line represents a key-value pair, 
where the key is node IP and the value is status of this node (`on` represents 
normal and `off` represents abnormal). The format of each line is `key -> value`.
 +
 +#### Example
 +
 +Assume that the IoTDB Cluster is running on 3 nodes: 192.168.130.14, 
192.168.130.16 and 192.168.130.18, and number of replicas is 2.
 +
 +The Linux and MacOS system startup commands are as follows:
 +```
 +  Shell > ./bin/nodetool.sh -h 192.168.130.14 status
 +```
 +  
 +The Windows system startup commands are as follows:
 +```
 +  Shell > \bin\nodetool.bat -h 192.168.130.14 status
 +```
 +  
 +After using the command, the successful output will be as follows: 
 +      
 +```
 +192.168.130.14  ->  on
 +192.168.130.16  ->  on
 +192.168.130.18  ->  off
 +```
- The above output indicates that node 192.168.130.14 and node 192.168.130.16 
are in normal state, and node 192.168.130.18 cannot provide services.
++The above output indicates that node 192.168.130.14 and node 192.168.130.16 
are in normal state, and node 192.168.130.18 cannot provide services.
diff --cc iotdb/iotdb/conf/iotdb-cluster.properties
index 29df4ce,fc27ee0..2761b22
--- a/iotdb/iotdb/conf/iotdb-cluster.properties
+++ b/iotdb/iotdb/conf/iotdb-cluster.properties
@@@ -63,11 -67,12 +67,14 @@@ qp_task_timeout_ms = 500
  # number of virtual nodes
  num_of_virtual_nodes = 2
  
- # Maximum number of use inner rpc client
- max_num_of_inner_rpc_client = 500
- 
 +# Maximum number of queue length to use inner rpc client, the request which 
exceed to this
 +# number will be rejected.
+ # Maximum number of inner rpc client thread.
 -# When this value <= 0, use CPU core number * 10
++# When this value <= 0, use CPU core number * 5
+ concurrent_inner_rpc_client_thread = 0
+ 
+ # Maximum queue length of qp tasks which are waiting to be executed. 
If the number of
+ # waiting qp tasks exceeds this number, new qp tasks will be rejected.
  max_queue_num_of_inner_rpc_client = 500
  
  # ReadMetadataConsistencyLevel: 1  Strong consistency, 2  Weak consistency
@@@ -79,7 -84,7 +86,7 @@@ read_data_consistency_level = 
  # Maximum number of threads which execute tasks generated by client requests 
concurrently.
  # Each client request corresponds to a QP Task. A QP task may be divided into 
several sub-tasks.
  # So this value is the sum of all sub-tasks.
--# When this value <= 0, use CPU core number * 10
++# When this value <= 0, use CPU core number * 5
  concurrent_qp_sub_task_thread = 0
  
  # Batch data size read from remote query node once while reading, default 
value is 10000.
diff --cc 
iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 9bf87c2,99476f2..04e905b
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@@ -32,8 -33,8 +32,8 @@@ import org.apache.iotdb.db.qp.physical.
  import org.apache.iotdb.db.qp.physical.crud.GroupByPlan;
  import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
  import org.apache.iotdb.db.query.context.QueryContext;
- import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 +import org.apache.iotdb.db.query.executor.AbstractQueryRouter;
+ import org.apache.iotdb.db.query.executor.EngineQueryRouter;
 -import org.apache.iotdb.db.query.executor.IEngineQueryRouter;
  import 
org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
  import org.apache.iotdb.tsfile.read.common.Path;
  import org.apache.iotdb.tsfile.read.expression.QueryExpression;
diff --cc 
iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
index c46c6f1,03c600d..b321f45
--- 
a/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
+++ 
b/iotdb/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
@@@ -162,5 -172,29 +162,4 @@@ public class EngineQueryRouter extends 
          fillType);
      return fillEngineExecutor.execute(context);
    }
 -
 -  /**
 -   * sort intervals by start time and merge overlapping intervals.
 -   *
 -   * @param intervals time interval
 -   */
 -  private List<Pair<Long, Long>> mergeInterval(List<Pair<Long, Long>> 
intervals) {
 -    // sort by interval start time.
 -    intervals.sort(((o1, o2) -> (int) (o1.left - o2.left)));
 -
 -    LinkedList<Pair<Long, Long>> merged = new LinkedList<>();
 -    for (Pair<Long, Long> interval : intervals) {
 -      // if the list of merged intervals is empty or
 -      // if the current interval does not overlap with the previous, simply 
append it.
 -      if (merged.isEmpty() || merged.getLast().right < interval.left) {
 -        merged.add(interval);
 -      } else {
 -        // otherwise, there is overlap, so we merge the current and previous 
intervals.
 -        merged.getLast().right = Math.max(merged.getLast().right, 
interval.right);
 -      }
 -    }
 -    return merged;
 -  }
 -
--
  }

Reply via email to