This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 326891f  [SPARK-37680][CORE] Support RocksDB backend in Spark History 
Server
326891f is described below

commit 326891f93e2610c13b9514ff936c5c247646f39f
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Dec 18 09:01:57 2021 -0800

    [SPARK-37680][CORE] Support RocksDB backend in Spark History Server
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support `RocksDB` backend in Spark History Server via a new 
configuration,
    `spark.history.store.hybridStore.diskBackend`.
    
    ### Why are the changes needed?
    
    Currently, Spark History Server's `HybridStore` uses `LevelDB` which is not 
working on Java 17 native Apple Silicon VM.
    `RocksDB` will support Java 17 native Apple Silicon VM in Apache Spark 3.3 
timeframe.
    
    - `spark.history.store.hybridStore.diskBackend=leveldb` (default)
    ```
    $ sbin/start-history-server.sh
    $ tree leveldb
    leveldb
    ├── apps
    ├── listing.ldb
    │   ├── 000003.log
    │   ├── CURRENT
    │   ├── LOCK
    │   ├── LOG
    │   └── MANIFEST-000002
    └── temp
    ```
    
    - `spark.history.store.hybridStore.diskBackend=rocksdb` (New)
    ```
    $ sbin/start-history-server.sh
    $ tree rocksdb
    rocksdb
    ├── apps
    ├── listing.rdb
    │   ├── 000007.sst
    │   ├── 000009.log
    │   ├── CURRENT
    │   ├── IDENTITY
    │   ├── LOCK
    │   ├── LOG
    │   ├── LOG.old.1639812937513190
    │   ├── MANIFEST-000008
    │   └── OPTIONS-000011
    └── temp
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a new configuration.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #34942 from dongjoon-hyun/SPARK-37680.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/util/kvstore/KVStoreSerializer.java      |  2 +-
 .../spark/deploy/history/FsHistoryProvider.scala   | 41 ++++++++++++--------
 .../deploy/history/HistoryServerDiskManager.scala  |  6 +--
 .../apache/spark/deploy/history/HybridStore.scala  | 40 ++++++++++---------
 .../org/apache/spark/internal/config/History.scala |  7 ++++
 .../scala/org/apache/spark/status/KVUtils.scala    | 14 +++++--
 .../spark/deploy/history/HybridStoreSuite.scala    | 45 ++++++++++++++--------
 7 files changed, 96 insertions(+), 59 deletions(-)

diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index 771a954..ff99d05 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.spark.annotation.Private;
 
 /**
- * Serializer used to translate between app-defined types and the LevelDB 
store.
+ * Serializer used to translate between app-defined types and the disk-based 
stores.
  *
  * <p>
  * The serializer is based on Jackson, so values are written as JSON. It also 
allows "naked strings"
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d35d860..d5e2ef9 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem
 import org.apache.hadoop.hdfs.protocol.HdfsConstants
 import org.apache.hadoop.security.AccessControlException
 import org.fusesource.leveldbjni.internal.NativeDB
+import org.rocksdb.RocksDBException
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,10 +131,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
   private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
 
   private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+  private val hybridStoreDiskBackend = 
conf.get(History.HYBRID_STORE_DISK_BACKEND)
 
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    val dbPath = Files.createDirectories(new File(path, 
"listing.ldb").toPath()).toFile()
+    val dir = hybridStoreDiskBackend match {
+      case "leveldb" => "listing.ldb"
+      case "rocksdb" => "listing.rdb"
+      case db => throw new IllegalArgumentException(s"$db is not supported.")
+    }
+    val dbPath = Files.createDirectories(new File(path, dir).toPath()).toFile()
     Utils.chmod700(dbPath)
 
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
@@ -149,8 +156,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
         logInfo("Detected incompatible DB versions, deleting...")
         path.listFiles().foreach(Utils.deleteRecursively)
         open(dbPath, metadata)
-      case dbExc: NativeDB.DBException =>
-        // Get rid of the corrupted listing.ldb and re-create it.
+      case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
+        // Get rid of the corrupted data and re-create it.
         logWarning(s"Failed to load disk store $dbPath :", dbExc)
         Utils.deleteRecursively(dbPath)
         open(dbPath, metadata)
@@ -1214,11 +1221,11 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
       } catch {
         case e: Exception =>
           logInfo(s"Failed to create HybridStore for 
$appId/${attempt.info.attemptId}." +
-            " Using LevelDB.", e)
+            s" Using $hybridStoreDiskBackend.", e)
       }
     }
 
-    createLevelDBStore(dm, appId, attempt, metadata)
+    createDiskStore(dm, appId, attempt, metadata)
   }
 
   private def createHybridStore(
@@ -1257,24 +1264,24 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
       }
     }
 
-    // Create a LevelDB and start a background thread to dump data to LevelDB
+    // Create a disk-based KVStore and start a background thread to dump data 
to it
     var lease: dm.Lease = null
     try {
       logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
       lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
-      val levelDB = KVUtils.open(lease.tmpPath, metadata)
-      hybridStore.setLevelDB(levelDB)
-      hybridStore.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
-        override def onSwitchToLevelDBSuccess: Unit = {
-          logInfo(s"Completely switched to LevelDB for app $appId / 
${attempt.info.attemptId}.")
-          levelDB.close()
+      val diskStore = KVUtils.open(lease.tmpPath, metadata)
+      hybridStore.setDiskStore(diskStore)
+      hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
+        override def onSwitchToDiskStoreSuccess: Unit = {
+          logInfo(s"Completely switched to diskStore for app $appId / 
${attempt.info.attemptId}.")
+          diskStore.close()
           val newStorePath = lease.commit(appId, attempt.info.attemptId)
-          hybridStore.setLevelDB(KVUtils.open(newStorePath, metadata))
+          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata))
           memoryManager.release(appId, attempt.info.attemptId)
         }
-        override def onSwitchToLevelDBFail(e: Exception): Unit = {
-          logWarning(s"Failed to switch to LevelDB for app $appId / 
${attempt.info.attemptId}", e)
-          levelDB.close()
+        override def onSwitchToDiskStoreFail(e: Exception): Unit = {
+          logWarning(s"Failed to switch to diskStore for app $appId / 
${attempt.info.attemptId}", e)
+          diskStore.close()
           lease.rollback()
         }
       }, appId, attempt.info.attemptId)
@@ -1291,7 +1298,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
     hybridStore
   }
 
-  private def createLevelDBStore(
+  private def createDiskStore(
       dm: HistoryServerDiskManager,
       appId: String,
       attempt: AttemptInfoWrapper,
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 31f9d18..40e337a 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -87,7 +87,7 @@ private class HistoryServerDiskManager(
       listing.delete(info.getClass(), info.path)
     }
 
-    // Reading level db would trigger table file compaction, then it may cause 
size of level db
+    // Reading disk-based KVStore may trigger table file compaction, then it 
may cause size of
     // directory changed. When service restarts, "currentUsage" is calculated 
from real directory
     // size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
     // sum of "ApplicationStoreInfo.size".
@@ -162,8 +162,8 @@ private class HistoryServerDiskManager(
    * @param delete Whether to delete the store from disk.
    */
   def release(appId: String, attemptId: Option[String], delete: Boolean = 
false): Unit = {
-    // Because LevelDB may modify the structure of the store files even when 
just reading, update
-    // the accounting for this application when it's closed.
+    // Because disk-based stores may modify the structure of the store files 
even when just reading,
+    // update the accounting for this application when it's closed.
     val oldSizeOpt = active.synchronized {
       active.remove(appId -> attemptId)
     }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
index 4eb5c15..f89c414 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
@@ -32,15 +32,15 @@ import org.apache.spark.util.kvstore._
  *
  * When rebuilding the application state from event logs, HybridStore will
  * write data to InMemoryStore at first and use a background thread to dump
- * data to LevelDB once the app store is restored. We don't expect write
- * operations (except the case for caching) after calling switch to level DB.
+ * data to a disk-based KVStore once the app store is restored. We don't 
expect write
+ * operations (except the case for caching) after calling switch to the 
disk-based KVStore.
  */
 
 private[history] class HybridStore extends KVStore {
 
   private val inMemoryStore = new InMemoryStore()
 
-  private var levelDB: LevelDB = null
+  private var diskStore: KVStore = null
 
   // Flag to indicate whether we should use inMemoryStore or levelDB
   private val shouldUseInMemoryStore = new AtomicBoolean(true)
@@ -107,8 +107,8 @@ private[history] class HybridStore extends KVStore {
       }
     } finally {
       inMemoryStore.close()
-      if (levelDB != null) {
-        levelDB.close()
+      if (diskStore != null) {
+        diskStore.close()
       }
     }
   }
@@ -125,18 +125,18 @@ private[history] class HybridStore extends KVStore {
     getStore().removeAllByIndexValues(klass, index, indexValues)
   }
 
-  def setLevelDB(levelDB: LevelDB): Unit = {
-    this.levelDB = levelDB
+  def setDiskStore(diskStore: KVStore): Unit = {
+    this.diskStore = diskStore
   }
 
   /**
    * This method is called when the writing is done for inMemoryStore. A
    * background thread will be created and be started to dump data in 
inMemoryStore
-   * to levelDB. Once the dumping is completed, the underlying kvstore will be
-   * switched to levelDB.
+   * to diskStore. Once the dumping is completed, the underlying kvstore will 
be
+   * switched to diskStore.
    */
-  def switchToLevelDB(
-      listener: HybridStore.SwitchToLevelDBListener,
+  def switchToDiskStore(
+      listener: HybridStore.SwitchToDiskStoreListener,
       appId: String,
       attemptId: Option[String]): Unit = {
     if (closed.get) {
@@ -148,14 +148,18 @@ private[history] class HybridStore extends KVStore {
         for (klass <- klassMap.keys().asScala) {
           val values = Lists.newArrayList(
               inMemoryStore.view(klass).closeableIterator())
-          levelDB.writeAll(values)
+          diskStore match {
+            case db: LevelDB => db.writeAll(values)
+            case db: RocksDB => db.writeAll(values)
+            case _ => throw new IllegalStateException("Unknown disk-based 
KVStore")
+          }
         }
-        listener.onSwitchToLevelDBSuccess()
+        listener.onSwitchToDiskStoreSuccess()
         shouldUseInMemoryStore.set(false)
         inMemoryStore.close()
       } catch {
         case e: Exception =>
-          listener.onSwitchToLevelDBFail(e)
+          listener.onSwitchToDiskStoreFail(e)
       }
     })
     backgroundThread.setDaemon(true)
@@ -171,17 +175,17 @@ private[history] class HybridStore extends KVStore {
     if (shouldUseInMemoryStore.get) {
       inMemoryStore
     } else {
-      levelDB
+      diskStore
     }
   }
 }
 
 private[history] object HybridStore {
 
-  trait SwitchToLevelDBListener {
+  trait SwitchToDiskStoreListener {
 
-    def onSwitchToLevelDBSuccess(): Unit
+    def onSwitchToDiskStoreSuccess(): Unit
 
-    def onSwitchToLevelDBFail(e: Exception): Unit
+    def onSwitchToDiskStoreFail(e: Exception): Unit
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala 
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index a6d1c04..a099896 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -211,4 +211,11 @@ private[spark] object History {
     .version("3.1.0")
     .bytesConf(ByteUnit.BYTE)
     .createWithDefaultString("2g")
+
+  val HYBRID_STORE_DISK_BACKEND = 
ConfigBuilder("spark.history.store.hybridStore.diskBackend")
+    .doc("Specifies a disk-based store used in hybrid store; 'leveldb' or 
'rocksdb'.")
+    .version("3.3.0")
+    .stringConf
+    .checkValues(Set("leveldb", "rocksdb"))
+    .createWithDefault("leveldb")
 }
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala 
b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index c79f2dc..4440d26 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -26,7 +26,9 @@ import scala.reflect.{classTag, ClassTag}
 import com.fasterxml.jackson.annotation.JsonInclude
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.util.kvstore._
 
 private[spark] object KVUtils extends Logging {
@@ -34,6 +36,8 @@ private[spark] object KVUtils extends Logging {
   /** Use this to annotate constructor params to be used as KVStore indices. */
   type KVIndexParam = KVIndex @getter
 
+  private lazy val backend = new SparkConf().get(HYBRID_STORE_DISK_BACKEND)
+
   /**
    * A KVStoreSerializer that provides Scala types serialization too, and uses 
the same options as
    * the API serializer.
@@ -46,17 +50,21 @@ private[spark] object KVUtils extends Logging {
   }
 
   /**
-   * Open or create a LevelDB store.
+   * Open or create a disk-based KVStore.
    *
    * @param path Location of the store.
    * @param metadata Metadata value to compare to the data in the store. If 
the store does not
    *                 contain any metadata (e.g. it's a new store), this value 
is written as
    *                 the store's metadata.
    */
-  def open[M: ClassTag](path: File, metadata: M): LevelDB = {
+  def open[M: ClassTag](path: File, metadata: M): KVStore = {
     require(metadata != null, "Metadata is required.")
 
-    val db = new LevelDB(path, new KVStoreScalaSerializer())
+    val db = backend match {
+      case "leveldb" => new LevelDB(path, new KVStoreScalaSerializer())
+      case "rocksdb" => new RocksDB(path, new KVStoreScalaSerializer())
+      case _ => throw new IllegalArgumentException(s"$backend is not 
supported.")
+    }
     val dbMeta = db.getMetadata(classTag[M].runtimeClass)
     if (dbMeta == null) {
       db.setMetadata(metadata)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
index f20e7a5..c376bba 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
@@ -28,20 +28,13 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.status.KVUtils._
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
 import org.apache.spark.util.kvstore._
 
-@ExtendedLevelDBTest
-class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with 
TimeLimits {
-
-  private var db: LevelDB = _
-  private var dbpath: File = _
+abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with 
TimeLimits {
 
-  before {
-    dbpath = File.createTempFile("test.", ".ldb")
-    dbpath.delete()
-    db = new LevelDB(dbpath, new KVStoreScalaSerializer())
-  }
+  var db: KVStore = _
+  var dbpath: File = _
 
   after {
     if (db != null) {
@@ -156,7 +149,7 @@ class HybridStoreSuite extends SparkFunSuite with 
BeforeAndAfter with TimeLimits
 
   private def createHybridStore(): HybridStore = {
     val store = new HybridStore()
-    store.setLevelDB(db)
+    store.setDiskStore(db)
     store
   }
 
@@ -167,21 +160,21 @@ class HybridStoreSuite extends SparkFunSuite with 
BeforeAndAfter with TimeLimits
   private def switchHybridStore(store: HybridStore): Unit = {
     assert(store.getStore().isInstanceOf[InMemoryStore])
     val listener = new SwitchListener()
-    store.switchToLevelDB(listener, "test", None)
+    store.switchToDiskStore(listener, "test", None)
     failAfter(2.seconds) {
       assert(listener.waitUntilDone())
     }
-    while (!store.getStore().isInstanceOf[LevelDB]) {
+    while (!store.getStore().isInstanceOf[LevelDB] && 
!store.getStore().isInstanceOf[RocksDB]) {
       Thread.sleep(10)
     }
   }
 
-  private class SwitchListener extends HybridStore.SwitchToLevelDBListener {
+  private class SwitchListener extends HybridStore.SwitchToDiskStoreListener {
 
     // Put true to the queue when switch succeeds, and false when fails.
     private val results = new LinkedBlockingQueue[Boolean]()
 
-    override def onSwitchToLevelDBSuccess(): Unit = {
+    override def onSwitchToDiskStoreSuccess(): Unit = {
       try {
         results.put(true)
       } catch {
@@ -190,7 +183,7 @@ class HybridStoreSuite extends SparkFunSuite with 
BeforeAndAfter with TimeLimits
       }
     }
 
-    override def onSwitchToLevelDBFail(e: Exception): Unit = {
+    override def onSwitchToDiskStoreFail(e: Exception): Unit = {
       try {
         results.put(false)
       } catch {
@@ -205,6 +198,24 @@ class HybridStoreSuite extends SparkFunSuite with 
BeforeAndAfter with TimeLimits
   }
 }
 
+@ExtendedLevelDBTest
+class LevelDBHybridStoreSuite extends HybridStoreSuite {
+  before {
+    dbpath = File.createTempFile("test.", ".ldb")
+    dbpath.delete()
+    db = new LevelDB(dbpath, new KVStoreScalaSerializer())
+  }
+}
+
+@ExtendedRocksDBTest
+class RocksDBHybridStoreSuite extends HybridStoreSuite {
+  before {
+    dbpath = File.createTempFile("test.", ".rdb")
+    dbpath.delete()
+    db = new RocksDB(dbpath, new KVStoreScalaSerializer())
+  }
+}
+
 class CustomType1(
     @KVIndexParam var key: String,
     @KVIndexParam("id") var id: String,

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to