This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 8cb0bb5c7d5e1a7733e5e877d6d882a5fdd11b8b Author: lta <[email protected]> AuthorDate: Sat May 25 12:43:46 2019 +0800 modify for raft node as client --- .../rpc/raft/impl/RaftNodeAsClientManager.java | 57 +++++++++++++--------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java index f7de451..d19ea88 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java @@ -216,31 +216,42 @@ public class RaftNodeAsClientManager { @Override public void asyncHandleRequest(SingleQPTask qpTask) { +// LOGGER.debug("Node as client to send request to leader: {}", qpTask.getTargetNode()); +// try { +// boltClientService.getRpcClient() +// .invokeWithCallback(qpTask.getTargetNode().getEndpoint().toString(), +// qpTask.getRequest(), +// new InvokeCallback() { +// +// @Override +// public void onResponse(Object result) { +// BasicResponse response = (BasicResponse) result; +// qpTask.receive(response); +// } +// +// @Override +// public void onException(Throwable e) { +// LOGGER.error("Bolt rpc client occurs errors when handling Request", e); +// qpTask.setTaskState(TaskState.EXCEPTION); +// qpTask.receive(null); +// } +// +// @Override +// public Executor getExecutor() { +// return null; +// } +// }, TASK_TIMEOUT_MS); +// } catch (RemotingException | InterruptedException e) { +// LOGGER.error(e.getMessage()); +// qpTask.setTaskState(TaskState.RAFT_CONNECTION_EXCEPTION); +// qpTask.receive(null); +// } LOGGER.debug("Node as client to send request to leader: {}", qpTask.getTargetNode()); try { - boltClientService.getRpcClient() - .invokeWithCallback(qpTask.getTargetNode().getEndpoint().toString(), - qpTask.getRequest(), - new InvokeCallback() { - - 
@Override - public void onResponse(Object result) { - BasicResponse response = (BasicResponse) result; - qpTask.receive(response); - } - - @Override - public void onException(Throwable e) { - LOGGER.error("Bolt rpc client occurs errors when handling Request", e); - qpTask.setTaskState(TaskState.EXCEPTION); - qpTask.receive(null); - } - - @Override - public Executor getExecutor() { - return null; - } - }, TASK_TIMEOUT_MS); + BasicResponse response = (BasicResponse) boltClientService.getRpcClient() + .invokeSync(qpTask.getTargetNode().getEndpoint().toString(), + qpTask.getRequest(), TASK_TIMEOUT_MS); + qpTask.receive(response); } catch (RemotingException | InterruptedException e) { LOGGER.error(e.getMessage()); qpTask.setTaskState(TaskState.RAFT_CONNECTION_EXCEPTION);
