This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 6ee0566 rename some functions
6ee0566 is described below
commit 6ee0566b032baa05db3aec125fe7ca656dce6224
Author: xiangdong huang <[email protected]>
AuthorDate: Wed Apr 3 00:22:43 2019 +0800
rename some functions
---
.../org/apache/iotdb/cluster/entity/Server.java | 5 +++-
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 2 +-
.../cluster/qp/executor/NonQueryExecutor.java | 4 +--
.../processor/DataGroupNonQueryAsyncProcessor.java | 2 +-
.../processor/MetaGroupNonQueryAsyncProcessor.java | 2 +-
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 31 ++++++++++++++--------
.../apache/iotdb/cluster/utils/hash/Router.java | 16 +++++------
.../iotdb/cluster/utils/hash/VirtualNode.java | 2 +-
8 files changed, 38 insertions(+), 26 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 5131070..cc1d6fd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -41,6 +41,9 @@ import org.apache.iotdb.db.service.IoTDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * each server represents a node in the physical world.
+ */
public class Server {
private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
@@ -83,7 +86,7 @@ public class Server {
PhysicalNode[] group = groups[i];
String groupId = router.getGroupID(group);
DataPartitionHolder dataPartitionHolder = new
DataPartitionRaftHolder(groupId,
- RaftUtils.convertPhysicalNodeArrayToPeerIdArray(group), serverId,
rpcServer, false);
+ RaftUtils.getPeerIdArrayFrom(group), serverId, rpcServer, false);
dataPartitionHolder.init();
dataPartitionHolder.start();
dataPartitionHolderMap.put(groupId, dataPartitionHolder);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 36b3207..cd0c8b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -132,7 +132,7 @@ public abstract class ClusterQPExecutor {
public boolean canHandleNonQuery(String storageGroup) {
if (router.containPhysicalNode(storageGroup, localNode)) {
String groupId = getGroupIdBySG(storageGroup);
- if
(RaftUtils.convertPeerId(RaftUtils.getTargetPeerID(groupId)).equals(localNode))
{
+ if
(RaftUtils.getPhysicalNodeFrom(RaftUtils.getLeaderPeerID(groupId)).equals(localNode))
{
return true;
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
index 2c354cc..ac2e616 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/qp/executor/NonQueryExecutor.java
@@ -192,7 +192,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
private boolean handleDataGroupRequest(String storageGroup, PhysicalPlan
plan)
throws IOException, RaftConnectionException, InterruptedException {
String groupId = getGroupIdBySG(storageGroup);
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ PeerId leader = RaftUtils.getLeaderPeerID(groupId);
DataGroupNonQueryRequest request = new DataGroupNonQueryRequest(groupId,
plan);
SingleQPTask qpTask = new SingleQPTask(false, request);
/** Check if the plan can be executed locally. **/
@@ -255,7 +255,7 @@ public class NonQueryExecutor extends ClusterQPExecutor {
MetaGroupNonQueryRequest request = new MetaGroupNonQueryRequest(
CLUSTER_CONFIG.METADATA_GROUP_ID,
plan);
- PeerId leader =
RaftUtils.getTargetPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
+ PeerId leader =
RaftUtils.getLeaderPeerID(CLUSTER_CONFIG.METADATA_GROUP_ID);
SingleQPTask task = new SingleQPTask(false, request);
return asyncHandleTask(task, leader, 0);
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
index 5e756fa..5cb8ddb 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/DataGroupNonQueryAsyncProcessor.java
@@ -60,7 +60,7 @@ public class DataGroupNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<Dat
DataPartitionRaftHolder dataPartitionRaftHolder =
(DataPartitionRaftHolder) server
.getDataPartitionHolderMap().get(groupId);
if (!dataPartitionRaftHolder.getFsm().isLeader()) {
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ PeerId leader = RaftUtils.getLeaderPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
index 1a31f0d..6b51391 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/processor/MetaGroupNonQueryAsyncProcessor.java
@@ -58,7 +58,7 @@ public class MetaGroupNonQueryAsyncProcessor extends
BasicAsyncUserProcessor<Met
String groupId = metaGroupNonQueryRequest.getGroupID();
MetadataRaftHolder metadataHolder = (MetadataRaftHolder)
server.getMetadataHolder();
if (!metadataHolder.getFsm().isLeader()) {
- PeerId leader = RaftUtils.getTargetPeerID(groupId);
+ PeerId leader = RaftUtils.getLeaderPeerID(groupId);
LOGGER.info("Request need to redirect leader: {}, groupId : {} ",
leader, groupId);
BoltCliClientService cliClientService = new BoltCliClientService();
cliClientService.init(new CliOptions());
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index da42cb7..db2fcb4 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -45,7 +45,7 @@ public class RaftUtils {
private static final Router router = Router.getInstance();
/**
* The cache will be update in two case: 1. When @onLeaderStart() method of
state machine is
- * called, the cache will be update. 2. When @getTargetPeerID() in this
class is called and cache
+ * called, the cache will be update. 2. When @getLeaderPeerID() in this
class is called and cache
* don't have the key, it's will get random peer and update. 3. When
@redirected of BasicRequest
* is true, the task will be retry and the cache will update.
*/
@@ -54,14 +54,17 @@ public class RaftUtils {
private RaftUtils() {
}
- @Deprecated
+
/**
- * Get leader node according to the group id
+ * @deprecated
+ * Get leader node according to the group id.
+ * <br/> This method will connect to one of nodes in the group to get the
correct leader.
*
* @param groupId group id of raft group
* @return PeerId of leader
*/
- public static PeerId getTargetPeerID(String groupId, BoltCliClientService
cliClientService)
+ @Deprecated
+ public static PeerId getLeaderPeerID(String groupId, BoltCliClientService
cliClientService)
throws RaftConnectionException {
Configuration conf = getConfiguration(groupId);
RouteTable.getInstance().updateConfiguration(groupId, conf);
@@ -81,7 +84,7 @@ public class RaftUtils {
*
* @return leader id
*/
- public static PeerId getTargetPeerID(String groupId) {
+ public static PeerId getLeaderPeerID(String groupId) {
if (!groupLeaderCache.containsKey(groupId)) {
PeerId randomPeerId = getRandomPeerID(groupId);
groupLeaderCache.put(groupId, randomPeerId);
@@ -101,7 +104,7 @@ public class RaftUtils {
} else {
PhysicalNode[] physicalNodes = router.getNodesByGroupId(groupId);
PhysicalNode node = physicalNodes[getRandomInt(physicalNodes.length)];
- randomPeerId = convertPhysicalNode(node);
+ randomPeerId = getPeerIDFrom(node);
}
return randomPeerId;
}
@@ -121,6 +124,7 @@ public class RaftUtils {
* @return raft group configuration
*/
public static Configuration getConfiguration(String groupID) {
+ //TODO can we reuse Configuration instance?
Configuration conf = new Configuration();
RaftService service;
if (groupID.equals(CLUSTER_CONFIG.METADATA_GROUP_ID)) {
@@ -135,14 +139,19 @@ public class RaftUtils {
return conf;
}
- public static PeerId convertPhysicalNode(PhysicalNode node) {
+ public static PeerId getPeerIDFrom(PhysicalNode node) {
return new PeerId(node.ip, node.port);
}
- public static PhysicalNode convertPeerId(PeerId peer) {
+ public static PhysicalNode getPhysicalNodeFrom(PeerId peer) {
return new PhysicalNode(peer.getIp(), peer.getPort());
}
+ /**
+ *
+ * @param nodes each node string is in the format of "ip:port:idx",
+ * @return
+ */
public static PeerId[] convertStringArrayToPeerIdArray(String[] nodes) {
PeerId[] peerIds = new PeerId[nodes.length];
for (int i = 0; i < nodes.length; i++) {
@@ -160,7 +169,7 @@ public class RaftUtils {
return -1;
}
- public static PhysicalNode[] convertPeerIdArrayToPhysicalNodeArray(PeerId[]
peerIds) {
+ public static PhysicalNode[] getPhysicalNodeArrayFrom(PeerId[] peerIds) {
PhysicalNode[] physicalNodes = new PhysicalNode[peerIds.length];
for (int i = 0; i < peerIds.length; i++) {
physicalNodes[i] = new PhysicalNode(peerIds[i].getIp(),
peerIds[i].getPort());
@@ -168,10 +177,10 @@ public class RaftUtils {
return physicalNodes;
}
- public static PeerId[] convertPhysicalNodeArrayToPeerIdArray(PhysicalNode[]
physicalNodes) {
+ public static PeerId[] getPeerIdArrayFrom(PhysicalNode[] physicalNodes) {
PeerId[] peerIds = new PeerId[physicalNodes.length];
for (int i = 0; i < physicalNodes.length; i++) {
- peerIds[i] = convertPhysicalNode(physicalNodes[i]);
+ peerIds[i] = getPeerIDFrom(physicalNodes[i]);
}
return peerIds;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
index a61c7b8..b7383aa 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/Router.java
@@ -86,7 +86,7 @@ public class Router {
/**
* Change this method to public for test, you should not invoke this method
explicitly.
*/
- public void init() {
+ void init() {
reset();
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
String[] hosts = config.getNodes();
@@ -97,7 +97,7 @@ public class Router {
PhysicalNode node = new PhysicalNode(values[0],
Integer.parseInt(values[1]));
addNode(node, numOfVirtualNodes);
}
- PhysicalNode[] nodes = physicalRing.values().toArray(new
PhysicalNode[physicalRing.size()]);
+ PhysicalNode[] nodes = physicalRing.values().toArray(new PhysicalNode[0]);
int len = nodes.length;
for (int i = 0; i < len; i++) {
PhysicalNode first = nodes[i];
@@ -129,15 +129,15 @@ public class Router {
/**
* Calculate the physical nodes corresponding to the replications where a
data point is located
*
- * @param objectKey storage group
+ * @param storageGroupName storage group
*/
- public PhysicalNode[] routeGroup(String objectKey) {
- if (sgRouter.containsKey(objectKey)) {
- return sgRouter.get(objectKey);
+ public PhysicalNode[] routeGroup(String storageGroupName) {
+ if (sgRouter.containsKey(storageGroupName)) {
+ return sgRouter.get(storageGroupName);
}
- PhysicalNode node = routeNode(objectKey);
+ PhysicalNode node = routeNode(storageGroupName);
PhysicalNode[] nodes = dataPartitionCache.get(node)[0];
- sgRouter.put(objectKey, nodes);
+ sgRouter.put(storageGroupName, nodes);
return nodes;
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/VirtualNode.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/VirtualNode.java
index 94da31e..88816cf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/VirtualNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/hash/VirtualNode.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.cluster.utils.hash;
public class VirtualNode {
-
+ //the index of the virtual node in the physicalNode
private final int replicaIndex;
private final PhysicalNode physicalNode;