This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 229fd03  modify some impl of query timeseries
229fd03 is described below

commit 229fd030b37a2d883c0f6c86669865265e919c92
Author: lta <[email protected]>
AuthorDate: Sat Mar 30 19:01:54 2019 +0800

    modify some impl of query timeseries
---
 .../org/apache/iotdb/cluster/callback/Task.java    |  4 ++
 .../apache/iotdb/cluster/config/ClusterConfig.java | 45 +++++++++++++++-------
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 13 ++++++-
 .../cluster/qp/executor/NonQueryExecutor.java      |  2 +-
 .../cluster/qp/executor/QueryMetadataExecutor.java | 19 +++++++--
 .../rpc/request/QueryTimeSeriesRequest.java        |  1 -
 .../cluster/rpc/service/TSServiceClusterImpl.java  |  3 +-
 .../org/apache/iotdb/cluster/utils/RaftUtils.java  | 28 +++++++++-----
 .../apache/iotdb/cluster/utils/hash/Router.java    | 13 ++++++-
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  6 +--
 10 files changed, 95 insertions(+), 39 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
index 0c73a01..7940a5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/callback/Task.java
@@ -33,6 +33,7 @@ public abstract class Task {
    * Task request
    */
   protected BasicRequest request;
+
   /**
    * Whether it's a synchronization task or not.
    */
@@ -112,6 +113,9 @@ public abstract class Task {
     INITIAL, REDIRECT, FINISH, EXCEPTION
   }
 
+  /**
+   * Wait until task is finished.
+   */
   public void await() throws InterruptedException {
     this.taskCountDownLatch.await();
   }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index 67e176b..83b129f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -38,40 +38,57 @@ public class ClusterConfig {
   private static final String DEFAULT_RAFT_LOG_DIR = "log";
   private static final String DEFAULT_RAFT_SNAPSHOT_DIR = "snapshot";
 
-
-  // Cluster node: {ip1,ip2,...,ipn}
+  /**
+   * Cluster node: {ip1,ip2,...,ipn}
+   */
   private String[] nodes = {DEFAULT_NODE};
 
-  // Replication number
+  /**
+   * Replication number
+   */
   private int replication = 3;
 
-  private String ip = null;
+  private String ip = "127.0.0.1";
+
   private int port = 8888;
 
-  // Path for holder to store raft log
+  /**
+   * Path for holder to store raft log
+   */
   private String raftLogPath;
 
-  // Path for holder to store raft snapshot
+  /**
+   * Path for holder to store raft snapshot
+   */
   private String raftSnapshotPath;
 
-  // Path for holder to store raft metadata
+  /**
+   * Path for holder to store raft metadata
+   */
   private String raftMetadataPath;
 
-  // When the number of the difference between
-  // leader and follower log is less than this value, it is considered as 
'catch-up'
+  /**
+   * When the number of the difference between leader and follower log is less 
than this value, it
+   * is considered as 'catch-up'
+   */
   private int maxCatchUpLogNum = 100000;
 
-  // Whether to enable the delayed snapshot mechanism or not
+  /**
+   * Whether to enable the delayed snapshot mechanism or not
+   */
   private boolean delaySnapshot = false;
-  // Maximum allowed delay hours
+
+  /**
+   * Maximum allowed delay hours
+   */
   private int delayHours = 24;
 
   /**
-   * count limit to redo a single task
+   * Count limit to redo a single task
    **/
   private int taskRedoCount = 3;
   /**
-   * timeout limit for a single task, the unit is milliseconds
+   * Timeout limit for a single task, the unit is milliseconds
    **/
   private int taskTimeoutMs = 1000;
 
@@ -81,7 +98,7 @@ public class ClusterConfig {
     // empty constructor
   }
 
-  public void updatePath(){
+  public void updatePath() {
     IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
     String iotdbDataDir = conf.getDataDir();
     iotdbDataDir = FilePathUtils.regularizePath(iotdbDataDir);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index f100f1d..ab9f153 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -46,6 +46,7 @@ public abstract class ClusterQPExecutor {
       CLUSTER_CONFIG.getPort());
   protected OverflowQPExecutor qpExecutor = new OverflowQPExecutor();
   protected MManager mManager = MManager.getInstance();
+
   /**
    * Rpc Service Client
    */
@@ -55,6 +56,7 @@ public abstract class ClusterQPExecutor {
    * Count limit to redo a single task
    */
   protected static final int TASK_MAX_RETRY = 
CLUSTER_CONFIG.getTaskRedoCount();
+
   /**
    * Number of subtask in task segmentation
    */
@@ -81,10 +83,10 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
-   * Verify if the command can execute in local. 1. If this node belongs to 
the storage group 2. If
+   * Verify if the non query command can execute in local. 1. If this node 
belongs to the storage group 2. If
    * this node is leader.
    */
-  public boolean canHandle(String storageGroup) {
+  public boolean canHandleNonQuery(String storageGroup) {
     if (router.containPhysicalNode(storageGroup, localNode)) {
       String groupId = getGroupIdBySG(storageGroup);
       if 
(RaftUtils.convertPeerId(RaftUtils.getTargetPeerID(groupId)).equals(localNode)) 
{
@@ -95,6 +97,13 @@ public abstract class ClusterQPExecutor {
   }
 
   /**
+   * Verify if the query command can execute in local. Check if this node 
belongs to the storage group
+   */
+  public boolean canHandleQuery(String storageGroup) {
+    return router.containPhysicalNode(storageGroup, localNode);
+  }
+
+  /**
    * Async handle task by task and leader id
    *
    * @param task request task
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 6bc92f5..a95a227 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -182,7 +182,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
   private boolean handleRequest(String storageGroup, PhysicalPlan plan)
       throws ProcessorException, IOException, RaftConnectionException, 
InterruptedException {
     /** Check if the plan can be executed locally. **/
-    if (canHandle(storageGroup)) {
+    if (canHandleNonQuery(storageGroup)) {
       return qpExecutor.processNonQuery(plan);
     } else {
       String groupId = getGroupIdBySG(storageGroup);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
index 71c88d1..4560973 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/QueryMetadataExecutor.java
@@ -75,11 +75,11 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     String storageGroup = getStroageGroupByDevice(path);
     String groupId = getGroupIdBySG(storageGroup);
     QueryTimeSeriesRequest request = new QueryTimeSeriesRequest(groupId, path);
-    PeerId leader = RaftUtils.getTargetPeerID(groupId);
+    PeerId leader = RaftUtils.getRandomPeerID(groupId);
     SingleTask task = new SingleTask(false, request);
 
     /** Check if the plan can be executed locally. **/
-    if (canHandle(storageGroup)) {
+    if (canHandleQuery(storageGroup)) {
       return queryTimeSeriesLocally(path, groupId, task);
     } else {
       try {
@@ -91,17 +91,23 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     }
   }
 
+  /**
+   * Handle "show timeseries <path>" statement
+   *
+   * @param path column path
+   */
   private List<List<String>> queryTimeSeriesLocally(String path, String 
groupId, SingleTask task)
       throws InterruptedException {
     final byte[] reqContext = new byte[4];
     Bits.putInt(reqContext, 0, requestId.incrementAndGet());
-    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server.getDataPartitionHolder(groupId);
+    DataPartitionRaftHolder dataPartitionHolder = (DataPartitionRaftHolder) 
server
+        .getDataPartitionHolder(groupId);
     ((RaftService) dataPartitionHolder.getService()).getNode()
         .readIndex(reqContext, new ReadIndexClosure() {
 
           @Override
           public void run(Status status, long index, byte[] reqCtx) {
-            QueryTimeSeriesResponse response = null;
+            QueryTimeSeriesResponse response;
             if (status.isOk()) {
               try {
                 response = new QueryTimeSeriesResponse(false, true,
@@ -125,6 +131,11 @@ public class QueryMetadataExecutor extends 
ClusterQPExecutor {
     return ((QueryTimeSeriesResponse) response).getTimeSeries();
   }
 
+  /**
+   * Handle "show storage group" statement locally
+   *
+   * @return Set of storage group name
+   */
   private Set<String> queryStorageGroupLocally() throws InterruptedException {
     QueryStorageGroupRequest request = new 
QueryStorageGroupRequest(ClusterConfig.METADATA_GROUP_ID);
     SingleTask task = new SingleTask(false, request);
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
index 7c2c12f..142362f 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/request/QueryTimeSeriesRequest.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.cluster.rpc.request;
 
 import java.io.Serializable;
-import org.apache.iotdb.cluster.rpc.MetadataType;
 
 public class QueryTimeSeriesRequest extends BasicRequest implements 
Serializable {
 
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
index 3672ca4..a384830 100644
--- 
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/service/TSServiceClusterImpl.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Set;
 import org.apache.iotdb.cluster.qp.executor.NonQueryExecutor;
 import org.apache.iotdb.cluster.qp.executor.QueryMetadataExecutor;
-import org.apache.iotdb.cluster.rpc.MetadataType;
 import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -107,7 +106,7 @@ public class TSServiceClusterImpl extends TSServiceImpl {
 
   @Override
   protected List<List<String>> getTimeSeriesForPath(String path)
-      throws PathErrorException, InterruptedException {
+      throws PathErrorException, InterruptedException, ProcessorException {
     return queryMetadataExecutor.get().processTimeSeriesQuery(path);
   }
 }
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 364ef24..984bfb6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -83,22 +83,30 @@ public class RaftUtils {
    */
   public static PeerId getTargetPeerID(String groupId) {
     if (!groupLeaderCache.containsKey(groupId)) {
-      PeerId randomPeerId;
-      if (groupId.equals(CLUSTER_CONFIG.METADATA_GROUP_ID)) {
-        RaftService service = (RaftService) 
server.getMetadataHolder().getService();
-        List<PeerId> peerIdList = service.getPeerIdList();
-        randomPeerId = peerIdList.get(getRandomInt(peerIdList.size()));
-      } else {
-        PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
-        PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
-        randomPeerId = new PeerId(node.ip, node.port);
-      }
+      PeerId randomPeerId = getRandomPeerID(groupId);
       groupLeaderCache.put(groupId, randomPeerId);
     }
     return groupLeaderCache.get(groupId);
   }
 
   /**
+   * Get random peer id
+   */
+  public static PeerId getRandomPeerID(String groupId) {
+    PeerId randomPeerId;
+    if (groupId.equals(CLUSTER_CONFIG.METADATA_GROUP_ID)) {
+      RaftService service = (RaftService) 
server.getMetadataHolder().getService();
+      List<PeerId> peerIdList = service.getPeerIdList();
+      randomPeerId = peerIdList.get(getRandomInt(peerIdList.size()));
+    } else {
+      PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
+      PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
+      randomPeerId = convertPhysicalNode(node);
+    }
+    return randomPeerId;
+  }
+
+  /**
    * Get random int from [0, bound).
    */
   public static int getRandomInt(int bound) {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 09f54ba..9326610 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.cluster.utils.hash;
 
+import com.alipay.sofa.jraft.util.OnlyForTest;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,6 +30,9 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.ErrorConfigureExecption;
 
+/**
+ * Cluster router, it's responsible for hash mapping and routing to specified 
data groups
+ */
 public class Router {
 
   /**
@@ -126,6 +130,8 @@ public class Router {
 
   /**
    * Calculate the physical nodes corresponding to the replications where a 
data point is located
+   *
+   * @param objectKey storage group
    */
   public PhysicalNode[] routeGroup(String objectKey) {
     if (sgRouter.containsKey(objectKey)) {
@@ -145,6 +151,9 @@ public class Router {
     return this.getGroupsNodes(new PhysicalNode(ip, port));
   }
 
+  /**
+   * Add a new node to cluster
+   */
   private void addNode(PhysicalNode node, int virtualNum) {
     physicalRing.put(hashFunction.hash(node.getKey()), node);
     for (int i = 0; i < virtualNum; i++) {
@@ -181,14 +190,14 @@ public class Router {
     nodeMapGroupIdCache.clear();
   }
 
-  // only for test
+  @OnlyForTest
   public void showPhysicalRing() {
     for (Entry<Integer, PhysicalNode> entry : physicalRing.entrySet()) {
       System.out.println(String.format("%d-%s", entry.getKey(), 
entry.getValue().getKey()));
     }
   }
 
-  //only for test
+  @OnlyForTest
   public void showVirtualRing() {
     for (Entry<Integer, VirtualNode> entry : virtualRing.entrySet()) {
       System.out.println(String.format("%d-%s", entry.getKey(), 
entry.getValue().getKey()));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java 
b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 05fabfd..064446a 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -232,7 +232,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
         try {
           List<List<String>> showTimeseriesList = getTimeSeriesForPath(path);
           resp.setShowTimeseriesList(showTimeseriesList);
-        } catch (PathErrorException | InterruptedException e) {
+        } catch (PathErrorException | InterruptedException | 
ProcessorException e) {
           status = getErrorStatus(
                   String.format("Failed to fetch timeseries %s's metadata 
because: %s",
                   req.getColumnPath(), e));
@@ -269,7 +269,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
         status = new TS_Status(TS_StatusCode.SUCCESS_STATUS);
         break;
       case "METADATA_IN_JSON":
-        String metadataInJson = null;
+        String metadataInJson;
         try {
           metadataInJson = MManager.getInstance().getMetadataInString();
         } catch (OutOfMemoryError outOfMemoryError) { // TODO OOME
@@ -348,7 +348,7 @@ public class TSServiceImpl implements TSIService.Iface, 
ServerContext {
   }
 
   protected List<List<String>> getTimeSeriesForPath(String path)
-      throws PathErrorException, InterruptedException {
+      throws PathErrorException, InterruptedException, ProcessorException {
     return MManager.getInstance().getShowTimeseriesPath(path);
   }
 

Reply via email to