This is an automated email from the ASF dual-hosted git repository.

east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 2bc0f52  update
2bc0f52 is described below

commit 2bc0f529815ffa261084d8b5d905b0fe5bf36741
Author: mdf369 <[email protected]>
AuthorDate: Tue Apr 2 19:00:02 2019 +0800

    update
---
 .../org/apache/iotdb/cluster/entity/Server.java    |  2 +
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 34 ++++++----
 .../cluster/qp/executor/QueryMetadataExecutor.java | 45 +++++++------
 .../QueryMetadataInStringAsyncProcessor.java       | 74 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 32 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index d489c11..5131070 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.processor.DataGroupNonQueryAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.processor.MetaGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.QueryMetadataInStringAsyncProcessor;
 import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -66,6 +67,7 @@ public class Server {
     rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor(this));
     rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor(this));
     rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
+    rpcServer.registerUserProcessor(new QueryMetadataInStringAsyncProcessor(this));
 
     metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer, true);
     metadataHolder.init();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index cf0ded9..36b3207 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -154,32 +154,42 @@ public abstract class ClusterQPExecutor {
   /**
    * Async handle QPTask by QPTask and leader id
    *
-   * @param QPTask request QPTask
+   * @param task request QPTask
    * @param leader leader of the target raft group
    * @param taskRetryNum Number of QPTask retries due to timeout and 
redirected.
    * @return basic response
    */
-  public BasicResponse asyncHandleTaskGetRes(QPTask QPTask, PeerId leader, int 
taskRetryNum)
+  public BasicResponse asyncHandleTaskGetRes(QPTask task, PeerId leader, int 
taskRetryNum)
       throws InterruptedException, RaftConnectionException {
+    asyncSendTask(task, leader, taskRetryNum);
+    return asyncGetRes(task, leader, taskRetryNum);
+  }
+
+  public void asyncSendTask(QPTask task, PeerId leader, int taskRetryNum)
+      throws RaftConnectionException {
     if (taskRetryNum >= TASK_MAX_RETRY) {
       throw new RaftConnectionException(String.format("QPTask retries reach the upper bound %s",
           TASK_MAX_RETRY));
     }
     NodeAsClient client = new RaftNodeAsClient();
     /** Call async method **/
-    client.asyncHandleRequest(cliClientService, QPTask.getRequest(), leader, 
QPTask);
-    QPTask.await();
-    if (QPTask.getTaskState() != TaskState.FINISH) {
-      if (QPTask.getTaskState() == TaskState.REDIRECT) {
+    client.asyncHandleRequest(cliClientService, task.getRequest(), leader, 
task);
+  }
+
+  public BasicResponse asyncGetRes(QPTask task, PeerId leader, int 
taskRetryNum)
+      throws InterruptedException, RaftConnectionException {
+    task.await();
+    if (task.getTaskState() != TaskState.FINISH) {
+      if (task.getTaskState() == TaskState.REDIRECT) {
         /** redirect to the right leader **/
-        leader = PeerId.parsePeer(QPTask.getResponse().getLeaderStr());
-        LOGGER.info("Redirect leader: {}, group id = {}", leader, 
QPTask.getRequest().getGroupID());
-        RaftUtils.updateRaftGroupLeader(QPTask.getRequest().getGroupID(), 
leader);
+        leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
+        LOGGER.info("Redirect leader: {}, group id = {}", leader, 
task.getRequest().getGroupID());
+        RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(), 
leader);
       }
-      QPTask.resetTask();
-      return asyncHandleTaskGetRes(QPTask, leader, taskRetryNum + 1);
+      task.resetTask();
+      return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
     }
-    return QPTask.getResponse();
+    return task.getResponse();
   }
 
   public void shutdown() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8a3fcd4..be20208 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -101,39 +101,41 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     Set<String> groupIdSet = router.getAllGroupId();
 
     List<String> metadataList = new ArrayList<>(groupIdSet.size());
+    List<SingleQPTask> taskList = new ArrayList<>();
     for (String groupId : groupIdSet) {
       QueryMetadataInStringRequest request = new 
QueryMetadataInStringRequest(groupId);
       SingleQPTask task = new SingleQPTask(false, request);
+      taskList.add(task);
 
       LOGGER.info("Execute show metadata in string statement for group {}.", 
groupId);
       /** Check if the plan can be executed locally. **/
       if (router.insideGroup(groupId, localNode)) {
         LOGGER.info("Execute show metadata in string statement locally for 
group {}.", groupId);
-        metadataList.add(queryMetadataInStringLocally(groupId, task));
+        asyncQueryMetadataInStringLocally(groupId, task);
       } else {
         try {
           PeerId holder = RaftUtils.getRandomPeerID(groupId);
-          metadataList.add(queryMetadataInString(task, holder));
+          asyncSendTask(task, holder, 0);
         } catch (RaftConnectionException e) {
           LOGGER.error(e.getMessage());
           throw new ProcessorException("Raft connection occurs error.", e);
         }
       }
     }
+    for (int i = 0; i < taskList.size(); i++) {
+      SingleQPTask task = taskList.get(i);
+      task.await();
+      QueryMetadataInStringResponse response = (QueryMetadataInStringResponse) 
task.getResponse();
+      if (!response.isSuccess()) {
+        LOGGER.error("Execute show timeseries statement false.");
+        throw new ProcessorException();
+      }
+      return ((QueryMetadataInStringResponse) 
task.getResponse()).getMetadata();
+    }
     return combineMetadataInStringList(metadataList);
   }
 
   /**
-   * Combine multiple metadata in String format into single String
-   *
-   * @param metadataList
-   * @return single String of all metadata
-   */
-  private String combineMetadataInStringList(List<String> metadataList) {
-    return null; //TODO
-  }
-
-  /**
    * Handle "show timeseries <path>" statement
    *
    * @param path column path
@@ -217,7 +219,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
   /**
    * Handle "show timeseries" statement
    */
-  private String queryMetadataInStringLocally(String groupId, SingleQPTask 
task)
+  private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask 
task)
       throws InterruptedException, ProcessorException {
     final byte[] reqContext = new byte[4];
     Bits.putInt(reqContext, 0, requestId.incrementAndGet());
@@ -239,13 +241,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
             task.run(response);
           }
         });
