This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 1647d39 fix some serve bugs and modify for query time series
1647d39 is described below
commit 1647d3993b2fd63d0ec498780e1976e6be37c507
Author: lta <[email protected]>
AuthorDate: Sat Mar 30 22:43:33 2019 +0800
fix some serve bugs and modify for query time series
---
.../callback/{MultiTask.java => MultiQPTask.java} | 4 +-
.../cluster/callback/{Task.java => QPTask.java} | 8 +--
.../{SingleTask.java => SingleQPTask.java} | 4 +-
.../apache/iotdb/cluster/config/ClusterConfig.java | 6 +-
.../org/apache/iotdb/cluster/entity/Server.java | 6 +-
.../cluster/entity/raft/DataStateMachine.java | 7 +-
.../cluster/entity/raft/MetadataStateManchine.java | 9 +--
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 48 +++++++-------
.../cluster/qp/executor/NonQueryExecutor.java | 75 ++++++++++++++++------
.../cluster/qp/executor/QueryMetadataExecutor.java | 32 +++++----
.../org/apache/iotdb/cluster/rpc/NodeAsClient.java | 10 +--
.../iotdb/cluster/rpc/impl/RaftNodeAsClient.java | 16 ++---
...cessor.java => DataNonQueryAsyncProcessor.java} | 74 ++++++++-------------
...or.java => MetadataNonQueryAsyncProcessor.java} | 70 +++++++-------------
.../processor/QueryTimeSeriesAsyncProcessor.java | 58 ++++++++---------
...nQueryRequest.java => DataNonQueryRequest.java} | 15 ++---
...ryRequest.java => MetadataNonQueryRequest.java} | 15 ++---
...ueryResponse.java => DataNonQueryResponse.java} | 9 ++-
...Response.java => MetadataNonQueryResponse.java} | 9 ++-
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 2 +-
.../apache/iotdb/cluster/utils/hash/Router.java | 4 +-
service-rpc/src/main/thrift/rpc.thrift | 2 +-
22 files changed, 238 insertions(+), 245 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
similarity index 91%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
index c7515c8..ffbd0d3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiQPTask.java
@@ -25,9 +25,9 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
/**
* Split a task to multi task closures.
*/
-public class MultiTask extends Task {
+public class MultiQPTask extends QPTask {
- public MultiTask(boolean isSyncTask, int taskNum, BasicRequest request) {
+ public MultiQPTask(boolean isSyncTask, int taskNum, BasicRequest request) {
super(isSyncTask, taskNum, TaskState.INITIAL);
this.request = request;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
index 7940a5f..7fbfe5a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/QPTask.java
@@ -22,15 +22,15 @@ import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
-public abstract class Task {
+public abstract class QPTask {
/**
- * Task response
+ * QPTask response
*/
protected BasicResponse response;
/**
- * Task request
+ * QPTask request
*/
protected BasicRequest request;
@@ -54,7 +54,7 @@ public abstract class Task {
*/
protected TaskState taskState;
- public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
+ public QPTask(boolean isSyncTask, int taskNum, TaskState taskState) {
this.isSyncTask = isSyncTask;
this.taskNum = taskNum;
this.taskCountDownLatch = new CountDownLatch(taskNum);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
similarity index 93%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
index ac5492c..85da087 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleQPTask.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
/**
* Process single task.
*/
-public class SingleTask extends Task {
+public class SingleQPTask extends QPTask {
private static final int TASK_NUM = 1;
- public SingleTask(boolean isSyncTask, BasicRequest request) {
+ public SingleQPTask(boolean isSyncTask, BasicRequest request) {
super(isSyncTask, TASK_NUM, TaskState.INITIAL);
this.request = request;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 83b129f..ca9b554 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -31,7 +31,7 @@ public class ClusterConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterConfig.class);
public static final String CONFIG_NAME = "iotdb-cluster.properties";
- public static final String DEFAULT_NODE = "127.0.0.1:8888";
+ public static final String DEFAULT_NODE =
"192.168.130.19:8888,192.168.130.12:8888,192.168.130.14:8888,192.168.130.16:8888,192.168.130.18:8888";
public static final String METADATA_GROUP_ID = "metadata";
private static final String DEFAULT_RAFT_DIR = "raft";
private static final String DEFAULT_RAFT_METADATA_DIR = "metadata";
@@ -41,14 +41,14 @@ public class ClusterConfig {
/**
* Cluster node: {ip1,ip2,...,ipn}
*/
- private String[] nodes = {DEFAULT_NODE};
+ private String[] nodes =
{"192.168.130.19:8888","192.168.130.12:8888","192.168.130.14:8888","192.168.130.16:8888","192.168.130.18:8888"};
/**
* Replication number
*/
private int replication = 3;
- private String ip = "127.0.0.1";
+ private String ip = "192.168.130.19";
private int port = 8888;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 108dfa7..0c7d568 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -29,7 +29,8 @@ import
org.apache.iotdb.cluster.entity.data.DataPartitionHolder;
import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
-import org.apache.iotdb.cluster.rpc.processor.NonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.DataNonQueryAsyncProcessor;
+import org.apache.iotdb.cluster.rpc.processor.MetadataNonQueryAsyncProcessor;
import org.apache.iotdb.cluster.rpc.processor.QueryTimeSeriesAsyncProcessor;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
@@ -62,7 +63,8 @@ public class Server {
RpcServer rpcServer = new RpcServer(serverId.getPort());
RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
- rpcServer.registerUserProcessor(new NonQueryAsyncProcessor(this));
+ rpcServer.registerUserProcessor(new DataNonQueryAsyncProcessor(this));
+ rpcServer.registerUserProcessor(new MetadataNonQueryAsyncProcessor(this));
rpcServer.registerUserProcessor(new QueryTimeSeriesAsyncProcessor(this));
metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer,
true);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index 79a9d7a..36535d6 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -61,14 +61,14 @@ public class DataStateMachine extends StateMachineAdapter {
while (iterator.hasNext()) {
Closure closure = null;
- NonQueryRequest request = null;
+ MetadataNonQueryRequest request = null;
if (iterator.done() != null) {
closure = iterator.done();
}
final ByteBuffer data = iterator.getData();
try {
request = SerializerManager.getSerializer(SerializerManager.Hessian2)
- .deserialize(data.array(), NonQueryRequest.class.getName());
+ .deserialize(data.array(),
MetadataNonQueryRequest.class.getName());
} catch (final CodecException e) {
LOGGER.error("Fail to decode IncrementAndGetRequest", e);
}
@@ -99,6 +99,7 @@ public class DataStateMachine extends StateMachineAdapter {
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
RaftUtils.updateRaftGroupLeader(groupId, ctx.getLeaderId());
+ this.leaderTerm.set(-1);
LOGGER.info("Start following, {} starts to be leader of {}",
ctx.getLeaderId(), groupId);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 5c43fab..4ab4cb1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -30,7 +30,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
@@ -70,7 +70,7 @@ public class MetadataStateManchine extends
StateMachineAdapter {
}
/**
- * Update StrageGroup List and userProfileMap based on Task read from raft
log
+ * Update StrageGroup List and userProfileMap based on QPTask read from raft
log
*
* @param iterator task iterator
*/
@@ -79,14 +79,14 @@ public class MetadataStateManchine extends
StateMachineAdapter {
while (iterator.hasNext()) {
Closure closure = null;
- NonQueryRequest request = null;
+ MetadataNonQueryRequest request = null;
if (iterator.done() != null) {
closure = iterator.done();
}
final ByteBuffer data = iterator.getData();
try {
request = SerializerManager.getSerializer(SerializerManager.Hessian2)
- .deserialize(data.array(), NonQueryRequest.class.getName());
+ .deserialize(data.array(),
MetadataNonQueryRequest.class.getName());
} catch (final CodecException e) {
LOGGER.error("Fail to decode IncrementAndGetRequest", e);
}
@@ -131,6 +131,7 @@ public class MetadataStateManchine extends
StateMachineAdapter {
@Override
public void onStartFollowing(LeaderChangeContext ctx) {
RaftUtils.updateRaftGroupLeader(groupId, ctx.getLeaderId());
+ this.leaderTerm.set(-1);
LOGGER.info("Start following, {} starts to be leader of {}",
ctx.getLeaderId(), groupId);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index ab9f153..c296296 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -20,10 +20,12 @@ package org.apache.iotdb.cluster.qp;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
-import org.apache.iotdb.cluster.callback.Task;
-import org.apache.iotdb.cluster.callback.Task.TaskState;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.rpc.NodeAsClient;
import org.apache.iotdb.cluster.rpc.impl.RaftNodeAsClient;
@@ -33,7 +35,6 @@ import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
import org.apache.iotdb.cluster.utils.hash.Router;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.qp.executor.OverflowQPExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +45,8 @@ public abstract class ClusterQPExecutor {
protected Router router = Router.getInstance();
protected PhysicalNode localNode = new PhysicalNode(CLUSTER_CONFIG.getIp(),
CLUSTER_CONFIG.getPort());
- protected OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
protected MManager mManager = MManager.getInstance();
+ protected final Server server = Server.getInstance();
/**
* Rpc Service Client
@@ -62,6 +63,8 @@ public abstract class ClusterQPExecutor {
*/
protected static int SUB_TASK_NUM = 1;
+ protected final AtomicInteger requestId = new AtomicInteger(0);
+
/**
* Get Storage Group Name by device name
*/
@@ -83,8 +86,8 @@ public abstract class ClusterQPExecutor {
}
/**
- * Verify if the non query command can execute in local. 1. If this node
belongs to the storage group 2. If
- * this node is leader.
+ * Verify if the non query command can execute in local. 1. If this node
belongs to the storage
+ * group 2. If this node is leader.
*/
public boolean canHandleNonQuery(String storageGroup) {
if (router.containPhysicalNode(storageGroup, localNode)) {
@@ -97,41 +100,42 @@ public abstract class ClusterQPExecutor {
}
/**
- * Verify if the query command can execute in local. Check if this node
belongs to the storage group
+ * Verify if the query command can execute in local. Check if this node
belongs to the storage
+ * group
*/
public boolean canHandleQuery(String storageGroup) {
return router.containPhysicalNode(storageGroup, localNode);
}
/**
- * Async handle task by task and leader id
+ * Async handle QPTask by QPTask and leader id
*
- * @param task request task
+ * @param QPTask request QPTask
* @param leader leader of the target raft group
- * @param taskRetryNum Number of task retries due to timeout and redirected.
+ * @param taskRetryNum Number of QPTask retries due to timeout and
redirected.
* @return basic response
*/
- public BasicResponse asyncHandleTaskGetRes(Task task, PeerId leader, int
taskRetryNum)
+ public BasicResponse asyncHandleTaskGetRes(QPTask QPTask, PeerId leader, int
taskRetryNum)
throws InterruptedException, RaftConnectionException {
if (taskRetryNum >= TASK_MAX_RETRY) {
- throw new RaftConnectionException(String.format("Task retries reach the
upper bound %s",
+ throw new RaftConnectionException(String.format("QPTask retries reach
the upper bound %s",
TASK_MAX_RETRY));
}
NodeAsClient client = new RaftNodeAsClient();
/** Call async method **/
- client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
- task.await();
- if (task.getTaskState() != TaskState.FINISH) {
- if (task.getTaskState() == TaskState.REDIRECT) {
+ client.asyncHandleRequest(cliClientService, QPTask.getRequest(), leader,
QPTask);
+ QPTask.await();
+ if (QPTask.getTaskState() != TaskState.FINISH) {
+ if (QPTask.getTaskState() == TaskState.REDIRECT) {
/** redirect to the right leader **/
- leader = PeerId.parsePeer(task.getResponse().getLeaderStr());
- LOGGER.info("Redirect leader: {}, group id = {}" , leader,
task.getRequest().getGroupID());
- RaftUtils.updateRaftGroupLeader(task.getRequest().getGroupID(),
leader);
+ leader = PeerId.parsePeer(QPTask.getResponse().getLeaderStr());
+ LOGGER.info("Redirect leader: {}, group id = {}", leader,
QPTask.getRequest().getGroupID());
+ RaftUtils.updateRaftGroupLeader(QPTask.getRequest().getGroupID(),
leader);
}
- task.resetTask();
- return asyncHandleTaskGetRes(task, leader, taskRetryNum + 1);
+ QPTask.resetTask();
+ return asyncHandleTaskGetRes(QPTask, leader, taskRetryNum + 1);
}
- return task.getResponse();
+ return QPTask.getResponse();
}
public void shutdown() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index a95a227..c333f38 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -18,16 +18,25 @@
*/
package org.apache.iotdb.cluster.qp.executor;
+import com.alipay.remoting.exception.CodecException;
+import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.entity.Task;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.io.IOException;
-import org.apache.iotdb.cluster.callback.SingleTask;
-import org.apache.iotdb.cluster.callback.Task;
+import java.nio.ByteBuffer;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
+import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
+import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.qp.ClusterQPExecutor;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.BasicRequest;
+import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
+import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
@@ -113,7 +122,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
Path path = updatePlan.getPath();
String deviceId = path.getDevice();
String storageGroup = getStroageGroupByDevice(deviceId);
- return handleRequest(storageGroup, updatePlan);
+ return handleDataGroupRequest(storageGroup, updatePlan);
}
//TODO
@@ -128,7 +137,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
throws ProcessorException, PathErrorException, InterruptedException,
IOException, RaftConnectionException {
String deviceId = insertPlan.getDeviceId();
String storageGroup = getStroageGroupByDevice(deviceId);
- return handleRequest(storageGroup, insertPlan);
+ return handleDataGroupRequest(storageGroup, insertPlan);
}
/**
@@ -156,7 +165,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
case DELETE_PATH:
String deviceId = path.getDevice();
String storageGroup = getStroageGroupByDevice(deviceId);
- return handleRequest(storageGroup, metadataPlan);
+ return handleDataGroupRequest(storageGroup, metadataPlan);
case SET_FILE_LEVEL:
boolean fileLevelExist =
mManager.checkStorageLevelOfMTree(path.getFullPath());
if (fileLevelExist) {
@@ -177,32 +186,55 @@ public class NonQueryExecutor extends ClusterQPExecutor {
}
/**
- * Handle request by storage group and physical plan
+ * Handle request to data group by storage group and physical plan
*/
- private boolean handleRequest(String storageGroup, PhysicalPlan plan)
- throws ProcessorException, IOException, RaftConnectionException,
InterruptedException {
+ private boolean handleDataGroupRequest(String storageGroup, PhysicalPlan
plan)
+ throws IOException, RaftConnectionException, InterruptedException {
+ String groupId = getGroupIdBySG(storageGroup);
+ PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ DataNonQueryRequest request = new DataNonQueryRequest(groupId, plan);
+ SingleQPTask qpTask = new SingleQPTask(false, request);
/** Check if the plan can be executed locally. **/
if (canHandleNonQuery(storageGroup)) {
- return qpExecutor.processNonQuery(plan);
+ return handleDataGroupRequestLocally(groupId, qpTask, request);
} else {
- String groupId = getGroupIdBySG(storageGroup);
- NonQueryRequest request = new NonQueryRequest(groupId, plan);
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ return asyncHandleTask(qpTask, leader, 0);
+ }
+ }
- SingleTask task = new SingleTask(false, request);
- return asyncHandleTask(task, leader, 0);
+ /**
+ * Handle data group request locally.
+ */
+ private boolean handleDataGroupRequestLocally(String groupId, QPTask qpTask,
BasicRequest request)
+ throws InterruptedException {
+ final byte[] reqContext = new byte[4];
+ Task task = null;
+ /** Apply qpTask to Raft Node **/
+ try {
+ task.setData(ByteBuffer
+ .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
+ .serialize(reqContext)));
+ } catch (final CodecException e) {
+ return false;
}
+ DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
+ .getDataPartitionHolderMap().get(groupId);
+ RaftService service = (RaftService) dataRaftHolder.getService();
+ service.getNode().apply(task);
+ qpTask.await();
+ DataNonQueryResponse response = (DataNonQueryResponse)
qpTask.getResponse();
+ return response.isSuccess();
}
/**
- * Async handle task by task and leader id.
+ * Async handle task by QPTask and leader id.
*
- * @param task request task
+ * @param task request QPTask
* @param leader leader of the target raft group
- * @param taskRetryNum Number of task retries due to timeout and redirected.
+ * @param taskRetryNum Number of QPTask retries due to timeout and
redirected.
* @return request result
*/
- private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
+ private boolean asyncHandleTask(QPTask task, PeerId leader, int taskRetryNum)
throws RaftConnectionException, InterruptedException {
BasicResponse response = asyncHandleTaskGetRes(task, leader, taskRetryNum);
return response.isSuccess();
@@ -215,10 +247,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
*/
public boolean redirectMetadataGroupLeader(PhysicalPlan plan)
throws IOException, RaftConnectionException, InterruptedException {
- NonQueryRequest request = new
NonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID, plan);
+ MetadataNonQueryRequest request = new
MetadataNonQueryRequest(CLUSTER_CONFIG.METADATA_GROUP_ID,
+ plan);
PeerId leader =
RaftUtils.getTargetPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
- SingleTask task = new SingleTask(false, request);
+ SingleQPTask task = new SingleQPTask(false, request);
return asyncHandleTask(task, leader, 0);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 4560973..7967536 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -26,10 +26,8 @@ import
com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import com.alipay.sofa.jraft.util.Bits;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.iotdb.cluster.callback.SingleTask;
+import org.apache.iotdb.cluster.callback.SingleQPTask;
import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
@@ -53,9 +51,6 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryMetadataExecutor.class);
- private final AtomicInteger requestId = new AtomicInteger(0);
- private final Server server = Server.getInstance();
-
public QueryMetadataExecutor() {
}
@@ -76,10 +71,12 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
String groupId = getGroupIdBySG(storageGroup);
QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
PeerId leader = RaftUtils.getRandomPeerID(groupId);
- SingleTask task = new SingleTask(false, request);
+ SingleQPTask task = new SingleQPTask(false, request);
+ LOGGER.info("Execute show timeseries {} statement.", path);
/** Check if the plan can be executed locally. **/
if (canHandleQuery(storageGroup)) {
+ LOGGER.info("Execute show timeseries {} statement locally.", path);
return queryTimeSeriesLocally(path, groupId, task);
} else {
try {
@@ -96,8 +93,8 @@ public class QueryMetadataExecutor extends ClusterQPExecutor {
*
* @param path column path
*/
- private List<List<String>> queryTimeSeriesLocally(String path, String
groupId, SingleTask task)
- throws InterruptedException {
+ private List<List<String>> queryTimeSeriesLocally(String path, String
groupId, SingleQPTask task)
+ throws InterruptedException, ProcessorException {
final byte[] reqContext = new byte[4];
Bits.putInt(reqContext, 0, requestId.incrementAndGet());
DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
@@ -110,22 +107,32 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
QueryTimeSeriesResponse response;
if (status.isOk()) {
try {
+ LOGGER.info("start to read");
response = new QueryTimeSeriesResponse(false, true,
dataPartitionHolder.getFsm().getShowTimeseriesPath(path));
} catch (final PathErrorException e) {
response = new QueryTimeSeriesResponse(false, false, null,
e.toString());
}
} else {
+
+ System.out.println("false");
response = new QueryTimeSeriesResponse(false, false, null, null);
}
+ System.out.println(status.isOk());
+ System.out.println();
task.run(response);
}
});
task.await();
+ QueryTimeSeriesResponse response = (QueryTimeSeriesResponse)
task.getResponse();
+ if (!response.isSuccess()) {
+ LOGGER.error("Execute show timeseries {} statement false.", path);
+ throw new ProcessorException();
+ }
return ((QueryTimeSeriesResponse) task.getResponse()).getTimeSeries();
}
- private List<List<String>> queryTimeSeries(SingleTask task, PeerId leader)
+ private List<List<String>> queryTimeSeries(SingleQPTask task, PeerId leader)
throws InterruptedException, RaftConnectionException {
BasicResponse response = asyncHandleTaskGetRes(task, leader, 0);
return ((QueryTimeSeriesResponse) response).getTimeSeries();
@@ -137,10 +144,11 @@ public class QueryMetadataExecutor extends
ClusterQPExecutor {
* @return Set of storage group name
*/
private Set<String> queryStorageGroupLocally() throws InterruptedException {
- QueryStorageGroupRequest request = new
QueryStorageGroupRequest(ClusterConfig.METADATA_GROUP_ID);
- SingleTask task = new SingleTask(false, request);
final byte[] reqContext = new byte[4];
Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ QueryStorageGroupRequest request = new QueryStorageGroupRequest(
+ ClusterConfig.METADATA_GROUP_ID);
+ SingleQPTask task = new SingleQPTask(false, request);
MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
((RaftService) metadataHolder.getService()).getNode()
.readIndex(reqContext, new ReadIndexClosure() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
index d658c94..8d37661 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/NodeAsClient.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.cluster.rpc;
-import org.apache.iotdb.cluster.callback.Task;
+import org.apache.iotdb.cluster.callback.QPTask;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
@@ -32,18 +32,18 @@ public interface NodeAsClient {
*
* @param clientService client rpc service handle
* @param leader leader node of the target group
- * @param task the task to be executed
+ * @param QPTask the QPTask to be executed
*/
void asyncHandleRequest(Object clientService, BasicRequest request, Object
leader,
- Task task) throws RaftConnectionException;
+ QPTask QPTask) throws RaftConnectionException;
/**
* Synchronous processing requests
*
* @param clientService client rpc service handle
* @param leader leader node of the target group
- * @param task the task to be executed
+ * @param QPTask the QPTask to be executed
*/
- void syncHandleRequest(Object clientService, BasicRequest request, Object
leader, Task task)
+ void syncHandleRequest(Object clientService, BasicRequest request, Object
leader, QPTask QPTask)
throws RaftConnectionException;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
index a9cdd90..ea52c83 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/impl/RaftNodeAsClient.java
@@ -23,8 +23,8 @@ import com.alipay.remoting.exception.RemotingException;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.util.concurrent.Executor;
-import org.apache.iotdb.cluster.callback.Task;
-import org.apache.iotdb.cluster.callback.Task.TaskState;
+import org.apache.iotdb.cluster.callback.QPTask;
+import org.apache.iotdb.cluster.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -51,7 +51,7 @@ public class RaftNodeAsClient implements NodeAsClient {
@Override
public void asyncHandleRequest(Object clientService, BasicRequest request,
Object leader,
- Task task)
+ QPTask QPTask)
throws RaftConnectionException {
BoltCliClientService boltClientService = (BoltCliClientService)
clientService;
PeerId raftLeader = (PeerId) leader;
@@ -64,14 +64,14 @@ public class RaftNodeAsClient implements NodeAsClient {
@Override
public void onResponse(Object result) {
BasicResponse response = (BasicResponse) result;
- task.run(response);
+ QPTask.run(response);
}
@Override
public void onException(Throwable e) {
LOGGER.error("Bolt rpc client occurs errors when handling
Request", e);
- task.setTaskState(TaskState.EXCEPTION);
- task.run(null);
+ QPTask.setTaskState(TaskState.EXCEPTION);
+ QPTask.run(null);
}
@@ -88,14 +88,14 @@ public class RaftNodeAsClient implements NodeAsClient {
@Override
public void syncHandleRequest(Object clientService, BasicRequest request,
Object leader,
- Task task)
+ QPTask QPTask)
throws RaftConnectionException {
BoltCliClientService boltClientService = (BoltCliClientService)
clientService;
PeerId raftLeader = (PeerId) leader;
try {
BasicResponse response = (BasicResponse) boltClientService.getRpcClient()
.invokeSync(raftLeader.getEndpoint().toString(), request,
TASK_TIMEOUT_MS);
- task.run(response);
+ QPTask.run(response);
} catch (RemotingException | InterruptedException e) {
throw new RaftConnectionException(e);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
similarity index 54%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
index 202c2d3..db734f4 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataNonQueryAsyncProcessor.java
@@ -30,93 +30,71 @@ import
com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
-import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.NonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.DataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.DataNonQueryResponse;
+import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.qp.logical.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Async handle change metadata request.
+ * Async handle those requests which need to be applied in data group.
*/
-public class NonQueryAsyncProcessor extends
BasicAsyncUserProcessor<NonQueryRequest> {
+public class DataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<DataNonQueryRequest> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(NonQueryAsyncProcessor.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(DataNonQueryAsyncProcessor.class);
private Server server;
- public NonQueryAsyncProcessor(Server server) {
+ public DataNonQueryAsyncProcessor(Server server) {
this.server = server;
}
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- NonQueryRequest nonQueryRequest) {
- Operator.OperatorType requestType = nonQueryRequest.getRequestType();
- LOGGER.info("Handle nonquery request.");
- /** Check if it's the leader of metadata **/
- String groupId = nonQueryRequest.getGroupID();
- if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId)))
{
+ DataNonQueryRequest dataNonQueryRequest) {
+ LOGGER.info("Handle data non query request.");
+
+ /** Check if it's the leader **/
+ String groupId = dataNonQueryRequest.getGroupID();
+ DataPartitionRaftHolder dataPartitionRaftHolder =
(DataPartitionRaftHolder) server
+ .getDataPartitionHolderMap().get(groupId);
+ if (!dataPartitionRaftHolder.getFsm().isLeader()) {
PeerId leader = RaftUtils.getTargetPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
- NonQueryResponse response = new NonQueryResponse(true, false,
leader.toString(), null);
+ MetadataNonQueryResponse response = new MetadataNonQueryResponse(true,
false,
+ leader.toString(), null);
asyncContext.sendResponse(response);
} else {
LOGGER.info("Apply task to raft node");
- /** Apply Task to Raft Node **/
+ /** Apply QPTask to Raft Node **/
final Task task = new Task();
task.setDone((Status status) -> {
asyncContext.sendResponse(
- new NonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
+ new DataNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
});
try {
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(nonQueryRequest)));
+ .serialize(dataNonQueryRequest)));
} catch (final CodecException e) {
- asyncContext.sendResponse(new NonQueryResponse(false, false, null,
e.toString()));
- }
-
- RaftService service;
- switch (requestType) {
- case SET_STORAGE_GROUP:
- case AUTHOR:
- case CREATE_USER:
- case CREATE_ROLE:
- case DELETE_ROLE:
- case DELETE_USER:
- case GRANT_USER_ROLE:
- case GRANT_USER_PRIVILEGE:
- case REVOKE_USER_PRIVILEGE:
- case REVOKE_USER_ROLE:
- case GRANT_ROLE_PRIVILEGE:
- case LIST_USER:
- case LIST_ROLE:
- case LIST_USER_PRIVILEGE:
- case LIST_ROLE_PRIVILEGE:
- case LIST_USER_ROLES:
- case LIST_ROLE_USERS:
- case MODIFY_PASSWORD:
- MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
- service = (RaftService) metadataHolder.getService();
- break;
- default:
- DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder)
server
- .getDataPartitionHolderMap().get(groupId);
- service = (RaftService) dataRaftHolder.getService();
+ asyncContext.sendResponse(new MetadataNonQueryResponse(false, false,
null, e.toString()));
}
+ DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder) server
+ .getDataPartitionHolderMap().get(groupId);
+ RaftService service = (RaftService) dataRaftHolder.getService();
service.getNode().apply(task);
}
}
@Override
public String interest() {
- return NonQueryRequest.class.getName();
+ return MetadataNonQueryRequest.class.getName();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
similarity index 53%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
index 202c2d3..31e83ce 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/NonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetadataNonQueryAsyncProcessor.java
@@ -29,94 +29,68 @@ import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.nio.ByteBuffer;
import org.apache.iotdb.cluster.entity.Server;
-import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
-import org.apache.iotdb.cluster.rpc.request.NonQueryRequest;
-import org.apache.iotdb.cluster.rpc.response.NonQueryResponse;
+import org.apache.iotdb.cluster.rpc.request.MetadataNonQueryRequest;
+import org.apache.iotdb.cluster.rpc.response.MetadataNonQueryResponse;
import org.apache.iotdb.cluster.utils.RaftUtils;
import org.apache.iotdb.db.qp.logical.Operator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Async handle change metadata request.
+ * Async handle those requests which need to be applied in metadata group.
*/
-public class NonQueryAsyncProcessor extends
BasicAsyncUserProcessor<NonQueryRequest> {
+public class MetadataNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<MetadataNonQueryRequest> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(NonQueryAsyncProcessor.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MetadataNonQueryAsyncProcessor.class);
private Server server;
- public NonQueryAsyncProcessor(Server server) {
+ public MetadataNonQueryAsyncProcessor(Server server) {
this.server = server;
}
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
- NonQueryRequest nonQueryRequest) {
- Operator.OperatorType requestType = nonQueryRequest.getRequestType();
- LOGGER.info("Handle nonquery request.");
- /** Check if it's the leader of metadata **/
- String groupId = nonQueryRequest.getGroupID();
- if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId)))
{
+ MetadataNonQueryRequest metadataNonQueryRequest) {
+ LOGGER.info("Handle metadata non query query request.");
+
+ /** Check if it's the leader **/
+ String groupId = metadataNonQueryRequest.getGroupID();
+ MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
+ if (!metadataHolder.getFsm().isLeader()) {
PeerId leader = RaftUtils.getTargetPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
LOGGER.info("Right leader is: {}, group id = {} ", leader, groupId);
- NonQueryResponse response = new NonQueryResponse(true, false,
leader.toString(), null);
+ MetadataNonQueryResponse response = new MetadataNonQueryResponse(true,
false,
+ leader.toString(), null);
asyncContext.sendResponse(response);
} else {
- LOGGER.info("Apply task to raft node");
- /** Apply Task to Raft Node **/
+ LOGGER.info("Apply task to metadata raft node");
+ /** Apply QPTask to Raft Node **/
final Task task = new Task();
task.setDone((Status status) -> {
asyncContext.sendResponse(
- new NonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
+ new MetadataNonQueryResponse(false, status.isOk(), null,
status.getErrorMsg()));
});
try {
task.setData(ByteBuffer
.wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(nonQueryRequest)));
+ .serialize(metadataNonQueryRequest)));
} catch (final CodecException e) {
- asyncContext.sendResponse(new NonQueryResponse(false, false, null,
e.toString()));
+ asyncContext.sendResponse(new MetadataNonQueryResponse(false, false,
null, e.toString()));
}
- RaftService service;
- switch (requestType) {
- case SET_STORAGE_GROUP:
- case AUTHOR:
- case CREATE_USER:
- case CREATE_ROLE:
- case DELETE_ROLE:
- case DELETE_USER:
- case GRANT_USER_ROLE:
- case GRANT_USER_PRIVILEGE:
- case REVOKE_USER_PRIVILEGE:
- case REVOKE_USER_ROLE:
- case GRANT_ROLE_PRIVILEGE:
- case LIST_USER:
- case LIST_ROLE:
- case LIST_USER_PRIVILEGE:
- case LIST_ROLE_PRIVILEGE:
- case LIST_USER_ROLES:
- case LIST_ROLE_USERS:
- case MODIFY_PASSWORD:
- MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
- service = (RaftService) metadataHolder.getService();
- break;
- default:
- DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder)
server
- .getDataPartitionHolderMap().get(groupId);
- service = (RaftService) dataRaftHolder.getService();
- }
+ RaftService service = (RaftService) metadataHolder.getService();
service.getNode().apply(task);
}
}
@Override
public String interest() {
- return NonQueryRequest.class.getName();
+ return MetadataNonQueryRequest.class.getName();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
index ba47279..05b9bd9 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/QueryTimeSeriesAsyncProcessor.java
@@ -20,28 +20,24 @@ package org.apache.iotdb.cluster.rpc.processor;
import com.alipay.remoting.AsyncContext;
import com.alipay.remoting.BizContext;
-import com.alipay.remoting.exception.CodecException;
-import com.alipay.remoting.serialization.SerializerManager;
import com.alipay.sofa.jraft.Status;
-import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.entity.Task;
-import java.nio.ByteBuffer;
+import com.alipay.sofa.jraft.closure.ReadIndexClosure;
+import com.alipay.sofa.jraft.util.Bits;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iotdb.cluster.entity.Server;
import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
import org.apache.iotdb.cluster.entity.raft.RaftService;
import org.apache.iotdb.cluster.rpc.request.QueryTimeSeriesRequest;
-import org.apache.iotdb.cluster.rpc.response.QueryStorageGroupResponse;
import org.apache.iotdb.cluster.rpc.response.QueryTimeSeriesResponse;
-import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.db.exception.PathErrorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class QueryTimeSeriesAsyncProcessor extends
BasicAsyncUserProcessor<QueryTimeSeriesRequest> {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryTimeSeriesAsyncProcessor.class);
- private Server server;
private final AtomicInteger requestId = new AtomicInteger(0);
+ private Server server;
public QueryTimeSeriesAsyncProcessor(Server server) {
this.server = server;
@@ -50,31 +46,31 @@ public class QueryTimeSeriesAsyncProcessor extends
BasicAsyncUserProcessor<Query
@Override
public void handleRequest(BizContext bizContext, AsyncContext asyncContext,
QueryTimeSeriesRequest queryMetadataRequest) {
- /** Check if it's the leader of data **/
String groupId = queryMetadataRequest.getGroupID();
- if (!this.server.getServerId().equals(RaftUtils.getTargetPeerID(groupId)))
{
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
- QueryTimeSeriesResponse response = new QueryTimeSeriesResponse(true,
false, leader.toString(), null);
- asyncContext.sendResponse(response);
- }
-
- /** Apply Task to Raft Node **/
- final Task task = new Task();
- task.setDone((Status status) -> {
- asyncContext.sendResponse(
- new QueryTimeSeriesResponse(false, status.isOk(), null,
status.getErrorMsg()));
- });
- try {
- task.setData(ByteBuffer
- .wrap(SerializerManager.getSerializer(SerializerManager.Hessian2)
- .serialize(queryMetadataRequest)));
- } catch (final CodecException e) {
- asyncContext.sendResponse(new QueryTimeSeriesResponse(false, false,
null, e.toString()));
- }
+ final byte[] reqContext = new byte[4];
+ Bits.putInt(reqContext, 0, requestId.incrementAndGet());
+ DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder)
server
+ .getDataPartitionHolder(groupId);
+ ((RaftService) dataPartitionHolder.getService()).getNode()
+ .readIndex(reqContext, new ReadIndexClosure() {
- DataPartitionRaftHolder dataRaftHolder = (DataPartitionRaftHolder)
server.getDataPartitionHolderMap().get(groupId);
- RaftService service = (RaftService) dataRaftHolder.getService();
- service.getNode().apply(task);
+ @Override
+ public void run(Status status, long index, byte[] reqCtx) {
+ QueryTimeSeriesResponse response;
+ if (status.isOk()) {
+ try {
+ response = new QueryTimeSeriesResponse(false, true,
+ dataPartitionHolder.getFsm()
+
.getShowTimeseriesPath(queryMetadataRequest.getPath()));
+ } catch (final PathErrorException e) {
+ response = new QueryTimeSeriesResponse(false, false, null,
e.toString());
+ }
+ } else {
+ response = new QueryTimeSeriesResponse(false, false, null, null);
+ }
+ asyncContext.sendResponse(response);
+ }
+ });
}
@Override
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
similarity index 80%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
index e4d25d5..c5fb573 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/DataNonQueryRequest.java
@@ -24,29 +24,24 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
-public class NonQueryRequest extends BasicRequest implements Serializable {
+/**
+ * Handle request to data group
+ */
+public class DataNonQueryRequest extends BasicRequest implements Serializable {
/**
* Serialized physical plan
*/
private byte[] physicalPlanBytes;
- /**
- * Request type
- */
- private Operator.OperatorType requestType;
- public NonQueryRequest(String groupID, PhysicalPlan plan)
+ public DataNonQueryRequest(String groupID, PhysicalPlan plan)
throws IOException {
super(groupID);
this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
- this.requestType = plan.getOperatorType();
}
public byte[] getPhysicalPlanBytes() {
return physicalPlanBytes;
}
- public Operator.OperatorType getRequestType() {
- return requestType;
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
similarity index 80%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
index e4d25d5..dd91e09 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/NonQueryRequest.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/MetadataNonQueryRequest.java
@@ -24,29 +24,24 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.writelog.transfer.PhysicalPlanLogTransfer;
-public class NonQueryRequest extends BasicRequest implements Serializable {
+/**
+ * Handle request to metadata group leader
+ */
+public class MetadataNonQueryRequest extends BasicRequest implements
Serializable {
/**
* Serialized physical plan
*/
private byte[] physicalPlanBytes;
- /**
- * Request type
- */
- private Operator.OperatorType requestType;
- public NonQueryRequest(String groupID, PhysicalPlan plan)
+ public MetadataNonQueryRequest(String groupID, PhysicalPlan plan)
throws IOException {
super(groupID);
this.physicalPlanBytes = PhysicalPlanLogTransfer.operatorToLog(plan);
- this.requestType = plan.getOperatorType();
}
public byte[] getPhysicalPlanBytes() {
return physicalPlanBytes;
}
- public Operator.OperatorType getRequestType() {
- return requestType;
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
similarity index 77%
copy from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
copy to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
index 7e466fb..6f430d0 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/DataNonQueryResponse.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.cluster.rpc.response;
-public class NonQueryResponse extends BasicResponse {
+/**
+ * Handle response from data group leader
+ */
+public class DataNonQueryResponse extends BasicResponse {
- public NonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
+ public DataNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
super(redirected, success, leaderStr, errorMsg);
}
- public NonQueryResponse(boolean redirected, boolean success) {
+ public DataNonQueryResponse(boolean redirected, boolean success) {
super(redirected, success, null, null);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
similarity index 76%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
index 7e466fb..f23fa1c 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/NonQueryResponse.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/response/MetadataNonQueryResponse.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.cluster.rpc.response;
-public class NonQueryResponse extends BasicResponse {
+/**
+ * Handle response from metadata group leader
+ */
+public class MetadataNonQueryResponse extends BasicResponse {
- public NonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
+ public MetadataNonQueryResponse(boolean redirected, boolean success, String
leaderStr, String errorMsg) {
super(redirected, success, leaderStr, errorMsg);
}
- public NonQueryResponse(boolean redirected, boolean success) {
+ public MetadataNonQueryResponse(boolean redirected, boolean success) {
super(redirected, success, null, null);
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 984bfb6..da42cb7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -171,7 +171,7 @@ public class RaftUtils {
public static PeerId[] convertPhysicalNodeArrayToPeerIdArray(PhysicalNode[]
physicalNodes) {
PeerId[] peerIds = new PeerId[physicalNodes.length];
for (int i = 0; i < physicalNodes.length; i++) {
- peerIds[i] = new PeerId(physicalNodes[i].getIp(),
physicalNodes[i].getPort());
+ peerIds[i] = convertPhysicalNode(physicalNodes[i]);
}
return peerIds;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 9326610..3dab29f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -163,7 +163,7 @@ public class Router {
}
/**
- * For a storage group, compute the nearest physical node on the hash ring
Only for test
+ * For a storage group, compute the nearest physical node on the hash ring
*/
public PhysicalNode routeNode(String objectKey) {
int hashVal = hashFunction.hash(objectKey);
@@ -213,7 +213,7 @@ public class Router {
* Show physical nodes by group id.
*/
public void showPhysicalNodes(String groupId) {
- PhysicalNode[] physicalPlans = Router.getInstance().routeGroup(groupId);
+ PhysicalNode[] physicalPlans = getNodesByGroupId(groupId);
for (PhysicalNode node : physicalPlans) {
System.out.println(node);
}
diff --git a/service-rpc/src/main/thrift/rpc.thrift
b/service-rpc/src/main/thrift/rpc.thrift
index c603ae0..9604bf8 100644
--- a/service-rpc/src/main/thrift/rpc.thrift
+++ b/service-rpc/src/main/thrift/rpc.thrift
@@ -52,7 +52,7 @@ struct TSHandleIdentifier {
2: required binary secret,
}
-// Client-side reference to a task running asynchronously on the server.
+// Client-side reference to a QPTask running asynchronously on the server.
struct TSOperationHandle {
1: required TSHandleIdentifier operationId