This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4eac0244c765 [SPARK-45503][SS] Add Conf to Set RocksDB Compression
4eac0244c765 is described below

commit 4eac0244c7651cb23ed4ae7dec3b36661b9f6408
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Wed Oct 25 05:25:43 2023 +0900

    [SPARK-45503][SS] Add Conf to Set RocksDB Compression
    
    ### What changes were proposed in this pull request?
    Add a conf to set the RocksDB State Store instance's compression type, with 
the default being LZ4
    
    ### Why are the changes needed?
    LZ4 is generally faster than Snappy. That's probably why we use LZ4 in 
changelogs and other places by default. However, we don't change RocksDB's 
default of Snappy compression style. The RocksDB team recommends LZ4 and/or ZSTD 
(https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#compression)
 and the default is kept at Snappy only for backward compatibility reasons. We 
should make it tunable with a default of LZ4
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing tests should cover it. Some benchmarks were run and we see positive 
improvements.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #43338 from siying/rocksdb_lz4.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  5 +++++
 .../sql/execution/streaming/state/RocksDB.scala    | 23 +++++++++++++++++++---
 .../streaming/state/RocksDBStateStoreSuite.scala   |  2 ++
 .../execution/streaming/state/RocksDBSuite.scala   | 16 +++++++++++++++
 4 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 9fb823abaa3a..547834c7f9e3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2390,6 +2390,11 @@ Here are the configs regarding to RocksDB instance of 
the state store provider:
     <td>Allow the rocksdb runtime to use fallocate to pre-allocate disk space 
for logs, etc...  Disable for apps that have many smaller state stores to trade 
off disk space for write performance.</td>
     <td>true</td>
   </tr>
+  <tr>
+    <td>spark.sql.streaming.stateStore.rocksdb.compression</td>
+    <td>Compression type used in RocksDB. The string is converted RocksDB 
compression type through RocksDB Java API getCompressionType(). </td>
+    <td>lz4</td>
+  </tr>
 </table>
 
 ##### RocksDB State Store Memory Management
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 1e3f3a67f16f..40f63c86a6a4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 import org.rocksdb.{RocksDB => NativeRocksDB, _}
+import org.rocksdb.CompressionType._
 import org.rocksdb.TickerType._
 
 import org.apache.spark.TaskContext
@@ -91,7 +92,7 @@ class RocksDB(
     tableFormatConfig.setPinL0FilterAndIndexBlocksInCache(true)
   }
 
-  private val columnFamilyOptions = new ColumnFamilyOptions()
+  private[state] val columnFamilyOptions = new ColumnFamilyOptions()
 
   // Set RocksDB options around MemTable memory usage. By default, we let 
RocksDB
   // use its internal default values for these settings.
@@ -103,6 +104,8 @@ class RocksDB(
     columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
   }
 
+  columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))
+
   private val dbOptions =
     new Options(new DBOptions(), columnFamilyOptions) // options to open the 
RocksDB
 
@@ -676,7 +679,8 @@ case class RocksDBConf(
     writeBufferCacheRatio: Double,
     highPriorityPoolRatio: Double,
     compressionCodec: String,
-    allowFAllocate: Boolean)
+    allowFAllocate: Boolean,
+    compression: String)
 
 object RocksDBConf {
   /** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -767,6 +771,10 @@ object RocksDBConf {
   val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
   private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, 
"true")
 
+  // Pass as compression type to RocksDB.
+  val COMPRESSION_KEY = "compression"
+  private val COMPRESSION_CONF = SQLConfEntry(COMPRESSION_KEY, "lz4")
+
   def apply(storeConf: StateStoreConf): RocksDBConf = {
     val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
     val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -826,6 +834,14 @@ object RocksDBConf {
       }
     }
 
+    def getStringConf(conf: ConfEntry): String = {
+      Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString 
} getOrElse {
+        throw new IllegalArgumentException(
+          s"Invalid value for '${conf.fullName}', must be a string"
+        )
+      }
+    }
+
     RocksDBConf(
       storeConf.minVersionsToRetain,
       storeConf.minDeltasForSnapshot,
@@ -845,7 +861,8 @@ object RocksDBConf {
       getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
       getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
       storeConf.compressionCodec,
-      getBooleanConf(ALLOW_FALLOCATE_CONF))
+      getBooleanConf(ALLOW_FALLOCATE_CONF),
+      getStringConf(COMPRESSION_CONF))
   }
 
   def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 4ce344d4e73a..a6e65825a5bc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -87,6 +87,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", 
"3"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", 
"16"),
         (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", 
"false"),
+        (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compression", "zstd"),
         (SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
       )
       testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -117,6 +118,7 @@ class RocksDBStateStoreSuite extends 
StateStoreSuiteBase[RocksDBStateStoreProvid
       assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
       assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
       assert(rocksDBConfInTask.allowFAllocate == false)
+      assert(rocksDBConfInTask.compression == "zstd")
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 15d35bae700f..ac50e5520291 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,6 +24,7 @@ import scala.language.implicitConversions
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.rocksdb.CompressionType
 import org.scalactic.source.Position
 import org.scalatest.Tag
 
@@ -373,6 +374,21 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
     }
   }
 
+  test("RocksDB: compression conf") {
+    val remoteDir = Utils.createTempDir().toString
+    new File(remoteDir).delete()  // to make sure that the directory gets 
created
+
+    val conf = RocksDBConf().copy(compression = "zstd")
+    withDB(remoteDir, conf = conf) { db =>
+      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.ZSTD_COMPRESSION)
+    }
+
+    // Test the default is LZ4
+    withDB(remoteDir, conf = RocksDBConf().copy()) { db =>
+      assert(db.columnFamilyOptions.compressionType() == 
CompressionType.LZ4_COMPRESSION)
+    }
+  }
+
   test("RocksDB: get, put, iterator, commit, load") {
     def testOps(compactOnCommit: Boolean): Unit = {
       val remoteDir = Utils.createTempDir().toString


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to