This is an automated email from the ASF dual-hosted git repository.

dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 9ab5c10  init hash function
9ab5c10 is described below

commit 9ab5c10c732ad80d69b0715a77a6a32675ba0d0e
Author: XuYi <[email protected]>
AuthorDate: Mon Mar 25 17:52:45 2019 +0800

    init hash function
---
 .../main/java/org/apache/iotdb/cluster/App.java    |  18 +++
 .../apache/iotdb/cluster/utils/HashFunction.java   |  23 +++
 .../org/apache/iotdb/cluster/utils/Router.java     | 158 +++++++++++++++++++++
 .../java/org/apache/iotdb/cluster/AppTest.java     |  18 +++
 4 files changed, 217 insertions(+)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/App.java 
b/cluster/src/main/java/org/apache/iotdb/cluster/App.java
index 9750baa..09f7e23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/App.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/App.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.cluster;
 
 public class App {
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
new file mode 100644
index 0000000..1101d1d
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/HashFunction.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+
+public interface HashFunction {
+  public int hash(String str);
+}
diff --git 
a/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
 
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
new file mode 100644
index 0000000..a1168dd
--- /dev/null
+++ 
b/cluster/src/main/java/org/apache/iotdb/cluster/org/apache/iotdb/cluster/utils/Router.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.cluster.org.apache.iotdb.cluster.utils;
+
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class Router {
+  private List<PhysicalNode> nodes = new ArrayList<>();
+  // Replication number
+  private final int replicator;
+  private final int numOfVirtulaNodes = 2;
+  private HashFunction hashFunction = new MD5Hash();
+  private final SortedMap<Integer, PhysicalNode> physicalRing = new 
TreeMap<>();
+  private final SortedMap<Integer, VirtualNode> virtualRing = new TreeMap<>();
+
+
+  // A local cache to store Which nodes do a storage group correspond to
+  private Map<String, PhysicalNode[]> router = new ConcurrentHashMap<>();
+  private Map<PhysicalNode, PhysicalNode[][]> dataPartitionCache = new 
HashMap<>();
+
+  public Router(){
+    // TODO get replicator form config file
+    String[] ipList = {"192.168.130.1", "192.168.130.2", "192.168.130.3",};
+    this.replicator = 3;
+    int port = 7777;
+
+    for(String ip: ipList){
+      nodes.add(new PhysicalNode(ip, port));
+    }
+    init();
+  }
+
+  private void init(){
+    for(PhysicalNode node : nodes){
+      addNode(node, this.numOfVirtulaNodes);
+    }
+
+  }
+
+  private void addNode(PhysicalNode node, int virtualNum){
+    physicalRing.put(hashFunction.hash(node.getKey()), node);
+    for(int i = 0; i < virtualNum; i++){
+      VirtualNode vNode = new VirtualNode(i, node);
+      virtualRing.put(hashFunction.hash(vNode.getKey()), vNode);
+    }
+  }
+
+  // For a storage group, compute the nearest physical node on the VRing
+  private PhysicalNode routeNode(String objectKey){
+//    if(router.containsKey(objectKey)){
+//      return router.get(objectKey);
+//    }
+//    int hashVal = hashFunction.hash(objectKey);
+//    SortedMap<Integer,VirtualNode> tailMap = virtualRing.tailMap(hashVal);
+//    Integer nodeHashVal = !tailMap.isEmpty() ? tailMap.firstKey() : 
virtualRing.firstKey();
+//    PhysicalNode node = virtualRing.get(nodeHashVal).getPhysicalNode();
+//    router.put(objectKey, node);
+//    return node;
+    return null;
+  }
+
+  // Calculate the physical nodes corresponding to the replications where a 
data point is located
+  public PhysicalNode[] routeGroup(String objectKey){
+    return router.get(objectKey);
+  }
+
+  public PhysicalNode[][] generateGroups(String ip, int port){
+    return this.generateGroups(new PhysicalNode(ip, port));
+  }
+
+  // For a given physical, how many groups does it belong to
+  private PhysicalNode[][] generateGroups(PhysicalNode node){
+    return dataPartitionCache.get(node);
+  }
+
+
+
+  private class PhysicalNode{
+    final String ip;
+    final int port;
+
+    PhysicalNode(String ip, int port){
+      this.ip = ip;
+      this.port = port;
+    }
+
+    String getKey(){
+      return String.format("%s:%d", ip, port);
+    }
+
+  }
+  private class VirtualNode {
+    private final int replicaIndex;
+    private final PhysicalNode physicalNode;
+
+    VirtualNode(int replicaIndex, PhysicalNode physicalNode){
+      this.replicaIndex = replicaIndex;
+      this.physicalNode = physicalNode;
+    }
+
+    PhysicalNode getPhysicalNode(){
+      return this.physicalNode;
+    }
+
+    String getKey(){
+      return String.format("%s-%d", physicalNode.getKey(), replicaIndex);
+    }
+
+  }
+
+  private class MD5Hash implements HashFunction{
+    MessageDigest instance;
+
+    MD5Hash() {
+      try {
+        instance = MessageDigest.getInstance("MD5");
+      } catch (NoSuchAlgorithmException e) {
+      }
+    }
+
+    public synchronized int hash(String key) {
+      instance.reset();
+      instance.update(key.getBytes());
+      byte[] digest = instance.digest();
+
+      int h = 0;
+      for (int i = 0; i < 4; i++) {
+        h <<= 8;
+        h |= ((int) digest[i]) & 0xFF;
+      }
+      return h;
+    }
+  }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/AppTest.java 
b/cluster/src/test/java/org/apache/iotdb/cluster/AppTest.java
index d986670..94415b9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/AppTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/AppTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.cluster;
 
 import static org.junit.Assert.*;

Reply via email to