This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 8811e8caaa8 [SPARK-38931][SS] Create root dfs directory for
RocksDBFileManager with unknown number of keys on 1st checkpoint
8811e8caaa8 is described below
commit 8811e8caaa8540d1fa05fb77152043addc607b82
Author: Yun Tang <[email protected]>
AuthorDate: Tue Apr 19 20:31:04 2022 +0900
[SPARK-38931][SS] Create root dfs directory for RocksDBFileManager with
unknown number of keys on 1st checkpoint
### What changes were proposed in this pull request?
Create root dfs directory for RocksDBFileManager with unknown number of
keys on 1st checkpoint.
### Why are the changes needed?
If this fix is not introduced, we might encounter the exception below:
~~~java
File
/private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1
does not exist
java.io.FileNotFoundException: File
/private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1
does not exist
at
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
at
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
at
org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
at
org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:93)
at
org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
at
org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
at
org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
at
org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
at
org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
at
org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:438)
at
org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:174)
at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.saveCheckpointFiles(RocksDBSuite.scala:566)
at
org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$35(RocksDBSuite.scala:179)
........
~~~
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested via RocksDBSuite.
Closes #36242 from Myasuka/SPARK-38931.
Authored-by: Yun Tang <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit abb1df9d190e35a17b693f2b013b092af4f2528a)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/state/RocksDBFileManager.scala | 4 +++-
.../sql/execution/streaming/state/RocksDBSuite.scala | 19 +++++++++++++++++++
2 files changed, 22 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index 4f2ce9b1237..26084747c32 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -161,11 +161,13 @@ class RocksDBFileManager(
metadata.writeToFile(metadataFile)
logInfo(s"Written metadata for version $version:\n${metadata.prettyJson}")
- if (version <= 1 && numKeys == 0) {
+ if (version <= 1 && numKeys <= 0) {
// If we're writing the initial version and there's no data, we have to
explicitly initialize
// the root directory. Normally saveImmutableFilesToDfs will do this
initialization, but
// when there's no data that method won't write any files, and
zipToDfsFile uses the
// CheckpointFileManager.createAtomic API which doesn't auto-initialize
parent directories.
+ // Moreover, once we disable tracking of the number of keys, in which case
numKeys is -1, we
+ // still need to create the initial dfs root directory anyway.
val path = new Path(dfsRootDir)
if (!fm.exists(path)) fm.mkdirs(path)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 91cd91b639a..75717d27687 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -170,6 +170,25 @@ class RocksDBSuite extends SparkFunSuite {
}
}
+ test("RocksDBFileManager: create init dfs directory with unknown number of
keys") {
+ val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath +
"/state/1/1")
+ try {
+ val verificationDir = Utils.createTempDir().getAbsolutePath
+ val fileManager = new RocksDBFileManager(
+ dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)
+ // Save a version of empty checkpoint files
+ val cpFiles = Seq()
+ generateFiles(verificationDir, cpFiles)
+ assert(!dfsRootDir.exists())
+ saveCheckpointFiles(fileManager, cpFiles, version = 1, numKeys = -1)
+ // The dfs root dir is created even with unknown number of keys
+ assert(dfsRootDir.exists())
+ loadAndVerifyCheckpointFiles(fileManager, verificationDir, version = 1,
Nil, -1)
+ } finally {
+ Utils.deleteRecursively(dfsRootDir)
+ }
+ }
+
test("RocksDBFileManager: upload only new immutable files") {
withTempDir { dir =>
val dfsRootDir = dir.getAbsolutePath
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]