This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a28f08d77f8 [SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper a28f08d77f8 is described below commit a28f08d77f873fb70eee5af553630c0eab536c55 Author: Sandeep Singh <sand...@techaddict.me> AuthorDate: Wed Dec 28 15:27:36 2022 -0800 [SPARK-41432][UI][SQL] Protobuf serializer for SparkPlanGraphWrapper ### What changes were proposed in this pull request? Add Protobuf serializer for SparkPlanGraphWrapper ### Why are the changes needed? Support fast and compact serialization/deserialization for SparkPlanGraphWrapper over RocksDB. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New UT Closes #39164 from techaddict/SPARK-41432-SparkPlanGraphWrapper. Authored-by: Sandeep Singh <sand...@techaddict.me> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 31 +++++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../sql/SparkPlanGraphWrapperSerializer.scala | 134 +++++++++++++++++++++ .../sql/KVStoreProtobufSerializerSuite.scala | 128 +++++++++++++++++++- 4 files changed, 293 insertions(+), 1 deletion(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 9f9d87e9aae..22e22eea1a2 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -390,3 +390,34 @@ message SQLExecutionUIData { repeated int64 stages = 11; map<int64, string> metric_values = 12; } + +message SparkPlanGraphNode { + int64 id = 1; + string name = 2; + string desc = 3; + repeated SQLPlanMetric metrics = 4; +} + +message SparkPlanGraphClusterWrapper { + int64 id = 1; + string name = 2; + string desc = 3; + repeated SparkPlanGraphNodeWrapper nodes = 4; + repeated SQLPlanMetric metrics = 5; +} + +message SparkPlanGraphNodeWrapper { + SparkPlanGraphNode node = 1; + SparkPlanGraphClusterWrapper cluster = 2; +} + +message SparkPlanGraphEdge { + int64 from_id = 1; + int64 to_id = 2; +} + +message SparkPlanGraphWrapper { + int64 execution_id = 1; + repeated SparkPlanGraphNodeWrapper nodes = 2; + repeated SparkPlanGraphEdge edges = 3; +} diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index de5f2c2d05c..3f0ae5470ce 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -16,3 +16,4 @@ # org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer +org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala new file mode 100644 index 00000000000..49debedbb68 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf.sql + +import collection.JavaConverters._ + +import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper} +import org.apache.spark.status.protobuf.ProtobufSerDe +import org.apache.spark.status.protobuf.StoreTypes + +class SparkPlanGraphWrapperSerializer extends ProtobufSerDe { + + override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper] + + override def serialize(input: Any): Array[Byte] = { + val plan = input.asInstanceOf[SparkPlanGraphWrapper] + val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder() + builder.setExecutionId(plan.executionId) + plan.nodes.foreach { node => + builder.addNodes(serializeSparkPlanGraphNodeWrapper(node)) + } + plan.edges.foreach {edge => + builder.addEdges(serializeSparkPlanGraphEdge(edge)) + } + builder.build().toByteArray + } + + def deserialize(bytes: Array[Byte]): SparkPlanGraphWrapper = { + val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes) + new SparkPlanGraphWrapper( + executionId = wrapper.getExecutionId, + nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq, + edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq + ) + } + + private def serializeSparkPlanGraphNodeWrapper(input: SparkPlanGraphNodeWrapper): + StoreTypes.SparkPlanGraphNodeWrapper = { + + val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder() + builder.setNode(serializeSparkPlanGraphNode(input.node)) + builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster)) + builder.build() + } + + private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper): + SparkPlanGraphNodeWrapper = { + + new SparkPlanGraphNodeWrapper( + node = deserializeSparkPlanGraphNode(input.getNode), + cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster) + ) + } + + private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge): + StoreTypes.SparkPlanGraphEdge = { + val builder = StoreTypes.SparkPlanGraphEdge.newBuilder() + builder.setFromId(edge.fromId) + builder.setToId(edge.toId) + builder.build() + } + + private def deserializeSparkPlanGraphEdge(edge: StoreTypes.SparkPlanGraphEdge): + SparkPlanGraphEdge = { + SparkPlanGraphEdge( + fromId = edge.getFromId, + toId = edge.getToId) + } + + private def serializeSparkPlanGraphNode(node: SparkPlanGraphNode): + StoreTypes.SparkPlanGraphNode = { + val builder = StoreTypes.SparkPlanGraphNode.newBuilder() + builder.setId(node.id) + builder.setName(node.name) + builder.setDesc(node.desc) + node.metrics.foreach { metric => + builder.addMetrics(SQLPlanMetricSerializer.serialize(metric)) + } + builder.build() + } + + private def deserializeSparkPlanGraphNode(node: StoreTypes.SparkPlanGraphNode): + SparkPlanGraphNode = { + + new SparkPlanGraphNode( + id = node.getId, + name = node.getName, + desc = node.getDesc, + metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq + ) + } + + private def serializeSparkPlanGraphClusterWrapper(cluster: SparkPlanGraphClusterWrapper): + StoreTypes.SparkPlanGraphClusterWrapper = { + val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder() + builder.setId(cluster.id) + builder.setName(cluster.name) + builder.setDesc(cluster.desc) + cluster.nodes.foreach { node => + builder.addNodes(serializeSparkPlanGraphNodeWrapper(node)) + } + cluster.metrics.foreach { metric => + builder.addMetrics(SQLPlanMetricSerializer.serialize(metric)) + } + builder.build() + } + + private def deserializeSparkPlanGraphClusterWrapper( + cluster: StoreTypes.SparkPlanGraphClusterWrapper): SparkPlanGraphClusterWrapper = { + + new SparkPlanGraphClusterWrapper( + id = cluster.getId, + name = cluster.getName, + desc = cluster.getDesc, + nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq, + metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq + ) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala index 9d6a938c3fe..90d04c3f013 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.status.protobuf.sql import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.ui.SQLExecutionUIData +import org.apache.spark.sql.execution.ui._ import org.apache.spark.status.api.v1.sql.SqlResourceSuite import org.apache.spark.status.protobuf.KVStoreProtobufSerializer @@ -85,4 +85,130 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { // input.metricValues is null, result.metricValues is also empty map. assert(result2.metricValues.isEmpty) } + + test("Spark Plan Graph") { + val cluster = new SparkPlanGraphClusterWrapper( + id = 5, + name = "name_5", + desc = "desc_5", + nodes = Seq(new SparkPlanGraphNodeWrapper( + node = new SparkPlanGraphNode( + id = 12, + name = "name_12", + desc = "desc_12", + metrics = Seq( + SQLPlanMetric( + name = "name_13", + accumulatorId = 13, + metricType = "metric_13" + ), + SQLPlanMetric( + name = "name_14", + accumulatorId = 14, + metricType = "metric_14" + ) + ) + ), + cluster = new SparkPlanGraphClusterWrapper( + id = 15, + name = "name_15", + desc = "desc_15", + nodes = Seq(), + metrics = Seq( + SQLPlanMetric( + name = "name_16", + accumulatorId = 16, + metricType = "metric_16" + ), + SQLPlanMetric( + name = "name_17", + accumulatorId = 17, + metricType = "metric_17" + ) + ) + ) + )), + metrics = Seq( + SQLPlanMetric( + name = "name_6", + accumulatorId = 6, + metricType = "metric_6" + ), + SQLPlanMetric( + name = "name_7 d", + accumulatorId = 7, + metricType = "metric_7" + ) + ) + ) + val node = new SparkPlanGraphNodeWrapper( + node = new SparkPlanGraphNode( + id = 2, + name = "name_1", + desc = "desc_1", + metrics = Seq( + SQLPlanMetric( + name = "name_2", + accumulatorId = 3, + metricType = "metric_1" + ), + SQLPlanMetric( + name = "name_3", + accumulatorId = 4, + metricType = "metric_2" + ) + ) + ), + cluster = cluster + ) + val input = new SparkPlanGraphWrapper( + executionId = 1, + nodes = Seq(node), + edges = Seq( + SparkPlanGraphEdge(8, 9), + SparkPlanGraphEdge(10, 11) + ) + ) + + val bytes = serializer.serialize(input) + val result = serializer.deserialize(bytes, classOf[SparkPlanGraphWrapper]) + assert(result.executionId == input.executionId) + assert(result.nodes.size == input.nodes.size) + + def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = { + assert(n1.node.id == n2.node.id) + assert(n1.node.name == n2.node.name) + assert(n1.node.desc == n2.node.desc) + + assert(n1.node.metrics.size == n2.node.metrics.size) + n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) => + assert(m1.name == m2.name) + assert(m1.accumulatorId == m2.accumulatorId) + assert(m1.metricType == m2.metricType) + } + + assert(n1.cluster.id == n2.cluster.id) + assert(n1.cluster.name == n2.cluster.name) + assert(n1.cluster.desc == n2.cluster.desc) + assert(n1.cluster.nodes.size == n2.cluster.nodes.size) + n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) => + compareNodes(n3, n4) + } + n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) => + assert(m1.name == m2.name) + assert(m1.accumulatorId == m2.accumulatorId) + assert(m1.metricType == m2.metricType) + } + } + + result.nodes.zip(input.nodes).foreach { case (n1, n2) => + compareNodes(n1, n2) + } + + assert(result.edges.size == input.edges.size) + result.edges.zip(input.edges).foreach { case (e1, e2) => + assert(e1.fromId == e2.fromId) + assert(e1.toId == e2.toId) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org