This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 9cb1236 Readjust task class and modify some cluster config
9cb1236 is described below
commit 9cb1236c7a3d602a531b6f89e29de277541fdfda
Author: lta <[email protected]>
AuthorDate: Wed Mar 27 17:45:19 2019 +0800
Readjust task class and modify some cluster config
---
.../apache/iotdb/cluster/callback/MultiTask.java | 34 ++--------
.../apache/iotdb/cluster/callback/SingleTask.java | 5 +-
.../org/apache/iotdb/cluster/callback/Task.java | 12 ++--
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 5 +-
.../cluster/qp/executor/NonQueryExecutor.java | 34 +++++-----
.../cluster/rpc/service/TSServiceClusterImpl.java | 77 +++++++++-------------
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 6 +-
7 files changed, 69 insertions(+), 104 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
index d9c20f1..81b6dc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.cluster.callback;
-import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
@@ -28,35 +27,16 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
*/
public class MultiTask extends Task {
- private BasicResponse finalResponse;
-
- /**
- * Num of sub-task
- */
- private CountDownLatch countDownLatch;
-
- public MultiTask(boolean isSyncTask, CountDownLatch taskBarrier, BasicRequest request) {
- super(isSyncTask, taskBarrier, TaskState.INITIAL);
+ public MultiTask(boolean isSyncTask, int taskNum, BasicRequest request) {
+ super(isSyncTask, taskNum, TaskState.INITIAL);
setRequest(request);
}
- public CountDownLatch getCountDownLatch() {
- return countDownLatch;
- }
-
- public void setCountDownLatch(CountDownLatch countDownLatch) {
- this.countDownLatch = countDownLatch;
- }
-
-
- public BasicResponse getFinalResponse() {
- return finalResponse;
- }
-
- public void setFinalResponse(BasicResponse finalResponse) {
- this.finalResponse = finalResponse;
- }
-
+ /**
+ * Process response
+ *
+ * @param basicResponse response from receiver
+ */
@Override
public void run(BasicResponse basicResponse) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
index 5a3ee16..e659452 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.cluster.callback;
-import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.cluster.rpc.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.response.BasicResponse;
@@ -28,8 +27,8 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
public class SingleTask extends Task {
- public SingleTask(boolean isSyncTask, CountDownLatch taskBarrier, BasicRequest request) {
- super(isSyncTask, taskBarrier, TaskState.INITIAL);
+ public SingleTask(boolean isSyncTask, BasicRequest request) {
+ super(isSyncTask, 1, TaskState.INITIAL);
setRequest(request);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 329ef04..55b42ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -47,9 +47,9 @@ public abstract class Task {
*/
private TaskState taskState;
- public Task(boolean isSyncTask, CountDownLatch taskNum, TaskState taskState) {
+ public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
this.isSyncTask = isSyncTask;
- this.taskNum = taskNum;
+ this.taskNum = new CountDownLatch(taskNum);
this.taskState = taskState;
}
@@ -72,8 +72,8 @@ public abstract class Task {
return taskNum;
}
- public void setTaskNum(CountDownLatch taskNum) {
- this.taskNum = taskNum;
+ public void setTaskNum(int taskNum) {
+ this.taskNum = new CountDownLatch(taskNum);
}
public TaskState getTaskState() {
@@ -104,4 +104,8 @@ public abstract class Task {
public enum TaskState {
INITIAL, REDIRECT, FINISH, EXCEPTION
}
+
+ public void await() throws InterruptedException {
+ this.taskNum.await();
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 355acba..367bf79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.qp;
+import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -58,10 +59,10 @@ public abstract class ClusterQPExecutor {
* 1. If this node belongs to the storage group
* 2. If this node is leader.
*/
- public boolean canHandle(String storageGroup) throws RaftConnectionException {
+ public boolean canHandle(String storageGroup, BoltCliClientService cliClientService) throws RaftConnectionException {
if(router.containPhysicalNode(storageGroup, localNode)){
String groupId = getGroupIdBySG(storageGroup);
- if(RaftUtils.convertPeerId(RaftUtils.getLeader(groupId)).equals(localNode)){
+ if(RaftUtils.convertPeerId(RaftUtils.getLeader(groupId, cliClientService)).equals(localNode)){
return true;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index e4a87ad..3293d6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.qp.executor;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
import org.apache.iotdb.cluster.callback.SingleTask;
import org.apache.iotdb.cluster.callback.Task;
import org.apache.iotdb.cluster.callback.Task.TaskState;
@@ -170,10 +169,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
return false;
case SET_FILE_LEVEL:
boolean fileLevelExist = false;
- try {
- fileLevelExist = mManager.checkFileLevel(path.getFullPath());
- } catch (PathErrorException e) {
- }
+// try {
+// PathCheckRet ret = mManager.checkPathStorageLevelAndGetDataType(path.getFullPath());
+// fileLevelExist = !ret.isSuccessfully();
+// } catch (PathErrorException e) {
+// }
if (fileLevelExist) {
throw new ProcessorException(
String.format("File level %s already exists.",
path.getFullPath()));
@@ -181,11 +181,10 @@ public class NonQueryExecutor extends ClusterQPExecutor {
ChangeMetadataRequest request = new ChangeMetadataRequest(
CLUSTER_CONFIG.METADATA_GROUP_ID,
metadataPlan);
- PeerId leader = RaftUtils.getLeader(CLUSTER_CONFIG.METADATA_GROUP_ID);
+ PeerId leader = RaftUtils.getLeader(CLUSTER_CONFIG.METADATA_GROUP_ID, cliClientService);
- CountDownLatch latch = new CountDownLatch(SUB_TASK_NUM);
- SingleTask task = new SingleTask(false, latch, request);
- return asyncHandleTask(task, leader, latch, 0);
+ SingleTask task = new SingleTask(false, request);
+ return asyncHandleTask(task, leader, 0);
}
default:
throw new ProcessorException("unknown namespace type:" +
namespaceType);
@@ -203,16 +202,15 @@ public class NonQueryExecutor extends ClusterQPExecutor {
private boolean handleRequest(String storageGroup, PhysicalPlan plan)
throws ProcessorException, IOException, RaftConnectionException,
InterruptedException {
/** Check if the plan can be executed locally. **/
- if (canHandle(storageGroup)) {
+ if (canHandle(storageGroup, cliClientService)) {
return qpExecutor.processNonQuery(plan);
} else {
String groupId = getGroupIdBySG(storageGroup);
ChangeMetadataRequest request = new ChangeMetadataRequest(groupId, plan);
- PeerId leader = RaftUtils.getLeader(groupId);
+ PeerId leader = RaftUtils.getLeader(groupId, cliClientService);
- CountDownLatch latch = new CountDownLatch(SUB_TASK_NUM);
- SingleTask task = new SingleTask(false, latch, request);
- return asyncHandleTask(task, leader, latch, 0);
+ SingleTask task = new SingleTask(false, request);
+ return asyncHandleTask(task, leader, 0);
}
}
@@ -225,7 +223,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
* @param taskRetryNum Number of task retries due to timeout and redirected.
* @return request result
*/
- private boolean asyncHandleTask(Task task, PeerId leader, CountDownLatch latch, int taskRetryNum)
+ private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
throws RaftConnectionException, InterruptedException {
if (taskRetryNum >= TASK_MAX_RETRY) {
throw new RaftConnectionException(String.format("Task retries reach the upper bound %s",
@@ -234,11 +232,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
NodeAsClient client = new RaftNodeAsClient();
/** Call async method **/
client.asyncHandleRequest(cliClientService, task.getRequest(), leader,
task);
- latch.await();
+ task.await();
if (task.getTaskState() == TaskState.INITIAL || task.getTaskState() ==
TaskState.REDIRECT
|| task.getTaskState() == TaskState.EXCEPTION) {
- task.setTaskNum(new CountDownLatch(SUB_TASK_NUM));
- return asyncHandleTask(task, leader, latch, taskRetryNum + 1);
+ task.setTaskNum(SUB_TASK_NUM);
+ return asyncHandleTask(task, leader, taskRetryNum + 1);
}
return task.getResponse().isSuccess();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 775c0a6..1c97a09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -26,13 +26,6 @@ import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.service.TSServiceImpl;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,38 +54,38 @@ public class TSServiceClusterImpl extends TSServiceImpl {
cliClientService.init(new CliOptions());
}
- //TODO
- @Override
- public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) throws TException {
- throw new TException("not support");
- }
-
- //TODO
-
- /**
- * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
- *
- * @param statement command
- * @return true if the statement is ADMIN COMMAND
- * @throws IOException exception
- */
- @Override
- public boolean execAdminCommand(String statement) throws IOException {
- throw new IOException("exec admin command not support");
- }
-
- //TODO
- @Override
- public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
- throw new TException("query not support");
- }
-
-
- //TODO
- @Override
- public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
- throw new TException("not support");
- }
+// //TODO
+// @Override
+// public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) throws TException {
+// throw new TException("not support");
+// }
+//
+// //TODO
+//
+// /**
+// * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
+// *
+// * @param statement command
+// * @return true if the statement is ADMIN COMMAND
+// * @throws IOException exception
+// */
+// @Override
+// public boolean execAdminCommand(String statement) throws IOException {
+// throw new IOException("exec admin command not support");
+// }
+//
+// //TODO
+// @Override
+// public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+// throw new TException("query not support");
+// }
+//
+//
+// //TODO
+// @Override
+// public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
+// throw new TException("not support");
+// }
@Override
public boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
@@ -105,10 +98,4 @@ public class TSServiceClusterImpl extends TSServiceImpl {
closeOperation(null);
closeSession(null);
}
-
- //TODO
- @Override
- public ServerProperties getProperties() throws TException {
- throw new TException("not support");
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index fc77f1d..a2218b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.utils;
import com.alipay.sofa.jraft.RouteTable;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.util.concurrent.TimeoutException;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -38,12 +37,9 @@ public class RaftUtils {
* @param groupId group id of raft group
* @return PeerId of leader
*/
- public static PeerId getLeader(String groupId) throws RaftConnectionException {
+ public static PeerId getLeader(String groupId, BoltCliClientService cliClientService) throws RaftConnectionException {
Configuration conf = getConfiguration(groupId);
RouteTable.getInstance().updateConfiguration(groupId, conf);
-
- final BoltCliClientService cliClientService = new BoltCliClientService();
- cliClientService.init(new CliOptions());
try {
if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId,
1000).isOk()) {
throw new RaftConnectionException("Refresh leader failed");