This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch ozone-2.1 in repository https://gitbox.apache.org/repos/asf/ozone.git
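For orientation before the diff: the reverted change (HDDS-12984) identified each file in the checkpoint tarball by an inode-based id rather than by path, using the helper OmSnapshotUtils.getFileInodeAndLastModifiedTimeString that is removed further down in this diff. A minimal standalone sketch of that identifier, assuming a POSIX file system where fileKey() exposes the inode and using a hypothetical path purely for illustration:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileTime;

public class InodeIdSketch {
  // Mirrors the removed helper: identify a file by its inode (fileKey) plus
  // last-modified time, so hardlinked copies of the same SST file resolve to
  // one id and are transferred only once.
  static String fileId(Path file) throws IOException {
    Object inode = Files.readAttributes(file, BasicFileAttributes.class).fileKey();
    FileTime mtime = Files.getLastModifiedTime(file);
    return String.format("%s-%s", inode, mtime.toMillis());
  }

  public static void main(String[] args) throws IOException {
    // Hypothetical path; any readable file works.
    System.out.println(fileId(Paths.get("/tmp/example.sst")));
  }
}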
commit 3ca52fdfe6c742e4eb98b3a96fd2f39dc6fc8e0f Author: Wei-Chiu Chuang <[email protected]> AuthorDate: Wed Nov 19 13:01:13 2025 -0800 Revert "HDDS-12984. Use InodeID to identify the SST files inside the tarball. (#8477)" This reverts commit 96390ac142725195315906400a633f316c899702. Conflicts: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java Change-Id: I777c0c8145830df85098afde7b89ff7864472534 --- .../org/apache/hadoop/hdds/utils/Archiver.java | 43 -- .../hadoop/hdds/utils/DBCheckpointServlet.java | 12 +- .../org/apache/hadoop/hdds/utils/TestArchiver.java | 89 ---- .../hdds/scm/TestSCMDbCheckpointServlet.java | 2 - .../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 2 - .../TestOMDbCheckpointServletInodeBasedXfer.java | 563 --------------------- .../om/OMDBCheckpointServletInodeBasedXfer.java | 437 ---------------- .../apache/hadoop/ozone/om/OmSnapshotManager.java | 9 +- .../hadoop/ozone/om/snapshot/OmSnapshotUtils.java | 21 - 9 files changed, 5 insertions(+), 1173 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java index eafe85853db..20bdc5d7629 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/Archiver.java @@ -38,15 +38,12 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.ozone.OzoneConsts; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Create and extract archives. */ public final class Archiver { static final int MIN_BUFFER_SIZE = 8 * (int) OzoneConsts.KB; // same as IOUtils.DEFAULT_BUFFER_SIZE static final int MAX_BUFFER_SIZE = (int) OzoneConsts.MB; - private static final Logger LOG = LoggerFactory.getLogger(Archiver.class); private Archiver() { // no instances (for now) @@ -114,46 +111,6 @@ public static long includeFile(File file, String entryName, return bytes; } - /** - * Creates a hard link to the specified file in the provided temporary directory, - * adds the linked file as an entry to the archive with the given entry name, writes - * its contents to the archive output, and then deletes the temporary hard link. - * <p> - * This approach avoids altering the original file and works around limitations - * of certain archiving libraries that may require the source file to be present - * in a specific location or have a specific name. Any errors during the hardlink - * creation or archiving process are logged. 
- * </p> - * - * @param file the file to be included in the archive - * @param entryName the name/path under which the file should appear in the archive - * @param archiveOutput the output stream for the archive (e.g., tar) - * @param tmpDir the temporary directory in which to create the hard link - * @return number of bytes copied to the archive for this file - * @throws IOException if an I/O error occurs other than hardlink creation failure - */ - public static long linkAndIncludeFile(File file, String entryName, - ArchiveOutputStream<TarArchiveEntry> archiveOutput, Path tmpDir) throws IOException { - File link = tmpDir.resolve(entryName).toFile(); - long bytes = 0; - try { - Files.createLink(link.toPath(), file.toPath()); - TarArchiveEntry entry = archiveOutput.createArchiveEntry(link, entryName); - archiveOutput.putArchiveEntry(entry); - try (InputStream input = Files.newInputStream(link.toPath())) { - bytes = IOUtils.copyLarge(input, archiveOutput); - } - archiveOutput.closeArchiveEntry(); - } catch (IOException ioe) { - LOG.error("Couldn't create hardlink for file {} while including it in tarball.", - file.getAbsolutePath(), ioe); - throw ioe; - } finally { - Files.deleteIfExists(link.toPath()); - } - return bytes; - } - public static void extractEntry(ArchiveEntry entry, InputStream input, long size, Path ancestor, Path path) throws IOException { HddsUtils.validatePath(path, ancestor); diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java index 118a17fbb5d..629fba1772a 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java @@ -22,7 +22,6 @@ import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST; import static org.apache.hadoop.ozone.OzoneConsts.ROCKSDB_SST_SUFFIX; -import com.google.common.annotations.VisibleForTesting; import java.io.File; import java.io.IOException; import java.io.OutputStream; @@ -123,10 +122,6 @@ public void initialize(DBStore store, DBCheckpointMetrics metrics, } } - public File getBootstrapTempData() { - return bootstrapTempData; - } - private boolean hasPermission(UserGroupInformation user) { // Check ACL for dbCheckpoint only when global Ozone ACL and SPNEGO is // enabled @@ -137,7 +132,7 @@ private boolean hasPermission(UserGroupInformation user) { } } - protected static void logSstFileList(Collection<String> sstList, String msg, int sampleSize) { + private static void logSstFileList(Collection<String> sstList, String msg, int sampleSize) { int count = sstList.size(); if (LOG.isDebugEnabled()) { LOG.debug(msg, count, "", sstList); @@ -204,8 +199,7 @@ private void generateSnapshotCheckpoint(HttpServletRequest request, processMetadataSnapshotRequest(request, response, isFormData, flush); } - @VisibleForTesting - public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, + private void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, boolean isFormData, boolean flush) { List<String> excludedSstList = new ArrayList<>(); String[] sstParam = isFormData ? 
@@ -298,7 +292,7 @@ public DBCheckpoint getCheckpoint(Path ignoredTmpdir, boolean flush) * @param request the HTTP servlet request * @return array of parsed sst form data parameters for exclusion */ - protected static String[] parseFormDataParameters(HttpServletRequest request) { + private static String[] parseFormDataParameters(HttpServletRequest request) { ServletFileUpload upload = new ServletFileUpload(); List<String> sstParam = new ArrayList<>(); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java index 6c69f6fbaf5..e175f957355 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/TestArchiver.java @@ -18,31 +18,9 @@ package org.apache.hadoop.hdds.utils; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.CALLS_REAL_METHODS; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.mockStatic; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import java.io.File; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; -import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.mockito.MockedStatic; /** Test {@link Archiver}. 
*/ class TestArchiver { @@ -68,71 +46,4 @@ void bufferSizeAboveMaximum(long fileSize) { .isEqualTo(Archiver.MAX_BUFFER_SIZE); } - @Test - void testLinkAndIncludeFileSuccessfulHardLink() throws IOException { - Path tmpDir = Files.createTempDirectory("archiver-test"); - File tempFile = File.createTempFile("test-file", ".txt"); - String entryName = "test-entry"; - Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8)); - - TarArchiveOutputStream mockArchiveOutput = mock(TarArchiveOutputStream.class); - TarArchiveEntry mockEntry = new TarArchiveEntry(entryName); - AtomicBoolean isHardLinkCreated = new AtomicBoolean(false); - when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName))) - .thenAnswer(invocation -> { - File linkFile = invocation.getArgument(0); - isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath())); - return mockEntry; - }); - - // Call method under test - long bytesCopied = Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir); - assertEquals(Files.size(tempFile.toPath()), bytesCopied); - // Verify archive interactions - verify(mockArchiveOutput, times(1)).putArchiveEntry(mockEntry); - verify(mockArchiveOutput, times(1)).closeArchiveEntry(); - assertTrue(isHardLinkCreated.get()); - assertFalse(Files.exists(tmpDir.resolve(entryName))); - // Cleanup - assertTrue(tempFile.delete()); - Files.deleteIfExists(tmpDir); - } - - @Test - void testLinkAndIncludeFileFailedHardLink() throws IOException { - Path tmpDir = Files.createTempDirectory("archiver-test"); - File tempFile = File.createTempFile("test-file", ".txt"); - String entryName = "test-entry"; - Files.write(tempFile.toPath(), "Test Content".getBytes(StandardCharsets.UTF_8)); - - TarArchiveOutputStream mockArchiveOutput = - mock(TarArchiveOutputStream.class); - TarArchiveEntry mockEntry = new TarArchiveEntry("test-entry"); - AtomicBoolean isHardLinkCreated = new AtomicBoolean(false); - when(mockArchiveOutput.createArchiveEntry(any(File.class), eq(entryName))) - .thenAnswer(invocation -> { - File linkFile = invocation.getArgument(0); - isHardLinkCreated.set(Files.isSameFile(tempFile.toPath(), linkFile.toPath())); - return mockEntry; - }); - - // Mock static Files.createLink to throw IOException - try (MockedStatic<Files> mockedFiles = mockStatic(Files.class, CALLS_REAL_METHODS)) { - Path linkPath = tmpDir.resolve(entryName); - String errorMessage = "Failed to create hardlink"; - mockedFiles.when(() -> Files.createLink(linkPath, tempFile.toPath())) - .thenThrow(new IOException(errorMessage)); - - IOException thrown = assertThrows(IOException.class, () -> - Archiver.linkAndIncludeFile(tempFile, entryName, mockArchiveOutput, tmpDir) - ); - - assertTrue(thrown.getMessage().contains(errorMessage)); - } - assertFalse(isHardLinkCreated.get()); - - assertTrue(tempFile.delete()); - Files.deleteIfExists(tmpDir); - } - } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java index 4639b3cd697..3502c624c1f 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java @@ -119,8 +119,6 @@ public void init() throws Exception { responseMock); doCallRealMethod().when(scmDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - 
doCallRealMethod().when(scmDbCheckpointServletMock) - .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); ServletContext servletContextMock = mock(ServletContext.class); when(scmDbCheckpointServletMock.getServletContext()) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java index 3d542785e11..e470b4b47c5 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java @@ -229,8 +229,6 @@ public void write(int b) throws IOException { doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - doCallRealMethod().when(omDbCheckpointServletMock) - .processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); } @Test diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java deleted file mode 100644 index c3dd209acba..00000000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java +++ /dev/null @@ -1,563 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.om; - -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD; -import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX; -import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_FLUSH; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyBoolean; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.servlet.ServletContext; -import javax.servlet.ServletOutputStream; -import javax.servlet.WriteListener; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationFactor; -import org.apache.hadoop.hdds.client.ReplicationType; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.utils.IOUtils; -import org.apache.hadoop.hdds.utils.db.DBCheckpoint; -import org.apache.hadoop.hdds.utils.db.DBStore; -import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.TestDataUtil; -import org.apache.hadoop.ozone.client.OzoneBucket; -import org.apache.hadoop.ozone.client.OzoneClient; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -/** - * Class used for testing the OM DB Checkpoint provider servlet using inode based transfer logic. 
- */ -public class TestOMDbCheckpointServletInodeBasedXfer { - - private MiniOzoneCluster cluster; - private OzoneClient client; - private OzoneManager om; - private OzoneConfiguration conf; - @TempDir - private Path folder; - private HttpServletRequest requestMock = null; - private HttpServletResponse responseMock = null; - private OMDBCheckpointServletInodeBasedXfer omDbCheckpointServletMock = null; - private ServletOutputStream servletOutputStream; - private File tempFile; - private static final AtomicInteger COUNTER = new AtomicInteger(); - - @BeforeEach - void init() throws Exception { - conf = new OzoneConfiguration(); - } - - @AfterEach - void shutdown() { - IOUtils.closeQuietly(client, cluster); - } - - private void setupCluster() throws Exception { - cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build(); - cluster.waitForClusterToBeReady(); - client = cluster.newClient(); - om = cluster.getOzoneManager(); - conf.setBoolean(OZONE_ACL_ENABLED, false); - conf.set(OZONE_ADMINISTRATORS, OZONE_ADMINISTRATORS_WILDCARD); - } - - private void setupMocks() throws Exception { - final Path tempPath = folder.resolve("temp" + COUNTER.incrementAndGet() + ".tar"); - tempFile = tempPath.toFile(); - - servletOutputStream = new ServletOutputStream() { - private final OutputStream fileOutputStream = Files.newOutputStream(tempPath); - - @Override - public boolean isReady() { - return true; - } - - @Override - public void setWriteListener(WriteListener writeListener) { - } - - @Override - public void close() throws IOException { - fileOutputStream.close(); - super.close(); - } - - @Override - public void write(int b) throws IOException { - fileOutputStream.write(b); - } - }; - - omDbCheckpointServletMock = mock(OMDBCheckpointServletInodeBasedXfer.class); - - BootstrapStateHandler.Lock lock = new OMDBCheckpointServlet.Lock(om); - doCallRealMethod().when(omDbCheckpointServletMock).init(); - assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getDbStore()); - - requestMock = mock(HttpServletRequest.class); - // Return current user short name when asked - when(requestMock.getRemoteUser()) - .thenReturn(UserGroupInformation.getCurrentUser().getShortUserName()); - responseMock = mock(HttpServletResponse.class); - - ServletContext servletContextMock = mock(ServletContext.class); - when(omDbCheckpointServletMock.getServletContext()) - .thenReturn(servletContextMock); - - when(servletContextMock.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) - .thenReturn(om); - when(requestMock.getParameter(OZONE_DB_CHECKPOINT_REQUEST_FLUSH)) - .thenReturn("true"); - - doCallRealMethod().when(omDbCheckpointServletMock).doGet(requestMock, - responseMock); - doCallRealMethod().when(omDbCheckpointServletMock).doPost(requestMock, - responseMock); - - doCallRealMethod().when(omDbCheckpointServletMock) - .writeDbDataToStream(any(), any(), any(), any(), any()); - - when(omDbCheckpointServletMock.getBootstrapStateLock()) - .thenReturn(lock); - doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(), anyBoolean()); - assertNull(doCallRealMethod().when(omDbCheckpointServletMock).getBootstrapTempData()); - doCallRealMethod().when(omDbCheckpointServletMock).getSnapshotDirs(any()); - doCallRealMethod().when(omDbCheckpointServletMock). 
- processMetadataSnapshotRequest(any(), any(), anyBoolean(), anyBoolean()); - doCallRealMethod().when(omDbCheckpointServletMock).writeDbDataToStream(any(), any(), any(), any()); - doCallRealMethod().when(omDbCheckpointServletMock).getCompactionLogDir(); - doCallRealMethod().when(omDbCheckpointServletMock).getSstBackupDir(); - } - - @Test - void testContentsOfTarballWithSnapshot() throws Exception { - String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); - String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); - AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>(); - setupClusterAndMocks(volumeName, bucketName, realCheckpoint); - DBStore dbStore = om.getMetadataManager().getStore(); - DBStore spyDbStore = spy(dbStore); - AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>(); - when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> { - DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); - // Don't delete the checkpoint, because we need to compare it - // with the snapshot data. - doNothing().when(checkpoint).cleanupCheckpoint(); - realCheckpoint.set(checkpoint); - return checkpoint; - }); - // Init the mock with the spyDbstore - doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), - eq(false), any(), any(), eq(false)); - omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), - false, - om.getOmAdminUsernames(), om.getOmAdminGroups(), false); - - // Get the tarball. - when(responseMock.getOutputStream()).thenReturn(servletOutputStream); - omDbCheckpointServletMock.doGet(requestMock, responseMock); - String testDirName = folder.resolve("testDir").toString(); - String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; - File newDbDir = new File(newDbDirName); - assertTrue(newDbDir.mkdirs()); - FileUtil.unTar(tempFile, newDbDir); - List<String> snapshotPaths = new ArrayList<>(); - client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapInfo -> snapshotPaths.add(getSnapshotDBPath(snapInfo.getCheckpointDir()))); - Set<String> inodesFromOmDataDir = new HashSet<>(); - Set<String> inodesFromTarball = new HashSet<>(); - Set<Path> allPathsInTarball = new HashSet<>(); - try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) { - List<Path> files = filesInTarball.collect(Collectors.toList()); - for (Path p : files) { - File file = p.toFile(); - if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - String inode = getInode(file.getName()); - inodesFromTarball.add(inode); - allPathsInTarball.add(p); - } - } - Map<String, List<String>> hardLinkMapFromOmData = new HashMap<>(); - Path checkpointLocation = realCheckpoint.get().getCheckpointLocation(); - populateInodesOfFilesInDirectory(dbStore, checkpointLocation, - inodesFromOmDataDir, hardLinkMapFromOmData); - for (String snapshotPath : snapshotPaths) { - populateInodesOfFilesInDirectory(dbStore, Paths.get(snapshotPath), - inodesFromOmDataDir, hardLinkMapFromOmData); - } - Path hardlinkFilePath = - newDbDir.toPath().resolve(OmSnapshotManager.OM_HARDLINK_FILE); - Map<String, List<String>> hardlinkMapFromTarball = readFileToMap(hardlinkFilePath.toString()); - - // verify that all entries in hardLinkMapFromOmData are present in hardlinkMapFromTarball. - // entries in hardLinkMapFromOmData are from the snapshots + OM db checkpoint so they - // should be present in the tarball. 
- - for (Map.Entry<String, List<String>> entry : hardLinkMapFromOmData.entrySet()) { - String key = entry.getKey(); - List<String> value = entry.getValue(); - assertTrue(hardlinkMapFromTarball.containsKey(key)); - assertEquals(value, hardlinkMapFromTarball.get(key)); - } - // all files from the checkpoint should be in the tarball - assertFalse(inodesFromTarball.isEmpty()); - assertTrue(inodesFromTarball.containsAll(inodesFromOmDataDir)); - - long actualYamlFiles = Files.list(newDbDir.toPath()) - .filter(f -> f.getFileName().toString() - .endsWith(".yaml")).count(); - assertEquals(numSnapshots, actualYamlFiles, - "Number of generated YAML files should match the number of snapshots."); - - // create hardlinks now - OmSnapshotUtils.createHardLinks(newDbDir.toPath()); - for (Path old : allPathsInTarball) { - assertTrue(old.toFile().delete()); - } - assertFalse(hardlinkFilePath.toFile().exists()); - } - - /** - * Verifies that a manually added entry to the snapshot's delete table - * is persisted and can be retrieved from snapshot db loaded from OM DB checkpoint. - */ - @Test - public void testSnapshotDBConsistency() throws Exception { - String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5); - String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5); - AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>(); - setupClusterAndMocks(volumeName, bucketName, realCheckpoint); - List<OzoneSnapshot> snapshots = new ArrayList<>(); - client.getObjectStore().listSnapshot(volumeName, bucketName, "", null) - .forEachRemaining(snapshots::add); - OzoneSnapshot snapshotToModify = snapshots.get(0); - String dummyKey = "dummyKey"; - writeDummyKeyToDeleteTableOfSnapshotDB(snapshotToModify, bucketName, volumeName, dummyKey); - // Get the tarball. 
- omDbCheckpointServletMock.doGet(requestMock, responseMock); - String testDirName = folder.resolve("testDir").toString(); - String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME; - File newDbDir = new File(newDbDirName); - assertTrue(newDbDir.mkdirs()); - FileUtil.unTar(tempFile, newDbDir); - Set<Path> allPathsInTarball = getAllPathsInTarball(newDbDir); - // create hardlinks now - OmSnapshotUtils.createHardLinks(newDbDir.toPath()); - for (Path old : allPathsInTarball) { - assertTrue(old.toFile().delete()); - } - Path snapshotDbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR, - OM_DB_NAME + "-" + snapshotToModify.getSnapshotId()); - deleteWalFiles(snapshotDbDir); - assertTrue(Files.exists(snapshotDbDir)); - String value = getValueFromSnapshotDeleteTable(dummyKey, snapshotDbDir.toString()); - assertNotNull(value); - } - - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception { - setupMocks(); - Path dbDir = folder.resolve("db_data"); - Files.createDirectories(dbDir); - // Create dummy files: one SST, one non-SST - Path sstFile = dbDir.resolve("test.sst"); - Files.write(sstFile, "sst content".getBytes(StandardCharsets.UTF_8)); // Write some content to make it non-empty - - Path nonSstFile = dbDir.resolve("test.log"); - Files.write(nonSstFile, "log content".getBytes(StandardCharsets.UTF_8)); - Set<String> sstFilesToExclude = new HashSet<>(); - AtomicLong maxTotalSstSize = new AtomicLong(1000000); // Sufficient size - Map<String, String> hardLinkFileMap = new java.util.HashMap<>(); - Path tmpDir = folder.resolve("tmp"); - Files.createDirectories(tmpDir); - TarArchiveOutputStream mockArchiveOutputStream = mock(TarArchiveOutputStream.class); - List<String> fileNames = new ArrayList<>(); - try (MockedStatic<Archiver> archiverMock = mockStatic(Archiver.class)) { - archiverMock.when(() -> Archiver.linkAndIncludeFile(any(), any(), any(), any())).thenAnswer(invocation -> { - // Get the actual mockArchiveOutputStream passed from writeDBToArchive - TarArchiveOutputStream aos = invocation.getArgument(2); - File sourceFile = invocation.getArgument(0); - String fileId = invocation.getArgument(1); - fileNames.add(sourceFile.getName()); - aos.putArchiveEntry(new TarArchiveEntry(sourceFile, fileId)); - aos.write(new byte[100], 0, 100); // Simulate writing - aos.closeArchiveEntry(); - return 100L; - }); - boolean success = omDbCheckpointServletMock.writeDBToArchive( - sstFilesToExclude, dbDir, maxTotalSstSize, mockArchiveOutputStream, - tmpDir, hardLinkFileMap, expectOnlySstFiles); - assertTrue(success); - verify(mockArchiveOutputStream, times(fileNames.size())).putArchiveEntry(any()); - verify(mockArchiveOutputStream, times(fileNames.size())).closeArchiveEntry(); - verify(mockArchiveOutputStream, times(fileNames.size())).write(any(byte[].class), anyInt(), - anyInt()); // verify write was called once - - boolean containsNonSstFile = false; - for (String fileName : fileNames) { - if (expectOnlySstFiles) { - assertTrue(fileName.endsWith(".sst"), "File is not an SST File"); - } else { - containsNonSstFile = true; - } - } - - if (!expectOnlySstFiles) { - assertTrue(containsNonSstFile, "SST File is not expected"); - } - } - } - - private static void deleteWalFiles(Path snapshotDbDir) throws IOException { - try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) { - List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log")) - .collect(Collectors.toList()); - for (Path p : 
files) { - Files.delete(p); - } - } - } - - private static Set<Path> getAllPathsInTarball(File newDbDir) throws IOException { - Set<Path> allPathsInTarball = new HashSet<>(); - try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) { - List<Path> files = filesInTarball.collect(Collectors.toList()); - for (Path p : files) { - File file = p.toFile(); - if (file.getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - allPathsInTarball.add(p); - } - } - return allPathsInTarball; - } - - private void writeDummyKeyToDeleteTableOfSnapshotDB(OzoneSnapshot snapshotToModify, String bucketName, - String volumeName, String keyName) - throws IOException { - try (UncheckedAutoCloseableSupplier<OmSnapshot> supplier = om.getOmSnapshotManager() - .getSnapshot(snapshotToModify.getSnapshotId())) { - OmSnapshot omSnapshot = supplier.get(); - OmKeyInfo dummyOmKeyInfo = - new OmKeyInfo.Builder().setBucketName(bucketName).setVolumeName(volumeName).setKeyName(keyName) - .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE)).build(); - RepeatedOmKeyInfo dummyRepeatedKeyInfo = - new RepeatedOmKeyInfo.Builder().setOmKeyInfos(Collections.singletonList(dummyOmKeyInfo)).build(); - omSnapshot.getMetadataManager().getDeletedTable().put(dummyOmKeyInfo.getKeyName(), dummyRepeatedKeyInfo); - } - } - - private void setupClusterAndMocks(String volumeName, String bucketName, - AtomicReference<DBCheckpoint> realCheckpoint) throws Exception { - setupCluster(); - setupMocks(); - om.getKeyManager().getSnapshotSstFilteringService().pause(); - when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true"); - // Create a "spy" dbstore keep track of the checkpoint. - writeData(volumeName, bucketName, true); - DBStore dbStore = om.getMetadataManager().getStore(); - DBStore spyDbStore = spy(dbStore); - when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> { - DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true)); - // Don't delete the checkpoint, because we need to compare it - // with the snapshot data. 
- doNothing().when(checkpoint).cleanupCheckpoint(); - realCheckpoint.set(checkpoint); - return checkpoint; - }); - // Init the mock with the spyDbstore - doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(), - eq(false), any(), any(), eq(false)); - omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(), - false, - om.getOmAdminUsernames(), om.getOmAdminGroups(), false); - when(responseMock.getOutputStream()).thenReturn(servletOutputStream); - } - - String getValueFromSnapshotDeleteTable(String key, String snapshotDB) { - String result = null; - List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>(); - int count = 1; - int deletedTableCFIndex = 0; - cfDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(StandardCharsets.UTF_8))); - for (String cfName : OMDBDefinition.getAllColumnFamilies()) { - if (cfName.equals(OMDBDefinition.DELETED_TABLE)) { - deletedTableCFIndex = count; - } - cfDescriptors.add(new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8))); - count++; - } - // For holding handles - List<ColumnFamilyHandle> cfHandles = new ArrayList<>(); - try (DBOptions options = new DBOptions().setCreateIfMissing(false).setCreateMissingColumnFamilies(true); - RocksDB db = RocksDB.openReadOnly(options, snapshotDB, cfDescriptors, cfHandles)) { - - ColumnFamilyHandle deletedTableCF = cfHandles.get(deletedTableCFIndex); // 0 is default - byte[] value = db.get(deletedTableCF, key.getBytes(StandardCharsets.UTF_8)); - if (value != null) { - result = new String(value, StandardCharsets.UTF_8); - } - } catch (Exception e) { - fail("Exception while reading from snapshot DB " + e.getMessage()); - } finally { - for (ColumnFamilyHandle handle : cfHandles) { - handle.close(); - } - } - return result; - } - - public static Map<String, List<String>> readFileToMap(String filePath) throws IOException { - Map<String, List<String>> dataMap = new HashMap<>(); - try (BufferedReader reader = Files.newBufferedReader(Paths.get(filePath), StandardCharsets.UTF_8)) { - String line; - while ((line = reader.readLine()) != null) { - String trimmedLine = line.trim(); - if (trimmedLine.isEmpty() || !trimmedLine.contains("\t")) { - continue; - } - int tabIndex = trimmedLine.indexOf("\t"); - if (tabIndex > 0) { - // value is the full path that needs to be constructed - String value = trimmedLine.substring(0, tabIndex).trim(); - // key is the inodeID - String key = getInode(trimmedLine.substring(tabIndex + 1).trim()); - if (!key.isEmpty() && !value.isEmpty()) { - dataMap.computeIfAbsent(key, k -> new ArrayList<>()).add(value); - } - } - } - } - for (Map.Entry<String, List<String>> entry : dataMap.entrySet()) { - Collections.sort(entry.getValue()); - } - return dataMap; - } - - private void populateInodesOfFilesInDirectory(DBStore dbStore, Path dbLocation, - Set<String> inodesFromOmDbCheckpoint, Map<String, List<String>> hardlinkMap) throws IOException { - try (Stream<Path> filesInOmDb = Files.list(dbLocation)) { - List<Path> files = filesInOmDb.collect(Collectors.toList()); - for (Path p : files) { - if (Files.isDirectory(p) || p.toFile().getName().equals(OmSnapshotManager.OM_HARDLINK_FILE)) { - continue; - } - String inode = getInode(OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(p)); - Path metadataDir = OMStorage.getOmDbDir(conf).toPath(); - String path = metadataDir.relativize(p).toString(); - if (path.contains(OM_CHECKPOINT_DIR)) { - path = 
metadataDir.relativize(dbStore.getDbLocation().toPath().resolve(p.getFileName())).toString(); - } - if (path.startsWith(OM_DB_NAME)) { - Path fileName = Paths.get(path).getFileName(); - // fileName will not be null, added null check for findbugs - if (fileName != null) { - path = fileName.toString(); - } - } - hardlinkMap.computeIfAbsent(inode, k -> new ArrayList<>()).add(path); - inodesFromOmDbCheckpoint.add(inode); - } - } - for (Map.Entry<String, List<String>> entry : hardlinkMap.entrySet()) { - Collections.sort(entry.getValue()); - } - } - - private String getSnapshotDBPath(String checkPointDir) { - return OMStorage.getOmDbDir(cluster.getConf()) + - OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + - OM_DB_NAME + checkPointDir; - } - - private static String getInode(String inodeAndMtime) { - String inode = inodeAndMtime.split("-")[0]; - return inode; - } - - private void writeData(String volumeName, String bucketName, boolean includeSnapshots) throws Exception { - OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName); - for (int i = 0; i < 10; i++) { - TestDataUtil.createKey(bucket, "key" + i, - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - om.getMetadataManager().getStore().flushDB(); - } - if (includeSnapshots) { - TestDataUtil.createKey(bucket, "keysnap1", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - TestDataUtil.createKey(bucket, "keysnap2", - ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE), - "sample".getBytes(StandardCharsets.UTF_8)); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot10"); - client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot20"); - } - } -} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java deleted file mode 100644 index 5859b23aa4b..00000000000 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java +++ /dev/null @@ -1,437 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.om; - -import static org.apache.hadoop.hdds.utils.Archiver.includeFile; -import static org.apache.hadoop.hdds.utils.Archiver.linkAndIncludeFile; -import static org.apache.hadoop.hdds.utils.Archiver.tar; -import static org.apache.hadoop.hdds.utils.HddsServerUtil.includeRatisSnapshotCompleteFlag; -import static org.apache.hadoop.ozone.OzoneConsts.OM_CHECKPOINT_DIR; -import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME; -import static org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY; -import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData; -import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize; -import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX; -import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Stream; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import org.apache.commons.compress.archivers.ArchiveOutputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.recon.ReconConfig; -import org.apache.hadoop.hdds.utils.DBCheckpointServlet; -import org.apache.hadoop.hdds.utils.db.DBCheckpoint; -import org.apache.hadoop.ozone.OzoneConsts; -import org.apache.hadoop.ozone.lock.BootstrapStateHandler; -import org.apache.hadoop.ozone.om.helpers.SnapshotInfo; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager; -import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; -import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Specialized OMDBCheckpointServlet implementation that transfers Ozone Manager - * database checkpoints using inode-based deduplication. - * <p> - * This servlet constructs checkpoint archives by examining file inodes, - * ensuring that files with the same inode (i.e., hardlinks or duplicates) - * are only transferred once. It maintains mappings from inode IDs to file - * paths, manages hardlink information, and enforces snapshot and SST file - * size constraints as needed. - * <p> - * This approach optimizes checkpoint streaming by reducing redundant data - * transfer, especially in environments where RocksDB and snapshotting result - * in multiple hardlinks to the same physical data. 
- */ -public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet { - - protected static final Logger LOG = - LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class); - private static final long serialVersionUID = 1L; - - @Override - public void init() throws ServletException { - OzoneManager om = (OzoneManager) getServletContext() - .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); - - if (om == null) { - LOG.error("Unable to initialize OMDBCheckpointServlet. OM is null"); - return; - } - - OzoneConfiguration conf = getConf(); - // Only Ozone Admins and Recon are allowed - Collection<String> allowedUsers = - new LinkedHashSet<>(om.getOmAdminUsernames()); - Collection<String> allowedGroups = om.getOmAdminGroups(); - ReconConfig reconConfig = conf.getObject(ReconConfig.class); - String reconPrincipal = reconConfig.getKerberosPrincipal(); - if (!reconPrincipal.isEmpty()) { - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser(reconPrincipal); - allowedUsers.add(ugi.getShortUserName()); - } - - initialize(om.getMetadataManager().getStore(), - om.getMetrics().getDBCheckpointMetrics(), - om.getAclsEnabled(), - allowedUsers, - allowedGroups, - om.isSpnegoEnabled()); - } - - @Override - public void processMetadataSnapshotRequest(HttpServletRequest request, HttpServletResponse response, - boolean isFormData, boolean flush) { - List<String> excludedSstList = new ArrayList<>(); - String[] sstParam = isFormData ? - parseFormDataParameters(request) : request.getParameterValues( - OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST); - Set<String> receivedSstFiles = extractSstFilesToExclude(sstParam); - Path tmpdir = null; - try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) { - tmpdir = Files.createTempDirectory(getBootstrapTempData().toPath(), - "bootstrap-data-"); - if (tmpdir == null) { - throw new IOException("tmp dir is null"); - } - String tarName = "om.data-" + System.currentTimeMillis() + ".tar"; - response.setContentType("application/x-tar"); - response.setHeader("Content-Disposition", "attachment; filename=\"" + tarName + "\""); - Instant start = Instant.now(); - writeDbDataToStream(request, response.getOutputStream(), receivedSstFiles, tmpdir); - Instant end = Instant.now(); - long duration = Duration.between(start, end).toMillis(); - LOG.info("Time taken to write the checkpoint to response output " + - "stream: {} milliseconds", duration); - logSstFileList(excludedSstList, - "Excluded {} SST files from the latest checkpoint{}: {}", 5); - } catch (Exception e) { - LOG.error( - "Unable to process metadata snapshot request. ", e); - response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); - } finally { - try { - if (tmpdir != null) { - FileUtils.deleteDirectory(tmpdir.toFile()); - } - } catch (IOException e) { - LOG.error("unable to delete: " + tmpdir, e.toString()); - } - } - } - - Path getSstBackupDir() { - RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer(); - return new File(differ.getSSTBackupDir()).toPath(); - } - - Path getCompactionLogDir() { - RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer(); - return new File(differ.getCompactionLogDir()).toPath(); - } - - /** - * Streams the Ozone Manager database checkpoint and (optionally) snapshot-related data - * as a tar archive to the provided output stream. 
This method handles deduplication - * based on file inodes to avoid transferring duplicate files (such as hardlinks), - * supports excluding specific SST files, enforces maximum total SST file size limits, - * and manages temporary directories for processing. - * - * The method processes snapshot directories and backup/compaction logs (if requested), - * then finally the active OM database. It also writes a hardlink mapping file - * and includes a completion flag for Ratis snapshot streaming. - * - * @param request The HTTP servlet request containing parameters for the snapshot. - * @param destination The output stream to which the tar archive is written. - * @param sstFilesToExclude Set of SST file identifiers to exclude from the archive. - * @param tmpdir Temporary directory for staging files during archiving. - * @throws IOException if an I/O error occurs during processing or streaming. - */ - - public void writeDbDataToStream(HttpServletRequest request, OutputStream destination, - Set<String> sstFilesToExclude, Path tmpdir) throws IOException { - DBCheckpoint checkpoint = null; - OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE); - OMMetadataManager omMetadataManager = om.getMetadataManager(); - boolean includeSnapshotData = includeSnapshotData(request); - AtomicLong maxTotalSstSize = new AtomicLong(getConf().getLong(OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY, - OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT)); - - Set<Path> snapshotPaths = Collections.emptySet(); - - if (!includeSnapshotData) { - maxTotalSstSize.set(Long.MAX_VALUE); - } else { - snapshotPaths = getSnapshotDirs(omMetadataManager); - } - - if (sstFilesToExclude.isEmpty()) { - logEstimatedTarballSize(getDbStore().getDbLocation().toPath(), snapshotPaths); - } - - boolean shouldContinue = true; - - Map<String, String> hardLinkFileMap = new HashMap<>(); - try (ArchiveOutputStream<TarArchiveEntry> archiveOutputStream = tar(destination)) { - if (includeSnapshotData) { - // Process each snapshot db path and write it to archive - for (Path snapshotDbPath : snapshotPaths) { - if (!shouldContinue) { - break; - } - shouldContinue = writeDBToArchive(sstFilesToExclude, snapshotDbPath, - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap); - } - - - if (shouldContinue) { - shouldContinue = writeDBToArchive(sstFilesToExclude, getSstBackupDir(), - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap); - } - - if (shouldContinue) { - shouldContinue = writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap); - } - } - - if (shouldContinue) { - // we finished transferring files from snapshot DB's by now and - // this is the last step where we transfer the active om.db contents - checkpoint = createAndPrepareCheckpoint(tmpdir, true); - // unlimited files as we want the Active DB contents to be transferred in a single batch - maxTotalSstSize.set(Long.MAX_VALUE); - Path checkpointDir = checkpoint.getCheckpointLocation(); - writeDBToArchive(sstFilesToExclude, checkpointDir, - maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap); - if (includeSnapshotData) { - Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); - Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); - writeDBToArchive(sstFilesToExclude, tmpCompactionLogDir, maxTotalSstSize, archiveOutputStream, tmpdir, - hardLinkFileMap, getCompactionLogDir()); - 
writeDBToArchive(sstFilesToExclude, tmpSstBackupDir, maxTotalSstSize, archiveOutputStream, tmpdir, - hardLinkFileMap, getSstBackupDir()); - } - writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream); - includeRatisSnapshotCompleteFlag(archiveOutputStream); - } - - } catch (IOException ioe) { - LOG.error("got exception writing to archive " + ioe); - throw ioe; - } finally { - cleanupCheckpoint(checkpoint); - } - } - - private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dir, - AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, - Path tmpdir, Map<String, String> hardLinkFileMap) throws IOException { - return writeDBToArchive(sstFilesToExclude, dir, maxTotalSstSize, - archiveOutputStream, tmpdir, hardLinkFileMap, null); - } - - private static void cleanupCheckpoint(DBCheckpoint checkpoint) { - if (checkpoint != null) { - try { - checkpoint.cleanupCheckpoint(); - } catch (IOException e) { - LOG.error("Error trying to clean checkpoint at {} .", - checkpoint.getCheckpointLocation().toString()); - } - } - } - - /** - * Writes a hardlink mapping file to the archive, which maps file IDs to their - * relative paths. This method generates the mapping file based on the provided - * hardlink metadata and adds it to the archive output stream. - * - * @param conf Ozone configuration for the OM instance. - * @param hardlinkFileMap A map where the key is the absolute file path - * and the value is its corresponding file ID. - * @param archiveOutputStream The archive output stream to which the hardlink - * file should be written. - * @throws IOException If an I/O error occurs while creating or writing the - * hardlink file. - */ - private static void writeHardlinkFile(OzoneConfiguration conf, Map<String, String> hardlinkFileMap, - ArchiveOutputStream<TarArchiveEntry> archiveOutputStream) throws IOException { - Path data = Files.createTempFile(DATA_PREFIX, DATA_SUFFIX); - Path metaDirPath = OMStorage.getOmDbDir(conf).toPath(); - StringBuilder sb = new StringBuilder(); - - for (Map.Entry<String, String> entry : hardlinkFileMap.entrySet()) { - Path p = Paths.get(entry.getKey()); - String fileId = entry.getValue(); - Path relativePath = metaDirPath.relativize(p); - // if the file is in "om.db" directory, strip off the 'o - // m.db' name from the path - // and only keep the file name as this would be created in the current dir of the untarred dir - // on the follower. - if (relativePath.startsWith(OM_DB_NAME)) { - relativePath = relativePath.getFileName(); - } - sb.append(relativePath).append('\t').append(fileId).append('\n'); - } - Files.write(data, sb.toString().getBytes(StandardCharsets.UTF_8), StandardOpenOption.TRUNCATE_EXISTING); - includeFile(data.toFile(), OmSnapshotManager.OM_HARDLINK_FILE, archiveOutputStream); - } - - /** - * Gets the configuration from the OzoneManager context. - * - * @return OzoneConfiguration instance - */ - private OzoneConfiguration getConf() { - return ((OzoneManager) getServletContext() - .getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)) - .getConfiguration(); - } - - /** - * Collects paths to all snapshot databases. 
- * - * @param omMetadataManager OMMetadataManager instance - * @return Set of paths to snapshot databases - * @throws IOException if an I/O error occurs - */ - Set<Path> getSnapshotDirs(OMMetadataManager omMetadataManager) throws IOException { - Set<Path> snapshotPaths = new HashSet<>(); - SnapshotChainManager snapshotChainManager = new SnapshotChainManager(omMetadataManager); - for (SnapshotChainInfo snapInfo : snapshotChainManager.getGlobalSnapshotChain().values()) { - String snapshotDir = - OmSnapshotManager.getSnapshotPath(getConf(), SnapshotInfo.getCheckpointDirName(snapInfo.getSnapshotId())); - Path path = Paths.get(snapshotDir); - snapshotPaths.add(path); - } - return snapshotPaths; - } - - /** - * Writes database files to the archive, handling deduplication based on inode IDs. - * Here the dbDir could either be a snapshot db directory, the active om.db, - * compaction log dir, sst backup dir. - * - * @param sstFilesToExclude Set of SST file IDs to exclude from the archive - * @param dbDir Directory containing database files to archive - * @param maxTotalSstSize Maximum total size of SST files to include - * @param archiveOutputStream Archive output stream - * @param tmpDir Temporary directory for processing - * @return true if processing should continue, false if size limit reached - * @throws IOException if an I/O error occurs - */ - private boolean writeDBToArchive(Set<String> sstFilesToExclude, Path dbDir, AtomicLong maxTotalSstSize, - ArchiveOutputStream<TarArchiveEntry> archiveOutputStream, Path tmpDir, - Map<String, String> hardLinkFileMap, Path destDir) throws IOException { - if (!Files.exists(dbDir)) { - LOG.warn("DB directory {} does not exist. Skipping.", dbDir); - return true; - } - long bytesWritten = 0L; - int filesWritten = 0; - long lastLoggedTime = Time.monotonicNow(); - try (Stream<Path> files = Files.list(dbDir)) { - Iterable<Path> iterable = files::iterator; - for (Path dbFile : iterable) { - if (!Files.isDirectory(dbFile)) { - String fileId = OmSnapshotUtils.getFileInodeAndLastModifiedTimeString(dbFile); - String path = dbFile.toFile().getAbsolutePath(); - if (destDir != null) { - path = destDir.resolve(dbFile.getFileName()).toString(); - } - // if the file is in the om checkpoint dir, then we need to change the path to point to the OM DB. - if (path.contains(OM_CHECKPOINT_DIR)) { - path = getDbStore().getDbLocation().toPath().resolve(dbFile.getFileName()).toAbsolutePath().toString(); - } - hardLinkFileMap.put(path, fileId); - if (!sstFilesToExclude.contains(fileId)) { - long fileSize = Files.size(dbFile); - if (maxTotalSstSize.get() - fileSize <= 0) { - return false; - } - bytesWritten += linkAndIncludeFile(dbFile.toFile(), fileId, archiveOutputStream, tmpDir); - filesWritten++; - maxTotalSstSize.addAndGet(-fileSize); - sstFilesToExclude.add(fileId); - if (Time.monotonicNow() - lastLoggedTime >= 30000) { - LOG.info("Transferred {} KB, #files {} to checkpoint tarball stream...", - bytesWritten / (1024), filesWritten); - lastLoggedTime = Time.monotonicNow(); - } - } - } - } - } - return true; - } - - /** - * Creates a database checkpoint and copies compaction log and SST backup files - * into the given temporary directory. - * The copy to the temporary directory for compaction log and SST backup files - * is done to maintain a consistent view of the files in these directories. - * - * @param tmpdir Temporary directory for storing checkpoint-related files. - * @param flush If true, flushes in-memory data to disk before checkpointing. 
- * @return The created database checkpoint. - * @throws IOException If an error occurs during checkpoint creation or file copying. - */ - private DBCheckpoint createAndPrepareCheckpoint(Path tmpdir, boolean flush) throws IOException { - // make tmp directories to contain the copies - Path tmpCompactionLogDir = tmpdir.resolve(getCompactionLogDir().getFileName()); - Path tmpSstBackupDir = tmpdir.resolve(getSstBackupDir().getFileName()); - - // Create checkpoint and then copy the files so that it has all the compaction entries and files. - DBCheckpoint dbCheckpoint = getDbStore().getCheckpoint(flush); - FileUtils.copyDirectory(getCompactionLogDir().toFile(), tmpCompactionLogDir.toFile()); - OmSnapshotUtils.linkFiles(getSstBackupDir().toFile(), tmpSstBackupDir.toFile()); - - return dbCheckpoint; - } -} diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java index 71c5f29511e..eae4dd6b224 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java @@ -810,15 +810,10 @@ public static Path getSnapshotPath(OMMetadataManager omMetadataManager, UUID sna } public static String getSnapshotPath(OzoneConfiguration conf, - SnapshotInfo snapshotInfo) { - return getSnapshotPath(conf, snapshotInfo.getCheckpointDirName()); - } - - public static String getSnapshotPath(OzoneConfiguration conf, - String checkpointDirName) { + SnapshotInfo snapshotInfo) { return OMStorage.getOmDbDir(conf) + OM_KEY_PREFIX + OM_SNAPSHOT_CHECKPOINT_DIR + OM_KEY_PREFIX + - OM_DB_NAME + checkpointDirName; + OM_DB_NAME + snapshotInfo.getCheckpointDirName(); } public static boolean isSnapshotKey(String[] keyParts) { diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java index 848384ce3e2..f5805044b7f 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/OmSnapshotUtils.java @@ -27,7 +27,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; -import java.nio.file.attribute.FileTime; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -66,26 +65,6 @@ public static Object getINode(Path file) throws IOException { return Files.readAttributes(file, BasicFileAttributes.class).fileKey(); } - /** - * Returns a string combining the inode (fileKey) and the last modification time (mtime) of the given file. 
- * <p> - * The returned string is formatted as "{inode}-{mtime}", where: - * <ul> - * <li>{@code inode} is the unique file key obtained from the file system, typically representing - * the inode on POSIX systems</li> - * <li>{@code mtime} is the last modified time of the file in milliseconds since the epoch</li> - * </ul> - * - * @param file the {@link Path} to the file whose inode and modification time are to be retrieved - * @return a string in the format "{inode}-{mtime}" - * @throws IOException if an I/O error occurs - */ - public static String getFileInodeAndLastModifiedTimeString(Path file) throws IOException { - Object inode = Files.readAttributes(file, BasicFileAttributes.class).fileKey(); - FileTime mTime = Files.getLastModifiedTime(file); - return String.format("%s-%s", inode, mTime.toMillis()); - } - /** * Create file of links to add to tarball. * Format of entries are either: --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
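Also for reference: the removed servlet wrote an OM_HARDLINK_FILE that maps each archived file's relative path to its inode-based id (see writeHardlinkFile and the test's readFileToMap in the diff above), which the follower later used to recreate hardlinks. A rough sketch of that tab-separated format, assuming hypothetical paths and inode values only:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;

public class HardlinkFileSketch {
  // One line per file: "<relative path>\t<fileId>", where fileId is "<inode>-<mtime>".
  static void write(Path out, Map<String, String> pathToFileId) throws java.io.IOException {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, String> e : pathToFileId.entrySet()) {
      sb.append(e.getKey()).append('\t').append(e.getValue()).append('\n');
    }
    Files.write(out, sb.toString().getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> m = new LinkedHashMap<>();
    // Two paths sharing one inode id: the active om.db file and a snapshot hardlink to it.
    m.put("000123.sst", "(dev=34,ino=5678)-1732050000000");
    m.put("db.snapshots/checkpointState/om.db-uuid/000123.sst",
        "(dev=34,ino=5678)-1732050000000");
    write(Files.createTempFile("hardlink", ".txt"), m);
  }
}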