This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new cf5a19d update
cf5a19d is described below
commit cf5a19d826ee648da7f94400e11c8cd605d52669
Author: mdf369 <[email protected]>
AuthorDate: Fri Mar 29 09:20:26 2019 +0800
update
---
.../rpc/processor/QueryMetadataAsyncProcessor.java | 75 ++++++++--------------
1 file changed, 25 insertions(+), 50 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
index dd292ad..2320a2e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryMetadataAsyncProcessor.java
@@ -20,14 +20,9 @@ package org.apache.iotdb.cluster.rpc.processor;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.closure.ReadIndexClosure;
-import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.util.Bits;
-import java.nio.ByteBuffer;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
@@ -54,51 +49,7 @@ public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMe
QueryMetadataRequest queryMetadataRequest) {
MetadataType metadataType = queryMetadataRequest.getMetadataType();
if (metadataType == MetadataType.STORAGE_GROUP) {
- MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
- /** Verify if it's the leader of metadata **/
- if (metadataHolder.getFsm().isLeader()) {
- try {
- final Task task = new Task();
- Set<String> storageGroupSet =
metadataHolder.getFsm().getAllStorageGroups();
- task.setDone(status -> {
- if (!status.isOk()) {
- asyncContext.sendResponse(
- new QueryMetadataResponse(false, false, null,
status.getErrorMsg()));
- } else {
- asyncContext.sendResponse(new QueryMetadataResponse(false, true,
storageGroupSet));
- }
- });
- task.setData(ByteBuffer
- .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(queryMetadataRequest)));
- RaftService service = (RaftService) metadataHolder.getService();
- service.getNode().apply(task);
- } catch (final CodecException | PathErrorException e) {
- asyncContext.sendResponse(new QueryMetadataResponse(false, false,
null, e.toString()));
- }
- } else {
- // TODO readIndex
- final byte[] reqContext = new byte[4];
- Bits.putInt(reqContext, 0, requestId.incrementAndGet());
- ((RaftService) metadataHolder.getService()).getNode()
- .readIndex(reqContext, new ReadIndexClosure() {
-
- @Override
- public void run(Status status, long index, byte[] reqCtx) {
- if (status.isOk()) {
- try {
- asyncContext.sendResponse(new QueryMetadataResponse(false,
true,
- metadataHolder.getFsm().getAllStorageGroups()));
- } catch (final PathErrorException e) {
- asyncContext
- .sendResponse(new QueryMetadataResponse(false, false,
null, e.toString()));
- }
- } else {
- asyncContext.sendResponse(new QueryMetadataResponse(false,
false, null, null));
- }
- }
- });
- }
+ readIndexForSG(asyncContext);
} else {
//TODO deal with query time series
QueryMetadataResponse response = new QueryMetadataResponse(false, false,
null, null);
@@ -106,6 +57,30 @@ public class QueryMetadataAsyncProcessor extends BasicAsyncUserProcessor<QueryMe
}
}
+ private void readIndexForSG(AsyncContext asyncContext) {
+ final byte[] reqContext = new byte[4];
+ Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
+ ((RaftService) metadataHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
+
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ if (status.isOk()) {
+ try {
+ asyncContext.sendResponse(new QueryMetadataResponse(false,
true,
+ metadataHolder.getFsm().getAllStorageGroups()));
+ } catch (final PathErrorException e) {
+ asyncContext
+ .sendResponse(new QueryMetadataResponse(false, false,
null, e.toString()));
+ }
+ } else {
+ asyncContext.sendResponse(new QueryMetadataResponse(false,
false, null, null));
+ }
+ }
+ });
+ }
+
@Override
public String interest() {
return QueryMetadataRequest.class.getName();