This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 9cb1236  Readjust task class and modify some cluster config
9cb1236 is described below

commit 9cb1236c7a3d602a531b6f89e29de277541fdfda
Author: lta <[email protected]>
AuthorDate: Wed Mar 27 17:45:19 2019 +0800

    Readjust task class and modify some cluster config
---
 .../apache/iotdb/cluster/callback/MultiTask.java   | 34 ++--------
 .../apache/iotdb/cluster/callback/SingleTask.java  |  5 +-
 .../org/apache/iotdb/cluster/callback/Task.java    | 12 ++--
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java |  5 +-
 .../cluster/qp/executor/NonQueryExecutor.java      | 34 +++++-----
 .../cluster/rpc/service/TSServiceClusterImpl.java  | 77 +++++++++-------------
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  |  6 +-
 7 files changed, 69 insertions(+), 104 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
index d9c20f1..81b6dc6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/MultiTask.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.cluster.callback;
 
-import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 
@@ -28,35 +27,16 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
  */
 public class MultiTask extends Task {
 
-  private BasicResponse finalResponse;
-
-  /**
-   * Num of sub-task
-   */
-  private CountDownLatch countDownLatch;
-
-  public MultiTask(boolean isSyncTask, CountDownLatch taskBarrier, BasicRequest request) {
-    super(isSyncTask, taskBarrier, TaskState.INITIAL);
+  public MultiTask(boolean isSyncTask, int taskNum, BasicRequest request) {
+    super(isSyncTask, taskNum, TaskState.INITIAL);
     setRequest(request);
   }
 
-  public CountDownLatch getCountDownLatch() {
-    return countDownLatch;
-  }
-
-  public void setCountDownLatch(CountDownLatch countDownLatch) {
-    this.countDownLatch = countDownLatch;
-  }
-
-
-  public BasicResponse getFinalResponse() {
-    return finalResponse;
-  }
-
-  public void setFinalResponse(BasicResponse finalResponse) {
-    this.finalResponse = finalResponse;
-  }
-
+  /**
+   * Process response
+   *
+   * @param basicResponse response from receiver
+   */
   @Override
   public void run(BasicResponse basicResponse) {
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
index 5a3ee16..e659452 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/SingleTask.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.cluster.callback;
 
-import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.cluster.rpc.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 
@@ -28,8 +27,8 @@ import org.apache.iotdb.cluster.rpc.response.BasicResponse;
 public class SingleTask extends Task {
 
 
-  public SingleTask(boolean isSyncTask, CountDownLatch taskBarrier, BasicRequest request) {
-    super(isSyncTask, taskBarrier, TaskState.INITIAL);
+  public SingleTask(boolean isSyncTask, BasicRequest request) {
+    super(isSyncTask, 1, TaskState.INITIAL);
     setRequest(request);
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 329ef04..55b42ed 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -47,9 +47,9 @@ public abstract class Task {
    */
   private TaskState taskState;
 
-  public Task(boolean isSyncTask, CountDownLatch taskNum, TaskState taskState) {
+  public Task(boolean isSyncTask, int taskNum, TaskState taskState) {
     this.isSyncTask = isSyncTask;
-    this.taskNum = taskNum;
+    this.taskNum = new CountDownLatch(taskNum);
     this.taskState = taskState;
   }
 
@@ -72,8 +72,8 @@ public abstract class Task {
     return taskNum;
   }
 
-  public void setTaskNum(CountDownLatch taskNum) {
-    this.taskNum = taskNum;
+  public void setTaskNum(int taskNum) {
+    this.taskNum = new CountDownLatch(taskNum);
   }
 
   public TaskState getTaskState() {
@@ -104,4 +104,8 @@ public abstract class Task {
   public enum TaskState {
     INITIAL, REDIRECT, FINISH, EXCEPTION
   }
+
+  public void await() throws InterruptedException {
+    this.taskNum.await();
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 355acba..367bf79 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.qp;
 
+import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -58,10 +59,10 @@ public abstract class ClusterQPExecutor {
    * 1. If this node belongs to the storage group
    * 2. If this node is leader.
    */
-  public boolean canHandle(String storageGroup) throws RaftConnectionException {
+  public boolean canHandle(String storageGroup, BoltCliClientService cliClientService) throws RaftConnectionException {
     if(router.containPhysicalNode(storageGroup, localNode)){
       String groupId = getGroupIdBySG(storageGroup);
-      if(RaftUtils.convertPeerId(RaftUtils.getLeader(groupId)).equals(localNode)){
+      if(RaftUtils.convertPeerId(RaftUtils.getLeader(groupId, cliClientService)).equals(localNode)){
         return true;
       }
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index e4a87ad..3293d6a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.qp.executor;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
 import org.apache.iotdb.cluster.callback.SingleTask;
 import org.apache.iotdb.cluster.callback.Task;
 import org.apache.iotdb.cluster.callback.Task.TaskState;
@@ -170,10 +169,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
         return false;
       case SET_FILE_LEVEL:
         boolean fileLevelExist = false;
-        try {
-          fileLevelExist = mManager.checkFileLevel(path.getFullPath());
-        } catch (PathErrorException e) {
-        }
+//        try {
+//          PathCheckRet ret = mManager.checkPathStorageLevelAndGetDataType(path.getFullPath());
+//          fileLevelExist = !ret.isSuccessfully();
+//        } catch (PathErrorException e) {
+//        }
         if (fileLevelExist) {
           throw new ProcessorException(
               String.format("File level %s already exists.", path.getFullPath()));
@@ -181,11 +181,10 @@ public class NonQueryExecutor extends ClusterQPExecutor {
           ChangeMetadataRequest request = new ChangeMetadataRequest(
               CLUSTER_CONFIG.METADATA_GROUP_ID,
               metadataPlan);
-          PeerId leader = RaftUtils.getLeader(CLUSTER_CONFIG.METADATA_GROUP_ID);
+          PeerId leader = RaftUtils.getLeader(CLUSTER_CONFIG.METADATA_GROUP_ID, cliClientService);
 
-          CountDownLatch latch = new CountDownLatch(SUB_TASK_NUM);
-          SingleTask task = new SingleTask(false, latch, request);
-          return asyncHandleTask(task, leader, latch, 0);
+          SingleTask task = new SingleTask(false, request);
+          return asyncHandleTask(task, leader, 0);
         }
       default:
         throw new ProcessorException("unknown namespace type:" + namespaceType);
@@ -203,16 +202,15 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   private boolean handleRequest(String storageGroup, PhysicalPlan plan)
       throws ProcessorException, IOException, RaftConnectionException, InterruptedException {
     /** Check if the plan can be executed locally. **/
-    if (canHandle(storageGroup)) {
+    if (canHandle(storageGroup, cliClientService)) {
       return qpExecutor.processNonQuery(plan);
     } else {
       String groupId = getGroupIdBySG(storageGroup);
       ChangeMetadataRequest request = new ChangeMetadataRequest(groupId, plan);
-      PeerId leader = RaftUtils.getLeader(groupId);
+      PeerId leader = RaftUtils.getLeader(groupId, cliClientService);
 
-      CountDownLatch latch = new CountDownLatch(SUB_TASK_NUM);
-      SingleTask task = new SingleTask(false, latch, request);
-      return asyncHandleTask(task, leader, latch, 0);
+      SingleTask task = new SingleTask(false, request);
+      return asyncHandleTask(task, leader, 0);
     }
   }
 
@@ -225,7 +223,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
    * @param taskRetryNum Number of task retries due to timeout and redirected.
    * @return request result
    */
-  private boolean asyncHandleTask(Task task, PeerId leader, CountDownLatch latch, int taskRetryNum)
+  private boolean asyncHandleTask(Task task, PeerId leader, int taskRetryNum)
       throws RaftConnectionException, InterruptedException {
     if (taskRetryNum >= TASK_MAX_RETRY) {
       throw new RaftConnectionException(String.format("Task retries reach the upper bound %s",
@@ -234,11 +232,11 @@ public class NonQueryExecutor extends ClusterQPExecutor {
     NodeAsClient client = new RaftNodeAsClient();
     /** Call async method **/
     client.asyncHandleRequest(cliClientService, task.getRequest(), leader, task);
-    latch.await();
+    task.await();
     if (task.getTaskState() == TaskState.INITIAL || task.getTaskState() == TaskState.REDIRECT
         || task.getTaskState() == TaskState.EXCEPTION) {
-      task.setTaskNum(new CountDownLatch(SUB_TASK_NUM));
-      return asyncHandleTask(task, leader, latch, taskRetryNum + 1);
+      task.setTaskNum(SUB_TASK_NUM);
+      return asyncHandleTask(task, leader, taskRetryNum + 1);
     }
     return task.getResponse().isSuccess();
   }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 775c0a6..1c97a09 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -26,13 +26,6 @@ import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.service.TSServiceImpl;
-import org.apache.iotdb.service.rpc.thrift.ServerProperties;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
-import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchMetadataResp;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
-import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,38 +54,38 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     cliClientService.init(new CliOptions());
   }
 
-  //TODO
-  @Override
-  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) throws TException {
-    throw new TException("not support");
-  }
-
-  //TODO
-
-  /**
-   * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
-   *
-   * @param statement command
-   * @return true if the statement is ADMIN COMMAND
-   * @throws IOException exception
-   */
-  @Override
-  public boolean execAdminCommand(String statement) throws IOException {
-    throw new IOException("exec admin command not support");
-  }
-
-  //TODO
-  @Override
-  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
-    throw new TException("query not support");
-  }
-
-
-  //TODO
-  @Override
-  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
-    throw new TException("not support");
-  }
+//  //TODO
+//  @Override
+//  public TSFetchMetadataResp fetchMetadata(TSFetchMetadataReq req) throws TException {
+//    throw new TException("not support");
+//  }
+//
+//  //TODO
+//
+//  /**
+//   * Judge whether the statement is ADMIN COMMAND and if true, executeWithGlobalTimeFilter it.
+//   *
+//   * @param statement command
+//   * @return true if the statement is ADMIN COMMAND
+//   * @throws IOException exception
+//   */
+//  @Override
+//  public boolean execAdminCommand(String statement) throws IOException {
+//    throw new IOException("exec admin command not support");
+//  }
+//
+//  //TODO
+//  @Override
+//  public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) throws TException {
+//    throw new TException("query not support");
+//  }
+//
+//
+//  //TODO
+//  @Override
+//  public TSFetchResultsResp fetchResults(TSFetchResultsReq req) throws TException {
+//    throw new TException("not support");
+//  }
 
   @Override
   public boolean executeNonQuery(PhysicalPlan plan) throws ProcessorException {
@@ -105,10 +98,4 @@ public class TSServiceClusterImpl extends TSServiceImpl {
     closeOperation(null);
     closeSession(null);
   }
-
-  //TODO
-  @Override
-  public ServerProperties getProperties() throws TException {
-    throw new TException("not support");
-  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index fc77f1d..a2218b2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.cluster.utils;
 import com.alipay.sofa.jraft.RouteTable;
 import com.alipay.sofa.jraft.conf.Configuration;
 import com.alipay.sofa.jraft.entity.PeerId;
-import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.util.concurrent.TimeoutException;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
@@ -38,12 +37,9 @@ public class RaftUtils {
    * @param groupId group id of raft group
    * @return PeerId of leader
    */
-  public static PeerId getLeader(String groupId) throws RaftConnectionException {
+  public static PeerId getLeader(String groupId, BoltCliClientService cliClientService) throws RaftConnectionException {
     Configuration conf = getConfiguration(groupId);
     RouteTable.getInstance().updateConfiguration(groupId, conf);
-
-    final BoltCliClientService cliClientService = new BoltCliClientService();
-    cliClientService.init(new CliOptions());
     try {
       if (!RouteTable.getInstance().refreshLeader(cliClientService, groupId, 1000).isOk()) {
         throw new RaftConnectionException("Refresh leader failed");

Reply via email to