This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 7de461c0fd3 [SPARK-42178][UI] Handle remaining null string values in 
ui protobuf serializer and add tests
7de461c0fd3 is described below

commit 7de461c0fd3fdf2e8bdc3b271cd70ef92e6119da
Author: Gengliang Wang <[email protected]>
AuthorDate: Wed Jan 25 10:53:54 2023 -0800

    [SPARK-42178][UI] Handle remaining null string values in ui protobuf 
serializer and add tests
    
    ### What changes were proposed in this pull request?
    
    * Similar to https://github.com/apache/spark/pull/39666, handle remaining 
null string values in ui protobuf serializer, including `RDDStorageInfo` and 
`ResourceInformation`
    * Add test to make sure all the string fields are defined as `optional 
string`.
    
    ### Why are the changes needed?
    
    Properly handles null string values in the protobuf serializer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No

    ### How was this patch tested?
    
    New UTs
    
    Closes #39732 from gengliangwang/moreNullString.
    
    Authored-by: Gengliang Wang <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../apache/spark/status/protobuf/store_types.proto |  4 +-
 .../ExecutorSummaryWrapperSerializer.scala         |  8 ++--
 .../protobuf/RDDStorageInfoWrapperSerializer.scala |  4 +-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 47 +++++++++++++++++++---
 4 files changed, 51 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index c4f64f27e4a..94ce1b8b58a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -264,7 +264,7 @@ message RDDStorageInfo {
   optional string name = 2;
   int32 num_partitions = 3;
   int32 num_cached_partitions = 4;
-  string storage_level = 5;
+  optional string storage_level = 5;
   int64 memory_used = 6;
   int64 disk_used = 7;
   repeated RDDDataDistribution data_distribution = 8;
@@ -360,7 +360,7 @@ message MemoryMetrics {
 }
 
 message ResourceInformation {
-  string name = 1;
+  optional string name = 1;
   repeated string addresses = 2;
 }
 
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 9ada2bb0065..e3585feeb44 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -163,15 +163,17 @@ class ExecutorSummaryWrapperSerializer extends 
ProtobufSerDe[ExecutorSummaryWrap
   private def serializeResourceInformation(info: ResourceInformation):
     StoreTypes.ResourceInformation = {
     val builder = StoreTypes.ResourceInformation.newBuilder()
-    builder.setName(info.name)
-    info.addresses.foreach(builder.addAddresses)
+    setStringField(info.name, builder.setName)
+    if (info.addresses != null) {
+      info.addresses.foreach(builder.addAddresses)
+    }
     builder.build()
   }
 
   private def deserializeResourceInformation(binary: 
StoreTypes.ResourceInformation):
     ResourceInformation = {
     new ResourceInformation(
-      name = weakIntern(binary.getName),
+      name = getStringField(binary.hasName, () => weakIntern(binary.getName)),
       addresses = binary.getAddressesList.asScala.map(weakIntern).toArray)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index 06dbb485000..fef8c5d478e 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -45,7 +45,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
     setStringField(info.name, builder.setName)
     builder.setNumPartitions(info.numPartitions)
     builder.setNumCachedPartitions(info.numCachedPartitions)
-    builder.setStorageLevel(info.storageLevel)
+    setStringField(info.storageLevel, builder.setStorageLevel)
     builder.setMemoryUsed(info.memoryUsed)
     builder.setDiskUsed(info.diskUsed)
 
@@ -85,7 +85,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
       name = getStringField(info.hasName, info.getName),
       numPartitions = info.getNumPartitions,
       numCachedPartitions = info.getNumCachedPartitions,
-      storageLevel = info.getStorageLevel,
+      storageLevel = getStringField(info.hasStorageLevel, 
info.getStorageLevel),
       memoryUsed = info.getMemoryUsed,
       diskUsed = info.getDiskUsed,
       dataDistribution =
diff --git 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index de2021fb60e..0849d63b03e 100644
--- 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -19,6 +19,9 @@ package org.apache.spark.status.protobuf
 
 import java.util.Date
 
+import scala.collection.mutable
+import scala.io.Source
+
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
@@ -27,10 +30,39 @@ import org.apache.spark.resource.{ExecutorResourceRequest, 
ResourceInformation,
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+import org.apache.spark.util.Utils.tryWithResource
 
 class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
 
+  test("All the string fields must be optional to avoid NPE") {
+    val protoFile = getWorkspaceFilePath(
+      "core", "src", "main", "protobuf", "org", "apache", "spark", "status", 
"protobuf",
+      "store_types.proto")
+
+    val containsStringRegex = "\\s*string .*"
+    val invalidDefinition = new mutable.ArrayBuffer[(String, Int)]()
+    var lineNumber = 1
+    tryWithResource(Source.fromFile(protoFile.toFile.getCanonicalPath)) { file 
=>
+      file.getLines().foreach { line =>
+        if (line.matches(containsStringRegex)) {
+          invalidDefinition.append((line, lineNumber))
+        }
+        lineNumber += 1
+      }
+    }
+    val errorMessage = new StringBuilder()
+    errorMessage.append(
+      """
+        |All the string fields should be defined as `optional string` for 
handling null string.
+        |Please update the following fields:
+        |""".stripMargin)
+    invalidDefinition.foreach { case (line, num) =>
+      errorMessage.append(s"line #$num: $line\n")
+    }
+    assert(invalidDefinition.isEmpty, errorMessage)
+  }
+
   test("Job data") {
     Seq(
       ("test", Some("test description"), Some("group")),
@@ -463,7 +495,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
           name = null,
           numPartitions = 8,
           numCachedPartitions = 5,
-          storageLevel = "IN_MEMORY",
+          storageLevel = null,
           memoryUsed = 100,
           diskUsed = 2560,
           dataDistribution = None,
@@ -714,7 +746,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     val peakMemoryMetric =
       Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 
1024L)))
     val resources =
-      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")))
+      Map("resource1" -> new ResourceInformation("re1", Array("add1", "add2")),
+        "resource2" -> new ResourceInformation(null, null))
     Seq(("id_1", "localhost:7777"), (null, "")).foreach { case (id, hostPort) 
=>
       val input = new ExecutorSummaryWrapper(
         info = new ExecutorSummary(
@@ -816,9 +849,13 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
       result.info.resources.keys.foreach { k =>
         assert(input.info.resources.contains(k))
         assert(result.info.resources(k).name == input.info.resources(k).name)
-        
result.info.resources(k).addresses.zip(input.info.resources(k).addresses).foreach
 {
-          case (a1, a2) =>
-            assert(a1 == a2)
+        if (input.info.resources(k).addresses != null) {
+          
result.info.resources(k).addresses.zip(input.info.resources(k).addresses).foreach
 {
+            case (a1, a2) =>
+              assert(a1 == a2)
+          }
+        } else {
+          assert(result.info.resources(k).addresses.isEmpty)
         }
       }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to