This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 330b7963f97 [SPARK-42140][CORE] Handle null string values in 
ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
330b7963f97 is described below

commit 330b7963f97f06c5bb3da358c61a1ac1089666e7
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Jan 21 15:33:48 2023 -0800

    [SPARK-42140][CORE] Handle null string values in 
ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
    
    ### What changes were proposed in this pull request?
    Similar to #39666, this PR handles null string values in 
ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
    
    ### Why are the changes needed?
    Properly handles null string values in the protobuf serializer.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit tests
    
    Closes #39684 from LuciferYang/SPARK-42140.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 22 ++++++-------
 ...plicationEnvironmentInfoWrapperSerializer.scala | 34 ++++++++++++--------
 .../ApplicationInfoWrapperSerializer.scala         | 18 +++++------
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 37 ++++++++++++++++------
 4 files changed, 67 insertions(+), 44 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 598800da9f5..49076790321 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -161,14 +161,14 @@ message ExecutorStageSummaryWrapper {
 }
 
 message ExecutorResourceRequest {
-  string resource_name = 1;
+  optional string resource_name = 1;
   int64 amount = 2;
-  string discoveryScript = 3;
-  string vendor = 4;
+  optional string discoveryScript = 3;
+  optional string vendor = 4;
 }
 
 message TaskResourceRequest {
-  string resource_name = 1;
+  optional string resource_name = 1;
   double amount = 2;
 }
 
