This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 411ea778316b8df7f6e5a0727dbb24af7fbc2e3a Author: lta <[email protected]> AuthorDate: Wed May 22 19:33:06 2019 +0800 remove groupIdMapNodeCache --- .../org/apache/iotdb/cluster/entity/Server.java | 34 ++++++++++------------ .../{QueryDataTask.java => DataQueryTask.java} | 4 +-- .../cluster/query/utils/ClusterRpcReaderUtils.java | 8 ++--- .../iotdb/cluster/rpc/raft/NodeAsClient.java | 4 +-- .../rpc/raft/impl/RaftNodeAsClientManager.java | 8 ++--- .../iotdb/cluster/utils/hash/PhysicalNode.java | 9 ++++++ .../apache/iotdb/cluster/utils/hash/Router.java | 26 +++-------------- .../iotdb/cluster/utils/hash/RouterTest.java | 2 +- 8 files changed, 41 insertions(+), 54 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java index 27b0e75..a84e389 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java @@ -32,9 +32,6 @@ import org.apache.iotdb.cluster.entity.metadata.MetadataHolder; import org.apache.iotdb.cluster.entity.raft.DataPartitionRaftHolder; import org.apache.iotdb.cluster.entity.raft.MetadataRaftHolder; import org.apache.iotdb.cluster.rpc.raft.impl.RaftNodeAsClientManager; -import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor; -import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor; -import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.DataGroupNonQueryAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.nonquery.MetaGroupNonQueryAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.querydata.CloseSeriesReaderSyncProcessor; @@ -46,15 +43,18 @@ import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryMetadataIn import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryPathsAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QuerySeriesTypeAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.querymetadata.QueryTimeSeriesAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryJobNumAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryLeaderAsyncProcessor; +import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryMetricAsyncProcessor; import org.apache.iotdb.cluster.rpc.raft.processor.querymetric.QueryStatusAsyncProcessor; +import org.apache.iotdb.cluster.service.ClusterMonitor; import org.apache.iotdb.cluster.utils.RaftUtils; import org.apache.iotdb.cluster.utils.hash.PhysicalNode; import org.apache.iotdb.cluster.utils.hash.Router; -import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.exception.ProcessorException; +import org.apache.iotdb.db.exception.StartupException; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.service.RegisterManager; -import org.apache.iotdb.cluster.service.ClusterMonitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,20 +124,16 @@ public class Server { Router router = Router.getInstance(); PhysicalNode[][] groups = router.getGroupsNodes(serverId.getIp(), serverId.getPort()); - try { - for (int i = 0; i < groups.length; i++) { - PhysicalNode[] group = groups[i]; - String groupId = router.getGroupID(group); - DataPartitionHolder dataPartitionHolder = new DataPartitionRaftHolder(groupId, - RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false); - dataPartitionHolder.init(); - dataPartitionHolder.start(); - dataPartitionHolderMap.put(groupId, dataPartitionHolder); - LOGGER.info("{} group has started", groupId); - Router.getInstance().showPhysicalNodes(groupId); - } - }catch (Exception e){ - e.printStackTrace(); + for (int i = 0; i < groups.length; i++) { + PhysicalNode[] group = groups[i]; + String groupId = router.getGroupID(group); + DataPartitionHolder dataPartitionHolder = new DataPartitionRaftHolder(groupId, + RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false); + dataPartitionHolder.init(); + dataPartitionHolder.start(); + dataPartitionHolderMap.put(groupId, dataPartitionHolder); + LOGGER.info("{} group has started", groupId); + Router.getInstance().showPhysicalNodes(groupId); } try { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java similarity index 94% rename from cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java rename to cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java index 6946925..3b905d8 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/QueryDataTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/task/DataQueryTask.java @@ -21,11 +21,11 @@ package org.apache.iotdb.cluster.qp.task; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse; -public class QueryDataTask { +public class DataQueryTask { private BasicResponse basicResponse; private TaskState state; - public QueryDataTask(BasicResponse basicResponse, + public DataQueryTask(BasicResponse basicResponse, TaskState state) { this.basicResponse = basicResponse; this.state = state; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java index 0cc9805..75c2381 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/utils/ClusterRpcReaderUtils.java @@ -24,7 +24,7 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.entity.Server; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; -import org.apache.iotdb.cluster.qp.task.QueryDataTask; +import org.apache.iotdb.cluster.qp.task.DataQueryTask; import org.apache.iotdb.cluster.query.manager.coordinatornode.ClusterRpcSingleQueryManager; import org.apache.iotdb.cluster.rpc.raft.NodeAsClient; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; @@ -93,9 +93,9 @@ public class ClusterRpcReaderUtils { TASK_MAX_RETRY)); } NodeAsClient nodeAsClient = RaftUtils.getRaftNodeAsClient(); - QueryDataTask queryDataTask = nodeAsClient.syncHandleRequest(request, peerId); - if (queryDataTask.getState() == TaskState.FINISH) { - return queryDataTask.getBasicResponse(); + DataQueryTask dataQueryTask = nodeAsClient.syncHandleRequest(request, peerId); + if (dataQueryTask.getState() == TaskState.FINISH) { + return dataQueryTask.getBasicResponse(); } else { return handleQueryRequest(request, peerId, taskRetryNum + 1); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java index 865b6ef..994fc07 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/NodeAsClient.java @@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.rpc.raft; import com.alipay.sofa.jraft.entity.PeerId; import org.apache.iotdb.cluster.exception.RaftConnectionException; -import org.apache.iotdb.cluster.qp.task.QueryDataTask; +import org.apache.iotdb.cluster.qp.task.DataQueryTask; import org.apache.iotdb.cluster.qp.task.SingleQPTask; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; @@ -43,7 +43,7 @@ public interface NodeAsClient { * * @param peerId leader node of the target group */ - QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId); + DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId); /** * Shut down client diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java index eafa843..1d32fd5 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java @@ -33,7 +33,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig; import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.exception.RaftConnectionException; import org.apache.iotdb.cluster.qp.task.QPTask.TaskState; -import org.apache.iotdb.cluster.qp.task.QueryDataTask; +import org.apache.iotdb.cluster.qp.task.DataQueryTask; import org.apache.iotdb.cluster.qp.task.SingleQPTask; import org.apache.iotdb.cluster.rpc.raft.NodeAsClient; import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest; @@ -266,13 +266,13 @@ public class RaftNodeAsClientManager { } @Override - public QueryDataTask syncHandleRequest(BasicRequest request, PeerId peerId) { + public DataQueryTask syncHandleRequest(BasicRequest request, PeerId peerId) { try { BasicResponse response = (BasicResponse) boltClientService.getRpcClient() .invokeSync(peerId.getEndpoint().toString(), request, TASK_TIMEOUT_MS); - return new QueryDataTask(response, TaskState.FINISH); + return new DataQueryTask(response, TaskState.FINISH); } catch (RemotingException | InterruptedException e) { - return new QueryDataTask(null, TaskState.EXCEPTION); + return new DataQueryTask(null, TaskState.EXCEPTION); } finally { releaseClient(RaftNodeAsClient.this); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java index 66544a8..84cf431 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/PhysicalNode.java @@ -24,6 +24,7 @@ public class PhysicalNode { private String ip; private int port; + private String groupId; public PhysicalNode(String ip, int port) { this.ip = ip; @@ -77,6 +78,14 @@ public class PhysicalNode { return port; } + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + @OnlyForTest public void setIp(String ip) { this.ip = ip; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java index 0552950..cc2604a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java @@ -51,11 +51,6 @@ public class Router { private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new HashMap<>(); /** - * Key is the first node of the group, value is group id. - */ - private Map<PhysicalNode, String> nodeMapGroupIdCache = new HashMap<>(); - - /** * Key is group id, value is the first node of the group. */ private Map<String, PhysicalNode> groupIdMapNodeCache = new HashMap<>(); @@ -68,6 +63,7 @@ public class Router { private HashFunction hashFunction = new MD5Hash(); private final SortedMap<Integer, PhysicalNode> physicalRing = new TreeMap<>(); + private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>(); private static class RouterHolder { @@ -103,17 +99,9 @@ public class Router { if (len < replicator) { throw new ErrorConfigureExecption(String.format("Replicator number %d is greater " + "than cluster number %d", replicator, len)); - } else if (len == replicator) { - PhysicalNode[][] val = new PhysicalNode[1][len]; - nodeMapGroupIdCache.put(first, DATA_GROUP_STR + "0"); - groupIdMapNodeCache.put(DATA_GROUP_STR + "0", first); - for (int j = 0; j < len; j++) { - val[0][j] = nodes[(i + j) % len]; - } - dataPartitionCache.put(first, val); - } else { + } else { PhysicalNode[][] val = new PhysicalNode[replicator][replicator]; - nodeMapGroupIdCache.put(first, DATA_GROUP_STR + i); + first.setGroupId(DATA_GROUP_STR + i); groupIdMapNodeCache.put(DATA_GROUP_STR + i, first); for (int j = 0; j < replicator; j++) { for (int k = 0; k < replicator; k++) { @@ -149,7 +137,7 @@ public class Router { } public String getGroupID(PhysicalNode[] nodes) { - return nodeMapGroupIdCache.get(nodes[0]); + return nodes[0].getGroupId(); } public PhysicalNode[][] getGroupsNodes(String ip, int port) { @@ -192,7 +180,6 @@ public class Router { virtualRing.clear(); sgRouter.clear(); dataPartitionCache.clear(); - nodeMapGroupIdCache.clear(); groupIdMapNodeCache.clear(); } @@ -210,11 +197,6 @@ public class Router { } } - public boolean containPhysicalNodeBySG(String storageGroup, PhysicalNode node) { - PhysicalNode[] nodes = routeGroup(storageGroup); - return Arrays.asList(nodes).contains(node); - } - public boolean containPhysicalNodeByGroupId(String groupId, PhysicalNode node) { PhysicalNode[] nodes = getNodesByGroupId(groupId); return Arrays.asList(nodes).contains(node); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java index 5a515fe..3799119 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/hash/RouterTest.java @@ -111,7 +111,7 @@ public class RouterTest { assertPhysicalNodeEquals(expected1, router.routeGroup(sg1)); // test cache assertPhysicalNodeEquals(expected1, router.routeGroup(sg1)); - assertEquals(Router.DATA_GROUP_STR + "0", router.getGroupID(router.routeGroup(sg1))); + assertEquals(Router.DATA_GROUP_STR + "2", router.getGroupID(router.routeGroup(sg1))); String sg2 = "root.vehicle.d1"; assertEquals(router.routeNode(sg2), new PhysicalNode("192.168.130.2", PORT));
