This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c0769759f4f [SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf serializer of SparkPlanGraphNodeWrapper
c0769759f4f is described below
commit c0769759f4fd3cbce859cde790dcd1df568cfd0b
Author: Gengliang Wang <[email protected]>
AuthorDate: Tue Jan 10 06:48:51 2023 -0800
[SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf serializer of SparkPlanGraphNodeWrapper
### What changes were proposed in this pull request?
A `SparkPlanGraphNodeWrapper` can only contain either a node or a cluster. In the current implementation, however, deserialization sets both the node and cluster fields to non-null values, which breaks the assertion in the method `toSparkPlanGraphNode`:
```scala
class SparkPlanGraphNodeWrapper(
    val node: SparkPlanGraphNode,
    val cluster: SparkPlanGraphClusterWrapper) {

  def toSparkPlanGraphNode(): SparkPlanGraphNode = {
    assert(node == null ^ cluster == null, "Exactly one of node, cluster values to be set.")
    if (node != null) node else cluster.toSparkPlanGraphCluster()
  }
}
```
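For illustration, a minimal sketch of the failure mode (the constructor signatures are taken from the test code in this PR; the `org.apache.spark.sql.execution.ui` package and the argument values are assumptions for the sketch):

```scala
import org.apache.spark.sql.execution.ui._

// Hypothetical graph elements, built with the same constructors the test suite uses.
val node = new SparkPlanGraphNode(
  id = 1, name = "n", desc = "d", metrics = Seq.empty)
val cluster = new SparkPlanGraphClusterWrapper(
  id = 2, name = "c", desc = "d", nodes = Seq.empty, metrics = Seq.empty)

// Before this fix, deserialization always produced a wrapper with BOTH fields
// non-null, because the protobuf getters return default instances, never null.
val wrapper = new SparkPlanGraphNodeWrapper(node, cluster)

// node == null ^ cluster == null is false ^ false == false, so this throws
// java.lang.AssertionError: Exactly one of node, cluster values to be set.
wrapper.toSparkPlanGraphNode()
```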
This PR fixes the bug by using `oneof` in the protobuf definition of `SparkPlanGraphNodeWrapper`, so that the serialized message carries exactly one of the two fields.
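With `oneof`, the generated message can hold at most one of the two cases: setting one member clears the other, and the `has*` accessors report which case is populated, so the XOR invariant above holds after deserialization. A minimal sketch of that guarantee against the generated `StoreTypes` classes (`hasCluster` is assumed from standard `oneof` Java codegen, alongside the `hasNode` call used in the new deserializer):

```scala
import org.apache.spark.status.protobuf.StoreTypes

// Set the node case, then the cluster case: the second set clears the first.
val msg = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
  .setNode(StoreTypes.SparkPlanGraphNode.getDefaultInstance)
  .setCluster(StoreTypes.SparkPlanGraphClusterWrapper.getDefaultInstance)
  .build()
assert(!msg.hasNode && msg.hasCluster)

// The invariant survives a round trip, so the deserializer can branch on
// hasNode and leave the other constructor argument null.
val parsed = StoreTypes.SparkPlanGraphNodeWrapper.parseFrom(msg.toByteArray)
assert(!parsed.hasNode && parsed.hasCluster)
```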
### Why are the changes needed?
Bug fix
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated the related unit tests. Also ran `SQLMetricsSuite` with the RocksDB backend enabled.
Closes #39471 from gengliangwang/fixGraph.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 6 +-
.../sql/SparkPlanGraphWrapperSerializer.scala | 24 ++++--
.../sql/KVStoreProtobufSerializerSuite.scala | 89 ++++++++--------------
3 files changed, 52 insertions(+), 67 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index e9aaad261f9..9001847e872 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -436,8 +436,10 @@ message SparkPlanGraphClusterWrapper {
 }

 message SparkPlanGraphNodeWrapper {
-  SparkPlanGraphNode node = 1;
-  SparkPlanGraphClusterWrapper cluster = 2;
+  oneof wrapper {
+    SparkPlanGraphNode node = 1;
+    SparkPlanGraphClusterWrapper cluster = 2;
+  }
 }

 message SparkPlanGraphEdge {
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index db63fd6afe2..c68466489ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -53,19 +53,27 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
       StoreTypes.SparkPlanGraphNodeWrapper = {
     val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
-    Option(input.node).foreach(node => builder.setNode(serializeSparkPlanGraphNode(node)))
-    Option(input.cluster)
-      .foreach(cluster => builder.setCluster(serializeSparkPlanGraphClusterWrapper(cluster)))
+    if (input.node != null) {
+      builder.setNode(serializeSparkPlanGraphNode(input.node))
+    } else {
+      builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+    }
     builder.build()
   }

   private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper):
       SparkPlanGraphNodeWrapper = {
-
-    new SparkPlanGraphNodeWrapper(
-      node = deserializeSparkPlanGraphNode(input.getNode),
-      cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
-    )
+    if (input.hasNode) {
+      new SparkPlanGraphNodeWrapper(
+        node = deserializeSparkPlanGraphNode(input.getNode),
+        cluster = null
+      )
+    } else {
+      new SparkPlanGraphNodeWrapper(
+        node = null,
+        cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
+      )
+    }
   }
private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index ddc693f1ee3..cfb5093611b 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -114,24 +114,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
             )
           )
         ),
-        cluster = new SparkPlanGraphClusterWrapper(
-          id = 15,
-          name = "name_15",
-          desc = "desc_15",
-          nodes = Seq(),
-          metrics = Seq(
-            SQLPlanMetric(
-              name = "name_16",
-              accumulatorId = 16,
-              metricType = "metric_16"
-            ),
-            SQLPlanMetric(
-              name = "name_17",
-              accumulatorId = 17,
-              metricType = "metric_17"
-            )
-          )
-        )
+        cluster = null
       )),
       metrics = Seq(
         SQLPlanMetric(
@@ -147,23 +130,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       )
     )
     val node = new SparkPlanGraphNodeWrapper(
-      node = new SparkPlanGraphNode(
-        id = 2,
-        name = "name_1",
-        desc = "desc_1",
-        metrics = Seq(
-          SQLPlanMetric(
-            name = "name_2",
-            accumulatorId = 3,
-            metricType = "metric_1"
-          ),
-          SQLPlanMetric(
-            name = "name_3",
-            accumulatorId = 4,
-            metricType = "metric_2"
-          )
-        )
-      ),
+      node = null,
       cluster = cluster
     )
     val input = new SparkPlanGraphWrapper(
@@ -181,29 +148,37 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.nodes.size == input.nodes.size)
     def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = {
-      assert(n1.node.id == n2.node.id)
-      assert(n1.node.name == n2.node.name)
-      assert(n1.node.desc == n2.node.desc)
-
-      assert(n1.node.metrics.size == n2.node.metrics.size)
-      n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
-      }
-
-      assert(n1.cluster.id == n2.cluster.id)
-      assert(n1.cluster.name == n2.cluster.name)
-      assert(n1.cluster.desc == n2.cluster.desc)
-      assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
-      n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
-        compareNodes(n3, n4)
-      }
-      n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
+      if (n1.node != null) {
+        assert(n2.node != null)
+        assert(n1.node.id == n2.node.id)
+        assert(n1.node.name == n2.node.name)
+        assert(n1.node.desc == n2.node.desc)
+
+        assert(n1.node.metrics.size == n2.node.metrics.size)
+        n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
+      } else {
+        assert(n2.node == null)
+        assert(n1.cluster != null && n2.cluster != null)
+        assert(n1.cluster.id == n2.cluster.id)
+        assert(n1.cluster.name == n2.cluster.name)
+        assert(n1.cluster.desc == n2.cluster.desc)
+        assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+        n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+          compareNodes(n3, n4)
+        }
+        n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
       }
+      val metrics = Map(6L -> "a", 7L -> "b", 13L -> "c", 14L -> "d")
+      assert(n1.toSparkPlanGraphNode().makeDotNode(metrics) ==
+        n2.toSparkPlanGraphNode().makeDotNode(metrics))
     }
     result.nodes.zip(input.nodes).foreach { case (n1, n2) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]