This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 327eb7e80d7165931b4fa41d4bb1c25eebd38f65 Author: lta <[email protected]> AuthorDate: Thu Jan 14 15:40:58 2021 +0800 1. fix ut tests 2. The two-stage relative order problem of double logs is solved. --- .../apache/iotdb/cluster/config/ClusterConfig.java | 2 +- .../iotdb/cluster/log/applier/MetaLogApplier.java | 2 +- .../iotdb/cluster/log/logtypes/AddNodeLog.java | 14 ++- .../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 14 ++- .../iotdb/cluster/log/manage/RaftLogManager.java | 4 + .../iotdb/cluster/partition/NodeRemovalResult.java | 1 + .../partition/balancer/DefaultSlotBalancer.java | 118 ++++++++++++++++++ .../cluster/partition/balancer/SlotBalancer.java | 43 +++++++ .../iotdb/cluster/partition/slot/SlotManager.java | 17 ++- .../cluster/partition/slot/SlotPartitionTable.java | 115 +++++++----------- .../iotdb/cluster/partition/slot/SlotStrategy.java | 18 ++- .../iotdb/cluster/query/ClusterPlanRouter.java | 2 +- .../iotdb/cluster/server/DataClusterServer.java | 4 +- .../cluster/server/member/DataGroupMember.java | 15 ++- .../cluster/server/member/MetaGroupMember.java | 132 +++------------------ .../iotdb/cluster/server/member/RaftMember.java | 22 ++-- .../iotdb/cluster/common/TestAsyncDataClient.java | 27 +++-- .../org/apache/iotdb/cluster/common/TestUtils.java | 2 + .../cluster/server/member/DataGroupMemberTest.java | 30 ++++- .../iotdb/cluster/server/member/MemberTest.java | 34 ++++-- .../cluster/server/member/MetaGroupMemberTest.java | 55 +++++++-- .../apache/iotdb/db/qp/physical/sys/LogPlan.java | 1 + 22 files changed, 421 insertions(+), 251 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java index 05745fd..621149d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/config/ClusterConfig.java @@ -46,7 +46,7 @@ public class ClusterConfig { private int replicationNum = 2; @ClusterConsistent - private int multiRaftFactor = 2; + private int multiRaftFactor = 1; @ClusterConsistent private String clusterName = "default"; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java index 94437ae..aaa03a4 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java @@ -82,7 +82,7 @@ public class MetaLogApplier extends BaseApplier { LogPlan plan = new LogPlan(log.serialize()); TSStatus status = member.executeNonQueryPlan(plan); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - throw new ChangeMembershipException(String.format("apply %s failed", log)); + throw new ChangeMembershipException(String.format("apply %s failed with status {%s}", log, status)); } } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java index 824c3f2..380ba08 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java @@ -37,6 +37,8 @@ public class AddNodeLog extends Log { private Node newNode; + private long metaLogIndex; + public AddNodeLog(ByteBuffer partitionTable, Node newNode) { this.partitionTable = partitionTable; this.newNode = newNode; @@ -45,6 +47,14 @@ public class AddNodeLog extends Log { public AddNodeLog() { } + public long getMetaLogIndex() { + return metaLogIndex; + } + + public void setMetaLogIndex(long metaLogIndex) { + this.metaLogIndex = metaLogIndex; + } + public void setPartitionTable(ByteBuffer partitionTable) { this.partitionTable = partitionTable; } @@ -68,10 +78,11 @@ public class AddNodeLog extends Log { dataOutputStream.writeByte(Types.ADD_NODE.ordinal()); dataOutputStream.writeLong(getCurrLogIndex()); dataOutputStream.writeLong(getCurrLogTerm()); + dataOutputStream.writeLong(getMetaLogIndex()); SerializeUtils.serialize(newNode, dataOutputStream); - dataOutputStream.write(partitionTable.array().length); + dataOutputStream.writeInt(partitionTable.array().length); dataOutputStream.write(partitionTable.array()); } catch (IOException e) { // ignored @@ -87,6 +98,7 @@ public class AddNodeLog extends Log { // ipLength(int), inBytes(byte[]), port(int), identifier(int), dataPort(int) setCurrLogIndex(buffer.getLong()); setCurrLogTerm(buffer.getLong()); + setMetaLogIndex(buffer.getLong()); newNode = new Node(); SerializeUtils.deserialize(newNode, buffer); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java index 800b77d..22af482 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java @@ -34,6 +34,8 @@ public class RemoveNodeLog extends Log { private Node removedNode; + private long metaLogIndex; + public RemoveNodeLog(ByteBuffer partitionTable, Node removedNode) { this.partitionTable = partitionTable; @@ -43,6 +45,14 @@ public class RemoveNodeLog extends Log { public RemoveNodeLog() { } + public long getMetaLogIndex() { + return metaLogIndex; + } + + public void setMetaLogIndex(long metaLogIndex) { + this.metaLogIndex = metaLogIndex; + } + public ByteBuffer getPartitionTable() { return partitionTable; } @@ -58,10 +68,11 @@ public class RemoveNodeLog extends Log { dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal()); dataOutputStream.writeLong(getCurrLogIndex()); dataOutputStream.writeLong(getCurrLogTerm()); + dataOutputStream.writeLong(getMetaLogIndex()); SerializeUtils.serialize(removedNode, dataOutputStream); - dataOutputStream.write(partitionTable.array().length); + dataOutputStream.writeInt(partitionTable.array().length); dataOutputStream.write(partitionTable.array()); } catch (IOException e) { // ignored @@ -73,6 +84,7 @@ public class RemoveNodeLog extends Log { public void deserialize(ByteBuffer buffer) { setCurrLogIndex(buffer.getLong()); setCurrLogTerm(buffer.getLong()); + setMetaLogIndex(buffer.getLong()); removedNode = new Node(); SerializeUtils.deserialize(removedNode, buffer); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index bd65c26..c57ee06 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -946,4 +946,8 @@ public abstract class RaftLogManager { public long getBlockAppliedCommitIndex() { return blockAppliedCommitIndex; } + + public RaftLogManager(LogApplier logApplier) { + this.logApplier = logApplier; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java index 4193ffd..86ff9a2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java @@ -93,4 +93,5 @@ public class NodeRemovalResult { newGroupList.add(group); } } + } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java new file mode 100644 index 0000000..eb1825f --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/DefaultSlotBalancer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.partition.balancer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import org.apache.iotdb.cluster.config.ClusterDescriptor; +import org.apache.iotdb.cluster.partition.PartitionGroup; +import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; + +/** + * This balancer aims to avg slots to all raft groups. + */ +public class DefaultSlotBalancer implements SlotBalancer { + + private int multiRaftFactor = + ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); + private SlotPartitionTable table; + + public DefaultSlotBalancer(SlotPartitionTable partitionTable) { + this.table = partitionTable; + } + + /** + * Move last slots from each group whose slot number is bigger than the new average to the new node. + */ + @Override + public void moveSlotsToNew(Node newNode, List<Node> oldRing) { + Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots(); + Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = table.getPreviousNodeMap(); + RaftNode[] slotNodes = table.getSlotNodes(); + + // as a node is added, the average slots for each node decrease + // move the slots to the new node if any previous node have more slots than the new average + int newAvg = table.getTotalSlotNumbers() / table.getAllNodes().size() / multiRaftFactor; + Map<RaftNode, List<Integer>> newNodeSlotMap = new HashMap<>(); + int raftId = 0; + for (int i = 0; i < multiRaftFactor; i++) { + RaftNode raftNode = new RaftNode(newNode, i); + newNodeSlotMap.putIfAbsent(raftNode, new ArrayList<>()); + previousNodeMap.putIfAbsent(raftNode, new HashMap<>()); + } + for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) { + List<Integer> slots = entry.getValue(); + int transferNum = slots.size() - newAvg; + if (transferNum > 0) { + RaftNode curNode = new RaftNode(newNode, raftId); + int numToMove = transferNum; + if (raftId != multiRaftFactor - 1) { + numToMove = Math.min(numToMove, newAvg - newNodeSlotMap.get(curNode).size()); + } + List<Integer> slotsToMove = slots + .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove); + newNodeSlotMap.get(curNode).addAll(slotsToMove); + for (Integer slot : slotsToMove) { + // record what node previously hold the integer + previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing)); + slotNodes[slot] = curNode; + } + transferNum -= numToMove; + if (transferNum > 0) { + curNode = new RaftNode(newNode, ++raftId); + slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); + newNodeSlotMap.get(curNode).addAll(slotsToMove); + for (Integer slot : slotsToMove) { + // record what node previously hold the integer + previousNodeMap.get(curNode).put(slot, table.getHeaderGroup(entry.getKey(), oldRing)); + slotNodes[slot] = curNode; + } + } + } + } + nodeSlotMap.putAll(newNodeSlotMap); + } + + @Override + public Map<RaftNode, List<Integer>> retrieveSlots(Node target) { + Map<RaftNode, List<Integer>> nodeSlotMap = table.getAllNodeSlots(); + RaftNode[] slotNodes = table.getSlotNodes(); + List<Node> nodeRing = table.getAllNodes(); + + Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>(); + for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) { + RaftNode raftNode = new RaftNode(target, raftId); + List<Integer> slots = nodeSlotMap.remove(raftNode); + for (int i = 0; i < slots.size(); i++) { + int slot = slots.get(i); + RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId); + slotNodes[slot] = newHolder; + nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); + newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); + } + } + return newHolderSlotMap; + } +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java new file mode 100644 index 0000000..95f806b --- /dev/null +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/balancer/SlotBalancer.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.cluster.partition.balancer; + +import java.util.List; +import java.util.Map; +import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; + +/** + * When add/remove node, the slots need to be redistributed. + */ +public interface SlotBalancer { + + /** + * When add a new node, new raft groups will take over some hash slots from another raft groups. + */ + void moveSlotsToNew(Node newNode, List<Node> oldRing); + + /** + * When remove a old node, all hash slots of the removed groups will assigned to other raft groups. + * @param target the node to be removed + */ + Map<RaftNode, List<Integer>> retrieveSlots(Node target); + +} diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java index f0fc11c..7165145 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotManager.java @@ -1,5 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or ag [...] + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iotdb.cluster.partition.slot; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index f441e4a..255fb22 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -1,5 +1,20 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or ag [...] + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ package org.apache.iotdb.cluster.partition.slot; @@ -27,6 +42,8 @@ import org.apache.iotdb.cluster.partition.NodeAdditionResult; import org.apache.iotdb.cluster.partition.NodeRemovalResult; import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; +import org.apache.iotdb.cluster.partition.balancer.DefaultSlotBalancer; +import org.apache.iotdb.cluster.partition.balancer.SlotBalancer; import org.apache.iotdb.cluster.partition.slot.SlotStrategy.DefaultStrategy; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; @@ -78,6 +95,8 @@ public class SlotPartitionTable implements PartitionTable { // last log index that modifies the partition table private long lastLogIndex = -1; + private SlotBalancer slotBalancer = new DefaultSlotBalancer(this); + /** * only used for deserialize. * @@ -105,6 +124,14 @@ public class SlotPartitionTable implements PartitionTable { SlotPartitionTable.slotStrategy = slotStrategy; } + public SlotBalancer getLoadBalancer() { + return slotBalancer; + } + + public void setLoadBalancer(SlotBalancer slotBalancer) { + this.slotBalancer = slotBalancer; + } + private void init(Collection<Node> nodes) { logger.info("Initializing a new partition table"); nodeRing.addAll(nodes); @@ -167,7 +194,7 @@ public class SlotPartitionTable implements PartitionTable { return ret; } - private PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) { + public PartitionGroup getHeaderGroup(RaftNode raftNode, List<Node> nodeRing) { PartitionGroup ret = new PartitionGroup(raftNode.getRaftId()); // assuming the nodes are [1,2,3,4,5] @@ -284,7 +311,7 @@ public class SlotPartitionTable implements PartitionTable { // the slots movement is only done logically, the new node itself will pull data from the // old node - moveSlotsToNew(node, oldRing); + slotBalancer.moveSlotsToNew(node, oldRing); } @@ -304,55 +331,6 @@ public class SlotPartitionTable implements PartitionTable { return result; } - - /** - * Move last slots from each group whose slot number is bigger than the new average to the new - * node. - * - * @param newNode - */ - private void moveSlotsToNew(Node newNode, List<Node> oldRing) { - // as a node is added, the average slots for each node decrease - // move the slots to the new node if any previous node have more slots than the new average - int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor; - int raftId = 0; - for (int i = 0; i < multiRaftFactor; i++) { - RaftNode raftNode = new RaftNode(newNode, i); - nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>()); - previousNodeMap.putIfAbsent(raftNode, new HashMap<>()); - } - for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) { - List<Integer> slots = entry.getValue(); - int transferNum = slots.size() - newAvg; - if (transferNum > 0) { - RaftNode curNode = new RaftNode(newNode, raftId); - int numToMove = transferNum; - if (raftId != multiRaftFactor - 1) { - numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size()); - } - List<Integer> slotsToMove = slots - .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove); - nodeSlotMap.get(curNode).addAll(slotsToMove); - for (Integer slot : slotsToMove) { - // record what node previously hold the integer - previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); - slotNodes[slot] = curNode; - } - transferNum -= numToMove; - if (transferNum > 0) { - curNode = new RaftNode(newNode, ++raftId); - slotsToMove = slots.subList(slots.size() - transferNum, slots.size()); - nodeSlotMap.get(curNode).addAll(slotsToMove); - for (Integer slot : slotsToMove) { - // record what node previously hold the integer - previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); - slotNodes[slot] = curNode; - } - } - } - } - } - @Override public List<PartitionGroup> getLocalGroups() { return localGroups; @@ -381,8 +359,8 @@ public class SlotPartitionTable implements PartitionTable { Map<Integer, PartitionGroup> prevHolders = nodeMapEntry.getValue(); dataOutputStream.writeInt(prevHolders.size()); for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) { - dataOutputStream.writeInt(integerNodeEntry.getKey()); integerNodeEntry.getValue().serialize(dataOutputStream); + dataOutputStream.writeInt(integerNodeEntry.getKey()); } } @@ -398,7 +376,7 @@ public class SlotPartitionTable implements PartitionTable { long newLastLogIndex = buffer.getLong(); // judge whether the partition table of byte buffer is out of date - if (lastLogIndex >= newLastLogIndex) { + if (lastLogIndex != -1 && lastLogIndex >= newLastLogIndex) { return lastLogIndex <= newLastLogIndex; } lastLogIndex = newLastLogIndex; @@ -455,6 +433,10 @@ public class SlotPartitionTable implements PartitionTable { return nodeRing; } + public Map<RaftNode, Map<Integer, PartitionGroup>> getPreviousNodeMap() { + return previousNodeMap; + } + public Map<Integer, PartitionGroup> getPreviousNodeMap(RaftNode raftNode) { return previousNodeMap.get(raftNode); } @@ -488,7 +470,8 @@ public class SlotPartitionTable implements PartitionTable { Objects.equals(nodeRing, that.nodeRing) && Objects.equals(nodeSlotMap, that.nodeSlotMap) && Arrays.equals(slotNodes, that.slotNodes) && - Objects.equals(previousNodeMap, that.previousNodeMap); + Objects.equals(previousNodeMap, that.previousNodeMap) && + lastLogIndex == that.lastLogIndex; } @Override @@ -548,7 +531,7 @@ public class SlotPartitionTable implements PartitionTable { // the slots movement is only done logically, the new node itself will pull data from the // old node - Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target); + Map<RaftNode, List<Integer>> raftNodeListMap = slotBalancer.retrieveSlots(target); result.addNewSlotOwners(raftNodeListMap); this.nodeRemovalResult = result; } @@ -559,22 +542,6 @@ public class SlotPartitionTable implements PartitionTable { return nodeRemovalResult; } - private Map<RaftNode, List<Integer>> retrieveSlots(Node target) { - Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>(); - for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) { - RaftNode raftNode = new RaftNode(target, raftId); - List<Integer> slots = nodeSlotMap.remove(raftNode); - for (int i = 0; i < slots.size(); i++) { - int slot = slots.get(i); - RaftNode newHolder = new RaftNode(nodeRing.get(i % nodeRing.size()), raftId); - slotNodes[slot] = newHolder; - nodeSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); - newHolderSlotMap.computeIfAbsent(newHolder, n -> new ArrayList<>()).add(slot); - } - } - return newHolderSlotMap; - } - @Override public List<PartitionGroup> getGlobalGroups() { // preventing a thread from getting incomplete globalGroups @@ -609,4 +576,8 @@ public class SlotPartitionTable implements PartitionTable { public synchronized void setLastLogIndex(long lastLogIndex) { this.lastLogIndex = Math.max(this.lastLogIndex, lastLogIndex); } + + public RaftNode[] getSlotNodes() { + return slotNodes; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java index b5a45c0..5144bcb 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotStrategy.java @@ -1,8 +1,22 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or ag [...] + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ - package org.apache.iotdb.cluster.partition.slot; import static org.apache.iotdb.cluster.config.ClusterConstant.HASH_SALT; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java index 0c8cf25..487b88d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java @@ -144,7 +144,7 @@ public class ClusterPlanRouter { throw new UnsupportedPlanException(plan); } - private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) + protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) throws UnknownLogTypeException, UnsupportedPlanException { Map<PhysicalPlan, PartitionGroup> result = new HashMap<>(); Log log = LogParser.getINSTANCE().parse(plan.getLog()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index 16c1da6..3d233ed 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -494,7 +494,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) { // Make sure the previous add/remove node log has applied - metaGroupMember.syncLeader(); + metaGroupMember.waitUtil(log.getMetaLogIndex() - 1); // Check the validity of the partition table if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) { @@ -601,7 +601,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) { // Make sure the previous add/remove node log has applied - metaGroupMember.syncLeader(); + metaGroupMember.waitUtil(log.getMetaLogIndex() - 1); // Check the validity of the partition table if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 077d61d..df45b53 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -261,9 +261,9 @@ public class DataGroupMember extends RaftMember { } } - public void preAddNode(Node node) { + public boolean preAddNode(Node node) { if (allNodes.contains(node)) { - return; + return false; } synchronized (allNodes) { int insertIndex = -1; @@ -288,6 +288,7 @@ public class DataGroupMember extends RaftMember { // the group logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes); } + return insertIndex > 0; } } @@ -299,6 +300,7 @@ public class DataGroupMember extends RaftMember { * otherwise */ public boolean addNode(Node node, NodeAdditionResult result) { + syncLeader(); // mark slots that do not belong to this group any more Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots() @@ -604,9 +606,9 @@ public class DataGroupMember extends RaftMember { * @param partitionId * @param isSeq */ - void closePartition(String storageGroupName, long partitionId, boolean isSeq) { + boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) { if (character != NodeCharacter.LEADER) { - return; + return false; } CloseFileLog log = new CloseFileLog(storageGroupName, partitionId, isSeq); synchronized (logManager) { @@ -618,10 +620,11 @@ public class DataGroupMember extends RaftMember { logger.info("Send the close file request of {} to other nodes", log); } try { - appendLogInGroup(log); + return appendLogInGroup(log); } catch (LogExecutionException e) { logger.error("Cannot close partition {}#{} seq:{}", storageGroupName, partitionId, isSeq, e); } + return false; } public boolean flushFileWhenDoSnapshot( @@ -762,6 +765,8 @@ public class DataGroupMember extends RaftMember { */ @SuppressWarnings("java:S2445") // the reference of allNodes is unchanged public void removeNode(Node removedNode, NodeRemovalResult removalResult) { + syncLeader(); + synchronized (allNodes) { if (allNodes.contains(removedNode)) { // update the group if the deleted node was in it diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 22520e7..1534323 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -302,20 +302,24 @@ public class MetaGroupMember extends RaftMember { * closed by the method. * */ - public void closePartition(String storageGroupName, long partitionId, boolean isSeq) { + public boolean closePartition(String storageGroupName, long partitionId, boolean isSeq) { RaftNode raftNode = partitionTable.routeToHeaderByTime(storageGroupName, partitionId * StorageEngine.getTimePartitionInterval()); DataGroupMember localDataMember = getLocalDataMember(raftNode); if (localDataMember == null || localDataMember.getCharacter() != NodeCharacter.LEADER) { - return; + return false; } - localDataMember.closePartition(storageGroupName, partitionId, isSeq); + return localDataMember.closePartition(storageGroupName, partitionId, isSeq); } public DataClusterServer getDataClusterServer() { return dataClusterServer; } + public void setDataClusterServer(DataClusterServer dataClusterServer) { + this.dataClusterServer = dataClusterServer; + } + public DataHeartbeatServer getDataHeartbeatServer() { return dataHeartbeatServer; } @@ -864,6 +868,7 @@ public class MetaGroupMember extends RaftMember { addNodeLog.setPartitionTable(partitionTable.serialize()); addNodeLog.setCurrLogTerm(getTerm().get()); addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1); + addNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1); addNodeLog.setNewNode(node); @@ -1062,120 +1067,6 @@ public class MetaGroupMember extends RaftMember { return null; } - /** - * Send the log the all data groups and return a success only when each group's quorum has - * accepted this log. - */ -// private AppendLogResult sendLogToAllGroups(Log log) { -// List<Node> nodeRing = partitionTable.getAllNodes(); -// -// AtomicLong newLeaderTerm = new AtomicLong(term.get()); -// AtomicBoolean leaderShipStale = new AtomicBoolean(false); -// AppendEntryRequest request = buildAppendEntryRequest(log, true); -// -// // ask for votes from each node -// int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm); -// -// if (!leaderShipStale.get()) { -// // if all quorums of all groups have received this log, it is considered succeeded. -// for (int remaining : groupRemainings) { -// if (remaining > 0) { -// return AppendLogResult.TIME_OUT; -// } -// } -// } else { -// return AppendLogResult.LEADERSHIP_STALE; -// } -// -// return AppendLogResult.OK; -// } - - /** - * Send "request" to each node in "nodeRing" and when a node returns a success, decrease all - * counters of the groups it is in of "groupRemainings" - * - * @return a int array indicating how many votes are left in each group to make an agreement - */ - @SuppressWarnings({"java:S2445", "java:S2274"}) - // groupRemaining is shared with the handlers, - // and we do not wait infinitely to enable timeouts -// private int[] askGroupVotes(List<Node> nodeRing, -// AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log, -// AtomicLong newLeaderTerm) { -// // each node will be the header of a group, we use the node to represent the group -// int nodeSize = nodeRing.size(); -// // the decreasing counters of how many nodes in a group has received the log, each time a -// // node receive the log, the counters of all groups it is in will decrease by 1 -// int[] groupRemainings = new int[nodeSize]; -// // a group is considered successfully received the log if such members receive the log -// int groupQuorum = REPLICATION_NUM / 2 + 1; -// Arrays.fill(groupRemainings, groupQuorum); -// -// synchronized (groupRemainings) { -// // ask a vote from every node -// for (int i = 0; i < nodeSize; i++) { -// Node node = nodeRing.get(i); -// if (node.equals(thisNode)) { -// // this node automatically gives an agreement, decrease counters of all groups the local -// // node is in -// for (int j = 0; j < REPLICATION_NUM; j++) { -// int groupIndex = i - j; -// if (groupIndex < 0) { -// groupIndex += groupRemainings.length; -// } -// groupRemainings[groupIndex]--; -// } -// } else { -// askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm, -// request); -// } -// } -// -// try { -// groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS()); -// } catch (InterruptedException e) { -// Thread.currentThread().interrupt(); -// logger.error("Unexpected interruption when waiting for the group votes", e); -// } -// } -// return groupRemainings; -// } - - private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex, - AtomicBoolean leaderShipStale, Log log, - AtomicLong newLeaderTerm, AppendEntryRequest request) { - AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings, - nodeIndex, node, leaderShipStale, log, newLeaderTerm, this); - if (config.isUseAsyncServer()) { - AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node); - try { - if (client != null) { - client.appendEntry(request, handler); - } - } catch (TException e) { - logger.error("Cannot send log to node {}", node, e); - } - } else { - SyncMetaClient client = (SyncMetaClient) getSyncClient(node); - if (client == null) { - logger.error("No available client for {}", node); - return; - } - getSerialToParallelPool().submit(() -> { - try { - handler.onComplete(client.appendEntry(request)); - } catch (TException e) { - client.getInputProtocol().getTransport().close(); - handler.onError(e); - } finally { - ClientUtils.putBackSyncClient(client); - } - }); - } - - } - - public Set<Node> getIdConflictNodes() { return idConflictNodes; } @@ -1519,7 +1410,7 @@ public class MetaGroupMember extends RaftMember { * * @param planGroupMap sub-plan -> belong data group pairs */ - private TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { + public TSStatus forwardPlan(Map<PhysicalPlan, PartitionGroup> planGroupMap, PhysicalPlan plan) { // the error codes from the groups that cannot execute the plan TSStatus status; if (planGroupMap.size() == 1) { @@ -2009,6 +1900,7 @@ public class MetaGroupMember extends RaftMember { removeNodeLog.setPartitionTable(partitionTable.serialize()); removeNodeLog.setCurrLogTerm(getTerm().get()); removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1); + removeNodeLog.setMetaLogIndex(logManager.getLastLogIndex() + 1); removeNodeLog.setRemovedNode(target); @@ -2203,4 +2095,8 @@ public class MetaGroupMember extends RaftMember { public void setClientProvider(DataClientProvider dataClientProvider) { this.dataClientProvider = dataClientProvider; } + + public void setRouter(ClusterPlanRouter router) { + this.router = router; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 57f22bc..121892b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -275,6 +275,7 @@ public abstract class RaftMember { this.asyncHeartbeatClientPool = asyncHeartbeatPool; this.syncHeartbeatClientPool = syncHeartbeatPool; this.asyncSendLogClientPool = asyncSendLogClientPool; + this.stopStatus = new StopStatus(); } /** @@ -836,8 +837,6 @@ public abstract class RaftMember { * @return true if this node has caught up before timeout, false otherwise */ private boolean waitUntilCatchUp() { - long startTime = System.currentTimeMillis(); - long waitedTime = 0; long leaderCommitId; try { leaderCommitId = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer() ? @@ -854,16 +853,25 @@ public abstract class RaftMember { logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e); return false; } + logger.debug("Start to sync with leader, leader commit id is {}", leaderCommitId); + return waitUtil(leaderCommitId); + } + /** + * Wait until local commit index becomes not less than target log index + */ + public boolean waitUtil(long targetLogIndex) { + long startTime = System.currentTimeMillis(); + long waitedTime = 0; while (waitedTime < RaftServer.getSyncLeaderMaxWaitMs()) { try { long localAppliedId = logManager.getMaxHaveAppliedCommitIndex(); - logger.debug("{}: synchronizing commitIndex {}/{}", name, localAppliedId, leaderCommitId); - if (leaderCommitId <= localAppliedId) { + logger.debug("{}: synchronizing commitIndex {}/{}", name, localAppliedId, targetLogIndex); + if (targetLogIndex <= localAppliedId) { // this node has caught up if (logger.isDebugEnabled()) { waitedTime = System.currentTimeMillis() - startTime; - logger.debug("{}: synchronized with the leader after {}ms", name, waitedTime); + logger.debug("{}: synchronized to target index {} after {}ms", name, targetLogIndex, waitedTime); } return true; } @@ -879,7 +887,7 @@ public abstract class RaftMember { logger.error(MSG_NO_LEADER_COMMIT_INDEX, name, leader.get(), e); } } - logger.warn("{}: Failed to synchronize with the leader after {}ms", name, waitedTime); + logger.warn("{}: Failed to synchronize to target index {} after {}ms", name, targetLogIndex, waitedTime); return false; } @@ -911,13 +919,13 @@ public abstract class RaftMember { } else { log = new PhysicalPlanLog(); ((PhysicalPlanLog)log).setPlan(plan); + plan.setIndex(log.getCurrLogIndex()); } // assign term and index to the new log and append it synchronized (logManager) { log.setCurrLogTerm(getTerm().get()); log.setCurrLogIndex(logManager.getLastLogIndex() + 1); - plan.setIndex(log.getCurrLogIndex()); logManager.append(log); } Timer.Statistic.RAFT_SENDER_APPEND_LOG.calOperationCostTimeFromStart(startTime); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java index 51f0c8a..62cd0c8 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java @@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PreviousFillRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MemberTest; @@ -53,10 +54,10 @@ import org.apache.thrift.async.AsyncMethodCallback; public class TestAsyncDataClient extends AsyncDataClient { private PlanExecutor planExecutor; - private Map<Node, DataGroupMember> dataGroupMemberMap; + private Map<RaftNode, DataGroupMember> dataGroupMemberMap; public TestAsyncDataClient(Node node, - Map<Node, DataGroupMember> dataGroupMemberMap) + Map<RaftNode, DataGroupMember> dataGroupMemberMap) throws IOException { super(null, null, node, null); this.dataGroupMemberMap = dataGroupMemberMap; @@ -70,35 +71,35 @@ public class TestAsyncDataClient extends AsyncDataClient { @Override public void fetchSingleSeries(Node header, int raftId, long readerId, AsyncMethodCallback<ByteBuffer> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeries(header, raftId, readerId, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).fetchSingleSeries(header, raftId, readerId, resultHandler)).start(); } @Override public void getAggrResult(GetAggrResultRequest request, AsyncMethodCallback<List<ByteBuffer>> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getAggrResult(request, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).getAggrResult(request, resultHandler)).start(); } @Override public void querySingleSeries(SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeries(request, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).querySingleSeries(request, resultHandler)).start(); } @Override public void fetchSingleSeriesByTimestamp(Node header, int raftId, long readerId, long time, AsyncMethodCallback<ByteBuffer> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).fetchSingleSeriesByTimestamp(header, raftId, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).fetchSingleSeriesByTimestamp(header, raftId, readerId, time, resultHandler)).start(); } @Override public void getAllPaths(Node header, int raftId, List<String> paths, boolean withAlias, AsyncMethodCallback<GetAllPathsResult> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getAllPaths(header, raftId, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).getAllPaths(header, raftId, paths, withAlias, resultHandler)).start(); } @@ -152,39 +153,39 @@ public class TestAsyncDataClient extends AsyncDataClient { @Override public void pullTimeSeriesSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullTimeSeriesSchema(request, resultHandler)).start(); + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).pullTimeSeriesSchema(request, resultHandler)).start(); } @Override public void pullMeasurementSchema(PullSchemaRequest request, AsyncMethodCallback<PullSchemaResp> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).pullMeasurementSchema(request, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).pullMeasurementSchema(request, resultHandler)).start(); } @Override public void querySingleSeriesByTimestamp(SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).querySingleSeriesByTimestamp(request, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).querySingleSeriesByTimestamp(request, resultHandler)).start(); } @Override public void getGroupByExecutor(GroupByRequest request, AsyncMethodCallback<Long> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).getGroupByExecutor(request, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).getGroupByExecutor(request, resultHandler)).start(); } @Override public void getGroupByResult(Node header, int raftId, long executorId, long startTime, long endTime, AsyncMethodCallback<List<ByteBuffer>> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(header)).getGroupByResult(header, raftId, executorId, + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(header,0))).getGroupByResult(header, raftId, executorId, startTime, endTime, resultHandler)).start(); } @Override public void previousFill(PreviousFillRequest request, AsyncMethodCallback<ByteBuffer> resultHandler) { - new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(request.getHeader())).previousFill(request, resultHandler)).start(); + new Thread(() -> new DataAsyncService(dataGroupMemberMap.get(new RaftNode(request.getHeader(),0))).previousFill(request, resultHandler)).start(); } } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java index 1f1f3ba..17d04a1 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java @@ -103,6 +103,8 @@ public class TestUtils { .setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum()); startUpStatus .setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName()); + startUpStatus + .setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor()); List<Node> seedNodeList = new ArrayList<>(); for (int i = 0; i < 100; i += 10) { seedNodeList.add(getNode(i)); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index ca4fd92..fbffb7f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -64,6 +64,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.slot.SlotNodeAdditionResult; import org.apache.iotdb.cluster.partition.slot.SlotNodeRemovalResult; import org.apache.iotdb.cluster.query.RemoteQueryContext; +import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.ElectionRequest; import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult; import org.apache.iotdb.cluster.rpc.thrift.GroupByRequest; @@ -71,6 +72,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest; @@ -165,9 +167,9 @@ public class DataGroupMemberTest extends MemberTest { }; } - DataGroupMember getDataGroupMember(Node node) { - PartitionGroup nodes = partitionTable.getHeaderGroup(node); - return dataGroupMemberMap.computeIfAbsent(node, n -> getDataGroupMember(n, nodes)); + DataGroupMember getDataGroupMember(RaftNode raftNode) { + PartitionGroup nodes = partitionTable.getHeaderGroup(raftNode.getNode()); + return dataGroupMemberMap.computeIfAbsent(raftNode, n -> getDataGroupMember(n.getNode(), nodes)); } private DataGroupMember getDataGroupMember(Node node, PartitionGroup nodes) { @@ -179,6 +181,11 @@ public class DataGroupMemberTest extends MemberTest { } @Override + public long appendEntry(AppendEntryRequest request) { + return Response.RESPONSE_AGREE; + } + + @Override public void updateHardState(long currentTerm, Node leader) { } @@ -267,16 +274,25 @@ public class DataGroupMemberTest extends MemberTest { try { Node newNodeBeforeGroup = TestUtils.getNode(-5); + assertFalse(firstMember.preAddNode(newNodeBeforeGroup)); + assertFalse(midMember.preAddNode(newNodeBeforeGroup)); + assertFalse(lastMember.preAddNode(newNodeBeforeGroup)); assertFalse(firstMember.addNode(newNodeBeforeGroup, result)); assertFalse(midMember.addNode(newNodeBeforeGroup, result)); assertFalse(lastMember.addNode(newNodeBeforeGroup, result)); Node newNodeInGroup = TestUtils.getNode(66); + assertTrue(firstMember.preAddNode(newNodeInGroup)); + assertTrue(midMember.preAddNode(newNodeInGroup)); + assertTrue(lastMember.preAddNode(newNodeInGroup)); assertFalse(firstMember.addNode(newNodeInGroup, result)); assertFalse(midMember.addNode(newNodeInGroup, result)); assertTrue(lastMember.addNode(newNodeInGroup, result)); Node newNodeAfterGroup = TestUtils.getNode(101); + assertFalse(firstMember.preAddNode(newNodeAfterGroup)); + assertFalse(midMember.preAddNode(newNodeAfterGroup)); + assertFalse(lastMember.preAddNode(newNodeAfterGroup)); assertFalse(firstMember.addNode(newNodeAfterGroup, result)); assertFalse(midMember.addNode(newNodeAfterGroup, result)); } finally { @@ -480,6 +496,7 @@ public class DataGroupMemberTest extends MemberTest { @Test public void testForwardPullSnapshot() { System.out.println("Start testForwardPullSnapshot()"); + hasInitialSnapshots = true; dataGroupMember.setCharacter(NodeCharacter.FOLLOWER); dataGroupMember.setLeader(TestUtils.getNode(1)); PullSnapshotRequest request = new PullSnapshotRequest(); @@ -904,13 +921,15 @@ public class DataGroupMemberTest extends MemberTest { dataGroupMember.start(); try { + dataGroupMember.preRemoveNode(nodeToRemove); dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult); assertEquals(NodeCharacter.ELECTOR, dataGroupMember.getCharacter()); assertEquals(Long.MIN_VALUE, dataGroupMember.getLastHeartbeatReceivedTime()); assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove)); - List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners().get(TestUtils.getNode(0)); + List<Integer> newSlots = nodeRemovalResult.getNewSlotOwners() + .get(new RaftNode(TestUtils.getNode(0), raftId)); while (newSlots.size() != pulledSnapshots.size()) { } @@ -933,13 +952,14 @@ public class DataGroupMemberTest extends MemberTest { dataGroupMember.start(); try { + dataGroupMember.preRemoveNode(nodeToRemove); dataGroupMember.removeNode(nodeToRemove, nodeRemovalResult); assertEquals(0, dataGroupMember.getLastHeartbeatReceivedTime()); assertTrue(dataGroupMember.getAllNodes().contains(TestUtils.getNode(30))); assertFalse(dataGroupMember.getAllNodes().contains(nodeToRemove)); List<Integer> newSlots = - ((SlotNodeRemovalResult) nodeRemovalResult).getNewSlotOwners().get(TestUtils.getNode(0)); + ((SlotNodeRemovalResult) nodeRemovalResult).getNewSlotOwners().get(new RaftNode(TestUtils.getNode(0), 0)); while (newSlots.size() != pulledSnapshots.size()) { } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java index 6065370..8c1beef 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MemberTest.java @@ -22,6 +22,7 @@ package org.apache.iotdb.cluster.server.member; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -50,8 +51,10 @@ import org.apache.iotdb.cluster.metadata.MetaPuller; import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; +import org.apache.iotdb.cluster.query.ClusterPlanRouter; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.Node; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus; import org.apache.iotdb.cluster.server.NodeCharacter; @@ -60,6 +63,8 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.qp.executor.PlanExecutor; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.SchemaUtils; import org.apache.thrift.async.AsyncMethodCallback; @@ -73,7 +78,7 @@ public class MemberTest { public static AtomicLong dummyResponse = new AtomicLong(Response.RESPONSE_AGREE); - Map<Node, DataGroupMember> dataGroupMemberMap; + Map<RaftNode, DataGroupMember> dataGroupMemberMap; private Map<Node, MetaGroupMember> metaGroupMemberMap; PartitionGroup allNodes; protected MetaGroupMember testMetaMember; @@ -95,7 +100,6 @@ public class MemberTest { prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(); preLogBufferSize = ClusterDescriptor.getInstance().getConfig().getRaftLogBufferSize(); ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true); - ClusterDescriptor.getInstance().getConfig().setRaftLogBufferSize(4096); testThreadPool = Executors.newFixedThreadPool(4); prevLeaderWait = RaftMember.getWaitLeaderTimeMs(); RaftMember.setWaitLeaderTimeMs(10); @@ -105,7 +109,12 @@ public class MemberTest { allNodes.add(TestUtils.getNode(i)); } - partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)); + partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)) { + @Override + public RaftNode routeToHeaderByTime(String storageGroupName, long timestamp) { + return new RaftNode(TestUtils.getNode(0), 0); + } + }; dataGroupMemberMap = new HashMap<>(); metaGroupMemberMap = new HashMap<>(); @@ -170,11 +179,15 @@ public class MemberTest { } DataGroupMember getDataGroupMember(Node node) { + return getDataGroupMember(new RaftNode(node, 0)); + } + + DataGroupMember getDataGroupMember(RaftNode node) { return dataGroupMemberMap.computeIfAbsent(node, this::newDataGroupMember); } - private DataGroupMember newDataGroupMember(Node node) { - DataGroupMember newMember = new TestDataGroupMember(node, partitionTable.getHeaderGroup(node)) { + private DataGroupMember newDataGroupMember(RaftNode raftNode) { + DataGroupMember newMember = new TestDataGroupMember(raftNode.getNode(), partitionTable.getHeaderGroup(raftNode)) { @Override public boolean syncLeader() { @@ -200,9 +213,9 @@ public class MemberTest { return getAsyncClient(node); } }; - newMember.setThisNode(node); + newMember.setThisNode(raftNode.getNode()); newMember.setMetaGroupMember(testMetaMember); - newMember.setLeader(node); + newMember.setLeader(raftNode.getNode()); newMember.setCharacter(NodeCharacter.LEADER); newMember .setLogManager( @@ -232,11 +245,16 @@ public class MemberTest { @Override public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) { - return getDataGroupMember(header); + return getDataGroupMember(new RaftNode(header, raftId)); } @Override public DataGroupMember getLocalDataMember(Node header, int raftId) { + return getDataGroupMember(new RaftNode(header, raftId)); + } + + @Override + public DataGroupMember getLocalDataMember(RaftNode header) { return getDataGroupMember(header); } diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index a1e563e..94f5067 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -51,6 +51,8 @@ import org.apache.iotdb.cluster.client.async.AsyncDataClient; import org.apache.iotdb.cluster.common.TestAsyncClient; import org.apache.iotdb.cluster.common.TestAsyncDataClient; import org.apache.iotdb.cluster.common.TestAsyncMetaClient; +import org.apache.iotdb.cluster.common.TestLogApplier; +import org.apache.iotdb.cluster.common.TestLogManager; import org.apache.iotdb.cluster.common.TestPartitionedLogManager; import org.apache.iotdb.cluster.common.TestSnapshot; import org.apache.iotdb.cluster.common.TestUtils; @@ -60,6 +62,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; +import org.apache.iotdb.cluster.exception.UnknownLogTypeException; +import org.apache.iotdb.cluster.exception.UnsupportedPlanException; +import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.CloseFileLog; import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; @@ -68,6 +73,7 @@ import org.apache.iotdb.cluster.metadata.CMManager; import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; +import org.apache.iotdb.cluster.query.ClusterPlanRouter; import org.apache.iotdb.cluster.query.LocalQueryExecutor; import org.apache.iotdb.cluster.query.RemoteQueryContext; import org.apache.iotdb.cluster.query.manage.QueryCoordinator; @@ -82,6 +88,7 @@ import org.apache.iotdb.cluster.rpc.thrift.HeartBeatResponse; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest; import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp; +import org.apache.iotdb.cluster.rpc.thrift.RaftNode; import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient; import org.apache.iotdb.cluster.rpc.thrift.SendSnapshotRequest; import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus; @@ -113,6 +120,7 @@ import org.apache.iotdb.db.qp.executor.PlanExecutor; import org.apache.iotdb.db.qp.physical.PhysicalPlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.db.query.control.QueryResourceManager; @@ -133,6 +141,7 @@ import org.apache.thrift.protocol.TCompactProtocol.Factory; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; public class MetaGroupMemberTest extends MemberTest { @@ -177,6 +186,12 @@ public class MetaGroupMemberTest extends MemberTest { buildDataGroups(dataClusterServer); testMetaMember.getThisNode().setNodeIdentifier(0); + testMetaMember.setRouter(new ClusterPlanRouter(testMetaMember.getPartitionTable()){ + @Override + protected Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) { + return Collections.singletonMap(plan, partitionTable.getHeaderGroup(testMetaMember.getThisNode())); + } + }); mockDataClusterServer = false; QueryCoordinator.getINSTANCE().setMetaGroupMember(testMetaMember); exiledNode = null; @@ -207,6 +222,11 @@ public class MetaGroupMemberTest extends MemberTest { } @Override + protected AppendLogResult sendLogToFollowers(Log log) { + return AppendLogResult.OK; + } + + @Override public AsyncClient getAsyncClient(Node node) { return getClient(node); } @@ -287,6 +307,7 @@ public class MetaGroupMemberTest extends MemberTest { return resp; } + @Override protected MetaGroupMember getMetaGroupMember(Node node) throws QueryProcessException { MetaGroupMember metaGroupMember = new MetaGroupMember(new Factory(), node) { @@ -309,11 +330,16 @@ public class MetaGroupMemberTest extends MemberTest { @Override public DataGroupMember getLocalDataMember(Node header, int raftId, Object request) { - return getDataGroupMember(header); + return getDataGroupMember(new RaftNode(header, raftId)); } @Override public DataGroupMember getLocalDataMember(Node header, int raftId) { + return getDataGroupMember(new RaftNode(header, raftId)); + } + + @Override + public DataGroupMember getLocalDataMember(RaftNode header) { return getDataGroupMember(header); } @@ -433,7 +459,7 @@ public class MetaGroupMemberTest extends MemberTest { @Override public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { new Thread(() -> { - testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.seralizePartitionTable, node)); + testMetaMember.applyRemoveNode(new RemoveNodeLog(partitionTable.serialize(), node)); resultHandler.onComplete(Response.RESPONSE_AGREE); }).start(); } @@ -503,7 +529,7 @@ public class MetaGroupMemberTest extends MemberTest { } ExecutorService testThreadPool = Executors.newFixedThreadPool(4); - testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true); + assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true)); StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(new PartialPath(TestUtils.getTestSg(0))); @@ -522,15 +548,13 @@ public class MetaGroupMemberTest extends MemberTest { // the net work is down dummyResponse.set(Long.MIN_VALUE); - // network resume in 100ms - new Thread(() -> { - await().atLeast(200, TimeUnit.MILLISECONDS); - dummyResponse.set(Response.RESPONSE_AGREE); - }).start(); - System.out.println("Close the first file"); + assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true)); + assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty()); - testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true); + // network resume in 100ms + dummyResponse.set(Response.RESPONSE_AGREE); + assertTrue(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true)); assertTrue(processor.getWorkSequenceTsFileProcessors().isEmpty()); System.out.println("Create the second file"); @@ -540,10 +564,11 @@ public class MetaGroupMemberTest extends MemberTest { PlanExecutor planExecutor = new PlanExecutor(); planExecutor.processNonQuery(insertPlan); } + // indicating the leader is stale System.out.println("Close the second file"); dummyResponse.set(100); - testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true); + assertFalse(testMetaMember.closePartition(TestUtils.getTestSg(0), 0, true)); assertFalse(processor.getWorkSequenceTsFileProcessors().isEmpty()); } finally { RaftServer.setConnectionTimeoutInMS(prevTimeout); @@ -554,9 +579,10 @@ public class MetaGroupMemberTest extends MemberTest { @Test public void testAddNode() { System.out.println("Start testAddNode()"); - Node newNode = TestUtils.getNode(10); + Node newNode = TestUtils.getNode(11); + testMetaMember.getPartitionTable().addNode(newNode); testMetaMember.onElectionWins(); - testMetaMember.applyAddNode(new AddNodeLog(TestUtils.seralizePartitionTable, newNode)); + testMetaMember.applyAddNode(new AddNodeLog(testMetaMember.getPartitionTable().serialize(), newNode)); assertTrue(partitionTable.getAllNodes().contains(newNode)); } @@ -867,6 +893,7 @@ public class MetaGroupMemberTest extends MemberTest { public void testProcessValidHeartbeatReq() throws QueryProcessException { System.out.println("Start testProcessValidHeartbeatReq()"); MetaGroupMember testMetaMember = getMetaGroupMember(TestUtils.getNode(10)); + partitionTable = new SlotPartitionTable(allNodes, TestUtils.getNode(0)); try { HeartBeatRequest request = new HeartBeatRequest(); request.setRequireIdentifier(true); @@ -1105,6 +1132,7 @@ public class MetaGroupMemberTest extends MemberTest { AtomicReference<Long> resultRef = new AtomicReference<>(); testMetaMember.setLeader(testMetaMember.getThisNode()); testMetaMember.setCharacter(LEADER); + doRemoveNode(resultRef, testMetaMember.getThisNode()); assertEquals(Response.RESPONSE_AGREE, (long) resultRef.get()); @@ -1203,6 +1231,7 @@ public class MetaGroupMemberTest extends MemberTest { @Override public void onError(Exception e) { + resultRef.set(-1L); e.printStackTrace(); } }); diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java index 725c803..bdc19c5 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java @@ -44,6 +44,7 @@ public class LogPlan extends PhysicalPlan { } public ByteBuffer getLog() { + log.clear(); return log; }
