This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0225d4cf76cc [SPARK-53001][CORE][SQL][FOLLOW-UP] Disable `spark.memory.unmanagedMemoryPollingInterval` by default
0225d4cf76cc is described below

commit 0225d4cf76cc183af65811e80728649d971d9580
Author: Eric Marnadi <eric.marn...@databricks.com>
AuthorDate: Fri Aug 1 13:57:47 2025 -0700

    [SPARK-53001][CORE][SQL][FOLLOW-UP] Disable `spark.memory.unmanagedMemoryPollingInterval` by default
    
    ### What changes were proposed in this pull request?
    
    Follow-up of [https://github.com/apache/spark/pull/51708](https://github.com/apache/spark/pull/51708), addressing nits and test feedback.
    
    ### Why are the changes needed?
    
    To conform to Spark style standards and to make the tests less flaky.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51778 from ericm-db/rocksdb-mm-followup.
    
    Authored-by: Eric Marnadi <eric.marn...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/internal/config/package.scala |  2 +-
 .../apache/spark/memory/UnifiedMemoryManager.scala | 56 +----------------
 .../spark/memory/UnmanagedMemoryConsumer.scala     | 72 ++++++++++++++++++++++
 .../sql/execution/streaming/state/RocksDB.scala    |  5 +-
 .../streaming/state/RocksDBMemoryManager.scala     |  2 +-
 .../state/RocksDBStateStoreIntegrationSuite.scala  | 49 ++++++---------
 6 files changed, 95 insertions(+), 91 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 0be2a53d7a0f..c25a4fd45c58 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -508,7 +508,7 @@ package object config {
         "Setting this to 0 disables unmanaged memory polling.")
       .version("4.1.0")
       .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefaultString("1s")
+      .createWithDefaultString("0s")
 
   private[spark] val STORAGE_UNROLL_MEMORY_THRESHOLD =
     ConfigBuilder("spark.storage.unrollMemoryThreshold")
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0aec2c232aab..212b54239ee6 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -283,7 +283,7 @@ object UnifiedMemoryManager extends Logging {
   * @param unmanagedMemoryConsumer The consumer to register for memory tracking
    */
   def registerUnmanagedMemoryConsumer(
-                                       unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
+      unmanagedMemoryConsumer: UnmanagedMemoryConsumer): Unit = {
     val id = unmanagedMemoryConsumer.unmanagedMemoryConsumerId
     unmanagedMemoryConsumers.put(id, unmanagedMemoryConsumer)
   }
@@ -481,57 +481,3 @@ object UnifiedMemoryManager extends Logging {
     (usableMemory * memoryFraction).toLong
   }
 }
-
-/**
- * Identifier for an unmanaged memory consumer.
- *
- * @param componentType The type of component (e.g., "RocksDB", "NativeLibrary")
- * @param instanceKey A unique key to identify this specific instance of the component.
- *                    For shared memory consumers, this should be a common key across
- *                    all instances to avoid double counting.
- */
-case class UnmanagedMemoryConsumerId(
-                                      componentType: String,
-                                      instanceKey: String
-                                    )
-
-/**
- * Interface for components that consume memory outside of Spark's unified memory management.
- *
- * Components implementing this trait can register themselves with the memory manager
- * to have their memory usage tracked and factored into memory allocation decisions.
- * This helps prevent OOM errors when unmanaged components use significant memory.
- *
- * Examples of unmanaged memory consumers:
- * - RocksDB state stores in structured streaming
- * - Native libraries with custom memory allocation
- * - Off-heap caches managed outside of Spark
- */
-trait UnmanagedMemoryConsumer {
-  /**
-   * Returns the unique identifier for this memory consumer.
-   * The identifier is used to track and manage the consumer in the memory tracking system.
-   */
-  def unmanagedMemoryConsumerId: UnmanagedMemoryConsumerId
-
-  /**
-   * Returns the memory mode (ON_HEAP or OFF_HEAP) that this consumer uses.
-   * This is used to ensure unmanaged memory usage only affects the correct memory pool.
-   */
-  def memoryMode: MemoryMode
-
-  /**
-   * Returns the current memory usage in bytes.
-   *
-   * This method is called periodically by the memory polling mechanism to track
-   * memory usage over time. Implementations should return the current total memory
-   * consumed by this component.
-   *
-   * @return Current memory usage in bytes. Should return 0 if no memory is currently used.
-   *         Return -1L to indicate this consumer is no longer active and should be
-   *         automatically removed from tracking.
-   * @throws Exception if memory usage cannot be determined. The polling mechanism
-   *                   will handle exceptions gracefully and log warnings.
-   */
-  def getMemBytesUsed: Long
-}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnmanagedMemoryConsumer.scala b/core/src/main/scala/org/apache/spark/memory/UnmanagedMemoryConsumer.scala
new file mode 100644
index 000000000000..835191828215
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/UnmanagedMemoryConsumer.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+/**
+ * Identifier for an unmanaged memory consumer.
+ *
+ * @param componentType The type of component (e.g., "RocksDB", "NativeLibrary")
+ * @param instanceKey A unique key to identify this specific instance of the component.
+ *                    For shared memory consumers, this should be a common key across
+ *                    all instances to avoid double counting.
+ */
+case class UnmanagedMemoryConsumerId(
+    componentType: String,
+    instanceKey: String
+)
+
+/**
+ * Interface for components that consume memory outside of Spark's unified memory management.
+ *
+ * Components implementing this trait can register themselves with the memory manager
+ * to have their memory usage tracked and factored into memory allocation decisions.
+ * This helps prevent OOM errors when unmanaged components use significant memory.
+ *
+ * Examples of unmanaged memory consumers:
+ * - RocksDB state stores in structured streaming
+ * - Native libraries with custom memory allocation
+ * - Off-heap caches managed outside of Spark
+ */
+trait UnmanagedMemoryConsumer {
+  /**
+   * Returns the unique identifier for this memory consumer.
+   * The identifier is used to track and manage the consumer in the memory tracking system.
+   */
+  def unmanagedMemoryConsumerId: UnmanagedMemoryConsumerId
+
+  /**
+   * Returns the memory mode (ON_HEAP or OFF_HEAP) that this consumer uses.
+   * This is used to ensure unmanaged memory usage only affects the correct memory pool.
+   */
+  def memoryMode: MemoryMode
+
+  /**
+   * Returns the current memory usage in bytes.
+   *
+   * This method is called periodically by the memory polling mechanism to track
+   * memory usage over time. Implementations should return the current total memory
+   * consumed by this component.
+   *
+   * @return Current memory usage in bytes. Should return 0 if no memory is currently used.
+   *         Return -1L to indicate this consumer is no longer active and should be
+   *         automatically removed from tracking.
+   * @throws Exception if memory usage cannot be determined. The polling mechanism
+   *                   will handle exceptions gracefully and log warnings.
+   */
+  def getMemBytesUsed: Long
+}
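
For context, a minimal implementation of this new trait might look like the sketch below. The `NativeCacheMemoryTracker` object and its byte counter are hypothetical; only the trait members, `UnmanagedMemoryConsumerId`, and `UnifiedMemoryManager.registerUnmanagedMemoryConsumer` come from this patch (and may require Spark-internal visibility):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager, UnmanagedMemoryConsumer, UnmanagedMemoryConsumerId}

// Hypothetical tracker for an off-heap cache whose allocations Spark
// cannot observe through its own memory manager.
object NativeCacheMemoryTracker extends UnmanagedMemoryConsumer {
  private val totalBytes = new AtomicLong(0L) // updated by the (hypothetical) cache

  // A shared instanceKey avoids double counting across instances.
  override def unmanagedMemoryConsumerId: UnmanagedMemoryConsumerId =
    UnmanagedMemoryConsumerId("NativeCache", "shared")

  override def memoryMode: MemoryMode = MemoryMode.OFF_HEAP

  // Polled periodically; returning -1L would deregister this consumer.
  override def getMemBytesUsed: Long = totalBytes.get()

  /** Call once at startup so the poller (when enabled) sees this usage. */
  def register(): Unit = UnifiedMemoryManager.registerUnmanagedMemoryConsumer(this)
}
```

`RocksDBMemoryManager`, later in this patch, is the real in-tree example of the same pattern.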
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 1ee5f6783671..dc07351e7914 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1454,11 +1454,8 @@ class RocksDB(
    * @return Total memory usage in bytes across all tracked components
    */
   def getMemoryUsage: Long = {
-
     require(db != null && !db.isClosed, "RocksDB must be open to get memory usage")
-    RocksDB.mainMemorySources.map { memorySource =>
-      getDBProperty(memorySource)
-    }.sum
+    RocksDB.mainMemorySources.map(getDBProperty).sum
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
index 80ad864600b2..2fc5c37814a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBMemoryManager.scala
@@ -36,7 +36,7 @@ import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager, UnmanagedMemor
  * UnifiedMemoryManager, allowing Spark to account for RocksDB memory when making
  * memory allocation decisions.
  */
-object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer{
+object RocksDBMemoryManager extends Logging with UnmanagedMemoryConsumer {
   private var writeBufferManager: WriteBufferManager = null
   private var cache: Cache = null
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
index 1e95da35b86a..9c7b03ac06f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreIntegrationSuite.scala
@@ -21,8 +21,9 @@ import java.io.File
 
 import scala.jdk.CollectionConverters.SetHasAsScala
 
-import org.scalatest.time.{Minute, Span}
+import org.scalatest.time.{Millis, Minute, Seconds, Span}
 
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
 import org.apache.spark.sql.functions.{count, max}
 import org.apache.spark.sql.internal.SQLConf
@@ -332,9 +333,6 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
           ("spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage" ->
             boundedMemoryEnabled.toString)) {
 
-          import org.apache.spark.memory.UnifiedMemoryManager
-          import org.apache.spark.sql.streaming.Trigger
-
           // Use rate stream to ensure continuous state operations that trigger memory updates
           val query = spark.readStream
             .format("rate")
@@ -350,38 +348,29 @@ class RocksDBStateStoreIntegrationSuite extends StreamTest
             .start()
 
           try {
-            // Let the stream run to establish RocksDB instances and generate state operations
-            Thread.sleep(2000) // 2 seconds should be enough for several processing cycles
-
-            // Now check for memory tracking - the continuous stream should trigger memory updates
-            var rocksDBMemory = 0L
-            var attempts = 0
-            val maxAttempts = 15 // 15 attempts with 1-second intervals = 15 seconds max
-
-            while (rocksDBMemory <= 0L && attempts < maxAttempts) {
-              Thread.sleep(1000) // Wait between checks to allow memory updates
-              rocksDBMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
-              attempts += 1
-
-              if (rocksDBMemory > 0L) {
-                logInfo(s"RocksDB memory detected: $rocksDBMemory bytes " +
-                  s"after $attempts attempts with boundedMemory=$boundedMemoryEnabled")
-              }
+            // Check for memory tracking - the continuous stream should trigger memory updates
+            var initialRocksDBMemory = 0L
+            eventually(timeout(Span(20, Seconds)), interval(Span(500, Millis))) {
+              initialRocksDBMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
+              assert(initialRocksDBMemory > 0L,
+                s"RocksDB memory should be tracked with boundedMemory=$boundedMemoryEnabled")
             }
 
+            logInfo(s"RocksDB memory detected: $initialRocksDBMemory bytes " +
+              s"with boundedMemory=$boundedMemoryEnabled")
+
             // Verify memory tracking remains stable during continued operation
-            Thread.sleep(2000) // Let stream continue running
+            eventually(timeout(Span(5, Seconds)), interval(Span(500, Millis))) {
+              val currentMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
+              assert(currentMemory > 0L,
+                s"RocksDB memory tracking should remain active during stream processing: " +
+                  s"got $currentMemory bytes (initial: $initialRocksDBMemory) " +
+                  s"with boundedMemory=$boundedMemoryEnabled")
+            }
 
             val finalMemory = UnifiedMemoryManager.getMemoryByComponentType("RocksDB")
-
-            // Memory should still be tracked (allow for some fluctuation)
-            assert(finalMemory > 0L,
-              s"RocksDB memory tracking should remain active during stream processing: " +
-                s"got $finalMemory bytes (initial: $rocksDBMemory) " +
-
             logInfo(s"RocksDB memory tracking test completed successfully: " +
-              s"initial=$rocksDBMemory bytes, final=$finalMemory bytes " +
+              s"initial=$initialRocksDBMemory bytes, final=$finalMemory bytes 
" +
               s"with boundedMemory=$boundedMemoryEnabled")
 
           } finally {

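The test rewrite above replaces fixed `Thread.sleep` polling with ScalaTest's `eventually`, which retries a block until it passes or a timeout expires. A standalone sketch of the idiom, where `readGauge` is a hypothetical stand-in for `UnifiedMemoryManager.getMemoryByComponentType("RocksDB")`:

```scala
import org.scalatest.concurrent.Eventually._
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.time.{Millis, Seconds, Span}

class GaugeSuite extends AnyFunSuite {
  // Hypothetical gauge; the real test reads RocksDB memory usage.
  private def readGauge(): Long = System.currentTimeMillis() % 100

  test("gauge eventually becomes positive") {
    // Retry every 500 ms until the assertion holds, failing after 20 s,
    // instead of sleeping a fixed amount and hoping the value is ready.
    eventually(timeout(Span(20, Seconds)), interval(Span(500, Millis))) {
      assert(readGauge() > 0L)
    }
  }
}
```

This is what makes the test less flaky: the assertion is checked repeatedly up to the timeout rather than once after an arbitrary sleep.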

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
