This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c31f5a807e7 [SPARK-50443][SS] Fixing Maven build errors introduced by Guava cache in RocksDBStateStoreProvider
0c31f5a807e7 is described below

commit 0c31f5a807e7aa01cd46424d52441f514e491943
Author: Eric Marnadi <[email protected]>
AuthorDate: Thu Nov 28 12:48:24 2024 +0900

    [SPARK-50443][SS] Fixing Maven build errors introduced by Guava cache in RocksDBStateStoreProvider
    
    ### What changes were proposed in this pull request?
    
    The Guava dependency in `sql/core` introduces Maven build errors, because we use a Guava cache to store the Avro encoders, as outlined in this comment: https://github.com/apache/spark/pull/48401#issuecomment-2504353098
    This PR introduces a new factory method on the `NonFateSharingCache` companion object and uses it in `RocksDBStateStoreProvider`, so that `sql/core` no longer constructs the Guava cache directly.
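    
    As a rough usage sketch (not part of this commit): the new overload builds the Guava cache inside `core`, and callers load values through the wrapper's existing `Callable`-based `get`. The enclosing package, object, key/value types, and loading logic below are hypothetical placeholders; the package must sit under `org.apache.spark` because the class is `private[spark]`.
    ```
    // Hypothetical example package; NonFateSharingCache is private[spark],
    // so callers must live under the org.apache.spark namespace.
    package org.apache.spark.example

    import java.util.concurrent.{Callable, TimeUnit}

    import org.apache.spark.util.NonFateSharingCache

    object NonFateSharingCacheExample {
      // Illustrative limits, mirroring the values used for the Avro encoder cache.
      private val cache = NonFateSharingCache[String, String](
        maximumSize = 1000L,
        expireAfterAccessTime = 1L,
        expireAfterAccessTimeUnit = TimeUnit.HOURS)

      def lookup(key: String): String = {
        // Compute-if-absent: the wrapper delegates to Guava's Cache.get(K, Callable)
        // while shielding callers from fate-sharing on concurrent loads.
        cache.get(key, new Callable[String] {
          override def call(): String = s"computed-$key" // placeholder loading logic
        })
      }
    }
    ```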
    
    ### Why are the changes needed?
    
    To resolve the Maven build errors, so that the Avro change in https://github.com/apache/spark/pull/48401 does not get reverted.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests are sufficient, and the Maven build succeeds on a devbox:
    ```
    [INFO] Tests run: 47, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 17.64 s -- in test.org.apache.spark.sql.JavaDatasetSuite
    [INFO]
    [INFO] Results:
    [INFO]
    [INFO] Tests run: 47, Failures: 0, Errors: 0, Skipped: 0
    [INFO]
    [INFO]
    [INFO] --- surefire:3.2.5:test (test) @ spark-sql_2.13 ---
    [INFO] Skipping execution of surefire because it has already been run for this configuration
    [INFO]
    [INFO] --- scalatest:2.2.0:test (test) @ spark-sql_2.13 ---
    [INFO] ScalaTest report directory: /home/eric.marnadi/spark/sql/core/target/surefire-reports
    WARNING: Using incubator modules: jdk.incubator.vector
    Discovery starting.
    Discovery completed in 2 seconds, 737 milliseconds.
    Run starting. Expected test count is: 0
    DiscoverySuite:
    Run completed in 2 seconds, 765 milliseconds.
    Total number of tests run: 0
    Suites: completed 1, aborted 0
    Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
    No tests were executed.
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  03:15 min
    [INFO] Finished at: 2024-11-28T01:10:36Z
    [INFO] ------------------------------------------------------------------------
    ```
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48996 from ericm-db/chm.
    
    Authored-by: Eric Marnadi <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../org/apache/spark/util/NonFateSharingCache.scala     | 16 +++++++++++++++-
 .../streaming/state/RocksDBStateStoreProvider.scala     | 17 ++++++++---------
 2 files changed, 23 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
index 21184d70b386..7d01facc1e42 100644
--- a/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
+++ b/core/src/main/scala/org/apache/spark/util/NonFateSharingCache.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util
 
-import java.util.concurrent.Callable
+import java.util.concurrent.{Callable, TimeUnit}
 
 import com.google.common.cache.{Cache, CacheBuilder, CacheLoader, LoadingCache}
 
@@ -68,6 +68,20 @@ private[spark] object NonFateSharingCache {
       override def load(k: K): V = loadingFunc.apply(k)
     }))
   }
+
+  def apply[K, V](
+      maximumSize: Long,
+      expireAfterAccessTime: Long,
+      expireAfterAccessTimeUnit: TimeUnit): NonFateSharingCache[K, V] = {
+    val builder = CacheBuilder.newBuilder().asInstanceOf[CacheBuilder[K, V]]
+    if (maximumSize > 0L) {
+      builder.maximumSize(maximumSize)
+    }
+    if (expireAfterAccessTime > 0) {
+      builder.expireAfterAccess(expireAfterAccessTime, expireAfterAccessTimeUnit)
+    }
+    new NonFateSharingCache(builder.build[K, V]())
+  }
 }
 
 private[spark] class NonFateSharingCache[K, V](protected val cache: Cache[K, V]) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index e5a4175aeec1..c9c987fa1620 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.util.control.NonFatal
 
-import com.google.common.cache.CacheBuilder
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
@@ -613,15 +612,15 @@ object RocksDBStateStoreProvider {
   val VIRTUAL_COL_FAMILY_PREFIX_BYTES = 2
 
   private val MAX_AVRO_ENCODERS_IN_CACHE = 1000
-  // Add the cache at companion object level so it persists across provider instances
-  private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] = {
-    val guavaCache = CacheBuilder.newBuilder()
-      .maximumSize(MAX_AVRO_ENCODERS_IN_CACHE)  // Adjust size based on your needs
-      .expireAfterAccess(1, TimeUnit.HOURS)  // Optional: Add expiration if needed
-      .build[String, AvroEncoder]()
+  private val AVRO_ENCODER_LIFETIME_HOURS = 1L
 
-    new NonFateSharingCache(guavaCache)
-  }
+  // Add the cache at companion object level so it persists across provider instances
+  private val avroEncoderMap: NonFateSharingCache[String, AvroEncoder] =
+    NonFateSharingCache(
+      maximumSize = MAX_AVRO_ENCODERS_IN_CACHE,
+      expireAfterAccessTime = AVRO_ENCODER_LIFETIME_HOURS,
+      expireAfterAccessTimeUnit = TimeUnit.HOURS
+    )
 
   def getAvroEnc(
       stateStoreEncoding: String,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
