[ https://issues.apache.org/jira/browse/SPARK-38931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17525403#comment-17525403 ]

Jungtaek Lim commented on SPARK-38931:
--------------------------------------

(Leaving this for the history: SPARK-37724 will be introduced in Spark 3.3.0, which is 
not yet released, so I fixed the affected versions. The fix does no harm to the 3.2 
version line either, so I'll leave the commit as it is.)

> RocksDB File manager would not create initial dfs directory with unknown 
> number of keys on 1st empty checkpoint
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38931
>                 URL: https://issues.apache.org/jira/browse/SPARK-38931
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.3.0
>            Reporter: Yun Tang
>            Assignee: Yun Tang
>            Priority: Major
>             Fix For: 3.3.0, 3.2.2
>
>
> Currently, we can disable tracking the number of keys for performance when 
> using the RocksDB state store. However, if the 1st checkpoint is empty, the file 
> manager will not create the root DFS directory, which leads to the exception 
> below (a reproduction sketch follows the trace):
> {code:java}
> File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist
> java.io.FileNotFoundException: File /private/var/folders/rk/wyr101_562ngn8lp7tbqt7_00000gp/T/spark-ce4a0607-b1d8-43b8-becd-638c6b030019/state/1/1 does not exist
>       at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:779)
>       at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1100)
>       at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:769)
>       at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:128)
>       at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:93)
>       at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
>       at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
>       at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:626)
>       at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:701)
>       at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:697)
>       at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
>       at org.apache.hadoop.fs.FileContext.create(FileContext.java:703)
>       at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:327)
>       at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:140)
>       at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:143)
>       at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:333)
>       at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.zipToDfsFile(RocksDBFileManager.scala:438)
>       at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.saveCheckpointToDfs(RocksDBFileManager.scala:174)
>       at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.saveCheckpointFiles(RocksDBSuite.scala:566)
>       at org.apache.spark.sql.execution.streaming.state.RocksDBSuite.$anonfun$new$35(RocksDBSuite.scala:179)
>       at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
>       at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
>       at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
>       at org.scalatest.Transformer.apply(Transformer.scala:22)
>       at org.scalatest.Transformer.apply(Transformer.scala:20)
>       at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:190)
>       at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:203)
>       at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:188)
>       at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:200)
>       at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
>       at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:200)
>       at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:182)
>       at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:64)
>       at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
>       at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
>       at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:64)
>       at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:233)
>       at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
>       at scala.collection.immutable.List.foreach(List.scala:431)
>       at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
>       at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
>       at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
>       at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:233)
>       at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:232)
>       at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1563)
>       at org.scalatest.Suite.run(Suite.scala:1112)
>       at org.scalatest.Suite.run$(Suite.scala:1094)
>       at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1563)
>       at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:237)
>       at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
>       at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:237)
>       at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:236)
>       at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:64)
>       at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
>       at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
>       at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
>       at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:64)
>       at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
>       at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1320)
>       at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1314)
>       at scala.collection.immutable.List.foreach(List.scala:431)
>       at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1314)
>       at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:993)
>       at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:971)
>       at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1480)
>       at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:971)
>       at org.scalatest.tools.Runner$.run(Runner.scala:798)
>       at org.scalatest.tools.Runner.run(Runner.scala)
>       at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:38)
>       at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:25)
> {code}
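
For anyone trying to reproduce this outside the test suite, below is a minimal sketch (not part of the original report): a streaming aggregation running on the RocksDB state store provider with row tracking disabled, filtered so the first micro-batches leave the state empty. The object name, checkpoint path, rate-source settings, and timeout are illustrative assumptions; whether the FileNotFoundException actually surfaces depends on running an affected build.

{code:scala}
import org.apache.spark.sql.SparkSession

object EmptyFirstCheckpointSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-38931-sketch")
      // Use the RocksDB state store provider for stateful operators.
      .config("spark.sql.streaming.stateStore.providerClass",
        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
      // Disable tracking of the total number of rows, i.e. the "unknown number
      // of keys" situation described in this ticket.
      .config("spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows", "false")
      .getOrCreate()
    import spark.implicits._

    // A rate source filtered down to nothing keeps the state empty, so the very
    // first checkpoint the RocksDB file manager uploads contains no keys.
    val counts = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "1")
      .load()
      .filter($"value" < 0) // drops every row
      .groupBy($"value")
      .count()

    val query = counts.writeStream
      .outputMode("update")
      .format("console")
      .option("checkpointLocation", "/tmp/spark-38931-sketch") // illustrative path
      .start()

    query.awaitTermination(30000L) // let a few empty micro-batches run, then stop
    query.stop()
    spark.stop()
  }
}
{code}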



