This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 3059d5f  unit code and add judge leader process of 
QueryMetadataAsyncProcessor
3059d5f is described below

commit 3059d5f5194cca8deedb2a1b501a0bebadf66d8d
Author: lta <[email protected]>
AuthorDate: Fri Mar 29 11:56:14 2019 +0800

    unit code and add judge leader process of QueryMetadataAsyncProcessor
---
 .../apache/iotdb/cluster/callback/SingleTask.java  |  5 +--
 .../org/apache/iotdb/cluster/callback/Task.java    | 29 +++++++++--------
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 36 ++++++++++++++++++++++
 .../cluster/qp/executor/NonQueryExecutor.java      | 20 ++----------
 .../cluster/qp/executor/QueryMetadataExecutor.java | 15 ++-------
 .../rpc/processor/QueryMetadataAsyncProcessor.java | 10 ++++++
 6 files changed, 68 insertions(+), 47 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
index 2fd49b2..ac5492c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
@@ -26,9 +26,10 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
  */
 public class SingleTask extends Task {
 
+  private static final int TASK_NUM = 1;
 
   public SingleTask(boolean isSyncTask, BasicRequest request) {
-    super(isSyncTask, 1, TaskState.INITIAL);
+    super(isSyncTask, TASK_NUM, TaskState.INITIAL);
     this.request = request;
   }
 
@@ -43,6 +44,6 @@ public class SingleTask extends Task {
     } else if (taskState != TaskState.EXCEPTION) {
       this.taskState = TaskState.FINISH;
     }
-    this.taskNum.countDown();
+    this.taskCountDownLatch.countDown();
   }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 8c825ac..0c73a01 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -39,21 +39,24 @@ public abstract class Task {
   protected boolean isSyncTask;
 
   /**
+   * Count down latch for sub-tasks
+   */
+  protected CountDownLatch taskCountDownLatch;
+
+  /**
    * Num of sub-task
    */
-  protected CountDownLatch taskNum;
+  private int taskNum;
+
   /**
    * Describe task type
    */
   protected TaskState taskState;
-  /**
-   * Default task num
-   */
-  private static final int DEFAULT_TASK_NUM = 1;
 
   public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
     this.isSyncTask = isSyncTask;
-    this.taskNum = new CountDownLatch(taskNum);
+    this.taskNum = taskNum;
+    this.taskCountDownLatch = new CountDownLatch(taskNum);
     this.taskState = taskState;
   }
 
@@ -72,16 +75,12 @@ public abstract class Task {
     isSyncTask = syncTask;
   }
 
