This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 78532fd7c6d [SPARK-41244][UI] Introducing a Protobuf serializer for UI
data on KV store
78532fd7c6d is described below
commit 78532fd7c6d741d501fcc7d375471917d0f79328
Author: Gengliang Wang <[email protected]>
AuthorDate: Mon Dec 5 21:46:31 2022 -0800
[SPARK-41244][UI] Introducing a Protobuf serializer for UI data on KV store
### What changes were proposed in this pull request?
Introduce a Protobuf serializer for the KV store, which is about 3 times as fast as
the default serializer according to an end-to-end benchmark against RocksDB.
| Serializer                       | Avg Write time(μs) | Avg Read time(μs) | RocksDB File Total Size(MB) | Result total size in memory(MB) |
|----------------------------------|--------------------|-------------------|-----------------------------|---------------------------------|
| Spark’s KV Serializer(JSON+gzip) | 352.2              | 119.26            | 837                         | 868                             |
| Protobuf                         | 109.9              | 34.3              | 858                         | 2105                            |
To move fast and make the PR review easier, this PR will:
* Cover only the class `JobDataWrapper`. We can handle more UI data later.
* Not add a configuration for choosing the serializer in SHS. We will add it
as a follow-up.
### Why are the changes needed?
A faster serializer for the KV store. Protobuf supports schema evolution, so
SHS can leverage it in the future as well.
More details in the SPIP:
https://docs.google.com/document/d/1cuKnFwlTodyVhUQPMuakq2YDaLH05jaY9FRu_aD1zMo/edit
### Does this PR introduce _any_ user-facing change?
No
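### How was this patch tested?
New unit tests: `KVStoreProtobufSerializerSuite` and `AppStatusListenerWithProtobufSerializerSuite`.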
Closes #38779 from gengliangwang/protobuf.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../spark/util/kvstore/KVStoreSerializer.java | 4 +-
connector/protobuf/pom.xml | 2 +-
core/pom.xml | 49 ++++++++-
.../apache/spark/status/protobuf/store_types.proto | 57 ++++++++++
.../spark/deploy/history/FsHistoryProvider.scala | 2 +-
.../org/apache/spark/status/AppStatusStore.scala | 6 +-
.../scala/org/apache/spark/status/KVUtils.scala | 34 ++++--
.../status/protobuf/JobDataWrapperSerializer.scala | 119 +++++++++++++++++++++
.../protobuf/KVStoreProtobufSerializer.scala | 34 ++++++
.../spark/status/AppStatusListenerSuite.scala | 14 ++-
.../protobuf/KVStoreProtobufSerializerSuite.scala | 81 ++++++++++++++
pom.xml | 1 +
project/SparkBuild.scala | 14 +++
13 files changed, 398 insertions(+), 19 deletions(-)
diff --git
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
index ff99d052cf7..02dd73e1a2f 100644
---
a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
+++
b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStoreSerializer.java
@@ -49,7 +49,7 @@ public class KVStoreSerializer {
this.mapper = new ObjectMapper();
}
- public final byte[] serialize(Object o) throws Exception {
+ public byte[] serialize(Object o) throws Exception {
if (o instanceof String) {
return ((String) o).getBytes(UTF_8);
} else {
@@ -62,7 +62,7 @@ public class KVStoreSerializer {
}
@SuppressWarnings("unchecked")
- public final <T> T deserialize(byte[] data, Class<T> klass) throws Exception
{
+ public <T> T deserialize(byte[] data, Class<T> klass) throws Exception {
if (klass.equals(String.class)) {
return (T) new String(data, UTF_8);
} else {
diff --git a/connector/protobuf/pom.xml b/connector/protobuf/pom.xml
index 7057e6148d4..3036fcbf256 100644
--- a/connector/protobuf/pom.xml
+++ b/connector/protobuf/pom.xml
@@ -122,7 +122,7 @@
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
- <version>3.11.4</version>
+ <version>${protoc-jar-maven-plugin.version}</version>
<!-- Generates Java classes for tests. TODO(Raghu): Generate
descriptor files too. -->
<executions>
<execution>
diff --git a/core/pom.xml b/core/pom.xml
index 182cab90427..a9b40acf5a3 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -532,7 +532,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -616,6 +621,48 @@
</execution>
</executions>
</plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <configuration>
+          <shadedArtifactAttached>false</shadedArtifactAttached>
+          <shadeTestJar>true</shadeTestJar>
+          <artifactSet>
+            <includes>
+              <include>com.google.protobuf:*</include>
+            </includes>
+          </artifactSet>
+          <relocations>
+            <!--
+              Relocate the bundled protobuf-java classes into a Spark-private package
+              (presumably so they cannot clash with other protobuf-java versions on the
+              classpath; confirm). The target must be a valid Java package name, and a
+              package name cannot contain a hyphen, hence "spark_core" and not "spark-core".
+            -->
+            <relocation>
+              <pattern>com.google.protobuf</pattern>
+              <shadedPattern>${spark.shade.packageName}.spark_core.protobuf</shadedPattern>
+              <includes>
+                <include>com.google.protobuf.**</include>
+              </includes>
+            </relocation>
+          </relocations>
+        </configuration>
+      </plugin>
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>${protoc-jar-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <inputDirectories>
+ <include>src/main/protobuf</include>
+ </inputDirectories>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
new file mode 100644
index 00000000000..4ad5d1e7527
--- /dev/null
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+package org.apache.spark.status.protobuf;
+
+// Protobuf counterpart of Spark's JobExecutionStatus enum. The Scala serializer in this
+// package converts between the two enums, so RUNNING, SUCCEEDED, FAILED and UNKNOWN must
+// stay aligned with the Spark enum. UNSPECIFIED = 0 is the proto3 default value, i.e. what
+// a reader sees when the field was never set.
+enum JobExecutionStatus {
+ UNSPECIFIED = 0;
+ RUNNING = 1;
+ SUCCEEDED = 2;
+ FAILED = 3;
+ UNKNOWN = 4;
+}
+
+// Serialized form of org.apache.spark.status.api.v1.JobData.
+message JobData {
+ // All IDs are int64 for extensibility, even though they are currently int32 in Spark.
+ int64 job_id = 1;
+ string name = 2;
+ // `optional` fields carry presence information, so an unset value can be told apart from
+ // the type's default value; the Scala serializer maps an unset field to None.
+ optional string description = 3;
+ // Timestamps in epoch milliseconds (written from java.util.Date#getTime).
+ optional int64 submission_time = 4;
+ optional int64 completion_time = 5;
+ repeated int64 stage_ids = 6;
+ optional string job_group = 7;
+ JobExecutionStatus status = 8;
+ int32 num_tasks = 9;
+ int32 num_active_tasks = 10;
+ int32 num_completed_tasks = 11;
+ int32 num_skipped_tasks = 12;
+ int32 num_failed_tasks = 13;
+ int32 num_killed_tasks = 14;
+ int32 num_completed_indices = 15;
+ int32 num_active_stages = 16;
+ int32 num_completed_stages = 17;
+ int32 num_skipped_stages = 18;
+ int32 num_failed_stages = 19;
+ // Populated from JobData.killedTasksSummary (key -> count).
+ map<string, int32> kill_tasks_summary = 20;
+}
+
+// Serialized form of org.apache.spark.status.JobDataWrapper.
+message JobDataWrapper {
+ JobData info = 1;
+ // NOTE(review): stage IDs are int32 here but int64 in JobData.stage_ids; consider aligning.
+ repeated int32 skipped_stages = 2;
+ optional int64 sql_execution_id = 3;
+}
\ No newline at end of file
diff --git
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index c10bec5c960..ad4c727c597 100644
---
a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++
b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -133,7 +133,7 @@ private[history] class FsHistoryProvider(conf: SparkConf,
clock: Clock)
// Visible for testing.
private[history] val listing: KVStore = {
- KVUtils.createKVStore(storePath, hybridStoreDiskBackend, conf)
+ KVUtils.createKVStore(storePath, live = false, conf)
}
private val diskManager = storePath.map { path =>
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 707c1829440..ebf52189796 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -24,7 +24,6 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
-import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
import org.apache.spark.status.api.v1
import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
@@ -773,10 +772,7 @@ private[spark] object AppStatusStore {
conf: SparkConf,
appStatusSource: Option[AppStatusSource] = None): AppStatusStore = {
val storePath = conf.get(LIVE_UI_LOCAL_STORE_DIR).map(new File(_))
- // For the disk-based KV store of live UI, let's simply make it ROCKSDB
only for now,
- // instead of supporting both LevelDB and RocksDB. RocksDB is built based
on LevelDB with
- // improvements on writes and reads.
- val kvStore = KVUtils.createKVStore(storePath,
HybridStoreDiskBackend.ROCKSDB, conf)
+ val kvStore = KVUtils.createKVStore(storePath, live = true, conf)
val store = new ElementTrackingStore(kvStore, conf)
val listener = new AppStatusListener(store, conf, true, appStatusSource)
new AppStatusStore(store, listener = Some(listener))
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
index 182996ac1fb..42fa25393a3 100644
--- a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -36,6 +36,7 @@ import org.apache.spark.internal.config.History
import org.apache.spark.internal.config.History.HYBRID_STORE_DISK_BACKEND
import org.apache.spark.internal.config.History.HybridStoreDiskBackend
import org.apache.spark.internal.config.History.HybridStoreDiskBackend._
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.util.Utils
import org.apache.spark.util.kvstore._
@@ -71,12 +72,14 @@ private[spark] object KVUtils extends Logging {
path: File,
metadata: M,
conf: SparkConf,
- diskBackend: Option[HybridStoreDiskBackend.Value] = None): KVStore = {
+ diskBackend: Option[HybridStoreDiskBackend.Value] = None,
+ serializer: Option[KVStoreSerializer] = None): KVStore = {
require(metadata != null, "Metadata is required.")
+ val kvSerializer = serializer.getOrElse(new KVStoreScalaSerializer())
val db = diskBackend.getOrElse(backend(conf)) match {
- case LEVELDB => new LevelDB(path, new KVStoreScalaSerializer())
- case ROCKSDB => new RocksDB(path, new KVStoreScalaSerializer())
+ case LEVELDB => new LevelDB(path, kvSerializer)
+ case ROCKSDB => new RocksDB(path, kvSerializer)
}
val dbMeta = db.getMetadata(classTag[M].runtimeClass)
if (dbMeta == null) {
@@ -91,9 +94,26 @@ private[spark] object KVUtils extends Logging {
def createKVStore(
storePath: Option[File],
- diskBackend: HybridStoreDiskBackend.Value,
+ live: Boolean,
conf: SparkConf): KVStore = {
storePath.map { path =>
+ val diskBackend = if (live) {
+ // For the disk-based KV store of live UI, let's simply make it
ROCKSDB only for now,
+ // instead of supporting both LevelDB and RocksDB. RocksDB is built
based on LevelDB with
+ // improvements on writes and reads.
+ HybridStoreDiskBackend.ROCKSDB
+ } else {
+
HybridStoreDiskBackend.withName(conf.get(History.HYBRID_STORE_DISK_BACKEND))
+ }
+
+ val serializer = if (live) {
+ // For the disk-based KV store of live UI, let's simply use protobuf
serializer only.
+ // The default serializer is slow since it is using JSON+GZip encoding.
+ Some(new KVStoreProtobufSerializer())
+ } else {
+ None
+ }
+
val dir = diskBackend match {
case LEVELDB => "listing.ldb"
case ROCKSDB => "listing.rdb"
@@ -108,7 +128,7 @@ private[spark] object KVUtils extends Logging {
conf.get(History.HISTORY_LOG_DIR))
try {
- open(dbPath, metadata, conf, Some(diskBackend))
+ open(dbPath, metadata, conf, Some(diskBackend), serializer)
} catch {
// If there's an error, remove the listing database and any existing
UI database
// from the store directory, since it's extremely likely that they'll
all contain
@@ -116,12 +136,12 @@ private[spark] object KVUtils extends Logging {
case _: UnsupportedStoreVersionException | _:
MetadataMismatchException =>
logInfo("Detected incompatible DB versions, deleting...")
path.listFiles().foreach(Utils.deleteRecursively)
- open(dbPath, metadata, conf, Some(diskBackend))
+ open(dbPath, metadata, conf, Some(diskBackend), serializer)
case dbExc @ (_: NativeDB.DBException | _: RocksDBException) =>
// Get rid of the corrupted data and re-create it.
logWarning(s"Failed to load disk store $dbPath :", dbExc)
Utils.deleteRecursively(dbPath)
- open(dbPath, metadata, conf, Some(diskBackend))
+ open(dbPath, metadata, conf, Some(diskBackend), serializer)
}
}.getOrElse(new InMemoryStore())
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
new file mode 100644
index 00000000000..5c2752fa6ce
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import collection.JavaConverters._
+import java.util.Date
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.api.v1.JobData
+
+/**
+ * Converts [[JobDataWrapper]] to and from the `StoreTypes.JobDataWrapper` protobuf message
+ * defined in store_types.proto.
+ */
+object JobDataWrapperSerializer {
+
+  /**
+   * Encodes a [[JobDataWrapper]] as a `StoreTypes.JobDataWrapper` protobuf message.
+   *
+   * @param j the wrapper to encode
+   * @return the protobuf wire-format bytes of the message
+   */
+  def serialize(j: JobDataWrapper): Array[Byte] = {
+    val builder = StoreTypes.JobDataWrapper.newBuilder()
+    builder.setInfo(serializeJobData(j.info))
+    j.skippedStages.foreach(builder.addSkippedStages)
+    j.sqlExecutionId.foreach(builder.setSqlExecutionId)
+    builder.build().toByteArray
+  }
+
+  /**
+   * Decodes bytes produced by [[serialize]] back into a [[JobDataWrapper]].
+   *
+   * @param bytes protobuf wire-format bytes of a `StoreTypes.JobDataWrapper` message
+   * @return the decoded wrapper; an unset `sql_execution_id` becomes `None`
+   */
+  def deserialize(bytes: Array[Byte]): JobDataWrapper = {
+    val wrapper = StoreTypes.JobDataWrapper.parseFrom(bytes)
+    new JobDataWrapper(
+      deserializeJobData(wrapper.getInfo),
+      wrapper.getSkippedStagesList.asScala.map(_.toInt).toSet,
+      getOptional(wrapper.hasSqlExecutionId, wrapper.getSqlExecutionId))
+  }
+
+  /**
+   * Returns `Some(result)` when `condition` (a protobuf `hasXxx` presence check) holds and
+   * `None` otherwise. `result` is by-name, so the getter only runs when the field is present.
+   */
+  private def getOptional[T](condition: Boolean, result: => T): Option[T] =
+    if (condition) Some(result) else None
+
+  /** Builds the `StoreTypes.JobData` message for a REST-API [[JobData]]. */
+  private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
+    val jobDataBuilder = StoreTypes.JobData.newBuilder()
+    jobDataBuilder.setJobId(jobData.jobId.toLong)
+      .setName(jobData.name)
+      .setStatus(serializeJobExecutionStatus(jobData.status))
+      .setNumTasks(jobData.numTasks)
+      .setNumActiveTasks(jobData.numActiveTasks)
+      .setNumCompletedTasks(jobData.numCompletedTasks)
+      .setNumSkippedTasks(jobData.numSkippedTasks)
+      .setNumFailedTasks(jobData.numFailedTasks)
+      .setNumKilledTasks(jobData.numKilledTasks)
+      .setNumCompletedIndices(jobData.numCompletedIndices)
+      .setNumActiveStages(jobData.numActiveStages)
+      .setNumCompletedStages(jobData.numCompletedStages)
+      .setNumSkippedStages(jobData.numSkippedStages)
+      .setNumFailedStages(jobData.numFailedStages)
+
+    // Optional fields are only set when present so the reader can map "unset" back to None.
+    jobData.description.foreach(jobDataBuilder.setDescription)
+    jobData.submissionTime.foreach(d => jobDataBuilder.setSubmissionTime(d.getTime))
+    jobData.completionTime.foreach(d => jobDataBuilder.setCompletionTime(d.getTime))
+    jobData.stageIds.foreach(id => jobDataBuilder.addStageIds(id.toLong))
+    jobData.jobGroup.foreach(jobDataBuilder.setJobGroup)
+    jobData.killedTasksSummary.foreach { case (key, count) =>
+      jobDataBuilder.putKillTasksSummary(key, count)
+    }
+    jobDataBuilder.build()
+  }
+
+  /** Rebuilds a REST-API [[JobData]] from its `StoreTypes.JobData` message. */
+  private def deserializeJobData(info: StoreTypes.JobData): JobData = {
+    val description = getOptional(info.hasDescription, info.getDescription)
+    val submissionTime = getOptional(info.hasSubmissionTime, new Date(info.getSubmissionTime))
+    val completionTime = getOptional(info.hasCompletionTime, new Date(info.getCompletionTime))
+    val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
+    // Convert eagerly: `mapValues` yields a lazy view and is deprecated in Scala 2.13.
+    val killedTasksSummary = info.getKillTasksSummaryMap.asScala.map {
+      case (key, count) => key -> count.toInt
+    }.toMap
+
+    new JobData(
+      jobId = info.getJobId.toInt,
+      name = info.getName,
+      description = description,
+      submissionTime = submissionTime,
+      completionTime = completionTime,
+      stageIds = info.getStageIdsList.asScala.map(_.toInt).toSeq,
+      jobGroup = jobGroup,
+      status = deserializeJobExecutionStatus(info.getStatus),
+      numTasks = info.getNumTasks,
+      numActiveTasks = info.getNumActiveTasks,
+      numCompletedTasks = info.getNumCompletedTasks,
+      numSkippedTasks = info.getNumSkippedTasks,
+      numFailedTasks = info.getNumFailedTasks,
+      numKilledTasks = info.getNumKilledTasks,
+      numCompletedIndices = info.getNumCompletedIndices,
+      numActiveStages = info.getNumActiveStages,
+      numCompletedStages = info.getNumCompletedStages,
+      numSkippedStages = info.getNumSkippedStages,
+      numFailedStages = info.getNumFailedStages,
+      killedTasksSummary = killedTasksSummary)
+  }
+
+  /** Maps Spark's job status to its protobuf counterpart explicitly rather than by name. */
+  private def serializeJobExecutionStatus(
+      status: JobExecutionStatus): StoreTypes.JobExecutionStatus = status match {
+    case JobExecutionStatus.RUNNING => StoreTypes.JobExecutionStatus.RUNNING
+    case JobExecutionStatus.SUCCEEDED => StoreTypes.JobExecutionStatus.SUCCEEDED
+    case JobExecutionStatus.FAILED => StoreTypes.JobExecutionStatus.FAILED
+    case JobExecutionStatus.UNKNOWN => StoreTypes.JobExecutionStatus.UNKNOWN
+  }
+
+  /** Maps a protobuf job status back to Spark's enum. */
+  private def deserializeJobExecutionStatus(
+      status: StoreTypes.JobExecutionStatus): JobExecutionStatus = status match {
+    case StoreTypes.JobExecutionStatus.RUNNING => JobExecutionStatus.RUNNING
+    case StoreTypes.JobExecutionStatus.SUCCEEDED => JobExecutionStatus.SUCCEEDED
+    case StoreTypes.JobExecutionStatus.FAILED => JobExecutionStatus.FAILED
+    // UNKNOWN, the proto3 default UNSPECIFIED (field never set) and UNRECOGNIZED (a value
+    // written by a newer schema) all degrade to UNKNOWN. The previous name-based valueOf
+    // threw IllegalArgumentException for the latter two.
+    case _ => JobExecutionStatus.UNKNOWN
+  }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
new file mode 100644
index 00000000000..2173821a219
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.KVUtils.KVStoreScalaSerializer
+
+/**
+ * KV store serializer that encodes [[JobDataWrapper]] with protobuf and falls back to the
+ * parent [[KVStoreScalaSerializer]] for every other type.
+ */
+private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
+
+  /** Protobuf-encodes a [[JobDataWrapper]]; any other object goes to the parent serializer. */
+  override def serialize(o: Object): Array[Byte] = o match {
+    case wrapper: JobDataWrapper => JobDataWrapperSerializer.serialize(wrapper)
+    case _ => super.serialize(o)
+  }
+
+  /**
+   * Decodes `data` as an instance of `klass`. Classes assignable to [[JobDataWrapper]] are
+   * read with protobuf; all other classes are handled by the parent serializer.
+   */
+  override def deserialize[T](data: Array[Byte], klass: Class[T]): T = {
+    val useProtobuf = classOf[JobDataWrapper].isAssignableFrom(klass)
+    if (useProtobuf) {
+      JobDataWrapperSerializer.deserialize(data).asInstanceOf[T]
+    } else {
+      super.deserialize(data, klass)
+    }
+  }
+}
diff --git
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ec92877ce94..24a8a6844f1 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.status
import java.io.File
import java.util.{Date, Properties}
-import scala.collection.immutable.Map
import scala.reflect.{classTag, ClassTag}
import org.scalatest.BeforeAndAfter
@@ -35,6 +34,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster._
import org.apache.spark.status.ListenerEventsTestHelper._
import org.apache.spark.status.api.v1
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
import org.apache.spark.storage._
import org.apache.spark.tags.ExtendedLevelDBTest
import org.apache.spark.util.Utils
@@ -45,7 +45,7 @@ abstract class AppStatusListenerSuite extends SparkFunSuite
with BeforeAndAfter
private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false,
true, 2)
private var time: Long = _
- private var testDir: File = _
+ protected var testDir: File = _
private var store: ElementTrackingStore = _
private var taskIdTracker = -1L
@@ -1904,3 +1904,13 @@ class AppStatusListenerWithRocksDBSuite extends
AppStatusListenerSuite {
override def conf: SparkConf = super.conf
.set(HYBRID_STORE_DISK_BACKEND, HybridStoreDiskBackend.ROCKSDB.toString)
}
+
+/**
+ * Runs the AppStatusListenerSuite tests against a RocksDB-backed store whose values are
+ * encoded with KVStoreProtobufSerializer instead of the default KVStoreScalaSerializer.
+ */
+class AppStatusListenerWithProtobufSerializerSuite extends AppStatusListenerSuite {
+  override def createKVStore: KVStore = {
+    val protobufSerializer = new KVStoreProtobufSerializer()
+    KVUtils.open(
+      path = testDir,
+      metadata = getClass.getName,
+      conf = conf,
+      diskBackend = Some(HybridStoreDiskBackend.ROCKSDB),
+      serializer = Some(protobufSerializer))
+  }
+}
diff --git
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
new file mode 100644
index 00000000000..7ca446e9a94
--- /dev/null
+++
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
+import org.apache.spark.status.JobDataWrapper
+import org.apache.spark.status.api.v1.JobData
+
+/** Round-trip tests for [[KVStoreProtobufSerializer]]. */
+class KVStoreProtobufSerializerSuite extends SparkFunSuite {
+  private val serializer = new KVStoreProtobufSerializer()
+
+  /** Serializes `input` and reads it back through the serializer under test. */
+  private def roundTrip(input: JobDataWrapper): JobDataWrapper = {
+    val bytes = serializer.serialize(input)
+    serializer.deserialize(bytes, classOf[JobDataWrapper])
+  }
+
+  /** Asserts that every field of `result`, including its nested JobData, matches `expected`. */
+  private def assertSameJobDataWrapper(
+      result: JobDataWrapper,
+      expected: JobDataWrapper): Unit = {
+    assert(result.info.jobId == expected.info.jobId)
+    assert(result.info.name == expected.info.name)
+    assert(result.info.description == expected.info.description)
+    assert(result.info.submissionTime == expected.info.submissionTime)
+    assert(result.info.completionTime == expected.info.completionTime)
+    assert(result.info.stageIds == expected.info.stageIds)
+    assert(result.info.jobGroup == expected.info.jobGroup)
+    assert(result.info.status == expected.info.status)
+    assert(result.info.numTasks == expected.info.numTasks)
+    assert(result.info.numActiveTasks == expected.info.numActiveTasks)
+    assert(result.info.numCompletedTasks == expected.info.numCompletedTasks)
+    assert(result.info.numSkippedTasks == expected.info.numSkippedTasks)
+    assert(result.info.numFailedTasks == expected.info.numFailedTasks)
+    assert(result.info.numKilledTasks == expected.info.numKilledTasks)
+    assert(result.info.numCompletedIndices == expected.info.numCompletedIndices)
+    assert(result.info.numActiveStages == expected.info.numActiveStages)
+    assert(result.info.numCompletedStages == expected.info.numCompletedStages)
+    assert(result.info.numSkippedStages == expected.info.numSkippedStages)
+    assert(result.info.numFailedStages == expected.info.numFailedStages)
+    assert(result.info.killedTasksSummary == expected.info.killedTasksSummary)
+    assert(result.skippedStages == expected.skippedStages)
+    assert(result.sqlExecutionId == expected.sqlExecutionId)
+  }
+
+  test("Job data") {
+    val input = new JobDataWrapper(
+      new JobData(
+        jobId = 1,
+        name = "test",
+        description = Some("test description"),
+        submissionTime = Some(new Date(123456L)),
+        completionTime = Some(new Date(654321L)),
+        stageIds = Seq(1, 2, 3, 4),
+        jobGroup = Some("group"),
+        status = JobExecutionStatus.UNKNOWN,
+        numTasks = 2,
+        numActiveTasks = 3,
+        numCompletedTasks = 4,
+        numSkippedTasks = 5,
+        numFailedTasks = 6,
+        numKilledTasks = 7,
+        numCompletedIndices = 8,
+        numActiveStages = 9,
+        numCompletedStages = 10,
+        numSkippedStages = 11,
+        numFailedStages = 12,
+        killedTasksSummary = Map("a" -> 1, "b" -> 2)),
+      Set(1, 2),
+      Some(999)
+    )
+    assertSameJobDataWrapper(roundTrip(input), input)
+  }
+
+  test("Job data with unset optional fields, empty collections and every status") {
+    // Unset optionals must come back as None (not proto3 defaults), and each Spark
+    // status must survive the enum conversion in both directions.
+    JobExecutionStatus.values().foreach { status =>
+      val input = new JobDataWrapper(
+        new JobData(
+          jobId = 0,
+          name = "",
+          description = None,
+          submissionTime = None,
+          completionTime = None,
+          stageIds = Seq.empty,
+          jobGroup = None,
+          status = status,
+          numTasks = 0,
+          numActiveTasks = 0,
+          numCompletedTasks = 0,
+          numSkippedTasks = 0,
+          numFailedTasks = 0,
+          numKilledTasks = 0,
+          numCompletedIndices = 0,
+          numActiveStages = 0,
+          numCompletedStages = 0,
+          numSkippedStages = 0,
+          numFailedStages = 0,
+          killedTasksSummary = Map.empty),
+        Set.empty,
+        None
+      )
+      assertSameJobDataWrapper(roundTrip(input), input)
+    }
+  }
+}
+
diff --git a/pom.xml b/pom.xml
index 0639370981c..17b5c99204f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,6 +123,7 @@
<protobuf.hadoopDependency.version>2.5.0</protobuf.hadoopDependency.version>
<!-- Actual Protobuf version in Spark modules like Spark Connect, protobuf
connector, etc. -->
<protobuf.version>3.21.9</protobuf.version>
+ <protoc-jar-maven-plugin.version>3.11.4</protoc-jar-maven-plugin.version>
<yarn.version>${hadoop.version}</yarn.version>
<zookeeper.version>3.6.3</zookeeper.version>
<curator.version>2.13.0</curator.version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 28aef586c50..a38ba7db519 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -613,9 +613,23 @@ object SparkParallelTestGrouping {
object Core {
import scala.sys.process.Process
+ import BuildCommons.protoVersion
def buildenv = Process(Seq("uname")).!!.trim.replaceFirst("[^A-Za-z0-9].*",
"").toLowerCase
def bashpath = Process(Seq("where",
"bash")).!!.split("[\r\n]+").head.replace('\\', '/')
lazy val settings = Seq(
+ // Setting version for the protobuf compiler. This has to be propagated to
every sub-project
+ // even if the project is not using it.
+ PB.protocVersion := BuildCommons.protoVersion,
+ // For some reason the resolution from the imported Maven build does not work for some
+ // of these dependencies that we need to shade later on.
+ libraryDependencies ++= {
+ Seq(
+ "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf"
+ )
+ },
+ (Compile / PB.targets) := Seq(
+ PB.gens.java -> (Compile / sourceManaged).value
+ ),
(Compile / resourceGenerators) += Def.task {
val buildScript = baseDirectory.value + "/../build/spark-build-info"
val targetDir = baseDirectory.value + "/target/extra-resources/"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]