This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 452cba315bc66688684181e47fbff0cfcd94bf3a Author: lta <[email protected]> AuthorDate: Wed Jan 13 15:49:28 2021 +0800 Reimplement the function of adding and removing nodes --- .../iotdb/cluster/client/DataClientProvider.java | 3 - ...ception.java => ChangeMembershipException.java} | 14 +- .../exception/CheckConsistencyException.java | 4 +- .../java/org/apache/iotdb/cluster/log/Log.java | 1 + .../iotdb/cluster/log/applier/DataLogApplier.java | 8 +- .../iotdb/cluster/log/applier/MetaLogApplier.java | 32 ++- .../iotdb/cluster/log/logtypes/AddNodeLog.java | 40 +++- .../iotdb/cluster/log/logtypes/RemoveNodeLog.java | 130 +++++++----- .../manage/FilePartitionedSnapshotLogManager.java | 12 ++ .../log/manage/MetaSingleSnapshotLogManager.java | 19 ++ .../iotdb/cluster/log/manage/RaftLogManager.java | 6 +- .../cluster/log/snapshot/PullSnapshotTask.java | 5 +- .../iotdb/cluster/partition/NodeRemovalResult.java | 32 +++ .../iotdb/cluster/partition/PartitionGroup.java | 21 ++ .../iotdb/cluster/partition/PartitionTable.java | 17 +- .../partition/slot/SlotNodeRemovalResult.java | 38 +++- .../cluster/partition/slot/SlotPartitionTable.java | 93 +++++---- .../iotdb/cluster/query/ClusterPlanRouter.java | 31 ++- .../iotdb/cluster/server/DataClusterServer.java | 31 +++ .../iotdb/cluster/server/MetaClusterServer.java | 8 +- .../cluster/server/member/DataGroupMember.java | 82 +++++--- .../cluster/server/member/MetaGroupMember.java | 229 +++++++++++---------- .../iotdb/cluster/server/member/RaftMember.java | 24 ++- .../cluster/server/service/MetaAsyncService.java | 8 +- .../cluster/server/service/MetaSyncService.java | 8 +- .../apache/iotdb/cluster/utils/StatusUtils.java | 4 + .../org/apache/iotdb/cluster/common/TestUtils.java | 4 + .../apache/iotdb/cluster/log/LogParserTest.java | 2 + .../cluster/log/applier/MetaLogApplierTest.java | 12 +- .../cluster/log/logtypes/SerializeLogTest.java | 2 + .../cluster/partition/SlotPartitionTableTest.java | 3 +- .../server/heartbeat/MetaHeartbeatThreadTest.java | 25 ++- .../cluster/server/member/DataGroupMemberTest.java | 7 +- .../cluster/server/member/MetaGroupMemberTest.java | 8 +- .../engine/storagegroup/StorageGroupProcessor.java | 40 +++- .../apache/iotdb/db/qp/physical/PhysicalPlan.java | 7 +- .../apache/iotdb/db/qp/physical/sys/LogPlan.java | 71 +++++++ .../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +- thrift/src/main/thrift/cluster.thrift | 3 +- 39 files changed, 781 insertions(+), 306 deletions(-) diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java index 9a1c4df..4c882e7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java @@ -29,12 +29,9 @@ import org.apache.iotdb.cluster.config.ClusterDescriptor; import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.thrift.TException; import org.apache.thrift.protocol.TProtocolFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class DataClientProvider { - private static final Logger logger = LoggerFactory.getLogger(DataClientProvider.class); /** * dataClientPool provides reusable thrift clients to connect to the DataGroupMembers of other * nodes diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java similarity index 64% copy from cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java copy to cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java index 12ac407..f50e668 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/ChangeMembershipException.java @@ -16,19 +16,15 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.iotdb.cluster.exception; /** - * Raised when check consistency failed, now only happens if there is a strong-consistency and - * syncLeader failed + * Raised when add/remove membership log can not be sent to all data groups */ -public class CheckConsistencyException extends Exception { +public class ChangeMembershipException extends Exception { - public CheckConsistencyException(String errMag) { - super(String.format("check consistency failed, error message=%s ", errMag)); + public ChangeMembershipException(String errMsg) { + super(String.format("change membership fail, error message=%s ", errMsg)); } - - public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION = - new CheckConsistencyException( - "strong consistency, sync with leader failed"); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java index 12ac407..7b0609a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/exception/CheckConsistencyException.java @@ -24,8 +24,8 @@ package org.apache.iotdb.cluster.exception; */ public class CheckConsistencyException extends Exception { - public CheckConsistencyException(String errMag) { - super(String.format("check consistency failed, error message=%s ", errMag)); + public CheckConsistencyException(String errMsg) { + super(String.format("check consistency failed, error message=%s ", errMsg)); } public static final CheckConsistencyException CHECK_STRONG_CONSISTENCY_EXCEPTION = diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java index 0c236b2..2903fe9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/Log.java @@ -127,4 +127,5 @@ public abstract class Log implements Comparable<Log> { public void setEnqueueTime(long enqueueTime) { this.enqueueTime = enqueueTime; } + } \ No newline at end of file diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java index 8ce84b5..ceed787 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/DataLogApplier.java @@ -22,8 +22,10 @@ package org.apache.iotdb.cluster.log.applier; import org.apache.iotdb.cluster.config.ClusterConstant; import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.CloseFileLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable; import org.apache.iotdb.cluster.server.member.DataGroupMember; import org.apache.iotdb.cluster.server.member.MetaGroupMember; @@ -60,7 +62,11 @@ public class DataLogApplier extends BaseApplier { logger.debug("DataMember [{}] start applying Log {}", dataGroupMember.getName(), log); try { - if (log instanceof PhysicalPlanLog) { + if (log instanceof AddNodeLog) { + metaGroupMember.getDataClusterServer().preAddNodeForDataGroup((AddNodeLog) log, dataGroupMember); + } else if (log instanceof RemoveNodeLog) { + metaGroupMember.getDataClusterServer().preRemoveNodeForDataGroup((RemoveNodeLog) log, dataGroupMember); + } else if (log instanceof PhysicalPlanLog) { PhysicalPlanLog physicalPlanLog = (PhysicalPlanLog) log; PhysicalPlan plan = physicalPlanLog.getPlan(); if (plan instanceof InsertPlan) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java index d7dd5f9..94437ae 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/MetaLogApplier.java @@ -19,15 +19,18 @@ package org.apache.iotdb.cluster.log.applier; +import org.apache.iotdb.cluster.exception.ChangeMembershipException; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; -import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,25 +49,40 @@ public class MetaLogApplier extends BaseApplier { @Override public void apply(Log log) { + apply(log, false); + } + + public void apply(Log log, boolean isLeader) { try { logger.debug("MetaMember [{}] starts applying Log {}", metaGroupMember.getName(), log); if (log instanceof AddNodeLog) { - AddNodeLog addNodeLog = (AddNodeLog) log; - Node newNode = addNodeLog.getNewNode(); - member.applyAddNode(newNode); + if (isLeader) { + sendLogToAllDataGroups(log); + } + member.applyAddNode((AddNodeLog) log); } else if (log instanceof PhysicalPlanLog) { applyPhysicalPlan(((PhysicalPlanLog) log).getPlan(), null); } else if (log instanceof RemoveNodeLog) { - RemoveNodeLog removeNodeLog = ((RemoveNodeLog) log); - member.applyRemoveNode(removeNodeLog.getRemovedNode()); + if (isLeader) { + sendLogToAllDataGroups(log); + } + member.applyRemoveNode(((RemoveNodeLog) log)); } else { logger.error("Unsupported log: {} {}", log.getClass().getName(), log); } - } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException e) { + } catch (StorageEngineException | StorageGroupNotSetException | QueryProcessException | ChangeMembershipException e) { logger.debug("Exception occurred when executing {}", log, e); log.setException(e); } finally { log.setApplied(true); } } + + private void sendLogToAllDataGroups(Log log) throws ChangeMembershipException { + LogPlan plan = new LogPlan(log.serialize()); + TSStatus status = member.executeNonQueryPlan(plan); + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new ChangeMembershipException(String.format("apply %s failed", log)); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java index f54725d..824c3f2 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/AddNodeLog.java @@ -33,16 +33,34 @@ import org.apache.iotdb.db.utils.SerializeUtils; */ public class AddNodeLog extends Log { + private ByteBuffer partitionTable; + private Node newNode; - public Node getNewNode() { - return newNode; + public AddNodeLog(ByteBuffer partitionTable, Node newNode) { + this.partitionTable = partitionTable; + this.newNode = newNode; + } + + public AddNodeLog() { + } + + public void setPartitionTable(ByteBuffer partitionTable) { + this.partitionTable = partitionTable; } public void setNewNode(Node newNode) { this.newNode = newNode; } + public Node getNewNode() { + return newNode; + } + + public ByteBuffer getPartitionTable() { + return partitionTable; + } + @Override public ByteBuffer serialize() { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); @@ -52,6 +70,9 @@ public class AddNodeLog extends Log { dataOutputStream.writeLong(getCurrLogTerm()); SerializeUtils.serialize(newNode, dataOutputStream); + + dataOutputStream.write(partitionTable.array().length); + dataOutputStream.write(partitionTable.array()); } catch (IOException e) { // ignored } @@ -69,6 +90,9 @@ public class AddNodeLog extends Log { newNode = new Node(); SerializeUtils.deserialize(newNode, buffer); + + int len = buffer.getInt(); + partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len); } @Override @@ -83,11 +107,19 @@ public class AddNodeLog extends Log { return false; } AddNodeLog that = (AddNodeLog) o; - return Objects.equals(newNode, that.newNode); + return Objects.equals(newNode, that.newNode) && Objects + .equals(partitionTable, that.partitionTable); + } + + @Override + public String toString() { + return "AddNodeLog{" + + "newNode=" + newNode + + '}'; } @Override public int hashCode() { - return Objects.hash(super.hashCode(), newNode); + return Objects.hash(super.hashCode(), newNode, partitionTable); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java index 02d89d0..800b77d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/logtypes/RemoveNodeLog.java @@ -19,69 +19,101 @@ package org.apache.iotdb.cluster.log.logtypes; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Objects; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.rpc.thrift.Node; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.nio.ByteBuffer; import org.apache.iotdb.db.utils.SerializeUtils; public class RemoveNodeLog extends Log { - private Node removedNode; - - @Override - public ByteBuffer serialize() { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { - dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal()); - dataOutputStream.writeLong(getCurrLogIndex()); - dataOutputStream.writeLong(getCurrLogTerm()); - - SerializeUtils.serialize(removedNode, dataOutputStream); - } catch (IOException e) { - // ignored - } - return ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); - } + private ByteBuffer partitionTable; - @Override - public void deserialize(ByteBuffer buffer) { - setCurrLogIndex(buffer.getLong()); - setCurrLogTerm(buffer.getLong()); + private Node removedNode; - removedNode = new Node(); - SerializeUtils.deserialize(removedNode, buffer); - } + public RemoveNodeLog(ByteBuffer partitionTable, + Node removedNode) { + this.partitionTable = partitionTable; + this.removedNode = removedNode; + } - public Node getRemovedNode() { - return removedNode; - } + public RemoveNodeLog() { + } - public void setRemovedNode(Node removedNode) { - this.removedNode = removedNode; - } + public ByteBuffer getPartitionTable() { + return partitionTable; + } + + public void setPartitionTable(ByteBuffer partitionTable) { + this.partitionTable = partitionTable; + } - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - if (!super.equals(o)) { - return false; - } - RemoveNodeLog that = (RemoveNodeLog) o; - return Objects.equals(removedNode, that.removedNode); + @Override + public ByteBuffer serialize() { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) { + dataOutputStream.writeByte(Types.REMOVE_NODE.ordinal()); + dataOutputStream.writeLong(getCurrLogIndex()); + dataOutputStream.writeLong(getCurrLogTerm()); + + SerializeUtils.serialize(removedNode, dataOutputStream); + + dataOutputStream.write(partitionTable.array().length); + dataOutputStream.write(partitionTable.array()); + } catch (IOException e) { + // ignored } + return ByteBuffer.wrap(byteArrayOutputStream.toByteArray()); + } + + @Override + public void deserialize(ByteBuffer buffer) { + setCurrLogIndex(buffer.getLong()); + setCurrLogTerm(buffer.getLong()); + + removedNode = new Node(); + SerializeUtils.deserialize(removedNode, buffer); + + int len = buffer.getInt(); + partitionTable = ByteBuffer.wrap(buffer.array(), buffer.position(), len); + } + + public Node getRemovedNode() { + return removedNode; + } - @Override - public int hashCode() { - return Objects.hash(super.hashCode(), removedNode); + public void setRemovedNode(Node removedNode) { + this.removedNode = removedNode; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; } + RemoveNodeLog that = (RemoveNodeLog) o; + return Objects.equals(removedNode, that.removedNode) && Objects + .equals(partitionTable, that.partitionTable); + } + + @Override + public String toString() { + return "RemoveNodeLog{" + + "removedNode=" + removedNode + + '}'; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), removedNode, partitionTable); + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java index 79f3cd1..682da96 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java @@ -28,7 +28,10 @@ import java.util.Map; import java.util.Map.Entry; import org.apache.iotdb.cluster.exception.EntryCompactedException; +import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.LogApplier; +import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.log.snapshot.FileSnapshot; import org.apache.iotdb.cluster.log.snapshot.FileSnapshot.Factory; import org.apache.iotdb.cluster.partition.PartitionTable; @@ -202,4 +205,13 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan } return true; } + + @Override + public long append(Log entry) { + long lastLogIndex = super.append(entry); + if (lastLogIndex != -1 && (entry instanceof AddNodeLog || entry instanceof RemoveNodeLog)) { + logApplier.apply(entry); + } + return lastLogIndex; + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java index ff650e3..1e86e11 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/MetaSingleSnapshotLogManager.java @@ -20,11 +20,15 @@ package org.apache.iotdb.cluster.log.manage; import java.io.IOException; +import java.util.List; import java.util.Map; +import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.LogApplier; import org.apache.iotdb.cluster.log.Snapshot; +import org.apache.iotdb.cluster.log.applier.MetaLogApplier; import org.apache.iotdb.cluster.log.manage.serializable.SyncLogDequeSerializer; import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot; +import org.apache.iotdb.cluster.server.NodeCharacter; import org.apache.iotdb.cluster.server.member.MetaGroupMember; import org.apache.iotdb.db.auth.AuthException; import org.apache.iotdb.db.auth.authorizer.BasicAuthorizer; @@ -81,4 +85,19 @@ public class MetaSingleSnapshotLogManager extends RaftLogManager { snapshot.setLastLogTerm(term); return snapshot; } + + @Override + void applyEntries(List<Log> entries) { + for (Log entry : entries) { + if (blockAppliedCommitIndex > 0 && entry.getCurrLogIndex() > blockAppliedCommitIndex) { + blockedUnappliedLogList.add(entry); + continue; + } + try { + ((MetaLogApplier)logApplier).apply(entry, metaGroupMember.getCharacter() == NodeCharacter.LEADER); + } catch (Exception e) { + entry.setException(e); + } + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java index bb8b231..bd65c26 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/RaftLogManager.java @@ -82,10 +82,10 @@ public abstract class RaftLogManager { * The committed log whose index is larger than blockAppliedCommitIndex will be blocked. if * blockAppliedCommitIndex < 0(default is -1), will not block any operation. */ - private volatile long blockAppliedCommitIndex; + protected volatile long blockAppliedCommitIndex; - private LogApplier logApplier; + protected LogApplier logApplier; /** * to distinguish managers of different members @@ -116,7 +116,7 @@ public abstract class RaftLogManager { */ private final Object logUpdateCondition = new Object(); - private List<Log> blockedUnappliedLogList; + protected List<Log> blockedUnappliedLogList; protected RaftLogManager(StableEntryManager stableEntryManager, LogApplier applier, String name) { this.logApplier = applier; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java index 4a79485..752e3e3 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java @@ -166,6 +166,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { public Void call() { // If this node is the member of previous holder, it's unnecessary to pull data again if (descriptor.getPreviousHolders().contains(newMember.getThisNode())) { + for (Integer slot: descriptor.getSlots()) { + newMember.getSlotManager().setToNull(slot); + } // inform the previous holders that one member has successfully pulled snapshot directly newMember.registerPullSnapshotHint(descriptor); } else { @@ -176,7 +179,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { request.setRequireReadOnly(descriptor.isRequireReadOnly()); boolean finished = false; - int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()); + int nodeIndex = ((PartitionGroup) newMember.getAllNodes()).indexOf(newMember.getThisNode()) - 1; while (!finished) { try { // sequentially pick up a node that may have this slot diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java index 5493980..4193ffd 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/NodeRemovalResult.java @@ -19,8 +19,13 @@ package org.apache.iotdb.cluster.partition; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import org.apache.iotdb.cluster.rpc.thrift.Node; /** * NodeRemovalResult stores the removed partition group. @@ -61,4 +66,31 @@ public class NodeRemovalResult { } return null; } + + public void serialize(DataOutputStream dataOutputStream) throws IOException { + dataOutputStream.writeInt(removedGroupList.size()); + for (PartitionGroup group: removedGroupList) { + group.serialize(dataOutputStream); + } + dataOutputStream.writeInt(newGroupList.size()); + for (PartitionGroup group: newGroupList) { + group.serialize(dataOutputStream); + } + } + + public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { + int removedGroupListSize = buffer.getInt(); + for (int i = 0 ; i < removedGroupListSize; i++) { + PartitionGroup group = new PartitionGroup(); + group.deserialize(buffer, idNodeMap); + removedGroupList.add(group); + } + + int newGroupListSize = buffer.getInt(); + for (int i = 0 ; i < newGroupListSize; i++) { + PartitionGroup group = new PartitionGroup(); + group.deserialize(buffer, idNodeMap); + newGroupList.add(group); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java index 2a562ac..b35cc10 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionGroup.java @@ -19,9 +19,13 @@ package org.apache.iotdb.cluster.partition; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Map; import java.util.Objects; import org.apache.iotdb.cluster.rpc.thrift.Node; @@ -64,6 +68,23 @@ public class PartitionGroup extends ArrayList<Node> { super.equals(group); } + public void serialize(DataOutputStream dataOutputStream) + throws IOException { + dataOutputStream.writeInt(getId()); + dataOutputStream.writeInt(size()); + for (Node node : this) { + dataOutputStream.writeInt(node.getNodeIdentifier()); + } + } + + public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { + id = buffer.getInt(); + int nodeNum = buffer.getInt(); + for (int i2 = 0; i2 < nodeNum; i2++) { + add(idNodeMap.get(buffer.getInt())); + } + } + @Override public int hashCode() { return Objects.hash(id, super.hashCode()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java index 079aad1..6bf6c0c 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/PartitionTable.java @@ -63,14 +63,18 @@ public interface PartitionTable { * @param node * @return the new group generated by the node */ - NodeAdditionResult addNode(Node node); + void addNode(Node node); + + NodeAdditionResult getNodeAdditionResult(Node node); /** * Remove a node and update the partition table. * * @param node */ - NodeRemovalResult removeNode(Node node); + void removeNode(Node node); + + NodeRemovalResult getNodeRemovalResult(); /** * @return All data groups where all VNodes of this node is the header. The first index indicates @@ -88,12 +92,19 @@ public interface PartitionTable { ByteBuffer serialize(); - void deserialize(ByteBuffer buffer); + /** + * Deserialize partition table and check whether the partition table in byte buffer is valid + * @param buffer + * @return true if the partition table is valid + */ + boolean deserialize(ByteBuffer buffer); List<Node> getAllNodes(); List<PartitionGroup> getGlobalGroups(); + List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing); + /** * Judge whether the data of slot is held by node * @param node target node diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java index 17a0c93..a04a289 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotNodeRemovalResult.java @@ -19,9 +19,15 @@ package org.apache.iotdb.cluster.partition.slot; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iotdb.cluster.partition.NodeRemovalResult; +import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.rpc.thrift.RaftNode; /** @@ -29,7 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.RaftNode; */ public class SlotNodeRemovalResult extends NodeRemovalResult { - private Map<RaftNode, List<Integer>> newSlotOwners; + private Map<RaftNode, List<Integer>> newSlotOwners = new HashMap<>(); public Map<RaftNode, List<Integer>> getNewSlotOwners() { return newSlotOwners; @@ -38,4 +44,34 @@ public class SlotNodeRemovalResult extends NodeRemovalResult { public void addNewSlotOwners(Map<RaftNode, List<Integer>> newSlotOwners) { this.newSlotOwners = newSlotOwners; } + + @Override + public void serialize(DataOutputStream dataOutputStream) throws IOException { + super.serialize(dataOutputStream); + dataOutputStream.writeInt(newSlotOwners.size()); + for (Map.Entry<RaftNode, List<Integer>> entry: newSlotOwners.entrySet()) { + RaftNode raftNode = entry.getKey(); + dataOutputStream.writeInt(raftNode.getNode().nodeIdentifier); + dataOutputStream.writeInt(raftNode.getRaftId()); + dataOutputStream.writeInt(entry.getValue().size()); + for (Integer slot: entry.getValue()) { + dataOutputStream.writeInt(slot); + } + } + } + + @Override + public void deserialize(ByteBuffer buffer, Map<Integer, Node> idNodeMap) { + super.deserialize(buffer, idNodeMap); + int size = buffer.getInt(); + for (int i = 0 ; i < size; i++) { + RaftNode raftNode = new RaftNode(idNodeMap.get(buffer.getInt()), buffer.getInt()); + List<Integer> slots = new ArrayList<>(); + int slotSize = buffer.getInt(); + for (int j = 0 ; j < slotSize; j++) { + slots.add(buffer.getInt()); + } + newSlotOwners.put(raftNode, slots); + } + } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index 2a5ae3c..f441e4a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -62,8 +62,11 @@ public class SlotPartitionTable implements PartitionTable { private RaftNode[] slotNodes = new RaftNode[ClusterConstant.SLOT_NUM]; // the nodes that each slot belongs to before a new node is added, used for the new node to // find the data source + // find the data source private Map<RaftNode, Map<Integer, PartitionGroup>> previousNodeMap = new ConcurrentHashMap<>(); + private NodeRemovalResult nodeRemovalResult = new NodeRemovalResult(); + //the filed is used for determining which nodes need to be a group. // the data groups which this node belongs to. private List<PartitionGroup> localGroups; @@ -231,11 +234,11 @@ public class SlotPartitionTable implements PartitionTable { } @Override - public NodeAdditionResult addNode(Node node) { + public void addNode(Node node) { List<Node> oldRing; synchronized (nodeRing) { if (nodeRing.contains(node)) { - return null; + return; } oldRing = new ArrayList<>(nodeRing); @@ -270,21 +273,34 @@ public class SlotPartitionTable implements PartitionTable { } } - SlotNodeAdditionResult result = new SlotNodeAdditionResult(); for (int raftId = 0; raftId < multiRaftFactor; raftId++) { PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId)); if (newGroup.contains(thisNode)) { localGroups.add(newGroup); } - result.addNewGroup(newGroup); } - calculateGlobalGroups(); + calculateGlobalGroups(nodeRing); // the slots movement is only done logically, the new node itself will pull data from the // old node - result.setLostSlots(moveSlotsToNew(node, oldRing)); + moveSlotsToNew(node, oldRing); + } + + @Override + public NodeAdditionResult getNodeAdditionResult(Node node) { + SlotNodeAdditionResult result = new SlotNodeAdditionResult(); + Map<RaftNode, Set<Integer>> lostSlotsMap = new HashMap<>(); + for (int raftId = 0; raftId < multiRaftFactor; raftId++) { + RaftNode raftNode = new RaftNode(node, raftId); + result.addNewGroup(getHeaderGroup(raftNode)); + for (Entry<Integer, PartitionGroup> entry: previousNodeMap.get(raftNode).entrySet()) { + RaftNode header = new RaftNode(entry.getValue().getHeader(), entry.getValue().getId()); + lostSlotsMap.computeIfAbsent(header, k -> new HashSet<>()).add(entry.getKey()); + } + } + result.setLostSlots(lostSlotsMap); return result; } @@ -294,10 +310,8 @@ public class SlotPartitionTable implements PartitionTable { * node. * * @param newNode - * @return a map recording what slots each group lost. */ - private Map<RaftNode, Set<Integer>> moveSlotsToNew(Node newNode, List<Node> oldRing) { - Map<RaftNode, Set<Integer>> result = new HashMap<>(); + private void moveSlotsToNew(Node newNode, List<Node> oldRing) { // as a node is added, the average slots for each node decrease // move the slots to the new node if any previous node have more slots than the new average int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor; @@ -324,7 +338,6 @@ public class SlotPartitionTable implements PartitionTable { previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } - result.computeIfAbsent(entry.getKey(), n -> new HashSet<>()).addAll(slotsToMove); transferNum -= numToMove; if (transferNum > 0) { curNode = new RaftNode(newNode, ++raftId); @@ -335,11 +348,9 @@ public class SlotPartitionTable implements PartitionTable { previousNodeMap.get(curNode).put(slot, getHeaderGroup(entry.getKey(), oldRing)); slotNodes[slot] = curNode; } - result.get(entry.getKey()).addAll(slotsToMove); } } } - return result; } @Override @@ -354,6 +365,7 @@ public class SlotPartitionTable implements PartitionTable { DataOutputStream dataOutputStream = new DataOutputStream(outputStream); try { + dataOutputStream.writeLong(lastLogIndex); dataOutputStream.writeInt(totalSlotNumbers); dataOutputStream.writeInt(nodeSlotMap.size()); for (Entry<RaftNode, List<Integer>> entry : nodeSlotMap.entrySet()) { @@ -370,16 +382,11 @@ public class SlotPartitionTable implements PartitionTable { dataOutputStream.writeInt(prevHolders.size()); for (Entry<Integer, PartitionGroup> integerNodeEntry : prevHolders.entrySet()) { dataOutputStream.writeInt(integerNodeEntry.getKey()); - PartitionGroup group = integerNodeEntry.getValue(); - dataOutputStream.writeInt(group.getId()); - dataOutputStream.writeInt(group.size()); - for (Node node : group) { - dataOutputStream.writeInt(node.getNodeIdentifier()); - } + integerNodeEntry.getValue().serialize(dataOutputStream); } } - dataOutputStream.writeLong(lastLogIndex); + nodeRemovalResult.serialize(dataOutputStream); } catch (IOException ignored) { // not reachable } @@ -387,8 +394,14 @@ public class SlotPartitionTable implements PartitionTable { } @Override - public void deserialize(ByteBuffer buffer) { + public synchronized boolean deserialize(ByteBuffer buffer) { + long newLastLogIndex = buffer.getLong(); + // judge whether the partition table of byte buffer is out of date + if (lastLogIndex >= newLastLogIndex) { + return lastLogIndex <= newLastLogIndex; + } + lastLogIndex = newLastLogIndex; logger.info("Initializing the partition table from buffer"); totalSlotNumbers = buffer.getInt(); int size = buffer.getInt(); @@ -415,17 +428,15 @@ public class SlotPartitionTable implements PartitionTable { Map<Integer, PartitionGroup> prevHolders = new HashMap<>(); int holderNum = buffer.getInt(); for (int i1 = 0; i1 < holderNum; i1++) { - int slot = buffer.getInt(); - PartitionGroup group = new PartitionGroup(buffer.getInt()); - int nodeNum = buffer.getInt(); - for (int i2 = 0 ; i2 < nodeNum; i2++) { - group.add(idNodeMap.get(buffer.getInt())); - } - prevHolders.put(slot, group); + PartitionGroup group = new PartitionGroup(); + group.deserialize(buffer, idNodeMap); + prevHolders.put(buffer.getInt(), group); } previousNodeMap.put(node, prevHolders); } - lastLogIndex = buffer.getLong(); + + nodeRemovalResult = new NodeRemovalResult(); + nodeRemovalResult.deserialize(buffer, idNodeMap); for (RaftNode raftNode : nodeSlotMap.keySet()) { if (!nodeRing.contains(raftNode.getNode())) { @@ -436,6 +447,7 @@ public class SlotPartitionTable implements PartitionTable { logger.info("All known nodes: {}", nodeRing); localGroups = getPartitionGroups(thisNode); + return true; } @Override @@ -485,10 +497,10 @@ public class SlotPartitionTable implements PartitionTable { } @Override - public NodeRemovalResult removeNode(Node target) { + public void removeNode(Node target) { synchronized (nodeRing) { if (!nodeRing.contains(target)) { - return null; + return; } SlotNodeRemovalResult result = new SlotNodeRemovalResult(); @@ -532,16 +544,21 @@ public class SlotPartitionTable implements PartitionTable { result.addNewGroup(newGrp); } - calculateGlobalGroups(); + calculateGlobalGroups(nodeRing); // the slots movement is only done logically, the new node itself will pull data from the // old node Map<RaftNode, List<Integer>> raftNodeListMap = retrieveSlots(target); result.addNewSlotOwners(raftNodeListMap); - return result; + this.nodeRemovalResult = result; } } + @Override + public NodeRemovalResult getNodeRemovalResult() { + return nodeRemovalResult; + } + private Map<RaftNode, List<Integer>> retrieveSlots(Node target) { Map<RaftNode, List<Integer>> newHolderSlotMap = new HashMap<>(); for(int raftId = 0 ; raftId < multiRaftFactor; raftId++) { @@ -563,7 +580,7 @@ public class SlotPartitionTable implements PartitionTable { // preventing a thread from getting incomplete globalGroups synchronized (nodeRing) { if (globalGroups == null) { - calculateGlobalGroups(); + globalGroups = calculateGlobalGroups(nodeRing); } return globalGroups; } @@ -574,13 +591,15 @@ public class SlotPartitionTable implements PartitionTable { return getHeaderGroup(slotNodes[slot]).contains(node); } - private void calculateGlobalGroups() { - globalGroups = new ArrayList<>(); - for (Node node : getAllNodes()) { + @Override + public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) { + List<PartitionGroup> result = new ArrayList<>(); + for (Node node : nodeRing) { for (int i = 0; i < multiRaftFactor; i++) { - globalGroups.add(getHeaderGroup(new RaftNode(node, i))); + result.add(getHeaderGroup(new RaftNode(node, i))); } } + return result; } public synchronized long getLastLogIndex() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java index f052294..0c8cf25 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanRouter.java @@ -21,12 +21,19 @@ package org.apache.iotdb.cluster.query; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.exception.UnsupportedPlanException; +import org.apache.iotdb.cluster.log.Log; +import org.apache.iotdb.cluster.log.LogParser; +import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.partition.PartitionGroup; import org.apache.iotdb.cluster.partition.PartitionTable; +import org.apache.iotdb.cluster.rpc.thrift.Node; import org.apache.iotdb.cluster.utils.PartitionUtils; import org.apache.iotdb.db.engine.StorageEngine; import org.apache.iotdb.db.exception.metadata.IllegalPathException; @@ -41,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CountPlan; import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan; import org.apache.iotdb.db.qp.physical.sys.ShowPlan.ShowContentType; import org.apache.iotdb.db.service.IoTDB; @@ -108,7 +116,7 @@ public class ClusterPlanRouter { } public Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(PhysicalPlan plan) - throws UnsupportedPlanException, MetadataException { + throws UnsupportedPlanException, MetadataException, UnknownLogTypeException { if (plan instanceof InsertTabletPlan) { return splitAndRoutePlan((InsertTabletPlan) plan); } else if (plan instanceof CountPlan) { @@ -121,6 +129,8 @@ public class ClusterPlanRouter { return splitAndRoutePlan((AlterTimeSeriesPlan) plan); } else if (plan instanceof CreateMultiTimeSeriesPlan) { return splitAndRoutePlan((CreateMultiTimeSeriesPlan) plan); + } else if (plan instanceof LogPlan) { + return splitAndRoutePlan((LogPlan)plan); } //the if clause can be removed after the program is stable if (PartitionUtils.isLocalNonQueryPlan(plan)) { @@ -134,6 +144,25 @@ public class ClusterPlanRouter { throw new UnsupportedPlanException(plan); } + private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(LogPlan plan) + throws UnknownLogTypeException, UnsupportedPlanException { + Map<PhysicalPlan, PartitionGroup> result = new HashMap<>(); + Log log = LogParser.getINSTANCE().parse(plan.getLog()); + List<Node> oldRing = new ArrayList<>(partitionTable.getAllNodes()); + if (log instanceof AddNodeLog) { + oldRing.remove(((AddNodeLog) log).getNewNode()); + } else if (log instanceof RemoveNodeLog) { + oldRing.add(((RemoveNodeLog) log).getRemovedNode()); + oldRing.sort(Comparator.comparingInt(Node::getNodeIdentifier)); + } else { + throw new UnsupportedPlanException(plan); + } + for (PartitionGroup partitionGroup: partitionTable.calculateGlobalGroups(oldRing)) { + result.put(plan, partitionGroup); + } + return result; + } + private Map<PhysicalPlan, PartitionGroup> splitAndRoutePlan(InsertRowPlan plan) throws MetadataException { PartitionGroup partitionGroup = partitionTable.partitionByPathTime(plan.getDeviceId(), diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index b023c36..16c1da6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -33,6 +33,8 @@ import org.apache.iotdb.cluster.exception.CheckConsistencyException; import org.apache.iotdb.cluster.exception.NoHeaderNodeException; import org.apache.iotdb.cluster.exception.NotInSameGroupException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; +import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.partition.NodeAdditionResult; import org.apache.iotdb.cluster.partition.NodeRemovalResult; import org.apache.iotdb.cluster.partition.PartitionGroup; @@ -490,6 +492,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async return "DataServerThread-"; } + public void preAddNodeForDataGroup(AddNodeLog log, DataGroupMember targetDataGroupMember) { + // Make sure the previous add/remove node log has applied + metaGroupMember.syncLeader(); + + // Check the validity of the partition table + if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) { + return; + } + + targetDataGroupMember.preAddNode(log.getNewNode()); + } + /** * Try adding the node into the group of each DataGroupMember, and if the DataGroupMember no * longer stays in that group, also remove and stop it. If the new group contains this node, also @@ -499,6 +513,10 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async * @param result */ public void addNode(Node node, NodeAdditionResult result) { + // If the node executed adding itself to the cluster, it's unnecessary to add new groups because they already exist. + if (node.equals(thisNode)) { + return; + } Iterator<Entry<RaftNode, DataGroupMember>> entryIterator = headerGroupMap.entrySet().iterator(); synchronized (headerGroupMap) { while (entryIterator.hasNext()) { @@ -581,6 +599,18 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async logger.info("Data group members are ready"); } + public void preRemoveNodeForDataGroup(RemoveNodeLog log, DataGroupMember targetDataGroupMember) { + // Make sure the previous add/remove node log has applied + metaGroupMember.syncLeader(); + + // Check the validity of the partition table + if (!metaGroupMember.getPartitionTable().deserialize(log.getPartitionTable())) { + return; + } + + targetDataGroupMember.preRemoveNode(log.getRemovedNode()); + } + /** * Try removing a node from the groups of each DataGroupMember. If the node is the header of some * group, set the member to read only so that it can still provide data for other nodes that has @@ -625,6 +655,7 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async /** * When the node joins a cluster, it also creates a new data group and a corresponding member + * When the node joins a cluster, it also creates a new data group and a corresponding member * which has no data. This is to make that member pull data from other nodes. */ public void pullSnapshots() { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index e4a7304..d198039 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -248,8 +248,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async } @Override - public void exile(AsyncMethodCallback<Void> resultHandler) { - asyncService.exile(resultHandler); + public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { + asyncService.exile(removeNodeLog, resultHandler); } @Override @@ -274,8 +274,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async } @Override - public void exile() { - syncService.exile(); + public void exile(ByteBuffer removeNodeLog) { + syncService.exile(removeNodeLog); } @Override diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 4737520..077d61d 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -261,33 +261,10 @@ public class DataGroupMember extends RaftMember { } } - /** - * Try to add a Node into the group to which the member belongs. - * - * @param node - * @return true if this node should leave the group because of the addition of the node, false - * otherwise - */ - public synchronized boolean addNode(Node node, NodeAdditionResult result) { - // when a new node is added, start an election instantly to avoid the stale leader still - // taking the leadership, which guarantees the valid leader will not have the stale - // partition table - synchronized (term) { - term.incrementAndGet(); - setLeader(ClusterConstant.EMPTY_NODE); - setVoteFor(thisNode); - updateHardState(term.get(), getVoteFor()); - setLastHeartbeatReceivedTime(System.currentTimeMillis()); - setCharacter(NodeCharacter.ELECTOR); - } - - // mark slots that do not belong to this group any more - Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots() - .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet()); - for (Integer lostSlot : lostSlots) { - slotManager.setToSending(lostSlot); + public void preAddNode(Node node) { + if (allNodes.contains(node)) { + return; } - synchronized (allNodes) { int insertIndex = -1; // find the position to insert the new node, the nodes are ordered by their identifiers @@ -307,12 +284,41 @@ public class DataGroupMember extends RaftMember { if (insertIndex > 0) { allNodes.add(insertIndex, node); peerMap.putIfAbsent(node, new Peer(logManager.getLastLogIndex())); - // remove the last node because the group size is fixed to replication number - Node removedNode = allNodes.remove(allNodes.size() - 1); - peerMap.remove(removedNode); // if the local node is the last node and the insertion succeeds, this node should leave // the group logger.debug("{}: Node {} is inserted into the data group {}", name, node, allNodes); + } + } + } + + /** + * Try to add a Node into the group to which the member belongs. + * + * @param node + * @return true if this node should leave the group because of the addition of the node, false + * otherwise + */ + public boolean addNode(Node node, NodeAdditionResult result) { + + // mark slots that do not belong to this group any more + Set<Integer> lostSlots = ((SlotNodeAdditionResult) result).getLostSlots() + .getOrDefault(new RaftNode(getHeader(), getRaftGroupId()), Collections.emptySet()); + for (Integer lostSlot : lostSlots) { + slotManager.setToSending(lostSlot); + } + + synchronized (allNodes) { + if (allNodes.contains(node) && allNodes.size() > config.getReplicationNum()) { + // remove the last node because the group size is fixed to replication number + Node removedNode = allNodes.remove(allNodes.size() - 1); + peerMap.remove(removedNode); + if (removedNode.equals(leader.get())) { + // if the leader is removed, also start an election immediately + synchronized (term) { + setCharacter(NodeCharacter.ELECTOR); + setLastHeartbeatReceivedTime(Long.MIN_VALUE); + } + } return removedNode.equals(thisNode); } return false; @@ -737,6 +743,18 @@ public class DataGroupMember extends RaftMember { } } + public void preRemoveNode(Node removedNode) { + synchronized (allNodes) { + if (allNodes.contains(removedNode)) { + // update the group if the deleted node was in it + PartitionGroup newGroup = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId())); + Node newNodeToGroup = newGroup.get(newGroup.size() - 1); + allNodes.add(newNodeToGroup); + peerMap.putIfAbsent(newNodeToGroup, new Peer(logManager.getLastLogIndex())); + } + } + } + /** * When a node is removed and IT IS NOT THE HEADER of the group, the member should take over some * slots from the removed group, and add a new node to the group the removed node was in the @@ -747,8 +765,8 @@ public class DataGroupMember extends RaftMember { synchronized (allNodes) { if (allNodes.contains(removedNode)) { // update the group if the deleted node was in it - allNodes = metaGroupMember.getPartitionTable().getHeaderGroup(new RaftNode(getHeader(), getRaftGroupId())); - initPeerMap(); + allNodes.remove(removedNode); + peerMap.remove(removedNode); if (removedNode.equals(leader.get())) { // if the leader is removed, also start an election immediately synchronized (term) { @@ -837,7 +855,7 @@ public class DataGroupMember extends RaftMember { continue; } int sentReplicaNum = slotManager.sentOneReplication(slot); - if (sentReplicaNum >= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) { + if (sentReplicaNum >= config.getReplicationNum()) { removableSlots.add(slot); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index 7e73f61..22520e7 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -73,6 +73,7 @@ import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; import org.apache.iotdb.cluster.exception.SnapshotInstallationException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; +import org.apache.iotdb.cluster.exception.UnknownLogTypeException; import org.apache.iotdb.cluster.exception.UnsupportedPlanException; import org.apache.iotdb.cluster.log.Log; import org.apache.iotdb.cluster.log.LogApplier; @@ -186,11 +187,6 @@ public class MetaGroupMember extends RaftMember { * members in this node */ private static final int REPORT_INTERVAL_SEC = 10; - /** - * how many times is a data record replicated, also the number of nodes in a data group - */ - private static final int REPLICATION_NUM = - ClusterDescriptor.getInstance().getConfig().getReplicationNum(); /** * during snapshot, hardlinks of data files are created to for downloading. hardlinks will be @@ -421,19 +417,23 @@ public class MetaGroupMember extends RaftMember { * Apply the addition of a new node. Register its identifier, add it to the node list and * partition table, serialize the partition table and update the DataGroupMembers. */ - public void applyAddNode(Node newNode) { + public void applyAddNode(AddNodeLog addNodeLog) { + + Node newNode = addNodeLog.getNewNode(); synchronized (allNodes) { - if (!allNodes.contains(newNode)) { + if (partitionTable.deserialize(addNodeLog.getPartitionTable())) { logger.debug("Adding a new node {} into {}", newNode, allNodes); - registerNodeIdentifier(newNode, newNode.getNodeIdentifier()); - allNodes.add(newNode); + + if (!allNodes.contains(newNode)) { + registerNodeIdentifier(newNode, newNode.getNodeIdentifier()); + allNodes.add(newNode); + } // update the partition table - NodeAdditionResult result = partitionTable.addNode(newNode); - ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex()); savePartitionTable(); // update local data members + NodeAdditionResult result = partitionTable.getNodeAdditionResult(newNode); getDataClusterServer().addNode(newNode, result); } } @@ -856,7 +856,12 @@ public class MetaGroupMember extends RaftMember { // node adding is serialized to reduce potential concurrency problem synchronized (logManager) { + // update partition table + partitionTable.addNode(node); + ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1); + AddNodeLog addNodeLog = new AddNodeLog(); + addNodeLog.setPartitionTable(partitionTable.serialize()); addNodeLog.setCurrLogTerm(getTerm().get()); addNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1); @@ -868,11 +873,11 @@ public class MetaGroupMember extends RaftMember { while (true) { logger .info("Send the join request of {} to other nodes, retry time: {}", node, retryTime); - AppendLogResult result = sendLogToAllGroups(addNodeLog); + AppendLogResult result = sendLogToFollowers(addNodeLog); switch (result) { case OK: - logger.info("Join request of {} is accepted", node); commitLog(addNodeLog); + logger.info("Join request of {} is accepted", node); synchronized (partitionTable) { response.setPartitionTableBytes(partitionTable.serialize()); @@ -902,9 +907,9 @@ public class MetaGroupMember extends RaftMember { long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig() .getPartitionInterval(); int localHashSalt = ClusterConstant.HASH_SALT; - int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum(); - String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName(); - int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); + int localReplicationNum = config.getReplicationNum(); + String localClusterName = config.getClusterName(); + int localMultiRaftFactor = config.getMultiRaftFactor(); boolean partitionIntervalEquals = true; boolean multiRaftFactorEquals = true; boolean hashSaltEquals = true; @@ -1027,7 +1032,7 @@ public class MetaGroupMember extends RaftMember { } private CheckStatusResponse checkStatus(Node seedNode) { - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + if (config.isUseAsyncServer()) { AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(seedNode); if (client == null) { return null; @@ -1061,29 +1066,29 @@ public class MetaGroupMember extends RaftMember { * Send the log the all data groups and return a success only when each group's quorum has * accepted this log. */ - private AppendLogResult sendLogToAllGroups(Log log) { - List<Node> nodeRing = partitionTable.getAllNodes(); - - AtomicLong newLeaderTerm = new AtomicLong(term.get()); - AtomicBoolean leaderShipStale = new AtomicBoolean(false); - AppendEntryRequest request = buildAppendEntryRequest(log, true); - - // ask for votes from each node - int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm); - - if (!leaderShipStale.get()) { - // if all quorums of all groups have received this log, it is considered succeeded. - for (int remaining : groupRemainings) { - if (remaining > 0) { - return AppendLogResult.TIME_OUT; - } - } - } else { - return AppendLogResult.LEADERSHIP_STALE; - } - - return AppendLogResult.OK; - } +// private AppendLogResult sendLogToAllGroups(Log log) { +// List<Node> nodeRing = partitionTable.getAllNodes(); +// +// AtomicLong newLeaderTerm = new AtomicLong(term.get()); +// AtomicBoolean leaderShipStale = new AtomicBoolean(false); +// AppendEntryRequest request = buildAppendEntryRequest(log, true); +// +// // ask for votes from each node +// int[] groupRemainings = askGroupVotes(nodeRing, request, leaderShipStale, log, newLeaderTerm); +// +// if (!leaderShipStale.get()) { +// // if all quorums of all groups have received this log, it is considered succeeded. +// for (int remaining : groupRemainings) { +// if (remaining > 0) { +// return AppendLogResult.TIME_OUT; +// } +// } +// } else { +// return AppendLogResult.LEADERSHIP_STALE; +// } +// +// return AppendLogResult.OK; +// } /** * Send "request" to each node in "nodeRing" and when a node returns a success, decrease all @@ -1094,54 +1099,54 @@ public class MetaGroupMember extends RaftMember { @SuppressWarnings({"java:S2445", "java:S2274"}) // groupRemaining is shared with the handlers, // and we do not wait infinitely to enable timeouts - private int[] askGroupVotes(List<Node> nodeRing, - AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log, - AtomicLong newLeaderTerm) { - // each node will be the header of a group, we use the node to represent the group - int nodeSize = nodeRing.size(); - // the decreasing counters of how many nodes in a group has received the log, each time a - // node receive the log, the counters of all groups it is in will decrease by 1 - int[] groupRemainings = new int[nodeSize]; - // a group is considered successfully received the log if such members receive the log - int groupQuorum = REPLICATION_NUM / 2 + 1; - Arrays.fill(groupRemainings, groupQuorum); - - synchronized (groupRemainings) { - // ask a vote from every node - for (int i = 0; i < nodeSize; i++) { - Node node = nodeRing.get(i); - if (node.equals(thisNode)) { - // this node automatically gives an agreement, decrease counters of all groups the local - // node is in - for (int j = 0; j < REPLICATION_NUM; j++) { - int groupIndex = i - j; - if (groupIndex < 0) { - groupIndex += groupRemainings.length; - } - groupRemainings[groupIndex]--; - } - } else { - askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm, - request); - } - } - - try { - groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Unexpected interruption when waiting for the group votes", e); - } - } - return groupRemainings; - } +// private int[] askGroupVotes(List<Node> nodeRing, +// AppendEntryRequest request, AtomicBoolean leaderShipStale, Log log, +// AtomicLong newLeaderTerm) { +// // each node will be the header of a group, we use the node to represent the group +// int nodeSize = nodeRing.size(); +// // the decreasing counters of how many nodes in a group has received the log, each time a +// // node receive the log, the counters of all groups it is in will decrease by 1 +// int[] groupRemainings = new int[nodeSize]; +// // a group is considered successfully received the log if such members receive the log +// int groupQuorum = REPLICATION_NUM / 2 + 1; +// Arrays.fill(groupRemainings, groupQuorum); +// +// synchronized (groupRemainings) { +// // ask a vote from every node +// for (int i = 0; i < nodeSize; i++) { +// Node node = nodeRing.get(i); +// if (node.equals(thisNode)) { +// // this node automatically gives an agreement, decrease counters of all groups the local +// // node is in +// for (int j = 0; j < REPLICATION_NUM; j++) { +// int groupIndex = i - j; +// if (groupIndex < 0) { +// groupIndex += groupRemainings.length; +// } +// groupRemainings[groupIndex]--; +// } +// } else { +// askRemoteGroupVote(node, groupRemainings, i, leaderShipStale, log, newLeaderTerm, +// request); +// } +// } +// +// try { +// groupRemainings.wait(RaftServer.getWriteOperationTimeoutMS()); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// logger.error("Unexpected interruption when waiting for the group votes", e); +// } +// } +// return groupRemainings; +// } private void askRemoteGroupVote(Node node, int[] groupRemainings, int nodeIndex, AtomicBoolean leaderShipStale, Log log, AtomicLong newLeaderTerm, AppendEntryRequest request) { AppendGroupEntryHandler handler = new AppendGroupEntryHandler(groupRemainings, nodeIndex, node, leaderShipStale, log, newLeaderTerm, this); - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + if (config.isUseAsyncServer()) { AsyncMetaClient client = (AsyncMetaClient) getAsyncClient(node); try { if (client != null) { @@ -1469,7 +1474,7 @@ public class MetaGroupMember extends RaftMember { if (planGroupMap == null || planGroupMap.isEmpty()) { if ((plan instanceof InsertPlan || plan instanceof CreateTimeSeriesPlan || plan instanceof CreateMultiTimeSeriesPlan) - && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { + && config.isEnableAutoCreateSchema()) { logger.debug("{}: No associated storage group found for {}, auto-creating", name, plan); try { ((CMManager) IoTDB.metaManager).createSchema(plan); @@ -1499,10 +1504,10 @@ public class MetaGroupMember extends RaftMember { syncLeaderWithConsistencyCheck(true); try { planGroupMap = router.splitAndRoutePlan(plan); - } catch (MetadataException ex) { + } catch (MetadataException | UnknownLogTypeException ex) { // ignore } - } catch (MetadataException e) { + } catch (MetadataException | UnknownLogTypeException e) { logger.error("Cannot route plan {}", plan, e); } return planGroupMap; @@ -1534,7 +1539,7 @@ public class MetaGroupMember extends RaftMember { } if (plan instanceof InsertPlan && status.getCode() == TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode() - && ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema()) { + && config.isEnableAutoCreateSchema()) { TSStatus tmpStatus = createTimeseriesForFailedInsertion(planGroupMap, ((InsertPlan) plan)); if (tmpStatus != null) { status = tmpStatus; @@ -1773,7 +1778,7 @@ public class MetaGroupMember extends RaftMember { try { // only data plans are partitioned, so it must be processed by its data server instead of // meta server - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + if (config.isUseAsyncServer()) { status = forwardDataPlanAsync(plan, node, group.getHeader()); } else { status = forwardDataPlanSync(plan, node, group.getHeader()); @@ -1880,7 +1885,7 @@ public class MetaGroupMember extends RaftMember { } try { - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + if (config.isUseAsyncServer()) { getNodeStatusAsync(nodeStatus); } else { getNodeStatusSync(nodeStatus); @@ -1974,7 +1979,7 @@ public class MetaGroupMember extends RaftMember { } // if we cannot have enough replica after the removal, reject it - if (allNodes.size() <= ClusterDescriptor.getInstance().getConfig().getReplicationNum()) { + if (allNodes.size() <= config.getReplicationNum()) { return Response.RESPONSE_CLUSTER_TOO_SMALL; } @@ -1996,7 +2001,12 @@ public class MetaGroupMember extends RaftMember { // node removal must be serialized to reduce potential concurrency problem synchronized (logManager) { + // update partition table + partitionTable.addNode(node); + ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex() + 1); + RemoveNodeLog removeNodeLog = new RemoveNodeLog(); + removeNodeLog.setPartitionTable(partitionTable.serialize()); removeNodeLog.setCurrLogTerm(getTerm().get()); removeNodeLog.setCurrLogIndex(logManager.getLastLogIndex() + 1); @@ -2008,12 +2018,11 @@ public class MetaGroupMember extends RaftMember { while (true) { logger.info("Send the node removal request of {} to other nodes, retry time: {}", target, retryTime); - AppendLogResult result = sendLogToAllGroups(removeNodeLog); - + AppendLogResult result = sendLogToFollowers(removeNodeLog); switch (result) { case OK: - logger.info("Removal request of {} is accepted", target); commitLog(removeNodeLog); + logger.info("Removal request of {} is accepted", target); return Response.RESPONSE_AGREE; case TIME_OUT: logger.info("Removal request of {} timed out", target); @@ -2033,22 +2042,28 @@ public class MetaGroupMember extends RaftMember { * and catch-up service of data are kept alive for other nodes to pull data. If the removed node * is a leader, send an exile to the removed node so that it can know it is removed. * - * @param oldNode the node to be removed */ - public void applyRemoveNode(Node oldNode) { + public void applyRemoveNode(RemoveNodeLog removeNodeLog) { + + Node oldNode = removeNodeLog.getRemovedNode(); synchronized (allNodes) { - if (allNodes.contains(oldNode)) { + if (partitionTable.deserialize(removeNodeLog.getPartitionTable())) { logger.debug("Removing a node {} from {}", oldNode, allNodes); - allNodes.remove(oldNode); - idNodeMap.remove(oldNode.nodeIdentifier); - // update the partition table - NodeRemovalResult result = partitionTable.removeNode(oldNode); - ((SlotPartitionTable) partitionTable).setLastLogIndex(logManager.getLastLogIndex()); + if (allNodes.contains(oldNode)) { + allNodes.remove(oldNode); + idNodeMap.remove(oldNode.nodeIdentifier); + + } + + // save the updated partition table + savePartitionTable(); // update DataGroupMembers, as the node is removed, the members of some groups are // changed and there will also be one less group + NodeRemovalResult result = partitionTable.getNodeRemovalResult(); getDataClusterServer().removeNode(oldNode, result); + // the leader is removed, start the next election ASAP if (oldNode.equals(leader.get())) { setCharacter(NodeCharacter.ELECTOR); @@ -2065,21 +2080,19 @@ public class MetaGroupMember extends RaftMember { } else if (thisNode.equals(leader.get())) { // as the old node is removed, it cannot know this by heartbeat or log, so it should be // directly kicked out of the cluster - exileNode(oldNode); + exileNode(removeNodeLog); } - - // save the updated partition table - savePartitionTable(); } } } - private void exileNode(Node node) { - if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) { + private void exileNode(RemoveNodeLog removeNodeLog) { + Node node = removeNodeLog.getRemovedNode(); + if (config.isUseAsyncServer()) { AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node); try { if (asyncMetaClient != null) { - asyncMetaClient.exile(new GenericHandler<>(node, null)); + asyncMetaClient.exile(removeNodeLog.serialize(), new GenericHandler<>(node, null)); } } catch (TException e) { logger.warn("Cannot inform {} its removal", node, e); @@ -2090,7 +2103,7 @@ public class MetaGroupMember extends RaftMember { return; } try { - client.exile(); + client.exile(removeNodeLog.serialize()); } catch (TException e) { client.getInputProtocol().getTransport().close(); logger.warn("Cannot inform {} its removal", node, e); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java index 0526285..57f22bc 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java @@ -93,6 +93,7 @@ import org.apache.iotdb.db.exception.metadata.PathNotExistException; import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.qp.physical.PhysicalPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -899,13 +900,23 @@ public abstract class RaftMember { return StatusUtils.NODE_READ_ONLY; } long startTime = Timer.Statistic.RAFT_SENDER_APPEND_LOG.getOperationStartTime(); - PhysicalPlanLog log = new PhysicalPlanLog(); + Log log; + if (plan instanceof LogPlan) { + try { + log = LogParser.getINSTANCE().parse(((LogPlan) plan).getLog()); + } catch (UnknownLogTypeException e) { + logger.error("Can not parse LogPlan {}", plan, e); + return StatusUtils.PARSE_LOG_ERROR; + } + } else { + log = new PhysicalPlanLog(); + ((PhysicalPlanLog)log).setPlan(plan); + } // assign term and index to the new log and append it synchronized (logManager) { log.setCurrLogTerm(getTerm().get()); log.setCurrLogIndex(logManager.getLastLogIndex() + 1); - log.setPlan(plan); plan.setIndex(log.getCurrLogIndex()); logManager.append(log); } @@ -1404,7 +1415,7 @@ public abstract class RaftMember { } private TSStatus handleLogExecutionException( - PhysicalPlanLog log, LogExecutionException e) { + Log log, LogExecutionException e) { Throwable cause = IOUtils.getRootCause(e); if (cause instanceof BatchProcessException) { return RpcUtils @@ -1536,7 +1547,7 @@ public abstract class RaftMember { logger.debug("Has lose leadership, so need not to send log"); return false; } - AppendLogResult result = sendLogToFollowers(log, allNodes.size() / 2); + AppendLogResult result = sendLogToFollowers(log); Timer.Statistic.RAFT_SENDER_SEND_LOG_TO_FOLLOWERS.calOperationCostTimeFromStart(startTime); switch (result) { case OK: @@ -1568,10 +1579,11 @@ public abstract class RaftMember { * 0, half of the cluster size will be used. * @return an AppendLogResult */ - private AppendLogResult sendLogToFollowers(Log log, int requiredQuorum) { + protected AppendLogResult sendLogToFollowers(Log log) { + int requiredQuorum = allNodes.size() / 2; if (requiredQuorum <= 0) { // use half of the members' size as the quorum - return sendLogToFollowers(log, new AtomicInteger(allNodes.size() / 2)); + return sendLogToFollowers(log, new AtomicInteger(requiredQuorum)); } else { // make sure quorum does not exceed the number of members - 1 return sendLogToFollowers(log, new AtomicInteger(Math.min(requiredQuorum, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java index 4ca6eb0..3b2df98 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaAsyncService.java @@ -19,10 +19,12 @@ package org.apache.iotdb.cluster.server.service; +import java.nio.ByteBuffer; import org.apache.iotdb.cluster.exception.AddSelfException; import org.apache.iotdb.cluster.exception.LeaderUnknownException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; @@ -195,8 +197,10 @@ public class MetaAsyncService extends BaseAsyncService implements TSMetaService. * @param resultHandler */ @Override - public void exile(AsyncMethodCallback<Void> resultHandler) { - metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode()); + public void exile(ByteBuffer removeNodeLogBuffer, AsyncMethodCallback<Void> resultHandler) { + RemoveNodeLog removeNodeLog = new RemoveNodeLog(); + removeNodeLog.deserialize(removeNodeLogBuffer); + metaGroupMember.applyRemoveNode(removeNodeLog); resultHandler.onComplete(null); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java index 3b5f445..48c0e58 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/MetaSyncService.java @@ -19,11 +19,13 @@ package org.apache.iotdb.cluster.server.service; +import java.nio.ByteBuffer; import org.apache.iotdb.cluster.client.sync.SyncMetaClient; import org.apache.iotdb.cluster.exception.AddSelfException; import org.apache.iotdb.cluster.exception.LeaderUnknownException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.rpc.thrift.AddNodeResponse; import org.apache.iotdb.cluster.rpc.thrift.AppendEntryRequest; import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse; @@ -188,7 +190,9 @@ public class MetaSyncService extends BaseSyncService implements TSMetaService.If * must tell it directly. */ @Override - public void exile() { - metaGroupMember.applyRemoveNode(metaGroupMember.getThisNode()); + public void exile(ByteBuffer removeNodeLogBuffer) { + RemoveNodeLog removeNodeLog = new RemoveNodeLog(); + removeNodeLog.deserialize(removeNodeLogBuffer); + metaGroupMember.applyRemoveNode(removeNodeLog); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java index 4d1205f..5a3168a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java @@ -45,6 +45,7 @@ public class StatusUtils { public static final TSStatus CONSISTENCY_FAILURE = getStatus(TSStatusCode.CONSISTENCY_FAILURE); public static final TSStatus TIMESERIES_NOT_EXIST_ERROR = getStatus(TSStatusCode.TIMESERIES_NOT_EXIST); public static final TSStatus NO_CONNECTION = getStatus(TSStatusCode.NO_CONNECTION); + public static final TSStatus PARSE_LOG_ERROR = getStatus(TSStatusCode.PARSE_LOG_ERROR); private static TSStatus getStatus(TSStatusCode statusCode) { @@ -197,6 +198,9 @@ public class StatusUtils { case NO_CONNECTION: status.setMessage("Node cannot be reached."); break; + case PARSE_LOG_ERROR: + status.setMessage("Parse log error."); + break; default: status.setMessage(""); break; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java index 771112b..1f1f3ba 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestUtils.java @@ -21,6 +21,7 @@ package org.apache.iotdb.cluster.common; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -65,6 +66,8 @@ public class TestUtils { public static long TEST_TIME_OUT_MS = 200; + public static ByteBuffer seralizePartitionTable = new SlotPartitionTable(getNode(0)).serialize(); + private TestUtils() { // util class } @@ -83,6 +86,7 @@ public class TestUtils { for (int i = 0; i < logNum; i++) { AddNodeLog log = new AddNodeLog(); log.setNewNode(getNode(i)); + log.setPartitionTable(seralizePartitionTable); log.setCurrLogIndex(i); log.setCurrLogTerm(i); logList.add(log); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java index 66a9615..76efe5f 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/LogParserTest.java @@ -42,6 +42,7 @@ public class LogParserTest { public void testAddNodeLog() throws UnknownLogTypeException { AddNodeLog log = new AddNodeLog(); log.setNewNode(TestUtils.getNode(5)); + log.setPartitionTable(TestUtils.seralizePartitionTable); log.setCurrLogIndex(8); log.setCurrLogTerm(8); @@ -78,6 +79,7 @@ public class LogParserTest { @Test public void testRemoveNodeLog() throws UnknownLogTypeException { RemoveNodeLog log = new RemoveNodeLog(); + log.setPartitionTable(TestUtils.seralizePartitionTable); log.setRemovedNode(TestUtils.getNode(0)); log.setCurrLogIndex(8); log.setCurrLogTerm(8); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java index 95c8fe4..fd1a87b 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/MetaLogApplierTest.java @@ -24,11 +24,13 @@ import static junit.framework.TestCase.assertFalse; import static junit.framework.TestCase.assertTrue; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashSet; import java.util.Set; import org.apache.iotdb.cluster.common.IoTDBTest; import org.apache.iotdb.cluster.common.TestMetaGroupMember; +import org.apache.iotdb.cluster.common.TestUtils; import org.apache.iotdb.cluster.log.LogApplier; import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog; @@ -54,13 +56,13 @@ public class MetaLogApplierTest extends IoTDBTest { private TestMetaGroupMember testMetaGroupMember = new TestMetaGroupMember() { @Override - public void applyAddNode(Node newNode) { - nodes.add(newNode); + public void applyAddNode(AddNodeLog addNodeLog) { + nodes.add(addNodeLog.getNewNode()); } @Override - public void applyRemoveNode(Node oldNode) { - nodes.remove(oldNode); + public void applyRemoveNode(RemoveNodeLog removeNodeLog) { + nodes.remove(removeNodeLog.getRemovedNode()); } }; @@ -82,6 +84,7 @@ public class MetaLogApplierTest extends IoTDBTest { Node node = new Node("localhost", 1111, 0, 2222, 55560); AddNodeLog log = new AddNodeLog(); log.setNewNode(node); + log.setPartitionTable(TestUtils.seralizePartitionTable); applier.apply(log); assertTrue(nodes.contains(node)); @@ -94,6 +97,7 @@ public class MetaLogApplierTest extends IoTDBTest { Node node = testMetaGroupMember.getThisNode(); RemoveNodeLog log = new RemoveNodeLog(); + log.setPartitionTable(TestUtils.seralizePartitionTable); log.setRemovedNode(node); applier.apply(log); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java index 09b42e4..d6fec31 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/logtypes/SerializeLogTest.java @@ -87,6 +87,7 @@ public class SerializeLogTest { @Test public void testAddNodeLog() throws UnknownLogTypeException { AddNodeLog log = new AddNodeLog(); + log.setPartitionTable(TestUtils.seralizePartitionTable); log.setCurrLogIndex(2); log.setCurrLogTerm(2); log.setNewNode(new Node("apache.iotdb.com", 1234, 1, 4321, 55560)); @@ -110,6 +111,7 @@ public class SerializeLogTest { @Test public void testRemoveNodeLog() throws UnknownLogTypeException { RemoveNodeLog log = new RemoveNodeLog(); + log.setPartitionTable(TestUtils.seralizePartitionTable); log.setCurrLogIndex(2); log.setCurrLogTerm(2); log.setRemovedNode(TestUtils.getNode(0)); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java index b00e755..30315dc 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/partition/SlotPartitionTableTest.java @@ -499,7 +499,8 @@ public class SlotPartitionTableTest { @Test public void testRemoveNode() { List<Integer> nodeSlots = localTable.getNodeSlots(getNode(0), raftId); - NodeRemovalResult nodeRemovalResult = localTable.removeNode(getNode(0)); + localTable.removeNode(getNode(0)); + NodeRemovalResult nodeRemovalResult = localTable.getNodeRemovalResult(); assertFalse(localTable.getAllNodes().contains(getNode(0))); PartitionGroup removedGroup = nodeRemovalResult.getRemovedGroup(0); for (int i = 0; i < 5; i++) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java index f6bb254..2275a63 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java @@ -63,12 +63,22 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest { } @Override - public NodeAdditionResult addNode(Node node) { + public void addNode(Node node) { + return; + } + + @Override + public NodeAdditionResult getNodeAdditionResult(Node node) { return null; } @Override - public NodeRemovalResult removeNode(Node node) { + public void removeNode(Node node) { + return; + } + + @Override + public NodeRemovalResult getNodeRemovalResult() { return null; } @@ -93,8 +103,8 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest { } @Override - public void deserialize(ByteBuffer buffer) { - + public boolean deserialize(ByteBuffer buffer) { + return true; } @Override @@ -108,8 +118,13 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest { } @Override + public List<PartitionGroup> calculateGlobalGroups(List<Node> nodeRing) { + return null; + } + + @Override public boolean judgeHoldSlot(Node node, int slot) { - return true; + return false; } }; diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index c920f06..ca4fd92 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -898,8 +898,8 @@ public class DataGroupMemberTest extends MemberTest { public void testRemoveLeader() { System.out.println("Start testRemoveLeader()"); Node nodeToRemove = TestUtils.getNode(10); - SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable() - .removeNode(nodeToRemove); + testMetaMember.getPartitionTable().removeNode(nodeToRemove); + SlotNodeRemovalResult nodeRemovalResult = (SlotNodeRemovalResult) testMetaMember.getPartitionTable().getNodeRemovalResult(); dataGroupMember.setLeader(nodeToRemove); dataGroupMember.start(); @@ -926,8 +926,9 @@ public class DataGroupMemberTest extends MemberTest { public void testRemoveNonLeader() { System.out.println("Start testRemoveNonLeader()"); Node nodeToRemove = TestUtils.getNode(10); - NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable() + testMetaMember.getPartitionTable() .removeNode(nodeToRemove); + NodeRemovalResult nodeRemovalResult = testMetaMember.getPartitionTable().getNodeRemovalResult(); dataGroupMember.setLeader(TestUtils.getNode(20)); dataGroupMember.start(); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java index 1badcd2..a1e563e 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java @@ -60,7 +60,9 @@ import org.apache.iotdb.cluster.exception.EmptyIntervalException; import org.apache.iotdb.cluster.exception.LogExecutionException; import org.apache.iotdb.cluster.exception.PartitionTableUnavailableException; import org.apache.iotdb.cluster.exception.StartUpCheckFailureException; +import org.apache.iotdb.cluster.log.logtypes.AddNodeLog; import org.apache.iotdb.cluster.log.logtypes.CloseFileLog; +import org.apache.iotdb.cluster.log.logtypes.RemoveNodeLog; import org.apache.iotdb.cluster.log.snapshot.MetaSimpleSnapshot; import org.apache.iotdb.cluster.metadata.CMManager; import org.apache.iotdb.cluster.partition.PartitionGroup; @@ -423,7 +425,7 @@ public class MetaGroupMemberTest extends MemberTest { } @Override - public void exile(AsyncMethodCallback<Void> resultHandler) { + public void exile(ByteBuffer removeNodeLog, AsyncMethodCallback<Void> resultHandler) { System.out.printf("%s was exiled%n", node); exiledNode = node; } @@ -431,7 +433,7 @@ public class MetaGroupMemberTest extends MemberTest { @Override public void removeNode(Node node, AsyncMethodCallback<Long> resultHandler) { new Thread(() -> { - testMetaMember.applyRemoveNode(node); + testMetaMember.applyRemoveNode(new RemoveNodeLog(TestUtils.seralizePartitionTable, node)); resultHandler.onComplete(Response.RESPONSE_AGREE); }).start(); } @@ -554,7 +556,7 @@ public class MetaGroupMemberTest extends MemberTest { System.out.println("Start testAddNode()"); Node newNode = TestUtils.getNode(10); testMetaMember.onElectionWins(); - testMetaMember.applyAddNode(newNode); + testMetaMember.applyAddNode(new AddNodeLog(TestUtils.seralizePartitionTable, newNode)); assertTrue(partitionTable.getAllNodes().contains(newNode)); } diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index 955deb4..a2083d1 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -2178,7 +2178,7 @@ public class StorageGroupProcessor { * @return load the file successfully * @UsedBy sync module, load external tsfile module. */ - private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile, + private boolean loadTsFileByType(LoadTsFileType type, File tsFileToLoad, TsFileResource tsFileResource, long filePartitionId) throws LoadFileException, DiskSpaceInsufficientException { File targetFile; @@ -2195,7 +2195,7 @@ public class StorageGroupProcessor { } tsFileManagement.add(tsFileResource, false); logger.info("Load tsfile in unsequence list, move file from {} to {}", - syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); + tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath()); break; case LOAD_SEQUENCE: targetFile = @@ -2209,7 +2209,7 @@ public class StorageGroupProcessor { } tsFileManagement.add(tsFileResource, true); logger.info("Load tsfile in sequence list, move file from {} to {}", - syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath()); + tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath()); break; default: throw new LoadFileException( @@ -2221,29 +2221,47 @@ public class StorageGroupProcessor { targetFile.getParentFile().mkdirs(); } try { - FileUtils.moveFile(syncedTsFile, targetFile); + FileUtils.moveFile(tsFileToLoad, targetFile); } catch (IOException e) { logger.error("File renaming failed when loading tsfile. Origin: {}, Target: {}", - syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e); + tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e); throw new LoadFileException(String.format( "File renaming failed when loading tsfile. Origin: %s, Target: %s, because %s", - syncedTsFile.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage())); + tsFileToLoad.getAbsolutePath(), targetFile.getAbsolutePath(), e.getMessage())); } - File syncedResourceFile = fsFactory.getFile( - syncedTsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); + File resourceFileToLoad = fsFactory.getFile( + tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); File targetResourceFile = fsFactory.getFile( targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); try { - FileUtils.moveFile(syncedResourceFile, targetResourceFile); + FileUtils.moveFile(resourceFileToLoad, targetResourceFile); } catch (IOException e) { logger.error("File renaming failed when loading .resource file. Origin: {}, Target: {}", - syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e); + resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e); throw new LoadFileException(String.format( "File renaming failed when loading .resource file. Origin: %s, Target: %s, because %s", - syncedResourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath(), + resourceFileToLoad.getAbsolutePath(), targetResourceFile.getAbsolutePath(), e.getMessage())); } + + File modFileToLoad = fsFactory.getFile( + tsFileToLoad.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); + if (modFileToLoad.exists()) { + File targetModFile = fsFactory.getFile( + targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); + try { + FileUtils.moveFile(modFileToLoad, targetModFile); + } catch (IOException e) { + logger.error("File renaming failed when loading .mod file. Origin: {}, Target: {}", + resourceFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), e); + throw new LoadFileException(String.format( + "File renaming failed when loading .mod file. Origin: %s, Target: %s, because %s", + resourceFileToLoad.getAbsolutePath(), targetModFile.getAbsolutePath(), + e.getMessage())); + } + } + return true; } diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java index d40f1eb..0d64b30 100644 --- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java @@ -45,6 +45,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.DropIndexPlan; import org.apache.iotdb.db.qp.physical.sys.FlushPlan; import org.apache.iotdb.db.qp.physical.sys.LoadConfigurationPlan; +import org.apache.iotdb.db.qp.physical.sys.LogPlan; import org.apache.iotdb.db.qp.physical.sys.MNodePlan; import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan; import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan; @@ -348,6 +349,10 @@ public abstract class PhysicalPlan { plan = new StorageGroupMNodePlan(); plan.deserialize(buffer); break; + case CLUSTER_LOG: + plan = new LogPlan(); + plan.deserialize(buffer); + break; default: throw new IOException("unrecognized log type " + type); } @@ -361,7 +366,7 @@ public abstract class PhysicalPlan { REVOKE_USER_PRIVILEGE, GRANT_ROLE_PRIVILEGE, GRANT_USER_PRIVILEGE, GRANT_USER_ROLE, MODIFY_PASSWORD, DELETE_USER, DELETE_STORAGE_GROUP, SHOW_TIMESERIES, DELETE_TIMESERIES, LOAD_CONFIGURATION, CREATE_MULTI_TIMESERIES, ALTER_TIMESERIES, FLUSH, CREATE_INDEX, DROP_INDEX, - CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE + CHANGE_TAG_OFFSET, CHANGE_ALIAS, MNODE, MEASUREMENT_MNODE, STORAGE_GROUP_MNODE, CLUSTER_LOG } public long getIndex() { diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java new file mode 100644 index 0000000..725c803 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/LogPlan.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.qp.physical.sys; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.List; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.qp.physical.PhysicalPlan; + +/** + * It's used by cluster to wrap log to plan + */ +public class LogPlan extends PhysicalPlan { + + private ByteBuffer log; + + public LogPlan() { + super(false); + } + + public LogPlan(ByteBuffer log) { + super(false); + this.log = log; + } + + public ByteBuffer getLog() { + return log; + } + + public void setLog(ByteBuffer log) { + this.log = log; + } + + @Override + public List<PartialPath> getPaths() { + return Collections.emptyList(); + } + + @Override + public void serialize(DataOutputStream stream) throws IOException { + stream.writeByte((byte) PhysicalPlanType.CLUSTER_LOG.ordinal()); + stream.writeInt(log.array().length); + stream.write(log.array()); + } + + @Override + public void serialize(ByteBuffer buffer) { + int len = buffer.getInt(); + log = ByteBuffer.wrap(buffer.array(), buffer.position(), len); + } +} diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index b839c53..c41834b 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -86,7 +86,8 @@ public enum TSStatusCode { NODE_READ_ONLY(704), CONSISTENCY_FAILURE(705), NO_CONNECTION(706), - NEED_REDIRECTION(707) + NEED_REDIRECTION(707), + PARSE_LOG_ERROR(708) ; diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift index 2a24106..124a154 100644 --- a/thrift/src/main/thrift/cluster.thrift +++ b/thrift/src/main/thrift/cluster.thrift @@ -335,7 +335,6 @@ service RaftService { } - service TSDataService extends RaftService { /** @@ -468,7 +467,7 @@ service TSMetaService extends RaftService { * the commit command by heartbeat since it has been removed, so the leader should tell it * directly that it is no longer in the cluster. **/ - void exile() + void exile(binary removeNodeLog) TNodeStatus queryNodeStatus()
