This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 9472626  reconstruct response
9472626 is described below

commit 9472626fa5e6d5c051673c872ceca8f5f02b587e
Author: lta <[email protected]>
AuthorDate: Wed Apr 3 10:32:10 2019 +0800

    reconstruct response
---
 .../org/apache/iotdb/cluster/callback/BatchQPTask.java    |  9 +++++++--
 .../java/org/apache/iotdb/cluster/callback/QPTask.java    |  2 ++
 .../org/apache/iotdb/cluster/callback/SingleQPTask.java   |  5 +++++
 .../org/apache/iotdb/cluster/qp/ClusterQPExecutor.java    |  5 +++++
 .../iotdb/cluster/qp/executor/NonQueryExecutor.java       |  8 +++++---
 .../rpc/processor/DataGroupNonQueryAsyncProcessor.java    |  7 +++----
 .../rpc/processor/MetaGroupNonQueryAsyncProcessor.java    |  6 +++---
 .../cluster/rpc/response/DataGroupNonQueryResponse.java   | 15 ++++++++++++---
 .../cluster/rpc/response/MetaGroupNonQueryResponse.java   |  8 ++++++--
 9 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index 679d8db..682066b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -95,7 +95,7 @@ public class BatchQPTask extends QPTask {
             this.run(subTask.getResponse());
           } catch (InterruptedException e) {
             LOGGER.info("Handle sub task locally failed.");
-            this.run(new DataGroupNonQueryResponse(groupId, false, null, e.toString()));
+            this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.toString()));
           }
         });
         thread.start();
@@ -106,7 +106,7 @@ public class BatchQPTask extends QPTask {
             this.run(subTask.getResponse());
           } catch (RaftConnectionException | InterruptedException e) {
             LOGGER.info("Async handle sub task failed.");
-            this.run(new DataGroupNonQueryResponse(groupId, false, null, e.toString()));
+            this.run(DataGroupNonQueryResponse.createErrorInstance(groupId, e.toString()));
           }
         });
         thread.start();
@@ -144,6 +144,11 @@ public class BatchQPTask extends QPTask {
     taskCountDownLatch.countDown();
   }
 
+  @Override
+  public void cancel() {
+
+  }
+
   public boolean isAllSuccessful() {
     return isAllSuccessful;
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index 3fdffea..ee090f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -122,4 +122,6 @@ public abstract class QPTask {
   public void await() throws InterruptedException {
     this.taskCountDownLatch.await();
   }
+
+  public abstract void cancel();
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index 85da087..6c9850e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -46,4 +46,9 @@ public class SingleQPTask extends QPTask {
     }
     this.taskCountDownLatch.countDown();
   }
+
+  @Override
+  public void cancel() {
+
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 42c3dd9..5bf13fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -54,6 +54,11 @@ public abstract class ClusterQPExecutor {
   protected final Server server = Server.getInstance();
 
   /**
+   * The task in progress.
+   */
+  protected QPTask currentTask;
+
+  /**
    * Rpc Service Client
    */
   protected BoltCliClientService cliClientService;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index db837c3..ea42bb8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -243,6 +243,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     }
     /** Execute multi task **/
     BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult, subTaskMap, planIndexMap);
+    currentTask = task;
     task.execute(this);
     task.await();
     batchResult.setAllSuccessful(task.isAllSuccessful());
@@ -250,7 +251,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   }
 
   private boolean update(UpdatePlan updatePlan)
