This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4ab394f6aff [SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo 4ab394f6aff is described below commit 4ab394f6affbbdb463e08be2283df63a11d16b03 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Sat Jan 21 13:29:51 2023 -0800 [SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo ### What changes were proposed in this pull request? Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo ### Why are the changes needed? Properly handles null string values in the protobuf serializer. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UTs Closes #39686 from gengliangwang/fixMoreNull2. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 8 +++--- .../protobuf/RDDStorageInfoWrapperSerializer.scala | 19 +++++++------ .../protobuf/KVStoreProtobufSerializerSuite.scala | 32 ++++++++++++++++++++-- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 32b775ac90f..598800da9f5 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -241,7 +241,7 @@ message StreamBlockData { } message RDDDataDistribution { - string address = 1; + optional string address = 1; int64 memory_used = 2; int64 memory_remaining = 3; int64 disk_used = 4; @@ -252,8 +252,8 @@ message RDDDataDistribution { } message RDDPartitionInfo { - string block_name = 1; - string storage_level = 2; + optional string block_name = 1; + optional 
string storage_level = 2; int64 memory_used = 3; int64 disk_used = 4; repeated string executors = 5; @@ -261,7 +261,7 @@ message RDDPartitionInfo { message RDDStorageInfo { int32 id = 1; - string name = 2; + optional string name = 2; int32 num_partitions = 3; int32 num_cached_partitions = 4; string storage_level = 5; diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala index e59d363243e..06dbb485000 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala @@ -21,7 +21,8 @@ import scala.collection.JavaConverters._ import org.apache.spark.status.RDDStorageInfoWrapper import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo, RDDStorageInfo} -import org.apache.spark.status.protobuf.Utils.getOptional +import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField} +import org.apache.spark.util.Utils.weakIntern class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrapper] { @@ -41,7 +42,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe private def serializeRDDStorageInfo(info: RDDStorageInfo): StoreTypes.RDDStorageInfo = { val builder = StoreTypes.RDDStorageInfo.newBuilder() builder.setId(info.id) - builder.setName(info.name) + setStringField(info.name, builder.setName) builder.setNumPartitions(info.numPartitions) builder.setNumCachedPartitions(info.numCachedPartitions) builder.setStorageLevel(info.storageLevel) @@ -51,7 +52,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe if (info.dataDistribution.isDefined) { info.dataDistribution.get.foreach { dd => val dataDistributionBuilder = StoreTypes.RDDDataDistribution.newBuilder() - 
dataDistributionBuilder.setAddress(dd.address) + setStringField(dd.address, dataDistributionBuilder.setAddress) dataDistributionBuilder.setMemoryUsed(dd.memoryUsed) dataDistributionBuilder.setMemoryRemaining(dd.memoryRemaining) dataDistributionBuilder.setDiskUsed(dd.diskUsed) @@ -66,8 +67,8 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe if (info.partitions.isDefined) { info.partitions.get.foreach { p => val partitionsBuilder = StoreTypes.RDDPartitionInfo.newBuilder() - partitionsBuilder.setBlockName(p.blockName) - partitionsBuilder.setStorageLevel(p.storageLevel) + setStringField(p.blockName, partitionsBuilder.setBlockName) + setStringField(p.storageLevel, partitionsBuilder.setStorageLevel) partitionsBuilder.setMemoryUsed(p.memoryUsed) partitionsBuilder.setDiskUsed(p.diskUsed) p.executors.foreach(partitionsBuilder.addExecutors) @@ -81,7 +82,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe private def deserializeRDDStorageInfo(info: StoreTypes.RDDStorageInfo): RDDStorageInfo = { new RDDStorageInfo( id = info.getId, - name = info.getName, + name = getStringField(info.hasName, info.getName), numPartitions = info.getNumPartitions, numCachedPartitions = info.getNumCachedPartitions, storageLevel = info.getStorageLevel, @@ -102,7 +103,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe RDDDataDistribution = { new RDDDataDistribution( - address = info.getAddress, + address = getStringField(info.hasAddress, info.getAddress), memoryUsed = info.getMemoryUsed, memoryRemaining = info.getMemoryRemaining, diskUsed = info.getDiskUsed, @@ -117,8 +118,8 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe private def deserializeRDDPartitionInfo(info: StoreTypes.RDDPartitionInfo): RDDPartitionInfo = { new RDDPartitionInfo( - blockName = info.getBlockName, - storageLevel = info.getStorageLevel, + blockName = getStringField(info.hasBlockName, 
info.getBlockName), + storageLevel = getStringField(info.hasStorageLevel, () => weakIntern(info.getStorageLevel)), memoryUsed = info.getMemoryUsed, diskUsed = info.getDiskUsed, executors = info.getExecutorsList.asScala diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala index e51b2f5d012..e9dc9e00f67 100644 --- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala @@ -386,7 +386,16 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { onHeapMemoryUsed = Some(101), offHeapMemoryUsed = Some(102), onHeapMemoryRemaining = Some(103), - offHeapMemoryRemaining = Some(104)) + offHeapMemoryRemaining = Some(104)), + new RDDDataDistribution( + address = null, + memoryUsed = 60, + memoryRemaining = 80, + diskUsed = 1000, + onHeapMemoryUsed = Some(1010), + offHeapMemoryUsed = Some(1020), + onHeapMemoryRemaining = Some(1030), + offHeapMemoryRemaining = Some(1040)) ) val rddPartitionInfo = Seq( new RDDPartitionInfo( @@ -394,7 +403,13 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { storageLevel = "IN_MEM", memoryUsed = 105, diskUsed = 106, - executors = Seq("exec_0", "exec_1")) + executors = Seq("exec_0", "exec_1")), + new RDDPartitionInfo( + blockName = null, + storageLevel = null, + memoryUsed = 105, + diskUsed = 106, + executors = Seq("exec_2", "exec_3")) ) val inputs = Seq( new RDDStorageInfoWrapper( @@ -422,6 +437,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { dataDistribution = None, partitions = Some(Seq.empty) ) + ), + new RDDStorageInfoWrapper( + info = new RDDStorageInfo( + id = 3, + name = null, + numPartitions = 8, + numCachedPartitions = 5, + storageLevel = "IN_MEMORY", + memoryUsed = 100, + diskUsed = 2560, + dataDistribution = None, + partitions = 
Some(Seq.empty) + ) ) ) inputs.foreach { input => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org