This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 411ea778316b8df7f6e5a0727dbb24af7fbc2e3a
Author: lta <[email protected]>
AuthorDate: Wed May 22 19:33:06 2019 +0800

    remove nodeMapGroupIdCache
---
 .../org/apache/iotdb/cluster/entity/Server.java    | 34 ++++++++++------------
 .../{QueryDataTask.java => DataQueryTask.java}     |  4 +--
 .../cluster/query/utils/ClusterRpcReaderUtils.java |  8 ++---
 .../iotdb/cluster/rpc/raft/NodeAsClient.java       |  4 +--
 .../rpc/raft/impl/RaftNodeAsClientManager.java     |  8 ++---
 .../iotdb/cluster/utils/hash/PhysicalNode.java     |  9 ++++++
 .../apache/iotdb/cluster/utils/hash/Router.java    | 26 +++--------------
 .../iotdb/cluster/utils/hash/RouterTest.java       |  2 +-
 8 files changed, 41 insertions(+), 54 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 27b0e75..a84e389 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -32,9 +32,6 @@ import 
org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
 import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder;
 import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder;
 import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager;
-import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
-import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
-import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.querydata.CloseSeriesReaderSyncProcessor;
@@ -46,15 +43,18 @@ import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataIn
 import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor;
+import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor;
+import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor;
+import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor;
 import 
org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryStatusAsyncProcessor;
+import org.apache.iotdb.cluster.service.ClusterMonitor;
 import org.apache.iotdb.cluster.utils.RaftUtils;
 import org.apache.iotdb.cluster.utils.hash.PhysicalNode;
 import org.apache.iotdb.cluster.utils.hash.Router;