-      throws PathErrorException, InterruptedException, RaftConnectionException, ProcessorException, IOException {
+      throws PathErrorException, InterruptedException, RaftConnectionException, IOException {
     Path path = updatePlan.getPath();
     String deviceId = path.getDevice();
     String storageGroup = getStroageGroupByDevice(deviceId);
@@ -266,7 +267,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    * Handle insert plan
    */
   private boolean insert(InsertPlan insertPlan)
-      throws ProcessorException, PathErrorException, InterruptedException, IOException, RaftConnectionException {
+      throws PathErrorException, InterruptedException, IOException, RaftConnectionException {
     String deviceId = insertPlan.getDeviceId();
     String storageGroup = getStroageGroupByDevice(deviceId);
     return handleDataGroupRequest(storageGroup, insertPlan);
@@ -328,6 +329,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     plans.add(plan);
     DataGroupNonQueryRequest request = new DataGroupNonQueryRequest(groupId, plans);
     SingleQPTask qpTask = new SingleQPTask(false, request);
+    currentTask = qpTask;
     /** Check if the plan can be executed locally. **/
     if (canHandleNonQueryBySG(storageGroup)) {
       return handleDataGroupRequestLocally(groupId, qpTask);
@@ -342,7 +344,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   public boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask)
       throws InterruptedException {
     final Task task = new Task();
-    BasicResponse response = new DataGroupNonQueryResponse(groupId, false, null, null);
+    BasicResponse response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
     ResponseClosure closure = new ResponseClosure(response, status -> {
       response.addResult(status.isOk());
       if (!status.isOk()) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index 8d60a43..9f7a43e 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -62,16 +62,15 @@ public class DataGroupNonQueryAsyncProcessor extends
     if (!dataPartitionRaftHolder.getFsm().isLeader()) {
       PeerId leader = RaftUtils.getLeaderPeerID(groupId);
       LOGGER.info("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
-      DataGroupNonQueryResponse response = new DataGroupNonQueryResponse(groupId, true,
-          leader.toString(),
-          null);
+      DataGroupNonQueryResponse response = DataGroupNonQueryResponse
+          .createRedirectedInstance(groupId, leader.toString());
       asyncContext.sendResponse(response);
     } else {
 
       LOGGER.info("Apply task to raft node");
       /** Apply QPTask to Raft Node **/
       final Task task = new Task();
-      BasicResponse response = new DataGroupNonQueryResponse(groupId, false, null, null);
+      BasicResponse response = DataGroupNonQueryResponse.createEmptyInstance(groupId);
       ResponseClosure closure = new ResponseClosure(response, status -> {
         response.addResult(status.isOk());
         if (!status.isOk()) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 0c36efc..6448d07 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -61,15 +61,15 @@ public class MetaGroupNonQueryAsyncProcessor extends
     if (!metadataHolder.getFsm().isLeader()) {
       PeerId leader = RaftUtils.getLeaderPeerID(groupId);
       LOGGER.info("Request need to redirect leader: {}, groupId : {} ", leader, groupId);
-      MetaGroupNonQueryResponse response = new MetaGroupNonQueryResponse(groupId, true,
-          leader.toString(), null);
+      MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
+          .createRedirectedInstance(groupId, leader.toString());
       asyncContext.sendResponse(response);
     } else {
 
       LOGGER.info("Apply task to metadata raft node");
       /** Apply QPTask to Raft Node **/
       final Task task = new Task();
-      BasicResponse response = new MetaGroupNonQueryResponse(groupId, false, null, null);
+      BasicResponse response = MetaGroupNonQueryResponse.createEmptyInstance(groupId);
       ResponseClosure closure = new ResponseClosure(response, status -> {
         response.addResult(status.isOk());
         if (!status.isOk()) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index 8a60180..7a4c1f8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -23,12 +23,21 @@ package org.apache.iotdb.cluster.rpc.response;
  */
 public class DataGroupNonQueryResponse extends BasicResponse {
 
-  public DataGroupNonQueryResponse(String groupId, boolean redirected, String leaderStr, String errorMsg) {
+  public DataGroupNonQueryResponse(String groupId, boolean redirected, String leaderStr,
+      String errorMsg) {
     super(groupId, redirected, leaderStr, errorMsg);
   }
 
-  public DataGroupNonQueryResponse(String groupId, boolean redirected) {
-    super(groupId, redirected, null, null);
+  public static DataGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+    return new DataGroupNonQueryResponse(groupId, true, leaderStr, null);
+  }
+
+  public static DataGroupNonQueryResponse createEmptyInstance(String groupId) {
+    return new DataGroupNonQueryResponse(groupId, false, null, null);
+  }
+
+  public static DataGroupNonQueryResponse createErrorInstance(String groupId, String errorMsg) {
+    return new DataGroupNonQueryResponse(groupId, false, null, errorMsg);
   }
 
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index 74c93a1..8502074 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -27,8 +27,12 @@ public class MetaGroupNonQueryResponse extends BasicResponse {
     super(groupId, redirected, leaderStr, errorMsg);
   }
 
-  public MetaGroupNonQueryResponse(String groupId, boolean redirected) {
-    super(groupId, redirected, null, null);
+  public static MetaGroupNonQueryResponse createRedirectedInstance(String groupId, String leaderStr) {
+    return new MetaGroupNonQueryResponse(groupId, true, leaderStr, null);
+  }
+
+  public static MetaGroupNonQueryResponse createEmptyInstance(String groupId) {
+    return new MetaGroupNonQueryResponse(groupId, false, null, null);
   }
 
 }

Reply via email to