This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 78532fd7c6d [SPARK-41244][UI] Introducing a Protobuf serializer for UI 
data on KV store
78532fd7c6d is described below

commit 78532fd7c6d741d501fcc7d375471917d0f79328
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Dec 5 21:46:31 2022 -0800

    [SPARK-41244][UI] Introducing a Protobuf serializer for UI data on KV store
    
    ### What changes were proposed in this pull request?
    
    This PR introduces a Protobuf serializer for the KV store, which is about 3 times as fast as the default serializer according to an end-to-end benchmark against RocksDB.

    | Serializer                       | Avg Write time(μs) | Avg Read time(μs) | RocksDB File Total Size(MB) | Result total size in memory(MB) |
    |----------------------------------|--------------------|-------------------|-----------------------------|---------------------------------|
    | Spark’s KV Serializer(JSON+gzip) | 352.2              | 119.26            | 837                         | 868                             |
    | Protobuf                         | 109.9              | 34.3              | 858                         | 2105                            |
    
    To move fast and make PR review easier, this PR will:
    * Cover the class `JobDataWrapper` only. We can handle more UI data later.
    * Not add a configuration for setting the serializer in SHS (Spark History Server). That will be done in a follow-up.
    
    ### Why are the changes needed?
    
    This change provides a faster serializer for the KV store. It supports schema evolution, so that SHS can leverage it as well in the future.
    More details in the SPIP: 
https://docs.google.com/document/d/1cuKnFwlTodyVhUQPMuakq2YDaLH05jaY9FRu_aD1zMo/edit
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
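    ### How was this patch tested?
    
    New unit tests: `KVStoreProtobufSerializerSuite` (round-trip of a `JobDataWrapper`) and `AppStatusListenerWithProtobufSerializerSuite` (runs the `AppStatusListenerSuite` tests on a RocksDB store that uses the Protobuf serializer).
    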
    Closes #38779 from gengliangwang/protobuf.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../spark/util/kvstore/KVStoreSerializer.java      |   4 +-
 connector/protobuf/pom.xml                         |   2 +-
 core/pom.xml                                       |  49 ++++++++-
 .../apache/spark/status/protobuf/store_types.proto |  57 ++++++++++
 .../spark/deploy/history/FsHistoryProvider.scala   |   2 +-
 .../org/apache/spark/status/AppStatusStore.scala   |   6 +-
 .../scala/org/apache/spark/status/KVUtils.scala    |  34 ++++--
 .../status/protobuf/JobDataWrapperSerializer.scala | 119 +++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializer.scala       |  34 ++++++
 .../spark/status/AppStatusListenerSuite.scala      |  14 ++-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  |  81 ++++++++++++++
 pom.xml                                            |   1 +
 project/SparkBuild.scala                           |  14 +++
 13 files changed, 398 insertions(+), 19 deletions(-)

diff --git 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index ff99d052cf7..02dd73e1a2f 100644
--- 
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++ 
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -49,7 +49,7 @@ public class KVStoreSerializer {
     this.mapper = new ObjectMapper();
   }
 
-  public final byte[] serialize(Object o) throws Exception {
+  public byte[] serialize(Object o) throws Exception {
     if (o instanceof String) {
       return ((String) o).getBytes(UTF_8);
     } else {
@@ -62,7 +62,7 @@ public class KVStoreSerializer {
   }
 
   @SuppressWarnings("unchecked")
-  public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception 
{
+  public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
     if (klass.equals(String.class)) {
       return (T) new String(data, UTF_8);
     } else {
diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml
index 7057e6148d4..3036fcbf256 100644
--- a/connector/protobuf/pom.xml
+++ b/connector/protobuf/pom.xml
@@ -122,7 +122,7 @@
           <plugin>
             <groupId>com.github.os72</groupId>
             <artifactId>protoc-jar-maven-plugin</artifactId>
-            <version>3.11.4</version>
+            <version>${protoc-jar-maven-plugin.version}</version>
             <!-- Generates Java classes for tests. TODO(Raghu): Generate 
descriptor files too. -->
             <executions>
               <execution>
diff --git a/core/pom.xml b/core/pom.xml
index 182cab90427..a9b40acf5a3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -532,7 +532,12 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
-
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>compile</scope>
+    </dependency>
   </dependencies>
   <build>
     
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -616,6 +621,48 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <shadeTestJar>true</shadeTestJar>
+          <artifactSet>
+            <includes>
+              <include>com.google.protobuf:*</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              
<shadedPattern>${spark.shade.packageName}.spark-core.protobuf</shadedPattern>
+              <includes>
+                <include>com.google.protobuf.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.github.os72</groupId>
+        <artifactId>protoc-jar-maven-plugin</artifactId>
+        <version>${protoc-jar-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+              <protocVersion>${protobuf.version}</protocVersion>
+              <inputDirectories>
+                <include>src/main/protobuf</include>
+              </inputDirectories>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 
diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
new file mode 100644
index 00000000000..4ad5d1e7527
--- /dev/null
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package org.apache.spark.status.protobuf;
+
+enum JobExecutionStatus {
+  UNSPECIFIED = 0;
+  RUNNING = 1;
+  SUCCEEDED = 2;
+  FAILED = 3;
+  UNKNOWN = 4;
+}
+
+message JobData {
+  // All IDs are int64 for extendability, even when they are currently int32 
in Spark.
+  int64 job_id = 1;
+  string name = 2;
+  optional string description = 3;
+  optional int64 submission_time = 4;
+  optional int64 completion_time = 5;
+  repeated int64 stage_ids = 6;
+  optional string job_group = 7;
+  JobExecutionStatus status = 8;
+  int32 num_tasks = 9;
+  int32 num_active_tasks = 10;
+  int32 num_completed_tasks = 11;
+  int32 num_skipped_tasks = 12;
+  int32 num_failed_tasks = 13;
+  int32 num_killed_tasks = 14;
+  int32 num_completed_indices = 15;
+  int32 num_active_stages = 16;
+  int32 num_completed_stages = 17;
+  int32 num_skipped_stages = 18;
+  int32 num_failed_stages = 19;
+  map<string, int32> kill_tasks_summary = 20;
+}
+
+message JobDataWrapper {
+  JobData info = 1;
+  repeated int32 skipped_stages = 2;
+  optional int64 sql_execution_id = 3;
+}
\ No newline at end of file
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c10bec5c960..ad4c727c597 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 
   // Visible for testing.
   private[history] val listing: KVStore = {
-    KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
+    KVUtils.createKVStore(storePath, live = false, conf)
   }
 
   private val diskManager = storePath.map { path =>
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 707c1829440..ebf52189796 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -773,10 +772,7 @@ private[spark] object AppStatusStore {
       conf: SparkConf,
       appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
     val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
-    // For the disk-based KV store of live UI, let's simply make it ROCKSDB 
only for now,
-    // instead of supporting both LevelDB and RocksDB. RocksDB is built based 
on LevelDB with
-    // improvements on writes and reads.
-    val kvStore = KVUtils.createKVStore(storePath, 
HybridStoreDiskBackend.ROCKSDB, conf)
+    val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
     val store = new ElementTrackingStore(kvStore, conf)
     val listener = new AppStatusListener(store, conf, true, appStatusSource)
     new AppStatusStore(store, listener = Some(listener))
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala 
b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 182996ac1fb..42fa25393a3 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History
 import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend
 import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 import org.apache.spark.util.Utils
 import org.apache.spark.util.kvstore._
 
@@ -71,12 +72,14 @@ private[spark] object KVUtils extends Logging {
       path: File,
       metadata: M,
       conf: SparkConf,
-      diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
+      diskBackend: Option[HybridStoreDiskBackend.Value] = None,
+      serializer: Option[KVStoreSerializer] = None): KVStore = {
     require(metadata != null, "Metadata is required.")
 
+    val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
     val db = diskBackend.getOrElse(backend(conf)) match {
-      case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
-      case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
+      case LEVELDB => new LevelDB(path, kvSerializer)
+      case ROCKSDB => new RocksDB(path, kvSerializer)
     }
     val dbMeta = db.getMetadata(classTag[M].runtimeClass)
     if (dbMeta == null) {
@@ -91,9 +94,26 @@ private[spark] object KVUtils extends Logging {
 
   def createKVStore(
       storePath: Option[File],
-      diskBackend: HybridStoreDiskBackend.Value,
+      live: Boolean,
       conf: SparkConf): KVStore = {
     storePath.map { path =>
+      val diskBackend = if (live) {
+        // For the disk-based KV store of live UI, let's simply make it 
ROCKSDB only for now,
+        // instead of supporting both LevelDB and RocksDB. RocksDB is built 
based on LevelDB with
+        // improvements on writes and reads.
+        HybridStoreDiskBackend.ROCKSDB
+      } else {
+        
HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
+      }
+
+      val serializer = if (live) {
+        // For the disk-based KV store of live UI, let's simply use protobuf 
serializer only.
+        // The default serializer is slow since it is using JSON+GZip encoding.
+        Some(new KVStoreProtobufSerializer())
+      } else {
+        None
+      }
+
       val dir = diskBackend match {
         case LEVELDB => "listing.ldb"
         case ROCKSDB => "listing.rdb"
@@ -108,7 +128,7 @@ private[spark] object KVUtils extends Logging {
         conf.get(History.HISTORY_LOG_DIR))
 
       try {
-        open(dbPath, metadata, conf, Some(diskBackend))
+        open(dbPath, metadata, conf, Some(diskBackend), serializer)
       } catch {
         // If there's an error, remove the listing database and any existing 
UI database
         // from the store directory, since it's extremely likely that they'll 
all contain
@@ -116,12 +136,12 @@ private[spark] object KVUtils extends Logging {
         case _: UnsupportedStoreVersionException | _: 
MetadataMismatchException =>
           logInfo("Detected incompatible DB versions, deleting...")
           path.listFiles().foreach(Utils.deleteRecursively)
-          open(dbPath, metadata, conf, Some(diskBackend))
+          open(dbPath, metadata, conf, Some(diskBackend), serializer)
         case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
           // Get rid of the corrupted data and re-create it.
           logWarning(s"Failed to load disk store $dbPath :", dbExc)
           Utils.deleteRecursively(dbPath)
-          open(dbPath, metadata, conf, Some(diskBackend))
+          open(dbPath, metadata, conf, Some(diskBackend), serializer)
       }
     }.getOrElse(new InMemoryStore())
   }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
new file mode 100644
index 00000000000..5c2752fa6ce
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import collection.JavaConverters._
+import java.util.Date
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.api.v1.JobData
+
+object JobDataWrapperSerializer {
+  def serialize(j: JobDataWrapper): Array[Byte] = {
+    val jobData = serializeJobData(j.info)
+    val builder = StoreTypes.JobDataWrapper.newBuilder()
+    builder.setInfo(jobData)
+    j.skippedStages.foreach(builder.addSkippedStages)
+    j.sqlExecutionId.foreach(builder.setSqlExecutionId)
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): JobDataWrapper = {
+    val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes)
+    val sqlExecutionId = getOptional(wrapper.hasSqlExecutionId, 
wrapper.getSqlExecutionId)
+    new JobDataWrapper(
+      deserializeJobData(wrapper.getInfo),
+      wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet,
+      sqlExecutionId
+    )
+  }
+
+  private def getOptional[T](condition: Boolean, result: () => T): Option[T] = 
if (condition) {
+    Some(result())
+  } else {
+    None
+  }
+
+  private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
+    val jobDataBuilder = StoreTypes.JobData.newBuilder()
+    jobDataBuilder.setJobId(jobData.jobId.toLong)
+      .setName(jobData.name)
+      .setStatus(serializeJobExecutionStatus(jobData.status))
+      .setNumTasks(jobData.numTasks)
+      .setNumActiveTasks(jobData.numActiveTasks)
+      .setNumCompletedTasks(jobData.numCompletedTasks)
+      .setNumSkippedTasks(jobData.numSkippedTasks)
+      .setNumFailedTasks(jobData.numFailedTasks)
+      .setNumKilledTasks(jobData.numKilledTasks)
+      .setNumCompletedIndices(jobData.numCompletedIndices)
+      .setNumActiveStages(jobData.numActiveStages)
+      .setNumCompletedStages(jobData.numCompletedStages)
+      .setNumSkippedStages(jobData.numSkippedStages)
+      .setNumFailedStages(jobData.numFailedStages)
+
+    jobData.description.foreach(jobDataBuilder.setDescription)
+    jobData.submissionTime.foreach { d =>
+      jobDataBuilder.setSubmissionTime(d.getTime)
+    }
+    jobData.completionTime.foreach { d =>
+      jobDataBuilder.setCompletionTime(d.getTime)
+    }
+    jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong))
+    jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
+    jobData.killedTasksSummary.foreach { entry =>
+      jobDataBuilder.putKillTasksSummary(entry._1, entry._2)
+    }
+    jobDataBuilder.build()
+  }
+
+  private def deserializeJobData(info: StoreTypes.JobData): JobData = {
+    val description = getOptional(info.hasDescription, info.getDescription)
+    val submissionTime =
+      getOptional(info.hasSubmissionTime, () => new 
Date(info.getSubmissionTime))
+    val completionTime = getOptional(info.hasCompletionTime, () => new 
Date(info.getCompletionTime))
+    val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
+    val status = JobExecutionStatus.valueOf(info.getStatus.toString)
+
+    new JobData(
+      jobId = info.getJobId.toInt,
+      name = info.getName,
+      description = description,
+      submissionTime = submissionTime,
+      completionTime = completionTime,
+      stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq,
+      jobGroup = jobGroup,
+      status = status,
+      numTasks = info.getNumTasks,
+      numActiveTasks = info.getNumActiveTasks,
+      numCompletedTasks = info.getNumCompletedTasks,
+      numSkippedTasks = info.getNumSkippedTasks,
+      numFailedTasks = info.getNumFailedTasks,
+      numKilledTasks = info.getNumKilledTasks,
+      numCompletedIndices = info.getNumCompletedIndices,
+      numActiveStages = info.getNumActiveStages,
+      numCompletedStages = info.getNumCompletedStages,
+      numSkippedStages = info.getNumSkippedStages,
+      numFailedStages = info.getNumFailedStages,
+      killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
+  }
+
+  private def serializeJobExecutionStatus(j: JobExecutionStatus): 
StoreTypes.JobExecutionStatus = {
+    StoreTypes.JobExecutionStatus.valueOf(j.toString)
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
new file mode 100644
index 00000000000..2173821a219
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
+
+private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
+  override def serialize(o: Object): Array[Byte] = o match {
+    case j: JobDataWrapper => JobDataWrapperSerializer.serialize(j)
+    case other => super.serialize(other)
+  }
+
+  override def deserialize[T](data: Array[Byte], klass: Class[T]): T = klass 
match {
+    case _ if classOf[JobDataWrapper].isAssignableFrom(klass) =>
+      JobDataWrapperSerializer.deserialize(data).asInstanceOf[T]
+    case other => super.deserialize(data, klass)
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ec92877ce94..24a8a6844f1 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.status
 import java.io.File
 import java.util.{Date, Properties}
 
-import scala.collection.immutable.Map
 import scala.reflect.{classTag, ClassTag}
 
 import org.scalatest.BeforeAndAfter
@@ -35,6 +34,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.ListenerEventsTestHelper._
 import org.apache.spark.status.api.v1
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 import org.apache.spark.storage._
 import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.util.Utils
@@ -45,7 +45,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite 
with BeforeAndAfter
   private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, 
true, 2)
 
   private var time: Long = _
-  private var testDir: File = _
+  protected var testDir: File = _
   private var store: ElementTrackingStore = _
   private var taskIdTracker = -1L
 
@@ -1904,3 +1904,13 @@ class AppStatusListenerWithRocksDBSuite extends 
AppStatusListenerSuite {
   override def conf: SparkConf = super.conf
     .set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
 }
+
+class AppStatusListenerWithProtobufSerializerSuite extends 
AppStatusListenerSuite {
+  override def createKVStore: KVStore =
+    KVUtils.open(
+      testDir,
+      getClass().getName(),
+      conf,
+      Some(HybridStoreDiskBackend.ROCKSDB),
+      Some(new KVStoreProtobufSerializer()))
+}
diff --git 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
new file mode 100644
index 00000000000..7ca446e9a94
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.api.v1.JobData
+
+class KVStoreProtobufSerializerSuite extends SparkFunSuite {
+  private val serializer = new KVStoreProtobufSerializer()
+
+  test("Job data") {
+    val input = new JobDataWrapper(
+      new JobData(
+        jobId = 1,
+        name = "test",
+        description = Some("test description"),
+        submissionTime = Some(new Date(123456L)),
+        completionTime = Some(new Date(654321L)),
+        stageIds = Seq(1, 2, 3, 4),
+        jobGroup = Some("group"),
+        status = JobExecutionStatus.UNKNOWN,
+        numTasks = 2,
+        numActiveTasks = 3,
+        numCompletedTasks = 4,
+        numSkippedTasks = 5,
+        numFailedTasks = 6,
+        numKilledTasks = 7,
+        numCompletedIndices = 8,
+        numActiveStages = 9,
+        numCompletedStages = 10,
+        numSkippedStages = 11,
+        numFailedStages = 12,
+        killedTasksSummary = Map("a" -> 1, "b" -> 2)),
+      Set(1, 2),
+      Some(999)
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
+    assert(result.info.jobId == input.info.jobId)
+    assert(result.info.description == input.info.description)
+    assert(result.info.submissionTime == input.info.submissionTime)
+    assert(result.info.completionTime == input.info.completionTime)
+    assert(result.info.stageIds == input.info.stageIds)
+    assert(result.info.jobGroup == input.info.jobGroup)
+    assert(result.info.status == input.info.status)
+    assert(result.info.numTasks == input.info.numTasks)
+    assert(result.info.numActiveTasks == input.info.numActiveTasks)
+    assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
+    assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
+    assert(result.info.numFailedTasks == input.info.numFailedTasks)
+    assert(result.info.numKilledTasks == input.info.numKilledTasks)
+    assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+    assert(result.info.numActiveStages == input.info.numActiveStages)
+    assert(result.info.numCompletedStages == input.info.numCompletedStages)
+    assert(result.info.numSkippedStages == input.info.numSkippedStages)
+    assert(result.info.numFailedStages == input.info.numFailedStages)
+    assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+    assert(result.skippedStages == input.skippedStages)
+    assert(result.sqlExecutionId == input.sqlExecutionId)
+  }
+}
+
diff --git a/pom.xml b/pom.xml
index 0639370981c..17b5c99204f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
     
<protobuf.hadoopDependency.version>2.5.0</protobuf.hadoopDependency.version>
     <!-- Actual Protobuf version in Spark modules like Spark Connect, protobuf 
connector, etc. -->
     <protobuf.version>3.21.9</protobuf.version>
+    <protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
     <yarn.version>${hadoop.version}</yarn.version>
     <zookeeper.version>3.6.3</zookeeper.version>
     <curator.version>2.13.0</curator.version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 28aef586c50..a38ba7db519 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -613,9 +613,23 @@ object SparkParallelTestGrouping {
 
 object Core {
   import scala.sys.process.Process
+  import BuildCommons.protoVersion
   def buildenv = Process(Seq("uname")).!!.trim.replaceFirst("[^A-Za-z0-9].*", 
"").toLowerCase
   def bashpath = Process(Seq("where", 
"bash")).!!.split("[\r\n]+").head.replace('\\', '/')
   lazy val settings = Seq(
+    // Setting version for the protobuf compiler. This has to be propagated to 
every sub-project
+    // even if the project is not using it.
+    PB.protocVersion := BuildCommons.protoVersion,
+    // For some reason the resolution from the imported Maven build does not 
work for some
+    // of these dependencies that we need to shade later on.
+    libraryDependencies ++= {
+      Seq(
+        "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf"
+      )
+    },
+    (Compile / PB.targets) := Seq(
+      PB.gens.java -> (Compile / sourceManaged).value
+    ),
     (Compile / resourceGenerators) += Def.task {
       val buildScript = baseDirectory.value + "/../build/spark-build-info"
       val targetDir = baseDirectory.value + "/target/extra-resources/"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to