This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bb18703fdbf [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
bb18703fdbf is described below

commit bb18703fdbfbe4f7887abebd75beb37af662d0f3
Author: Sandeep Singh <sand...@techaddict.me>
AuthorDate: Thu Dec 29 16:14:07 2022 -0800

    [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for RDDOperationGraphWrapper
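    
    A minimal round-trip sketch (illustration only, not part of this patch; the
    class names and constructor parameters below are taken from the diff that
    follows):
    
    ```scala
    import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
    import org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
    import org.apache.spark.ui.scope.RDDOperationEdge
    
    // Build a small graph wrapper, serialize it to protobuf bytes, and read it back.
    val graph = new RDDOperationGraphWrapper(
      stageId = 1,
      edges = Seq(RDDOperationEdge(fromId = 1, toId = 2)),
      outgoingEdges = Seq.empty,
      incomingEdges = Seq.empty,
      rootCluster = new RDDOperationClusterWrapper(
        id = "cluster_0", name = "root", childNodes = Seq.empty, childClusters = Seq.empty))
    
    val serde = new RDDOperationGraphWrapperSerializer()
    val bytes: Array[Byte] = serde.serialize(graph)
    val restored: RDDOperationGraphWrapper = serde.deserialize(bytes)
    assert(restored.stageId == graph.stageId)
    ```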
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for RDDOperationGraphWrapper over RocksDB.
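    
    One implementation detail worth noting: the serializer bridges Spark's
    DeterministicLevel enumeration and the generated protobuf enum purely by
    constant name. A hedged sketch of that mapping (assuming, as the .proto
    definition below suggests, that the two enums share constant names):
    
    ```scala
    import org.apache.spark.rdd.DeterministicLevel
    import org.apache.spark.status.protobuf.StoreTypes
    
    // Scala Enumeration value -> generated protobuf enum constant, matched on name.
    val level = DeterministicLevel.INDETERMINATE
    val protoLevel = StoreTypes.RDDOperationNode.DeterministicLevel.valueOf(level.toString)
    // ...and back again via Enumeration.withName.
    assert(DeterministicLevel.withName(protoLevel.toString) == level)
    ```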
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New UT
    
    Closes #39110 from techaddict/SPARK-41429-RDDOperationGraphWrapper.
    
    Authored-by: Sandeep Singh <sand...@techaddict.me>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  35 ++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../RDDOperationGraphWrapperSerializer.scala       | 120 +++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  |  84 +++++++++++++++
 4 files changed, 240 insertions(+)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 22e22eea1a2..e9150490746 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -421,3 +421,38 @@ message SparkPlanGraphWrapper {
   repeated SparkPlanGraphNodeWrapper nodes = 2;
   repeated SparkPlanGraphEdge edges = 3;
 }
+
+message RDDOperationEdge {
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {
+  enum DeterministicLevel {
+    UNSPECIFIED = 0;
+    DETERMINATE = 1;
+    UNORDERED = 2;
+    INDETERMINATE = 3;
+  }
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  DeterministicLevel output_deterministic_level = 6;
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 39127e6a28c..4e39d9ecdc0 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
+org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
new file mode 100644
index 00000000000..8975062082c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val op = input.asInstanceOf[RDDOperationGraphWrapper]
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(op.stageId.toLong)
+    op.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    op.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    op.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(op.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId.toInt,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
+    )
+  }
+
+  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
+    StoreTypes.RDDOperationClusterWrapper = {
+    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
+    builder.setId(op.id)
+    builder.setName(op.name)
+    op.childNodes.foreach { node =>
+      builder.addChildNodes(serializeRDDOperationNode(node))
+    }
+    op.childClusters.foreach { cluster =>
+      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
+    }
+    builder.build()
+  }
+
+  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
+    RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      id = op.getId,
+      name = op.getName,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childClusters =
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+    )
+  }
+
+  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
+    val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
+      .valueOf(node.outputDeterministicLevel.toString)
+    val builder = StoreTypes.RDDOperationNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setCached(node.cached)
+    builder.setBarrier(node.barrier)
+    builder.setCallsite(node.callsite)
+    builder.setOutputDeterministicLevel(outputDeterministicLevel)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
+    RDDOperationNode(
+      id = node.getId,
+      name = node.getName,
+      cached = node.getCached,
+      barrier = node.getBarrier,
+      callsite = node.getCallsite,
+      outputDeterministicLevel =
+        DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
+    )
+  }
+
+  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
+    val builder = StoreTypes.RDDOperationEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
+    RDDOperationEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 5efe56b4449..dab9d9c071f 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -22,9 +22,11 @@ import java.util.Date
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
 class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
@@ -773,4 +775,86 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(result.info.processLogs(k) == input.info.processLogs(k))
     }
   }
+
+  test("RDD Operation Graph") {
+    val input = new RDDOperationGraphWrapper(
+      stageId = 1,
+      edges = Seq(
+        RDDOperationEdge(fromId = 2, toId = 3)
+      ),
+      outgoingEdges = Seq(
+        RDDOperationEdge(fromId = 4, toId = 5),
+        RDDOperationEdge(fromId = 6, toId = 7)
+      ),
+      incomingEdges = Seq(
+        RDDOperationEdge(fromId = 8, toId = 9),
+        RDDOperationEdge(fromId = 10, toId = 11),
+        RDDOperationEdge(fromId = 12, toId = 13)
+      ),
+      rootCluster = new RDDOperationClusterWrapper(
+        id = "id_1",
+        name = "name1",
+        childNodes = Seq(
+          RDDOperationNode(
+            id = 14,
+            name = "name2",
+            cached = true,
+            barrier = false,
+            callsite = "callsite_1",
+            outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
+        childClusters = Seq(new RDDOperationClusterWrapper(
+          id = "id_1",
+          name = "name1",
+          childNodes = Seq(
+            RDDOperationNode(
+              id = 15,
+              name = "name3",
+              cached = false,
+              barrier = true,
+              callsite = "callsite_2",
+              outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+          childClusters = Seq.empty
+        ))
+      )
+    )
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])
+
+    assert(result.stageId == input.stageId)
+    assert(result.edges.size == input.edges.size)
+    result.edges.zip(input.edges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.outgoingEdges.size == input.outgoingEdges.size)
+    result.outgoingEdges.zip(input.outgoingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.incomingEdges.size == input.incomingEdges.size)
+    result.incomingEdges.zip(input.incomingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+
+    def compareClusters(c1: RDDOperationClusterWrapper, c2: RDDOperationClusterWrapper): Unit = {
+      assert(c1.id == c2.id)
+      assert(c1.name == c2.name)
+      assert(c1.childNodes.size == c2.childNodes.size)
+      c1.childNodes.zip(c2.childNodes).foreach { case (n1, n2) =>
+        assert(n1.id == n2.id)
+        assert(n1.name == n2.name)
+        assert(n1.cached == n2.cached)
+        assert(n1.barrier == n2.barrier)
+        assert(n1.callsite == n2.callsite)
+        assert(n1.outputDeterministicLevel == n2.outputDeterministicLevel)
+      }
+      assert(c1.childClusters.size == c2.childClusters.size)
+      c1.childClusters.zip(c2.childClusters).foreach {
+        case (_c1, _c2) => compareClusters(_c1, _c2)
+      }
+    }
+
+    compareClusters(result.rootCluster, input.rootCluster)
+  }
 }

