This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 73b0c71fe33 [SPARK-42153][UI] Handle null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper
73b0c71fe33 is described below
commit 73b0c71fe33704dd640a6008342358da67f50390
Author: Gengliang Wang <[email protected]>
AuthorDate: Sat Jan 21 20:09:26 2023 -0800
[SPARK-42153][UI] Handle null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper
### What changes were proposed in this pull request?
Similar to #39666, this PR handles null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper.
### Why are the changes needed?
Properly handle null string values in the protobuf serializers: the generated protobuf setters reject null, so null strings must be left unset on write and restored as null on read.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UTs
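For context (an illustrative note, not part of the commit): protobuf-java builder setters throw NullPointerException when handed a null, so an unguarded write of a null string fails at serialization time, e.g.

  // Hypothetical unguarded write -- this is exactly what the helpers in this diff avoid:
  // StoreTypes.PairStrings.newBuilder().setValue1(null)  // throws NullPointerException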
Closes #39696 from gengliangwang/moreNull3.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 12 ++---
...plicationEnvironmentInfoWrapperSerializer.scala | 7 +--
.../RDDOperationGraphWrapperSerializer.scala | 17 ++++---
.../protobuf/KVStoreProtobufSerializerSuite.scala | 57 +++++++++++++++-------
4 files changed, 59 insertions(+), 34 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 49076790321..155e73de056 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -185,8 +185,8 @@ message RuntimeInfo {
}
message PairStrings {
- string value1 = 1;
- string value2 = 2;
+ optional string value1 = 1;
+ optional string value2 = 2;
}
message ApplicationEnvironmentInfo {
@@ -472,16 +472,16 @@ enum DeterministicLevel {
message RDDOperationNode {
int32 id = 1;
- string name = 2;
+ optional string name = 2;
bool cached = 3;
bool barrier = 4;
- string callsite = 5;
+ optional string callsite = 5;
DeterministicLevel output_deterministic_level = 6;
}
message RDDOperationClusterWrapper {
- string id = 1;
- string name = 2;
+ optional string id = 1;
+ optional string name = 2;
repeated RDDOperationNode child_nodes = 3;
repeated RDDOperationClusterWrapper child_clusters = 4;
}
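Background on the proto change above: marking a proto3 string field `optional` makes the generated code track explicit presence, so an unset field is distinguishable from one set to the empty string. A minimal sketch of the generated surface (standard protobuf-java codegen, called from Scala):

  val unset = StoreTypes.PairStrings.newBuilder().build()
  assert(!unset.hasValue1)       // never set: presence flag is false
  assert(unset.getValue1 == "")  // the getter still returns the proto3 default
  val set = StoreTypes.PairStrings.newBuilder().setValue1("").build()
  assert(set.hasValue1)          // explicitly set, even to the empty string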
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index fbbc55387b8..63c8387a8db 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -81,7 +81,8 @@ class ApplicationEnvironmentInfoWrapperSerializer
scalaVersion = getStringField(rt.hasScalaVersion, () => rt.getScalaVersion)
)
val pairSSToTuple = (pair: StoreTypes.PairStrings) => {
- (pair.getValue1, pair.getValue2)
+ (getStringField(pair.hasValue1, () => pair.getValue1),
+ getStringField(pair.hasValue2, () => pair.getValue2))
}
new ApplicationEnvironmentInfo(
runtime = runtime,
@@ -97,8 +98,8 @@ class ApplicationEnvironmentInfoWrapperSerializer
private def serializePairStrings(pair: (String, String)):
StoreTypes.PairStrings = {
val builder = StoreTypes.PairStrings.newBuilder()
- builder.setValue1(pair._1)
- builder.setValue2(pair._2)
+ setStringField(pair._1, builder.setValue1)
+ setStringField(pair._2, builder.setValue2)
builder.build()
}
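The setStringField/getStringField helpers live in org.apache.spark.status.protobuf.Utils (imported below for the RDD graph serializer). A minimal sketch of what they do, with the signatures inferred from the call sites in this diff:

  // Call the builder setter only for non-null input, leaving the optional field unset otherwise.
  def setStringField(input: String, f: String => Any): Unit = {
    if (input != null) f(input)
  }

  // Map an unset optional field back to null when deserializing.
  def getStringField(condition: Boolean, result: () => String): String = {
    if (condition) result() else null
  }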
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index f822ed1889a..3187b255d4c 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel}
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraphWrapper] {
@@ -56,8 +57,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph
private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):
StoreTypes.RDDOperationClusterWrapper = {
val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
- builder.setId(op.id)
- builder.setName(op.name)
+ setStringField(op.id, builder.setId)
+ setStringField(op.name, builder.setName)
op.childNodes.foreach { node =>
builder.addChildNodes(serializeRDDOperationNode(node))
}
@@ -70,8 +71,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph
private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
RDDOperationClusterWrapper = {
new RDDOperationClusterWrapper(
- id = op.getId,
- name = op.getName,
+ id = getStringField(op.hasId, () => op.getId),
+ name = getStringField(op.hasName, () => op.getName),
childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode),
childClusters = op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper)
@@ -83,10 +84,10 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph
node.outputDeterministicLevel)
val builder = StoreTypes.RDDOperationNode.newBuilder()
builder.setId(node.id)
- builder.setName(node.name)
+ setStringField(node.name, builder.setName)
+ setStringField(node.callsite, builder.setCallsite)
builder.setCached(node.cached)
builder.setBarrier(node.barrier)
- builder.setCallsite(node.callsite)
builder.setOutputDeterministicLevel(outputDeterministicLevel)
builder.build()
}
@@ -94,10 +95,10 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph
private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode):
RDDOperationNode = {
RDDOperationNode(
id = node.getId,
- name = node.getName,
+ name = getStringField(node.hasName, () => node.getName),
cached = node.getCached,
barrier = node.getBarrier,
- callsite = node.getCallsite,
+ callsite = getStringField(node.hasCallsite, () => node.getCallsite),
outputDeterministicLevel = DeterministicLevelSerializer.deserialize(
node.getOutputDeterministicLevel)
)
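Taken together, the two directions round-trip nulls: a null name or callsite is never set on the builder, its presence flag stays false, and deserialization restores null. An illustrative check (standalone sketch only; the serialize/deserialize methods above are private to the serializer):

  val node = RDDOperationNode(id = 20, name = null, cached = true, barrier = false,
    callsite = null, outputDeterministicLevel = DeterministicLevel.DETERMINATE)
  val restored = deserializeRDDOperationNode(serializeRDDOperationNode(node))
  assert(restored.name == null && restored.callsite == null)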
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 14dd2cd601d..d4c79adf2ec 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -259,9 +259,11 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
javaVersion = javaVersion,
javaHome = javaHome,
scalaVersion = scalaVersion),
- sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")),
- hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2")),
- systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2")),
+ sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2"), (null, null)),
+ hadoopProperties =
+   Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2"), (null, "val3")),
+ systemProperties =
+   Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2"), ("sys.prop.3", null)),
metricsProperties = Seq(("metric.1", "klass1"), ("metric2", "klass2")),
classpathEntries = Seq(("/jar1", "System"), ("/jar2", "User")),
resourceProfiles = Seq(new ResourceProfileInfo(
@@ -875,20 +877,41 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
cached = true,
barrier = false,
callsite = "callsite_1",
- outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
- childClusters = Seq(new RDDOperationClusterWrapper(
- id = "id_1",
- name = "name1",
- childNodes = Seq(
- RDDOperationNode(
- id = 15,
- name = "name3",
- cached = false,
- barrier = true,
- callsite = "callsite_2",
- outputDeterministicLevel = DeterministicLevel.UNORDERED)),
- childClusters = Seq.empty
- ))
+ outputDeterministicLevel = DeterministicLevel.INDETERMINATE),
+ RDDOperationNode(
+ id = 20,
+ name = null,
+ cached = true,
+ barrier = false,
+ callsite = null,
+ outputDeterministicLevel = DeterministicLevel.DETERMINATE)),
+ childClusters = Seq(
+ new RDDOperationClusterWrapper(
+ id = "id_1",
+ name = "name1",
+ childNodes = Seq(
+ RDDOperationNode(
+ id = 15,
+ name = "name3",
+ cached = false,
+ barrier = true,
+ callsite = "callsite_2",
+ outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+ childClusters = Seq.empty
+ ),
+ new RDDOperationClusterWrapper(
+ id = null,
+ name = null,
+ childNodes = Seq(
+ RDDOperationNode(
+ id = 21,
+ name = null,
+ cached = false,
+ barrier = true,
+ callsite = null,
+ outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+ childClusters = Seq.empty
+ ))
)
)
val bytes = serializer.serialize(input)
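The hunk is truncated here; the suite goes on to deserialize the bytes and assert that every field, including the null ids, names, and callsites, survives the round trip. Illustratively (the exact assertions and object structure are assumed from the input above):

  val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])
  assert(result.rootCluster.childClusters(1).id == null)
  assert(result.rootCluster.childClusters(1).name == null)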
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]