This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 2ac9056dc94131ae33a7495c65806943655799aa Author: lta <[email protected]> AuthorDate: Wed May 22 18:19:37 2019 +0800 format codes --- .../iotdb/cluster/entity/raft/DataStateMachine.java | 2 +- .../iotdb/cluster/qp/executor/QueryMetadataExecutor.java | 4 ++-- .../org/apache/iotdb/cluster/qp/task/BatchQPTask.java | 10 +++++----- .../java/org/apache/iotdb/cluster/qp/task/QPTask.java | 2 +- .../qp/task/{QueryTask.java => QueryDataTask.java} | 4 ++-- .../org/apache/iotdb/cluster/qp/task/SingleQPTask.java | 2 +- .../iotdb/cluster/query/utils/ClusterRpcReaderUtils.java | 14 ++++---------- .../org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java | 4 ++-- .../cluster/rpc/raft/impl/RaftNodeAsClientManager.java | 16 ++++++++-------- .../java/org/apache/iotdb/cluster/utils/RaftUtils.java | 6 +++--- 10 files changed, 29 insertions(+), 35 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java index b8c6f43..de46d3a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java @@ -116,7 +116,7 @@ public class DataStateMachine extends StateMachineAdapter { PhysicalPlan plan = PhysicalPlanLogTransfer.logToOperator(planByte); LOGGER.debug("OperatorType :{}", plan.getOperatorType()); - /** If the request is to set path and sg of the path doesn't exist, it needs to run null-read in meta group to avoid out of data sync **/ + /** If the request is to set path and sg of the path doesn't exist, it needs to receive null-read in meta group to avoid out of data sync **/ if (plan.getOperatorType() == OperatorType.CREATE_TIMESERIES && !checkPathExistence( ((MetadataPlan) plan).getPath().getFullPath())) { RaftUtils.handleNullReadToMetaGroup(status); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java index 229009e..6043ce6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java @@ -452,7 +452,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor { QueryStorageGroupResponse response; response = QueryStorageGroupResponse .createSuccessResponse(metadataHolder.getFsm().getAllStorageGroups()); - task.run(response); + task.receive(response); } else { ((RaftService) metadataHolder.getService()).getNode() .readIndex(reqContext, new ReadIndexClosure() { @@ -466,7 +466,7 @@ public class QueryMetadataExecutor extends AbstractQPExecutor { } else { response = QueryStorageGroupResponse.createErrorResponse(status.getErrorMsg()); } - task.run(response); + task.receive(response); } }); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java index 667a55e..4562d77 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/BatchQPTask.java @@ -91,7 +91,7 @@ public class BatchQPTask extends MultiQPTask { * @param basicResponse response from receiver */ @Override - public void run(BasicResponse basicResponse) { + public void receive(BasicResponse basicResponse) { lock.lock(); try { String groupId = basicResponse.getGroupId(); @@ -140,10 +140,10 @@ public class BatchQPTask extends MultiQPTask { private void executeLocalSubTask(QPTask subTask, String groupId) { try { executor.handleNonQueryRequestLocally(groupId, subTask); - this.run(subTask.getResponse()); + this.receive(subTask.getResponse()); } catch (InterruptedException e) { LOGGER.error("Handle sub task locally failed."); - this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage())); + this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage())); } } @@ -153,10 +153,10 @@ public class BatchQPTask extends MultiQPTask { private void executeRpcSubTask(SingleQPTask subTask, PeerId leader, String groupId) { try { executor.asyncHandleNonQueryTask(subTask, leader); - this.run(subTask.getResponse()); + this.receive(subTask.getResponse()); } catch (RaftConnectionException | InterruptedException e) { LOGGER.error("Async handle sub task failed."); - this.run(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage())); + this.receive(DataGroupNonQueryResponse.createErrorResponse(groupId, e.getMessage())); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java index 96a517a..2ca1359 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QPTask.java @@ -78,7 +78,7 @@ public abstract class QPTask { * * @param basicResponse response from receiver */ - public abstract void run(BasicResponse basicResponse); + public abstract void receive(BasicResponse basicResponse); public boolean isSyncTask() { return isSyncTask; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java similarity index 94% rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java index f4cb4b5..6946925 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java @@ -21,11 +21,11 @@ package org.apache.iotdb.cluster.qp.task; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse; -public class QueryTask { +public class QueryDataTask { private BasicResponse basicResponse; private TaskState state; - public QueryTask(BasicResponse basicResponse, + public QueryDataTask(BasicResponse basicResponse, TaskState state) { this.basicResponse = basicResponse; this.state = state; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java index 805834e..b3d5f47 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/SingleQPTask.java @@ -41,7 +41,7 @@ public class SingleQPTask extends QPTask { * Process response. If it's necessary to redirect leader, redo the task. */ @Override - public void run(BasicResponse response) { + public void receive(BasicResponse response) { if(taskState != TaskState.EXCEPTION) { this.response = response; if(response == null){ diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java index d38ca83..0cc9805 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java @@ -24,17 +24,11 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; -import org.apache.iotdb.cluster.qp.task.QueryTask; -import org.apache.iotdb.cluster.query.PathType; +import org.apache.iotdb.cluster.qp.task.QueryDataTask; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.rpc.raft.NodeAsClient; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; -import org.apache.iotdb.cluster.rpc.raft.request.querydata.CloseSeriesReaderRequest; -import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataByTimestampRequest; -import org.apache.iotdb.cluster.rpc.raft.request.querydata.QuerySeriesDataRequest; import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse; -import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataByTimestampResponse; -import org.apache.iotdb.cluster.rpc.raft.response.querydata.QuerySeriesDataResponse; import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.cluster.utils.hash.Router; import org.slf4j.Logger; @@ -99,9 +93,9 @@ public class ClusterRpcReaderUtils { TASK_MAX_RETRY)); } NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient(); - QueryTask queryTask = nodeAsClient.syncHandleRequest(request, peerId); - if (queryTask.getState() == TaskState.FINISH) { - return queryTask.getBasicResponse(); + QueryDataTask queryDataTask = nodeAsClient.syncHandleRequest(request, peerId); + if (queryDataTask.getState() == TaskState.FINISH) { + return queryDataTask.getBasicResponse(); } else { return handleQueryRequest(request, peerId, taskRetryNum + 1); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java index b4a2f25..865b6ef 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java @@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.rpc.raft; import com.alipay.sofa.jraft.entity.PeerId; import org.apache.iotdb.cluster.exception.RaftConnectionException; -import org.apache.iotdb.cluster.qp.task.QueryTask; +import org.apache.iotdb.cluster.qp.task.QueryDataTask; import org.apache.iotdb.cluster.qp.task.SingleQPTask; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; @@ -43,7 +43,7 @@ public interface NodeAsClient { * * @param peerId leader node of the target group */ - QueryTask syncHandleRequest(BasicRequest request, PeerId peerId); + QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId); /** * Shut down client diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java index 19f1343..eafa843 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java @@ -33,7 +33,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; -import org.apache.iotdb.cluster.qp.task.QueryTask; +import org.apache.iotdb.cluster.qp.task.QueryDataTask; import org.apache.iotdb.cluster.qp.task.SingleQPTask; import org.apache.iotdb.cluster.rpc.raft.NodeAsClient; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; @@ -55,7 +55,7 @@ public class RaftNodeAsClientManager { private static final int TASK_TIMEOUT_MS = CLUSTER_CONFIG.getQpTaskTimeout(); /** - * Max valid number of @NodeAsClient usage, represent the number can run simultaneously + * Max valid number of @NodeAsClient usage, represent the number can receive simultaneously * at the same time */ private static final int MAX_VALID_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient(); @@ -240,7 +240,7 @@ public class RaftNodeAsClientManager { public void onResponse(Object result) { BasicResponse response = (BasicResponse) result; releaseClient(RaftNodeAsClient.this); - qpTask.run(response); + qpTask.receive(response); } @Override @@ -248,7 +248,7 @@ public class RaftNodeAsClientManager { LOGGER.error("Bolt rpc client occurs errors when handling Request", e); qpTask.setTaskState(TaskState.EXCEPTION); releaseClient(RaftNodeAsClient.this); - qpTask.run(null); + qpTask.receive(null); } @Override @@ -260,19 +260,19 @@ public class RaftNodeAsClientManager { LOGGER.error(e.getMessage()); qpTask.setTaskState(TaskState.EXCEPTION); releaseClient(RaftNodeAsClient.this); - qpTask.run(null); + qpTask.receive(null); throw new RaftConnectionException(e); } } @Override - public QueryTask syncHandleRequest(BasicRequest request, PeerId peerId) { + public QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId) { try { BasicResponse response = (BasicResponse) boltClientService.getRpcClient() .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS); - return new QueryTask(response, TaskState.FINISH); + return new QueryDataTask(response, TaskState.FINISH); } catch (RemotingException | InterruptedException e) { - return new QueryTask(null, TaskState.EXCEPTION); + return new QueryDataTask(null, TaskState.EXCEPTION); } finally { releaseClient(RaftNodeAsClient.this); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java index 650dbe4..601b902 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java @@ -298,7 +298,7 @@ public class RaftUtils { if (!status.isOk()) { response.setErrorMsg(status.getErrorMsg()); } - qpTask.run(response); + qpTask.receive(response); }); task.setDone(closure); try { @@ -399,7 +399,7 @@ public class RaftUtils { status.setCode(-1); status.setErrorMsg(status.getErrorMsg()); } - nullReadTask.run(response); + nullReadTask.receive(response); } }); nullReadTask.await(); @@ -435,7 +435,7 @@ public class RaftUtils { status.setCode(-1); status.setErrorMsg(status.getErrorMsg()); } - nullReadTask.run(response); + nullReadTask.receive(response); } }); nullReadTask.await();
