This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a28f08d77f8 [SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper
a28f08d77f8 is described below
commit a28f08d77f873fb70eee5af553630c0eab536c55
Author: Sandeep Singh <[email protected]>
AuthorDate: Wed Dec 28 15:27:36 2022 -0800
[SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper
### What changes were proposed in this pull request?
Add Protobuf serializer for SparkPlanGraphWrapper
### Why are the changes needed?
Support fast and compact serialization/deserialization for SparkPlanGraphWrapper over RocksDB.
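For context, a minimal round-trip sketch of how this serializer is exercised (it mirrors the new unit test in this patch; assumes a no-arg KVStoreProtobufSerializer constructor and a populated SparkPlanGraphWrapper named input):

    // Hedged sketch mirroring the unit test below, not an excerpt of the patch.
    val serializer = new KVStoreProtobufSerializer()
    val bytes: Array[Byte] = serializer.serialize(input)  // compact bytes for RocksDB
    val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper])
    assert(result.executionId == input.executionId)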
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New unit test in KVStoreProtobufSerializerSuite.
Closes #39164 from techaddict/SPARK-41432-SparkPlanGraphWrapper.
Authored-by: Sandeep Singh <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 31 +++++
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 +
.../sql/SparkPlanGraphWrapperSerializer.scala | 134 +++++++++++++++++++++
.../sql/KVStoreProtobufSerializerSuite.scala | 128 +++++++++++++++++++-
4 files changed, 293 insertions(+), 1 deletion(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 9f9d87e9aae..22e22eea1a2 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -390,3 +390,34 @@ message SQLExecutionUIData {
repeated int64 stages = 11;
map<int64, string> metric_values = 12;
}
+
+message SparkPlanGraphNode {
+ int64 id = 1;
+ string name = 2;
+ string desc = 3;
+ repeated SQLPlanMetric metrics = 4;
+}
+
+message SparkPlanGraphClusterWrapper {
+ int64 id = 1;
+ string name = 2;
+ string desc = 3;
+ repeated SparkPlanGraphNodeWrapper nodes = 4;
+ repeated SQLPlanMetric metrics = 5;
+}
+
+message SparkPlanGraphNodeWrapper {
+ SparkPlanGraphNode node = 1;
+ SparkPlanGraphClusterWrapper cluster = 2;
+}
+
+message SparkPlanGraphEdge {
+ int64 from_id = 1;
+ int64 to_id = 2;
+}
+
+message SparkPlanGraphWrapper {
+ int64 execution_id = 1;
+ repeated SparkPlanGraphNodeWrapper nodes = 2;
+ repeated SparkPlanGraphEdge edges = 3;
+}
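As orientation before the Scala changes below, here is a hedged sketch of populating these new messages through the generated Java builders (StoreTypes is the class generated from this .proto; all ids and names are illustrative, not taken from the commit):

    // Illustrative values only; generated builders follow the usual protobuf-java API.
    val protoNode = StoreTypes.SparkPlanGraphNode.newBuilder()
      .setId(1L).setName("Exchange").setDesc("Exchange hashpartitioning").build()
    val protoWrapper = StoreTypes.SparkPlanGraphWrapper.newBuilder()
      .setExecutionId(1L)
      .addNodes(StoreTypes.SparkPlanGraphNodeWrapper.newBuilder().setNode(protoNode))
      .addEdges(StoreTypes.SparkPlanGraphEdge.newBuilder().setFromId(1L).setToId(2L))
      .build()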
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index de5f2c2d05c..3f0ae5470ce 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -16,3 +16,4 @@
#
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
+org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
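The entry above follows the standard java.util.ServiceLoader convention, so the new SerDe is discovered without explicit registration. A hedged sketch of that discovery (assuming ProtobufSerDe implementations are loaded this way, as the META-INF/services file suggests):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.status.protobuf.ProtobufSerDe

    // Each registered SerDe announces the class it handles via supportClass,
    // letting the KVStore serializer dispatch on the runtime class of its input.
    val serdes = ServiceLoader.load(classOf[ProtobufSerDe]).asScala.toSeq
    serdes.foreach(s => println(s.supportClass.getName))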
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
new file mode 100644
index 00000000000..49debedbb68
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import collection.JavaConverters._
+
+import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
+import org.apache.spark.status.protobuf.ProtobufSerDe
+import org.apache.spark.status.protobuf.StoreTypes
+
+class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
+
+ override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper]
+
+ override def serialize(input: Any): Array[Byte] = {
+ val plan = input.asInstanceOf[SparkPlanGraphWrapper]
+ val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder()
+ builder.setExecutionId(plan.executionId)
+ plan.nodes.foreach { node =>
+ builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
+ }
+ plan.edges.foreach { edge =>
+ builder.addEdges(serializeSparkPlanGraphEdge(edge))
+ }
+ builder.build().toByteArray
+ }
+
+ def deserialize(bytes: Array[Byte]): SparkPlanGraphWrapper = {
+ val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes)
+ new SparkPlanGraphWrapper(
+ executionId = wrapper.getExecutionId,
+ nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
+ edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq
+ )
+ }
+
+ private def serializeSparkPlanGraphNodeWrapper(input: SparkPlanGraphNodeWrapper):
+ StoreTypes.SparkPlanGraphNodeWrapper = {
+
+ val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
+ builder.setNode(serializeSparkPlanGraphNode(input.node))
+ builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+ builder.build()
+ }
+
+ private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper):
+ SparkPlanGraphNodeWrapper = {
+
+ new SparkPlanGraphNodeWrapper(
+ node = deserializeSparkPlanGraphNode(input.getNode),
+ cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
+ )
+ }
+
+ private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
+ StoreTypes.SparkPlanGraphEdge = {
+ val builder = StoreTypes.SparkPlanGraphEdge.newBuilder()
+ builder.setFromId(edge.fromId)
+ builder.setToId(edge.toId)
+ builder.build()
+ }
+
+ private def deserializeSparkPlanGraphEdge(edge: StoreTypes.SparkPlanGraphEdge):
+ SparkPlanGraphEdge = {
+ SparkPlanGraphEdge(
+ fromId = edge.getFromId,
+ toId = edge.getToId)
+ }
+
+ private def serializeSparkPlanGraphNode(node: SparkPlanGraphNode):
+ StoreTypes.SparkPlanGraphNode = {
+ val builder = StoreTypes.SparkPlanGraphNode.newBuilder()
+ builder.setId(node.id)
+ builder.setName(node.name)
+ builder.setDesc(node.desc)
+ node.metrics.foreach { metric =>
+ builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
+ }
+ builder.build()
+ }
+
+ private def deserializeSparkPlanGraphNode(node: StoreTypes.SparkPlanGraphNode):
+ SparkPlanGraphNode = {
+
+ new SparkPlanGraphNode(
+ id = node.getId,
+ name = node.getName,
+ desc = node.getDesc,
+ metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+ )
+ }
+
+ private def serializeSparkPlanGraphClusterWrapper(cluster: SparkPlanGraphClusterWrapper):
+ StoreTypes.SparkPlanGraphClusterWrapper = {
+ val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder()
+ builder.setId(cluster.id)
+ builder.setName(cluster.name)
+ builder.setDesc(cluster.desc)
+ cluster.nodes.foreach { node =>
+ builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
+ }
+ cluster.metrics.foreach { metric =>
+ builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
+ }
+ builder.build()
+ }
+
+ private def deserializeSparkPlanGraphClusterWrapper(
+ cluster: StoreTypes.SparkPlanGraphClusterWrapper): SparkPlanGraphClusterWrapper = {
+
+ new SparkPlanGraphClusterWrapper(
+ id = cluster.getId,
+ name = cluster.getName,
+ desc = cluster.getDesc,
+ nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
+ metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+ )
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 9d6a938c3fe..90d04c3f013 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.status.protobuf.sql
import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.sql.execution.ui._
import org.apache.spark.status.api.v1.sql.SqlResourceSuite
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
@@ -85,4 +85,130 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
// input.metricValues is null, result.metricValues is also empty map.
assert(result2.metricValues.isEmpty)
}
+
+ test("Spark Plan Graph") {
+ val cluster = new SparkPlanGraphClusterWrapper(
+ id = 5,
+ name = "name_5",
+ desc = "desc_5",
+ nodes = Seq(new SparkPlanGraphNodeWrapper(
+ node = new SparkPlanGraphNode(
+ id = 12,
+ name = "name_12",
+ desc = "desc_12",
+ metrics = Seq(
+ SQLPlanMetric(
+ name = "name_13",
+ accumulatorId = 13,
+ metricType = "metric_13"
+ ),
+ SQLPlanMetric(
+ name = "name_14",
+ accumulatorId = 14,
+ metricType = "metric_14"
+ )
+ )
+ ),
+ cluster = new SparkPlanGraphClusterWrapper(
+ id = 15,
+ name = "name_15",
+ desc = "desc_15",
+ nodes = Seq(),
+ metrics = Seq(
+ SQLPlanMetric(
+ name = "name_16",
+ accumulatorId = 16,
+ metricType = "metric_16"
+ ),
+ SQLPlanMetric(
+ name = "name_17",
+ accumulatorId = 17,
+ metricType = "metric_17"
+ )
+ )
+ )
+ )),
+ metrics = Seq(
+ SQLPlanMetric(
+ name = "name_6",
+ accumulatorId = 6,
+ metricType = "metric_6"
+ ),
+ SQLPlanMetric(
+ name = "name_7 d",
+ accumulatorId = 7,
+ metricType = "metric_7"
+ )
+ )
+ )
+ val node = new SparkPlanGraphNodeWrapper(
+ node = new SparkPlanGraphNode(
+ id = 2,
+ name = "name_1",
+ desc = "desc_1",
+ metrics = Seq(
+ SQLPlanMetric(
+ name = "name_2",
+ accumulatorId = 3,
+ metricType = "metric_1"
+ ),
+ SQLPlanMetric(
+ name = "name_3",
+ accumulatorId = 4,
+ metricType = "metric_2"
+ )
+ )
+ ),
+ cluster = cluster
+ )
+ val input = new SparkPlanGraphWrapper(
+ executionId = 1,
+ nodes = Seq(node),
+ edges = Seq(
+ SparkPlanGraphEdge(8, 9),
+ SparkPlanGraphEdge(10, 11)
+ )
+ )
+
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper])
+ assert(result.executionId == input.executionId)
+ assert(result.nodes.size == input.nodes.size)
+
+ def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = {
+ assert(n1.node.id == n2.node.id)
+ assert(n1.node.name == n2.node.name)
+ assert(n1.node.desc == n2.node.desc)
+
+ assert(n1.node.metrics.size == n2.node.metrics.size)
+ n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+ assert(m1.name == m2.name)
+ assert(m1.accumulatorId == m2.accumulatorId)
+ assert(m1.metricType == m2.metricType)
+ }
+
+ assert(n1.cluster.id == n2.cluster.id)
+ assert(n1.cluster.name == n2.cluster.name)
+ assert(n1.cluster.desc == n2.cluster.desc)
+ assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+ n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+ compareNodes(n3, n4)
+ }
+ n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+ assert(m1.name == m2.name)
+ assert(m1.accumulatorId == m2.accumulatorId)
+ assert(m1.metricType == m2.metricType)
+ }
+ }
+
+ result.nodes.zip(input.nodes).foreach { case (n1, n2) =>
+ compareNodes(n1, n2)
+ }
+
+ assert(result.edges.size == input.edges.size)
+ result.edges.zip(input.edges).foreach { case (e1, e2) =>
+ assert(e1.fromId == e2.fromId)
+ assert(e1.toId == e2.toId)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]