This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 2bc0f52 update
2bc0f52 is described below
commit 2bc0f529815ffa261084d8b5d905b0fe5bf36741
Author: mdf369 <[email protected]>
AuthorDate: Tue Apr 2 19:00:02 2019 +0800
update
---
.../org/apache/iotdb/cluster/entity/Server.java | 2 +
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 34 ++++++----
.../cluster/qp/executor/QueryMetadataExecutor.java | 45 +++++++------
.../QueryMetadataInStringAsyncProcessor.java | 74 ++++++++++++++++++++++
4 files changed, 123 insertions(+), 32 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index d489c11..5131070 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.rpc.processor.DataGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.processor.MetaGroupNonQueryAsyncProcessor;
+import
org.apache.iotdb.cluster.rpc.processor.QueryMetadataInStringAsyncProcessor;
import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -66,6 +67,7 @@ public class Server {
rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor(this));
rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor(this));
rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
+ rpcServer.registerUserProcessor(new
QueryMetadataInStringAsyncProcessor(this));
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer,
true);
metadataHolder.init();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index cf0ded9..36b3207 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -154,32 +154,42 @@ public abstract class ClusterQPExecutor {
/**
* Async handle QPTask by QPTask and leader id
*
- * @param QPTask request QPTask
+ * @param task request QPTask
* @param leader leader of the target raft group
* @param taskRetryNum Number of QPTask retries due to timeout and
redirected.
* @return basic response
*/
- public BasicResponse asyncHandleTaskGetRes(QPTask QPTask, PeerId leader, int
taskRetryNum)
+ public BasicResponse asyncHandleTaskGetRes(QPTask task, PeerId leader, int
taskRetryNum)
throws InterruptedException, RaftConnectionException {
+ asyncSendTask(task, leader, taskRetryNum);
+ return asyncGetRes(task, leader, taskRetryNum);
+ }
+
+ public void asyncSendTask(QPTask task, PeerId leader, int taskRetryNum)
+ throws RaftConnectionException {
if (taskRetryNum >= TASK_MAX_RETRY) {
throw new RaftConnectionException(String.format("QPTask retries reach
the upper bound %s",
TASK_MAX_RETRY));
}
NodeAsClient client = new RaftNodeAsClient();
/** Call async method **/
- client.asyncHandleRequest(cliClientService, QPTask.getRequest(), leader,
QPTask);
- QPTask.await();
- if (QPTask.getTaskState() != TaskState.FINISH) {
- if (QPTask.getTaskState() == TaskState.REDIRECT) {
+ client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
+ }
+
+ public BasicResponse asyncGetRes(QPTask task, PeerId leader, int
taskRetryNum)
+ throws InterruptedException, RaftConnectionException {
+ task.await();
+ if (task.getTaskState() != TaskState.FINISH) {
+ if (task.getTaskState() == TaskState.REDIRECT) {
/** redirect to the right leader **/
- leader = PeerId.parsePeer(QPTask.getResponse().getLeaderStr());
- LOGGER.info("Redirect leader: {}, group id = {}", leader,
QPTask.getRequest().getGroupID());
- RaftUtils.updateRaftGroupLeader(QPTask.getRequest().getGroupID(),
leader);
+ leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
+ LOGGER.info("Redirect leader: {}, group id = {}", leader,
task.getRequest().getGroupID());
+ RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(),
leader);
}
- QPTask.resetTask();
- return asyncHandleTaskGetRes(QPTask, leader, taskRetryNum + 1);
+ task.resetTask();
+ return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
}
- return QPTask.getResponse();
+ return task.getResponse();
}
public void shutdown() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 8a3fcd4..be20208 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -101,39 +101,41 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
Set<String> groupIdSet = router.getAllGroupId();
List<String> metadataList = new ArrayList<>(groupIdSet.size());
+ List<SingleQPTask> taskList = new ArrayList<>();
for (String groupId : groupIdSet) {
QueryMetadataInStringRequest request = new
QueryMetadataInStringRequest(groupId);
SingleQPTask task = new SingleQPTask(false, request);
+ taskList.add(task);
LOGGER.info("Execute show metadata in string statement for group {}.",
groupId);
/** Check if the plan can be executed locally. **/
if (router.insideGroup(groupId, localNode)) {
LOGGER.info("Execute show metadata in string statement locally for
group {}.", groupId);
- metadataList.add(queryMetadataInStringLocally(groupId, task));
+ asyncQueryMetadataInStringLocally(groupId, task);
} else {
try {
PeerId holder = RaftUtils.getRandomPeerID(groupId);
- metadataList.add(queryMetadataInString(task, holder));
+ asyncSendTask(task, holder, 0);
} catch (RaftConnectionException e) {
LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
}
}
}
+ for (int i = 0; i < taskList.size(); i++) {
+ SingleQPTask task = taskList.get(i);
+ task.await();
+ QueryMetadataInStringResponse response = (QueryMetadataInStringResponse)
task.getResponse();
+ if (!response.isSuccess()) {
+ LOGGER.error("Execute show timeseries statement false.");
+ throw new ProcessorException();
+ }
+ return ((QueryMetadataInStringResponse)
task.getResponse()).getMetadata();
+ }
return combineMetadataInStringList(metadataList);
}
/**
- * Combine multiple metadata in String format into single String
- *
- * @param metadataList
- * @return single String of all metadata
- */
- private String combineMetadataInStringList(List<String> metadataList) {
- return null; //TODO
- }
-
- /**
* Handle "show timeseries <path>" statement
*
* @param path column path
@@ -217,7 +219,7 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
/**
* Handle "show timeseries" statement
*/
- private String queryMetadataInStringLocally(String groupId, SingleQPTask
task)
+ private void asyncQueryMetadataInStringLocally(String groupId, SingleQPTask
task)
throws InterruptedException, ProcessorException {
final byte[] reqContext = new byte[4];
Bits.putInt(reqContext, 0, requestId.incrementAndGet());
@@ -239,13 +241,6 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
task.run(response);
}
});
- task.await();
- QueryMetadataInStringResponse response = (QueryMetadataInStringResponse)
task.getResponse();
- if (!response.isSuccess()) {
- LOGGER.error("Execute show timeseries statement false.");
- throw new ProcessorException();
- }
- return ((QueryMetadataInStringResponse) task.getResponse()).getMetadata();
}
private String queryMetadataInString(SingleQPTask task, PeerId leader)
@@ -253,4 +248,14 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
BasicResponse response = asyncHandleTaskGetRes(task, leader, 0);
return ((QueryMetadataInStringResponse) response).getMetadata();
}
+
+ /**
+ * Combine multiple metadata in String format into single String
+ *
+ * @param metadataList
+ * @return single String of all metadata
+ */
+ private String combineMetadataInStringList(List<String> metadataList) {
+ return null; //TODO
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
new file mode 100644
index 0000000..bd11468
--- /dev/null
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataInStringAsyncProcessor.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.rpc.processor;
+
+import com.alipay.remoting.AsyncContext;
+import com.alipay.remoting.BizContext;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.util.Bits;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
+import org.apache.iotdb.cluster.rpc.request.QueryMetadataInStringRequest;
+import org.apache.iotdb.cluster.rpc.response.QueryMetadataInStringResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class QueryMetadataInStringAsyncProcessor extends
BasicAsyncUserProcessor<QueryMetadataInStringRequest> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(QueryMetadataInStringAsyncProcessor.class);
+ private final AtomicInteger requestId = new AtomicInteger(0);
+ private Server server;
+
+ public QueryMetadataInStringAsyncProcessor(Server server) {
+ this.server = server;
+ }
+
+ @Override
+ public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
+ QueryMetadataInStringRequest request) {
+ String groupId = request.getGroupID();
+ final byte[] reqContext = new byte[4];
+ Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
+ .getDataPartitionHolder(groupId);
+ ((RaftService) dataPartitionHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
+
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ QueryMetadataInStringResponse response;
+ if (status.isOk()) {
+ response = new QueryMetadataInStringResponse(false, true,
+ dataPartitionHolder.getFsm().getMetadataInString());
+ } else {
+ response = new QueryMetadataInStringResponse(false, false, null,
null);
+ }
+ asyncContext.sendResponse(response);
+ }
+ });
+ }
+
+ @Override
+ public String interest() {
+ return QueryMetadataInStringRequest.class.getName();
+ }
+}