-    task.await();
-    QueryMetadataInStringResponse response = (QueryMetadataInStringResponse) 
task.getResponse();
-    if (!response.isSuccess()) {
-      LOGGER.error("Execute show timeseries statement false.");
-      throw new ProcessorException();
-    }
-    return ((QueryMetadataInStringResponse) task.getResponse()).getMetadata();
   }
 
   private String queryMetadataInString(SingleQPTask task, PeerId leader)
@@ -253,4 +248,14 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
     BasicResponse response = asyncHandleTaskGetRes(task, leader, 0);
     return ((QueryMetadataInStringResponse) response).getMetadata();
   }
+
+  /**
+   * Combine multiple metadata in String format into single String
+   *
+   * @param metadataList
+   * @return single String of all metadata
+   */
+  private String combineMetadataInStringList(List<String> metadataList) {
+    return null; //TODO
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
new file mode 100644
index 0000000..bd11468
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.util.Bits;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.response.QueryMetadataInStringResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryMetadataInStringAsyncProcessor  extends BasicAsyncUserProcessor<QueryMetadataInStringRequest> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryMetadataInStringAsyncProcessor.class);
+  private final AtomicInteger requestId = new AtomicInteger(0);
+  private Server server;
+
+  public QueryMetadataInStringAsyncProcessor(Server server) {
+    this.server = server;
+  }
+
+  @Override
+  public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+      QueryMetadataInStringRequest request) {
+    String groupId = request.getGroupID();
+    final byte[] reqContext = new byte[4];
+    Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
+        .getDataPartitionHolder(groupId);
+    ((RaftService) dataPartitionHolder.getService()).getNode()
+        .readIndex(reqContext, new ReadIndexClosure() {
+
+          @Override
+          public void run(Status status, long index, byte[] reqCtx) {
+            QueryMetadataInStringResponse response;
+            if (status.isOk()) {
+              response = new QueryMetadataInStringResponse(false, true,
+                  dataPartitionHolder.getFsm().getMetadataInString());
+            } else {
+              response = new QueryMetadataInStringResponse(false, false, null, 
null);
+            }
+            asyncContext.sendResponse(response);
+          }
+        });
+  }
+
+  @Override
+  public String interest() {
+    return QueryMetadataInStringRequest.class.getName();
+  }
+}

Reply via email to