Xushaohong commented on code in PR #4294: URL: https://github.com/apache/ozone/pull/4294#discussion_r1174843439
########## hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/RDBSnapshotProvider.java: ########## @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.utils; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.ozone.OzoneConsts.SNAPSHOT_CANDIDATE_DIR; + +/** + * The RocksDB specified snapshot provider. + * Supports Incremental Snapshot and Full Snapshot. + * + * The process is as the followings: + * 1. Download the snapshot file from the leader + * 2. Untar the snapshot file to candidate dir + * 3. 
Return the candidate dir as DBCheckpoint + * + * The difference between incremental and full snapshot is whether to send + * the existing SST file list to the leader or not. + * + */ +public abstract class RDBSnapshotProvider implements Closeable { + + private static final Logger LOG = + LoggerFactory.getLogger(RDBSnapshotProvider.class); + + private final File snapshotDir; + private final File candidateDir; + private final String dbName; + private final AtomicReference<String> lastLeaderRef; + private final AtomicLong numDownloaded; + private FaultInjector injector; + + public RDBSnapshotProvider(File snapshotDir, String dbName) { + this.snapshotDir = snapshotDir; + this.candidateDir = new File(snapshotDir, dbName + SNAPSHOT_CANDIDATE_DIR); + this.dbName = dbName; + this.injector = null; + this.lastLeaderRef = new AtomicReference<>(null); + this.numDownloaded = new AtomicLong(); + init(); + } + + /** + * Initialize or reinitialize the RDB snapshot provider. + */ + public synchronized void init() { + // check parent snapshot dir + if (!snapshotDir.exists()) { + HddsUtils.createDir(snapshotDir.toString()); + } + + LOG.info("Cleaning up the candidate dir: {}", candidateDir); + // cleanup candidate dir + if (candidateDir.exists()) { + FileUtil.fullyDeleteContents(candidateDir); + } else { + // create candidate dir + HddsUtils.createDir(candidateDir.toString()); + } + + // reset leader info + lastLeaderRef.set(null); + } + + /** + * Download the latest DB snapshot(checkpoint) from the Leader. 
+ * + * @param leaderNodeID the ID of leader node + * @return {@link DBCheckpoint} + * @throws IOException + */ + public DBCheckpoint downloadDBSnapshotFromLeader(String leaderNodeID) + throws IOException { + LOG.info("Prepare to download the snapshot from leader OM {} and " + + "reloading state from the snapshot.", leaderNodeID); + checkLeaderConsistent(leaderNodeID); + + String snapshotFileName = getSnapshotFileName(leaderNodeID); + File targetFile = new File(snapshotDir, snapshotFileName); + downloadSnapshot(leaderNodeID, targetFile); + LOG.info("Successfully download the latest snapshot {} from leader OM: {}", + targetFile, leaderNodeID); + + RocksDBCheckpoint checkpoint = getCheckpointFromSnapshotFile(targetFile, + candidateDir, true); + LOG.info("Successfully untar the downloaded snapshot {} at {}.", targetFile, + checkpoint.getCheckpointLocation()); + + numDownloaded.incrementAndGet(); + injectPause(); + return checkpoint; + } + + /** + * Clean up the candidate DB for the following reason: + * 1. If leader changes when installing incremental snapshot + * Notice: here prevents downloading the error IC from the new leader, + * instead, will ask for a full snapshot directly + * 2. Ready to download the full snapshot + * + * @param currentLeader the ID of leader node + */ + private void checkLeaderConsistent(String currentLeader) { + String lastLeader = lastLeaderRef.get(); + if (lastLeader != null) { + if (!lastLeader.equals(currentLeader)) { + LOG.info("Last leader for install snapshot is {}, but current leader " + + "is {}. ", lastLeader, currentLeader); + init(); + lastLeaderRef.set(currentLeader); + } + return; + } + + List<String> files = HAUtils.getExistingSstFiles(candidateDir); + if (!files.isEmpty()) { Review Comment: This is a prospective case. For example, this follower just gets restarted and lost the in-memory info about whose DB this candidate dir stores(lastLeaderRef). 
Since the length of the recovery time is uncertain, the old candidate DB might be outdated or modified even if the leader has not changed during the recovery. To keep the solution simple, we just log a warning and reinitialize the candidate DB directory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
