This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c801c02455 HDDS-9064. [SNAPSHOT] Reduce time of compaction pause
during bootstrapping. (#5104)
c801c02455 is described below
commit c801c02455982d3488cb099942f86912a492dc89
Author: GeorgeJahad <[email protected]>
AuthorDate: Thu Aug 10 15:35:59 2023 -0700
HDDS-9064. [SNAPSHOT] Reduce time of compaction pause during bootstrapping.
(#5104)
Co-authored-by: George Jahad <[email protected]>
---
.../hadoop/hdds/utils/DBCheckpointServlet.java | 54 +++--
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 6 +-
.../hdds/scm/TestSCMDbCheckpointServlet.java | 7 +-
.../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 84 +++++++-
.../hadoop/ozone/om/OMDBCheckpointServlet.java | 224 ++++++++++++++++++---
.../hadoop/ozone/om/TestOmSnapshotManager.java | 136 ++++++++++++-
6 files changed, 442 insertions(+), 69 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
index 4fe82a4781..2d718628e1 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/DBCheckpointServlet.java
@@ -21,9 +21,12 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
@@ -37,6 +40,7 @@ import org.apache.commons.fileupload.FileItemIterator;
import org.apache.commons.fileupload.FileItemStream;
import org.apache.commons.fileupload.servlet.ServletFileUpload;
import org.apache.commons.fileupload.util.Streams;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.server.OzoneAdmins;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -72,6 +76,7 @@ public class DBCheckpointServlet extends HttpServlet
private boolean isSpnegoEnabled;
private transient OzoneAdmins admins;
private transient BootstrapStateHandler.Lock lock;
+ private transient File bootstrapTempData;
public void initialize(DBStore store, DBCheckpointMetrics metrics,
boolean omAclEnabled,
@@ -85,12 +90,29 @@ public class DBCheckpointServlet extends HttpServlet
if (dbStore == null) {
LOG.error(
"Unable to set metadata snapshot request. DB Store is null");
+ throw new ServletException("DB Store is null");
}
this.aclEnabled = omAclEnabled;
this.admins = new OzoneAdmins(allowedAdminUsers, allowedAdminGroups);
this.isSpnegoEnabled = isSpnegoAuthEnabled;
lock = new Lock();
+
+ // Create a directory for temp bootstrap data
+ File dbLocation = dbStore.getDbLocation();
+ if (dbLocation == null) {
+ throw new NullPointerException("dblocation null");
+ }
+ String tempData = dbLocation.getParent();
+ if (tempData == null) {
+ throw new NullPointerException("tempData dir is null");
+ }
+ bootstrapTempData = Paths.get(tempData,
+ "temp-bootstrap-data").toFile();
+ if (!bootstrapTempData.exists() &&
+ !bootstrapTempData.mkdirs()) {
+ throw new ServletException("Failed to make:" + bootstrapTempData);
+ }
}
private boolean hasPermission(UserGroupInformation user) {
@@ -171,13 +193,11 @@ public class DBCheckpointServlet extends HttpServlet
LOG.info("Received excluding SST {}", receivedSstList);
}
+ Path tmpdir = null;
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
- if (dbStore.getRocksDBCheckpointDiffer() != null) {
- dbStore.getRocksDBCheckpointDiffer().incrementTarballRequestCount();
- }
-
- checkpoint = dbStore.getCheckpoint(flush);
-
+ tmpdir = Files.createTempDirectory(bootstrapTempData.toPath(),
+ "bootstrap-data-");
+ checkpoint = getCheckpoint(tmpdir, flush);
if (checkpoint == null || checkpoint.getCheckpointLocation() == null) {
LOG.error("Unable to process metadata snapshot request. " +
"Checkpoint request returned null.");
@@ -198,7 +218,7 @@ public class DBCheckpointServlet extends HttpServlet
Instant start = Instant.now();
writeDbDataToStream(checkpoint, request,
- response.getOutputStream(), receivedSstList, excludedSstList);
+ response.getOutputStream(), receivedSstList, excludedSstList,
tmpdir);
Instant end = Instant.now();
long duration = Duration.between(start, end).toMillis();
@@ -220,11 +240,12 @@ public class DBCheckpointServlet extends HttpServlet
response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
dbMetrics.incNumCheckpointFails();
} finally {
- if (dbStore.getRocksDBCheckpointDiffer() != null) {
- synchronized (dbStore.getRocksDBCheckpointDiffer()) {
- dbStore.getRocksDBCheckpointDiffer().decrementTarballRequestCount();
- dbStore.getRocksDBCheckpointDiffer().notifyAll();
+ try {
+ if (tmpdir != null) {
+ FileUtils.deleteDirectory(tmpdir.toFile());
}
+ } catch (IOException e) {
+ LOG.error("unable to delete: " + tmpdir);
}
if (checkpoint != null) {
@@ -238,6 +259,11 @@ public class DBCheckpointServlet extends HttpServlet
}
}
+ public DBCheckpoint getCheckpoint(Path ignoredTmpdir, boolean flush)
+ throws IOException {
+ return dbStore.getCheckpoint(flush);
+ }
+
/**
* Parses request form data parameters.
* @param request the HTTP servlet request
@@ -310,7 +336,7 @@ public class DBCheckpointServlet extends HttpServlet
HttpServletRequest ignoredRequest,
OutputStream destination,
List<String> toExcludeList,
- List<String> excludedList)
+ List<String> excludedList, Path tmpdir)
throws IOException, InterruptedException {
Objects.requireNonNull(toExcludeList);
Objects.requireNonNull(excludedList);
@@ -319,6 +345,10 @@ public class DBCheckpointServlet extends HttpServlet
toExcludeList, excludedList);
}
+ public DBStore getDbStore() {
+ return dbStore;
+ }
+
@Override
public BootstrapStateHandler.Lock getBootstrapStateLock() {
return lock;
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 74472f5405..2adb33ef06 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -125,7 +125,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
*/
private volatile String currentCompactionLogPath = null;
- static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
+ public static final String COMPACTION_LOG_FILE_NAME_SUFFIX = ".log";
/**
* Marks the beginning of a comment line in the compaction log.
@@ -1451,6 +1451,10 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
return sstBackupDir;
}
+ public String getCompactionLogDir() {
+ return compactionLogDir;
+ }
+
private static final class SnapshotLogInfo {
private final long snapshotGenerationId;
private final String snapshotId;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
index bb51702386..b180b22475 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestSCMDbCheckpointServlet.java
@@ -62,6 +62,7 @@ import org.mockito.Mockito;
import static
org.apache.hadoop.ozone.OzoneConsts.OZONE_DB_CHECKPOINT_REQUEST_TO_EXCLUDE_SST;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doCallRealMethod;
@@ -126,11 +127,13 @@ public class TestSCMDbCheckpointServlet {
Collections.emptyList(),
false);
doCallRealMethod().when(scmDbCheckpointServletMock)
- .writeDbDataToStream(any(), any(), any(), any(), any());
+ .writeDbDataToStream(any(), any(), any(), any(), any(), any());
doCallRealMethod().when(scmDbCheckpointServletMock).doPost(requestMock,
responseMock);
doCallRealMethod().when(scmDbCheckpointServletMock).doGet(requestMock,
responseMock);
+ doCallRealMethod().when(scmDbCheckpointServletMock).getCheckpoint(any(),
+ anyBoolean());
servletContextMock = mock(ServletContext.class);
when(scmDbCheckpointServletMock.getServletContext())
@@ -207,7 +210,7 @@ public class TestSCMDbCheckpointServlet {
getNumCheckpoints() > initialCheckpointCount);
Mockito.verify(scmDbCheckpointServletMock).writeDbDataToStream(any(),
- any(), any(), eq(toExcludeList), any());
+ any(), any(), eq(toExcludeList), any(), any());
} finally {
FileUtils.deleteQuietly(tempFile);
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
index cc82632fa6..94f22d7831 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServlet.java
@@ -103,8 +103,10 @@ import org.mockito.Mockito;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.OM_HARDLINK_FILE;
import static
org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.truncateFileName;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
+import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
@@ -197,6 +199,8 @@ public class TestOMDbCheckpointServlet {
BootstrapStateHandler.Lock lock =
new OMDBCheckpointServlet.Lock(cluster.getOzoneManager());
doCallRealMethod().when(omDbCheckpointServletMock).init();
+ Assertions.assertNull(
+ doCallRealMethod().when(omDbCheckpointServletMock).getDbStore());
requestMock = mock(HttpServletRequest.class);
// Return current user short name when asked
@@ -219,10 +223,13 @@ public class TestOMDbCheckpointServlet {
responseMock);
doCallRealMethod().when(omDbCheckpointServletMock)
- .writeDbDataToStream(any(), any(), any(), any(), any());
+ .writeDbDataToStream(any(), any(), any(), any(), any(), any());
when(omDbCheckpointServletMock.getBootstrapStateLock())
.thenReturn(lock);
+
+ doCallRealMethod().when(omDbCheckpointServletMock).getCheckpoint(any(),
+ anyBoolean());
}
@ParameterizedTest
@@ -272,7 +279,7 @@ public class TestOMDbCheckpointServlet {
getNumCheckpoints() > initialCheckpointCount);
Mockito.verify(omDbCheckpointServletMock).writeDbDataToStream(any(),
- any(), any(), eq(toExcludeList), any());
+ any(), any(), eq(toExcludeList), any(), any());
}
@Test
@@ -376,6 +383,34 @@ public class TestOMDbCheckpointServlet {
OzoneManager om = cluster.getOzoneManager();
DBStore dbStore = om.getMetadataManager().getStore();
DBStore spyDbStore = spy(dbStore);
+
+ int metaDirLength = metaDir.toString().length() + 1;
+ String compactionLogDir = dbStore.
+ getRocksDBCheckpointDiffer().getCompactionLogDir();
+ String sstBackupDir = dbStore.
+ getRocksDBCheckpointDiffer().getSSTBackupDir();
+
+ // Create files to be copied from the compaction pause
+ // temp directories so we can confirm they are correctly
+ // copied. The unexpected files should NOT be copied.
+ Path expectedLog = Paths.get(compactionLogDir, "expected" +
+ COMPACTION_LOG_FILE_NAME_SUFFIX);
+ String expectedLogStr = truncateFileName(metaDirLength, expectedLog);
+ Path unExpectedLog = Paths.get(compactionLogDir, "unexpected" +
+ COMPACTION_LOG_FILE_NAME_SUFFIX);
+ String unExpectedLogStr = truncateFileName(metaDirLength, unExpectedLog);
+ Path expectedSst = Paths.get(sstBackupDir, "expected.sst");
+ String expectedSstStr = truncateFileName(metaDirLength, expectedSst);
+ Path unExpectedSst = Paths.get(sstBackupDir, "unexpected.sst");
+ String unExpectedSstStr = truncateFileName(metaDirLength, unExpectedSst);
+
+ // put "expected" fabricated files onto the fs before the files get
+ // copied to the temp dir.
+ Files.write(expectedLog,
+ "fabricatedData".getBytes(StandardCharsets.UTF_8));
+ Files.write(expectedSst,
+ "fabricatedData".getBytes(StandardCharsets.UTF_8));
+
AtomicReference<DBCheckpoint> realCheckpoint = new AtomicReference<>();
when(spyDbStore.getCheckpoint(true)).thenAnswer(b -> {
DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
@@ -383,6 +418,15 @@ public class TestOMDbCheckpointServlet {
// with the snapshot data.
doNothing().when(checkpoint).cleanupCheckpoint();
realCheckpoint.set(checkpoint);
+
+ // put "unexpected" fabricated files onto the fs after the files
+ // get copied to the temp dir. Since these appear in the "real"
+ // dir after the copy, they shouldn't exist in the final file
+ // set. That will show that the copy only happened from the temp dir.
+ Files.write(unExpectedLog,
+ "fabricatedData".getBytes(StandardCharsets.UTF_8));
+ Files.write(unExpectedSst,
+ "fabricatedData".getBytes(StandardCharsets.UTF_8));
return checkpoint;
});
@@ -428,7 +472,6 @@ public class TestOMDbCheckpointServlet {
finalCheckpointSet.remove(OM_HARDLINK_FILE);
Assertions.assertEquals(initialCheckpointSet, finalCheckpointSet);
- int metaDirLength = metaDir.toString().length() + 1;
String shortSnapshotLocation =
truncateFileName(metaDirLength, Paths.get(snapshotDirName));
String shortSnapshotLocation2 =
@@ -463,6 +506,15 @@ public class TestOMDbCheckpointServlet {
Set<String> initialFullSet =
getFiles(Paths.get(metaDir.toString(), OM_SNAPSHOT_DIR),
metaDirLength);
+ Assertions.assertTrue(finalFullSet.contains(expectedLogStr));
+ Assertions.assertTrue(finalFullSet.contains(expectedSstStr));
+ Assertions.assertTrue(initialFullSet.contains(unExpectedLogStr));
+ Assertions.assertTrue(initialFullSet.contains(unExpectedSstStr));
+
+ // Remove the dummy files that should not have been copied over
+ // from the expected data.
+ initialFullSet.remove(unExpectedLogStr);
+ initialFullSet.remove(unExpectedSstStr);
Assertions.assertEquals(initialFullSet, finalFullSet,
"expected snapshot files not found");
}
@@ -472,14 +524,19 @@ public class TestOMDbCheckpointServlet {
throws Exception {
prepSnapshotData();
+ doCallRealMethod().when(omDbCheckpointServletMock).initialize(
+ any(), any(), anyBoolean(), any(), any(), anyBoolean());
+ omDbCheckpointServletMock.init();
+
// Set http param to exclude snapshot data.
when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA))
.thenReturn(null);
// Get the tarball.
+ Path tmpdir = Files.createTempDirectory("bootstrapData");
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
- fileOutputStream, new ArrayList<>(), new ArrayList<>());
+ fileOutputStream, new ArrayList<>(), new ArrayList<>(), tmpdir);
}
// Untar the file into a temp folder to be examined.
@@ -503,6 +560,10 @@ public class TestOMDbCheckpointServlet {
throws Exception {
prepSnapshotData();
+ doCallRealMethod().when(omDbCheckpointServletMock).initialize(
+ any(), any(), anyBoolean(), any(), any(), anyBoolean());
+ omDbCheckpointServletMock.init();
+
File dummyFile = new File(dbCheckpoint.getCheckpointLocation().toString(),
"dummy.sst");
try (OutputStreamWriter writer = new OutputStreamWriter(
@@ -519,9 +580,10 @@ public class TestOMDbCheckpointServlet {
.thenReturn(null);
// Get the tarball.
+ Path tmpdir = Files.createTempDirectory("bootstrapData");
try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
omDbCheckpointServletMock.writeDbDataToStream(dbCheckpoint, requestMock,
- fileOutputStream, toExcludeList, excludedList);
+ fileOutputStream, toExcludeList, excludedList, tmpdir);
}
// Untar the file into a temp folder to be examined.
@@ -753,15 +815,15 @@ public class TestOMDbCheckpointServlet {
String[] files = line.split("\t");
Assertions.assertTrue(
files[0].startsWith(dir0) || files[0].startsWith(dir1),
- "fabricated entry contains valid first directory");
+ "fabricated entry contains valid first directory: " + line);
Assertions.assertTrue(files[1].startsWith(realDir),
- "fabricated entry contains correct real directory");
+ "fabricated entry contains correct real directory: " + line);
Path path0 = Paths.get(files[0]);
Path path1 = Paths.get(files[1]);
Assertions.assertTrue(
path0.getFileName().toString().equals(FABRICATED_FILE_NAME) &&
path1.getFileName().toString().equals(FABRICATED_FILE_NAME),
- "fabricated entries contains correct file name");
+ "fabricated entries contains correct file name: " + line);
}
}
@@ -773,7 +835,7 @@ public class TestOMDbCheckpointServlet {
String[] files = line.split("\t");
Assertions.assertTrue(files[0].startsWith(shortSnapshotLocation) ||
files[0].startsWith(shortSnapshotLocation2),
- "hl entry starts with valid snapshot dir");
+ "hl entry starts with valid snapshot dir: " + line);
String file0 = files[0].substring(shortSnapshotLocation.length() + 1);
String file1 = files[1];
@@ -811,7 +873,7 @@ public class TestOMDbCheckpointServlet {
// Confirm the other handlers are locked out when the bootstrap
// servlet takes the lock.
- try (BootstrapStateHandler.Lock lock =
+ try (BootstrapStateHandler.Lock ignoredLock =
spyServlet.getBootstrapStateLock().lock()) {
confirmServletLocksOutOtherHandler(keyDeletingService, executorService);
confirmServletLocksOutOtherHandler(snapshotDeletingService,
@@ -853,7 +915,7 @@ public class TestOMDbCheckpointServlet {
private void confirmOtherHandlerLocksOutServlet(BootstrapStateHandler
handler,
BootstrapStateHandler servlet, ExecutorService executorService)
throws InterruptedException {
- try (BootstrapStateHandler.Lock lock =
+ try (BootstrapStateHandler.Lock ignoredLock =
handler.getBootstrapStateLock().lock()) {
Future<Boolean> test = checkLock(servlet, executorService);
// Servlet should fail to lock when other handler has taken it.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
index 0367c5848d..57d18685cf 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServlet.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.om;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.compress.archivers.ArchiveOutputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.recon.ReconConfig;
import org.apache.hadoop.hdds.utils.DBCheckpointServlet;
@@ -31,7 +32,10 @@ import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +93,8 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
private static final long serialVersionUID = 1L;
private transient BootstrapStateHandler.Lock lock;
private long maxTotalSstSize = 0;
+ private static final AtomicLong PAUSE_COUNTER = new AtomicLong(0);
+
@Override
public void init() throws ServletException {
@@ -128,13 +134,21 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
HttpServletRequest request,
OutputStream destination,
List<String> toExcludeList,
- List<String> excludedList)
+ List<String> excludedList,
+ Path tmpdir)
throws IOException, InterruptedException {
Objects.requireNonNull(toExcludeList);
Objects.requireNonNull(excludedList);
- // Files to be added to tarball
- Set<Path> copyFiles = new HashSet<>();
+ // copyFiles is a map of files to be added to tarball. The keys
+ // are the src path of the file, (where they are copied from on
+ // the leader.) The values are the dest path of the file, (where
+ // they are copied to on the follower.) In most cases these are
+ // the same. For synchronization purposes, some files are copied
+ // to a temp directory on the leader. In those cases the source
+ // and dest won't be the same.
+ Map<Path, Path> copyFiles = new HashMap<>();
+
// Map of link to path.
Map<Path, Path> hardLinkFiles = new HashMap<>();
@@ -149,7 +163,7 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
checkpoint.getCheckpointLocation());
boolean completed = getFilesForArchive(checkpoint, copyFiles,
hardLinkFiles, toExcludeFiles, includeSnapshotData(request),
- excludedList);
+ excludedList, tmpdir);
writeFilesToArchive(copyFiles, hardLinkFiles, archiveOutputStream,
completed, checkpoint.getCheckpointLocation());
} catch (Exception e) {
@@ -177,23 +191,96 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
return paths;
}
+ /**
+ * Pauses rocksdb compaction threads while creating copies of
+ * compaction logs and hard links of sst backups.
+ * @param tmpdir - Place to create copies/links
+ * @param flush - Whether to flush the db or not.
+ * @return Checkpoint containing snapshot entries expected.
+ */
+ @Override
+ public DBCheckpoint getCheckpoint(Path tmpdir, boolean flush)
+ throws IOException {
+ DBCheckpoint checkpoint;
+
+ // make tmp directories to contain the copies
+ RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
+ DirectoryData sstBackupDir = new DirectoryData(tmpdir,
+ differ.getSSTBackupDir());
+ DirectoryData compactionLogDir = new DirectoryData(tmpdir,
+ differ.getCompactionLogDir());
+
+ long startTime = System.currentTimeMillis();
+ long pauseCounter = PAUSE_COUNTER.incrementAndGet();
+
+ // Pause compactions, Copy/link files and get checkpoint.
+ try {
+ LOG.info("Compaction pausing {} started.", pauseCounter);
+ differ.incrementTarballRequestCount();
+ FileUtils.copyDirectory(compactionLogDir.getOriginalDir(),
+ compactionLogDir.getTmpDir());
+ OmSnapshotUtils.linkFiles(sstBackupDir.getOriginalDir(),
+ sstBackupDir.getTmpDir());
+ checkpoint = getDbStore().getCheckpoint(flush);
+ } finally {
+ // Unpause the compaction threads.
+ synchronized (getDbStore().getRocksDBCheckpointDiffer()) {
+ differ.decrementTarballRequestCount();
+ differ.notifyAll();
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ LOG.info("Compaction pausing {} ended. Elapsed ms: {}",
+ pauseCounter, elapsedTime);
+ }
+ }
+ return checkpoint;
+ }
+
+
+ // Convenience class for keeping track of the tmp dirs.
+ private static class DirectoryData {
+ private final File originalDir;
+ private final File tmpDir;
+ DirectoryData(Path tmpdir, String dirStr) throws IOException {
+ originalDir = new File(dirStr);
+ tmpDir = new File(tmpdir.toString(), getOriginalDir().getName());
+ if (!tmpDir.exists() && !tmpDir.mkdirs()) {
+ throw new IOException("mkdirs failed: " + tmpDir);
+ }
+ }
+
+ public File getOriginalDir() {
+ return originalDir;
+ }
+
+ public File getTmpDir() {
+ return tmpDir;
+ }
+ }
+
private boolean getFilesForArchive(DBCheckpoint checkpoint,
- Set<Path> copyFiles,
+ Map<Path, Path> copyFiles,
Map<Path, Path> hardLinkFiles,
Set<Path> toExcludeFiles,
boolean includeSnapshotData,
- List<String> excluded)
+ List<String> excluded,
+ Path tmpdir)
throws IOException {
maxTotalSstSize = getConf().getLong(
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY,
OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT);
+ // Tarball limits are not implemented for processes that don't
+ // include snapshots. Currently, this is just for recon.
+ if (!includeSnapshotData) {
+ maxTotalSstSize = Long.MAX_VALUE;
+ }
+
AtomicLong copySize = new AtomicLong(0L);
// Get the active fs files.
Path dir = checkpoint.getCheckpointLocation();
if (!processDir(dir, copyFiles, hardLinkFiles, toExcludeFiles,
- new HashSet<>(), excluded, copySize)) {
+ new HashSet<>(), excluded, copySize, null)) {
return false;
}
@@ -205,8 +292,29 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
Set<Path> snapshotPaths = waitForSnapshotDirs(checkpoint);
Path snapshotDir = Paths.get(OMStorage.getOmDbDir(getConf()).toString(),
OM_SNAPSHOT_DIR);
- return processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
- snapshotPaths, excluded, copySize);
+ if (!processDir(snapshotDir, copyFiles, hardLinkFiles, toExcludeFiles,
+ snapshotPaths, excluded, copySize, null)) {
+ return false;
+ }
+ RocksDBCheckpointDiffer differ = getDbStore().getRocksDBCheckpointDiffer();
+ DirectoryData sstBackupDir = new DirectoryData(tmpdir,
+ differ.getSSTBackupDir());
+ DirectoryData compactionLogDir = new DirectoryData(tmpdir,
+ differ.getCompactionLogDir());
+
+ // Process the tmp sst compaction dir.
+ if (!processDir(sstBackupDir.getTmpDir().toPath(), copyFiles,
hardLinkFiles,
+ toExcludeFiles, new HashSet<>(), excluded, copySize,
+ sstBackupDir.getOriginalDir().toPath())) {
+ return false;
+ }
+
+ // Process the tmp compaction log dir.
+ return processDir(compactionLogDir.getTmpDir().toPath(), copyFiles,
+ hardLinkFiles, toExcludeFiles,
+ new HashSet<>(), excluded, copySize,
+ compactionLogDir.getOriginalDir().toPath());
+
}
/**
@@ -249,12 +357,14 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
}
}
- private boolean processDir(Path dir, Set<Path> copyFiles,
+ @SuppressWarnings("checkstyle:ParameterNumber")
+ private boolean processDir(Path dir, Map<Path, Path> copyFiles,
Map<Path, Path> hardLinkFiles,
Set<Path> toExcludeFiles,
Set<Path> snapshotPaths,
List<String> excluded,
- AtomicLong copySize)
+ AtomicLong copySize,
+ Path destDir)
throws IOException {
try (Stream<Path> files = Files.list(dir)) {
for (Path file : files.collect(Collectors.toList())) {
@@ -262,18 +372,46 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
if (f.isDirectory()) {
// Skip any unexpected snapshot files.
String parent = f.getParent();
- if (parent != null && parent.endsWith(OM_SNAPSHOT_CHECKPOINT_DIR)
+ if (parent != null && parent.contains(OM_SNAPSHOT_CHECKPOINT_DIR)
&& !snapshotPaths.contains(file)) {
LOG.debug("Skipping unneeded file: " + file);
continue;
}
+
+ // Skip the real compaction log dir.
+ File compactionLogDir = new File(getDbStore().
+ getRocksDBCheckpointDiffer().getCompactionLogDir());
+ if (f.equals(compactionLogDir)) {
+ LOG.debug("Skipping compaction log dir");
+ continue;
+ }
+
+ // Skip the real compaction sst backup dir.
+ File sstBackupDir = new File(getDbStore().
+ getRocksDBCheckpointDiffer().getSSTBackupDir());
+ if (f.equals(sstBackupDir)) {
+ LOG.debug("Skipping sst backup dir");
+ continue;
+ }
+ // findbugs nonsense
+ Path filename = file.getFileName();
+ if (filename == null) {
+ throw new IOException("file has no filename:" + file);
+ }
+
+ // Update the dest dir to point to the sub dir
+ Path destSubDir = null;
+ if (destDir != null) {
+ destSubDir = Paths.get(destDir.toString(),
+ filename.toString());
+ }
if (!processDir(file, copyFiles, hardLinkFiles, toExcludeFiles,
- snapshotPaths, excluded, copySize)) {
+ snapshotPaths, excluded, copySize, destSubDir)) {
return false;
}
} else {
long fileSize = processFile(file, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, destDir);
if (copySize.get() + fileSize > maxTotalSstSize) {
return false;
} else {
@@ -297,13 +435,28 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
* @param excluded The list of db files that actually were excluded.
*/
@VisibleForTesting
- public static long processFile(Path file, Set<Path> copyFiles,
+ public static long processFile(Path file, Map<Path, Path> copyFiles,
Map<Path, Path> hardLinkFiles,
Set<Path> toExcludeFiles,
- List<String> excluded) throws IOException {
+ List<String> excluded,
+ Path destDir)
+ throws IOException {
long fileSize = 0;
- if (toExcludeFiles.contains(file)) {
- excluded.add(file.toString());
+ Path destFile = file;
+
+ // findbugs nonsense
+ Path filename = file.getFileName();
+ if (filename == null) {
+ throw new IOException("file has no filename:" + file);
+ }
+
+ // if the dest dir is not null then the file needs to be copied/linked
+ // to the dest dir on the follower.
+ if (destDir != null) {
+ destFile = Paths.get(destDir.toString(), filename.toString());
+ }
+ if (toExcludeFiles.contains(destFile)) {
+ excluded.add(destFile.toString());
} else {
Path fileNamePath = file.getFileName();
if (fileNamePath == null) {
@@ -314,21 +467,21 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
// If same as existing excluded file, add a link for it.
Path linkPath = findLinkPath(toExcludeFiles, fileName);
if (linkPath != null) {
- hardLinkFiles.put(file, linkPath);
+ hardLinkFiles.put(destFile, linkPath);
} else {
// If already in tarball add a link for it.
- linkPath = findLinkPath(copyFiles, fileName);
+ linkPath = findLinkPath(copyFiles.values(), fileName);
if (linkPath != null) {
- hardLinkFiles.put(file, linkPath);
+ hardLinkFiles.put(destFile, linkPath);
} else {
// Add to tarball.
- copyFiles.add(file);
+ copyFiles.put(file, destFile);
fileSize = Files.size(file);
}
}
} else {
// Not sst file.
- copyFiles.add(file);
+ copyFiles.put(file, destFile);
}
}
return fileSize;
@@ -336,9 +489,10 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
// If fileName exists in "files" parameter,
// it should be linked to path in files.
- private static Path findLinkPath(Set<Path> files, String fileName) {
+ private static Path findLinkPath(Collection<Path> files, String fileName) {
for (Path p: files) {
- if (p.toString().endsWith(fileName)) {
+ Path file = p.getFileName();
+ if ((file != null) && file.toString().equals(fileName)) {
return p;
}
}
@@ -353,7 +507,7 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
}
private void writeFilesToArchive(
- Set<Path> copyFiles,
+ Map<Path, Path> copyFiles,
Map<Path, Path> hardLinkFiles,
ArchiveOutputStream archiveOutputStream,
boolean completed,
@@ -362,17 +516,21 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
Path metaDirPath = getVerifiedCheckPointPath(checkpointLocation);
int truncateLength = metaDirPath.toString().length() + 1;
- Set<Path> filteredCopyFiles = completed ? copyFiles :
- copyFiles.stream().filter(path ->
- path.getFileName().toString().toLowerCase().endsWith(".sst")).
- collect(Collectors.toSet());
+ Map<Path, Path> filteredCopyFiles = completed ? copyFiles :
+ copyFiles.entrySet().stream().filter(e ->
+ e.getKey().getFileName().toString().toLowerCase().endsWith(".sst")).
+ collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Go through each of the files to be copied and add to archive.
- for (Path file : filteredCopyFiles) {
+ for (Map.Entry<Path, Path> entry : filteredCopyFiles.entrySet()) {
+ Path file = entry.getValue();
+
+ // Confirm the data is in the right place.
if (!file.toString().startsWith(metaDirPath.toString())) {
- throw new RuntimeException("tarball file not in metadata dir: "
+ throw new IOException("tarball file not in metadata dir: "
+ file + ": " + metaDirPath);
}
+
String fixedFile = truncateFileName(truncateLength, file);
if (fixedFile.startsWith(OM_CHECKPOINT_DIR)) {
// checkpoint files go to root of tarball
@@ -381,7 +539,7 @@ public class OMDBCheckpointServlet extends
DBCheckpointServlet {
fixedFile = f.toString();
}
}
- includeFile(file.toFile(), fixedFile, archiveOutputStream);
+ includeFile(entry.getKey().toFile(), fixedFile, archiveOutputStream);
}
if (completed) {
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
index c9233f04b9..7aec5be02b 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -339,9 +339,10 @@ public class TestOmSnapshotManager {
/*
* Confirm that processFile() correctly determines whether a file
* should be copied, linked, or excluded from the tarball entirely.
+ * This test always passes in a null dest dir.
*/
@Test
- public void testProcessFile() throws IOException {
+ public void testProcessFileWithNullDestDirParameter() throws IOException {
Assert.assertTrue(new File(testDir.toString(), "snap1").mkdirs());
Assert.assertTrue(new File(testDir.toString(), "snap2").mkdirs());
Path copyFile = Paths.get(testDir.toString(),
@@ -372,14 +373,15 @@ public class TestOmSnapshotManager {
Set<Path> toExcludeFiles = new HashSet<>(
Collections.singletonList(excludeFile));
- Set<Path> copyFiles = new HashSet<>(Collections.singletonList(copyFile));
+ Map<Path, Path> copyFiles = new HashMap<>();
+ copyFiles.put(copyFile, copyFile);
List<String> excluded = new ArrayList<>();
Map<Path, Path> hardLinkFiles = new HashMap<>();
long fileSize;
// Confirm the exclude file gets added to the excluded list,
// (and thus is excluded.)
fileSize = processFile(excludeFile, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, null);
Assert.assertEquals(excluded.size(), 1);
Assert.assertEquals((excluded.get(0)), excludeFile.toString());
Assert.assertEquals(copyFiles.size(), 1);
@@ -389,7 +391,7 @@ public class TestOmSnapshotManager {
// Confirm the linkToExcludedFile gets added as a link.
fileSize = processFile(linkToExcludedFile, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, null);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.size(), 1);
@@ -399,7 +401,7 @@ public class TestOmSnapshotManager {
// Confirm the linkToCopiedFile gets added as a link.
fileSize = processFile(linkToCopiedFile, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, null);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 1);
Assert.assertEquals(hardLinkFiles.size(), 1);
@@ -409,20 +411,134 @@ public class TestOmSnapshotManager {
// Confirm the addToCopiedFiles gets added to list of copied files
fileSize = processFile(addToCopiedFiles, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, null);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 2);
- Assert.assertTrue(copyFiles.contains(addToCopiedFiles));
+ Assert.assertEquals(copyFiles.get(addToCopiedFiles), addToCopiedFiles);
Assert.assertEquals(fileSize, expectedFileSize);
- copyFiles = new HashSet<>(Collections.singletonList(copyFile));
+ copyFiles = new HashMap<>();
+ copyFiles.put(copyFile, copyFile);
// Confirm the addNonSstToCopiedFiles gets added to list of copied files
fileSize = processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
- toExcludeFiles, excluded);
+ toExcludeFiles, excluded, null);
Assert.assertEquals(excluded.size(), 0);
Assert.assertEquals(copyFiles.size(), 2);
Assert.assertEquals(fileSize, 0);
- Assert.assertTrue(copyFiles.contains(addNonSstToCopiedFiles));
+ Assert.assertEquals(copyFiles.get(addNonSstToCopiedFiles),
+ addNonSstToCopiedFiles);
+ }
+
+ /*
+ * Confirm that processFile() correctly determines whether a file
+ * should be copied, linked, or excluded from the tarball entirely.
+ * This test always passes in a non-null dest dir.
+ */
+ @Test
+ public void testProcessFileWithDestDirParameter() throws IOException {
+ Assert.assertTrue(new File(testDir.toString(), "snap1").mkdirs());
+ Assert.assertTrue(new File(testDir.toString(), "snap2").mkdirs());
+ Path destDir = Paths.get(testDir.toString(), "destDir");
+ Assert.assertTrue(new File(destDir.toString()).mkdirs());
+
+ // Create test files.
+ Path copyFile = Paths.get(testDir.toString(),
+ "snap1/copyfile.sst");
+ Path destCopyFile = Paths.get(destDir.toString(),
+ "snap1/copyfile.sst");
+ Files.write(copyFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ long expectedFileSize = Files.size(copyFile);
+ Path excludeFile = Paths.get(testDir.toString(),
+ "snap1/excludeFile.sst");
+ Path destExcludeFile = Paths.get(destDir.toString(),
+ "snap1/excludeFile.sst");
+ Files.write(excludeFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ Path linkToExcludedFile = Paths.get(testDir.toString(),
+ "snap2/excludeFile.sst");
+ Path destLinkToExcludedFile = Paths.get(destDir.toString(),
+ "snap2/excludeFile.sst");
+ Files.write(linkToExcludedFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ Path linkToCopiedFile = Paths.get(testDir.toString(),
+ "snap2/copyfile.sst");
+ Path destLinkToCopiedFile = Paths.get(destDir.toString(),
+ "snap2/copyfile.sst");
+ Files.write(linkToCopiedFile,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ Path addToCopiedFiles = Paths.get(testDir.toString(),
+ "snap1/copyfile2.sst");
+ Path destAddToCopiedFiles = Paths.get(destDir.toString(),
+ "snap1/copyfile2.sst");
+ Files.write(addToCopiedFiles,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+ Path addNonSstToCopiedFiles = Paths.get(testDir.toString(),
+ "snap1/nonSst");
+ Path destAddNonSstToCopiedFiles = Paths.get(destDir.toString(),
+ "snap1/nonSst");
+ Files.write(addNonSstToCopiedFiles,
+ "dummyData".getBytes(StandardCharsets.UTF_8));
+
+ // Create test data structures.
+ Set<Path> toExcludeFiles = new HashSet<>(
+ Collections.singletonList(destExcludeFile));
+ Map<Path, Path> copyFiles = new HashMap<>();
+ copyFiles.put(copyFile, destCopyFile);
+ List<String> excluded = new ArrayList<>();
+ Map<Path, Path> hardLinkFiles = new HashMap<>();
+ long fileSize;
+
+ // Confirm the exclude file gets added to the excluded list,
+ // (and thus is excluded.)
+ fileSize = processFile(excludeFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded, destExcludeFile.getParent());
+ Assert.assertEquals(excluded.size(), 1);
+ Assert.assertEquals((excluded.get(0)), destExcludeFile.toString());
+ Assert.assertEquals(copyFiles.size(), 1);
+ Assert.assertEquals(hardLinkFiles.size(), 0);
+ Assert.assertEquals(fileSize, 0);
+ excluded = new ArrayList<>();
+
+ // Confirm the linkToExcludedFile gets added as a link.
+ fileSize = processFile(linkToExcludedFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded, destLinkToExcludedFile.getParent());
+ Assert.assertEquals(excluded.size(), 0);
+ Assert.assertEquals(copyFiles.size(), 1);
+ Assert.assertEquals(hardLinkFiles.size(), 1);
+ Assert.assertEquals(hardLinkFiles.get(destLinkToExcludedFile),
+ destExcludeFile);
+ Assert.assertEquals(fileSize, 0);
+ hardLinkFiles = new HashMap<>();
+
+ // Confirm the linkToCopiedFile gets added as a link.
+ fileSize = processFile(linkToCopiedFile, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded, destLinkToCopiedFile.getParent());
+ Assert.assertEquals(excluded.size(), 0);
+ Assert.assertEquals(copyFiles.size(), 1);
+ Assert.assertEquals(hardLinkFiles.size(), 1);
+ Assert.assertEquals(hardLinkFiles.get(destLinkToCopiedFile), destCopyFile);
+ Assert.assertEquals(fileSize, 0);
+ hardLinkFiles = new HashMap<>();
+
+ // Confirm the addToCopiedFiles gets added to list of copied files
+ fileSize = processFile(addToCopiedFiles, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded, destAddToCopiedFiles.getParent());
+ Assert.assertEquals(excluded.size(), 0);
+ Assert.assertEquals(copyFiles.size(), 2);
+ Assert.assertEquals(copyFiles.get(addToCopiedFiles), destAddToCopiedFiles);
+ Assert.assertEquals(fileSize, expectedFileSize);
+ copyFiles = new HashMap<>();
+ copyFiles.put(copyFile, destCopyFile);
+
+ // Confirm the addNonSstToCopiedFiles gets added to list of copied files
+ fileSize = processFile(addNonSstToCopiedFiles, copyFiles, hardLinkFiles,
+ toExcludeFiles, excluded, destAddNonSstToCopiedFiles.getParent());
+ Assert.assertEquals(excluded.size(), 0);
+ Assert.assertEquals(copyFiles.size(), 2);
+ Assert.assertEquals(fileSize, 0);
+ Assert.assertEquals(copyFiles.get(addNonSstToCopiedFiles),
+ destAddNonSstToCopiedFiles);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]