jojochuang commented on code in PR #9324:
URL: https://github.com/apache/ozone/pull/9324#discussion_r2557326373
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java:
##########
@@ -135,48 +196,322 @@ private boolean isRocksToolsNativeLibAvailable() {
}
/**
- * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ * Determines whether the specified snapshot requires defragmentation and returns
+ * a pair indicating the need for defragmentation and the corresponding version of the snapshot.
+ *
+ * @param snapshotInfo Information about the snapshot to be checked for defragmentation.
+ * @return A pair containing a boolean value and an integer:
+ * - The boolean value indicates whether the snapshot requires defragmentation
+ * (true if needed, false otherwise).
+ * - The integer represents the version of the snapshot being evaluated.
+ * @throws IOException If an I/O error occurs while accessing the local snapshot data or metadata.
*/
- private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
- if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) {
- return false;
- }
- try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider =
- ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo)) {
- Path snapshotPath = OmSnapshotManager.getSnapshotPath(
- ozoneManager.getMetadataManager(), snapshotInfo,
- readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion());
+ private Pair<Boolean, Integer> needsDefragmentation(SnapshotInfo snapshotInfo) throws IOException {
+ // Update the snapshot local metadata to point to the correct previous snapshotId if it differs, and check
+ // whether the snapshot needs defrag.
+ try (WritableOmSnapshotLocalDataProvider writableOmSnapshotLocalDataProvider =
+ snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) {
// Read snapshot local metadata from YAML
// Check if snapshot needs compaction (defragmentation)
- boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag();
- LOG.debug("Snapshot {} needsDefragmentation field value: {}",
snapshotInfo.getName(), needsDefrag);
+ writableOmSnapshotLocalDataProvider.commit();
+ boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag();
+ OmSnapshotLocalData localData = writableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+ if (!needsDefrag) {
+ OmSnapshotLocalData previousLocalData = writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ LOG.debug("Skipping defragmentation since snapshot has already been defragmented: id : {}(version: {}=>{}) " +
+ "previousId: {}(version: {})", snapshotInfo.getSnapshotId(), localData.getVersion(),
+ localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(),
+ snapshotInfo.getPathPreviousSnapshotId(), previousLocalData.getVersion());
+ } else {
+ LOG.debug("Snapshot {} needsDefragmentation field value: true",
snapshotInfo.getSnapshotId());
+ }
+ return Pair.of(needsDefrag, localData.getVersion());
+ }
+ }
- return needsDefrag;
- } catch (IOException e) {
- LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
- snapshotInfo.getName(), e);
- return true;
+ private Pair<String, String> getTableBounds(Table<String, ?> table) throws RocksDatabaseException, CodecException {
+ String tableLowestValue = null, tableHighestValue = null;
+ try (TableIterator<String, String> keyIterator = table.keyIterator()) {
+ if (keyIterator.hasNext()) {
+ // Setting the lowest value to the first key in the table.
+ tableLowestValue = keyIterator.next();
+ }
+ keyIterator.seekToLast();
+ if (keyIterator.hasNext()) {
+ // Setting the highest value to the last key in the table.
+ tableHighestValue = keyIterator.next();
+ }
}
+ return Pair.of(tableLowestValue, tableHighestValue);
}
/**
- * Performs full defragmentation for the first snapshot in the chain.
- * This is a simplified implementation that demonstrates the concept.
+ * Performs a full defragmentation process for specified tables in the metadata manager.
+ * This method processes all the entries in the tables for the provided prefix information,
+ * deletes specified key ranges, and compacts the tables to remove tombstones.
+ *
+ * @param checkpointDBStore the metadata manager responsible for managing tables during the checkpoint process
+ * @param prefixInfo the prefix information used to identify bucket prefix and determine key ranges in the tables
+ * @param incrementalTables the set of tables for which incremental defragmentation is performed.
+ * @throws IOException if an I/O error occurs during table operations or compaction
*/
- private void performFullDefragmentation(SnapshotInfo snapshotInfo,
- OmSnapshot omSnapshot) throws IOException {
-
- // TODO: Implement full defragmentation
+ private void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefixInfo,
+ Set<String> incrementalTables) throws IOException {
+ for (String table : incrementalTables) {
+ Table<String, byte[]> checkpointTable = checkpointDBStore.getTable(table, StringCodec.get(),
+ ByteArrayCodec.get());
+ String tableBucketPrefix = prefixInfo.getTablePrefix(table);
+ String prefixUpperBound = getLexicographicallyHigherString(tableBucketPrefix);
+
+ Pair<String, String> tableBounds = getTableBounds(checkpointTable);
+ String tableLowestValue = tableBounds.getLeft();
+ String tableHighestValue = tableBounds.getRight();
+
+ // If the lowest value is not null and the bucket prefix corresponding to the table is greater than the lowest
+ // value, then delete the range between the lowest value and the bucket prefix.
+ if (tableLowestValue != null && tableLowestValue.compareTo(tableBucketPrefix) < 0) {
+ checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix);
+ }
+ // If the highest value is not null and the next higher lexicographical string of the bucket prefix corresponding
+ // to the table is less than or equal to the highest value, then delete the range between the bucket prefix and
+ // the highest value, and also delete the highest value itself.
+ if (tableHighestValue != null && tableHighestValue.compareTo(prefixUpperBound) >= 0) {
+ checkpointTable.deleteRange(tableBucketPrefix, tableHighestValue);
+ checkpointTable.delete(tableHighestValue);
+ }
+ // Compact the table completely with kForce to get rid of tombstones.
+ try (ManagedCompactRangeOptions compactRangeOptions = new ManagedCompactRangeOptions()) {
+ compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+ compactRangeOptions.setExclusiveManualCompaction(true);
+ checkpointDBStore.compactTable(table, compactRangeOptions);
+ }
+ }
}
/**
- * Performs incremental defragmentation using diff from previous defragmented snapshot.
+ * Performs an incremental defragmentation process, which involves determining
+ * and processing delta files between snapshots for metadata updates. The method
+ * computes the changes, manages file ingestion to the checkpoint metadata manager,
+ * and ensures that all delta files are deleted after processing.
+ *
+ * @param previousSnapshotInfo information about the previous snapshot.
+ * @param snapshotInfo information about the current snapshot for which
+ * incremental defragmentation is performed.
+ * @param snapshotVersion the version of the snapshot to be processed.
+ * @param checkpointStore the dbStore instance where data
+ * updates are ingested after being processed.
+ * @param bucketPrefixInfo table prefix information associated with buckets,
+ * used to determine bounds for processing keys.
+ * @param incrementalTables the set of tables for which incremental defragmentation is performed.
+ * @throws IOException if an I/O error occurs during processing.
*/
- private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
- SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ private void performIncrementalDefragmentation(SnapshotInfo previousSnapshotInfo, SnapshotInfo snapshotInfo,
+ int snapshotVersion, DBStore checkpointStore, TablePrefixInfo bucketPrefixInfo, Set<String> incrementalTables)
throws IOException {
+ // Map of delta files grouped on the basis of the tableName.
+ Collection<Pair<Path, SstFileInfo>> allTableDeltaFiles = this.deltaDiffComputer.getDeltaFiles(
+ previousSnapshotInfo, snapshotInfo, incrementalTables);
+
+ Map<String, List<Pair<Path, SstFileInfo>>> tableGroupedDeltaFiles = allTableDeltaFiles.stream()
+ .collect(Collectors.groupingBy(pair -> pair.getValue().getColumnFamily()));
+
+ String volumeName = snapshotInfo.getVolumeName();
+ String bucketName = snapshotInfo.getBucketName();
+
+ Set<Path> filesToBeDeleted = new HashSet<>();
+ // All files computed as delta must be deleted irrespective of whether ingestion succeeded or not.
+ allTableDeltaFiles.forEach(pair -> filesToBeDeleted.add(pair.getKey()));
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = omSnapshotManager.getActiveSnapshot(volumeName,
+ bucketName, snapshotInfo.getName());
+ UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshot = omSnapshotManager.getActiveSnapshot(
+ volumeName, bucketName, previousSnapshotInfo.getName())) {
+ for (Map.Entry<String, List<Pair<Path, SstFileInfo>>> entry : tableGroupedDeltaFiles.entrySet()) {
+ String table = entry.getKey();
+ List<Pair<Path, SstFileInfo>> deltaFiles = entry.getValue();
+ Path fileToBeIngested;
+ if (deltaFiles.size() == 1 && snapshotVersion > 0) {
+ // If there is only one delta file for the table and the snapshot version is not 0, then the same delta
+ // file can be re-ingested into the checkpointStore.
+ fileToBeIngested = deltaFiles.get(0).getKey();
+ } else {
+ Table<String, byte[]> snapshotTable = snapshot.get().getMetadataManager().getStore()
+ .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+ Table<String, byte[]> previousSnapshotTable = previousSnapshot.get().getMetadataManager().getStore()
+ .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+
+ String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
+ String sstFileReaderLowerBound = bucketPrefixInfo.getTablePrefix(entry.getKey());
+ String sstFileReaderUpperBound = null;
+ if (Strings.isNotEmpty(sstFileReaderLowerBound)) {
+ sstFileReaderUpperBound = getLexicographicallyHigherString(sstFileReaderLowerBound);
+ }
+ List<Path> deltaFilePaths = deltaFiles.stream().map(Pair::getKey).collect(Collectors.toList());
+ SstFileSetReader sstFileSetReader = new SstFileSetReader(deltaFilePaths);
+ fileToBeIngested = differTmpDir.resolve(table + "-" + UUID.randomUUID() + SST_FILE_EXTENSION);
+ // Delete all delta files after reingesting into the checkpointStore.
+ filesToBeDeleted.add(fileToBeIngested);
+ int deltaEntriesCount = 0;
+ try (ClosableIterator<String> keysToCheck =
+ sstFileSetReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, sstFileReaderUpperBound);
+ TableMergeIterator<String, byte[]> tableMergeIterator = new TableMergeIterator<>(keysToCheck,
+ tableBucketPrefix, snapshotTable, previousSnapshotTable);
+ RDBSstFileWriter rdbSstFileWriter = new RDBSstFileWriter(fileToBeIngested.toFile())) {
+ while (tableMergeIterator.hasNext()) {
+ Table.KeyValue<String, List<byte[]>> kvs = tableMergeIterator.next();
+ // If the values are not equal, then the value should be written to the
+ // delta sstFile.
+ if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) {
+ try (CodecBuffer key = StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
+ byte[] keyArray = key.asReadOnlyByteBuffer().array();
+ byte[] val = kvs.getValue().get(0);
+ // If the value is null then add a tombstone to the delta sstFile.
+ if (val == null) {
+ rdbSstFileWriter.delete(keyArray);
+ } else {
+ rdbSstFileWriter.put(keyArray, val);
+ }
+ }
+ deltaEntriesCount++;
+ }
+ }
+ } catch (RocksDBException e) {
+ throw new RocksDatabaseException("Error while reading sst files.", e);
+ }
+ if (deltaEntriesCount == 0) {
+ // If there are no delta entries then delete the delta file. No need to ingest the file as a diff.
+ fileToBeIngested = null;
+ }
+ }
+ if (fileToBeIngested != null) {
+ if (!fileToBeIngested.toFile().exists()) {
+ throw new IOException("Delta file does not exist: " +
fileToBeIngested);
+ }
+ Table checkpointTable = checkpointStore.getTable(table);
+ checkpointTable.loadFromFile(fileToBeIngested.toFile());
+ }
+ }
+ } finally {
+ for (Path path : filesToBeDeleted) {
+ if (path.toFile().exists()) {
+ if (!path.toFile().delete()) {
+ LOG.warn("Failed to delete file: {}", path);
+ }
+ }
+ }
+ }
+ }
- // TODO: Implement incremental defragmentation
+ /**
+ * Ingests non-incremental tables from a snapshot into a checkpoint database store.
+ * This involves exporting table data from the snapshot to intermediate SST files
+ * and ingesting them into the corresponding tables in the checkpoint database store.
+ * Tables that are part of incremental defragmentation are excluded from this process.
+ *
+ * @param checkpointDBStore the database store where non-incremental tables are ingested.
+ * @param snapshotInfo the metadata information of the snapshot being processed.
+ * @param bucketPrefixInfo prefix information used for determining table prefixes.
+ * @param incrementalTables the set of tables identified for incremental defragmentation.
+ * @throws IOException if an I/O error occurs during table ingestion or file operations.
+ */
+ private void ingestNonIncrementalTables(DBStore checkpointDBStore,
+ SnapshotInfo snapshotInfo, TablePrefixInfo bucketPrefixInfo, Set<String> incrementalTables) throws IOException {
+ String volumeName = snapshotInfo.getVolumeName();
+ String bucketName = snapshotInfo.getBucketName();
+ String snapshotName = snapshotInfo.getName();
+ Set<Path> filesToBeDeleted = new HashSet<>();
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = omSnapshotManager.getActiveSnapshot(volumeName,
+ bucketName, snapshotName)) {
+ DBStore snapshotDBStore = snapshot.get().getMetadataManager().getStore();
+ for (Table snapshotTable : snapshotDBStore.listTables()) {
+ String snapshotTableName = snapshotTable.getName();
+ if (!incrementalTables.contains(snapshotTable.getName())) {
+ Path tmpSstFile = differTmpDir.resolve(snapshotTable.getName() + "-" + UUID.randomUUID() + SST_FILE_EXTENSION);
+ filesToBeDeleted.add(tmpSstFile);
+ String prefix = bucketPrefixInfo.getTablePrefix(snapshotTableName);
+ byte[] prefixBytes = Strings.isBlank(prefix) ? null : StringCodec.get().toPersistedFormat(prefix);
+ Table<byte[], byte[]> snapshotTableBytes = snapshotDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+ ByteArrayCodec.get());
+ snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(), prefixBytes);
+ Table<byte[], byte[]> checkpointTable = checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+ ByteArrayCodec.get());
+ checkpointTable.loadFromFile(tmpSstFile.toFile());
+ }
+ }
+ } finally {
+ for (Path path : filesToBeDeleted) {
+ if (path.toFile().exists()) {
+ if (!path.toFile().delete()) {
+ LOG.warn("Failed to delete file for ingesting non incremental
table: {}", path);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Atomically switches the snapshot database to a new version by moving the
+ * checkpoint directory to the next version directory and updating relevant
+ * metadata. Ensures data consistency and manages cleanup of older versions.
+ *
+ * @param snapshotId Unique identifier of the snapshot to switch.
+ * @param checkpointPath Path to the checkpoint directory that will be moved
+ * to the new version directory.
+ * @throws IOException If an error occurs during directory operations,
+ * metadata management, lock acquisition, or database access.
+ */
+ private void atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws IOException {
+ int previousVersion;
+ try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider =
+ snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)) {
+ OmSnapshotLocalData localData = snapshotLocalDataProvider.getSnapshotLocalData();
+ Path nextVersionPath = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId,
+ localData.getVersion() + 1);
+ // Remove the directory if it exists.
+ if (nextVersionPath.toFile().exists()) {
+ deleteDirectory(nextVersionPath);
+ }
+ // Move the checkpoint directory to the next version directory.
+ Files.move(checkpointPath, nextVersionPath);
+ RocksDBCheckpoint dbCheckpoint = new RocksDBCheckpoint(nextVersionPath);
+ // Add a new version to the local data file.
+ try (OmMetadataManagerImpl newVersionCheckpointMetadataManager =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(conf, dbCheckpoint, true)) {
+ RDBStore newVersionCheckpointStore = (RDBStore) newVersionCheckpointMetadataManager.getStore();
+ snapshotLocalDataProvider.addSnapshotVersion(newVersionCheckpointStore);
+ snapshotLocalDataProvider.commit();
+ }
+ previousVersion = localData.getVersion() - 1;
+ }
+
+ // Binary search for the smallest existing version and delete the older versions starting from the smallest
+ // version. This is to ensure efficient crash recovery.
+ int smallestExistingVersion = 0;
+ int largestExistingVersion = previousVersion;
+ while (smallestExistingVersion <= largestExistingVersion) {
+ int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2;
+ Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion);
+ if (path.toFile().exists()) {
+ largestExistingVersion = midVersion - 1;
+ } else {
+ smallestExistingVersion = midVersion + 1;
+ }
+ }
+
+ // Acquire the snapshot DBHandle lock before removing the older versions to ensure all readers are done with
+ // the snapshot db.
+ try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
+ ozoneManager.getOmSnapshotManager().getSnapshotCache().lock(snapshotId)) {
+ if (!lock.get().isLockAcquired()) {
+ throw new IOException("Failed to acquire dbHandlelock on snapshot: " +
snapshotId);
+ }
+ // Delete the older version directories. Always start deletes from the smallest version to the largest version
+ // so that the binary search above works correctly on subsequent runs.
+ for (int version = smallestExistingVersion; version <= previousVersion; version++) {
+ Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version);
+ deleteDirectory(path);
+ }
+ }
Review Comment:
I would extract these lines into a method cleanupAndReleaseOlderVersions()
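Roughly something like this; a minimal sketch that only reuses calls already present in this diff (the method name follows the suggestion above, and the previousVersion parameter is an assumption about how the caller would hand over state):
```java
private void cleanupAndReleaseOlderVersions(UUID snapshotId, int previousVersion) throws IOException {
  // Binary search for the smallest version directory that still exists on disk.
  int smallestExistingVersion = 0;
  int largestExistingVersion = previousVersion;
  while (smallestExistingVersion <= largestExistingVersion) {
    int midVersion = smallestExistingVersion + (largestExistingVersion - smallestExistingVersion) / 2;
    Path path = OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, midVersion);
    if (path.toFile().exists()) {
      largestExistingVersion = midVersion - 1;
    } else {
      smallestExistingVersion = midVersion + 1;
    }
  }
  // Hold the snapshot dbHandle lock so no reader is still using an older version directory.
  try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
      ozoneManager.getOmSnapshotManager().getSnapshotCache().lock(snapshotId)) {
    if (!lock.get().isLockAcquired()) {
      throw new IOException("Failed to acquire dbHandle lock on snapshot: " + snapshotId);
    }
    // Delete from the smallest version upwards so the binary search stays correct after a crash.
    for (int version = smallestExistingVersion; version <= previousVersion; version++) {
      deleteDirectory(OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId, version));
    }
  }
}
```
That would leave atomicSwitchSnapshotDB responsible only for moving the checkpoint and adding the new version.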
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java:
##########
@@ -192,6 +527,103 @@ public BackgroundTaskResult call() throws Exception {
}
}
+ /**
+ * Creates a new checkpoint by modifying the metadata manager from a snapshot.
+ * This involves generating a temporary checkpoint and truncating specified
+ * column families from the checkpoint before returning the updated metadata manager.
+ *
+ * @param snapshotInfo Information about the snapshot for which the checkpoint
+ * is being created.
+ * @param incrementalColumnFamilies A set of table names representing incremental
+ * column families to be retained in the checkpoint.
+ * @return A new instance of OmMetadataManagerImpl initialized with the modified
+ * checkpoint.
+ * @throws IOException If an I/O error occurs during snapshot processing,
+ * checkpoint creation, or table operations.
+ */
+ private OmMetadataManagerImpl createCheckpoint(SnapshotInfo snapshotInfo,
+ Set<String> incrementalColumnFamilies) throws IOException {
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = omSnapshotManager.getActiveSnapshot(
+ snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(), snapshotInfo.getName())) {
+ DBCheckpoint checkpoint = snapshot.get().getMetadataManager().getStore().getCheckpoint(tmpDefragDir, true);
+ try (OmMetadataManagerImpl metadataManagerBeforeTruncate =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint, false)) {
+ DBStore dbStore = metadataManagerBeforeTruncate.getStore();
+ for (String table : metadataManagerBeforeTruncate.listTableNames()) {
+ if (!incrementalColumnFamilies.contains(table)) {
+ dbStore.dropTable(table);
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to close checkpoint of snapshot: " +
snapshotInfo.getSnapshotId(), e);
+ }
+ // This will recreate the column families in the checkpoint.
+ return OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint, false);
+ }
+ }
+
+ private void acquireContentLock(UUID snapshotID) throws IOException {
+ lockIds.clear();
+ lockIds.add(snapshotID);
+ OMLockDetails lockDetails = snapshotContentLocks.acquireLock(lockIds);
+ if (!lockDetails.isLockAcquired()) {
+ throw new IOException("Failed to acquire lock on snapshot: " +
snapshotID);
+ }
+ LOG.debug("Acquired MultiSnapshotLocks on snapshot: {}", snapshotID);
+ }
+
+ private boolean checkAndDefragSnapshot(SnapshotChainManager chainManager, UUID snapshotId) throws IOException {
+ SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager, snapshotId);
+
+ if (snapshotInfo.getSnapshotStatus() != SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+ LOG.debug("Skipping defragmentation for non-active snapshot: {} (ID: {})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return false;
+ }
+ Pair<Boolean, Integer> needsDefragVersionPair = needsDefragmentation(snapshotInfo);
+ if (!needsDefragVersionPair.getLeft()) {
+ return false;
+ }
+ // Create a checkpoint of the previous snapshot or the current snapshot if it is the first snapshot in the chain.
+ SnapshotInfo checkpointSnapshotInfo = snapshotInfo.getPathPreviousSnapshotId() == null ? snapshotInfo :
+ SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager, snapshotInfo.getPathPreviousSnapshotId());
+
+ OmMetadataManagerImpl checkpointMetadataManager = createCheckpoint(checkpointSnapshotInfo,
+ COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ Path checkpointLocation = checkpointMetadataManager.getStore().getDbLocation().toPath();
+ try {
+ DBStore checkpointDBStore = checkpointMetadataManager.getStore();
+ TablePrefixInfo prefixInfo = ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(),
+ snapshotInfo.getBucketName());
+ // If this is the first snapshot in the chain, perform full defragmentation.
+ if (snapshotInfo.getPathPreviousSnapshotId() == null) {
+ performFullDefragmentation(checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ } else {
+ performIncrementalDefragmentation(checkpointSnapshotInfo, snapshotInfo, needsDefragVersionPair.getValue(),
+ checkpointDBStore, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ }
+
+ // Acquire the content lock on the snapshot to ensure the contents of the tables don't get changed.
+ acquireContentLock(snapshotId);
+ try {
+ // Ingestion of the incremental tables (KeyTable/FileTable/DirectoryTable) is done; now we just need to re-ingest
+ // the remaining tables from the original snapshot.
+ ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ checkpointMetadataManager.close();
+ // Switch the snapshot DB location to the new version.
+ atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
+ } finally {
+ snapshotContentLocks.releaseLock();
Review Comment:
It may hold the snapshot content lock for a long time if the defrag service waits for the previous snapshot cache lock. What if we delay the clean-up of older versions until after the release of the snapshot content lock?
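For example, just a sketch of the tail of checkAndDefragSnapshot (it reuses the locals from the method above and assumes atomicSwitchSnapshotDB is changed to return the previous version, with the cleanup extracted into the helper suggested in the other comment):
```java
acquireContentLock(snapshotId);
int previousVersion;
try {
  // Only the ingestion and the version switch need the content lock.
  ingestNonIncrementalTables(checkpointDBStore, snapshotInfo, prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
  checkpointMetadataManager.close();
  previousVersion = atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
} finally {
  snapshotContentLocks.releaseLock();
}
// Old-version cleanup (which waits on the snapshot cache lock) now happens
// after the content lock has been released.
cleanupAndReleaseOlderVersions(snapshotId, previousVersion);
```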
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]