-import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.RegisterManager;
-import org.apache.iotdb.cluster.service.ClusterMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,20 +124,16 @@ public class Server {
     Router router = Router.getInstance();
     PhysicalNode[][] groups = router.getGroupsNodes(serverId.getIp(), 
serverId.getPort());
 
-    try {
-      for (int i = 0; i < groups.length; i++) {
-        PhysicalNode[] group = groups[i];
-        String groupId = router.getGroupID(group);
-        DataPartitionHolder dataPartitionHolder = new 
DataPartitionRaftHolder(groupId,
-            RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false);
-        dataPartitionHolder.init();
-        dataPartitionHolder.start();
-        dataPartitionHolderMap.put(groupId, dataPartitionHolder);
-        LOGGER.info("{} group has started", groupId);
-        Router.getInstance().showPhysicalNodes(groupId);
-      }
-    }catch (Exception e){
-      e.printStackTrace();
+    for (int i = 0; i < groups.length; i++) {
+      PhysicalNode[] group = groups[i];
+      String groupId = router.getGroupID(group);
+      DataPartitionHolder dataPartitionHolder = new 
DataPartitionRaftHolder(groupId,
+          RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false);
+      dataPartitionHolder.init();
+      dataPartitionHolder.start();
+      dataPartitionHolderMap.put(groupId, dataPartitionHolder);
+      LOGGER.info("{} group has started", groupId);
+      Router.getInstance().showPhysicalNodes(groupId);
     }
 
     try {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
similarity index 94%
rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
index 6946925..3b905d8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java
@@ -21,11 +21,11 @@ package org.apache.iotdb.cluster.qp.task;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
 
-public class QueryDataTask {
+public class DataQueryTask {
   private BasicResponse basicResponse;
   private TaskState state;
 
-  public QueryDataTask(BasicResponse basicResponse,
+  public DataQueryTask(BasicResponse basicResponse,
       TaskState state) {
     this.basicResponse = basicResponse;
     this.state = state;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
index 0cc9805..75c2381 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.entity.Server;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import 
org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
@@ -93,9 +93,9 @@ public class ClusterRpcReaderUtils {
               TASK_MAX_RETRY));
     }
     NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient();
-    QueryDataTask queryDataTask = nodeAsClient.syncHandleRequest(request, 
peerId);
-    if (queryDataTask.getState() == TaskState.FINISH) {
-      return queryDataTask.getBasicResponse();
+    DataQueryTask dataQueryTask = nodeAsClient.syncHandleRequest(request, 
peerId);
+    if (dataQueryTask.getState() == TaskState.FINISH) {
+      return dataQueryTask.getBasicResponse();
     } else {
       return handleQueryRequest(request, peerId, taskRetryNum + 1);
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
index 865b6ef..994fc07 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.rpc.raft;
 
 import com.alipay.sofa.jraft.entity.PeerId;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 
@@ -43,7 +43,7 @@ public interface NodeAsClient {
    *
    * @param peerId leader node of the target group
    */
-  QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId);
+  DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId);
 
   /**
    * Shut down client
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index eafa843..1d32fd5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -33,7 +33,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
 import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
-import org.apache.iotdb.cluster.qp.task.QueryDataTask;
+import org.apache.iotdb.cluster.qp.task.DataQueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
@@ -266,13 +266,13 @@ public class RaftNodeAsClientManager {
     }
 
     @Override
-    public QueryDataTask syncHandleRequest(BasicRequest request, PeerId 
peerId) {
+    public DataQueryTask syncHandleRequest(BasicRequest request, PeerId 
peerId) {
       try {
         BasicResponse response = (BasicResponse) 
boltClientService.getRpcClient()
             .invokeSync(peerId.getEndpoint().toString(), request, 
TASK_TIMEOUT_MS);
-        return new QueryDataTask(response, TaskState.FINISH);
+        return new DataQueryTask(response, TaskState.FINISH);
       } catch (RemotingException | InterruptedException e) {
-        return new QueryDataTask(null, TaskState.EXCEPTION);
+        return new DataQueryTask(null, TaskState.EXCEPTION);
       } finally {
         releaseClient(RaftNodeAsClient.this);
       }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
index 66544a8..84cf431 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java
@@ -24,6 +24,7 @@ public class PhysicalNode {
 
   private String ip;
   private int port;
+  private String groupId;
 
   public PhysicalNode(String ip, int port) {
     this.ip = ip;
@@ -77,6 +78,14 @@ public class PhysicalNode {
     return port;
   }
 
+  public String getGroupId() {
+    return groupId;
+  }
+
+  public void setGroupId(String groupId) {
+    this.groupId = groupId;
+  }
+
   @OnlyForTest
   public void setIp(String ip) {
     this.ip = ip;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index 0552950..cc2604a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -51,11 +51,6 @@ public class Router {
   private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new 
HashMap<>();
 
   /**
-   * Key is the first node of the group, value is group id.
-   */
-  private Map<PhysicalNode, String> nodeMapGroupIdCache = new HashMap<>();
-
-  /**
    * Key is group id, value is the first node of the group.
    */
   private Map<String, PhysicalNode> groupIdMapNodeCache = new HashMap<>();
@@ -68,6 +63,7 @@ public class Router {
   private HashFunction hashFunction = new MD5Hash();
 
   private final SortedMap<Integer, PhysicalNode> physicalRing = new 
TreeMap<>();
+
   private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
 
   private static class RouterHolder {
@@ -103,17 +99,9 @@ public class Router {
       if (len < replicator) {
         throw new ErrorConfigureExecption(String.format("Replicator number %d 
is greater "
             + "than cluster number %d", replicator, len));
-      } else if (len == replicator) {
-        PhysicalNode[][] val = new PhysicalNode[1][len];
-        nodeMapGroupIdCache.put(first, DATA_GROUP_STR + "0");
-        groupIdMapNodeCache.put(DATA_GROUP_STR + "0", first);
-        for (int j = 0; j < len; j++) {
-          val[0][j] = nodes[(i + j) % len];
-        }
-        dataPartitionCache.put(first, val);
-      }  else {
+      } else {
         PhysicalNode[][] val = new PhysicalNode[replicator][replicator];
-        nodeMapGroupIdCache.put(first, DATA_GROUP_STR + i);
+        first.setGroupId(DATA_GROUP_STR + i);
         groupIdMapNodeCache.put(DATA_GROUP_STR + i, first);
         for (int j = 0; j < replicator; j++) {
           for (int k = 0; k < replicator; k++) {
@@ -149,7 +137,7 @@ public class Router {
   }
 
   public String getGroupID(PhysicalNode[] nodes) {
-    return nodeMapGroupIdCache.get(nodes[0]);
+    return nodes[0].getGroupId();
   }
 
   public PhysicalNode[][] getGroupsNodes(String ip, int port) {
@@ -192,7 +180,6 @@ public class Router {
     virtualRing.clear();
     sgRouter.clear();
     dataPartitionCache.clear();
-    nodeMapGroupIdCache.clear();
     groupIdMapNodeCache.clear();
   }
 
@@ -210,11 +197,6 @@ public class Router {
     }
   }
 
-  public boolean containPhysicalNodeBySG(String storageGroup, PhysicalNode 
node) {
-    PhysicalNode[] nodes = routeGroup(storageGroup);
-    return Arrays.asList(nodes).contains(node);
-  }
-
   public boolean containPhysicalNodeByGroupId(String groupId, PhysicalNode 
node) {
     PhysicalNode[] nodes = getNodesByGroupId(groupId);
     return Arrays.asList(nodes).contains(node);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
index 5a515fe..3799119 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java
@@ -111,7 +111,7 @@ public class RouterTest {
     assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
     // test cache
     assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
-    assertEquals(Router.DATA_GROUP_STR + "0", 
router.getGroupID(router.routeGroup(sg1)));
+    assertEquals(Router.DATA_GROUP_STR + "2", 
router.getGroupID(router.routeGroup(sg1)));
 
     String sg2 = "root.vehicle.d1";
     assertEquals(router.routeNode(sg2), new PhysicalNode("192.168.130.2", 
PORT));

Reply via email to