This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 0c1b20fa2d3 [SPARK-41685][UI] Support Protobuf serializer for the KVStore in History server
0c1b20fa2d3 is described below
commit 0c1b20fa2d3e8a7277aaae8ad566188478891188
Author: Gengliang Wang <[email protected]>
AuthorDate: Wed Dec 28 21:45:12 2022 -0800
[SPARK-41685][UI] Support Protobuf serializer for the KVStore in History server
### What changes were proposed in this pull request?
Support a configurable serializer (`spark.history.store.serializer`) for the KVStore in the History Server. The value can be `JSON` or `PROTOBUF`.
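For illustration, here is a minimal, self-contained sketch of how the configured value is resolved (the real logic lives in `KVUtils.serializerForHistoryServer` and the `ConfigBuilder` shown in the diff below; the object and method names in this sketch are illustrative only, not Spark's private API):

```scala
import java.util.Locale

// Illustrative sketch only; `HistoryStoreSerializer` and `pickSerializer` are
// hypothetical names, not Spark's actual (private[spark]) API.
object SerializerSelectionSketch {
  object HistoryStoreSerializer extends Enumeration {
    val JSON, PROTOBUF = Value
  }

  // The real ConfigBuilder upper-cases the configured value and validates it
  // against the enumeration in the same way.
  def pickSerializer(configured: String): HistoryStoreSerializer.Value =
    HistoryStoreSerializer.withName(configured.toUpperCase(Locale.ROOT))

  def main(args: Array[String]): Unit = {
    // JSON keeps the pre-3.4.0 behavior (KVStoreScalaSerializer, JSON + GZip);
    // PROTOBUF selects the new KVStoreProtobufSerializer.
    assert(pickSerializer("json") == HistoryStoreSerializer.JSON)
    assert(pickSerializer("protobuf") == HistoryStoreSerializer.PROTOBUF)
  }
}
```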
### Why are the changes needed?
Support the fast and compact Protobuf serializer in the Spark History Server (SHS), so that we can:
* Improve the performance of KVStore IO.
* Make it possible for the SHS to read the live UI RocksDB instance.
### Does this PR introduce _any_ user-facing change?
Yes. This introduces a new configuration, `spark.history.store.serializer`, for setting the serializer of the KVStore in the History Server; valid values are `JSON` and `PROTOBUF`.
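As a hedged example of opting in (assuming the standard History Server configuration mechanisms documented in `monitoring.md`; `JSON` remains the default):

```
# spark-defaults.conf
spark.history.store.serializer   PROTOBUF

# or via the History Server daemon environment (conf/spark-env.sh):
# export SPARK_HISTORY_OPTS="-Dspark.history.store.serializer=PROTOBUF"
```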
### How was this patch tested?
Unit tests.
Closes #39202 from gengliangwang/shsProtobuf.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/deploy/history/FsHistoryProvider.scala | 10 ++--
.../org/apache/spark/internal/config/History.scala | 15 ++++++
.../scala/org/apache/spark/status/KVUtils.scala | 63 +++++++++++++---------
.../deploy/history/FsHistoryProviderSuite.scala | 24 +++++++++
.../history/HistoryServerDiskManagerSuite.scala | 2 +-
.../spark/status/AppStatusListenerSuite.scala | 7 ++-
.../apache/spark/status/AppStatusStoreSuite.scala | 2 +-
docs/monitoring.md | 10 ++++
.../ui/StreamingQueryStatusListenerSuite.scala | 4 +-
9 files changed, 99 insertions(+), 38 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 1c1293083f4..49b479f3124 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1214,7 +1214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// the existing data.
dm.openStore(appId, attempt.info.attemptId).foreach { path =>
try {
- return KVUtils.open(path, metadata, conf)
+ return KVUtils.open(path, metadata, conf, live = false)
} catch {
case e: Exception =>
logInfo(s"Failed to open existing store for
$appId/${attempt.info.attemptId}.", e)
@@ -1280,14 +1280,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
try {
logInfo(s"Leasing disk manager space for app $appId /
${attempt.info.attemptId}...")
lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
- val diskStore = KVUtils.open(lease.tmpPath, metadata, conf)
+ val diskStore = KVUtils.open(lease.tmpPath, metadata, conf, live = false)
hybridStore.setDiskStore(diskStore)
hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
override def onSwitchToDiskStoreSuccess: Unit = {
logInfo(s"Completely switched to diskStore for app $appId /
${attempt.info.attemptId}.")
diskStore.close()
val newStorePath = lease.commit(appId, attempt.info.attemptId)
- hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf))
+ hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf, live = false))
memoryManager.release(appId, attempt.info.attemptId)
}
override def onSwitchToDiskStoreFail(e: Exception): Unit = {
@@ -1323,7 +1323,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(reader.totalSize, isCompressed)
try {
- Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf)) { store =>
+ Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store =>
rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
}
newStorePath = lease.commit(appId, attempt.info.attemptId)
@@ -1341,7 +1341,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- KVUtils.open(newStorePath, metadata, conf)
+ KVUtils.open(newStorePath, metadata, conf, live = false)
}
private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index a35bc944af1..3be5f92443b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -79,6 +79,21 @@ private[spark] object History {
.stringConf
.createOptional
+ object LocalStoreSerializer extends Enumeration {
+ val JSON, PROTOBUF = Value
+ }
+
+ val LOCAL_STORE_SERIALIZER = ConfigBuilder("spark.history.store.serializer")
+ .doc("Serializer for writing/reading in-memory UI objects to/from
disk-based KV Store; " +
+ "JSON or PROTOBUF. JSON serializer is the only choice before Spark
3.4.0, thus it is the " +
+ "default value. PROTOBUF serializer is fast and compact, and it is the
default " +
+ "serializer for disk-based KV store of live UI.")
+ .version("3.4.0")
+ .stringConf
+ .transform(_.toUpperCase(Locale.ROOT))
+ .checkValues(LocalStoreSerializer.values.map(_.toString))
+ .createWithDefault(LocalStoreSerializer.JSON.toString)
+
val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
.version("2.3.0")
.bytesConf(ByteUnit.BYTE)
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 42fa25393a3..0dd40962309 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -45,8 +45,26 @@ private[spark] object KVUtils extends Logging {
/** Use this to annotate constructor params to be used as KVStore indices. */
type KVIndexParam = KVIndex @getter
- private def backend(conf: SparkConf) =
- HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))
+ private def backend(conf: SparkConf, live: Boolean) = {
+ if (live) {
+ // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
+ // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
+ // improvements on writes and reads.
+ HybridStoreDiskBackend.ROCKSDB
+ } else {
+ HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))
+ }
+ }
+
+ private def serializer(conf: SparkConf, live: Boolean) = {
+ if (live) {
+ // For the disk-based KV store of live UI, let's simply use protobuf serializer only.
+ // The default serializer is slow since it is using JSON+GZip encoding.
+ new KVStoreProtobufSerializer()
+ } else {
+ serializerForHistoryServer(conf)
+ }
+ }
/**
* A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
@@ -72,12 +90,11 @@ private[spark] object KVUtils extends Logging {
path: File,
metadata: M,
conf: SparkConf,
- diskBackend: Option[HybridStoreDiskBackend.Value] = None,
- serializer: Option[KVStoreSerializer] = None): KVStore = {
+ live: Boolean): KVStore = {
require(metadata != null, "Metadata is required.")
- val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
- val db = diskBackend.getOrElse(backend(conf)) match {
+ val kvSerializer = serializer(conf, live)
+ val db = backend(conf, live) match {
case LEVELDB => new LevelDB(path, kvSerializer)
case ROCKSDB => new RocksDB(path, kvSerializer)
}
@@ -92,27 +109,23 @@ private[spark] object KVUtils extends Logging {
db
}
+ def serializerForHistoryServer(conf: SparkConf): KVStoreScalaSerializer = {
+ History.LocalStoreSerializer.withName(conf.get(History.LOCAL_STORE_SERIALIZER)) match {
+ case History.LocalStoreSerializer.JSON =>
+ new KVStoreScalaSerializer()
+ case History.LocalStoreSerializer.PROTOBUF =>
+ new KVStoreProtobufSerializer()
+ case other =>
+ throw new IllegalArgumentException(s"Unrecognized KV store serializer $other")
+ }
+ }
+
def createKVStore(
storePath: Option[File],
live: Boolean,
conf: SparkConf): KVStore = {
storePath.map { path =>
- val diskBackend = if (live) {
- // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
- // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
- // improvements on writes and reads.
- HybridStoreDiskBackend.ROCKSDB
- } else {
- HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
- }
-
- val serializer = if (live) {
- // For the disk-based KV store of live UI, let's simply use protobuf serializer only.
- // The default serializer is slow since it is using JSON+GZip encoding.
- Some(new KVStoreProtobufSerializer())
- } else {
- None
- }
+ val diskBackend = backend(conf, live)
val dir = diskBackend match {
case LEVELDB => "listing.ldb"
@@ -128,7 +141,7 @@ private[spark] object KVUtils extends Logging {
conf.get(History.HISTORY_LOG_DIR))
try {
- open(dbPath, metadata, conf, Some(diskBackend), serializer)
+ open(dbPath, metadata, conf, live)
} catch {
// If there's an error, remove the listing database and any existing UI database
// from the store directory, since it's extremely likely that they'll all contain
@@ -136,12 +149,12 @@ private[spark] object KVUtils extends Logging {
case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
- open(dbPath, metadata, conf, Some(diskBackend), serializer)
+ open(dbPath, metadata, conf, live)
case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
// Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
- open(dbPath, metadata, conf, Some(diskBackend), serializer)
+ open(dbPath, metadata, conf, live)
}
}.getOrElse(new InMemoryStore())
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index f408f63be5e..4e026486e84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -51,6 +51,7 @@ import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.KVUtils
import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.tags.ExtendedLevelDBTest
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
import org.apache.spark.util.kvstore.InMemoryStore
@@ -75,6 +76,8 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
protected def diskBackend: HybridStoreDiskBackend.Value
+ protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.JSON
+
/** Create a fake log file using the new log format used in Spark 1.3+ */
private def newLogFile(
appId: String,
@@ -97,6 +100,17 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
testAppLogParsing(false, true)
}
+ test("SPARK-41685: Verify the configurable serializer for history server") {
+ val conf = createTestConf()
+ val serializerOfKVStore = KVUtils.serializerForHistoryServer(conf)
+ assert(serializerOfKVStore.isInstanceOf[KVStoreScalaSerializer])
+ if (serializer == LocalStoreSerializer.JSON) {
+ assert(!serializerOfKVStore.isInstanceOf[KVStoreProtobufSerializer])
+ } else {
+ assert(serializerOfKVStore.isInstanceOf[KVStoreProtobufSerializer])
+ }
+ }
+
private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = {
val clock = new ManualClock(12345678)
val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore)
@@ -1810,6 +1824,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
}
conf.set(HYBRID_STORE_ENABLED, useHybridStore)
conf.set(HYBRID_STORE_DISK_BACKEND.key, diskBackend.toString)
+ conf.set(LOCAL_STORE_SERIALIZER.key, serializer.toString)
conf
}
@@ -1865,7 +1880,16 @@ class LevelDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite {
HybridStoreDiskBackend.LEVELDB
}
+@ExtendedLevelDBTest
class LevelDBBackendWithProtobufSerializerSuite extends LevelDBBackendFsHistoryProviderSuite {
+ override protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.PROTOBUF
+}
+
class RocksDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite {
override protected def diskBackend: HybridStoreDiskBackend.Value =
HybridStoreDiskBackend.ROCKSDB
}
+
+class RocksDBBackendWithProtobufSerializerSuite extends RocksDBBackendFsHistoryProviderSuite {
+ override protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.PROTOBUF
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index 9d1d741ba13..373d1c557fc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -50,7 +50,7 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn
before {
testDir = Utils.createTempDir()
- store = KVUtils.open(new File(testDir, "listing"), "test", conf)
+ store = KVUtils.open(new File(testDir, "listing"), "test", conf, live = false)
}
after {
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 5d0c25aa86a..6562cc317af 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster._
import org.apache.spark.status.ListenerEventsTestHelper._
import org.apache.spark.status.api.v1
-import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.storage._
import org.apache.spark.tags.ExtendedLevelDBTest
import org.apache.spark.util.Utils
@@ -53,7 +52,8 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
.set(ASYNC_TRACKING_ENABLED, false)
- protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName(), conf)
+ protected def createKVStore: KVStore =
+ KVUtils.open(testDir, getClass().getName(), conf, live = false)
before {
time = 0L
@@ -1973,6 +1973,5 @@ class AppStatusListenerWithProtobufSerializerSuite extends AppStatusListenerSuit
testDir,
getClass().getName(),
conf,
- Some(HybridStoreDiskBackend.ROCKSDB),
- Some(new KVStoreProtobufSerializer()))
+ live = true)
}
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index ac257bb0fb8..b257bcccf73 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -106,7 +106,7 @@ class AppStatusStoreSuite extends SparkFunSuite {
val store: KVStore = if (disk) {
conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
val testDir = Utils.createTempDir()
- val diskStore = KVUtils.open(testDir, getClass.getName, conf)
- val diskStore = KVUtils.open(testDir, getClass.getName, conf)
+ val diskStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
new ElementTrackingStore(diskStore, conf)
} else {
new ElementTrackingStore(new InMemoryStore, conf)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 14048afbe0f..804bde92060 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -341,6 +341,16 @@ Security options for the Spark History Server are covered more detail in the
</td>
<td>2.3.0</td>
</tr>
+ <tr>
+ <td>spark.history.store.serializer</td>
+ <td>JSON</td>
+ <td>
+ Serializer for writing/reading in-memory UI objects to/from disk-based KV Store; JSON or PROTOBUF.
+ JSON serializer is the only choice before Spark 3.4.0, thus it is the default value.
+ PROTOBUF serializer is fast and compact, compared to the JSON serializer.
+ </td>
+ <td>3.4.0</td>
+ </tr>
<tr>
<td>spark.history.custom.executor.log.url</td>
<td>(none)</td>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 1a51b58f4f6..7781c4276c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -226,7 +226,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
val conf = new SparkConf()
.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
val testDir = Utils.createTempDir()
- val kvStore = KVUtils.open(testDir, getClass.getName, conf)
+ val kvStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
try {
testStreamingQueryData(kvStore)
} finally {
@@ -239,7 +239,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
val conf = new SparkConf()
.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
val testDir = Utils.createTempDir()
- val kvStore = KVUtils.open(testDir, getClass.getName, conf)
+ val kvStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
try {
testStreamingQueryData(kvStore)
} finally {