This is an automated email from the ASF dual-hosted git repository.
dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 8fbbdd1 add group id function
new fba8a70 Merge branch 'cluster' of github.com:apache/incubator-iotdb
into cluster
8fbbdd1 is described below
commit 8fbbdd17302ada9855b484ee8708bfbcb772b5aa
Author: XuYi <[email protected]>
AuthorDate: Tue Mar 26 14:20:32 2019 +0800
add group id function
---
.../org/apache/iotdb/cluster/utils/Router.java | 22 ++++-
.../org/apache/iotdb/cluster/utils/RouterTest.java | 97 +++++++++++++++++++---
2 files changed, 107 insertions(+), 12 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
index 45542db..82b7f4f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Router.java
@@ -24,6 +24,8 @@ public class Router {
// A local cache to store Which nodes do a storage group correspond to
private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new
HashMap<>();
+ private Map<PhysicalNode, String> groupIdCache = new HashMap<>();
+ public static final String DATA_GROUP_STR = "data-group-";
private static class RouterHolder {
@@ -38,6 +40,7 @@ public class Router {
return RouterHolder.INSTANCE;
}
+ // change this method to public for test, you should not invoke this method
explicitly.
public void init() {
reset();
ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
@@ -57,12 +60,14 @@ public class Router {
+ "than cluster number %d", replicator, len));
} else if (len == replicator) {
PhysicalNode[][] val = new PhysicalNode[1][len];
+ groupIdCache.put(first, DATA_GROUP_STR + "0");
for (int j = 0; j < len; j++) {
val[0][j] = nodes[(i + j) % len];
}
dataPartitionCache.put(first, val);
} else {
PhysicalNode[][] val = new PhysicalNode[replicator][replicator];
+ groupIdCache.put(first, DATA_GROUP_STR + i);
for (int j = 0; j < replicator; j++) {
for (int k = 0; k < replicator; k++) {
val[j][k] = nodes[(i - j + k + len) % len];
@@ -85,6 +90,10 @@ public class Router {
return nodes;
}
+ public String getGroupID(PhysicalNode[] nodes) {
+ return groupIdCache.get(nodes[0]);
+ }
+
public PhysicalNode[][] generateGroups(String ip, int port) {
return this.generateGroups(new PhysicalNode(ip, port));
}
@@ -97,8 +106,8 @@ public class Router {
}
}
- // For a storage group, compute the nearest physical node on the VRing
- private PhysicalNode routeNode(String objectKey) {
+ // For a storage group, compute the nearest physical node on the hash ring
+ public PhysicalNode routeNode(String objectKey) {
int hashVal = hashFunction.hash(objectKey);
SortedMap<Integer, VirtualNode> tailMap = virtualRing.tailMap(hashVal);
Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() :
virtualRing.firstKey();
@@ -115,11 +124,20 @@ public class Router {
virtualRing.clear();
router.clear();
dataPartitionCache.clear();
+ groupIdCache.clear();
}
+ // only for test
public void showPhysicalRing() {
for (Entry<Integer, PhysicalNode> entry : physicalRing.entrySet()) {
System.out.println(String.format("%d-%s", entry.getKey(),
entry.getValue().getKey()));
}
}
+
+ //only for test
+ public void showVirtualRing() {
+ for (Entry<Integer, VirtualNode> entry : virtualRing.entrySet()) {
+ System.out.println(String.format("%d-%s", entry.getKey(),
entry.getValue().getKey()));
+ }
+ }
}
diff --git
a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
index 7cbf89c..f98fa8e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/utils/RouterTest.java
@@ -14,13 +14,13 @@ public class RouterTest {
String[] ipListOld;
int portOld;
int replicatorOld;
+ HashFunction function = new MD5Hash();
@Before
public void setUp() throws Exception {
ipListOld = config.getNodes();
portOld = config.getPort();
replicatorOld = config.getReplication();
-
}
@After
@@ -30,10 +30,87 @@ public class RouterTest {
config.setReplication(replicatorOld);
}
-// @Test
-// public void testRouteGroup() {
-//
-// }
+ @Test
+ public void testRouteNodeAndGroup1() {
+ String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3",
"192.168.130.4",
+ "192.168.130.5",};
+ int port = 7777;
+ int replicator = 3;
+ config.setNodes(ipList);
+ config.setPort(port);
+ config.setReplication(replicator);
+
+ Router router = Router.getInstance();
+ router.init();
+// router.showPhysicalRing();
+// router.showVirtualRing();
+ String sg1 = "root.device.sensor";
+// System.out.println(function.hash(sg1));
+ assertTrue(router.routeNode(sg1).equals(new PhysicalNode("192.168.130.4",
port)));
+ PhysicalNode[] expected1 = {
+ new PhysicalNode("192.168.130.4", port),
+ new PhysicalNode("192.168.130.5", port),
+ new PhysicalNode("192.168.130.2", port)
+ };
+ assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
+ // test cache
+ assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
+ assertEquals(Router.DATA_GROUP_STR + "0",
router.getGroupID(router.routeGroup(sg1)));
+
+ String sg2 = "root.device.sensor2";
+// System.out.println(function.hash(sg2));
+ assertTrue(router.routeNode(sg2).equals(new PhysicalNode("192.168.130.3",
port)));
+ PhysicalNode[] expected2 = {
+ new PhysicalNode("192.168.130.3", port),
+ new PhysicalNode("192.168.130.4", port),
+ new PhysicalNode("192.168.130.5", port)
+ };
+ assertPhysicalNodeEquals(expected2, router.routeGroup(sg2));
+ // test cache
+ assertPhysicalNodeEquals(expected2, router.routeGroup(sg2));
+ assertEquals(Router.DATA_GROUP_STR + "4",
router.getGroupID(router.routeGroup(sg2)));
+ }
+
+ @Test
+ public void testRouteNodeAndGroup2() {
+ String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3"};
+ int port = 7777;
+ int replicator = 3;
+ config.setNodes(ipList);
+ config.setPort(port);
+ config.setReplication(replicator);
+
+ Router router = Router.getInstance();
+ router.init();
+// router.showPhysicalRing();
+// router.showVirtualRing();
+ String sg1 = "root.device.sensor";
+// System.out.println(function.hash(sg1));
+ assertTrue(router.routeNode(sg1).equals(new PhysicalNode("192.168.130.3",
port)));
+ PhysicalNode[] expected1 = {
+ new PhysicalNode("192.168.130.3", port),
+ new PhysicalNode("192.168.130.2", port),
+ new PhysicalNode("192.168.130.1", port)
+ };
+ assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
+ // test cache
+ assertPhysicalNodeEquals(expected1, router.routeGroup(sg1));
+ assertEquals(Router.DATA_GROUP_STR + "0",
router.getGroupID(router.routeGroup(sg1)));
+
+ String sg2 = "root.vehicle.d1";
+// System.out.println(function.hash(sg2));
+ assertTrue(router.routeNode(sg2).equals(new PhysicalNode("192.168.130.2",
port)));
+ PhysicalNode[] expected2 = {
+ new PhysicalNode("192.168.130.2", port),
+ new PhysicalNode("192.168.130.1", port),
+ new PhysicalNode("192.168.130.3", port)
+ };
+ assertPhysicalNodeEquals(expected2, router.routeGroup(sg2));
+ // test cache
+ assertPhysicalNodeEquals(expected2, router.routeGroup(sg2));
+ assertEquals(Router.DATA_GROUP_STR + "0",
router.getGroupID(router.routeGroup(sg2)));
+ }
+
@Test
public void testGenerateGroups1() {
@@ -77,7 +154,7 @@ public class RouterTest {
};
for (int i = 1; i < 5; i++) {
PhysicalNode[][] expected = generateNodesArray(ipIndex[i - 1], 3, 3,
port);
- assertEquals(expected, router.generateGroups("192.168.130." + i, port));
+ assertPhysicalNodeEquals(expected, router.generateGroups("192.168.130."
+ i, port));
}
}
@@ -106,24 +183,24 @@ public class RouterTest {
};
for (int i = 1; i < 4; i++) {
PhysicalNode[][] expected = generateNodesArray(ipIndex[i - 1], 1, 3,
port);
- assertEquals(expected, router.generateGroups("192.168.130." + i, port));
+ assertPhysicalNodeEquals(expected, router.generateGroups("192.168.130."
+ i, port));
}
}
- boolean assertEquals(PhysicalNode[][] expect, PhysicalNode[][] actual) {
+ boolean assertPhysicalNodeEquals(PhysicalNode[][] expect, PhysicalNode[][]
actual) {
if (expect.length != actual.length) {
return false;
}
int len = expect.length;
for (int i = 0; i < len; i++) {
- if (!assertEquals(expect[i], actual[i])) {
+ if (!assertPhysicalNodeEquals(expect[i], actual[i])) {
return false;
}
}
return true;
}
- boolean assertEquals(PhysicalNode[] expect, PhysicalNode[] actual) {
+ boolean assertPhysicalNodeEquals(PhysicalNode[] expect, PhysicalNode[]
actual) {
if (expect.length != actual.length) {
return false;
}