This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 7de461c0fd3 [SPARK-42178][UI] Handle remaining null string values in
ui protobuf serializer and add tests
7de461c0fd3 is described below
commit 7de461c0fd3fdf2e8bdc3b271cd70ef92e6119da
Author: Gengliang Wang <[email protected]>
AuthorDate: Wed Jan 25 10:53:54 2023 -0800
[SPARK-42178][UI] Handle remaining null string values in ui protobuf
serializer and add tests
### What changes were proposed in this pull request?
* Similar to https://github.com/apache/spark/pull/39666, handle remaining
null string values in ui protobuf serializer, including `RDDStorageInfo` and
`ResourceInformation`
* Add test to make sure all the string fields are defined as `optional
string`.
### Why are the changes needed?
Properly handles null string values in the protobuf serializer.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UTs
Closes #39732 from gengliangwang/moreNullString.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 4 +-
.../ExecutorSummaryWrapperSerializer.scala | 8 ++--
.../protobuf/RDDStorageInfoWrapperSerializer.scala | 4 +-
.../protobuf/KVStoreProtobufSerializerSuite.scala | 47 +++++++++++++++++++---
4 files changed, 51 insertions(+), 12 deletions(-)
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index c4f64f27e4a..94ce1b8b58a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -264,7 +264,7 @@ message RDDStorageInfo {
optional string name = 2;
int32 num_partitions = 3;
int32 num_cached_partitions = 4;
- string storage_level = 5;
+ optional string storage_level = 5;
int64 memory_used = 6;
int64 disk_used = 7;
repeated RDDDataDistribution data_distribution = 8;
@@ -360,7 +360,7 @@ message MemoryMetrics {
}
message ResourceInformation {
- string name = 1;
+ optional string name = 1;
repeated string addresses = 2;
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 9ada2bb0065..e3585feeb44 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -163,15 +163,17 @@ class ExecutorSummaryWrapperSerializer extends
ProtobufSerDe[ExecutorSummaryWrap
private def serializeResourceInformation(info: ResourceInformation):
StoreTypes.ResourceInformation = {
val builder = StoreTypes.ResourceInformation.newBuilder()
- builder.setName(info.name)
- info.addresses.foreach(builder.addAddresses)
+ setStringField(info.name, builder.setName)
+ if (info.addresses != null) {
+ info.addresses.foreach(builder.addAddresses)
+ }
builder.build()
}
private def deserializeResourceInformation(binary:
StoreTypes.ResourceInformation):
ResourceInformation = {
new ResourceInformation(
- name = weakIntern(binary.getName),
+ name = getStringField(binary.hasName, () => weakIntern(binary.getName)),
addresses = binary.getAddressesList.asScala.map(weakIntern).toArray)
}
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index 06dbb485000..fef8c5d478e 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -45,7 +45,7 @@ class RDDStorageInfoWrapperSerializer extends
ProtobufSerDe[RDDStorageInfoWrappe
setStringField(info.name, builder.setName)
builder.setNumPartitions(info.numPartitions)
builder.setNumCachedPartitions(info.numCachedPartitions)
- builder.setStorageLevel(info.storageLevel)
+ setStringField(info.storageLevel, builder.setStorageLevel)
builder.setMemoryUsed(info.memoryUsed)
builder.setDiskUsed(info.diskUsed)
@@ -85,7 +85,7 @@ class RDDStorageInfoWrapperSerializer extends
ProtobufSerDe[RDDStorageInfoWrappe
name = getStringField(info.hasName, info.getName),
numPartitions = info.getNumPartitions,
numCachedPartitions = info.getNumCachedPartitions,
- storageLevel = info.getStorageLevel,
+      storageLevel = getStringField(info.hasStorageLevel, info.getStorageLevel),
memoryUsed = info.getMemoryUsed,
diskUsed = info.getDiskUsed,
dataDistribution =
diff --git
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index de2021fb60e..0849d63b03e 100644
---
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -19,6 +19,9 @@ package org.apache.spark.status.protobuf
import java.util.Date
+import scala.collection.mutable
+import scala.io.Source
+
import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
@@ -27,10 +30,39 @@ import org.apache.spark.resource.{ExecutorResourceRequest,
ResourceInformation,
import org.apache.spark.status._
import org.apache.spark.status.api.v1._
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+import org.apache.spark.util.Utils.tryWithResource
class KVStoreProtobufSerializerSuite extends SparkFunSuite {
private val serializer = new KVStoreProtobufSerializer()
+ test("All the string fields must be optional to avoid NPE") {
+ val protoFile = getWorkspaceFilePath(
+ "core", "src", "main", "protobuf", "org", "apache", "spark", "status",
"protobuf",
+ "store_types.proto")
+
+ val containsStringRegex = "\\s*string .*"
+ val invalidDefinition = new mutable.ArrayBuffer[(String, Int)]()
+ var lineNumber = 1
+    tryWithResource(Source.fromFile(protoFile.toFile.getCanonicalPath)) { file =>
+ file.getLines().foreach { line =>
+ if (line.matches(containsStringRegex)) {
+ invalidDefinition.append((line, lineNumber))
+ }
+ lineNumber += 1
+ }
+ }
+ val errorMessage = new StringBuilder()
+ errorMessage.append(
+ """
+        |All the string fields should be defined as `optional string` for handling null string.
+ |Please update the following fields:
+ |""".stripMargin)
+ invalidDefinition.foreach { case (line, num) =>
+ errorMessage.append(s"line #$num: $line\n")
+ }
+ assert(invalidDefinition.isEmpty, errorMessage)
+ }
+
test("Job data") {
Seq(
("test", Some("test description"), Some("group")),
@@ -463,7 +495,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
name = null,
numPartitions = 8,
numCachedPartitions = 5,
- storageLevel = "IN_MEMORY",
+ storageLevel = null,
memoryUsed = 100,
diskUsed = 2560,
dataDistribution = None,
@@ -714,7 +746,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
val peakMemoryMetric =
Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L,
1024L)))
val resources =
-      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")))
+      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")),
+        "resource2" -> new ResourceInformation(null, null))
     Seq(("id_1", "localhost:7777"), (null, "")).foreach { case (id, hostPort) =>
val input = new ExecutorSummaryWrapper(
info = new ExecutorSummary(
@@ -816,9 +849,13 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite
{
result.info.resources.keys.foreach { k =>
assert(input.info.resources.contains(k))
assert(result.info.resources(k).name == input.info.resources(k).name)
-
result.info.resources(k).addresses.zip(input.info.resources(k).addresses).foreach
{
- case (a1, a2) =>
- assert(a1 == a2)
+ if (input.info.resources(k).addresses != null) {
+
result.info.resources(k).addresses.zip(input.info.resources(k).addresses).foreach
{
+ case (a1, a2) =>
+ assert(a1 == a2)
+ }
+ } else {
+ assert(result.info.resources(k).addresses.isEmpty)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]