This is an automated email from the ASF dual-hosted git repository.
east pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new a832cc4 update raft implementation
a832cc4 is described below
commit a832cc4f996c58b43f88555212f6f5ea25a3426c
Author: mdf369 <[email protected]>
AuthorDate: Tue Mar 26 22:00:04 2019 +0800
update raft implementation
---
.../apache/iotdb/cluster/config/ClusterConfig.java | 20 ++++++------
.../iotdb/cluster/config/ClusterDescriptor.java | 6 ++--
.../org/apache/iotdb/cluster/entity/Server.java | 36 +++++++++++++---------
.../cluster/entity/data/DataPartitionHolder.java | 2 +-
.../{IHolder.java => IMetadataHolder.java} | 2 +-
.../cluster/entity/metadata/MetadataHolder.java | 2 +-
.../entity/raft/DataPartitionRaftHolder.java | 19 +++++++++---
.../cluster/entity/raft/DataStateMachine.java | 3 +-
.../cluster/entity/raft/MetadataRaftHolder.java | 12 +++++---
.../cluster/entity/raft/MetadataStateManchine.java | 31 +++++++++----------
.../apache/iotdb/cluster/entity/raft/RaftNode.java | 2 ++
.../iotdb/cluster/entity/raft/RaftService.java | 21 +++++++++----
.../apache/iotdb/cluster/utils/PhysicalNode.java | 8 +++++
.../org/apache/iotdb/cluster/utils/RaftUtils.java | 16 ++++++++++
.../java/org/apache/iotdb/cluster/utils/Utils.java | 15 ++++-----
15 files changed, 126 insertions(+), 69 deletions(-)
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
index abdbc43..9db07b1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java
@@ -18,13 +18,11 @@
*/
package org.apache.iotdb.cluster.config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
public class ClusterConfig {
public static final String CONFIG_NAME = "iotdb-cluster.properties";
public static final String DEFAULT_NODE_IP = "127.0.0.1:8888";
+ public static final String METADATA_GROUP_ID = "metadata";
// Cluster node: {ip1,ip2,...,ipn}
private String[] nodes = {DEFAULT_NODE_IP};
@@ -56,8 +54,8 @@ public class ClusterConfig {
// Maximum allowed delay hours
private int delayHours = 24;
- /** time limit to redo a single task **/
- private int taskRedoTimeLimit = 3;
+ /** count limit to redo a single task **/
+ private int taskRedoCount = 3;
/** timeout limit for a single task **/
private int taskTimeout = 0;
@@ -155,12 +153,12 @@ public class ClusterConfig {
this.delayHours = delayHours;
}
- public int getTaskRedoTimeLimit() {
- return taskRedoTimeLimit;
+ public int getTaskRedoCount() {
+ return taskRedoCount;
}
- public void setTaskRedoTimeLimit(int taskRedoTimeLimit) {
- this.taskRedoTimeLimit = taskRedoTimeLimit;
+ public void setTaskRedoCount(int taskRedoCount) {
+ this.taskRedoCount = taskRedoCount;
}
public int getTaskTimeout() {
@@ -178,4 +176,8 @@ public class ClusterConfig {
public void setNumOfVirtulaNodes(int numOfVirtulaNodes){
this.numOfVirtulaNodes = numOfVirtulaNodes;
}
+
+ public static String getMetadataGroupId() {
+ return METADATA_GROUP_ID;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
index ea0ca0c..c9de6f8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterDescriptor.java
@@ -109,9 +109,9 @@ public class ClusterDescriptor {
.parseInt(properties.getProperty("delay_hours",
Integer.toString(conf.getDelayHours()))));
- conf.setTaskRedoTimeLimit(Integer
- .parseInt(properties.getProperty("task_redo_time_limit",
- Integer.toString(conf.getTaskRedoTimeLimit()))));
+ conf.setTaskRedoCount(Integer
+ .parseInt(properties.getProperty("task_redo_count",
+ Integer.toString(conf.getTaskRedoCount()))));
conf.setTaskTimeout(Integer
.parseInt(properties.getProperty("task_timeout",
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index 61501ba..610894e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -18,10 +18,16 @@
*/
package org.apache.iotdb.cluster.entity;
+import com.alipay.remoting.rpc.RpcServer;
+import com.alipay.sofa.jraft.entity.PeerId;
+import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.iotdb.cluster.utils.PhysicalNode;
+import org.apache.iotdb.cluster.utils.RaftUtils;
+import org.apache.iotdb.cluster.utils.Router;
import org.apache.iotdb.cluster.utils.Utils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -39,32 +45,34 @@ public class Server {
private static final ClusterConfig ClusterConf =
ClusterDescriptor.getInstance().getConfig();
private MetadataHolder metadataHolder;
- private Map<Integer, DataPartitionHolder> dataPartitionHolderMap;
+ private Map<String, DataPartitionHolder> dataPartitionHolderMap;
public void start() throws AuthException {
// Stand-alone version of IoTDB, be careful to replace the internal JDBC
Server with a cluster version
IoTDB iotdb = new IoTDB();
iotdb.active();
- List<RaftNode> nodeList =
Utils.convertNodesToRaftNodeList(ClusterConf.getNodes());
- metadataHolder = new MetadataRaftHolder(nodeList);
+ PeerId[] peerIds =
Utils.convertStringArrayToPeerIdArray(ClusterConf.getNodes());
+ PeerId serverId = new PeerId(ClusterConf.getIp(), ClusterConf.getPort());
+ RpcServer rpcServer = new RpcServer(serverId.getPort());
+ RaftRpcServerFactory.addRaftRequestProcessors(rpcServer);
+ // TODO 注册业务处理器
+
+ metadataHolder = new MetadataRaftHolder(peerIds, serverId, rpcServer);
metadataHolder.init();
metadataHolder.start();
dataPartitionHolderMap = new HashMap<>();
- int index = Utils.getIndexOfIpFromRaftNodeList(ClusterConf.getIp(),
nodeList);
- List<Pair<Integer, List<RaftNode>>> groupNodeList =
getDataPartitonNodeList(index, nodeList);
- for(int i = 0; i < groupNodeList.size(); i++) {
- Pair<Integer, List<RaftNode>> pair = groupNodeList.get(i);
- DataPartitionHolder dataPartitionHolder = new
DataPartitionRaftHolder(pair.left, pair.right);
+ Router router = Router.getInstance();
+ PhysicalNode[][] groups = router.generateGroups(serverId.getIp(),
serverId.getPort());
+
+ for(int i = 0; i < groups.length; i++) {
+ PhysicalNode[] group = groups[i];
+ String groupId = router.getGroupID(group);
+ DataPartitionHolder dataPartitionHolder = new
DataPartitionRaftHolder(groupId,
RaftUtils.convertPhysicalNodeArrayToPeerIdArray(group), serverId, rpcServer);
dataPartitionHolder.init();
dataPartitionHolder.start();
- dataPartitionHolderMap.put(pair.left, dataPartitionHolder);
+ dataPartitionHolderMap.put(groupId, dataPartitionHolder);
}
}
-
- // A node belongs to multiple data groups and calculates the members of the
i-th data group.
- private List<Pair<Integer, List<RaftNode>>> getDataPartitonNodeList(int i,
List<RaftNode> nodeList) {
- return Collections.emptyList();
- }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/data/DataPartitionHolder.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/data/DataPartitionHolder.java
index 2975cc1..b29f28a 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/data/DataPartitionHolder.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/data/DataPartitionHolder.java
@@ -22,7 +22,7 @@ import org.apache.iotdb.cluster.entity.service.IService;
public class DataPartitionHolder implements IPartitionHolder {
- protected IService service;;
+ protected IService service;
@Override
public void init() {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IHolder.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IMetadataHolder.java
similarity index 96%
rename from
cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IHolder.java
rename to
cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IMetadataHolder.java
index f8cbefc..8a0f63e 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IHolder.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/IMetadataHolder.java
@@ -18,7 +18,7 @@
*/
package org.apache.iotdb.cluster.entity.metadata;
-public interface IHolder {
+public interface IMetadataHolder {
void init();
void start();
void stop();
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/MetadataHolder.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/MetadataHolder.java
index 7358240..0bee2b9 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/MetadataHolder.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/metadata/MetadataHolder.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.cluster.entity.metadata;
import org.apache.iotdb.cluster.entity.service.IService;
-public abstract class MetadataHolder implements IHolder {
+public abstract class MetadataHolder implements IMetadataHolder {
protected IService service;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataPartitionRaftHolder.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataPartitionRaftHolder.java
index a6e9ab9..ad0bd89 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataPartitionRaftHolder.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataPartitionRaftHolder.java
@@ -18,18 +18,27 @@
*/
package org.apache.iotdb.cluster.entity.raft;
-import com.alipay.sofa.jraft.storage.LogStorage;
-import java.util.List;
+import com.alipay.remoting.rpc.RpcServer;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.entity.PeerId;
import org.apache.iotdb.cluster.entity.data.DataPartitionHolder;
public class DataPartitionRaftHolder extends DataPartitionHolder {
- private int groupId;
+ private String groupId;
+ private PeerId serverId;
private DataStateMachine fsm;
+ private RaftGroupService raftGroupService;
- public DataPartitionRaftHolder(int groupId, List<RaftNode> nodeList) {
+ public DataPartitionRaftHolder(String groupId, PeerId[] peerIds, PeerId
serverId, RpcServer rpcServer) {
this.groupId = groupId;
+ this.serverId = serverId;
fsm = new DataStateMachine();
- service = new RaftService(nodeList);
+ service = new RaftService(groupId, peerIds, serverId, rpcServer);
+ }
+
+ @Override
+ public void start() {
+ super.start();
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
index fc537e6..7d94bf1 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/DataStateMachine.java
@@ -23,7 +23,8 @@ import com.alipay.sofa.jraft.core.StateMachineAdapter;
import org.apache.iotdb.db.engine.Processor;
public class DataStateMachine extends StateMachineAdapter {
- private Processor process;
+ private Processor process; //TODO 是否复用,复用的话是否线程安全
+ // TODO 是否可以直接改成QP
public DataStateMachine() {
//TODO init @code{process}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataRaftHolder.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataRaftHolder.java
index 960fa2b..d8c2c00 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataRaftHolder.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataRaftHolder.java
@@ -18,17 +18,21 @@
*/
package org.apache.iotdb.cluster.entity.raft;
-import com.alipay.sofa.jraft.storage.LogStorage;
-import java.util.List;
+import com.alipay.remoting.rpc.RpcServer;
+import com.alipay.sofa.jraft.entity.PeerId;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.entity.metadata.MetadataHolder;
import org.apache.iotdb.db.auth.AuthException;
public class MetadataRaftHolder extends MetadataHolder {
private MetadataStateManchine fsm;
+ private PeerId serverId;
- public MetadataRaftHolder(List<RaftNode> nodeList) throws AuthException {
+ public MetadataRaftHolder(PeerId[] peerIds, PeerId serverId, RpcServer
rpcServer) throws AuthException {
fsm = new MetadataStateManchine();
- service = new RaftService(nodeList);
+ this.serverId = serverId;
+ service = new RaftService(ClusterConfig.getMetadataGroupId(), peerIds,
serverId, rpcServer);
}
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
index 42df035..3a2d0d8 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/MetadataStateManchine.java
@@ -20,30 +20,22 @@ package org.apache.iotdb.cluster.entity.raft;
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.core.StateMachineAdapter;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.IOException;
import org.apache.iotdb.db.auth.AuthException;
import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
+import org.apache.iotdb.db.exception.PathErrorException;
+import org.apache.iotdb.db.metadata.MManager;
public class MetadataStateManchine extends StateMachineAdapter {
- /** All Storage Groups in Cluster **/
- private List<String> storageGroupList;
+ /** manager of storage groups **/
+ private MManager mManager = MManager.getInstance();
/** manager of user profile **/
private IAuthorizer authorizer = LocalFileAuthorizer.getInstance();
public MetadataStateManchine() throws AuthException {
- storageGroupList = new ArrayList<>();
- updateStorageGroupList();
- }
-
- /**
- * update @code{storageGroupList} from IoTDB instance
- */
- private void updateStorageGroupList() {
-
}
// Update StrageGroup List and userProfileMap based on Task read from raft
log
@@ -53,19 +45,24 @@ public class MetadataStateManchine extends
StateMachineAdapter {
}
public boolean isStorageGroupLegal(String sg) {
- return storageGroupList.contains(sg);
+ try {
+ mManager.checkPathStorageLevelAndGetDataType(sg);
+ } catch (PathErrorException e) {
+ return false;
+ }
+ return true;
}
public boolean isUerProfileLegal(String username, String password) throws
AuthException {
return authorizer.login(username, password);
}
- public void addStorageGroup(String sg) {
- storageGroupList.add(sg);
+ public void addStorageGroup(String sg) throws IOException,
PathErrorException {
+ mManager.setStorageLevelToMTree(sg);
}
public void deleteStorageGroup(String sg) {
- storageGroupList.remove(sg);
+ // TODO implement this method
}
public void addUser(String username, String password) throws AuthException {
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftNode.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftNode.java
index 2ded0f2..476dcbc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftNode.java
@@ -23,6 +23,8 @@ import org.slf4j.LoggerFactory;
public class RaftNode {
+ // TODO replace this by PeerId
+
private static final Logger LOGGER = LoggerFactory.getLogger(RaftNode.class);
private String ip;
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
index 631c805..1653462 100644
---
a/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
+++
b/cluster/src/main/java/org/apache/iotdb/cluster/entity/raft/RaftService.java
@@ -18,17 +18,26 @@
*/
package org.apache.iotdb.cluster.entity.raft;
-import com.alipay.sofa.jraft.storage.LogStorage;
+import com.alipay.remoting.rpc.RpcServer;
+import com.alipay.sofa.jraft.RaftGroupService;
+import com.alipay.sofa.jraft.entity.PeerId;
+import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.entity.service.IService;
public class RaftService implements IService {
- private List<RaftNode> nodeList;
- private RaftNode leader;
+ private List<PeerId> peerIdList;
+ private PeerId leader;
+ private RaftGroupService raftGroupService;
- public RaftService(List<RaftNode> nodeList) {
- this.nodeList = nodeList;
+ public RaftService(String groupId, PeerId[] peerIds, PeerId serverId,
RpcServer rpcServer) {
+ this.peerIdList = new ArrayList<>(peerIds.length);
+ for (int i = 0; i < peerIds.length; i++) {
+ peerIdList.add(peerIds[i]);
+ }
+
+ raftGroupService = new RaftGroupService(groupId, serverId, null,
rpcServer);
}
@Override
@@ -38,7 +47,7 @@ public class RaftService implements IService {
@Override
public void start() {
-
+ raftGroupService.start();
}
@Override
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
index fa12f68..fc1012b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PhysicalNode.java
@@ -49,4 +49,12 @@ public class PhysicalNode {
public String toString() {
return getKey();
}
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
}
diff --git
a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
index 496f19b..0b150b7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/RaftUtils.java
@@ -36,4 +36,20 @@ public class RaftUtils {
public static Configuration getConfiguration(String groupID) {
return null;
}
+
+ public static PhysicalNode[] convertPeerIdArrayToPhysicalNodeArray(PeerId[]
peerIds) {
+ PhysicalNode[] physicalNodes = new PhysicalNode[peerIds.length];
+ for (int i = 0; i < peerIds.length; i++) {
+ physicalNodes[i] = new PhysicalNode(peerIds[i].getIp(),
peerIds[i].getPort());
+ }
+ return physicalNodes;
+ }
+
+ public static PeerId[] convertPhysicalNodeArrayToPeerIdArray(PhysicalNode[]
physicalNodes) {
+ PeerId[] peerIds = new PeerId[physicalNodes.length];
+ for (int i = 0; i < physicalNodes.length; i++) {
+ peerIds[i] = new PeerId(physicalNodes[i].getIp(),
physicalNodes[i].getPort());
+ }
+ return peerIds;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Utils.java
b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Utils.java
index 7a2ce6f..10790b9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/Utils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/Utils.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.cluster.utils;
+import com.alipay.sofa.jraft.entity.PeerId;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.entity.raft.RaftNode;
@@ -26,17 +27,17 @@ public class Utils {
private Utils(){}
- public static List<RaftNode> convertNodesToRaftNodeList(String[] nodes) {
- List<RaftNode> nodeList = new ArrayList<>(nodes.length);
+ public static PeerId[] convertStringArrayToPeerIdArray(String[] nodes) {
+ PeerId[] peerIds = new PeerId[nodes.length];
for (int i = 0; i < nodes.length; i++) {
- nodeList.add(RaftNode.parseRaftNode(nodes[i]));
+ peerIds[i] = PeerId.parsePeer(nodes[i]);
}
- return nodeList;
+ return peerIds;
}
- public static int getIndexOfIpFromRaftNodeList(String ip, List<RaftNode>
nodeList) {
- for (int i = 0; i < nodeList.size(); i++) {
- if (nodeList.get(i).getIp().equals(ip)) {
+ public static int getIndexOfIpFromRaftNodeList(String ip, PeerId[] peerIds) {
+ for (int i = 0; i < peerIds.length; i++) {
+ if (peerIds[i].getIp().equals(ip)) {
return i;
}
}