This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new 686f59c80210 [SPARK-48586][SS][3.5] Remove lock acquisition in
doMaintenance() by making a deep copy of file mappings in RocksDBFileManager in
load()
686f59c80210 is described below
commit 686f59c802105ea7bc1f9af50f9a1bdbd84e336d
Author: Riya Verma <[email protected]>
AuthorDate: Fri Jun 28 16:59:21 2024 +0900
[SPARK-48586][SS][3.5] Remove lock acquisition in doMaintenance() by making
a deep copy of file mappings in RocksDBFileManager in load()
Backports #46942 to 3.5
### What changes were proposed in this pull request?
When changelog checkpointing is enabled, the lock of the **RocksDB** state
store is acquired when uploading the snapshot inside maintenance tasks, which
causes lock contention between query processing tasks and the state maintenance
thread. This PR fixes the lock contention issue introduced by
https://github.com/apache/spark/pull/45724.
The changes include:
1. Removing lock acquisition in `doMaintenance()`.
2. Adding a `copyFileMapping()` method to **RocksDBFileManager**, and using
this method in `load()` to deep copy the file manager state, specifically the
file mappings `versionToRocksDBFiles` and `localFilesToDfsFiles`.
3. Capturing a reference to the file mappings in `commit()`.
### Why are the changes needed?
We want to eliminate lock contention to decrease the latency of streaming
queries, so lock acquisition inside maintenance tasks should be avoided.
However, removing the lock can introduce race conditions between the task and
maintenance threads. By making a deep copy of `versionToRocksDBFiles` and
`localFilesToDfsFiles` in **RocksDBFileManager**, we ensure that the file
manager state is not updated by the task thread while the background
maintenance thread attempts to upload a snapshot.
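As an illustration only, here is a minimal sketch of this copy-on-write pattern
using hypothetical, simplified classes (`FileMappings`, `FileManagerSketch` and
`String`-valued maps are stand-ins; the real change is the `RocksDBFileManager`
diff below):

```scala
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-in for RocksDBFileMappings in the diff below.
case class FileMappings(
    versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[String]],
    localFilesToDfsFiles: ConcurrentHashMap[String, String])

class FileManagerSketch {
  @volatile private var fileMappings = FileMappings(
    new ConcurrentHashMap[Long, Seq[String]],
    new ConcurrentHashMap[String, String])

  // Called from load(): swap in a deep copy so the task thread keeps mutating a
  // fresh object while any in-flight snapshot upload still sees the old one.
  def copyFileMapping(): Unit = {
    val newVersionToFiles = new ConcurrentHashMap[Long, Seq[String]]
    val newLocalToDfs = new ConcurrentHashMap[String, String]
    newVersionToFiles.putAll(fileMappings.versionToRocksDBFiles)
    newLocalToDfs.putAll(fileMappings.localFilesToDfsFiles)
    fileMappings = FileMappings(newVersionToFiles, newLocalToDfs)
  }

  // Called from commit(): capture the current reference. The maintenance thread
  // uploads against this captured object, so doMaintenance() needs no lock.
  def captureFileMapReference(): FileMappings = fileMappings
}
```

Because `load()` swaps in a fresh copy and `commit()` only hands the maintenance
thread a stable reference, the snapshot upload never races with the task thread
mutating the maps.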
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit test cases.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #47130 from
riyaverm-db/remove-lock-contention-between-maintenance-and-task-3.5.
Lead-authored-by: Riya Verma <[email protected]>
Co-authored-by: Riya Verma <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/execution/streaming/state/RocksDB.scala | 58 ++++++++------
.../streaming/state/RocksDBFileManager.scala | 88 ++++++++++++++-------
.../execution/streaming/state/RocksDBSuite.scala | 90 +++++++++++++++++++++-
3 files changed, 183 insertions(+), 53 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 301d978c9038..6c0447e1a325 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import javax.annotation.concurrent.GuardedBy
import scala.collection.{mutable, Map}
+import scala.collection.mutable.ListBuffer
import scala.ref.WeakReference
import scala.util.Try
@@ -57,7 +58,11 @@ class RocksDB(
hadoopConf: Configuration = new Configuration,
loggingId: String = "") extends Logging {
- case class RocksDBSnapshot(checkpointDir: File, version: Long, numKeys: Long) {
+ case class RocksDBSnapshot(
+ checkpointDir: File,
+ version: Long,
+ numKeys: Long,
+ capturedFileMappings: RocksDBFileMappings) {
def close(): Unit = {
silentDeleteRecursively(checkpointDir, s"Free up local checkpoint of snapshot $version")
}
@@ -65,6 +70,7 @@ class RocksDB(
@volatile private var latestSnapshot: Option[RocksDBSnapshot] = None
@volatile private var lastSnapshotVersion = 0L
+ private val oldSnapshots = new ListBuffer[RocksDBSnapshot]
RocksDBLoader.loadLibrary()
@@ -148,6 +154,9 @@ class RocksDB(
try {
if (loadedVersion != version) {
closeDB()
+ // deep copy is needed to avoid race condition
+ // between maintenance and task threads
+ fileManager.copyFileMapping()
val latestSnapshotVersion = fileManager.getLatestSnapshotVersion(version)
val metadata = fileManager.loadCheckpointFromDfs(latestSnapshotVersion, workingDir)
loadedVersion = latestSnapshotVersion
@@ -156,7 +165,6 @@ class RocksDB(
if (lastSnapshotVersion > latestSnapshotVersion) {
// discard any newer snapshots
lastSnapshotVersion = 0L
- latestSnapshot = None
}
openDB()
@@ -368,10 +376,17 @@ class RocksDB(
// inside the uploadSnapshot() called below.
// If changelog checkpointing is enabled, snapshot will be uploaded asynchronously
// during state store maintenance.
- latestSnapshot.foreach(_.close())
- latestSnapshot = Some(
- RocksDBSnapshot(checkpointDir, newVersion, numKeysOnWritingVersion))
- lastSnapshotVersion = newVersion
+ synchronized {
+ if (latestSnapshot.isDefined) {
+ oldSnapshots += latestSnapshot.get
+ }
+ latestSnapshot = Some(
+ RocksDBSnapshot(checkpointDir,
+ newVersion,
+ numKeysOnWritingVersion,
+ fileManager.captureFileMapReference()))
+ lastSnapshotVersion = newVersion
+ }
}
}
@@ -421,22 +436,34 @@ class RocksDB(
}
private def uploadSnapshot(): Unit = {
+ var oldSnapshotsImmutable: List[RocksDBSnapshot] = Nil
val localCheckpoint = synchronized {
val checkpoint = latestSnapshot
latestSnapshot = None
+
+ // Convert mutable list buffer to immutable to prevent
+ // race condition with commit where old snapshot is added
+ oldSnapshotsImmutable = oldSnapshots.toList
+ oldSnapshots.clear()
+
checkpoint
}
localCheckpoint match {
- case Some(RocksDBSnapshot(localDir, version, numKeys)) =>
+ case Some(RocksDBSnapshot(localDir, version, numKeys, capturedFileMappings)) =>
try {
val uploadTime = timeTakenMs {
- fileManager.saveCheckpointToDfs(localDir, version, numKeys)
+ fileManager.saveCheckpointToDfs(localDir, version, numKeys, capturedFileMappings)
fileManagerMetrics = fileManager.latestSaveCheckpointMetrics
}
logInfo(s"$loggingId: Upload snapshot of version $version," +
s" time taken: $uploadTime ms")
} finally {
localCheckpoint.foreach(_.close())
+
+ // Clean up old latestSnapshots
+ for (snapshot <- oldSnapshotsImmutable) {
+ snapshot.close()
+ }
}
case _ =>
}
@@ -457,20 +484,7 @@ class RocksDB(
def doMaintenance(): Unit = {
if (enableChangelogCheckpointing) {
- // There is race to update latestSnapshot between load(), commit()
- // and uploadSnapshot().
- // The load method will reset latestSnapshot to discard any snapshots taken
- // from newer versions (when a old version is reloaded).
- // commit() method deletes the existing snapshot while creating a new snapshot.
- // In order to ensure that the snapshot being uploaded would not be modified
- // concurrently, we need to synchronize the snapshot access between task thread
- // and maintenance thread.
- acquire()
- try {
- uploadSnapshot()
- } finally {
- release()
- }
+ uploadSnapshot()
}
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(conf.minVersionsToRetain)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index c527a6a03ae9..b4fe3e22e888 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -131,16 +131,6 @@ class RocksDBFileManager(
import RocksDBImmutableFile._
- private val versionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
-
-
- // used to keep a mapping of the exact Dfs file that was used to create a local SST file.
- // The reason this is a separate map because versionToRocksDBFiles can contain multiple similar
- // SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst in v1 and
- // 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID compatibility
- // across SST files and RocksDB manifest.
- private[sql] val localFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
-
private lazy val fm = CheckpointFileManager.create(new Path(dfsRootDir), hadoopConf)
private val fs = new Path(dfsRootDir).getFileSystem(hadoopConf)
private val onlyZipFiles = new PathFilter {
@@ -154,6 +144,30 @@ class RocksDBFileManager(
private def codec = CompressionCodec.createCodec(sparkConf, codecName)
+ @volatile private var fileMappings = RocksDBFileMappings(
+ new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+ new ConcurrentHashMap[String, RocksDBImmutableFile]
+ )
+
+ /**
+ * Make a deep copy of versionToRocksDBFiles and localFilesToDfsFiles to avoid
+ * current task thread from overwriting the file mapping whenever background maintenance
+ * thread attempts to upload a snapshot
+ */
+ def copyFileMapping() : Unit = {
+ val newVersionToRocksDBFiles = new ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
+ val newLocalFilesToDfsFiles = new ConcurrentHashMap[String, RocksDBImmutableFile]
+
+ newVersionToRocksDBFiles.putAll(fileMappings.versionToRocksDBFiles)
+ newLocalFilesToDfsFiles.putAll(fileMappings.localFilesToDfsFiles)
+
+ fileMappings = RocksDBFileMappings(newVersionToRocksDBFiles, newLocalFilesToDfsFiles)
+ }
+
+ def captureFileMapReference(): RocksDBFileMappings = {
+ fileMappings
+ }
+
def getChangeLogWriter(version: Long): StateStoreChangelogWriter = {
val rootDir = new Path(dfsRootDir)
val changelogFile = dfsChangelogFile(version)
@@ -185,10 +199,14 @@ class RocksDBFileManager(
def latestSaveCheckpointMetrics: RocksDBFileManagerMetrics = saveCheckpointMetrics
/** Save all the files in given local checkpoint directory as a committed version in DFS */
- def saveCheckpointToDfs(checkpointDir: File, version: Long, numKeys: Long): Unit = {
+ def saveCheckpointToDfs(
+ checkpointDir: File,
+ version: Long,
+ numKeys: Long,
+ capturedFileMappings: RocksDBFileMappings): Unit = {
logFilesInDir(checkpointDir, s"Saving checkpoint files for version $version")
val (localImmutableFiles, localOtherFiles) = listRocksDBFiles(checkpointDir)
- val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles)
+ val rocksDBFiles = saveImmutableFilesToDfs(version, localImmutableFiles, capturedFileMappings)
val metadata = RocksDBCheckpointMetadata(rocksDBFiles, numKeys)
val metadataFile = localMetadataFile(checkpointDir)
metadata.writeToFile(metadataFile)
@@ -219,10 +237,10 @@ class RocksDBFileManager(
// The unique ids of SST files are checked when opening a rocksdb instance. The SST files
// in larger versions can't be reused even if they have the same size and name because
// they belong to another rocksdb instance.
- versionToRocksDBFiles.keySet().removeIf(_ >= version)
+ fileMappings.versionToRocksDBFiles.keySet().removeIf(_ >= version)
val metadata = if (version == 0) {
if (localDir.exists) Utils.deleteRecursively(localDir)
- localFilesToDfsFiles.clear()
+ fileMappings.localFilesToDfsFiles.clear()
localDir.mkdirs()
RocksDBCheckpointMetadata(Seq.empty, 0)
} else {
@@ -235,7 +253,7 @@ class RocksDBFileManager(
val metadata = RocksDBCheckpointMetadata.readFromFile(metadataFile)
logInfo(s"Read metadata for version $version:\n${metadata.prettyJson}")
loadImmutableFilesFromDfs(metadata.immutableFiles, localDir)
- versionToRocksDBFiles.put(version, metadata.immutableFiles)
+ fileMappings.versionToRocksDBFiles.put(version, metadata.immutableFiles)
metadataFile.delete()
metadata
}
@@ -389,9 +407,9 @@ class RocksDBFileManager(
// Resolve RocksDB files for all the versions and find the max version each file is used
val fileToMaxUsedVersion = new mutable.HashMap[String, Long]
sortedSnapshotVersions.foreach { version =>
- val files = Option(versionToRocksDBFiles.get(version)).getOrElse {
+ val files = Option(fileMappings.versionToRocksDBFiles.get(version)).getOrElse {
val newResolvedFiles = getImmutableFilesFromVersionZip(version)
- versionToRocksDBFiles.put(version, newResolvedFiles)
+ fileMappings.versionToRocksDBFiles.put(version, newResolvedFiles)
newResolvedFiles
}
files.foreach(f => fileToMaxUsedVersion(f.dfsFileName) =
@@ -436,7 +454,7 @@ class RocksDBFileManager(
val versionFile = dfsBatchZipFile(version)
try {
fm.delete(versionFile)
- versionToRocksDBFiles.remove(version)
+ fileMappings.versionToRocksDBFiles.remove(version)
logDebug(s"Deleted version $version")
} catch {
case e: Exception =>
@@ -455,7 +473,8 @@ class RocksDBFileManager(
/** Save immutable files to DFS directory */
private def saveImmutableFilesToDfs(
version: Long,
- localFiles: Seq[File]): Seq[RocksDBImmutableFile] = {
+ localFiles: Seq[File],
+ capturedFileMappings: RocksDBFileMappings): Seq[RocksDBImmutableFile] = {
// Get the immutable files used in previous versions, as some of those uploaded files can be
// reused for this version
logInfo(s"Saving RocksDB files to DFS for $version")
@@ -465,7 +484,8 @@ class RocksDBFileManager(
var filesReused = 0L
val immutableFiles = localFiles.map { localFile =>
- val existingDfsFile = localFilesToDfsFiles.asScala.get(localFile.getName)
+ val existingDfsFile =
+ capturedFileMappings.localFilesToDfsFiles.asScala.get(localFile.getName)
if (existingDfsFile.isDefined && existingDfsFile.get.sizeBytes == localFile.length()) {
val dfsFile = existingDfsFile.get
filesReused += 1
@@ -487,14 +507,14 @@ class RocksDBFileManager(
bytesCopied += localFileSize
val immutableDfsFile = RocksDBImmutableFile(localFile.getName, dfsFileName, localFileSize)
- localFilesToDfsFiles.put(localFileName, immutableDfsFile)
+ capturedFileMappings.localFilesToDfsFiles.put(localFileName, immutableDfsFile)
immutableDfsFile
}
}
logInfo(s"Copied $filesCopied files ($bytesCopied bytes) from local to" +
s" DFS for version $version. $filesReused files reused without copying.")
- versionToRocksDBFiles.put(version, immutableFiles)
+ capturedFileMappings.versionToRocksDBFiles.put(version, immutableFiles)
// Cleanup locally deleted files from the localFilesToDfsFiles map
// Locally, SST Files can be deleted due to RocksDB compaction. These files need
@@ -534,7 +554,7 @@ class RocksDBFileManager(
.foreach { existingFile =>
val existingFileSize = existingFile.length()
val requiredFile = requiredFileNameToFileDetails.get(existingFile.getName)
- val prevDfsFile = localFilesToDfsFiles.asScala.get(existingFile.getName)
+ val prevDfsFile = fileMappings.localFilesToDfsFiles.asScala.get(existingFile.getName)
val isSameFile = if (requiredFile.isDefined && prevDfsFile.isDefined) {
requiredFile.get.dfsFileName == prevDfsFile.get.dfsFileName &&
existingFile.length() == requiredFile.get.sizeBytes
@@ -544,7 +564,7 @@ class RocksDBFileManager(
if (!isSameFile) {
existingFile.delete()
- localFilesToDfsFiles.remove(existingFile.getName)
+ fileMappings.localFilesToDfsFiles.remove(existingFile.getName)
logInfo(s"Deleted local file $existingFile with size
$existingFileSize mapped" +
s" to previous dfsFile ${prevDfsFile.getOrElse("null")}")
} else {
@@ -574,7 +594,7 @@ class RocksDBFileManager(
}
filesCopied += 1
bytesCopied += localFileSize
- localFilesToDfsFiles.put(localFileName, file)
+ fileMappings.localFilesToDfsFiles.put(localFileName, file)
logInfo(s"Copied $dfsFile to $localFile - $localFileSize bytes")
} else {
filesReused += 1
@@ -592,13 +612,13 @@ class RocksDBFileManager(
private def removeLocallyDeletedSSTFilesFromDfsMapping(localFiles: Seq[File]): Unit = {
// clean up deleted SST files from the localFilesToDfsFiles Map
val currentLocalFiles = localFiles.map(_.getName).toSet
- val mappingsToClean = localFilesToDfsFiles.asScala
+ val mappingsToClean = fileMappings.localFilesToDfsFiles.asScala
.keys
.filterNot(currentLocalFiles.contains)
mappingsToClean.foreach { f =>
logInfo(s"cleaning $f from the localFilesToDfsFiles map")
- localFilesToDfsFiles.remove(f)
+ fileMappings.localFilesToDfsFiles.remove(f)
}
}
@@ -705,6 +725,20 @@ class RocksDBFileManager(
}
}
+/**
+ * Track file mappings in RocksDB across local and remote directories
+ * @param versionToRocksDBFiles Mapping of RocksDB files used across versions for maintenance
+ * @param localFilesToDfsFiles Mapping of the exact Dfs file used to create a local SST file
+ * The reason localFilesToDfsFiles is a separate map because versionToRocksDBFiles can contain
+ * multiple similar SST files to a particular local file (for example 1.sst can map to 1-UUID1.sst
+ * in v1 and 1-UUID2.sst in v2). We need to capture the exact file used to ensure Version ID
+ * compatibility across SST files and RocksDB manifest.
+ */
+
+case class RocksDBFileMappings(
+ versionToRocksDBFiles: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]],
+ localFilesToDfsFiles: ConcurrentHashMap[String, RocksDBImmutableFile])
+
/**
* Metrics regarding RocksDB file sync between local and DFS.
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 89b4925db707..973c1e0cb3b0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.execution.streaming.state
import java.io._
import java.nio.charset.Charset
+import java.util.concurrent.Executors
import scala.collection.mutable
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration._
import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
@@ -477,6 +480,41 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("RocksDBFileManager: " +
+ "background snapshot upload doesn't acquire RocksDB instance lock") {
+ // Create a custom ExecutionContext
+ implicit val ec: ExecutionContext = ExecutionContext
+ .fromExecutor(Executors.newSingleThreadExecutor())
+
+ val remoteDir = Utils.createTempDir().toString
+ val conf = dbConf.copy(lockAcquireTimeoutMs = 10000, minDeltasForSnapshot = 0)
+ new File(remoteDir).delete() // to make sure that the directory gets created
+
+ withDB(remoteDir, conf = conf) { db =>
+ db.load(0)
+ db.put("0", "0")
+ db.commit()
+
+ // Acquire lock
+ db.load(1)
+ db.put("1", "1")
+
+ // Run doMaintenance in another thread
+ val maintenanceFuture = Future {
+ db.doMaintenance()
+ }
+
+ val timeout = 5.seconds
+
+ // Ensure that maintenance task runs without being blocked by task thread
+ ThreadUtils.awaitResult(maintenanceFuture, timeout)
+ assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+ // Release lock
+ db.commit()
+ }
+ }
+
testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write
changelog") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath +
"/state/1/1")
val fileManager = new RocksDBFileManager(
@@ -1290,7 +1328,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
testWithChangelogCheckpointingEnabled("time travel 4 -" +
- " validate successful RocksDB load") {
+ " validate successful RocksDB load when metadata file is overwritten") {
val remoteDir = Utils.createTempDir().toString
val conf = dbConf.copy(minDeltasForSnapshot = 2, compactOnCommit = false)
new File(remoteDir).delete() // to make sure that the directory gets created
@@ -1305,8 +1343,7 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
db.load(1)
db.put("3", "3")
- // do maintenance - upload any latest snapshots so far
- // would fail to acquire lock and no snapshots would be uploaded
+ // upload any latest snapshots so far
db.doMaintenance()
db.commit()
// upload newly created snapshot 2.zip
@@ -1318,6 +1355,47 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ testWithChangelogCheckpointingEnabled("time travel 5 -" +
+ "validate successful RocksDB load when metadata file is not overwritten") {
+ // Ensure commit doesn't modify the latestSnapshot that doMaintenance will upload
+ val fmClass = "org.apache.spark.sql.execution.streaming.state." +
+ "NoOverwriteFileSystemBasedCheckpointFileManager"
+ withTempDir { dir =>
+ val conf = dbConf.copy(minDeltasForSnapshot = 0) // create snapshot every commit
+ val hadoopConf = new Configuration()
+ hadoopConf.set(STREAMING_CHECKPOINT_FILE_MANAGER_CLASS.parent.key, fmClass)
+
+ val remoteDir = dir.getCanonicalPath
+ withDB(remoteDir, conf = conf, hadoopConf = hadoopConf) { db =>
+ db.load(0)
+ db.put("a", "1")
+ db.commit()
+
+ // load previous version, and recreate the snapshot
+ db.load(0)
+ db.put("a", "1")
+
+ // upload version 1 snapshot created above
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+ db.commit() // create snapshot again
+
+ // load version 1 - should succeed
+ withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+ }
+
+ // upload recently created snapshot
+ db.doMaintenance()
+ assert(snapshotVersionsPresent(remoteDir) == Seq(1))
+
+ // load version 1 again - should succeed
+ withDB(remoteDir, version = 1, conf = conf, hadoopConf = hadoopConf) { db =>
+ }
+ }
+ }
+ }
+
test("validate Rocks DB SST files do not have a VersionIdMismatch" +
" when metadata file is not overwritten - scenario 1") {
val fmClass = "org.apache.spark.sql.execution.streaming.state." +
@@ -1614,7 +1692,11 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
numKeys: Int): Unit = {
val checkpointDir = Utils.createTempDir().getAbsolutePath // local dir to create checkpoints
generateFiles(checkpointDir, fileToLengths)
- fileManager.saveCheckpointToDfs(checkpointDir, version, numKeys)
+ fileManager.saveCheckpointToDfs(
+ checkpointDir,
+ version,
+ numKeys,
+ fileManager.captureFileMapReference())
}
def loadAndVerifyCheckpointFiles(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]