This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 28c6635 add null-read metda group before applying create ts
28c6635 is described below
commit 28c6635b6aa92c470d73ebc9ebd10c5b039417af
Author: lta <[email protected]>
AuthorDate: Mon Apr 1 19:37:45 2019 +0800
add null-read metda group before applying create ts
---
.../org/apache/iotdb/cluster/entity/Server.java | 8 +-
.../cluster/entity/raft/DataStateMachine.java | 86 ++++++++++++++++++----
.../cluster/entity/raft/MetadataStateManchine.java | 6 +-
.../cluster/qp/executor/NonQueryExecutor.java | 24 +++---
.../cluster/qp/executor/QueryMetadataExecutor.java | 10 +--
.../org/apache/iotdb/cluster/rpc/MetadataType.java | 23 ------
...r.java => DataGroupNonQueryAsyncProcessor.java} | 28 ++++---
...r.java => MetaGroupNonQueryAsyncProcessor.java} | 25 +++----
...yRequest.java => DataGroupNonQueryRequest.java} | 10 ++-
...yRequest.java => MetaGroupNonQueryRequest.java} | 4 +-
...esponse.java => DataGroupNonQueryResponse.java} | 6 +-
...esponse.java => MetaGroupNonQueryResponse.java} | 6 +-
12 files changed, 138 insertions(+), 98 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 0c7d568..d489c11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -29,8 +29,8 @@ import
org.apache.iotdb.cluster.entity.data.DataPartitionHolder;
import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
-import org.apache.iotdb.cluster.rpc.processor.DataNonQueryAsyncProcessor;
-import org.apache.iotdb.cluster.rpc.processor.MetadataNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.DataGroupNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.MetaGroupNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -63,8 +63,8 @@ public class Server {
RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
- rpcServer.registerUserProcessor(new DataNonQueryAsyncProcessor(this));
- rpcServer.registerUserProcessor(new MetadataNonQueryAsyncProcessor(this));
+ rpcServer.registerUserProcessor(new DataGroupNonQueryAsyncProcessor(this));
+ rpcServer.registerUserProcessor(new MetaGroupNonQueryAsyncProcessor(this));
rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer,
true);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index ff26108..ee7bded 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -23,20 +23,27 @@ import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.util.Bits;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
+import org.apache.iotdb.cluster.entity.Server;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.BasicResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
import org.slf4j.Logger;
@@ -46,11 +53,13 @@ public class DataStateMachine extends StateMachineAdapter {
private static final Logger LOGGER =
LoggerFactory.getLogger(DataStateMachine.class);
+ private Server server = Server.getInstance();
private OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
private PeerId peerId;
private String groupId;
private AtomicLong leaderTerm = new AtomicLong(-1);
private MManager mManager = MManager.getInstance();
+ private final AtomicInteger requestId = new AtomicInteger(0);
public DataStateMachine(String groupId, PeerId peerId) {
this.peerId = peerId;
@@ -62,32 +71,81 @@ public class DataStateMachine extends StateMachineAdapter {
while (iterator.hasNext()) {
Closure closure = null;
- DataNonQueryRequest request = null;
- if (iterator.done() != null) {
- closure = iterator.done();
- }
+ DataGroupNonQueryRequest request = null;
final ByteBuffer data = iterator.getData();
try {
request = SerializerManager.getSerializer(SerializerManager.Hessian2)
- .deserialize(data.array(), DataNonQueryRequest.class.getName());
+ .deserialize(data.array(),
DataGroupNonQueryRequest.class.getName());
} catch (final CodecException e) {
LOGGER.error("Fail to decode IncrementAndGetRequest", e);
}
+ if (iterator.done() != null) {
+ /** It's leader to apply task **/
+ closure = iterator.done();
+ }
Status status = Status.OK();
+ /** If the request is to set path, it needs to run null-read in meta
group to avoid out of data sync **/
+ if (request.getOperatorType() == OperatorType.SET_STORAGE_GROUP) {
+ SingleQPTask nullReadTask = new SingleQPTask(false, null);
+ try {
+ LOGGER.info("handle add path request.");
+ handleNullReadToMetaGroup(nullReadTask, status);
+ nullReadTask.await();
+ } catch (InterruptedException e) {
+ status.setCode(1);
+ status.setErrorMsg(e.toString());
+ }
+ }
+ LOGGER.info("handle physical plan");
+ handlePhysicalPlan(status, request, closure);
+ iterator.next();
+ }
+ }
+
+ /**
+ * Handle Physical plan in state machine and return message if this node is
leader of the group
+ */
+ private void handlePhysicalPlan(Status status, DataGroupNonQueryRequest
request,
+ Closure closure) {
+ if (status.isOk()) {
try {
assert request != null;
- PhysicalPlan plan = PhysicalPlanLogTransfer
- .logToOperator(request.getPhysicalPlanBytes());
- qpExecutor.processNonQuery(plan);
+ PhysicalPlan plan = PhysicalPlanLogTransfer
+ .logToOperator(request.getPhysicalPlanBytes());
+ qpExecutor.processNonQuery(plan);
} catch (ProcessorException | IOException e) {
LOGGER.error("Execute physical plan error", e);
status = new Status(1, e.toString());
}
- if (closure != null) {
- closure.run(status);
- }
- iterator.next();
}
+ if (closure != null) {
+ closure.run(status);
+ }
+ }
+
+ /**
+ * Handle null-read process in metadata group if the request is to set path.
+ *
+ * @param qpTask null-read task
+ * @param originStatus status to return result if this node is leader of the
data group
+ */
+ private void handleNullReadToMetaGroup(QPTask qpTask, Status originStatus) {
+ final byte[] reqContext = new byte[4];
+ Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ MetadataRaftHolder metadataRaftHolder = (MetadataRaftHolder)
server.getMetadataHolder();
+ ((RaftService) metadataRaftHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ BasicResponse response = new BasicResponse(false, true, null,
null) {
+ };
+ if (!status.isOk()) {
+ originStatus.setCode(1);
+ originStatus.setErrorMsg(status.getErrorMsg());
+ }
+ qpTask.run(response);
+ }
+ });
}
@Override
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 004aefa..dab5049 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -80,14 +80,14 @@ public class MetadataStateManchine extends
StateMachineAdapter {
while (iterator.hasNext()) {
Closure closure = null;
- MetadataNonQueryRequest request = null;
+ MetaGroupNonQueryRequest request = null;
if (iterator.done() != null) {
closure = iterator.done();
}
final ByteBuffer data = iterator.getData();
try {
request = SerializerManager.getSerializer(SerializerManager.Hessian2)
- .deserialize(data.array(),
MetadataNonQueryRequest.class.getName());
+ .deserialize(data.array(),
MetaGroupNonQueryRequest.class.getName());
} catch (final CodecException e) {
LOGGER.error("Fail to decode IncrementAndGetRequest", e);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index c333f38..2c354cc 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.cluster.qp.executor;
import com.alipay.remoting.exception.CodecException;
import com.alipay.remoting.serialization.SerializerManager;
+import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.CliOptions;
@@ -33,10 +34,10 @@ import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
-import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.DataGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -192,7 +193,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
throws IOException, RaftConnectionException, InterruptedException {
String groupId = getGroupIdBySG(storageGroup);
PeerId leader = RaftUtils.getTargetPeerID(groupId);
- DataNonQueryRequest request = new DataNonQueryRequest(groupId, plan);
+ DataGroupNonQueryRequest request = new DataGroupNonQueryRequest(groupId,
plan);
SingleQPTask qpTask = new SingleQPTask(false, request);
/** Check if the plan can be executed locally. **/
if (canHandleNonQuery(storageGroup)) {
@@ -207,13 +208,17 @@ public class NonQueryExecutor extends ClusterQPExecutor {
*/
private boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask,
BasicRequest request)
throws InterruptedException {
- final byte[] reqContext = new byte[4];
- Task task = null;
+ Task task = new Task();
+ task.setDone((Status status) -> {
+ BasicResponse response = new DataGroupNonQueryResponse(false,
status.isOk(), null,
+ status.getErrorMsg());
+ qpTask.run(response);
+ });
/** Apply qpTask to Raft Node **/
try {
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(reqContext)));
+ .serialize(request)));
} catch (final CodecException e) {
return false;
}
@@ -222,7 +227,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
RaftService service = (RaftService) dataRaftHolder.getService();
service.getNode().apply(task);
qpTask.await();
- DataNonQueryResponse response = (DataNonQueryResponse)
qpTask.getResponse();
+ DataGroupNonQueryResponse response = (DataGroupNonQueryResponse)
qpTask.getResponse();
return response.isSuccess();
}
@@ -247,7 +252,8 @@ public class NonQueryExecutor extends ClusterQPExecutor {
*/
public boolean redirectMetadataGroupLeader(PhysicalPlan plan)
throws IOException, RaftConnectionException, InterruptedException {
- MetadataNonQueryRequest request = new
MetadataNonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID,
+ MetaGroupNonQueryRequest request = new MetaGroupNonQueryRequest(
+ CLUSTER_CONFIG.METADATA_GROUP_ID,
plan);
PeerId leader =
RaftUtils.getTargetPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 7967536..bc15218 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Handle show all storage group logic
+ * Handle < show timeseries <path> > logic
*/
public class QueryMetadataExecutor extends ClusterQPExecutor {
@@ -70,7 +70,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
String storageGroup = getStroageGroupByDevice(path);
String groupId = getGroupIdBySG(storageGroup);
QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
- PeerId leader = RaftUtils.getRandomPeerID(groupId);
+ PeerId holder = RaftUtils.getRandomPeerID(groupId);
SingleQPTask task = new SingleQPTask(false, request);
LOGGER.info("Execute show timeseries {} statement.", path);
@@ -80,7 +80,7 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
return queryTimeSeriesLocally(path, groupId, task);
} else {
try {
- return queryTimeSeries(task, leader);
+ return queryTimeSeries(task, holder);
} catch (RaftConnectionException e) {
LOGGER.error(e.getMessage());
throw new ProcessorException("Raft connection occurs error.", e);
@@ -114,12 +114,8 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
response = new QueryTimeSeriesResponse(false, false, null,
e.toString());
}
} else {
-
- System.out.println("false");
response = new QueryTimeSeriesResponse(false, false, null, null);
}
- System.out.println(status.isOk());
- System.out.println();
task.run(response);
}
});
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
deleted file mode 100644
index 9205241..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/MetadataType.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.rpc;
-
-public enum MetadataType {
- STORAGE_GROUP, TIME_SERIES
-}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
similarity index 77%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index f0134f1..5e756fa 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -31,34 +31,32 @@ import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
-import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.DataGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.DataGroupNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
-import org.apache.iotdb.db.qp.logical.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Async handle those requests which need to be applied in data group.
*/
-public class DataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataNonQueryRequest> {
+public class DataGroupNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataGroupNonQueryRequest> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(DataNonQueryAsyncProcessor.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataGroupNonQueryAsyncProcessor.class);
private Server server;
- public DataNonQueryAsyncProcessor(Server server) {
+ public DataGroupNonQueryAsyncProcessor(Server server) {
this.server = server;
}
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- DataNonQueryRequest dataNonQueryRequest) {
+ DataGroupNonQueryRequest dataGroupNonQueryRequest) {
LOGGER.info("Handle data non query request.");
/** Check if it's the leader **/
- String groupId = dataNonQueryRequest.getGroupID();
+ String groupId = dataGroupNonQueryRequest.getGroupID();
DataPartitionRaftHolder dataPartitionRaftHolder =
(DataPartitionRaftHolder) server
.getDataPartitionHolderMap().get(groupId);
if (!dataPartitionRaftHolder.getFsm().isLeader()) {
@@ -67,7 +65,7 @@ public class DataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataNonQ
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
- MetadataNonQueryResponse response = new MetadataNonQueryResponse(true,
false,
+ MetaGroupNonQueryResponse response = new MetaGroupNonQueryResponse(true,
false,
leader.toString(), null);
asyncContext.sendResponse(response);
} else {
@@ -77,14 +75,14 @@ public class DataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataNonQ
final Task task = new Task();
task.setDone((Status status) -> {
asyncContext.sendResponse(
- new DataNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
+ new DataGroupNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
});
try {
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(dataNonQueryRequest)));
+ .serialize(dataGroupNonQueryRequest)));
} catch (final CodecException e) {
- asyncContext.sendResponse(new MetadataNonQueryResponse(false, false,
null, e.toString()));
+ asyncContext.sendResponse(new MetaGroupNonQueryResponse(false, false,
null, e.toString()));
}
DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
.getDataPartitionHolderMap().get(groupId);
@@ -95,6 +93,6 @@ public class DataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataNonQ
@Override
public String interest() {
- return DataNonQueryRequest.class.getName();
+ return DataGroupNonQueryRequest.class.getName();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
similarity index 77%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 31e83ce..1a31f0d 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -31,32 +31,31 @@ import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.MetaGroupNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.MetaGroupNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
-import org.apache.iotdb.db.qp.logical.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Async handle those requests which need to be applied in metadata group.
*/
-public class MetadataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<MetadataNonQueryRequest> {
+public class MetaGroupNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<MetaGroupNonQueryRequest> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MetadataNonQueryAsyncProcessor.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MetaGroupNonQueryAsyncProcessor.class);
private Server server;
- public MetadataNonQueryAsyncProcessor(Server server) {
+ public MetaGroupNonQueryAsyncProcessor(Server server) {
this.server = server;
}
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- MetadataNonQueryRequest metadataNonQueryRequest) {
+ MetaGroupNonQueryRequest metaGroupNonQueryRequest) {
LOGGER.info("Handle metadata non query query request.");
/** Check if it's the leader **/
- String groupId = metadataNonQueryRequest.getGroupID();
+ String groupId = metaGroupNonQueryRequest.getGroupID();
MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
if (!metadataHolder.getFsm().isLeader()) {
PeerId leader = RaftUtils.getTargetPeerID(groupId);
@@ -64,7 +63,7 @@ public class MetadataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<Meta
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
- MetadataNonQueryResponse response = new MetadataNonQueryResponse(true,
false,
+ MetaGroupNonQueryResponse response = new MetaGroupNonQueryResponse(true,
false,
leader.toString(), null);
asyncContext.sendResponse(response);
} else {
@@ -74,14 +73,14 @@ public class MetadataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<Meta
final Task task = new Task();
task.setDone((Status status) -> {
asyncContext.sendResponse(
- new MetadataNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
+ new MetaGroupNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
});
try {
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(metadataNonQueryRequest)));
+ .serialize(metaGroupNonQueryRequest)));
} catch (final CodecException e) {
- asyncContext.sendResponse(new MetadataNonQueryResponse(false, false,
null, e.toString()));
+ asyncContext.sendResponse(new MetaGroupNonQueryResponse(false, false,
null, e.toString()));
}
RaftService service = (RaftService) metadataHolder.getService();
@@ -91,6 +90,6 @@ public class MetadataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<Meta
@Override
public String interest() {
- return MetadataNonQueryRequest.class.getName();
+ return MetaGroupNonQueryRequest.class.getName();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
similarity index 78%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
index c5fb573..e9ec9d9 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataGroupNonQueryRequest.java
@@ -21,27 +21,33 @@ package org.apache.iotdb.cluster.rpc.request;
import java.io.IOException;
import java.io.Serializable;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
/**
* Handle request to data group
*/
-public class DataNonQueryRequest extends BasicRequest implements Serializable {
+public class DataGroupNonQueryRequest extends BasicRequest implements
Serializable {
/**
* Serialized physical plan
*/
private byte[] physicalPlanBytes;
+ private Operator.OperatorType operatorType;
- public DataNonQueryRequest(String groupID, PhysicalPlan plan)
+ public DataGroupNonQueryRequest(String groupID, PhysicalPlan plan)
throws IOException {
super(groupID);
this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
+ this.operatorType = plan.getOperatorType();
}
public byte[] getPhysicalPlanBytes() {
return physicalPlanBytes;
}
+ public OperatorType getOperatorType() {
+ return operatorType;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
similarity index 90%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
index dd91e09..fba5734 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetaGroupNonQueryRequest.java
@@ -27,14 +27,14 @@ import
org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
/**
* Handle request to metadata group leader
*/
-public class MetadataNonQueryRequest extends BasicRequest implements
Serializable {
+public class MetaGroupNonQueryRequest extends BasicRequest implements
Serializable {
/**
* Serialized physical plan
*/
private byte[] physicalPlanBytes;
- public MetadataNonQueryRequest(String groupID, PhysicalPlan plan)
+ public MetaGroupNonQueryRequest(String groupID, PhysicalPlan plan)
throws IOException {
super(groupID);
this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
similarity index 80%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
index 6f430d0..11bf173 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataGroupNonQueryResponse.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.cluster.rpc.response;
/**
* Handle response from data group leader
*/
-public class DataNonQueryResponse extends BasicResponse {
+public class DataGroupNonQueryResponse extends BasicResponse {
- public DataNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
+ public DataGroupNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
super(redirected, success, leaderStr, errorMsg);
}
- public DataNonQueryResponse(boolean redirected, boolean success) {
+ public DataGroupNonQueryResponse(boolean redirected, boolean success) {
super(redirected, success, null, null);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
similarity index 80%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
index f23fa1c..60b4171 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetaGroupNonQueryResponse.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.cluster.rpc.response;
/**
* Handle response from metadata group leader
*/
-public class MetadataNonQueryResponse extends BasicResponse {
+public class MetaGroupNonQueryResponse extends BasicResponse {
- public MetadataNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
+ public MetaGroupNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
super(redirected, success, leaderStr, errorMsg);
}
- public MetadataNonQueryResponse(boolean redirected, boolean success) {
+ public MetaGroupNonQueryResponse(boolean redirected, boolean success) {
super(redirected, success, null, null);
}