@@ -179,9 +179,9 @@ message ResourceProfileInfo {
 }
 
 message RuntimeInfo {
-  string java_version = 1;
-  string java_home = 2;
-  string scala_version = 3;
+  optional string java_version = 1;
+  optional string java_home = 2;
+  optional string scala_version = 3;
 }
 
 message PairStrings {
@@ -209,14 +209,14 @@ message ApplicationAttemptInfo {
   int64 end_time = 3;
   int64 last_updated = 4;
   int64 duration = 5;
-  string spark_user = 6;
+  optional string spark_user = 6;
   bool completed = 7;
-  string app_spark_version = 8;
+  optional string app_spark_version = 8;
 }
 
 message ApplicationInfo {
-  string id = 1;
-  string name = 2;
+  optional string id = 1;
+  optional string name = 2;
   optional int32 cores_granted = 3;
   optional int32 max_cores = 4;
   optional int32 cores_per_executor = 5;
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index b7cf01382e2..fbbc55387b8 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -22,6 +22,7 @@ import collection.JavaConverters._
 import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
 import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
 import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, 
ResourceProfileInfo, RuntimeInfo}
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
 
 class ApplicationEnvironmentInfoWrapperSerializer
   extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] {
@@ -43,9 +44,10 @@ class ApplicationEnvironmentInfoWrapperSerializer
     StoreTypes.ApplicationEnvironmentInfo = {
 
     val runtimeBuilder = StoreTypes.RuntimeInfo.newBuilder()
-    runtimeBuilder.setJavaVersion(info.runtime.javaVersion)
-    runtimeBuilder.setJavaHome(info.runtime.javaHome)
-    runtimeBuilder.setScalaVersion(info.runtime.scalaVersion)
+    val runtime = info.runtime
+    setStringField(runtime.javaHome, runtimeBuilder.setJavaHome)
+    setStringField(runtime.javaVersion, runtimeBuilder.setJavaVersion)
+    setStringField(runtime.scalaVersion, runtimeBuilder.setScalaVersion)
 
     val builder = StoreTypes.ApplicationEnvironmentInfo.newBuilder()
     builder.setRuntime(runtimeBuilder.build())
@@ -72,10 +74,11 @@ class ApplicationEnvironmentInfoWrapperSerializer
 
   private def deserializeApplicationEnvironmentInfo(info: 
StoreTypes.ApplicationEnvironmentInfo):
     ApplicationEnvironmentInfo = {
+    val rt = info.getRuntime
     val runtime = new RuntimeInfo (
-      javaVersion = info.getRuntime.getJavaVersion,
-      javaHome = info.getRuntime.getJavaHome,
-      scalaVersion = info.getRuntime.getScalaVersion
+      javaVersion = getStringField(rt.hasJavaVersion, () => rt.getJavaVersion),
+      javaHome = getStringField(rt.hasJavaHome, () => rt.getJavaHome),
+      scalaVersion = getStringField(rt.hasScalaVersion, () => 
rt.getScalaVersion)
     )
     val pairSSToTuple = (pair: StoreTypes.PairStrings) => {
       (pair.getValue1, pair.getValue2)
@@ -105,15 +108,15 @@ class ApplicationEnvironmentInfoWrapperSerializer
     builder.setId(info.id)
     info.executorResources.foreach{case (k, resource) =>
       val requestBuilder = StoreTypes.ExecutorResourceRequest.newBuilder()
-      requestBuilder.setResourceName(resource.resourceName)
+      setStringField(resource.resourceName, requestBuilder.setResourceName)
       requestBuilder.setAmount(resource.amount)
-      requestBuilder.setDiscoveryScript(resource.discoveryScript)
-      requestBuilder.setVendor(resource.vendor)
+      setStringField(resource.discoveryScript, 
requestBuilder.setDiscoveryScript)
+      setStringField(resource.vendor, requestBuilder.setVendor)
       builder.putExecutorResources(k, requestBuilder.build())
     }
     info.taskResources.foreach { case (k, resource) =>
       val requestBuilder = StoreTypes.TaskResourceRequest.newBuilder()
-      requestBuilder.setResourceName(resource.resourceName)
+      setStringField(resource.resourceName, requestBuilder.setResourceName)
       requestBuilder.setAmount(resource.amount)
       builder.putTaskResources(k, requestBuilder.build())
     }
@@ -134,15 +137,18 @@ class ApplicationEnvironmentInfoWrapperSerializer
   private def deserializeExecutorResourceRequest(info: 
StoreTypes.ExecutorResourceRequest):
     ExecutorResourceRequest = {
     new ExecutorResourceRequest(
-      resourceName = info.getResourceName,
+      resourceName = getStringField(info.hasResourceName, () => 
info.getResourceName),
       amount = info.getAmount,
-      discoveryScript = info.getDiscoveryScript,
-      vendor = info.getVendor
+      discoveryScript = getStringField(info.hasDiscoveryScript, () => 
info.getDiscoveryScript),
+      vendor = getStringField(info.hasVendor, () => info.getVendor)
     )
   }
 
   private def deserializeTaskResourceRequest(info: 
StoreTypes.TaskResourceRequest):
     TaskResourceRequest = {
-    new TaskResourceRequest(resourceName = info.getResourceName, amount = 
info.getAmount)
+    new TaskResourceRequest(
+      resourceName = getStringField(info.hasResourceName, () => 
info.getResourceName),
+      amount = info.getAmount
+    )
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
index c56b5302cc1..4b2bcfa1d1f 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
 
 import org.apache.spark.status.ApplicationInfoWrapper
 import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils._
 
 
 class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrapper] {
@@ -44,8 +44,8 @@ class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrap
 
   private def serializeApplicationInfo(info: ApplicationInfo): 
StoreTypes.ApplicationInfo = {
     val builder = StoreTypes.ApplicationInfo.newBuilder()
-    builder.setId(info.id)
-      .setName(info.name)
+    setStringField(info.id, builder.setId)
+    setStringField(info.name, builder.setName)
     info.coresGranted.foreach { c =>
       builder.setCoresGranted(c)
     }
@@ -71,8 +71,8 @@ class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrap
     val memoryPerExecutorMB = getOptional(info.hasMemoryPerExecutorMb, 
info.getMemoryPerExecutorMb)
     val attempts = 
info.getAttemptsList.asScala.map(deserializeApplicationAttemptInfo)
     ApplicationInfo(
-      id = info.getId,
-      name = info.getName,
+      id = getStringField(info.hasId, () => info.getId),
+      name = getStringField(info.hasName, () => info.getName),
       coresGranted = coresGranted,
       maxCores = maxCores,
       coresPerExecutor = coresPerExecutor,
@@ -88,9 +88,9 @@ class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrap
       .setEndTime(info.endTime.getTime)
       .setLastUpdated(info.lastUpdated.getTime)
       .setDuration(info.duration)
-      .setSparkUser(info.sparkUser)
       .setCompleted(info.completed)
-      .setAppSparkVersion(info.appSparkVersion)
+    setStringField(info.sparkUser, builder.setSparkUser)
+    setStringField(info.appSparkVersion, builder.setAppSparkVersion)
     info.attemptId.foreach{ id =>
       builder.setAttemptId(id)
     }
@@ -107,9 +107,9 @@ class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrap
       endTime = new Date(info.getEndTime),
       lastUpdated = new Date(info.getLastUpdated),
       duration = info.getDuration,
-      sparkUser = info.getSparkUser,
+      sparkUser = getStringField(info.hasSparkUser, () => info.getSparkUser),
       completed = info.getCompleted,
-      appSparkVersion = info.getAppSparkVersion
+      appSparkVersion = getStringField(info.hasAppSparkVersion, () => 
info.getAppSparkVersion)
     )
   }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index e9dc9e00f67..14dd2cd601d 100644
--- 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -244,12 +244,21 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
   }
 
   test("Application Environment Info") {
+    testApplicationEnvironmentInfoWrapperSerDe("1.8", "/tmp/java", "2.13")
+  }
+
+  test("Application Environment Info with nulls") {
+    testApplicationEnvironmentInfoWrapperSerDe(null, null, null)
+  }
+
+  private def testApplicationEnvironmentInfoWrapperSerDe(
+      javaVersion: String, javaHome: String, scalaVersion: String): Unit = {
     val input = new ApplicationEnvironmentInfoWrapper(
       new ApplicationEnvironmentInfo(
         runtime = new RuntimeInfo(
-          javaVersion = "1.8",
-          javaHome = "/tmp/java",
-          scalaVersion = "2.13"),
+          javaVersion = javaVersion,
+          javaHome = javaHome,
+          scalaVersion = scalaVersion),
         sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")),
         hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", 
"val2")),
         systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", 
"value2")),
@@ -264,10 +273,10 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
               discoveryScript = "script0",
               vendor = "apache"),
             "1" -> new ExecutorResourceRequest(
-              resourceName = "exec2",
+              resourceName = null,
               amount = 1,
-              discoveryScript = "script1",
-              vendor = "apache")
+              discoveryScript = null,
+              vendor = null)
           ),
           taskResources = Map(
             "0" -> new TaskResourceRequest(resourceName = "exec1", amount = 1),
@@ -323,6 +332,14 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
   }
 
   test("Application Info") {
+    testApplicationInfoWrapperSerDe("2", "app_2")
+  }
+
+  test("Application Info with nulls") {
+    testApplicationInfoWrapperSerDe(null, null)
+  }
+
+  private def testApplicationInfoWrapperSerDe(id: String, name: String): Unit 
= {
     val attempts: Seq[ApplicationAttemptInfo] = Seq(
       ApplicationAttemptInfo(
         attemptId = Some("001"),
@@ -340,14 +357,14 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
         endTime = new Date(17L),
         lastUpdated = new Date(18L),
         duration = 100,
-        sparkUser = "user",
+        sparkUser = null,
         completed = true,
-        appSparkVersion = "3.4.0"
+        appSparkVersion = null
       ))
     val input = new ApplicationInfoWrapper(
       ApplicationInfo(
-        id = "2",
-        name = "app_2",
+        id = id,
+        name = name,
         coresGranted = Some(1),
         maxCores = Some(2),
         coresPerExecutor = Some(3),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to