This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 15c53c3 update
15c53c3 is described below
commit 15c53c38c9dea4cff0ca1beb60cded795d4f3ef3
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 14:46:19 2019 +0800
update
---
.../cluster/qp/executor/QueryMetadataExecutor.java | 76 ++++++++++++----------
.../rpc/processor/QueryMetadataAsyncProcessor.java | 26 +-------
.../cluster/rpc/service/TSServiceClusterImpl.java | 4 +-
3 files changed, 43 insertions(+), 63 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 3ca9348..b55bfab 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -18,22 +18,25 @@
*/
package org.apache.iotdb.cluster.qp.executor;
-import com.alipay.remoting.InvokeCallback;
-import com.alipay.remoting.exception.RemotingException;
+import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import com.alipay.sofa.jraft.util.Bits;
import java.util.Set;
-import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.callback.SingleTask;
-import org.apache.iotdb.cluster.callback.Task;
import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.MetadataType;
import org.apache.iotdb.cluster.rpc.request.QueryMetadataRequest;
-import org.apache.iotdb.cluster.rpc.response.BasicResponse;
import org.apache.iotdb.cluster.rpc.response.QueryMetadataResponse;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,55 +47,56 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryMetadataExecutor.class);
+ private final AtomicInteger requestId = new AtomicInteger(0);
+ private final Server server = Server.getInstance();
+
public QueryMetadataExecutor() {
}
- public void init(){
+ public void init() {
this.cliClientService = new BoltCliClientService();
this.cliClientService.init(new CliOptions());
SUB_TASK_NUM = 1;
}
public Set<String> processMetadataQuery(MetadataType type)
- throws RaftConnectionException, InterruptedException {
+ throws InterruptedException {
QueryMetadataRequest request = new QueryMetadataRequest(
ClusterConfig.METADATA_GROUP_ID, type);
SingleTask task = new SingleTask(false, request);
return asyncHandleTaskLocally(task);
}
- private Set<String> asyncHandleTaskLocally(SingleTask task)
- throws RaftConnectionException, InterruptedException {
- try {
- cliClientService.getRpcClient()
- .invokeWithCallback(localNode.toString(), task.getRequest(),
- new InvokeCallback() {
-
- @Override
- public void onResponse(Object result) {
- BasicResponse response = (BasicResponse) result;
- task.run(response);
- }
-
- @Override
- public void onException(Throwable e) {
- LOGGER.error("Bolt rpc client occurs errors when handling
Request", e);
- task.setTaskState(TaskState.EXCEPTION);
- task.run(null);
-
- }
-
- @Override
- public Executor getExecutor() {
- return null;
- }
- }, CLUSTER_CONFIG.getTaskTimeoutMs());
- } catch (RemotingException | InterruptedException e) {
- throw new RaftConnectionException(e);
- }
+ private Set<String> asyncHandleTaskLocally(SingleTask task) throws
InterruptedException {
+ readIndexForSG(task);
task.await();
return ((QueryMetadataResponse) task.getResponse()).getMetadataSet();
}
+
+ private void readIndexForSG(SingleTask task) {
+ final byte[] reqContext = new byte[4];
+ Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
+ ((RaftService) metadataHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
+
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ QueryMetadataResponse response = null;
+ if (status.isOk()) {
+ try {
+ response = new QueryMetadataResponse(false, true,
+ metadataHolder.getFsm().getAllStorageGroups());
+ } catch (final PathErrorException e) {
+ response = new QueryMetadataResponse(false, false, null,
e.toString());
+ }
+ } else {
+ response = new QueryMetadataResponse(false, false, null, null);
+ }
+ task.run(response);
+ }
+ });
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index 69d89a2..bf8b159 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -51,7 +51,7 @@ public class QueryMetadataAsyncProcessor extends
BasicAsyncUserProcessor<QueryMe
QueryMetadataRequest queryMetadataRequest) {
MetadataType metadataType = queryMetadataRequest.getMetadataType();
if (metadataType == MetadataType.STORAGE_GROUP) {
- readIndexForSG(asyncContext);
+ // TODO
} else {
//TODO deal with query time series
QueryMetadataResponse response = new QueryMetadataResponse(false, false,
null, null);
@@ -59,30 +59,6 @@ public class QueryMetadataAsyncProcessor extends
BasicAsyncUserProcessor<QueryMe
}
}
- private void readIndexForSG(AsyncContext asyncContext) {
- final byte[] reqContext = new byte[4];
- Bits.putInt(reqContext, 0, requestId.incrementAndGet());
- MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
- ((RaftService) metadataHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- if (status.isOk()) {
- try {
- asyncContext.sendResponse(new QueryMetadataResponse(false,
true,
- metadataHolder.getFsm().getAllStorageGroups()));
- } catch (final PathErrorException e) {
- asyncContext
- .sendResponse(new QueryMetadataResponse(false, false,
null, e.toString()));
- }
- } else {
- asyncContext.sendResponse(new QueryMetadataResponse(false,
false, null, null));
- }
- }
- });
- }
-
@Override
public String interest() {
return QueryMetadataRequest.class.getName();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 50b685a..e7f25e2 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -119,7 +119,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
try {
Set<String> storageGroups =
processMetadataQuery(MetadataType.STORAGE_GROUP);
resp.setShowStorageGroups(storageGroups);
- } catch (RaftConnectionException | InterruptedException e) {
+ } catch (InterruptedException e) {
status = getErrorStatus(
String.format("Failed to fetch storage groups' metadata because:
%s", e));
resp.setStatus(status);
@@ -145,7 +145,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
}
public Set<String> processMetadataQuery(MetadataType type)
- throws RaftConnectionException, InterruptedException {
+ throws InterruptedException {
return queryMetadataExecutor.get().processMetadataQuery(type);
}
}