This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 9472626 reconstruct response
9472626 is described below
commit 9472626fa5e6d5c051673c872ceca8f5f02b587e
Author: lta <[email protected]>
AuthorDate: Wed Apr 3 10:32:10 2019 +0800
reconstruct response
---
.../org/apache/iotdb/cluster/callback/BatchQPTask.java | 9 +++++++--
.../java/org/apache/iotdb/cluster/callback/QPTask.java | 2 ++
.../org/apache/iotdb/cluster/callback/SingleQPTask.java | 5 +++++
.../org/apache/iotdb/cluster/qp/ClusterQPExecutor.java | 5 +++++
.../iotdb/cluster/qp/executor/NonQueryExecutor.java | 8 +++++---
.../rpc/processor/DataGroupNonQueryAsyncProcessor.java | 7 +++----
.../rpc/processor/MetaGroupNonQueryAsyncProcessor.java | 6 +++---
.../cluster/rpc/response/DataGroupNonQueryResponse.java | 15 ++++++++++++---
.../cluster/rpc/response/MetaGroupNonQueryResponse.java | 8 ++++++--
9 files changed, 48 insertions(+), 17 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
index 679d8db..682066b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/BatchQPTask.java
@@ -95,7 +95,7 @@ public class BatchQPTask extends QPTask {
this.run(subTask.getResponse());
} catch (InterruptedException e) {
LOGGER.info("Handle sub task locally failed.");
- this.run(new DataGroupNonQueryResponse(groupId, false, null,
e.toString()));
+ this.run(DataGroupNonQueryResponse.createErrorInstance(groupId,
e.toString()));
}
});
thread.start();
@@ -106,7 +106,7 @@ public class BatchQPTask extends QPTask {
this.run(subTask.getResponse());
} catch (RaftConnectionException | InterruptedException e) {
LOGGER.info("Async handle sub task failed.");
- this.run(new DataGroupNonQueryResponse(groupId, false, null,
e.toString()));
+ this.run(DataGroupNonQueryResponse.createErrorInstance(groupId,
e.toString()));
}
});
thread.start();
@@ -144,6 +144,11 @@ public class BatchQPTask extends QPTask {
taskCountDownLatch.countDown();
}
+ @Override
+ public void cancel() {
+
+ }
+
public boolean isAllSuccessful() {
return isAllSuccessful;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index 3fdffea..ee090f3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -122,4 +122,6 @@ public abstract class QPTask {
public void await() throws InterruptedException {
this.taskCountDownLatch.await();
}
+
+ public abstract void cancel();
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index 85da087..6c9850e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -46,4 +46,9 @@ public class SingleQPTask extends QPTask {
}
this.taskCountDownLatch.countDown();
}
+
+ @Override
+ public void cancel() {
+
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 42c3dd9..5bf13fc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -54,6 +54,11 @@ public abstract class ClusterQPExecutor {
protected final Server server = Server.getInstance();
/**
+ * The task in progress.
+ */
+ protected QPTask currentTask;
+
+ /**
* Rpc Service Client
*/
protected BoltCliClientService cliClientService;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index db837c3..ea42bb8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -243,6 +243,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
}
/** Execute multi task **/
BatchQPTask task = new BatchQPTask(subTaskMap.size(), batchResult,
subTaskMap, planIndexMap);
+ currentTask = task;
task.execute(this);
task.await();
batchResult.setAllSuccessful(task.isAllSuccessful());
@@ -250,7 +251,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
}
private boolean update(UpdatePlan updatePlan)
- throws PathErrorException, InterruptedException,
RaftConnectionException, ProcessorException, IOException {
+ throws PathErrorException, InterruptedException,
RaftConnectionException, IOException {
Path path = updatePlan.getPath();
String deviceId = path.getDevice();
String storageGroup = getStroageGroupByDevice(deviceId);
@@ -266,7 +267,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
* Handle insert plan
*/
private boolean insert(InsertPlan insertPlan)
- throws ProcessorException, PathErrorException, InterruptedException,
IOException, RaftConnectionException {
+ throws PathErrorException, InterruptedException, IOException,
RaftConnectionException {
String deviceId = insertPlan.getDeviceId();
String storageGroup = getStroageGroupByDevice(deviceId);
return handleDataGroupRequest(storageGroup, insertPlan);
@@ -328,6 +329,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
plans.add(plan);
DataGroupNonQueryRequest request = new DataGroupNonQueryRequest(groupId,
plans);
SingleQPTask qpTask = new SingleQPTask(false, request);
+ currentTask = qpTask;
/** Check if the plan can be executed locally. **/
if (canHandleNonQueryBySG(storageGroup)) {
return handleDataGroupRequestLocally(groupId, qpTask);
@@ -342,7 +344,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
public boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask)
throws InterruptedException {
final Task task = new Task();
- BasicResponse response = new DataGroupNonQueryResponse(groupId, false,
null, null);
+ BasicResponse response =
DataGroupNonQueryResponse.createEmptyInstance(groupId);
ResponseClosure closure = new ResponseClosure(response, status -> {
response.addResult(status.isOk());
if (!status.isOk()) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index 8d60a43..9f7a43e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -62,16 +62,15 @@ public class DataGroupNonQueryAsyncProcessor extends
if (!dataPartitionRaftHolder.getFsm().isLeader()) {
PeerId leader = RaftUtils.getLeaderPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
- DataGroupNonQueryResponse response = new
DataGroupNonQueryResponse(groupId, true,
- leader.toString(),
- null);
+ DataGroupNonQueryResponse response = DataGroupNonQueryResponse
+ .createRedirectedInstance(groupId, leader.toString());
asyncContext.sendResponse(response);
} else {
LOGGER.info("Apply task to raft node");
/** Apply QPTask to Raft Node **/
final Task task = new Task();
- BasicResponse response = new DataGroupNonQueryResponse(groupId, false,
null, null);
+ BasicResponse response =
DataGroupNonQueryResponse.createEmptyInstance(groupId);
ResponseClosure closure = new ResponseClosure(response, status -> {
response.addResult(status.isOk());
if (!status.isOk()) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 0c36efc..6448d07 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -61,15 +61,15 @@ public class MetaGroupNonQueryAsyncProcessor extends
if (!metadataHolder.getFsm().isLeader()) {
PeerId leader = RaftUtils.getLeaderPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
- MetaGroupNonQueryResponse response = new
MetaGroupNonQueryResponse(groupId, true,
- leader.toString(), null);
+ MetaGroupNonQueryResponse response = MetaGroupNonQueryResponse
+ .createRedirectedInstance(groupId, leader.toString());
asyncContext.sendResponse(response);
} else {
LOGGER.info("Apply task to metadata raft node");
/** Apply QPTask to Raft Node **/
final Task task = new Task();
- BasicResponse response = new MetaGroupNonQueryResponse(groupId, false,
null, null);
+ BasicResponse response =
MetaGroupNonQueryResponse.createEmptyInstance(groupId);
ResponseClosure closure = new ResponseClosure(response, status -> {
response.addResult(status.isOk());
if (!status.isOk()) {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index 8a60180..7a4c1f8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -23,12 +23,21 @@ package org.apache.iotdb.cluster.rpc.response;
*/
public class DataGroupNonQueryResponse extends BasicResponse {
- public DataGroupNonQueryResponse(String groupId, boolean redirected, String
leaderStr, String errorMsg) {
+ public DataGroupNonQueryResponse(String groupId, boolean redirected, String
leaderStr,
+ String errorMsg) {
super(groupId, redirected, leaderStr, errorMsg);
}
- public DataGroupNonQueryResponse(String groupId, boolean redirected) {
- super(groupId, redirected, null, null);
+ public static DataGroupNonQueryResponse createRedirectedInstance(String
groupId, String leaderStr) {
+ return new DataGroupNonQueryResponse(groupId, true, leaderStr, null);
+ }
+
+ public static DataGroupNonQueryResponse createEmptyInstance(String groupId) {
+ return new DataGroupNonQueryResponse(groupId, false, null, null);
+ }
+
+ public static DataGroupNonQueryResponse createErrorInstance(String groupId,
String errorMsg) {
+ return new DataGroupNonQueryResponse(groupId, false, null, errorMsg);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index 74c93a1..8502074 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -27,8 +27,12 @@ public class MetaGroupNonQueryResponse extends BasicResponse
{
super(groupId, redirected, leaderStr, errorMsg);
}
- public MetaGroupNonQueryResponse(String groupId, boolean redirected) {
- super(groupId, redirected, null, null);
+ public static MetaGroupNonQueryResponse createRedirectedInstance(String
groupId, String leaderStr) {
+ return new MetaGroupNonQueryResponse(groupId, true, leaderStr, null);
+ }
+
+ public static MetaGroupNonQueryResponse createEmptyInstance(String groupId) {
+ return new MetaGroupNonQueryResponse(groupId, false, null, null);
}
}