This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0767935311 [IOTDB-4758]Delete snapshot after region migration (#7739)
0767935311 is described below

commit 076793531138dc0b5e1443f6921637a16e450d52
Author: suchenglong <404083...@qq.com>
AuthorDate: Thu Oct 27 14:07:34 2022 +0800

    [IOTDB-4758]Delete snapshot after region migration (#7739)
---
 .../multileader/MultiLeaderConsensus.java          | 12 +++++++++
 .../multileader/MultiLeaderServerImpl.java         | 31 ++++++++++++++++++++++
 .../service/MultiLeaderRPCServiceProcessor.java    | 31 ++++++++++++++++++++++
 .../src/main/thrift/mutlileader.thrift             | 10 +++++++
 4 files changed, 84 insertions(+)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index fc8039c979..fd94fc795f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -278,6 +278,10 @@ public class MultiLeaderConsensus implements IConsensus {
       logger.info("[MultiLeaderConsensus] activate new peer...");
       impl.activePeer(peer);
 
+      // step 7: spot clean
+      logger.info("[MultiLeaderConsensus] do spot clean...");
+      doSpotClean(peer, impl);
+
     } catch (ConsensusGroupAddPeerException e) {
       logger.error("cannot execute addPeer() for {}", peer, e);
       return ConsensusGenericResponse.newBuilder()
@@ -289,6 +293,14 @@ public class MultiLeaderConsensus implements IConsensus {
     return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
   }
 
+  private void doSpotClean(Peer peer, MultiLeaderServerImpl impl) {
+    try {
+      impl.cleanupRemoteSnapshot(peer);
+    } catch (ConsensusGroupAddPeerException e) {
+      logger.warn("[MultiLeaderConsensus] failed to cleanup remote snapshot", 
e);
+    }
+  }
+
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer 
peer) {
     MultiLeaderServerImpl impl = stateMachineMap.get(groupId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index ae13e4df28..911b79c232 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -40,6 +40,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import 
org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
+import 
org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TRemoveSyncLogChannelReq;
@@ -654,4 +656,33 @@ public class MultiLeaderServerImpl {
     logger.info("set {} active status to {}", this.thisNode, active);
     this.active = active;
   }
+
+  public void cleanupRemoteSnapshot(Peer targetPeer) throws 
ConsensusGroupAddPeerException {
+    try (SyncMultiLeaderServiceClient client =
+        syncClientManager.borrowClient(targetPeer.getEndpoint())) {
+      TCleanupTransferredSnapshotReq req =
+          new TCleanupTransferredSnapshotReq(
+              targetPeer.getGroupId().convertToTConsensusGroupId(), 
latestSnapshotId);
+      TCleanupTransferredSnapshotRes res = 
client.cleanupTransferredSnapshot(req);
+      if (!isSuccess(res.getStatus())) {
+        throw new ConsensusGroupAddPeerException(
+            String.format(
+                "cleanup remote snapshot failed of %s ,status is %s", 
targetPeer, res.getStatus()));
+      }
+    } catch (IOException | TException e) {
+      throw new ConsensusGroupAddPeerException(
+          String.format("cleanup remote snapshot failed of %s", targetPeer), 
e);
+    }
+  }
+
+  public void cleanupTransferredSnapshot(String snapshotId) throws 
ConsensusGroupAddPeerException {
+    File snapshotDir = new File(storageDir, snapshotId);
+    if (snapshotDir.exists()) {
+      try {
+        FileUtils.deleteDirectory(snapshotDir);
+      } catch (IOException e) {
+        throw new ConsensusGroupAddPeerException(e);
+      }
+    }
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 3f045e5e66..6bc161add2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -34,6 +34,8 @@ import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TActivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelReq;
 import org.apache.iotdb.consensus.multileader.thrift.TBuildSyncLogChannelRes;
+import 
org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotReq;
+import 
org.apache.iotdb.consensus.multileader.thrift.TCleanupTransferredSnapshotRes;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerReq;
 import org.apache.iotdb.consensus.multileader.thrift.TInactivatePeerRes;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
@@ -277,5 +279,34 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
         new TTriggerSnapshotLoadRes(new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())));
   }
 
+  @Override
+  public void cleanupTransferredSnapshot(
+      TCleanupTransferredSnapshotReq req,
+      AsyncMethodCallback<TCleanupTransferredSnapshotRes> resultHandler)
+      throws TException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+    MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+    if (impl == null) {
+      String message =
+          String.format("unexpected consensusGroupId %s for 
buildSyncLogChannel request", groupId);
+      logger.error(message);
+      TSStatus status = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      status.setMessage(message);
+      resultHandler.onComplete(new TCleanupTransferredSnapshotRes(status));
+      return;
+    }
+    TSStatus responseStatus;
+    try {
+      impl.cleanupTransferredSnapshot(req.snapshotId);
+      responseStatus = new 
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+    } catch (ConsensusGroupAddPeerException e) {
+      logger.error(String.format("failed to cleanup transferred snapshot %s", 
req.snapshotId), e);
+      responseStatus = new 
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      responseStatus.setMessage(e.getMessage());
+    }
+    resultHandler.onComplete(new 
TCleanupTransferredSnapshotRes(responseStatus));
+  }
+
   public void handleClientExit() {}
 }
diff --git a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
index 345fc70900..f85c79c625 100644
--- a/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
+++ b/thrift-multi-leader-consensus/src/main/thrift/mutlileader.thrift
@@ -94,6 +94,15 @@ struct TTriggerSnapshotLoadRes {
   1: required common.TSStatus status
 }
 
+struct TCleanupTransferredSnapshotReq {
+  1: required common.TConsensusGroupId consensusGroupId
+  2: required string snapshotId
+}
+
+struct TCleanupTransferredSnapshotRes {
+  1: required common.TSStatus status
+}
+
 service MultiLeaderConsensusIService {
   TSyncLogRes syncLog(TSyncLogReq req)
   TInactivatePeerRes inactivatePeer(TInactivatePeerReq req)
@@ -102,4 +111,5 @@ service MultiLeaderConsensusIService {
   TRemoveSyncLogChannelRes removeSyncLogChannel(TRemoveSyncLogChannelReq req)
   TSendSnapshotFragmentRes sendSnapshotFragment(TSendSnapshotFragmentReq req)
   TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req)
+  TCleanupTransferredSnapshotRes 
cleanupTransferredSnapshot(TCleanupTransferredSnapshotReq req)
 }
\ No newline at end of file

Reply via email to