This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git

The following commit(s) were added to refs/heads/master by this push:
     new 0767935311 [IOTDB-4758]Delete snapshot after region migration (#7739)
0767935311 is described below

commit 076793531138dc0b5e1443f6921637a16e450d52
Author: suchenglong <404083...@qq.com>
AuthorDate: Thu Oct 27 14:07:34 2022 +0800

    [IOTDB-4758]Delete snapshot after region migration (#7739)
---
 .../multileader/MultiLeaderConsensus.java          | 12 +++++++++
 .../multileader/MultiLeaderServerImpl.java         | 31 ++++++++++++++++++++++
 .../service/MultiLeaderRPCServiceProcessor.java    | 31 ++++++++++++++++++++++
 .../src/main/thrift/mutlileader.thrift             | 10 +++++++
 4 files changed, 84 insertions(+)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index fc8039c979..fd94fc795f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -278,6 +278,10 @@ public class MultiLeaderConsensus implements IConsensus {
       logger.info("[MultiLeaderConsensus] activate new peer...");
       impl.activePeer(peer);
 
+      // step 7: spot clean
+      logger.info("[MultiLeaderConsensus] do spot clean...");
+      doSpotClean(peer, impl);
+
     } catch (ConsensusGroupAddPeerException e) {
       logger.error("cannot execute addPeer() for {}", peer, e);
       return ConsensusGenericResponse.newBuilder()
@@ -289,6 +293,14 @@ public class MultiLeaderConsensus implements IConsensus {
     return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
+  private void doSpotClean(Peer peer, MultiLeaderServerImpl impl) {
+    try {
+      impl.cleanupRemoteSnapshot(peer);
+    } catch (ConsensusGroupAddPeerException e) {
+      logger.warn("[MultiLeaderConsensus] failed to cleanup remote snapshot", e);
+    }
+  }
+
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
     MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index ae13e4df28..911b79c232 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
+import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
@@ -654,4 +656,33 @@ public class MultiLeaderServerImpl {
     logger.info("set {} active status to {}", this.thisNode, active);
     this.active = active;
   }
+
+  public void cleanupRemoteSnapshot(Peer targetPeer) throws ConsensusGroupAddPeerException {
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      TCleanupTransferredSnapshotReq req =
+          new TCleanupTransferredSnapshotReq(
+              targetPeer.getGroupId().convertToTConsensusGroupId(), latestSnapshotId);
+      TCleanupTransferredSnapshotRes res = client.cleanupTransferredSnapshot(req);
+      if (!isSuccess(res.getStatus())) {
+        throw new ConsensusGroupAddPeerException(
+            String.format(
+                "cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("cleanup remote snapshot failed of %s", targetPeer), e);
+    }
+  }
+
+  public void cleanupTransferredSnapshot(String snapshotId) throws ConsensusGroupAddPeerException {
+    File snapshotDir = new File(storageDir, snapshotId);
+    if (snapshotDir.exists()) {
+      try {
+        FileUtils.deleteDirectory(snapshotDir);
+      } catch (IOException e) {
+        throw new ConsensusGroupAddPeerException(e);
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3f045e5e66..6bc161add2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
+import org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
@@ -277,5 +279,34 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
   }
 
+  @Override
+  public void cleanupTransferredSnapshot(
+      TCleanupTransferredSnapshotReq req,
+      AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
+      return;
+    }
+    TSStatus responseStatus;
+    try {
+      impl.cleanupTransferredSnapshot(req.snapshotId);
+      responseStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupAddPeerException e) {
+      logger.error(String.format("failed to cleanup transferred snapshot %s", req.snapshotId), e);
+      responseStatus = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+    }
+    resultHandler.onComplete(new TCleanupTransferredSnapshotRes(responseStatus));
+  }
+
   public void handleClientExit() {}
 }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 345fc70900..f85c79c625 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -94,6 +94,15 @@ struct TTriggerSnapshotLoadRes {
   1: required common.TSStatus status
 }
 
+struct TCleanupTransferredSnapshotReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required string snapshotId
+}
+
+struct TCleanupTransferredSnapshotRes {
+  1: required common.TSStatus status
+}
+
 service MultiLeaderConsensusIService {
   TSyncLogRes syncLog(TSyncLogReq req)
   TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
@@ -102,4 +111,5 @@ service MultiLeaderConsensusIService {
   TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
   TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
   TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
+  TCleanupTransferredSnapshotRes cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)
 }
\ No newline at end of file