This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 984cfa162da [SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric
984cfa162da is described below
commit 984cfa162da99a19dd169ae5ecc4b568e11fe4b1
Author: yangjie01 <[email protected]>
AuthorDate: Sun Jan 22 13:48:41 2023 -0800
[SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric
### What changes were proposed in this pull request?
Similar to #39666, this PR handles null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric: the corresponding protobuf string fields become `optional`, the serializers skip null values on write, and unset fields are mapped back to null on read.
### Why are the changes needed?
Protobuf builder setters reject null, so the protobuf serializer must handle null string values properly: leave them unset on write and restore null on read.
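For context, the fix relies on the `setStringField`/`getStringField` helpers in `org.apache.spark.status.protobuf.Utils` (introduced by #39666; their bodies are not part of this diff). A minimal sketch of the pattern, with implementations assumed from how the serializers call them:

```scala
object Utils {
  // Write side: skip null inputs, since protobuf builder setters
  // throw NullPointerException when passed null.
  def setStringField(input: String, f: String => Any): Unit = {
    if (input != null) {
      f(input)
    }
  }

  // Read side: use the generated hasXxx() presence check to map an
  // unset `optional` field back to null instead of an empty string.
  def getStringField(condition: Boolean, result: () => String): String = {
    if (condition) result() else null
  }
}
```

Declaring the proto string fields as `optional` is what generates the `hasXxx()` presence methods, so a field left unset on write round-trips back to null on read.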
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New UTs
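The behavior under test boils down to a serialize/deserialize round trip with null string fields; a simplified sketch using `SQLPlanMetricSerializer` (the suite below also covers `SQLExecutionUIData` and the plan-graph wrappers):

```scala
import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.status.protobuf.sql.SQLPlanMetricSerializer

// A metric whose string fields are null, as the SQL UI can produce.
val metric = SQLPlanMetric(name = null, accumulatorId = 13, metricType = null)

// serialize() leaves the optional fields unset rather than calling the
// builder setters with null (which would throw NullPointerException).
val proto = SQLPlanMetricSerializer.serialize(metric)
assert(!proto.hasName && !proto.hasMetricType)

// deserialize() maps the unset fields back to null via getStringField.
val roundTripped = SQLPlanMetricSerializer.deserialize(proto)
assert(roundTripped.name == null && roundTripped.metricType == null)
```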
Closes #39682 from LuciferYang/SPARK-42139.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 18 ++--
.../sql/SQLExecutionUIDataSerializer.scala | 15 +--
.../protobuf/sql/SQLPlanMetricSerializer.scala | 18 ++--
.../sql/SparkPlanGraphWrapperSerializer.scala | 18 ++--
.../sql/KVStoreProtobufSerializerSuite.scala | 118 ++++++++++++++-------
5 files changed, 121 insertions(+), 66 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 155e73de056..ab6861057c9 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -403,17 +403,17 @@ message ExecutorSummaryWrapper {
}
message SQLPlanMetric {
- string name = 1;
+ optional string name = 1;
int64 accumulator_id = 2;
- string metric_type = 3;
+ optional string metric_type = 3;
}
message SQLExecutionUIData {
int64 execution_id = 1;
int64 root_execution_id = 2;
- string description = 3;
- string details = 4;
- string physical_plan_description = 5;
+ optional string description = 3;
+ optional string details = 4;
+ optional string physical_plan_description = 5;
map<string, string> modified_configs = 6;
repeated SQLPlanMetric metrics = 7;
int64 submission_time = 8;
@@ -427,15 +427,15 @@ message SQLExecutionUIData {
message SparkPlanGraphNode {
int64 id = 1;
- string name = 2;
- string desc = 3;
+ optional string name = 2;
+ optional string desc = 3;
repeated SQLPlanMetric metrics = 4;
}
message SparkPlanGraphClusterWrapper {
int64 id = 1;
- string name = 2;
- string desc = 3;
+ optional string name = 2;
+ optional string desc = 3;
repeated SparkPlanGraphNodeWrapper nodes = 4;
repeated SQLPlanMetric metrics = 5;
}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index dc76ab9a4e9..f0cdca985b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils._
class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
@@ -31,9 +31,9 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
val builder = StoreTypes.SQLExecutionUIData.newBuilder()
builder.setExecutionId(ui.executionId)
builder.setRootExecutionId(ui.rootExecutionId)
- Option(ui.description).foreach(builder.setDescription)
- Option(ui.details).foreach(builder.setDetails)
- Option(ui.physicalPlanDescription).foreach(builder.setPhysicalPlanDescription)
+ setStringField(ui.description, builder.setDescription)
+ setStringField(ui.details, builder.setDetails)
+ setStringField(ui.physicalPlanDescription, builder.setPhysicalPlanDescription)
if (ui.modifiedConfigs != null) {
ui.modifiedConfigs.foreach {
case (k, v) => builder.putModifiedConfigs(k, v)
@@ -81,9 +81,10 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
new SQLExecutionUIData(
executionId = ui.getExecutionId,
rootExecutionId = ui.getRootExecutionId,
- description = ui.getDescription,
- details = ui.getDetails,
- physicalPlanDescription = ui.getPhysicalPlanDescription,
+ description = getStringField(ui.hasDescription, () => ui.getDescription),
+ details = getStringField(ui.hasDetails, () => ui.getDetails),
+ physicalPlanDescription =
+ getStringField(ui.hasPhysicalPlanDescription, () => ui.getPhysicalPlanDescription),
modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
metrics = metrics,
submissionTime = ui.getSubmissionTime,
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
index 8886bba2f92..88ba51c52b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
@@ -19,18 +19,24 @@ package org.apache.spark.status.protobuf.sql
import org.apache.spark.sql.execution.ui.SQLPlanMetric
import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils._
+import org.apache.spark.util.Utils.weakIntern
object SQLPlanMetricSerializer {
def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
- StoreTypes.SQLPlanMetric.newBuilder()
- .setName(metric.name)
- .setAccumulatorId(metric.accumulatorId)
- .setMetricType(metric.metricType)
- .build()
+ val builder = StoreTypes.SQLPlanMetric.newBuilder()
+ setStringField(metric.name, builder.setName)
+ builder.setAccumulatorId(metric.accumulatorId)
+ setStringField(metric.metricType, builder.setMetricType)
+ builder.build()
}
def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
- SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
+ SQLPlanMetric(
+ name = getStringField(metrics.hasName, () => weakIntern(metrics.getName)),
+ accumulatorId = metrics.getAccumulatorId,
+ metricType = getStringField(metrics.hasMetricType, () => weakIntern(metrics.getMetricType))
+ )
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index 1df82e3246a..bff5c0d7619 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -22,6 +22,8 @@ import collection.JavaConverters._
import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
import org.apache.spark.status.protobuf.ProtobufSerDe
import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+import org.apache.spark.util.Utils.weakIntern
class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrapper] {
@@ -92,8 +94,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
StoreTypes.SparkPlanGraphNode = {
val builder = StoreTypes.SparkPlanGraphNode.newBuilder()
builder.setId(node.id)
- builder.setName(node.name)
- builder.setDesc(node.desc)
+ setStringField(node.name, builder.setName)
+ setStringField(node.desc, builder.setDesc)
node.metrics.foreach { metric =>
builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
}
@@ -105,8 +107,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
new SparkPlanGraphNode(
id = node.getId,
- name = node.getName,
- desc = node.getDesc,
+ name = getStringField(node.hasName, () => weakIntern(node.getName)),
+ desc = getStringField(node.hasDesc, () => node.getDesc),
metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
)
}
@@ -115,8 +117,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
StoreTypes.SparkPlanGraphClusterWrapper = {
val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder()
builder.setId(cluster.id)
- builder.setName(cluster.name)
- builder.setDesc(cluster.desc)
+ setStringField(cluster.name, builder.setName)
+ setStringField(cluster.desc, builder.setDesc)
cluster.nodes.foreach { node =>
builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
}
@@ -131,8 +133,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
new SparkPlanGraphClusterWrapper(
id = cluster.getId,
- name = cluster.getName,
- desc = cluster.getDesc,
+ name = getStringField(cluster.hasName, () => weakIntern(cluster.getName)),
+ desc = getStringField(cluster.hasDesc, () => cluster.getDesc),
nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper),
metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
)
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 41f185900ad..c220ca1c96f 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -30,22 +30,39 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
private val serializer = new KVStoreProtobufSerializer()
test("SQLExecutionUIData") {
- val input = SqlResourceSuite.sqlExecutionUIData
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
- assert(result.executionId == input.executionId)
- assert(result.rootExecutionId == input.rootExecutionId)
- assert(result.description == input.description)
- assert(result.details == input.details)
- assert(result.physicalPlanDescription == input.physicalPlanDescription)
- assert(result.modifiedConfigs == input.modifiedConfigs)
- assert(result.metrics == input.metrics)
- assert(result.submissionTime == input.submissionTime)
- assert(result.completionTime == input.completionTime)
- assert(result.errorMessage == input.errorMessage)
- assert(result.jobs == input.jobs)
- assert(result.stages == input.stages)
- assert(result.metricValues == input.metricValues)
+ val normal = SqlResourceSuite.sqlExecutionUIData
+ val withNull = new SQLExecutionUIData(
+ executionId = normal.executionId,
+ rootExecutionId = normal.rootExecutionId,
+ description = null,
+ details = null,
+ physicalPlanDescription = null,
+ modifiedConfigs = normal.modifiedConfigs,
+ metrics = Seq(SQLPlanMetric(null, 0, null)),
+ submissionTime = normal.submissionTime,
+ completionTime = normal.completionTime,
+ errorMessage = normal.errorMessage,
+ jobs = normal.jobs,
+ stages = normal.stages,
+ metricValues = normal.metricValues
+ )
+ Seq(normal, withNull).foreach { input =>
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
+ assert(result.executionId == input.executionId)
+ assert(result.rootExecutionId == input.rootExecutionId)
+ assert(result.description == input.description)
+ assert(result.details == input.details)
+ assert(result.physicalPlanDescription == input.physicalPlanDescription)
+ assert(result.modifiedConfigs == input.modifiedConfigs)
+ assert(result.metrics == input.metrics)
+ assert(result.submissionTime == input.submissionTime)
+ assert(result.completionTime == input.completionTime)
+ assert(result.errorMessage == input.errorMessage)
+ assert(result.jobs == input.jobs)
+ assert(result.stages == input.stages)
+ assert(result.metricValues == input.metricValues)
+ }
}
test("SQLExecutionUIData with metricValues is empty map and null") {
@@ -93,30 +110,59 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
}
test("Spark Plan Graph") {
+ val node0: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+ node = new SparkPlanGraphNode(
+ id = 12,
+ name = "name_12",
+ desc = "desc_12",
+ metrics = Seq(
+ SQLPlanMetric(
+ name = "name_13",
+ accumulatorId = 13,
+ metricType = "metric_13"
+ ),
+ SQLPlanMetric(
+ name = "name_14",
+ accumulatorId = 14,
+ metricType = "metric_14"
+ )
+ )
+ ),
+ cluster = null
+ )
+
+ val node1: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+ node = new SparkPlanGraphNode(
+ id = 13,
+ name = null,
+ desc = null,
+ metrics = Seq(
+ SQLPlanMetric(
+ name = null,
+ accumulatorId = 13,
+ metricType = null
+ )
+ )
+ ),
+ cluster = null
+ )
+
+ val node2: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+ node = null,
+ cluster = new SparkPlanGraphClusterWrapper(
+ id = 6,
+ name = null,
+ desc = null,
+ nodes = Seq.empty,
+ metrics = Seq.empty
+ )
+ )
+
val cluster = new SparkPlanGraphClusterWrapper(
id = 5,
name = "name_5",
desc = "desc_5",
- nodes = Seq(new SparkPlanGraphNodeWrapper(
- node = new SparkPlanGraphNode(
- id = 12,
- name = "name_12",
- desc = "desc_12",
- metrics = Seq(
- SQLPlanMetric(
- name = "name_13",
- accumulatorId = 13,
- metricType = "metric_13"
- ),
- SQLPlanMetric(
- name = "name_14",
- accumulatorId = 14,
- metricType = "metric_14"
- )
- )
- ),
- cluster = null
- )),
+ nodes = Seq(node0, node1, node2),
metrics = Seq(
SQLPlanMetric(
name = "name_6",