This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c1b20fa2d3 [SPARK-41685][UI] Support Protobuf serializer for the KVStore in History server
0c1b20fa2d3 is described below

commit 0c1b20fa2d3e8a7277aaae8ad566188478891188
Author: Gengliang Wang <[email protected]>
AuthorDate: Wed Dec 28 21:45:12 2022 -0800

    [SPARK-41685][UI] Support Protobuf serializer for the KVStore in History server
    
    ### What changes were proposed in this pull request?
    
    Support a configurable serializer (`spark.history.store.serializer`) for the KVStore in the History Server. The value can be `JSON` or `PROTOBUF`.
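
    For example, a deployment can opt in through `SparkConf` (a minimal sketch; the config key and its values come from this patch, the rest is illustrative):

    ```scala
    import org.apache.spark.SparkConf

    // Values are upper-cased by the config's transform, so "protobuf" works too.
    val conf = new SparkConf()
      .set("spark.history.store.serializer", "PROTOBUF")
    ```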
    
    ### Why are the changes needed?
    
    Support the fast and compact Protobuf serializer in the Spark History Server (SHS), so that we can:
    * Improve the performance of KVStore I/O
    * Make it possible for the SHS to read the live UI RocksDB instance.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this introduces a new configuration `spark.history.store.serializer` for setting the serializer of the KVStore in the History Server; the value can be `JSON` or `PROTOBUF`.
    
    ### How was this patch tested?
    
    Unit tests, including new suites (`LevelDBBackendWithProtobufSerializerSuite`, `RocksDBBackendWithProtobufSerializerSuite`) that re-run the FsHistoryProvider tests with the PROTOBUF serializer.
    
    Closes #39202 from gengliangwang/shsProtobuf.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 10 ++--
 .../org/apache/spark/internal/config/History.scala | 15 ++++++
 .../scala/org/apache/spark/status/KVUtils.scala    | 63 +++++++++++++---------
 .../deploy/history/FsHistoryProviderSuite.scala    | 24 +++++++++
 .../history/HistoryServerDiskManagerSuite.scala    |  2 +-
 .../spark/status/AppStatusListenerSuite.scala      |  7 ++-
 .../apache/spark/status/AppStatusStoreSuite.scala  |  2 +-
 docs/monitoring.md                                 | 10 ++++
 .../ui/StreamingQueryStatusListenerSuite.scala     |  4 +-
 9 files changed, 99 insertions(+), 38 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 1c1293083f4..49b479f3124 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -1214,7 +1214,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // the existing data.
     dm.openStore(appId, attempt.info.attemptId).foreach { path =>
       try {
-        return KVUtils.open(path, metadata, conf)
+        return KVUtils.open(path, metadata, conf, live = false)
       } catch {
         case e: Exception =>
           logInfo(s"Failed to open existing store for 
$appId/${attempt.info.attemptId}.", e)
@@ -1280,14 +1280,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     try {
       logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
       lease = dm.lease(reader.totalSize, reader.compressionCodec.isDefined)
-      val diskStore = KVUtils.open(lease.tmpPath, metadata, conf)
+      val diskStore = KVUtils.open(lease.tmpPath, metadata, conf, live = false)
       hybridStore.setDiskStore(diskStore)
       hybridStore.switchToDiskStore(new HybridStore.SwitchToDiskStoreListener {
         override def onSwitchToDiskStoreSuccess: Unit = {
           logInfo(s"Completely switched to diskStore for app $appId / 
${attempt.info.attemptId}.")
           diskStore.close()
           val newStorePath = lease.commit(appId, attempt.info.attemptId)
-          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf))
+          hybridStore.setDiskStore(KVUtils.open(newStorePath, metadata, conf, live = false))
           memoryManager.release(appId, attempt.info.attemptId)
         }
         override def onSwitchToDiskStoreFail(e: Exception): Unit = {
@@ -1323,7 +1323,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       logInfo(s"Leasing disk manager space for app $appId / 
${attempt.info.attemptId}...")
       val lease = dm.lease(reader.totalSize, isCompressed)
       try {
-        Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf)) { store =>
+        Utils.tryWithResource(KVUtils.open(lease.tmpPath, metadata, conf, live = false)) { store =>
           rebuildAppStore(store, reader, attempt.info.lastUpdated.getTime())
         }
         newStorePath = lease.commit(appId, attempt.info.attemptId)
@@ -1341,7 +1341,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    KVUtils.open(newStorePath, metadata, conf)
+    KVUtils.open(newStorePath, metadata, conf, live = false)
   }
 
   private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index a35bc944af1..3be5f92443b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -79,6 +79,21 @@ private[spark] object History {
     .stringConf
     .createOptional
 
+  object LocalStoreSerializer extends Enumeration {
+    val JSON, PROTOBUF = Value
+  }
+
+  val LOCAL_STORE_SERIALIZER = ConfigBuilder("spark.history.store.serializer")
+    .doc("Serializer for writing/reading in-memory UI objects to/from 
disk-based KV Store; " +
+      "JSON or PROTOBUF. JSON serializer is the only choice before Spark 
3.4.0, thus it is the " +
+      "default value. PROTOBUF serializer is fast and compact, and it is the 
default " +
+      "serializer for disk-based KV store of live UI.")
+    .version("3.4.0")
+    .stringConf
+    .transform(_.toUpperCase(Locale.ROOT))
+    .checkValues(LocalStoreSerializer.values.map(_.toString))
+    .createWithDefault(LocalStoreSerializer.JSON.toString)
+
   val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
     .version("2.3.0")
     .bytesConf(ByteUnit.BYTE)
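
A quick sketch of how the `spark.history.store.serializer` entry above behaves (illustrative only: `transform` upper-cases the value before `checkValues` runs, and these config APIs are `private[spark]`, so this assumes code living in the `org.apache.spark` namespace):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

val conf = new SparkConf()
conf.set(History.LOCAL_STORE_SERIALIZER.key, "protobuf")
// Reading applies the upper-casing transform, then the checkValues check:
assert(conf.get(History.LOCAL_STORE_SERIALIZER) == "PROTOBUF")
// Any value outside {JSON, PROTOBUF} fails checkValues when read.
```
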
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 42fa25393a3..0dd40962309 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -45,8 +45,26 @@ private[spark] object KVUtils extends Logging {
   /** Use this to annotate constructor params to be used as KVStore indices. */
   type KVIndexParam = KVIndex @getter
 
-  private def backend(conf: SparkConf) =
-    HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))
+  private def backend(conf: SparkConf, live: Boolean) = {
+    if (live) {
+      // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
+      // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
+      // improvements on writes and reads.
+      HybridStoreDiskBackend.ROCKSDB
+    } else {
+      HybridStoreDiskBackend.withName(conf.get(HYBRID_STORE_DISK_BACKEND))
+    }
+  }
+
+  private def serializer(conf: SparkConf, live: Boolean) = {
+    if (live) {
+      // For the disk-based KV store of live UI, let's simply use protobuf serializer only.
+      // The default serializer is slow since it is using JSON+GZip encoding.
+      new KVStoreProtobufSerializer()
+    } else {
+      serializerForHistoryServer(conf)
+    }
+  }
 
   /**
    * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
@@ -72,12 +90,11 @@ private[spark] object KVUtils extends Logging {
       path: File,
       metadata: M,
       conf: SparkConf,
-      diskBackend: Option[HybridStoreDiskBackend.Value] = None,
-      serializer: Option[KVStoreSerializer] = None): KVStore = {
+      live: Boolean): KVStore = {
     require(metadata != null, "Metadata is required.")
 
-    val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
-    val db = diskBackend.getOrElse(backend(conf)) match {
+    val kvSerializer = serializer(conf, live)
+    val db = backend(conf, live) match {
       case LEVELDB => new LevelDB(path, kvSerializer)
       case ROCKSDB => new RocksDB(path, kvSerializer)
     }
@@ -92,27 +109,23 @@ private[spark] object KVUtils extends Logging {
     db
   }
 
+  def serializerForHistoryServer(conf: SparkConf): KVStoreScalaSerializer = {
+    History.LocalStoreSerializer.withName(conf.get(History.LOCAL_STORE_SERIALIZER)) match {
+      case History.LocalStoreSerializer.JSON =>
+        new KVStoreScalaSerializer()
+      case History.LocalStoreSerializer.PROTOBUF =>
+        new KVStoreProtobufSerializer()
+      case other =>
+        throw new IllegalArgumentException(s"Unrecognized KV store serializer $other")
+    }
+  }
+
   def createKVStore(
       storePath: Option[File],
       live: Boolean,
       conf: SparkConf): KVStore = {
     storePath.map { path =>
-      val diskBackend = if (live) {
-        // For the disk-based KV store of live UI, let's simply make it ROCKSDB only for now,
-        // instead of supporting both LevelDB and RocksDB. RocksDB is built based on LevelDB with
-        // improvements on writes and reads.
-        HybridStoreDiskBackend.ROCKSDB
-      } else {
-        HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
-      }
-
-      val serializer = if (live) {
-        // For the disk-based KV store of live UI, let's simply use protobuf serializer only.
-        // The default serializer is slow since it is using JSON+GZip encoding.
-        Some(new KVStoreProtobufSerializer())
-      } else {
-        None
-      }
+      val diskBackend = backend(conf, live)
 
       val dir = diskBackend match {
         case LEVELDB => "listing.ldb"
@@ -128,7 +141,7 @@ private[spark] object KVUtils extends Logging {
         conf.get(History.HISTORY_LOG_DIR))
 
       try {
-        open(dbPath, metadata, conf, Some(diskBackend), serializer)
+        open(dbPath, metadata, conf, live)
       } catch {
        // If there's an error, remove the listing database and any existing UI database
        // from the store directory, since it's extremely likely that they'll all contain
@@ -136,12 +149,12 @@ private[spark] object KVUtils extends Logging {
        case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
           logInfo("Detected incompatible DB versions, deleting...")
           path.listFiles().foreach(Utils.deleteRecursively)
-          open(dbPath, metadata, conf, Some(diskBackend), serializer)
+          open(dbPath, metadata, conf, live)
         case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
           // Get rid of the corrupted data and re-create it.
           logWarning(s"Failed to load disk store $dbPath :", dbExc)
           Utils.deleteRecursively(dbPath)
-          open(dbPath, metadata, conf, Some(diskBackend), serializer)
+          open(dbPath, metadata, conf, live)
       }
     }.getOrElse(new InMemoryStore())
   }
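
For reference, the resulting `KVUtils.open` contract (a condensed sketch, not part of the patch; `path`, `metadata`, and `conf` are placeholders, and `KVUtils` is `private[spark]`):

```scala
// live = false: History Server behavior. The backend comes from
// HYBRID_STORE_DISK_BACKEND and the serializer from LOCAL_STORE_SERIALIZER
// (JSON by default) via serializerForHistoryServer(conf).
val shsStore = KVUtils.open(path, metadata, conf, live = false)

// live = true: the live UI disk store is pinned to RocksDB plus
// KVStoreProtobufSerializer, ignoring both configurations.
val liveStore = KVUtils.open(path, metadata, conf, live = true)
```
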
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index f408f63be5e..4e026486e84 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -51,6 +51,7 @@ import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.KVUtils
 import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
 import org.apache.spark.util.kvstore.InMemoryStore
@@ -75,6 +76,8 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
 
   protected def diskBackend: HybridStoreDiskBackend.Value
 
+  protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.JSON
+
   /** Create a fake log file using the new log format used in Spark 1.3+ */
   private def newLogFile(
       appId: String,
@@ -97,6 +100,17 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     testAppLogParsing(false, true)
   }
 
+  test("SPARK-41685: Verify the configurable serializer for history server") {
+    val conf = createTestConf()
+    val serializerOfKVStore = KVUtils.serializerForHistoryServer(conf)
+    assert(serializerOfKVStore.isInstanceOf[KVStoreScalaSerializer])
+    if (serializer == LocalStoreSerializer.JSON) {
+      assert(!serializerOfKVStore.isInstanceOf[KVStoreProtobufSerializer])
+    } else {
+      assert(serializerOfKVStore.isInstanceOf[KVStoreProtobufSerializer])
+    }
+  }
+
  private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = {
     val clock = new ManualClock(12345678)
    val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore)
@@ -1810,6 +1824,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     }
     conf.set(HYBRID_STORE_ENABLED, useHybridStore)
     conf.set(HYBRID_STORE_DISK_BACKEND.key, diskBackend.toString)
+    conf.set(LOCAL_STORE_SERIALIZER.key, serializer.toString)
 
     conf
   }
@@ -1865,7 +1880,16 @@ class LevelDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite {
     HybridStoreDiskBackend.LEVELDB
 }
 
+@ExtendedLevelDBTest
+class LevelDBBackendWithProtobufSerializerSuite extends LevelDBBackendFsHistoryProviderSuite {
+  override protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.PROTOBUF
+}
+
 class RocksDBBackendFsHistoryProviderSuite extends FsHistoryProviderSuite {
   override protected def diskBackend: HybridStoreDiskBackend.Value =
     HybridStoreDiskBackend.ROCKSDB
 }
+
+class RocksDBBackendWithProtobufSerializerSuite extends RocksDBBackendFsHistoryProviderSuite {
+  override protected def serializer: LocalStoreSerializer.Value = LocalStoreSerializer.PROTOBUF
+}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index 9d1d741ba13..373d1c557fc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -50,7 +50,7 @@ abstract class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAn
 
   before {
     testDir = Utils.createTempDir()
-    store = KVUtils.open(new File(testDir, "listing"), "test", conf)
+    store = KVUtils.open(new File(testDir, "listing"), "test", conf, live = false)
   }
 
   after {
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 5d0c25aa86a..6562cc317af 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -34,7 +34,6 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.ListenerEventsTestHelper._
 import org.apache.spark.status.api.v1
-import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 import org.apache.spark.storage._
 import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.util.Utils
@@ -53,7 +52,8 @@ abstract class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter
     .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     .set(ASYNC_TRACKING_ENABLED, false)
 
-  protected def createKVStore: KVStore = KVUtils.open(testDir, getClass().getName(), conf)
+  protected def createKVStore: KVStore =
+    KVUtils.open(testDir, getClass().getName(), conf, live = false)
 
   before {
     time = 0L
@@ -1973,6 +1973,5 @@ class AppStatusListenerWithProtobufSerializerSuite extends AppStatusListenerSuit
       testDir,
       getClass().getName(),
       conf,
-      Some(HybridStoreDiskBackend.ROCKSDB),
-      Some(new KVStoreProtobufSerializer()))
+      live = true)
 }
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index ac257bb0fb8..b257bcccf73 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -106,7 +106,7 @@ class AppStatusStoreSuite extends SparkFunSuite {
     val store: KVStore = if (disk) {
       conf.set(HYBRID_STORE_DISK_BACKEND, diskStoreType.toString)
       val testDir = Utils.createTempDir()
-      val diskStore = KVUtils.open(testDir, getClass.getName, conf)
+      val diskStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
       new ElementTrackingStore(diskStore, conf)
     } else {
       new ElementTrackingStore(new InMemoryStore, conf)
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 14048afbe0f..804bde92060 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -341,6 +341,16 @@ Security options for the Spark History Server are covered more detail in the
     </td>
     <td>2.3.0</td>
   </tr>
+  <tr>
+    <td>spark.history.store.serializer</td>
+    <td>JSON</td>
+    <td>
+        Serializer for writing/reading in-memory UI objects to/from disk-based KV Store; JSON or PROTOBUF.
+        JSON serializer is the only choice before Spark 3.4.0, thus it is the default value.
+        PROTOBUF serializer is fast and compact, compared to the JSON serializer.
+    </td>
+    <td>3.4.0</td>
+  </tr>
   <tr>
     <td>spark.history.custom.executor.log.url</td>
     <td>(none)</td>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
index 1a51b58f4f6..7781c4276c7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListenerSuite.scala
@@ -226,7 +226,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     val conf = new SparkConf()
       .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.LEVELDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
     try {
       testStreamingQueryData(kvStore)
     } finally {
@@ -239,7 +239,7 @@ class StreamingQueryStatusListenerSuite extends StreamTest {
     val conf = new SparkConf()
       .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
     val testDir = Utils.createTempDir()
-    val kvStore = KVUtils.open(testDir, getClass.getName, conf)
+    val kvStore = KVUtils.open(testDir, getClass.getName, conf, live = false)
     try {
       testStreamingQueryData(kvStore)
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
