swamirishi commented on code in PR #8477: URL: https://github.com/apache/ozone/pull/8477#discussion_r2095348080
########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getCompactionLogDir()); + if (f.equals(compactionLogDir)) { + LOG.debug("Skipping compaction log dir"); + continue; + } + + // Skip the real compaction sst backup dir. + File sstBackupDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getSSTBackupDir()); + if (f.equals(sstBackupDir)) { + LOG.debug("Skipping sst backup dir"); + continue; + } + // findbugs nonsense + Path filename = file.getFileName(); + if (filename == null) { + throw new IOException("file has no filename:" + file); + } + + // Update the dest dir to point to the sub dir + Path destSubDir = null; + if (destDir != null) { + destSubDir = Paths.get(destDir.toString(), + filename.toString()); + } + if (!processDir(file, copyFiles, hardlinkFiles, sstFilesToExclude, + snapshotPaths, excluded, copySize, destSubDir)) { + return false; + } + } else { + long fileSize = processFile(file, copyFiles, hardlinkFiles, + sstFilesToExclude, excluded, destDir); + if (copySize.get() + fileSize > maxTotalSstSize) { + return false; + } else { + copySize.addAndGet(fileSize); Review Comment: ```suggestion copySize.addAndGet(-fileSize); ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getCompactionLogDir()); + if (f.equals(compactionLogDir)) { + LOG.debug("Skipping compaction log dir"); + continue; + } + + // Skip the real compaction sst backup dir. + File sstBackupDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getSSTBackupDir()); + if (f.equals(sstBackupDir)) { + LOG.debug("Skipping sst backup dir"); + continue; + } + // findbugs nonsense + Path filename = file.getFileName(); + if (filename == null) { + throw new IOException("file has no filename:" + file); + } + + // Update the dest dir to point to the sub dir + Path destSubDir = null; + if (destDir != null) { + destSubDir = Paths.get(destDir.toString(), + filename.toString()); + } + if (!processDir(file, copyFiles, hardlinkFiles, sstFilesToExclude, + snapshotPaths, excluded, copySize, destSubDir)) { + return false; + } + } else { + long fileSize = processFile(file, copyFiles, hardlinkFiles, + sstFilesToExclude, excluded, destDir); + if (copySize.get() + fileSize > maxTotalSstSize) { Review Comment: ```suggestion if (copySize.get() - fileSize >= 0) { ``` ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). Review Comment: Please remove this complex logic I never liked it being present at the first place in the original implementation. This function need not understand what directory it is copying ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getCompactionLogDir()); + if (f.equals(compactionLogDir)) { + LOG.debug("Skipping compaction log dir"); + continue; + } + + // Skip the real compaction sst backup dir. + File sstBackupDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getSSTBackupDir()); + if (f.equals(sstBackupDir)) { + LOG.debug("Skipping sst backup dir"); + continue; + } + // findbugs nonsense + Path filename = file.getFileName(); + if (filename == null) { + throw new IOException("file has no filename:" + file); + } + + // Update the dest dir to point to the sub dir + Path destSubDir = null; + if (destDir != null) { + destSubDir = Paths.get(destDir.toString(), + filename.toString()); + } + if (!processDir(file, copyFiles, hardlinkFiles, sstFilesToExclude, + snapshotPaths, excluded, copySize, destSubDir)) { + return false; + } + } else { + long fileSize = processFile(file, copyFiles, hardlinkFiles, + sstFilesToExclude, excluded, destDir); + if (copySize.get() + fileSize > maxTotalSstSize) { + return false; + } else { + copySize.addAndGet(fileSize); + } + } + } + } + return true; + } + + private long processFile(Path file, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, Review Comment: Why Map<Object, Set<Path>>? ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { Review Comment: What is the use of excludedList? ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getCompactionLogDir()); + if (f.equals(compactionLogDir)) { + LOG.debug("Skipping compaction log dir"); + continue; + } + + // Skip the real compaction sst backup dir. + File sstBackupDir = new File(getDbStore(). + getRocksDBCheckpointDiffer().getSSTBackupDir()); + if (f.equals(sstBackupDir)) { + LOG.debug("Skipping sst backup dir"); + continue; + } + // findbugs nonsense + Path filename = file.getFileName(); + if (filename == null) { + throw new IOException("file has no filename:" + file); + } + + // Update the dest dir to point to the sub dir + Path destSubDir = null; + if (destDir != null) { + destSubDir = Paths.get(destDir.toString(), + filename.toString()); + } + if (!processDir(file, copyFiles, hardlinkFiles, sstFilesToExclude, + snapshotPaths, excluded, copySize, destSubDir)) { + return false; + } + } else { + long fileSize = processFile(file, copyFiles, hardlinkFiles, + sstFilesToExclude, excluded, destDir); + if (copySize.get() + fileSize > maxTotalSstSize) { + return false; + } else { + copySize.addAndGet(fileSize); + } + } + } + } + return true; + } + + private long processFile(Path file, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, List<String> excluded, Path destDir) throws IOException{ + long fileSize = 0; + Path destFile = file; + + // findbugs nonsense + Path fileNamePath = file.getFileName(); + String fileInode = OmSnapshotUtils.getINode(file).toString(); Review Comment: > /** > * Returns an object that uniquely identifies the given file, or {@code > * null} if a file key is not available. On some platforms or file systems > * it is possible to use an identifier, or a combination of identifiers to > * uniquely identify a file. Such identifiers are important for operations > * such as file tree traversal in file systems that support <a > * href="../package-summary.html#links">symbolic links</a> or file systems > * that allow a file to be an entry in more than one directory. On UNIX file > * systems, for example, the <em>device ID</em> and <em>inode</em> are > * commonly used for such purposes. > * > * <p> The file key returned by this method can only be guaranteed to be > * unique if the file system and files remain static. Whether a file system > * re-uses identifiers after a file is deleted is implementation dependent and > * therefore unspecified. > * > * <p> File keys returned by this method can be compared for equality and are > * suitable for use in collections. If the file system and files remain static, > * and two files are the {@link java.nio.file.Files#isSameFile same} with > * non-{@code null} file keys, then their file keys are equal. > * > * @return an object that uniquely identifies the given file, or {@code null} > * > * @see java.nio.file.Files#walkFileTree > */ > Object fileKey(); the key should be a combination of fileInode + mTime of the file ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); Review Comment: We can just do with one variable copySize could be just initialized with OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT or MAX_VALUE and can be decremented ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, Review Comment: We don't need 2 stages we can just write to the tar ball as and when we come across a new file. The could have been deleted by the time ls finishes. We would need to take a hardlink of the file in iteration and then write it to the tarball. This is to ensure it doesn't get deleted. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); Review Comment: why take a list input and convert to Set? Rather just take Set as an input ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, Review Comment: We should ideally pass the OmMetadataManager class here ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java: ########## @@ -96,11 +96,11 @@ */ public class OMDBCheckpointServlet extends DBCheckpointServlet { - private static final Logger LOG = + protected static final Logger LOG = LoggerFactory.getLogger(OMDBCheckpointServlet.class); private static final long serialVersionUID = 1L; private transient BootstrapStateHandler.Lock lock; - private long maxTotalSstSize = 0; + protected long maxTotalSstSize = 0; Review Comment: Move this inside getFilesForArchive() function ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java: ########## @@ -96,11 +96,11 @@ */ public class OMDBCheckpointServlet extends DBCheckpointServlet { - private static final Logger LOG = + protected static final Logger LOG = LoggerFactory.getLogger(OMDBCheckpointServlet.class); private static final long serialVersionUID = 1L; private transient BootstrapStateHandler.Lock lock; - private long maxTotalSstSize = 0; + protected long maxTotalSstSize = 0; Review Comment: This is wrong. We can have multiple requests coming in parallel. Why do we want to make serving this request sequential? ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, Review Comment: We need not always work with a checkpoint as per the algo in the doc. Let us not extend the class since the approach diverges from the previous implementation. I just feel we are trying to force fit things here ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, Review Comment: This should be inside getFilesForArchive. Just like other directories this is one more directory. Don't have to use the same DirectoryData it is not relevant anymore ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). Review Comment: This function should just do a flat ls operation and not an ls -r operation. Please change this function ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); Review Comment: For getting the snapshot directories create a new object of SnapshotChainManager from the omMetadataManager object. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, + sstFilesToExclude, + includeSnapshotData(request), excludedList, + sstBackupDir, compactionLogDir); + writeFilesToArchive(copyFiles, hardlinkFiles, archiveOutputStream, + completed, checkpoint.getCheckpointLocation()); + + } catch (Exception e) { + LOG.error("got exception writing to archive " + e); + throw e; + } + } + + private void writeFilesToArchive(Map<String, Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, boolean completed, Path checkpointLocation) + throws IOException { + Map<String, Path> filteredCopyFiles = completed ? copyFiles : + copyFiles.entrySet().stream().filter(e -> e.getValue().getFileName().toString().toLowerCase().endsWith(".sst")) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Path metaDirPath = getMetaDirPath(checkpointLocation); + long bytesWritten = 0L; + int filesWritten = 0; + long lastLoggedTime = Time.monotonicNow(); + + // Go through each of the files to be copied and add to archive. + for (Map.Entry<String, Path> entry : filteredCopyFiles.entrySet()) { + Path path = entry.getValue(); + // Confirm the data is in the right place. + if (!path.toString().startsWith(metaDirPath.toString())) { + throw new IOException("tarball file not in metadata dir: " + + path + ": " + metaDirPath); + } + + File file = path.toFile(); + if (!file.isDirectory()) { + filesWritten++; + } + bytesWritten += includeFile(file, entry.getKey(), archiveOutputStream); + // Log progress every 30 seconds + if (Time.monotonicNow() - lastLoggedTime >= 30000) { + LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", + bytesWritten / (1024), filesWritten); + lastLoggedTime = Time.monotonicNow(); + } + } + + if (completed) { + // TODO: + // 1. take an OM db checkpoint , + // 2. hold a snapshotCache lock, + // 3. copy remaining files + // 4. create hardlink file + } + + } + + boolean getFilesForArchive(DBCheckpoint checkpoint, Map<String,Path> copyFiles, + Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, boolean includeSnapshotData, + List<String> excluded, + DirectoryData sstBackupDir, + DirectoryData compactionLogDir) throws IOException { + maxTotalSstSize = + OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; + + // Tarball limits are not implemented for processes that don't + // include snapshots. Currently, this is just for recon. + if (!includeSnapshotData) { + maxTotalSstSize = Long.MAX_VALUE; + } + + AtomicLong copySize = new AtomicLong(0L); + + // Log estimated total data transferred on first request. + if (sstFilesToExclude.isEmpty()) { + logEstimatedTarballSize(checkpoint, includeSnapshotData); + } + + if (includeSnapshotData) { + Set<Path> snapshotPaths = getSnapshotDirs(checkpoint, true); + Path snapshotDir = getSnapshotDir(); + if (!processDir(snapshotDir, copyFiles, hardlinkFiles, sstFilesToExclude, snapshotPaths, excluded, copySize, + null)) { + return false; + } + + // Process the tmp sst compaction dir. + if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, new HashSet<>(), + excluded, copySize, sstBackupDir.getOriginalDir().toPath())) { + return false; + } + + if (!processDir(compactionLogDir.getTmpDir().toPath(), copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, compactionLogDir.getOriginalDir().toPath())) { + return false; + } + } + + // Get the active fs files. + Path dir = checkpoint.getCheckpointLocation(); + if (!processDir(dir, copyFiles, hardlinkFiles, sstFilesToExclude, + new HashSet<>(), excluded, copySize, null)) { + return false; + } + + return true; + } + + private boolean processDir(Path dir, Map<String,Path> copyFiles, Map<Object, Set<Path>> hardlinkFiles, + Set<String> sstFilesToExclude, Set<Path> snapshotPaths, List<String> excluded, AtomicLong copySize, + Path destDir) throws IOException { + + try (Stream<Path> files = Files.list(dir)) { + for (Path file : files.collect(Collectors.toList())) { + File f = file.toFile(); + if (f.isDirectory()) { + // Skip any unexpected snapshot files. + String parent = f.getParent(); + if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR) + && !snapshotPaths.contains(file)) { + LOG.debug("Skipping unneeded file: " + file); + continue; + } + + // Skip the real compaction log dir. + File compactionLogDir = new File(getDbStore(). Review Comment: Let us just figure out the directories before hand. SnapshotCheckpointDirectories + Backup SST Directory + CompactionLogDB directory + AOS rocksdb directory. ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, + differ.getCompactionLogDir()); + + Set<String> sstFilesToExclude = new HashSet<>(toExcludeList); + + Map<Object, Set<Path>> hardlinkFiles = new HashMap<>(); + + boolean completed = getFilesForArchive(checkpoint, copyFiles, hardlinkFiles, Review Comment: When we have not taken snapshot cache lock. We should just copy files with extension sst. Maybe we can pass it in this function ########## hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckPointServletInodeBasedXfer.java: ########## @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om; + +import org.apache.commons.compress.archivers.ArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.hadoop.hdds.utils.db.DBCheckpoint; +import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; +import org.apache.hadoop.util.Time; +import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; +import javax.servlet.http.HttpServletRequest; +import java.io.File; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.hadoop.hdds.utils.Archiver.includeFile; +import static org.apache.hadoop.hdds.utils.Archiver.tar; +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; +import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; + +public class OMDBCheckPointServletInodeBasedXfer extends OMDBCheckpointServlet { + + @Override + public void writeDbDataToStream(DBCheckpoint checkpoint, HttpServletRequest request, OutputStream destination, + List<String> toExcludeList, List<String> excludedList, Path tmpdir) throws IOException, InterruptedException { + + // Key is the InodeID and path is the first encountered file path with this inodeID + // This will be later used to while writing to the tar. + Map<String, Path> copyFiles = new HashMap<>(); + + try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { + RocksDBCheckpointDiffer differ = + getDbStore().getRocksDBCheckpointDiffer(); + DirectoryData sstBackupDir = new DirectoryData(tmpdir, + differ.getSSTBackupDir()); + DirectoryData compactionLogDir = new DirectoryData(tmpdir, Review Comment: Why does this have to be here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@ozone.apache.org For additional commands, e-mail: issues-h...@ozone.apache.org