This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bb18703fdbf [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
bb18703fdbf is described below
commit bb18703fdbfbe4f7887abebd75beb37af662d0f3
Author: Sandeep Singh <[email protected]>
AuthorDate: Thu Dec 29 16:14:07 2022 -0800
[SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
### What changes were proposed in this pull request?
Add Protobuf serializer for RDDOperationGraphWrapper
### Why are the changes needed?
Support fast and compact serialization/deserialization for RDDOperationGraphWrapper over RocksDB.
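For context, each UI store type gets its own `ProtobufSerDe` implementation, which `KVStoreProtobufSerializer` discovers through the `META-INF/services` registration added in this patch. A minimal sketch of the plug-in contract, with member names taken from the serializer below (the `Any` return type of `deserialize` is an assumption, not confirmed by this diff):

```scala
// Sketch of the ProtobufSerDe plug-in contract, inferred from this patch.
// Implementations are listed in META-INF/services and discovered at runtime.
trait ProtobufSerDe {
  // The KV-store wrapper class this SerDe handles.
  val supportClass: Class[_]
  // Encode the wrapper into compact Protobuf bytes for the RocksDB-backed store.
  def serialize(input: Any): Array[Byte]
  // Decode the bytes back into the wrapper instance (return type assumed).
  def deserialize(bytes: Array[Byte]): Any
}
```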
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New UT
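For illustration, the round trip the new UT exercises looks roughly like this (`graph` is a hypothetical, fully populated RDDOperationGraphWrapper like the one built in the test below):

```scala
// Round-trip sketch: write a wrapper to Protobuf bytes, then read it back.
val serializer = new KVStoreProtobufSerializer()
val bytes: Array[Byte] = serializer.serialize(graph)
val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])
assert(result.stageId == graph.stageId)
```

Note that the Scala-side DeterministicLevel enum survives the round trip by name: the serializer maps it to the proto enum with `valueOf(...toString)` when writing and back with `DeterministicLevel.withName(...)` when reading, so the proto enum values must keep mirroring the Scala enum's names.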
Closes #39110 from techaddict/SPARK-41429-RDDOperationGraphWrapper.
Authored-by: Sandeep Singh <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 35 ++++++
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 +
.../RDDOperationGraphWrapperSerializer.scala | 120 +++++++++++++++++++++
.../protobuf/KVStoreProtobufSerializerSuite.scala | 84 +++++++++++++++
4 files changed, 240 insertions(+)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 22e22eea1a2..e9150490746 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -421,3 +421,38 @@ message SparkPlanGraphWrapper {
   repeated SparkPlanGraphNodeWrapper nodes = 2;
   repeated SparkPlanGraphEdge edges = 3;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  enum DeterministicLevel {
+    UNSPECIFIED = 0;
+    DETERMINATE = 1;
+    UNORDERED = 2;
+    INDETERMINATE = 3;
+  }
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 39127e6a28c..4e39d9ecdc0 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
+org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
new file mode 100644
index 00000000000..8975062082c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val op = input.asInstanceOf[RDDOperationGraphWrapper]
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(op.stageId.toLong)
+    op.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    op.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    op.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(op.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId.toInt,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
+    )
+  }
+
+  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
+    StoreTypes.RDDOperationClusterWrapper = {
+    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
+    builder.setId(op.id)
+    builder.setName(op.name)
+    op.childNodes.foreach { node =>
+      builder.addChildNodes(serializeRDDOperationNode(node))
+    }
+    op.childClusters.foreach { cluster =>
+      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
+    }
+    builder.build()
+  }
+
+  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
+    RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      id = op.getId,
+      name = op.getName,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childClusters =
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+    )
+  }
+
+  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
+    val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
+      .valueOf(node.outputDeterministicLevel.toString)
+    val builder = StoreTypes.RDDOperationNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setCached(node.cached)
+    builder.setBarrier(node.barrier)
+    builder.setCallsite(node.callsite)
+    builder.setOutputDeterministicLevel(outputDeterministicLevel)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
+    RDDOperationNode(
+      id = node.getId,
+      name = node.getName,
+      cached = node.getCached,
+      barrier = node.getBarrier,
+      callsite = node.getCallsite,
+      outputDeterministicLevel =
+        DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
+    )
+  }
+
+  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
+    val builder = StoreTypes.RDDOperationEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
+    RDDOperationEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 5efe56b4449..dab9d9c071f 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -22,9 +22,11 @@ import java.util.Date
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
 class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
@@ -773,4 +775,86 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(result.info.processLogs(k) == input.info.processLogs(k))
     }
   }
+
+  test("RDD Operation Graph") {
+    val input = new RDDOperationGraphWrapper(
+      stageId = 1,
+      edges = Seq(
+        RDDOperationEdge(fromId = 2, toId = 3)
+      ),
+      outgoingEdges = Seq(
+        RDDOperationEdge(fromId = 4, toId = 5),
+        RDDOperationEdge(fromId = 6, toId = 7)
+      ),
+      incomingEdges = Seq(
+        RDDOperationEdge(fromId = 8, toId = 9),
+        RDDOperationEdge(fromId = 10, toId = 11),
+        RDDOperationEdge(fromId = 12, toId = 13)
+      ),
+      rootCluster = new RDDOperationClusterWrapper(
+        id = "id_1",
+        name = "name1",
+        childNodes = Seq(
+          RDDOperationNode(
+            id = 14,
+            name = "name2",
+            cached = true,
+            barrier = false,
+            callsite = "callsite_1",
+            outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
+        childClusters = Seq(new RDDOperationClusterWrapper(
+          id = "id_1",
+          name = "name1",
+          childNodes = Seq(
+            RDDOperationNode(
+              id = 15,
+              name = "name3",
+              cached = false,
+              barrier = true,
+              callsite = "callsite_2",
+              outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+          childClusters = Seq.empty
+        ))
+      )
+    )
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])
+
+    assert(result.stageId == input.stageId)
+    assert(result.edges.size == input.edges.size)
+    result.edges.zip(input.edges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.outgoingEdges.size == input.outgoingEdges.size)
+    result.outgoingEdges.zip(input.outgoingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.incomingEdges.size == input.incomingEdges.size)
+    result.incomingEdges.zip(input.incomingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+
+    def compareClusters(c1: RDDOperationClusterWrapper, c2: RDDOperationClusterWrapper): Unit = {
+      assert(c1.id == c2.id)
+      assert(c1.name == c2.name)
+      assert(c1.childNodes.size == c2.childNodes.size)
+      c1.childNodes.zip(c2.childNodes).foreach { case (n1, n2) =>
+        assert(n1.id == n2.id)
+        assert(n1.name == n2.name)
+        assert(n1.cached == n2.cached)
+        assert(n1.barrier == n2.barrier)
+        assert(n1.callsite == n2.callsite)
+        assert(n1.outputDeterministicLevel == n2.outputDeterministicLevel)
+      }
+      assert(c1.childClusters.size == c2.childClusters.size)
+      c1.childClusters.zip(c2.childClusters).foreach {
+        case (_c1, _c2) => compareClusters(_c1, _c2)
+      }
+    }
+
+    compareClusters(result.rootCluster, input.rootCluster)
+  }
 }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]