-  public CountDownLatch getTaskNum() {
-    return taskNum;
-  }
-
-  public void setTaskNum(int taskNum) {
-    this.taskNum = new CountDownLatch(taskNum);
+  public CountDownLatch getTaskCountDownLatch() {
+    return taskCountDownLatch;
   }
 
-  public void setTaskNum() {
-    this.taskNum = new CountDownLatch(DEFAULT_TASK_NUM);
+  public void resetTask() {
+    this.taskCountDownLatch = new CountDownLatch(taskNum);
   }
 
   public TaskState getTaskState() {
@@ -114,6 +113,6 @@ public abstract class Task {
   }
 
   public void await() throws InterruptedException {
-    this.taskNum.await();
+    this.taskCountDownLatch.await();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 11fda06..d281a2a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -18,9 +18,16 @@
  */
 package org.apache.iotdb.cluster.qp;
 
+import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import org.apache.iotdb.cluster.callback.Task;
+import org.apache.iotdb.cluster.callback.Task.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.rpc.NodeAsClient;
+import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
@@ -84,6 +91,35 @@ public abstract class ClusterQPExecutor {
     return false;
   }
 
+  /**
+   * Async handle task by task and leader id
+   *
+   * @param task request task
+   * @param leader leader of the target raft group
+   * @param taskRetryNum Number of task retries due to timeout and redirected.
+   * @return basic response
+   */
+  public BasicResponse asyncHandleTaskGetRes(Task task, PeerId leader, int 
taskRetryNum)
+      throws RaftConnectionException, InterruptedException {
+    if (taskRetryNum >= TASK_MAX_RETRY) {
+      throw new RaftConnectionException(String.format("Task retries reach the 
upper bound %s",
+          TASK_MAX_RETRY));
+    }
+    NodeAsClient client = new RaftNodeAsClient();
+    /** Call async method **/
+    client.asyncHandleRequest(cliClientService, task.getRequest(), leader, 
task);
+    task.await();
+    if (task.getTaskState() != TaskState.FINISH) {
+      if (task.getTaskState() == TaskState.REDIRECT) {
+        /** redirect to the right leader **/
+        leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
+      }
+      task.resetTask();
+      return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
+    }
+    return task.getResponse();
+  }
+
   public void shutdown() {
     cliClientService.shutdown();
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index e263a80..88edd0a 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -30,6 +30,7 @@ import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
 import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
@@ -204,23 +205,8 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    */
   private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
       throws RaftConnectionException, InterruptedException {
-    if (taskRetryNum >= TASK_MAX_RETRY) {
-      throw new RaftConnectionException(String.format("Task retries reach the 
upper bound %s",
-          TASK_MAX_RETRY));
-    }
-    NodeAsClient client = new RaftNodeAsClient();
-    /** Call async method **/
-    client.asyncHandleRequest(cliClientService, task.getRequest(), leader, 
task);
-    task.await();
-    if (task.getTaskState() != TaskState.FINISH) {
-      if (task.getTaskState() == TaskState.REDIRECT) {
-        /** redirect to the right leader **/
-        leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
-      }
-      task.setTaskNum();
-      return asyncHandleTask(task, leader, taskRetryNum + 1);
-    }
-    return task.getResponse().isSuccess();
+    BasicResponse response = asyncHandleTaskGetRes(task, leader, taskRetryNum);
+    return response.isSuccess();
   }
 
   public void shutdown() {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index b514cbf..44790e0 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -70,18 +70,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor 
{
    */
   private Set<String> asyncHandleTask(Task task, PeerId leader, int 
taskRetryNum)
       throws RaftConnectionException, InterruptedException {
-    if (taskRetryNum >= TASK_MAX_RETRY) {
-      throw new RaftConnectionException(String.format("Task retries reach the 
upper bound %s",
-          TASK_MAX_RETRY));
-    }
-    NodeAsClient client = new RaftNodeAsClient();
-    /** Call async method **/
-    client.asyncHandleRequest(cliClientService, task.getRequest(), leader, 
task);
-    task.await();
-    if (task.getTaskState() != TaskState.FINISH) {
-      task.setTaskNum(SUB_TASK_NUM);
-      return asyncHandleTask(task, leader, taskRetryNum + 1);
-    }
-    return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
+    QueryMetadataResponse response = (QueryMetadataResponse) 
asyncHandleTaskGetRes(task, leader, taskRetryNum);
+    return response.getMetadataSet();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 2320a2e..49f4821 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -22,6 +22,7 @@ import com.alipay.remoting.AsyncContext;
 import com.alipay.remoting.BizContext;
 import com.alipay.sofa.jraft.Status;
 import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.util.Bits;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iotdb.cluster.entity.Server;
@@ -30,6 +31,7 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
 import org.apache.iotdb.cluster.rpc.MetadataType;
 import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
 import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +49,14 @@ public class QueryMetadataAsyncProcessor extends 
BasicAsyncUserProcessor<QueryMe
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryMetadataRequest queryMetadataRequest) {
+    String groupId = queryMetadataRequest.getGroupID();
+    if (this.server.getServerId().equals(RaftUtils.getLeader(groupId))) {
+      PeerId leader = RaftUtils.getLeader(groupId);
+      QueryMetadataResponse response = new QueryMetadataResponse(true, false, 
leader.toString(),
+          null);
+      asyncContext.sendResponse(response);
+    }
+
     MetadataType metadataType = queryMetadataRequest.getMetadataType();
     if (metadataType == MetadataType.STORAGE_GROUP) {
       readIndexForSG(asyncContext);

Reply via email to