swamirishi commented on code in PR #9133:
URL: https://github.com/apache/ozone/pull/9133#discussion_r2427405791
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java:
##########
@@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws
RocksDatabaseException {
}
}
+ public void delete(byte[] key) throws RocksDatabaseException {
+ try {
+ sstFileWriter.delete(key);
+ keyCounter.incrementAndGet();
+ } catch (RocksDBException e) {
Review Comment:
@smengcl We should make this class public; it is currently package-private.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDatabase;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private static final String CHECKPOINT_STATE_DEFRAGGED_DIR =
OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR;
+ private static final String TEMP_DIFF_DIR = "tempDiffSstFiles"; // TODO:
Put this in OzoneConsts?
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
Review Comment:
I don't think we really need this function. We can simply defrag while we are
looping: since the iterator visits the older snapshots first, we can defrag
each snapshot as we iterate. Can we remove this function from this
file?
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+ Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+
+ LOG.debug("Searching for first snapshot needing defragmentation in active
chain");
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ while (snapshotIterator.hasNext()) {
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
+
+ if (snapshotInfo == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotInfo.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (needsDefragmentation(snapshotInfo)) {
+ LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return snapshotInfo;
+ }
+
+ LOG.debug("Snapshot {} already defragmented, continuing search",
+ snapshotInfo.getName());
+ }
+
+ LOG.debug("No snapshots found needing defragmentation");
+ return null;
+ }
+
+ /**
+ * Performs full defragmentation for the first snapshot in the chain.
+ * This is a simplified implementation that demonstrates the concept.
+ */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+ OmSnapshot omSnapshot) throws IOException {
+
+ // TODO: Implement full defragmentation
+ }
+
+ /**
+ * Performs incremental defragmentation using diff from previous
defragmented snapshot.
+ */
+ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+ SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ throws IOException {
+
+ // TODO: Implement incremental defragmentation
+ }
+
+ /**
+ * Updates snapshot metadata to point to the new defragmented DB location.
+ */
+ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws
IOException {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ LOG.info("Updating snapshot metadata for: {} at path: {}",
+ snapshotInfo.getName(), snapshotPath);
+
+ try {
+ // Read current YAML data using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Mark as defragmented by setting needsDefrag to false
+ yamlData.setNeedsDefrag(false);
+
+ // Write updated YAML data
+ yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile);
+
+ LOG.info("Successfully updated metadata for snapshot: {}, " +
+ "marked as defragmented (needsDefrag=false)",
+ snapshotInfo.getName());
+
+ } catch (IOException e) {
+ LOG.error("Failed to update metadata for snapshot: {}",
snapshotInfo.getName(), e);
+ throw e;
+ }
+ }
+
+ private final class SnapshotDefragTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Check OM leader and readiness
+ if (shouldRun()) {
+ final long count = runCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating Snapshot Defragmentation Task: run # {}",
count);
+ }
+ triggerSnapshotDefragOnce();
+ }
+
+ return EmptyTaskResult.newResult();
+ }
+ }
+
+ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+ // Check if rocks-tools native lib is available
+ if (!isRocksToolsNativeLibAvailable()) {
+ LOG.warn("Rocks-tools native library is not available. " +
+ "Stopping SnapshotDefragService.");
+ return false;
+ }
+
+ Optional<OmSnapshotManager> snapshotManager =
Optional.ofNullable(ozoneManager)
+ .map(OzoneManager::getOmSnapshotManager);
+ if (!snapshotManager.isPresent()) {
+ LOG.debug("OmSnapshotManager not available, skipping defragmentation
task");
+ return false;
+ }
+
+ // Get the SnapshotChainManager to iterate through the global snapshot
chain
+ // Set this each time the task runs just in case OmMetadataManager is
restarted
+ snapshotChainManager = ((OmMetadataManagerImpl)
ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+ final Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (snapshotLimit > 0 && running.get()) {
+ // Find the first snapshot needing defragmentation
+ SnapshotInfo snapshotToDefrag =
findFirstSnapshotNeedingDefrag(snapshotInfoTable);
+
+ if (snapshotToDefrag == null) {
+ LOG.info("No snapshots found needing defragmentation");
+ break;
+ }
+
+ // Acquire SNAPSHOT_GC_LOCK
+ OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock()
Review Comment:
Let us use MultiSnapshotLocks so that the way locks are taken based on UUID
is abstracted out in that implementation, instead of explicitly calling
snapshotToDefrag.getSnapshotId().toString().
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+ Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+
+ LOG.debug("Searching for first snapshot needing defragmentation in active
chain");
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ while (snapshotIterator.hasNext()) {
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
+
+ if (snapshotInfo == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotInfo.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (needsDefragmentation(snapshotInfo)) {
+ LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return snapshotInfo;
+ }
+
+ LOG.debug("Snapshot {} already defragmented, continuing search",
+ snapshotInfo.getName());
+ }
+
+ LOG.debug("No snapshots found needing defragmentation");
+ return null;
+ }
+
+ /**
+ * Performs full defragmentation for the first snapshot in the chain.
+ * This is a simplified implementation that demonstrates the concept.
+ */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+ OmSnapshot omSnapshot) throws IOException {
+
+ // TODO: Implement full defragmentation
+ }
+
+ /**
+ * Performs incremental defragmentation using diff from previous
defragmented snapshot.
+ */
+ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+ SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ throws IOException {
+
+ // TODO: Implement incremental defragmentation
+ }
+
+ /**
+ * Updates snapshot metadata to point to the new defragmented DB location.
+ */
+ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws
IOException {
Review Comment:
Let us remove this function; it may not be required after
[HDDS-13783](https://issues.apache.org/jira/browse/HDDS-13783).
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBSstFileWriter.java:
##########
@@ -60,6 +60,17 @@ public void put(byte[] key, byte[] value) throws
RocksDatabaseException {
}
}
+ public void delete(byte[] key) throws RocksDatabaseException {
+ try {
+ sstFileWriter.delete(key);
+ keyCounter.incrementAndGet();
+ } catch (RocksDBException e) {
Review Comment:
Let us use the same class for pruning the DAG SST files.
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
@Override
public void start() {
  // Flip the flag before the background threads begin, so the first
  // task iteration observes running == true.
  running.set(true);
  super.start();
}
+
@VisibleForTesting
public void pause() {
  // Halts the defrag loop at its next check of 'running'; the service
  // threads stay alive and can be re-enabled via resume().
  running.set(false);
}
+
@VisibleForTesting
public void resume() {
  // Re-enables the defrag loop after a pause().
  running.set(true);
}
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+ Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+
+ LOG.debug("Searching for first snapshot needing defragmentation in active
chain");
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ while (snapshotIterator.hasNext()) {
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
+
+ if (snapshotInfo == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotInfo.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (needsDefragmentation(snapshotInfo)) {
+ LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return snapshotInfo;
+ }
+
+ LOG.debug("Snapshot {} already defragmented, continuing search",
+ snapshotInfo.getName());
+ }
+
+ LOG.debug("No snapshots found needing defragmentation");
+ return null;
+ }
+
/**
 * Performs full defragmentation for the first snapshot in the chain.
 * This is a simplified implementation that demonstrates the concept.
 *
 * @param snapshotInfo metadata of the snapshot to defragment
 * @param omSnapshot open handle to the snapshot's DB (unused until implemented)
 * @throws IOException declared for the eventual implementation
 */
private void performFullDefragmentation(SnapshotInfo snapshotInfo,
    OmSnapshot omSnapshot) throws IOException {

  // TODO: Implement full defragmentation
}
+
/**
 * Performs incremental defragmentation using diff from previous defragmented snapshot.
 *
 * @param currentSnapshot snapshot to defragment in this step
 * @param previousDefraggedSnapshot already-defragmented predecessor used as
 *     the diff base
 * @param currentOmSnapshot open handle to the current snapshot's DB
 *     (unused until implemented)
 * @throws IOException declared for the eventual implementation
 */
private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
    SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
    throws IOException {

  // TODO: Implement incremental defragmentation
}
+
+ /**
+ * Updates snapshot metadata to point to the new defragmented DB location.
+ */
+ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws
IOException {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ LOG.info("Updating snapshot metadata for: {} at path: {}",
+ snapshotInfo.getName(), snapshotPath);
+
+ try {
+ // Read current YAML data using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Mark as defragmented by setting needsDefrag to false
+ yamlData.setNeedsDefrag(false);
+
+ // Write updated YAML data
+ yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile);
+
+ LOG.info("Successfully updated metadata for snapshot: {}, " +
+ "marked as defragmented (needsDefrag=false)",
+ snapshotInfo.getName());
+
+ } catch (IOException e) {
+ LOG.error("Failed to update metadata for snapshot: {}",
snapshotInfo.getName(), e);
+ throw e;
+ }
+ }
+
+ private final class SnapshotDefragTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Check OM leader and readiness
+ if (shouldRun()) {
+ final long count = runCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating Snapshot Defragmentation Task: run # {}",
count);
+ }
+ triggerSnapshotDefragOnce();
+ }
+
+ return EmptyTaskResult.newResult();
+ }
+ }
+
+ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+ // Check if rocks-tools native lib is available
+ if (!isRocksToolsNativeLibAvailable()) {
+ LOG.warn("Rocks-tools native library is not available. " +
+ "Stopping SnapshotDefragService.");
+ return false;
+ }
+
+ Optional<OmSnapshotManager> snapshotManager =
Optional.ofNullable(ozoneManager)
+ .map(OzoneManager::getOmSnapshotManager);
+ if (!snapshotManager.isPresent()) {
+ LOG.debug("OmSnapshotManager not available, skipping defragmentation
task");
+ return false;
+ }
+
+ // Get the SnapshotChainManager to iterate through the global snapshot
chain
+ // Set this each time the task runs just in case OmMetadataManager is
restarted
+ snapshotChainManager = ((OmMetadataManagerImpl)
ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+ final Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (snapshotLimit > 0 && running.get()) {
+ // Find the first snapshot needing defragmentation
+ SnapshotInfo snapshotToDefrag =
findFirstSnapshotNeedingDefrag(snapshotInfoTable);
+
+ if (snapshotToDefrag == null) {
+ LOG.info("No snapshots found needing defragmentation");
+ break;
+ }
+
+ // Acquire SNAPSHOT_GC_LOCK
+ OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock()
+ .acquireWriteLock(SNAPSHOT_GC_LOCK,
snapshotToDefrag.getSnapshotId().toString());
+ LOG.debug("Acquired SNAPSHOT_GC_LOCK for snapshot: {}, ID: {}",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ if (!gcLockDetails.isLockAcquired()) {
+ LOG.warn("Failed to acquire SNAPSHOT_GC_LOCK for snapshot: {}",
+ snapshotToDefrag.getName());
+ break;
+ }
+
+ try {
+ LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ // Get snapshot through SnapshotCache for proper locking
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
+ snapshotManager.get().getActiveSnapshot(
Review Comment:
nit: Let us use
```suggestion
snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())
```
directly
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+ Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+
+ LOG.debug("Searching for first snapshot needing defragmentation in active
chain");
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ while (snapshotIterator.hasNext()) {
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
+
+ if (snapshotInfo == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotInfo.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (needsDefragmentation(snapshotInfo)) {
+ LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return snapshotInfo;
+ }
+
+ LOG.debug("Snapshot {} already defragmented, continuing search",
+ snapshotInfo.getName());
+ }
+
+ LOG.debug("No snapshots found needing defragmentation");
+ return null;
+ }
+
+ /**
+ * Performs full defragmentation for the first snapshot in the chain.
+ * This is a simplified implementation that demonstrates the concept.
+ */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+ OmSnapshot omSnapshot) throws IOException {
+
+ // TODO: Implement full defragmentation
+ }
+
+ /**
+ * Performs incremental defragmentation using diff from previous
defragmented snapshot.
+ */
+ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+ SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ throws IOException {
+
+ // TODO: Implement incremental defragmentation
+ }
+
+ /**
+ * Updates snapshot metadata to point to the new defragmented DB location.
+ */
+ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws
IOException {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ LOG.info("Updating snapshot metadata for: {} at path: {}",
+ snapshotInfo.getName(), snapshotPath);
+
+ try {
+ // Read current YAML data using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Mark as defragmented by setting needsDefrag to false
+ yamlData.setNeedsDefrag(false);
+
+ // Write updated YAML data
+ yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile);
+
+ LOG.info("Successfully updated metadata for snapshot: {}, " +
+ "marked as defragmented (needsDefrag=false)",
+ snapshotInfo.getName());
+
+ } catch (IOException e) {
+ LOG.error("Failed to update metadata for snapshot: {}",
snapshotInfo.getName(), e);
+ throw e;
+ }
+ }
+
+ private final class SnapshotDefragTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Check OM leader and readiness
+ if (shouldRun()) {
+ final long count = runCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating Snapshot Defragmentation Task: run # {}",
count);
+ }
+ triggerSnapshotDefragOnce();
+ }
+
+ return EmptyTaskResult.newResult();
+ }
+ }
+
+ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+ // Check if rocks-tools native lib is available
+ if (!isRocksToolsNativeLibAvailable()) {
+ LOG.warn("Rocks-tools native library is not available. " +
+ "Stopping SnapshotDefragService.");
+ return false;
+ }
+
+ Optional<OmSnapshotManager> snapshotManager =
Optional.ofNullable(ozoneManager)
+ .map(OzoneManager::getOmSnapshotManager);
+ if (!snapshotManager.isPresent()) {
+ LOG.debug("OmSnapshotManager not available, skipping defragmentation
task");
+ return false;
+ }
+
+ // Get the SnapshotChainManager to iterate through the global snapshot
chain
+ // Set this each time the task runs just in case OmMetadataManager is
restarted
+ snapshotChainManager = ((OmMetadataManagerImpl)
ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+ final Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (snapshotLimit > 0 && running.get()) {
Review Comment:
This would be suboptimal. We can just iterate through the chain once and,
while iterating, figure out whether each snapshot needs to be defragmented,
instead of scanning the chain from the start on every pass.
O(n^2) vs O(n)
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
+import static
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.FlatResource.SNAPSHOT_GC_LOCK;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Background service for defragmenting snapshots in the active snapshot chain.
+ * When snapshots are taken, they capture the entire OM RocksDB state but may
contain
+ * fragmented data. This service defragments snapshots by creating new
compacted
+ * RocksDB instances with only the necessary data for tracked column families.
+ * <p>
+ * The service processes snapshots in the active chain sequentially, starting
with
+ * the first non-defragmented snapshot. For the first snapshot in the chain, it
+ * performs a full defragmentation by copying all keys. For subsequent
snapshots,
+ * it uses incremental defragmentation based on diffs from the previous
defragmented
+ * snapshot.
+ */
+public class SnapshotDefragService extends BackgroundService
+ implements BootstrapStateHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SnapshotDefragService.class);
+
+ // Use only a single thread for snapshot defragmentation to avoid conflicts
+ private static final int DEFRAG_CORE_POOL_SIZE = 1;
+
+ private final OzoneManager ozoneManager;
+ private SnapshotChainManager snapshotChainManager;
+ private final AtomicLong runCount = new AtomicLong(0);
+
+ // Number of snapshots to be processed in a single iteration
+ private final long snapshotLimitPerTask;
+
+ private final AtomicLong snapshotsDefraggedCount;
+ private final AtomicBoolean running;
+
+ private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+
+ public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
+ OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
+ serviceTimeout, ozoneManager.getThreadNamePrefix());
+ this.ozoneManager = ozoneManager;
+ this.snapshotLimitPerTask = configuration
+ .getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
+ SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
+ snapshotsDefraggedCount = new AtomicLong(0);
+ running = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void start() {
+ running.set(true);
+ super.start();
+ }
+
+ @VisibleForTesting
+ public void pause() {
+ running.set(false);
+ }
+
+ @VisibleForTesting
+ public void resume() {
+ running.set(true);
+ }
+
+ /**
+ * Checks if rocks-tools native library is available.
+ */
+ private boolean isRocksToolsNativeLibAvailable() {
+ try {
+ return ManagedRawSSTFileReader.tryLoadLibrary();
+ } catch (Exception e) {
+ LOG.warn("Failed to check native code availability", e);
+ return false;
+ }
+ }
+
+ /**
+ * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ */
+ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ try {
+ // Read YAML metadata using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Check if snapshot needs compaction (defragmentation)
+ boolean needsDefrag = yamlData.getNeedsDefrag();
+ LOG.debug("Snapshot {} needsDefragmentation field value: {}",
+ snapshotInfo.getName(), needsDefrag);
+
+ return needsDefrag;
+ } catch (IOException e) {
+ LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
+ snapshotInfo.getName(), e);
+ return true;
+ }
+ }
+
+ /**
+ * Finds the first active snapshot in the chain that needs defragmentation.
+ */
+ private SnapshotInfo findFirstSnapshotNeedingDefrag(
+ Table<String, SnapshotInfo> snapshotInfoTable) throws IOException {
+
+ LOG.debug("Searching for first snapshot needing defragmentation in active
chain");
+
+ // Use iterator(false) to iterate forward through the snapshot chain
+ Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
+
+ while (snapshotIterator.hasNext()) {
+ UUID snapshotId = snapshotIterator.next();
+ String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
+ SnapshotInfo snapshotInfo = snapshotInfoTable.get(snapshotTableKey);
+
+ if (snapshotInfo == null) {
+ LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
+ continue;
+ }
+
+ // Skip deleted snapshots
+ if (snapshotInfo.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
+ LOG.debug("Skipping deleted snapshot: {}", snapshotInfo.getName());
+ continue;
+ }
+
+ // Check if this snapshot needs defragmentation
+ if (needsDefragmentation(snapshotInfo)) {
+ LOG.info("Found snapshot needing defragmentation: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return snapshotInfo;
+ }
+
+ LOG.debug("Snapshot {} already defragmented, continuing search",
+ snapshotInfo.getName());
+ }
+
+ LOG.debug("No snapshots found needing defragmentation");
+ return null;
+ }
+
+ /**
+ * Performs full defragmentation for the first snapshot in the chain.
+ * This is a simplified implementation that demonstrates the concept.
+ */
+ private void performFullDefragmentation(SnapshotInfo snapshotInfo,
+ OmSnapshot omSnapshot) throws IOException {
+
+ // TODO: Implement full defragmentation
+ }
+
+ /**
+ * Performs incremental defragmentation using diff from previous
defragmented snapshot.
+ */
+ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
+ SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ throws IOException {
+
+ // TODO: Implement incremental defragmentation
+ }
+
+ /**
+ * Updates snapshot metadata to point to the new defragmented DB location.
+ */
+ private void updateSnapshotMetadata(SnapshotInfo snapshotInfo) throws
IOException {
+ String snapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), snapshotInfo);
+
+ LOG.info("Updating snapshot metadata for: {} at path: {}",
+ snapshotInfo.getName(), snapshotPath);
+
+ try {
+ // Read current YAML data using the correct API
+ File yamlFile = new File(snapshotPath + ".yaml");
+ OmSnapshotLocalDataYaml yamlData =
+
OmSnapshotLocalDataYaml.getFromYamlFile(ozoneManager.getOmSnapshotManager(),
yamlFile);
+
+ // Mark as defragmented by setting needsDefrag to false
+ yamlData.setNeedsDefrag(false);
+
+ // Write updated YAML data
+ yamlData.writeToYaml(ozoneManager.getOmSnapshotManager(), yamlFile);
+
+ LOG.info("Successfully updated metadata for snapshot: {}, " +
+ "marked as defragmented (needsDefrag=false)",
+ snapshotInfo.getName());
+
+ } catch (IOException e) {
+ LOG.error("Failed to update metadata for snapshot: {}",
snapshotInfo.getName(), e);
+ throw e;
+ }
+ }
+
+ private final class SnapshotDefragTask implements BackgroundTask {
+
+ @Override
+ public BackgroundTaskResult call() throws Exception {
+ // Check OM leader and readiness
+ if (shouldRun()) {
+ final long count = runCount.incrementAndGet();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating Snapshot Defragmentation Task: run # {}",
count);
+ }
+ triggerSnapshotDefragOnce();
+ }
+
+ return EmptyTaskResult.newResult();
+ }
+ }
+
+ public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
+ // Check if rocks-tools native lib is available
+ if (!isRocksToolsNativeLibAvailable()) {
+ LOG.warn("Rocks-tools native library is not available. " +
+ "Stopping SnapshotDefragService.");
+ return false;
+ }
+
+ Optional<OmSnapshotManager> snapshotManager =
Optional.ofNullable(ozoneManager)
+ .map(OzoneManager::getOmSnapshotManager);
+ if (!snapshotManager.isPresent()) {
+ LOG.debug("OmSnapshotManager not available, skipping defragmentation
task");
+ return false;
+ }
+
+ // Get the SnapshotChainManager to iterate through the global snapshot
chain
+ // Set this each time the task runs just in case OmMetadataManager is
restarted
+ snapshotChainManager = ((OmMetadataManagerImpl)
ozoneManager.getMetadataManager()).getSnapshotChainManager();
+
+ final Table<String, SnapshotInfo> snapshotInfoTable =
+ ozoneManager.getMetadataManager().getSnapshotInfoTable();
+
+ long snapshotLimit = snapshotLimitPerTask;
+
+ while (snapshotLimit > 0 && running.get()) {
+ // Find the first snapshot needing defragmentation
+ SnapshotInfo snapshotToDefrag =
findFirstSnapshotNeedingDefrag(snapshotInfoTable);
+
+ if (snapshotToDefrag == null) {
+ LOG.info("No snapshots found needing defragmentation");
+ break;
+ }
+
+ // Acquire SNAPSHOT_GC_LOCK
+ OMLockDetails gcLockDetails = ozoneManager.getMetadataManager().getLock()
+ .acquireWriteLock(SNAPSHOT_GC_LOCK,
snapshotToDefrag.getSnapshotId().toString());
+ LOG.debug("Acquired SNAPSHOT_GC_LOCK for snapshot: {}, ID: {}",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ if (!gcLockDetails.isLockAcquired()) {
+ LOG.warn("Failed to acquire SNAPSHOT_GC_LOCK for snapshot: {}",
+ snapshotToDefrag.getName());
+ break;
+ }
+
+ try {
+ LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+
+ // Get snapshot through SnapshotCache for proper locking
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
+ snapshotManager.get().getActiveSnapshot(
+ snapshotToDefrag.getVolumeName(),
+ snapshotToDefrag.getBucketName(),
+ snapshotToDefrag.getName())) {
+
+ OmSnapshot omSnapshot = snapshotSupplier.get();
+
+ UUID pathPreviousSnapshotId =
snapshotToDefrag.getPathPreviousSnapshotId();
+ boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
+ if (isFirstSnapshotInPath) {
+ LOG.info("Performing full defragmentation for first snapshot (in
path): {}",
+ snapshotToDefrag.getName());
+ performFullDefragmentation(snapshotToDefrag, omSnapshot);
+ } else {
+ final String psIdtableKey =
snapshotChainManager.getTableKey(pathPreviousSnapshotId);
+ SnapshotInfo previousDefraggedSnapshot =
snapshotInfoTable.get(psIdtableKey);
+
+ LOG.info("Performing incremental defragmentation for snapshot: {}
" +
+ "based on previous defragmented snapshot: {}",
+ snapshotToDefrag.getName(),
previousDefraggedSnapshot.getName());
+
+ // If previous path snapshot is not null, it must have been
defragmented already
+ // Sanity check to ensure previous snapshot exists and is
defragmented
+ if (needsDefragmentation(previousDefraggedSnapshot)) {
+ LOG.error("Fatal error before defragging snapshot: {}. " +
+ "Previous snapshot in path {} was not defragged while it
is expected to be.",
+ snapshotToDefrag.getName(),
previousDefraggedSnapshot.getName());
+ break;
+ }
+
+ performIncrementalDefragmentation(snapshotToDefrag,
+ previousDefraggedSnapshot, omSnapshot);
+ }
+
+ // Update snapshot metadata
+ updateSnapshotMetadata(snapshotToDefrag);
+
+ // Close and evict the original snapshot DB from SnapshotCache
+ // TODO: Implement proper eviction from SnapshotCache
+ LOG.info("Defragmentation completed for snapshot: {}",
+ snapshotToDefrag.getName());
+
+ snapshotLimit--;
+ snapshotsDefraggedCount.getAndIncrement();
+
+ } catch (OMException ome) {
+ if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
+ LOG.info("Snapshot {} was deleted during defragmentation",
+ snapshotToDefrag.getName());
+ } else {
+ LOG.error("OMException during snapshot defragmentation for: {}",
+ snapshotToDefrag.getName(), ome);
+ }
+ }
+
+ } catch (Exception e) {
+ LOG.error("Exception during snapshot defragmentation for: {}",
+ snapshotToDefrag.getName(), e);
+ return false;
+ } finally {
+ // Release SNAPSHOT_GC_LOCK
+ ozoneManager.getMetadataManager().getLock()
+ .releaseWriteLock(SNAPSHOT_GC_LOCK,
snapshotToDefrag.getSnapshotId().toString());
+ LOG.debug("Released SNAPSHOT_GC_LOCK for snapshot: {}, ID: {}",
+ snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public BackgroundTaskQueue getTasks() {
+ BackgroundTaskQueue queue = new BackgroundTaskQueue();
+ queue.add(new SnapshotDefragTask());
Review Comment:
Note: this operation could be parallelized across buckets in a follow-up change.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]