smengcl commented on code in PR #9117: URL: https://github.com/apache/ozone/pull/9117#discussion_r2479975912
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java: ########## @@ -0,0 +1,1138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.om; + +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT; +import static org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.FILE_TABLE; +import static org.apache.hadoop.ozone.om.codec.OMDBDefinition.KEY_TABLE; +import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK; + +import com.google.common.annotations.VisibleForTesting; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import 
java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter; +import org.apache.hadoop.hdds.utils.db.RDBStore; +import org.apache.hadoop.hdds.utils.db.RocksDatabase; +import org.apache.hadoop.hdds.utils.db.RocksDatabaseException; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader; +import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator; +import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType; +import org.apache.hadoop.ozone.lock.BootstrapStateHandler; +import org.apache.hadoop.ozone.om.exceptions.OMException; +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; +import org.apache.hadoop.ozone.om.lock.OMLockDetails; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone; +import org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse; +import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Background service for defragmenting snapshots in the active snapshot chain. + * When snapshots are taken, they capture the entire OM RocksDB state but may contain + * fragmented data. 
This service defragments snapshots by creating new compacted + * RocksDB instances with only the necessary data for tracked column families. + * <p> + * The service processes snapshots in the active chain sequentially, starting with + * the first non-defragmented snapshot. For the first snapshot in the chain, it + * performs a full defragmentation by copying all keys. For subsequent snapshots, + * it uses incremental defragmentation based on diffs from the previous defragmented + * snapshot. + */ +public class SnapshotDefragService extends BackgroundService + implements BootstrapStateHandler { + + private static final Logger LOG = + LoggerFactory.getLogger(SnapshotDefragService.class); + + // Use only a single thread for snapshot defragmentation to avoid conflicts + private static final int DEFRAG_CORE_POOL_SIZE = 1; + + private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR; + private static final String TEMP_DIFF_DIR = "tempDiffSstFiles"; // TODO: Put this in OzoneConsts? 
+ + private final OzoneManager ozoneManager; + private final AtomicLong runCount = new AtomicLong(0); + + // Number of snapshots to be processed in a single iteration + private final long snapshotLimitPerTask; + + private final AtomicLong snapshotsDefraggedCount; + private final AtomicBoolean running; + + private final BootstrapStateHandler.Lock lock = new BootstrapStateHandler.Lock(); + + public SnapshotDefragService(long interval, TimeUnit unit, long serviceTimeout, + OzoneManager ozoneManager, OzoneConfiguration configuration) { + super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE, + serviceTimeout, ozoneManager.getThreadNamePrefix()); + this.ozoneManager = ozoneManager; + this.snapshotLimitPerTask = configuration + .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK, + SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT); + snapshotsDefraggedCount = new AtomicLong(0); + running = new AtomicBoolean(false); + } + + @Override + public void start() { + running.set(true); + super.start(); + } + + @VisibleForTesting + public void pause() { + running.set(false); + } + + @VisibleForTesting + public void resume() { + running.set(true); + } + + /** + * Checks if rocks-tools native library is available. + */ + private boolean isRocksToolsNativeLibAvailable() { + try { + return ManagedRawSSTFileReader.tryLoadLibrary(); + } catch (Exception e) { + LOG.warn("Failed to check native code availability", e); + return false; + } + } + + /** + * Checks if a snapshot needs defragmentation by examining its YAML metadata. 
+ */ + private boolean needsDefragmentation(SnapshotInfo snapshotInfo) { + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + try { + // Read YAML metadata using the correct API + File yamlFile = new File(snapshotPath + ".yaml"); + OmSnapshotLocalDataYaml yamlData = OmSnapshotLocalDataYaml.getFromYamlFile( + ozoneManager.getOmSnapshotManager(), yamlFile); // TODO: Verify new usage + + // Check if snapshot needs compaction (defragmentation) + boolean needsDefrag = yamlData.getNeedsDefrag(); + LOG.debug("Snapshot {} needsDefragmentation field value: {}", + snapshotInfo.getName(), needsDefrag); + + return needsDefrag; + } catch (IOException e) { + LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag needed", + snapshotInfo.getName(), e); + return true; + } + } + + /** + * Finds the first active snapshot in the chain that needs defragmentation. + */ + private SnapshotInfo findFirstSnapshotNeedingDefrag( + Table<String, SnapshotInfo> snapshotInfoTable) throws IOException { + + LOG.debug("Searching for first snapshot needing defragmentation in active chain"); + + try (TableIterator<String, ? 
extends Table.KeyValue<String, SnapshotInfo>> iterator = + snapshotInfoTable.iterator()) { + iterator.seekToFirst(); + + while (iterator.hasNext()) { + Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next(); + SnapshotInfo snapshotInfo = keyValue.getValue(); + + // Skip deleted snapshots + if (snapshotInfo.getSnapshotStatus() == SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) { + LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName()); + continue; + } + + // Check if this snapshot needs defragmentation + if (needsDefragmentation(snapshotInfo)) { + LOG.info("Found snapshot needing defragmentation: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return snapshotInfo; + } + + LOG.debug("Snapshot {} already defragmented, continuing search", + snapshotInfo.getName()); + } + } + + LOG.debug("No snapshots found needing defragmentation"); + return null; + } + + /** + * Finds the previous defragmented snapshot in the chain. + */ + private SnapshotInfo findPreviousDefraggedSnapshot(SnapshotInfo currentSnapshot, + Table<String, SnapshotInfo> snapshotInfoTable) throws IOException { + + LOG.debug("Searching for previous defragmented snapshot before: {}", + currentSnapshot.getName()); + + // Walk backwards through the snapshot chain using pathPreviousSnapshotId + String previousSnapshotId = currentSnapshot.getPathPreviousSnapshotId() != null ? + currentSnapshot.getPathPreviousSnapshotId().toString() : null; + + while (previousSnapshotId != null) { + try (TableIterator<String, ? 
extends Table.KeyValue<String, SnapshotInfo>> iterator = + snapshotInfoTable.iterator()) { + iterator.seekToFirst(); + + while (iterator.hasNext()) { + Table.KeyValue<String, SnapshotInfo> keyValue = iterator.next(); + SnapshotInfo snapshotInfo = keyValue.getValue(); + + if (snapshotInfo.getSnapshotId().toString().equals(previousSnapshotId)) { + if (!needsDefragmentation(snapshotInfo)) { + LOG.info("Found previous defragmented snapshot: {} (ID: {})", + snapshotInfo.getName(), snapshotInfo.getSnapshotId()); + return snapshotInfo; + } + + // Continue searching with this snapshot's previous + previousSnapshotId = snapshotInfo.getPathPreviousSnapshotId() != null ? + snapshotInfo.getPathPreviousSnapshotId().toString() : null; + break; + } + } + } + } + + LOG.debug("No previous defragmented snapshot found"); + return null; + } + + /** + * Performs full defragmentation for the first snapshot in the chain. + * This is a simplified implementation that demonstrates the concept. + */ + private void performFullDefragmentation(SnapshotInfo snapshotInfo, + OmSnapshot omSnapshot) throws IOException { + + String snapshotPath = OmSnapshotManager.getSnapshotPath( + ozoneManager.getConfiguration(), snapshotInfo); + + // For defraggedDbPath, we need to go up to the parent directory and use checkpointStateDefragged + String parentDir = Paths.get(snapshotPath).getParent().getParent().toString(); + String checkpointDirName = Paths.get(snapshotPath).getFileName().toString(); + String defraggedDbPath = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, checkpointDirName).toString(); + + LOG.info("Starting full defragmentation for snapshot: {} at path: {}", + snapshotInfo.getName(), snapshotPath); + LOG.info("Target defragmented DB path: {}", defraggedDbPath); + + // Create defragmented directory + Files.createDirectories(Paths.get(defraggedDbPath)); + + // TODO: Get snapshot checkpoint DB via SnapshotCache + RDBStore originalStore = (RDBStore) omSnapshot.getMetadataManager().getStore(); 
+ RocksDatabase originalDb = originalStore.getDb(); + + LOG.info("Starting defragmentation process for snapshot: {}", snapshotInfo.getName()); + LOG.info("Original DB path: {}", snapshotPath); + LOG.info("Defragmented DB path: {}", defraggedDbPath); + + // Implement actual RocksDB defragmentation + try { + // 1. Create a new RocksDB instance at defraggedDbPath + DBStoreBuilder defraggedDbBuilder = DBStoreBuilder.newBuilder(ozoneManager.getConfiguration()) Review Comment: Note: Steps 2-5 are similar to `SSTFilteringService` but improved because of Step 5 compaction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
