This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 8da592dc2f92 [SPARK-44639][SS][YARN] Use Java tmp dir for local 
RocksDB state storage on Yarn
8da592dc2f92 is described below

commit 8da592dc2f92a64255b3b4335e82dd382c21873e
Author: Adam Binford <[email protected]>
AuthorDate: Sat Nov 29 12:52:27 2025 -0800

    [SPARK-44639][SS][YARN] Use Java tmp dir for local RocksDB state storage on 
Yarn
    
    ### What changes were proposed in this pull request?
    
    Update the RocksDB state store to store its local data underneath 
`java.io.tmpdir` instead of going through `Utils.getLocalDir` when running on 
Yarn. This is done through a new util method `createExecutorLocalTempDir`, as 
there may be other use cases for this behavior as well.
    
    ### Why are the changes needed?
    
    On YARN, the local RocksDB directory is placed in a directory created 
inside the root application folder such as
    
    ```
    
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/spark-<uuid>/StateStoreId(...)
    ```
    
    The problem with this is that if an executor crashes for some reason (like 
OOM) and the shutdown hooks don't get run, this directory will stay around 
forever until the application finishes, which can cause jobs to slowly 
accumulate more and more temporary space until finally the node manager goes 
unhealthy.
    
    Because this data will only ever be accessed by the executor that created 
this directory, it would make sense to store the data inside the container 
folder, which will always get cleaned up by the node manager when that yarn 
container gets cleaned up. Yarn sets the `java.io.tmpdir` to a path inside this 
directory, such as
    
    ```
    
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/<container_id>/tmp/StateStoreId(...)
    ```
    
    It looks like only Yarn sets the tmpdir property, and other resource 
managers (standalone and k8s) always rely on the local dirs setting/env vars.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Shouldn't be any effective changes, other than preventing disk space from 
filling up on Node Managers under certain scenarios.
    
    ### How was this patch tested?
    
    New UT
    
    Closes #42301 from Kimahriman/rocksdb-tmp-dir.
    
    Authored-by: Adam Binford <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 9e9358a309d2ea2236beb4bdb67091fa5c382b28)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../main/scala/org/apache/spark/util/Utils.scala   | 16 +++++++++++++
 .../sql/execution/streaming/state/RocksDB.scala    |  2 +-
 .../state/RocksDBStateStoreProvider.scala          |  6 +++--
 .../execution/streaming/state/RocksDBSuite.scala   | 26 +++++++++++++++++++++-
 4 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 721719f2a976..0907d6d049bf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -250,6 +250,22 @@ private[spark] object Utils
     dir
   }
 
+  /**
+   * Create a temporary directy that will always be cleaned up when the 
executor stops,
+   * even in the case of a hard shutdown when the shutdown hooks don't get run.
+   *
+   * Currently this only provides special behavior on YARN, where the local 
dirs are not
+   * guaranteed to be cleaned up on executors hard shutdown.
+   */
+  def createExecutorLocalTempDir(conf: SparkConf, namePrefix: String): File = {
+    if (Utils.isRunningInYarnContainer(conf)) {
+      // Just use the default Java tmp dir which is set to inside the 
container directory on YARN
+      createTempDir(namePrefix = namePrefix)
+    } else {
+      createTempDir(getLocalDir(conf), namePrefix)
+    }
+  }
+
   /**
    * Copy the first `maxSize` bytes of data from the InputStream to an 
in-memory
    * buffer, primarily to check for corruption.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index aa02d708933d..b12cdb9bba95 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -69,7 +69,7 @@ case object StoreTaskCompletionListener extends 
RocksDBOpType("store_task_comple
 class RocksDB(
     dfsRootDir: String,
     val conf: RocksDBConf,
-    localRootDir: File = Utils.createTempDir(),
+    val localRootDir: File = Utils.createTempDir(),
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 2cc4c8a870ae..1058c02c9304 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -837,6 +837,9 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
   @volatile private var rocksDBEventForwarder: Option[RocksDBEventForwarder] = 
_
   @volatile private var stateStoreProviderId: StateStoreProviderId = _
+  // Exposed for testing
+  @volatile private[sql] var sparkConf: SparkConf = 
Option(SparkEnv.get).map(_.conf)
+    .getOrElse(new SparkConf)
 
   protected def createRocksDB(
       dfsRootDir: String,
@@ -867,8 +870,7 @@ private[sql] class RocksDBStateStoreProvider
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val loggingId = stateStoreProviderId.toString
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
-    val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), 
storeIdStr)
+    val localRootDir = Utils.createExecutorLocalTempDir(sparkConf, storeIdStr)
     createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, 
hadoopConf, loggingId,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, 
stateStoreId.partitionId,
       rocksDBEventForwarder,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index da6c3e62798e..6c22436c29a0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import 
org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, 
FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager}
 import 
org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream,
 RenameBasedFSDataOutputStream}
+import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
 import org.apache.spark.sql.internal.SQLConf
 import 
org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -51,7 +52,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkConfWithEnv, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
 
 class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: 
Configuration)
@@ -3791,6 +3792,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures 
with SharedSparkSession
     }}
   }
 
+  test("SPARK-44639: Use Java tmp dir instead of configured local dirs on 
Yarn") {
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+    val provider = new RocksDBStateStoreProvider()
+    provider.sparkConf = new SparkConfWithEnv(Map("CONTAINER_ID" -> "1"))
+    provider.init(
+      StateStoreId(
+        "/checkpoint",
+        0,
+        0
+      ),
+      new StructType(),
+      new StructType(),
+      NoPrefixKeyStateEncoderSpec(new StructType()),
+      false,
+      new StateStoreConf(sqlConf),
+      conf
+    )
+
+    assert(provider.rocksDB.localRootDir.getParent() == 
System.getProperty("java.io.tmpdir"))
+  }
+
   private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))
 
   class RocksDBCheckpointFormatV2(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to