This is an automated email from the ASF dual-hosted git repository. lta pushed a commit to branch cluster_scalability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 109d432adecdee83d2881e54d07a91ddc87e1761 Author: lta <[email protected]> AuthorDate: Tue Jan 5 11:48:53 2021 +0800 fix some issues of multi-raft --- .../resources/conf/iotdb-cluster.properties | 2 +- .../cluster/client/sync/SyncClientAdaptor.java | 4 +-- .../cluster/log/catchup/SnapshotCatchUpTask.java | 1 + .../manage/FilePartitionedSnapshotLogManager.java | 2 +- .../iotdb/cluster/log/snapshot/FileSnapshot.java | 11 ++++--- .../cluster/log/snapshot/PullSnapshotTask.java | 3 +- .../log/snapshot/PullSnapshotTaskDescriptor.java | 3 +- .../apache/iotdb/cluster/metadata/CMManager.java | 1 + .../apache/iotdb/cluster/metadata/MetaPuller.java | 1 + .../cluster/partition/slot/SlotPartitionTable.java | 9 ++--- .../iotdb/cluster/server/DataClusterServer.java | 18 +++++----- .../iotdb/cluster/server/MetaClusterServer.java | 16 ++++----- .../cluster/server/member/DataGroupMember.java | 9 ++--- .../cluster/server/member/MetaGroupMember.java | 38 +++++++++++++--------- .../cluster/server/service/BaseAsyncService.java | 4 +-- .../cluster/server/service/BaseSyncService.java | 4 +-- .../apache/iotdb/cluster/utils/ClusterUtils.java | 8 ++++- .../cluster/client/sync/SyncClientAdaptorTest.java | 7 ++-- .../iotdb/cluster/common/TestAsyncDataClient.java | 2 +- .../cluster/log/snapshot/DataSnapshotTest.java | 7 ++-- .../cluster/log/snapshot/PullSnapshotTaskTest.java | 2 +- .../cluster/server/member/DataGroupMemberTest.java | 2 +- .../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-- thrift/src/main/thrift/cluster.thrift | 6 ++-- 24 files changed, 93 insertions(+), 71 deletions(-) diff --git a/cluster/src/assembly/resources/conf/iotdb-cluster.properties b/cluster/src/assembly/resources/conf/iotdb-cluster.properties index 7cd8c22..4b9b2f5 100644 --- a/cluster/src/assembly/resources/conf/iotdb-cluster.properties +++ b/cluster/src/assembly/resources/conf/iotdb-cluster.properties @@ -63,7 +63,7 @@ max_concurrent_client_num=10000 default_replica_num=2 # sub raft num for multi-raft -multi_raft_factor=2 +multi_raft_factor=1 # cluster name to identify different clusters # all node's cluster_name in one cluster are the same diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java index 7a31957..019a08b 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptor.java @@ -387,12 +387,12 @@ public class SyncClientAdaptor { } public static ByteBuffer readFile(AsyncDataClient client, String remotePath, long offset, - int fetchSize) + int fetchSize, int raftId) throws InterruptedException, TException { AtomicReference<ByteBuffer> result = new AtomicReference<>(); GenericHandler<ByteBuffer> handler = new GenericHandler<>(client.getNode(), result); - client.readFile(remotePath, offset, fetchSize, handler); + client.readFile(remotePath, offset, fetchSize, raftId, handler); return handler.getResult(RaftServer.getWriteOperationTimeoutMS()); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java index 548a2db..483ba64 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTask.java @@ -59,6 +59,7 @@ public class SnapshotCatchUpTask extends LogCatchUpTask implements Callable<Bool private void doSnapshotCatchUp() throws TException, InterruptedException, LeaderUnknownException { SendSnapshotRequest request = new SendSnapshotRequest(); + request.setRaftId(raftMember.getRaftGroupId()); if (raftMember.getHeader() != null) { request.setHeader(raftMember.getHeader()); } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java index a3b0153..79f3cd1 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/manage/FilePartitionedSnapshotLogManager.java @@ -113,7 +113,7 @@ public class FilePartitionedSnapshotLogManager extends PartitionedSnapshotLogMan // 1.collect tsfile collectTsFiles(); - //2.register the measurement + // 2.register the measurement for (Map.Entry<Integer, Collection<TimeseriesSchema>> entry : slotTimeseries.entrySet()) { int slotNum = entry.getKey(); FileSnapshot snapshot = slotSnapshots.computeIfAbsent(slotNum, diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java index fe3a7a0..9f1b562 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java @@ -311,7 +311,7 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { if (client != null) { try { client.removeHardLink(resource.getTsFile().getAbsolutePath(), - new GenericHandler<>(sourceNode, null)); + dataGroupMember.getRaftGroupId(), new GenericHandler<>(sourceNode, null)); } catch (TException e) { logger .error("Cannot remove hardlink {} from {}", resource.getTsFile().getAbsolutePath(), @@ -326,7 +326,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { return; } try { - client.removeHardLink(resource.getTsFile().getAbsolutePath()); + client.removeHardLink(resource.getTsFile().getAbsolutePath(), + dataGroupMember.getRaftGroupId()); } catch (TException te) { client.getInputProtocol().getTransport().close(); logger @@ -516,7 +517,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { if (client == null) { throw new IOException("No available client for " + node.toString()); } - ByteBuffer buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize); + ByteBuffer buffer = SyncClientAdaptor + .readFile(client, remotePath, offset, fetchSize, dataGroupMember.getRaftGroupId()); int len = writeBuffer(buffer, dest); if (len == 0) { break; @@ -552,7 +554,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot { try { while (true) { - ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize); + ByteBuffer buffer = client.readFile(remotePath, offset, fetchSize, + dataGroupMember.getRaftGroupId()); int len = writeBuffer(buffer, dest); if (len == 0) { break; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java index f0aa3f0..9dc6231 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java @@ -80,6 +80,7 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { this.newMember = newMember; this.snapshotFactory = snapshotFactory; this.snapshotSave = snapshotSave; + persistTask(); } @SuppressWarnings("java:S3740") // type cannot be known ahead @@ -162,9 +163,9 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> { @Override public Void call() { - persistTask(); request = new PullSnapshotRequest(); request.setHeader(descriptor.getPreviousHolders().getHeader()); + request.setRaftId(descriptor.getPreviousHolders().getId()); request.setRequiredSlots(descriptor.getSlots()); request.setRequireReadOnly(descriptor.isRequireReadOnly()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java index 9e1fb90..441e960 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskDescriptor.java @@ -75,6 +75,7 @@ public class PullSnapshotTaskDescriptor { dataOutputStream.writeInt(slot); } + dataOutputStream.writeInt(previousHolders.getId()); dataOutputStream.writeInt(previousHolders.size()); for (Node previousHolder : previousHolders) { SerializeUtils.serialize(previousHolder, dataOutputStream); @@ -90,8 +91,8 @@ public class PullSnapshotTaskDescriptor { slots.add(dataInputStream.readInt()); } + previousHolders = new PartitionGroup(dataInputStream.readInt()); int holderSize = dataInputStream.readInt(); - previousHolders = new PartitionGroup(); for (int i = 0; i < holderSize; i++) { Node node = new Node(); SerializeUtils.deserialize(node, dataInputStream); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index 01817a4..82ef895 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -679,6 +679,7 @@ public class CMManager extends MManager { // pull schemas from a remote node PullSchemaRequest pullSchemaRequest = new PullSchemaRequest(); pullSchemaRequest.setHeader(partitionGroup.getHeader()); + pullSchemaRequest.setRaftId(partitionGroup.getId()); pullSchemaRequest.setPrefixPaths(prefixPaths); // decide the node access order with the help of QueryCoordinator diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java index e185e9d..16a4988 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java @@ -137,6 +137,7 @@ public class MetaPuller { // pull schemas from a remote node PullSchemaRequest pullSchemaRequest = new PullSchemaRequest(); pullSchemaRequest.setHeader(partitionGroup.getHeader()); + pullSchemaRequest.setRaftId(partitionGroup.getId()); pullSchemaRequest.setPrefixPaths(prefixPaths.stream().map(PartialPath::getFullPath).collect( Collectors.toList())); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java index 5fcfbcb..f8f89b9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/partition/slot/SlotPartitionTable.java @@ -265,7 +265,7 @@ public class SlotPartitionTable implements PartitionTable { } SlotNodeAdditionResult result = new SlotNodeAdditionResult(); - for (int raftId = 0 ;raftId < multiRaftFactor; raftId++) { + for (int raftId = 0; raftId < multiRaftFactor; raftId++) { PartitionGroup newGroup = getHeaderGroup(new RaftNode(node, raftId)); if (newGroup.contains(thisNode)) { localGroups.add(newGroup); @@ -296,7 +296,7 @@ public class SlotPartitionTable implements PartitionTable { // move the slots to the new node if any previous node have more slots than the new average int newAvg = totalSlotNumbers / nodeRing.size() / multiRaftFactor; int raftId = 0; - for(int i = 0 ; i < multiRaftFactor; i++) { + for (int i = 0; i < multiRaftFactor; i++) { RaftNode raftNode = new RaftNode(newNode, i); nodeSlotMap.putIfAbsent(raftNode, new ArrayList<>()); previousNodeMap.putIfAbsent(raftNode, new HashMap<>()); @@ -307,10 +307,11 @@ public class SlotPartitionTable implements PartitionTable { if (transferNum > 0) { RaftNode curNode = new RaftNode(newNode, raftId); int numToMove = transferNum; - if(raftId != multiRaftFactor - 1) { + if (raftId != multiRaftFactor - 1) { numToMove = Math.min(numToMove, newAvg - nodeSlotMap.get(curNode).size()); } - List<Integer> slotsToMove = slots.subList(slots.size() - transferNum, slots.size() - transferNum + numToMove); + List<Integer> slotsToMove = slots + .subList(slots.size() - transferNum, slots.size() - transferNum + numToMove); nodeSlotMap.get(curNode).addAll(slotsToMove); for (Integer slot : slotsToMove) { // record what node previously hold the integer diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java index e37aa7e..81ae373 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/DataClusterServer.java @@ -317,12 +317,12 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { - DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, + DataAsyncService service = getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, "Read file:" + filePath); if (service != null) { - service.readFile(filePath, offset, length, resultHandler); + service.readFile(filePath, offset, length, raftId, resultHandler); } } @@ -849,8 +849,8 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public ByteBuffer readFile(String filePath, long offset, int length) throws TException { - return getDataSyncService(new RaftNode(thisNode, 0)).readFile(filePath, offset, length); + public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException { + return getDataSyncService(new RaftNode(thisNode, raftId)).readFile(filePath, offset, length, raftId); } @Override @@ -874,14 +874,14 @@ public class DataClusterServer extends RaftServer implements TSDataService.Async } @Override - public void removeHardLink(String hardLinkPath) throws TException { - getDataSyncService(new RaftNode(thisNode, 0)).removeHardLink(hardLinkPath); + public void removeHardLink(String hardLinkPath, int raftId) throws TException { + getDataSyncService(new RaftNode(thisNode, raftId)).removeHardLink(hardLinkPath, raftId); } @Override - public void removeHardLink(String hardLinkPath, + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { - getDataAsyncService(new RaftNode(thisNode, 0), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, + getDataAsyncService(new RaftNode(thisNode, raftId), resultHandler, hardLinkPath).removeHardLink(hardLinkPath, raftId, resultHandler); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java index f8830c0..e4a7304 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/MetaClusterServer.java @@ -222,9 +222,9 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async } @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { - asyncService.readFile(filePath, offset, length, resultHandler); + asyncService.readFile(filePath, offset, length, raftId, resultHandler); } @Override @@ -324,8 +324,8 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async } @Override - public ByteBuffer readFile(String filePath, long offset, int length) throws TException { - return syncService.readFile(filePath, offset, length); + public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException { + return syncService.readFile(filePath, offset, length, raftId); } @Override @@ -334,13 +334,13 @@ public class MetaClusterServer extends RaftServer implements TSMetaService.Async } @Override - public void removeHardLink(String hardLinkPath) throws TException { - syncService.removeHardLink(hardLinkPath); + public void removeHardLink(String hardLinkPath, int raftId) throws TException { + syncService.removeHardLink(hardLinkPath, raftId); } @Override - public void removeHardLink(String hardLinkPath, + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { - asyncService.removeHardLink(hardLinkPath, resultHandler); + asyncService.removeHardLink(hardLinkPath, raftId, resultHandler); } } diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java index 1fb8336..2cd675f 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java @@ -572,8 +572,9 @@ public class DataGroupMember extends RaftMember { * @return the path of the directory that is provided exclusively for the member. */ private String getMemberDir() { - return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + - "raft" + File.separator + getHeader().nodeIdentifier + File.separator; + return IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "raft" + + File.separator + getHeader().nodeIdentifier + File.separator + getRaftGroupId() + + File.separator; } public MetaGroupMember getMetaGroupMember() { @@ -625,8 +626,8 @@ public class DataGroupMember extends RaftMember { RaftNode raftNode = metaGroupMember.getPartitionTable().routeToHeaderByTime(storageGroupName, partitionId * StorageEngine.getTimePartitionInterval()); DataGroupMember localDataMember = metaGroupMember.getLocalDataMember(raftNode); - if (localDataMember.getHeader().equals(this.getHeader())) { - localListPair.add(new Pair<>(partitionId, pair.right)); + if (localDataMember.getHeader().equals(thisNode)) { + localListPair.add(pair); } } try { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java index c97223b..d93efc0 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java @@ -532,6 +532,7 @@ public class MetaGroupMember extends RaftMember { newStartUpStatus .setReplicationNumber(ClusterDescriptor.getInstance().getConfig().getReplicationNum()); newStartUpStatus.setClusterName(ClusterDescriptor.getInstance().getConfig().getClusterName()); + newStartUpStatus.setMultiRaftFactor(ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor()); List<String> seedUrls = ClusterDescriptor.getInstance().getConfig().getSeedNodeUrls(); List<Node> seedNodeList = new ArrayList<>(); for (String seedUrl : seedUrls) { @@ -545,7 +546,7 @@ public class MetaGroupMember extends RaftMember { * Send a join cluster request to "node". If the joining is accepted, set the partition table, * start DataClusterServer and ClientServer and initialize DataGroupMembers. * - * @return rue if the node has successfully joined the cluster, false otherwise. + * @return true if the node has successfully joined the cluster, false otherwise. */ private boolean joinCluster(Node node, StartUpStatus startUpStatus) throws TException, InterruptedException, ConfigInconsistentException { @@ -594,18 +595,17 @@ public class MetaGroupMember extends RaftMember { } private void handleConfigInconsistency(AddNodeResponse resp) throws ConfigInconsistentException { - if (logger.isInfoEnabled()) { - CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse(); - String parameters = - (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval") - + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt") - + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number") - + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes") - + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName"); - logger.error( - "The start up configuration{} conflicts the cluster. Please reset the configurations. ", - parameters.substring(1)); - } + CheckStatusResponse checkStatusResponse = resp.getCheckStatusResponse(); + String parameters = + (checkStatusResponse.isPartitionalIntervalEquals() ? "" : ", partition interval") + + (checkStatusResponse.isHashSaltEquals() ? "" : ", hash salt") + + (checkStatusResponse.isReplicationNumEquals() ? "" : ", replication number") + + (checkStatusResponse.isSeedNodeEquals() ? "" : ", seedNodes") + + (checkStatusResponse.isClusterNameEquals() ? "" : ", clusterName") + + (checkStatusResponse.isMultiRaftFactorEquals() ? "" : ", multiRaftFactor"); + logger.error( + "The start up configuration{} conflicts the cluster. Please reset the configurations. ", + parameters.substring(1)); throw new ConfigInconsistentException(); } @@ -897,6 +897,7 @@ public class MetaGroupMember extends RaftMember { long remotePartitionInterval = remoteStartUpStatus.getPartitionInterval(); int remoteHashSalt = remoteStartUpStatus.getHashSalt(); int remoteReplicationNum = remoteStartUpStatus.getReplicationNumber(); + int remoteMultiRaftFactor = remoteStartUpStatus.getMultiRaftFactor(); String remoteClusterName = remoteStartUpStatus.getClusterName(); List<Node> remoteSeedNodeList = remoteStartUpStatus.getSeedNodeList(); long localPartitionInterval = IoTDBDescriptor.getInstance().getConfig() @@ -904,7 +905,9 @@ public class MetaGroupMember extends RaftMember { int localHashSalt = ClusterConstant.HASH_SALT; int localReplicationNum = ClusterDescriptor.getInstance().getConfig().getReplicationNum(); String localClusterName = ClusterDescriptor.getInstance().getConfig().getClusterName(); + int localMultiRaftFactor = ClusterDescriptor.getInstance().getConfig().getMultiRaftFactor(); boolean partitionIntervalEquals = true; + boolean multiRaftFactorEquals = true; boolean hashSaltEquals = true; boolean replicationNumEquals = true; boolean seedNodeEquals = true; @@ -915,6 +918,11 @@ public class MetaGroupMember extends RaftMember { logger.info("Remote partition interval conflicts with the leader's. Leader: {}, remote: {}", localPartitionInterval, remotePartitionInterval); } + if (localMultiRaftFactor != remoteMultiRaftFactor) { + multiRaftFactorEquals = false; + logger.info("Remote multi-raft factor conflicts with the leader's. Leader: {}, remote: {}", + localMultiRaftFactor, remoteMultiRaftFactor); + } if (localHashSalt != remoteHashSalt) { hashSaltEquals = false; logger.info("Remote hash salt conflicts with the leader's. Leader: {}, remote: {}", @@ -938,11 +946,11 @@ public class MetaGroupMember extends RaftMember { } } if (!(partitionIntervalEquals && hashSaltEquals && replicationNumEquals && seedNodeEquals - && clusterNameEquals)) { + && clusterNameEquals && multiRaftFactorEquals)) { response.setRespNum((int) Response.RESPONSE_NEW_NODE_PARAMETER_CONFLICT); response.setCheckStatusResponse( new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals, - replicationNumEquals, seedNodeEquals, clusterNameEquals)); + replicationNumEquals, seedNodeEquals, clusterNameEquals, multiRaftFactorEquals)); return false; } return true; diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java index 521050e..425a8d9 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseAsyncService.java @@ -104,7 +104,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { } @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { try { resultHandler.onComplete(IOUtils.readFile(filePath, offset, length)); @@ -114,7 +114,7 @@ public abstract class BaseAsyncService implements RaftService.AsyncIface { } @Override - public void removeHardLink(String hardLinkPath, + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { try { Files.deleteIfExists(new File(hardLinkPath).toPath()); diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java index f9bda1f..92f7018 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/service/BaseSyncService.java @@ -122,7 +122,7 @@ public abstract class BaseSyncService implements RaftService.Iface { } @Override - public ByteBuffer readFile(String filePath, long offset, int length) throws TException { + public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException { try { return IOUtils.readFile(filePath, offset, length); } catch (IOException e) { @@ -131,7 +131,7 @@ public abstract class BaseSyncService implements RaftService.Iface { } @Override - public void removeHardLink(String hardLinkPath) throws TException { + public void removeHardLink(String hardLinkPath, int raftId) throws TException { try { Files.deleteIfExists(new File(hardLinkPath).toPath()); } catch (IOException e) { diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java index a777f5a..4d1c87a 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java @@ -82,12 +82,18 @@ public class ClusterUtils { boolean replicationNumEquals = true; boolean seedNodeListEquals = true; boolean clusterNameEqual = true; + boolean multiRaftFactorEqual = true; if (localStartUpStatus.getPartitionInterval() != remoteStartUpStatus.getPartitionInterval()) { partitionIntervalEquals = false; logger.error("Remote partition interval conflicts with local. local: {}, remote: {}", localStartUpStatus.getPartitionInterval(), remoteStartUpStatus.getPartitionInterval()); } + if (localStartUpStatus.getMultiRaftFactor() != remoteStartUpStatus.getMultiRaftFactor()) { + multiRaftFactorEqual = false; + logger.error("Remote multi-raft factor conflicts with local. local: {}, remote: {}", + localStartUpStatus.getMultiRaftFactor(), remoteStartUpStatus.getMultiRaftFactor()); + } if (localStartUpStatus.getHashSalt() != remoteStartUpStatus.getHashSalt()) { hashSaltEquals = false; logger.error("Remote hash salt conflicts with local. local: {}, remote: {}", @@ -115,7 +121,7 @@ public class ClusterUtils { } return new CheckStatusResponse(partitionIntervalEquals, hashSaltEquals, - replicationNumEquals, seedNodeListEquals, clusterNameEqual); + replicationNumEquals, seedNodeListEquals, clusterNameEqual, multiRaftFactorEqual); } public static boolean checkSeedNodes(boolean isClusterEstablished, List<Node> localSeedNodes, diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java index dcc582a..88deb3d 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java @@ -64,7 +64,6 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.query.context.QueryContext; import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.common.Path; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema; import org.apache.thrift.TException; @@ -94,7 +93,7 @@ public class SyncClientAdaptorTest { @Before public void setUp() { nodeStatus = new TNodeStatus(); - checkStatusResponse = new CheckStatusResponse(true, false, true, false, true); + checkStatusResponse = new CheckStatusResponse(true, false, true, false, true, true); addNodeResponse = new AddNodeResponse((int) Response.RESPONSE_AGREE); aggregateResults = Arrays.asList(ByteBuffer.wrap("1".getBytes()), ByteBuffer.wrap("2".getBytes()), ByteBuffer.wrap("2".getBytes())); @@ -247,7 +246,7 @@ public class SyncClientAdaptorTest { } @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { resultHandler.onComplete(readFileResult); } @@ -338,7 +337,7 @@ public class SyncClientAdaptorTest { TestUtils.getNode(0), 0, paths)); assertEquals(1L, (long) SyncClientAdaptor.getGroupByExecutor(dataClient, new GroupByRequest())); assertEquals(fillResult, SyncClientAdaptor.previousFill(dataClient, new PreviousFillRequest())); - assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000)); + assertEquals(readFileResult, SyncClientAdaptor.readFile(dataClient, "a file", 0, 1000, 0)); assertEquals(aggregateResults, SyncClientAdaptor.getGroupByResult(dataClient, TestUtils.getNode(0), 0, 1, 1, 2)); assertEquals(peekNextNotNullValueResult, SyncClientAdaptor.peekNextNotNullValue(dataClient, diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java index 63ce1b7..51f0c8a 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncDataClient.java @@ -117,7 +117,7 @@ public class TestAsyncDataClient extends AsyncDataClient { } @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { new Thread(() -> { File file = new File(filePath); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java index b89d058..38393d1 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java @@ -62,7 +62,7 @@ public abstract class DataSnapshotTest { public AsyncClient getAsyncClient(Node node) { return new AsyncDataClient(null, null, null) { @Override - public void readFile(String filePath, long offset, int length, + public void readFile(String filePath, long offset, int length, int raftId, AsyncMethodCallback<ByteBuffer> resultHandler) { new Thread(() -> { if (addNetFailure && (failureCnt++) % failureFrequency == 0) { @@ -79,8 +79,7 @@ public abstract class DataSnapshotTest { } @Override - public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) - throws TException { + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { new Thread(() -> { try { Files.deleteIfExists(new File(hardLinkPath).toPath()); @@ -121,7 +120,7 @@ public abstract class DataSnapshotTest { } })) { @Override - public ByteBuffer readFile(String filePath, long offset, int length) throws TException { + public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException { if (addNetFailure && (failureCnt++) % failureFrequency == 0) { // simulate failures throw new TException("Faked network failure"); diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java index d9ce485..539e839 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java @@ -119,7 +119,7 @@ public class PullSnapshotTaskTest extends DataSnapshotTest { } @Override - public ByteBuffer readFile(String filePath, long offset, int length) throws TException { + public ByteBuffer readFile(String filePath, long offset, int length, int raftId) throws TException { try { return IOUtils.readFile(filePath, offset, length); } catch (IOException e) { diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java index 8180d4e..c920f06 100644 --- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java +++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java @@ -222,7 +222,7 @@ public class DataGroupMemberTest extends MemberTest { } @Override - public void removeHardLink(String hardLinkPath, AsyncMethodCallback<Void> resultHandler) { + public void removeHardLink(String hardLinkPath, int raftId, AsyncMethodCallback<Void> resultHandler) { new Thread(() -> { try { Files.deleteIfExists(new File(hardLinkPath).toPath()); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java index 781836f..b03f984 100644 --- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java @@ -500,9 +500,7 @@ public class StorageEngine implements IService { * @throws StorageGroupNotSetException */ public void closeStorageGroupProcessor(PartialPath storageGroupPath, long partitionId, - boolean isSeq, - boolean isSync) - throws StorageGroupNotSetException { + boolean isSeq, boolean isSync) throws StorageGroupNotSetException { StorageGroupProcessor processor = processorMap.get(storageGroupPath); if (processor == null) { throw new StorageGroupNotSetException(storageGroupPath.getFullPath()); diff --git a/thrift/src/main/thrift/cluster.thrift b/thrift/src/main/thrift/cluster.thrift index 69a4dc5..9019680 100644 --- a/thrift/src/main/thrift/cluster.thrift +++ b/thrift/src/main/thrift/cluster.thrift @@ -132,6 +132,7 @@ struct StartUpStatus { 3: required int replicationNumber 4: required list<Node> seedNodeList 5: required string clusterName + 6: required int multiRaftFactor } // follower -> leader @@ -141,6 +142,7 @@ struct CheckStatusResponse { 3: required bool replicationNumEquals 4: required bool seedNodeEquals 5: required bool clusterNameEquals + 6: required bool multiRaftFactorEquals } struct SendSnapshotRequest { @@ -319,7 +321,7 @@ service RaftService { * bytes, only the remaining will be returned. * Notice that when the last chunk of the file is read, the file will be deleted immediately. **/ - binary readFile(1:string filePath, 2:long offset, 3:int length) + binary readFile(1:string filePath, 2:long offset, 3:int length, 4: int raftId) /** * Test if a log of "index" and "term" exists. @@ -330,7 +332,7 @@ service RaftService { * When a follower finds that it already has a file in a snapshot locally, it calls this * interface to notify the leader to remove the associated hardlink. **/ - void removeHardLink(1: string hardLinkPath) + void removeHardLink(1: string hardLinkPath, 2: int raftId) }
