This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 4d1423d  update
4d1423d is described below

commit 4d1423dcbeec6bb713f96be51714a83c73f3454b
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 14:14:31 2019 +0800

    update
---
 .../cluster/qp/executor/QueryMetadataExecutor.java | 56 +++++++++++++++-------
 .../rpc/processor/QueryMetadataAsyncProcessor.java |  8 ----
 2 files changed, 40 insertions(+), 24 deletions(-)

diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 44790e0..e206b65 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -18,10 +18,13 @@
  */
 package org.apache.iotdb.cluster.qp.executor;
 
+import com.alipay.remoting.InvokeCallback;
+import com.alipay.remoting.exception.RemotingException;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import org.apache.iotdb.cluster.callback.SingleTask;
 import org.apache.iotdb.cluster.callback.Task;
 import org.apache.iotdb.cluster.callback.Task.TaskState;
@@ -29,17 +32,19 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
 import org.apache.iotdb.cluster.rpc.MetadataType;
-import org.apache.iotdb.cluster.rpc.NodeAsClient;
-import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
 import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
-import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Handle show all storage group logic
  */
 public class QueryMetadataExecutor extends ClusterQPExecutor {
 
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryMetadataExecutor.class);
+
   public QueryMetadataExecutor() {
 
   }
@@ -54,23 +59,42 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
       throws RaftConnectionException, InterruptedException {
     QueryMetadataRequest request = new QueryMetadataRequest(
         ClusterConfig.METADATA_GROUP_ID, type);
-    PeerId leader = RaftUtils.getLeader(ClusterConfig.METADATA_GROUP_ID);
 
     SingleTask task = new SingleTask(false, request);
-    return asyncHandleTask(task, leader, 0);
+    return asyncHandleTaskLocally(task);
   }
 
-  /**
-   * Async handle task by task and leader id.
-   *
-   * @param task request task
-   * @param leader leader of the target raft group
-   * @param taskRetryNum Number of task retries due to timeout and redirected.
-   * @return request result
-   */
-  private Set<String> asyncHandleTask(Task task, PeerId leader, int 
taskRetryNum)
+  private Set<String> asyncHandleTaskLocally(SingleTask task)
       throws RaftConnectionException, InterruptedException {
-    QueryMetadataResponse response = (QueryMetadataResponse) 
asyncHandleTaskGetRes(task, leader, taskRetryNum);
-    return response.getMetadataSet();
+    try {
+      cliClientService.getRpcClient()
+          .invokeWithCallback(localNode.toString(), task.getRequest(),
+              new InvokeCallback() {
+
+                @Override
+                public void onResponse(Object result) {
+                  BasicResponse response = (BasicResponse) result;
+                  task.run(response);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                  LOGGER.error("Bolt rpc client occurs errors when handling 
Request", e);
+                  task.setTaskState(TaskState.EXCEPTION);
+                  task.run(null);
+
+                }
+
+                @Override
+                public Executor getExecutor() {
+                  return null;
+                }
+              }, CLUSTER_CONFIG.getTaskTimeoutMs());
+    } catch (RemotingException | InterruptedException e) {
+      throw new RaftConnectionException(e);
+    }
+
+    task.await();
+    return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 49f4821..69d89a2 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -49,14 +49,6 @@ public class QueryMetadataAsyncProcessor extends 
BasicAsyncUserProcessor<QueryMe
   @Override
   public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
       QueryMetadataRequest queryMetadataRequest) {
-    String groupId = queryMetadataRequest.getGroupID();
-    if (this.server.getServerId().equals(RaftUtils.getLeader(groupId))) {
-      PeerId leader = RaftUtils.getLeader(groupId);
-      QueryMetadataResponse response = new QueryMetadataResponse(true, false, 
leader.toString(),
-          null);
-      asyncContext.sendResponse(response);
-    }
-
     MetadataType metadataType = queryMetadataRequest.getMetadataType();
     if (metadataType == MetadataType.STORAGE_GROUP) {
       readIndexForSG(asyncContext);

Reply via email to