This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch ozone-2.1
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b2e8d0172e4d597cc1a005a864f243a67970326c
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Wed Nov 19 11:35:13 2025 -0800

    Revert "HDDS-13500. Transfer Non SST Files in the last batch of the tarball transfer. (#8857)"

    This reverts commit 2870126afbf96514a210b59204977a0e0d064e9d.

    Conflicts:
        hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java

    Change-Id: Ib9141efa7e55b0f71520da8e16d32f5cd9b63331
---
 .../TestOMDbCheckpointServletInodeBasedXfer.java   | 73 +---------------------
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      | 13 ++--
 .../om/OMDBCheckpointServletInodeBasedXfer.java    | 45 ++++---------
 3 files changed, 17 insertions(+), 114 deletions(-)
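For context on the revert: HDDS-13500 made earlier tarball batches carry only
*.sst files, deferring everything else to the final batch, via an
"onlySstFile" flag on writeDBToArchive (see the third file below). A minimal,
self-contained Java sketch of that filtering policy -- the constant mirrors
OzoneConsts.ROCKSDB_SST_SUFFIX, and the method is an illustrative stand-in,
not the actual servlet code:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class SstFirstFilterSketch {
      // Mirrors OzoneConsts.ROCKSDB_SST_SUFFIX; redefined to keep the sketch standalone.
      static final String ROCKSDB_SST_SUFFIX = ".sst";

      // onlySstFile=true for early batches (SST files only); false for the
      // final batch, which picks up the remaining non-SST files.
      static List<Path> filesForBatch(Path dbDir, boolean onlySstFile) throws IOException {
        try (Stream<Path> files = Files.list(dbDir)) {
          return files
              .filter(p -> !Files.isDirectory(p))
              .filter(p -> !onlySstFile || p.toString().endsWith(ROCKSDB_SST_SUFFIX))
              .collect(Collectors.toList());
        }
      }
    }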
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index ec2080e9cf4..099c281db97 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -34,17 +34,13 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.BufferedReader;
@@ -64,7 +60,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -73,8 +68,6 @@ import javax.servlet.WriteListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -82,7 +75,6 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.utils.Archiver;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -103,9 +95,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.MockedStatic;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.DBOptions;
@@ -181,10 +170,7 @@ public void write(int b) throws IOException {
 
     omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class);
 
-    BootstrapStateHandler.Lock lock = null;
-    if (om != null) {
-      lock = new OMDBCheckpointServlet.Lock(om);
-    }
+    BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om);
 
     doCallRealMethod().when(omDbCheckpointServletMock).init();
     assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());
@@ -210,8 +196,6 @@ public void write(int b) throws IOException {
 
     doCallRealMethod().when(omDbCheckpointServletMock)
         .writeDbDataToStream(any(), any(), any(), any(), any());
-    doCallRealMethod().when(omDbCheckpointServletMock)
-        .writeDBToArchive(any(), any(), any(), any(), any(), any(), anyBoolean());
     when(omDbCheckpointServletMock.getBootstrapStateLock())
         .thenReturn(lock);
 
@@ -383,61 +367,6 @@ public void testSnapshotDBConsistency() throws Exception {
     assertNotNull(value);
   }
 
-  @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
-    setupMocks();
-    Path dbDir = folder.resolve("db_data");
-    Files.createDirectories(dbDir);
-    // Create dummy files: one SST, one non-SST
-    Path sstFile = dbDir.resolve("test.sst");
-    Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty
-
-    Path nonSstFile = dbDir.resolve("test.log");
-    Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8));
-    Set<String> sstFilesToExclude = new HashSet<>();
-    AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size
-    Map<String, String> hardLinkFileMap = new java.util.HashMap<>();
-    Path tmpDir = folder.resolve("tmp");
-    Files.createDirectories(tmpDir);
-    TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class);
-    List<String> fileNames = new ArrayList<>();
-    try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) {
-      archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> {
-        // Get the actual mockArchiveOutputStream passed from writeDBToArchive
-        TarArchiveOutputStream aos = invocation.getArgument(2);
-        File sourceFile = invocation.getArgument(0);
-        String fileId = invocation.getArgument(1);
-        fileNames.add(sourceFile.getName());
-        aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId));
-        aos.write(new byte[100], 0, 100); // Simulate writing
-        aos.closeArchiveEntry();
-        return 100L;
-      });
-      boolean success = omDbCheckpointServletMock.writeDBToArchive(
-          sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream,
-          tmpDir, hardLinkFileMap, expectOnlySstFiles);
-      assertTrue(success);
-      verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any());
-      verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry();
-      verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(),
-          anyInt()); // verify write was called once
-
-      boolean containsNonSstFile = false;
-      for (String fileName : fileNames) {
-        if (expectOnlySstFiles) {
-          assertTrue(fileName.endsWith(".sst"), "File is not an SST File");
-        } else {
-          containsNonSstFile = true;
-        }
-      }
-
-      if (!expectOnlySstFiles) {
-        assertTrue(containsNonSstFile, "SST File is not expected");
-      }
-    }
-  }
-
   private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
     try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
       List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
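The removed test above stubbed the static Archiver.linkAndIncludeFile call with
Mockito's MockedStatic. A self-contained sketch of that stubbing pattern, with
a hypothetical FileUtilLike standing in for Archiver (requires the inline mock
maker, e.g. the mockito-inline artifact):

    import static org.mockito.Mockito.mockStatic;

    import org.mockito.MockedStatic;

    public class MockedStaticSketch {
      // Hypothetical utility with a static method, standing in for Archiver.
      static class FileUtilLike {
        static long copy(String src) {
          return 0L; // real code would copy the file and return bytes written
        }
      }

      public static void main(String[] args) {
        // The try-with-resources scope bounds the stub, as in the removed test.
        try (MockedStatic<FileUtilLike> mocked = mockStatic(FileUtilLike.class)) {
          mocked.when(() -> FileUtilLike.copy("a.sst")).thenReturn(100L);
          System.out.println(FileUtilLike.copy("a.sst")); // 100 while stubbed
        }
        System.out.println(FileUtilLike.copy("a.sst")); // 0 once the scope closes
      }
    }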
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index a1de8fc377a..a6a9e897a91 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -1165,7 +1165,7 @@ public void pause() throws IOException {
       // max size config. That way next time through, we get multiple
       // tarballs.
       if (count == 1) {
-        long sstSize = getSizeOfSstFiles(tarball);
+        long sstSize = getSizeOfFiles(tarball);
         om.getConfiguration().setLong(
             OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, sstSize / 2);
         // Now empty the tarball to restart the download
@@ -1179,16 +1179,13 @@ public void pause() throws IOException {
   }
 
   // Get Size of sstfiles in tarball.
-  private long getSizeOfSstFiles(File tarball) throws IOException {
+  private long getSizeOfFiles(File tarball) throws IOException {
     FileUtil.unTar(tarball, tempDir.toFile());
-    OmSnapshotUtils.createHardLinks(tempDir, true);
-    List<Path> sstPaths = Files.list(tempDir).collect(Collectors.toList());
+    List<Path> sstPaths = Files.walk(tempDir).
+        collect(Collectors.toList());
     long totalFileSize = 0;
     for (Path sstPath : sstPaths) {
-      File file = sstPath.toFile();
-      if (file.isFile() && file.getName().endsWith(".sst")) {
-        totalFileSize += Files.size(sstPath);
-      }
+      totalFileSize += Files.size(sstPath);
     }
     return totalFileSize;
   }
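One behavioral note on the restored helper: Files.walk(tempDir) also yields the
directory entries themselves, so getSizeOfFiles sums Files.size over
directories as well as files. If only regular files should count, a filtered
variant looks like this (illustrative sketch, not part of the commit):

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    public class TarballSizeSketch {
      // Sums sizes of regular files only; Files.walk(dir) would otherwise
      // also yield dir itself and any subdirectories.
      static long sizeOfRegularFiles(Path dir) throws IOException {
        try (Stream<Path> walk = Files.walk(dir)) {
          return walk.filter(Files::isRegularFile)
              .mapToLong(p -> {
                try {
                  return Files.size(p);
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              })
              .sum();
        }
      }
    }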
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index c7af1eb0784..4e4191aaa35 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -24,7 +24,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
-import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
 import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
@@ -33,7 +32,6 @@ import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
 import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -226,18 +224,18 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
           break;
         }
         shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath,
-            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
+            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
       }
 
       if (shouldContinue) {
         shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(),
-            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
+            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
       }
 
       if (shouldContinue) {
         shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(),
-            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, true);
+            maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
       }
     }
 
@@ -249,14 +247,14 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
       maxTotalSstSize.set(Long.MAX_VALUE);
       Path checkpointDir = checkpoint.getCheckpointLocation();
       writeDBToArchive(sstFilesToExclude, checkpointDir,
-          maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
+          maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
       if (includeSnapshotData) {
         Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName());
         Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName());
         writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-            hardLinkFileMap, getCompactionLogDir(), false);
+            hardLinkFileMap, getCompactionLogDir());
         writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-            hardLinkFileMap, getSstBackupDir(), false);
+            hardLinkFileMap, getSstBackupDir());
         // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
         transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize, archiveOutputStream,
             hardLinkFileMap);
@@ -296,27 +294,18 @@ private void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Se
       try {
         // invalidate closes the snapshot DB
         om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
-        writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir,
-            hardLinkFileMap, false);
-        Path snapshotLocalPropertyYaml = Paths.get(
-            OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
-        if (Files.exists(snapshotLocalPropertyYaml)) {
-          File yamlFile = snapshotLocalPropertyYaml.toFile();
-          hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
-          linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
-        }
+        writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap);
       } finally {
        omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
       }
     }
   }
 
-  @VisibleForTesting
-  boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
+  private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir,
       AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
-      Path tmpdir, Map<String, String> hardLinkFileMap, boolean onlySstFile) throws IOException {
+      Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException {
     return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize,
-        archiveOutputStream, tmpdir, hardLinkFileMap, null, onlySstFile);
+        archiveOutputStream, tmpdir, hardLinkFileMap, null);
   }
 
   private static void cleanupCheckpoint(DBCheckpoint checkpoint) {
@@ -406,21 +395,12 @@ Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOExceptio
    * @param maxTotalSstSize Maximum total size of SST files to include
    * @param archiveOutputStream Archive output stream
    * @param tmpDir Temporary directory for processing
-   * @param hardLinkFileMap Map of hardlink file paths to their unique identifiers for deduplication
-   * @param destDir Destination directory for the archived files. If null,
-   *                the archived files are not moved to this directory.
-   * @param onlySstFile If true, only SST files are processed. If false, all files are processed.
-   * <p>
-   * This parameter is typically set to {@code true} for initial iterations to
-   * prioritize SST file transfer, and then set to {@code false} only for the
-   * final iteration to ensure all remaining file types are transferred.
    * @return true if processing should continue, false if size limit reached
    * @throws IOException if an I/O error occurs
    */
-  @SuppressWarnings("checkstyle:ParameterNumber")
   private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize,
       ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir,
-      Map<String, String> hardLinkFileMap, Path destDir, boolean onlySstFile) throws IOException {
+      Map<String, String> hardLinkFileMap, Path destDir) throws IOException {
     if (!Files.exists(dbDir)) {
       LOG.warn("DB directory {} does not exist. Skipping.", dbDir);
       return true;
@@ -432,9 +412,6 @@ private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, Atom
     Iterable<Path> iterable = files::iterator;
     for (Path dbFile : iterable) {
       if (!Files.isDirectory(dbFile)) {
-        if (onlySstFile && !dbFile.toString().endsWith(ROCKSDB_SST_SUFFIX)) {
-          continue;
-        }
         String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile);
         String path = dbFile.toFile().getAbsolutePath();
         if (destDir != null) {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
