This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 330b7963f97 [SPARK-42140][CORE] Handle null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper
330b7963f97 is described below

commit 330b7963f97f06c5bb3da358c61a1ac1089666e7
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Jan 21 15:33:48 2023 -0800

    [SPARK-42140][CORE] Handle null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper

    ### What changes were proposed in this pull request?
    Similar to #39666, this PR handles null string values in ApplicationEnvironmentInfoWrapper/ApplicationInfoWrapper

    ### Why are the changes needed?
    Properly handles null string values in the protobuf serializer.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    New UTs

    Closes #39684 from LuciferYang/SPARK-42140.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 22 ++++++-------
 ...plicationEnvironmentInfoWrapperSerializer.scala | 34 ++++++++++++--------
 .../ApplicationInfoWrapperSerializer.scala         | 18 +++++------
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 37 ++++++++++++++++------
 4 files changed, 67 insertions(+), 44 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 598800da9f5..49076790321 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -161,14 +161,14 @@ message ExecutorStageSummaryWrapper { } message ExecutorResourceRequest { - string resource_name = 1; + optional string resource_name = 1; int64 amount = 2; - string discoveryScript = 3; - string vendor = 4; + optional string discoveryScript = 3; + optional string vendor = 4; } message TaskResourceRequest { - string 
resource_name = 1; + optional string resource_name = 1; double amount = 2; } @@ -179,9 +179,9 @@ message ResourceProfileInfo { } message RuntimeInfo { - string java_version = 1; - string java_home = 2; - string scala_version = 3; + optional string java_version = 1; + optional string java_home = 2; + optional string scala_version = 3; } message PairStrings { @@ -209,14 +209,14 @@ message ApplicationAttemptInfo { int64 end_time = 3; int64 last_updated = 4; int64 duration = 5; - string spark_user = 6; + optional string spark_user = 6; bool completed = 7; - string app_spark_version = 8; + optional string app_spark_version = 8; } message ApplicationInfo { - string id = 1; - string name = 2; + optional string id = 1; + optional string name = 2; optional int32 cores_granted = 3; optional int32 max_cores = 4; optional int32 cores_per_executor = 5; diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala index b7cf01382e2..fbbc55387b8 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala @@ -22,6 +22,7 @@ import collection.JavaConverters._ import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest} import org.apache.spark.status.ApplicationEnvironmentInfoWrapper import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ResourceProfileInfo, RuntimeInfo} +import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField} class ApplicationEnvironmentInfoWrapperSerializer extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] { @@ -43,9 +44,10 @@ class ApplicationEnvironmentInfoWrapperSerializer StoreTypes.ApplicationEnvironmentInfo = { val runtimeBuilder = StoreTypes.RuntimeInfo.newBuilder() - 
runtimeBuilder.setJavaVersion(info.runtime.javaVersion) - runtimeBuilder.setJavaHome(info.runtime.javaHome) - runtimeBuilder.setScalaVersion(info.runtime.scalaVersion) + val runtime = info.runtime + setStringField(runtime.javaHome, runtimeBuilder.setJavaHome) + setStringField(runtime.javaVersion, runtimeBuilder.setJavaVersion) + setStringField(runtime.scalaVersion, runtimeBuilder.setScalaVersion) val builder = StoreTypes.ApplicationEnvironmentInfo.newBuilder() builder.setRuntime(runtimeBuilder.build()) @@ -72,10 +74,11 @@ class ApplicationEnvironmentInfoWrapperSerializer private def deserializeApplicationEnvironmentInfo(info: StoreTypes.ApplicationEnvironmentInfo): ApplicationEnvironmentInfo = { + val rt = info.getRuntime val runtime = new RuntimeInfo ( - javaVersion = info.getRuntime.getJavaVersion, - javaHome = info.getRuntime.getJavaHome, - scalaVersion = info.getRuntime.getScalaVersion + javaVersion = getStringField(rt.hasJavaVersion, () => rt.getJavaVersion), + javaHome = getStringField(rt.hasJavaHome, () => rt.getJavaHome), + scalaVersion = getStringField(rt.hasScalaVersion, () => rt.getScalaVersion) ) val pairSSToTuple = (pair: StoreTypes.PairStrings) => { (pair.getValue1, pair.getValue2) @@ -105,15 +108,15 @@ class ApplicationEnvironmentInfoWrapperSerializer builder.setId(info.id) info.executorResources.foreach{case (k, resource) => val requestBuilder = StoreTypes.ExecutorResourceRequest.newBuilder() - requestBuilder.setResourceName(resource.resourceName) + setStringField(resource.resourceName, requestBuilder.setResourceName) requestBuilder.setAmount(resource.amount) - requestBuilder.setDiscoveryScript(resource.discoveryScript) - requestBuilder.setVendor(resource.vendor) + setStringField(resource.discoveryScript, requestBuilder.setDiscoveryScript) + setStringField(resource.vendor, requestBuilder.setVendor) builder.putExecutorResources(k, requestBuilder.build()) } info.taskResources.foreach { case (k, resource) => val requestBuilder = 
StoreTypes.TaskResourceRequest.newBuilder() - requestBuilder.setResourceName(resource.resourceName) + setStringField(resource.resourceName, requestBuilder.setResourceName) requestBuilder.setAmount(resource.amount) builder.putTaskResources(k, requestBuilder.build()) } @@ -134,15 +137,18 @@ class ApplicationEnvironmentInfoWrapperSerializer private def deserializeExecutorResourceRequest(info: StoreTypes.ExecutorResourceRequest): ExecutorResourceRequest = { new ExecutorResourceRequest( - resourceName = info.getResourceName, + resourceName = getStringField(info.hasResourceName, () => info.getResourceName), amount = info.getAmount, - discoveryScript = info.getDiscoveryScript, - vendor = info.getVendor + discoveryScript = getStringField(info.hasDiscoveryScript, () => info.getDiscoveryScript), + vendor = getStringField(info.hasVendor, () => info.getVendor) ) } private def deserializeTaskResourceRequest(info: StoreTypes.TaskResourceRequest): TaskResourceRequest = { - new TaskResourceRequest(resourceName = info.getResourceName, amount = info.getAmount) + new TaskResourceRequest( + resourceName = getStringField(info.hasResourceName, () => info.getResourceName), + amount = info.getAmount + ) } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala index c56b5302cc1..4b2bcfa1d1f 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala @@ -23,7 +23,7 @@ import collection.JavaConverters._ import org.apache.spark.status.ApplicationInfoWrapper import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} -import org.apache.spark.status.protobuf.Utils.getOptional +import org.apache.spark.status.protobuf.Utils._ class ApplicationInfoWrapperSerializer extends 
ProtobufSerDe[ApplicationInfoWrapper] { @@ -44,8 +44,8 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap private def serializeApplicationInfo(info: ApplicationInfo): StoreTypes.ApplicationInfo = { val builder = StoreTypes.ApplicationInfo.newBuilder() - builder.setId(info.id) - .setName(info.name) + setStringField(info.id, builder.setId) + setStringField(info.name, builder.setName) info.coresGranted.foreach { c => builder.setCoresGranted(c) } @@ -71,8 +71,8 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap val memoryPerExecutorMB = getOptional(info.hasMemoryPerExecutorMb, info.getMemoryPerExecutorMb) val attempts = info.getAttemptsList.asScala.map(deserializeApplicationAttemptInfo) ApplicationInfo( - id = info.getId, - name = info.getName, + id = getStringField(info.hasId, () => info.getId), + name = getStringField(info.hasName, () => info.getName), coresGranted = coresGranted, maxCores = maxCores, coresPerExecutor = coresPerExecutor, @@ -88,9 +88,9 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap .setEndTime(info.endTime.getTime) .setLastUpdated(info.lastUpdated.getTime) .setDuration(info.duration) - .setSparkUser(info.sparkUser) .setCompleted(info.completed) - .setAppSparkVersion(info.appSparkVersion) + setStringField(info.sparkUser, builder.setSparkUser) + setStringField(info.appSparkVersion, builder.setAppSparkVersion) info.attemptId.foreach{ id => builder.setAttemptId(id) } @@ -107,9 +107,9 @@ class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrap endTime = new Date(info.getEndTime), lastUpdated = new Date(info.getLastUpdated), duration = info.getDuration, - sparkUser = info.getSparkUser, + sparkUser = getStringField(info.hasSparkUser, () => info.getSparkUser), completed = info.getCompleted, - appSparkVersion = info.getAppSparkVersion + appSparkVersion = getStringField(info.hasAppSparkVersion, () => info.getAppSparkVersion) ) } } diff 
--git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala index e9dc9e00f67..14dd2cd601d 100644 --- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala @@ -244,12 +244,21 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { } test("Application Environment Info") { + testApplicationEnvironmentInfoWrapperSerDe("1.8", "/tmp/java", "2.13") + } + + test("Application Environment Info with nulls") { + testApplicationEnvironmentInfoWrapperSerDe(null, null, null) + } + + private def testApplicationEnvironmentInfoWrapperSerDe( + javaVersion: String, javaHome: String, scalaVersion: String): Unit = { val input = new ApplicationEnvironmentInfoWrapper( new ApplicationEnvironmentInfo( runtime = new RuntimeInfo( - javaVersion = "1.8", - javaHome = "/tmp/java", - scalaVersion = "2.13"), + javaVersion = javaVersion, + javaHome = javaHome, + scalaVersion = scalaVersion), sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")), hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2")), systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2")), @@ -264,10 +273,10 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { discoveryScript = "script0", vendor = "apache"), "1" -> new ExecutorResourceRequest( - resourceName = "exec2", + resourceName = null, amount = 1, - discoveryScript = "script1", - vendor = "apache") + discoveryScript = null, + vendor = null) ), taskResources = Map( "0" -> new TaskResourceRequest(resourceName = "exec1", amount = 1), @@ -323,6 +332,14 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { } test("Application Info") { + testApplicationInfoWrapperSerDe("2", "app_2") + } + + test("Application Info with nulls") { + 
testApplicationInfoWrapperSerDe(null, null) + } + + private def testApplicationInfoWrapperSerDe(id: String, name: String): Unit = { val attempts: Seq[ApplicationAttemptInfo] = Seq( ApplicationAttemptInfo( attemptId = Some("001"), @@ -340,14 +357,14 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { endTime = new Date(17L), lastUpdated = new Date(18L), duration = 100, - sparkUser = "user", + sparkUser = null, completed = true, - appSparkVersion = "3.4.0" + appSparkVersion = null )) val input = new ApplicationInfoWrapper( ApplicationInfo( - id = "2", - name = "app_2", + id = id, + name = name, coresGranted = Some(1), maxCores = Some(2), coresPerExecutor = Some(3), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org