This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a28f08d77f8 [SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper
a28f08d77f8 is described below

commit a28f08d77f873fb70eee5af553630c0eab536c55
Author: Sandeep Singh <sand...@techaddict.me>
AuthorDate: Wed Dec 28 15:27:36 2022 -0800

    [SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for SparkPlanGraphWrapper
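
    The serializer is also registered in
    META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe (see the
    diff below), so it is discovered through Java's standard ServiceLoader
    mechanism. A minimal sketch of that discovery step, assuming stock
    ServiceLoader behavior (the actual loading code in Spark may differ):

        import java.util.ServiceLoader
        import scala.collection.JavaConverters._

        import org.apache.spark.status.protobuf.ProtobufSerDe

        // Instantiate every implementation listed in the services file,
        // including the new SparkPlanGraphWrapperSerializer.
        val serdes = ServiceLoader.load(classOf[ProtobufSerDe]).asScala
        // Index the serializers by the class each one supports.
        val byClass = serdes.map(s => s.supportClass -> s).toMap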
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for SparkPlanGraphWrapper over RocksDB.
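
    For illustration, a minimal round trip through the new serializer (sample
    values are hypothetical; these classes are Spark-internal):

        import org.apache.spark.sql.execution.ui.{SparkPlanGraphEdge, SparkPlanGraphWrapper}
        import org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer

        val serde = new SparkPlanGraphWrapperSerializer
        // A tiny graph: no nodes, one edge.
        val wrapper = new SparkPlanGraphWrapper(
          executionId = 1,
          nodes = Seq.empty,
          edges = Seq(SparkPlanGraphEdge(fromId = 0, toId = 1)))
        // serialize() emits the compact binary form that is written to RocksDB.
        val bytes: Array[Byte] = serde.serialize(wrapper)
        // deserialize() parses the bytes back into the UI object.
        val restored: SparkPlanGraphWrapper = serde.deserialize(bytes)

    In the store path this round trip goes through KVStoreProtobufSerializer,
    which is what the new unit test exercises.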
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New UT
    
    Closes #39164 from techaddict/SPARK-41432-SparkPlanGraphWrapper.
    
    Authored-by: Sandeep Singh <sand...@techaddict.me>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  31 +++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../sql/SparkPlanGraphWrapperSerializer.scala      | 134 +++++++++++++++++++++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 128 +++++++++++++++++++-
 4 files changed, 293 insertions(+), 1 deletion(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 9f9d87e9aae..22e22eea1a2 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -390,3 +390,34 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   map<int64, string> metric_values = 12;
 }
+
+message SparkPlanGraphNode {
+  int64 id = 1;
+  string name = 2;
+  string desc = 3;
+  repeated SQLPlanMetric metrics = 4;
+}
+
+message SparkPlanGraphClusterWrapper {
+  int64 id = 1;
+  string name = 2;
+  string desc = 3;
+  repeated SparkPlanGraphNodeWrapper nodes = 4;
+  repeated SQLPlanMetric metrics = 5;
+}
+
+message SparkPlanGraphNodeWrapper {
+  SparkPlanGraphNode node = 1;
+  SparkPlanGraphClusterWrapper cluster = 2;
+}
+
+message SparkPlanGraphEdge {
+  int64 from_id = 1;
+  int64 to_id = 2;
+}
+
+message SparkPlanGraphWrapper {
+  int64 execution_id = 1;
+  repeated SparkPlanGraphNodeWrapper nodes = 2;
+  repeated SparkPlanGraphEdge edges = 3;
+}
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index de5f2c2d05c..3f0ae5470ce 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -16,3 +16,4 @@
 #
 
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
+org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
new file mode 100644
index 00000000000..49debedbb68
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
+import org.apache.spark.status.protobuf.ProtobufSerDe
+import org.apache.spark.status.protobuf.StoreTypes
+
+class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val plan = input.asInstanceOf[SparkPlanGraphWrapper]
+    val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder()
+    builder.setExecutionId(plan.executionId)
+    plan.nodes.foreach { node =>
+      builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
+    }
+    plan.edges.foreach { edge =>
+      builder.addEdges(serializeSparkPlanGraphEdge(edge))
+    }
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): SparkPlanGraphWrapper = {
+    val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes)
+    new SparkPlanGraphWrapper(
+      executionId = wrapper.getExecutionId,
+      nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
+      edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq
+    )
+  }
+
+  private def serializeSparkPlanGraphNodeWrapper(input: SparkPlanGraphNodeWrapper):
+    StoreTypes.SparkPlanGraphNodeWrapper = {
+
+    val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
+    builder.setNode(serializeSparkPlanGraphNode(input.node))
+    builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+    builder.build()
+  }
+
+  private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper):
+    SparkPlanGraphNodeWrapper = {
+
+    new SparkPlanGraphNodeWrapper(
+      node = deserializeSparkPlanGraphNode(input.getNode),
+      cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
+    )
+  }
+
+  private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
+    StoreTypes.SparkPlanGraphEdge = {
+    val builder = StoreTypes.SparkPlanGraphEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeSparkPlanGraphEdge(edge: StoreTypes.SparkPlanGraphEdge):
+    SparkPlanGraphEdge = {
+    SparkPlanGraphEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+
+  private def serializeSparkPlanGraphNode(node: SparkPlanGraphNode):
+    StoreTypes.SparkPlanGraphNode = {
+    val builder = StoreTypes.SparkPlanGraphNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setDesc(node.desc)
+    node.metrics.foreach { metric =>
+      builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
+    }
+    builder.build()
+  }
+
+  private def deserializeSparkPlanGraphNode(node: StoreTypes.SparkPlanGraphNode):
+    SparkPlanGraphNode = {
+
+    new SparkPlanGraphNode(
+      id = node.getId,
+      name = node.getName,
+      desc = node.getDesc,
+      metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+    )
+  }
+
+  private def serializeSparkPlanGraphClusterWrapper(cluster: SparkPlanGraphClusterWrapper):
+    StoreTypes.SparkPlanGraphClusterWrapper = {
+    val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder()
+    builder.setId(cluster.id)
+    builder.setName(cluster.name)
+    builder.setDesc(cluster.desc)
+    cluster.nodes.foreach { node =>
+      builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
+    }
+    cluster.metrics.foreach { metric =>
+      builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
+    }
+    builder.build()
+  }
+
+  private def deserializeSparkPlanGraphClusterWrapper(
+    cluster: StoreTypes.SparkPlanGraphClusterWrapper): SparkPlanGraphClusterWrapper = {
+
+    new SparkPlanGraphClusterWrapper(
+      id = cluster.getId,
+      name = cluster.getName,
+      desc = cluster.getDesc,
+      nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
+      metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 9d6a938c3fe..90d04c3f013 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.status.protobuf.sql
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.sql.execution.ui._
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 
@@ -85,4 +85,130 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     // input.metricValues is null, result.metricValues is also empty map.
     assert(result2.metricValues.isEmpty)
   }
+
+  test("Spark Plan Graph") {
+    val cluster = new SparkPlanGraphClusterWrapper(
+      id = 5,
+      name = "name_5",
+      desc = "desc_5",
+      nodes = Seq(new SparkPlanGraphNodeWrapper(
+        node = new SparkPlanGraphNode(
+          id = 12,
+          name = "name_12",
+          desc = "desc_12",
+          metrics = Seq(
+            SQLPlanMetric(
+              name = "name_13",
+              accumulatorId = 13,
+              metricType = "metric_13"
+            ),
+            SQLPlanMetric(
+              name = "name_14",
+              accumulatorId = 14,
+              metricType = "metric_14"
+            )
+          )
+        ),
+        cluster = new SparkPlanGraphClusterWrapper(
+          id = 15,
+          name = "name_15",
+          desc = "desc_15",
+          nodes = Seq(),
+          metrics = Seq(
+            SQLPlanMetric(
+              name = "name_16",
+              accumulatorId = 16,
+              metricType = "metric_16"
+            ),
+            SQLPlanMetric(
+              name = "name_17",
+              accumulatorId = 17,
+              metricType = "metric_17"
+            )
+          )
+        )
+      )),
+      metrics = Seq(
+        SQLPlanMetric(
+          name = "name_6",
+          accumulatorId = 6,
+          metricType = "metric_6"
+        ),
+        SQLPlanMetric(
+          name = "name_7 d",
+          accumulatorId = 7,
+          metricType = "metric_7"
+        )
+      )
+    )
+    val node = new SparkPlanGraphNodeWrapper(
+      node = new SparkPlanGraphNode(
+        id = 2,
+        name = "name_1",
+        desc = "desc_1",
+        metrics = Seq(
+          SQLPlanMetric(
+            name = "name_2",
+            accumulatorId = 3,
+            metricType = "metric_1"
+          ),
+          SQLPlanMetric(
+            name = "name_3",
+            accumulatorId = 4,
+            metricType = "metric_2"
+          )
+        )
+      ),
+      cluster = cluster
+    )
+    val input = new SparkPlanGraphWrapper(
+      executionId = 1,
+      nodes = Seq(node),
+      edges = Seq(
+        SparkPlanGraphEdge(8, 9),
+        SparkPlanGraphEdge(10, 11)
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper])
+    assert(result.executionId == input.executionId)
+    assert(result.nodes.size == input.nodes.size)
+
+    def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = {
+      assert(n1.node.id == n2.node.id)
+      assert(n1.node.name == n2.node.name)
+      assert(n1.node.desc == n2.node.desc)
+
+      assert(n1.node.metrics.size == n2.node.metrics.size)
+      n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+        assert(m1.name == m2.name)
+        assert(m1.accumulatorId == m2.accumulatorId)
+        assert(m1.metricType == m2.metricType)
+      }
+
+      assert(n1.cluster.id == n2.cluster.id)
+      assert(n1.cluster.name == n2.cluster.name)
+      assert(n1.cluster.desc == n2.cluster.desc)
+      assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+      n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+        compareNodes(n3, n4)
+      }
+      n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+        assert(m1.name == m2.name)
+        assert(m1.accumulatorId == m2.accumulatorId)
+        assert(m1.metricType == m2.metricType)
+      }
+    }
+
+    result.nodes.zip(input.nodes).foreach { case (n1, n2) =>
+      compareNodes(n1, n2)
+    }
+
+    assert(result.edges.size == input.edges.size)
+    result.edges.zip(input.edges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+  }
 }

