This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 326891f [SPARK-37680][CORE] Support RocksDB backend in Spark History
Server
326891f is described below
commit 326891f93e2610c13b9514ff936c5c247646f39f
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sat Dec 18 09:01:57 2021 -0800
[SPARK-37680][CORE] Support RocksDB backend in Spark History Server
### What changes were proposed in this pull request?
This PR aims to support `RocksDB` backend in Spark History Server via a new
configuration,
`spark.history.store.hybridStore.diskBackend`.
### Why are the changes needed?
Currently, Spark History Server's `HybridStore` uses `LevelDB` which is not
working on Java 17 native Apple Silicon VM.
`RocksDB` will support Java 17 native Apple Silicon VM in Apache Spark 3.3
timeframe.
- `spark.history.store.hybridStore.diskBackend=leveldb` (default)
```
$ sbin/start-history-server.sh
$ tree leveldb
leveldb
├── apps
├── listing.ldb
│ ├── 000003.log
│ ├── CURRENT
│ ├── LOCK
│ ├── LOG
│ └── MANIFEST-000002
└── temp
```
- `spark.history.store.hybridStore.diskBackend=rocksdb` (New)
```
$ sbin/start-history-server.sh
$ tree rocksdb
rocksdb
├── apps
├── listing.rdb
│ ├── 000007.sst
│ ├── 000009.log
│ ├── CURRENT
│ ├── IDENTITY
│ ├── LOCK
│ ├── LOG
│ ├── LOG.old.1639812937513190
│ ├── MANIFEST-000008
│ └── OPTIONS-000011
└── temp
```
### Does this PR introduce _any_ user-facing change?
Yes, this introduces a new configuration.
### How was this patch tested?
Pass the CIs.
Closes #34942 from dongjoon-hyun/SPARK-37680.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/util/kvstore/KVStoreSerializer.java | 2 +-
.../spark/deploy/history/FsHistoryProvider.scala | 41 ++++++++++++--------
.../deploy/history/HistoryServerDiskManager.scala | 6 +--
.../apache/spark/deploy/history/HybridStore.scala | 40 ++++++++++---------
.../org/apache/spark/internal/config/History.scala | 7 ++++
.../scala/org/apache/spark/status/KVUtils.scala | 14 +++++--
.../spark/deploy/history/HybridStoreSuite.scala | 45 ++++++++++++++--------
7 files changed, 96 insertions(+), 59 deletions(-)
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index 771a954..ff99d05 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.annotation.Private;
/**
- * Serializer used to translate between app-defined types and the LevelDB
store.
+ * Serializer used to translate between app-defined types and the disk-based
stores.
*
* <p>
* The serializer is based on Jackson, so values are written as JSON. It also
allows "naked strings"
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d35d860..d5e2ef9 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.HdfsConstants
import org.apache.hadoop.security.AccessControlException
import org.fusesource.leveldbjni.internal.NativeDB
+import org.rocksdb.RocksDBException
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
@@ -130,10 +131,16 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)
private val hybridStoreEnabled = conf.get(History.HYBRID_STORE_ENABLED)
+ private val hybridStoreDiskBackend =
conf.get(History.HYBRID_STORE_DISK_BACKEND)
// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
- val dbPath = Files.createDirectories(new File(path,
"listing.ldb").toPath()).toFile()
+ val dir = hybridStoreDiskBackend match {
+ case "leveldb" => "listing.ldb"
+ case "rocksdb" => "listing.rdb"
+ case db => throw new IllegalArgumentException(s"$db is not supported.")
+ }
+ val dbPath = Files.createDirectories(new File(path, dir).toPath()).toFile()
Utils.chmod700(dbPath)
val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
@@ -149,8 +156,8 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
open(dbPath, metadata)
- case dbExc: NativeDB.DBException =>
- // Get rid of the corrupted listing.ldb and re-create it.
+ case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
+ // Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
open(dbPath, metadata)
@@ -1214,11 +1221,11 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
} catch {
case e: Exception =>
logInfo(s"Failed to create HybridStore for
$appId/${attempt.info.attemptId}." +
- " Using LevelDB.", e)
+ s" Using $hybridStoreDiskBackend.", e)
}
}
- createLevelDBStore(dm, appId, attempt, metadata)
+ createDiskStore(dm, appId, attempt, metadata)
}
private def createHybridStore(
@@ -1257,24 +1264,24 @@ private[history] class FsHistoryProvider(conf:
SparkConf, clock: Clock)
}
}
- // Create a LevelDB and start a background thread to dump data to LevelDB
+ // Create a disk-base KVStore and start a background thread to dump data
to it
var lease: dm.Lease = null
try {
logInfo(s"Leasing disk manager space for app $appId /
${attempt.info.attemptId}...")
lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
- val levelDB = KVUtils.open(lease.tmpPath, metadata)
- hybridStore.setLevelDB(levelDB)
- hybridStore.switchToLevelDB(new HybridStore.SwitchToLevelDBListener {
- override def onSwitchToLevelDBSuccess: Unit = {
- logInfo(s"Completely switched to LevelDB for app $appId /
${attempt.info.attemptId}.")
- levelDB.close()
+ val diskStore = KVUtils.open(lease.tmpPath, metadata)
+ hybridStore.setDiskStore(diskStore)
+ hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
+ override def onSwitchToDiskStoreSuccess: Unit = {
+ logInfo(s"Completely switched to diskStore for app $appId /
${attempt.info.attemptId}.")
+ diskStore.close()
val newStorePath = lease.commit(appId, attempt.info.attemptId)
- hybridStore.setLevelDB(KVUtils.open(newStorePath, metadata))
+ hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata))
memoryManager.release(appId, attempt.info.attemptId)
}
- override def onSwitchToLevelDBFail(e: Exception): Unit = {
- logWarning(s"Failed to switch to LevelDB for app $appId /
${attempt.info.attemptId}", e)
- levelDB.close()
+ override def onSwitchToDiskStoreFail(e: Exception): Unit = {
+ logWarning(s"Failed to switch to diskStore for app $appId /
${attempt.info.attemptId}", e)
+ diskStore.close()
lease.rollback()
}
}, appId, attempt.info.attemptId)
@@ -1291,7 +1298,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
hybridStore
}
- private def createLevelDBStore(
+ private def createDiskStore(
dm: HistoryServerDiskManager,
appId: String,
attempt: AttemptInfoWrapper,
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 31f9d18..40e337a 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -87,7 +87,7 @@ private class HistoryServerDiskManager(
listing.delete(info.getClass(), info.path)
}
- // Reading level db would trigger table file compaction, then it may cause
size of level db
+ // Reading disk-based KVStore may trigger table file compaction, then it
may cause size of
// directory changed. When service restarts, "currentUsage" is calculated
from real directory
// size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
// sum of "ApplicationStoreInfo.size".
@@ -162,8 +162,8 @@ private class HistoryServerDiskManager(
* @param delete Whether to delete the store from disk.
*/
def release(appId: String, attemptId: Option[String], delete: Boolean =
false): Unit = {
- // Because LevelDB may modify the structure of the store files even when
just reading, update
- // the accounting for this application when it's closed.
+ // Because disk-based stores may modify the structure of the store files
even when just reading,
+ // update the accounting for this application when it's closed.
val oldSizeOpt = active.synchronized {
active.remove(appId -> attemptId)
}
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
index 4eb5c15..f89c414 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HybridStore.scala
@@ -32,15 +32,15 @@ import org.apache.spark.util.kvstore._
*
* When rebuilding the application state from event logs, HybridStore will
* write data to InMemoryStore at first and use a background thread to dump
- * data to LevelDB once the app store is restored. We don't expect write
- * operations (except the case for caching) after calling switch to level DB.
+ * data to a disk-based KVStore once the app store is restored. We don't
expect write
+ * operations (except the case for caching) after calling switch to the
disk-based KVStore.
*/
private[history] class HybridStore extends KVStore {
private val inMemoryStore = new InMemoryStore()
- private var levelDB: LevelDB = null
+ private var diskStore: KVStore = null
// Flag to indicate whether we should use inMemoryStore or levelDB
private val shouldUseInMemoryStore = new AtomicBoolean(true)
@@ -107,8 +107,8 @@ private[history] class HybridStore extends KVStore {
}
} finally {
inMemoryStore.close()
- if (levelDB != null) {
- levelDB.close()
+ if (diskStore != null) {
+ diskStore.close()
}
}
}
@@ -125,18 +125,18 @@ private[history] class HybridStore extends KVStore {
getStore().removeAllByIndexValues(klass, index, indexValues)
}
- def setLevelDB(levelDB: LevelDB): Unit = {
- this.levelDB = levelDB
+ def setDiskStore(diskStore: KVStore): Unit = {
+ this.diskStore = diskStore
}
/**
* This method is called when the writing is done for inMemoryStore. A
* background thread will be created and be started to dump data in
inMemoryStore
- * to levelDB. Once the dumping is completed, the underlying kvstore will be
- * switched to levelDB.
+ * to diskStore. Once the dumping is completed, the underlying kvstore will
be
+ * switched to diskStore.
*/
- def switchToLevelDB(
- listener: HybridStore.SwitchToLevelDBListener,
+ def switchToDiskStore(
+ listener: HybridStore.SwitchToDiskStoreListener,
appId: String,
attemptId: Option[String]): Unit = {
if (closed.get) {
@@ -148,14 +148,18 @@ private[history] class HybridStore extends KVStore {
for (klass <- klassMap.keys().asScala) {
val values = Lists.newArrayList(
inMemoryStore.view(klass).closeableIterator())
- levelDB.writeAll(values)
+ diskStore match {
+ case db: LevelDB => db.writeAll(values)
+ case db: RocksDB => db.writeAll(values)
+ case _ => throw new IllegalStateException("Unknown disk-based
KVStore")
+ }
}
- listener.onSwitchToLevelDBSuccess()
+ listener.onSwitchToDiskStoreSuccess()
shouldUseInMemoryStore.set(false)
inMemoryStore.close()
} catch {
case e: Exception =>
- listener.onSwitchToLevelDBFail(e)
+ listener.onSwitchToDiskStoreFail(e)
}
})
backgroundThread.setDaemon(true)
@@ -171,17 +175,17 @@ private[history] class HybridStore extends KVStore {
if (shouldUseInMemoryStore.get) {
inMemoryStore
} else {
- levelDB
+ diskStore
}
}
}
private[history] object HybridStore {
- trait SwitchToLevelDBListener {
+ trait SwitchToDiskStoreListener {
- def onSwitchToLevelDBSuccess(): Unit
+ def onSwitchToDiskStoreSuccess(): Unit
- def onSwitchToLevelDBFail(e: Exception): Unit
+ def onSwitchToDiskStoreFail(e: Exception): Unit
}
}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala
b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index a6d1c04..a099896 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -211,4 +211,11 @@ private[spark] object History {
.version("3.1.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("2g")
+
+ val HYBRID_STORE_DISK_BACKEND =
ConfigBuilder("spark.history.store.hybridStore.diskBackend")
+ .doc("Specifies a disk-based store used in hybrid store; 'leveldb' or
'rocksdb'.")
+ .version("3.3.0")
+ .stringConf
+ .checkValues(Set("leveldb", "rocksdb"))
+ .createWithDefault("leveldb")
}
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index c79f2dc..4440d26 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -26,7 +26,9 @@ import scala.reflect.{classTag, ClassTag}
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
import org.apache.spark.util.kvstore._
private[spark] object KVUtils extends Logging {
@@ -34,6 +36,8 @@ private[spark] object KVUtils extends Logging {
/** Use this to annotate constructor params to be used as KVStore indices. */
type KVIndexParam = KVIndex @getter
+ private lazy val backend = new SparkConf().get(HYBRID_STORE_DISK_BACKEND)
+
/**
* A KVStoreSerializer that provides Scala types serialization too, and uses
the same options as
* the API serializer.
@@ -46,17 +50,21 @@ private[spark] object KVUtils extends Logging {
}
/**
- * Open or create a LevelDB store.
+ * Open or create a disk-based KVStore.
*
* @param path Location of the store.
* @param metadata Metadata value to compare to the data in the store. If
the store does not
* contain any metadata (e.g. it's a new store), this value
is written as
* the store's metadata.
*/
- def open[M: ClassTag](path: File, metadata: M): LevelDB = {
+ def open[M: ClassTag](path: File, metadata: M): KVStore = {
require(metadata != null, "Metadata is required.")
- val db = new LevelDB(path, new KVStoreScalaSerializer())
+ val db = backend match {
+ case "leveldb" => new LevelDB(path, new KVStoreScalaSerializer())
+ case "rocksdb" => new RocksDB(path, new KVStoreScalaSerializer())
+ case _ => throw new IllegalArgumentException(s"$backend is not
supported.")
+ }
val dbMeta = db.getMetadata(classTag[M].runtimeClass)
if (dbMeta == null) {
db.setMetadata(metadata)
diff --git
a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
index f20e7a5..c376bba 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HybridStoreSuite.scala
@@ -28,20 +28,13 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.status.KVUtils._
-import org.apache.spark.tags.ExtendedLevelDBTest
+import org.apache.spark.tags.{ExtendedLevelDBTest, ExtendedRocksDBTest}
import org.apache.spark.util.kvstore._
-@ExtendedLevelDBTest
-class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with
TimeLimits {
-
- private var db: LevelDB = _
- private var dbpath: File = _
+abstract class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with
TimeLimits {
- before {
- dbpath = File.createTempFile("test.", ".ldb")
- dbpath.delete()
- db = new LevelDB(dbpath, new KVStoreScalaSerializer())
- }
+ var db: KVStore = _
+ var dbpath: File = _
after {
if (db != null) {
@@ -156,7 +149,7 @@ class HybridStoreSuite extends SparkFunSuite with
BeforeAndAfter with TimeLimits
private def createHybridStore(): HybridStore = {
val store = new HybridStore()
- store.setLevelDB(db)
+ store.setDiskStore(db)
store
}
@@ -167,21 +160,21 @@ class HybridStoreSuite extends SparkFunSuite with
BeforeAndAfter with TimeLimits
private def switchHybridStore(store: HybridStore): Unit = {
assert(store.getStore().isInstanceOf[InMemoryStore])
val listener = new SwitchListener()
- store.switchToLevelDB(listener, "test", None)
+ store.switchToDiskStore(listener, "test", None)
failAfter(2.seconds) {
assert(listener.waitUntilDone())
}
- while (!store.getStore().isInstanceOf[LevelDB]) {
+ while (!store.getStore().isInstanceOf[LevelDB] &&
!store.getStore().isInstanceOf[RocksDB]) {
Thread.sleep(10)
}
}
- private class SwitchListener extends HybridStore.SwitchToLevelDBListener {
+ private class SwitchListener extends HybridStore.SwitchToDiskStoreListener {
// Put true to the queue when switch succeeds, and false when fails.
private val results = new LinkedBlockingQueue[Boolean]()
- override def onSwitchToLevelDBSuccess(): Unit = {
+ override def onSwitchToDiskStoreSuccess(): Unit = {
try {
results.put(true)
} catch {
@@ -190,7 +183,7 @@ class HybridStoreSuite extends SparkFunSuite with
BeforeAndAfter with TimeLimits
}
}
- override def onSwitchToLevelDBFail(e: Exception): Unit = {
+ override def onSwitchToDiskStoreFail(e: Exception): Unit = {
try {
results.put(false)
} catch {
@@ -205,6 +198,24 @@ class HybridStoreSuite extends SparkFunSuite with
BeforeAndAfter with TimeLimits
}
}
+@ExtendedLevelDBTest
+class LevelDBHybridStoreSuite extends HybridStoreSuite {
+ before {
+ dbpath = File.createTempFile("test.", ".ldb")
+ dbpath.delete()
+ db = new LevelDB(dbpath, new KVStoreScalaSerializer())
+ }
+}
+
+@ExtendedRocksDBTest
+class RocksDBHybridStoreSuite extends HybridStoreSuite {
+ before {
+ dbpath = File.createTempFile("test.", ".rdb")
+ dbpath.delete()
+ db = new RocksDB(dbpath, new KVStoreScalaSerializer())
+ }
+}
+
class CustomType1(
@KVIndexParam var key: String,
@KVIndexParam("id") var id: String,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]