This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ab394f6aff [SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo
4ab394f6aff is described below
commit 4ab394f6affbbdb463e08be2283df63a11d16b03
Author: Gengliang Wang <[email protected]>
AuthorDate: Sat Jan 21 13:29:51 2023 -0800
[SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo
### What changes were proposed in this pull request?
Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo.
### Why are the changes needed?
Properly handles null string values in the protobuf serializer: generated protobuf setters reject null input, so these string fields must be guarded on write and restored to null on read.
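For context, the null-safe helpers this patch calls (`setStringField` and `getStringField`, imported from `org.apache.spark.status.protobuf.Utils`) are not shown in the diff. A minimal sketch, inferred from their call sites below rather than copied from the actual file:

```scala
// Sketch only: signatures inferred from the call sites in this diff,
// not copied from org.apache.spark.status.protobuf.Utils.
object Utils {
  // Call the protobuf builder setter only for non-null values, since
  // generated setters throw NullPointerException on null input.
  def setStringField(input: String, f: String => Any): Unit = {
    if (input != null) f(input)
  }

  // Read the field only when the presence bit is set; otherwise restore
  // null. Taking a thunk keeps the getter (or a weakIntern call)
  // unevaluated when the field is absent.
  def getStringField(condition: Boolean, result: () => String): String = {
    if (condition) result() else null
  }
}
```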
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
New UTs
Closes #39686 from gengliangwang/fixMoreNull2.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 8 +++---
.../protobuf/RDDStorageInfoWrapperSerializer.scala | 19 +++++++------
.../protobuf/KVStoreProtobufSerializerSuite.scala | 32 ++++++++++++++++++++--
3 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 32b775ac90f..598800da9f5 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -241,7 +241,7 @@ message StreamBlockData {
}
message RDDDataDistribution {
- string address = 1;
+ optional string address = 1;
int64 memory_used = 2;
int64 memory_remaining = 3;
int64 disk_used = 4;
@@ -252,8 +252,8 @@ message RDDDataDistribution {
}
message RDDPartitionInfo {
- string block_name = 1;
- string storage_level = 2;
+ optional string block_name = 1;
+ optional string storage_level = 2;
int64 memory_used = 3;
int64 disk_used = 4;
repeated string executors = 5;
@@ -261,7 +261,7 @@ message RDDPartitionInfo {
message RDDStorageInfo {
int32 id = 1;
- string name = 2;
+ optional string name = 2;
int32 num_partitions = 3;
int32 num_cached_partitions = 4;
string storage_level = 5;
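Marking a proto3 string field `optional` turns on explicit presence tracking, so the generated Java code gains `hasX`/`clearX` accessors alongside `getX`; the deserializer below relies on `hasName`, `hasAddress`, and so on. A small illustration using the generated builder (assuming standard protobuf-java codegen):

```scala
// With `optional string name = 2;`, presence is tracked explicitly:
// hasName distinguishes "never set" from "set to the empty string".
val builder = StoreTypes.RDDStorageInfo.newBuilder()
assert(!builder.hasName)  // absent by default
builder.setName("")       // explicitly present, even though empty
assert(builder.hasName)
builder.clearName()
assert(!builder.hasName)  // cleared: reads back as absent again
```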
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index e59d363243e..06dbb485000 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -21,7 +21,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.status.RDDStorageInfoWrapper
import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo, RDDStorageInfo}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
+import org.apache.spark.util.Utils.weakIntern
class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrapper] {
@@ -41,7 +42,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
private def serializeRDDStorageInfo(info: RDDStorageInfo): StoreTypes.RDDStorageInfo = {
val builder = StoreTypes.RDDStorageInfo.newBuilder()
builder.setId(info.id)
- builder.setName(info.name)
+ setStringField(info.name, builder.setName)
builder.setNumPartitions(info.numPartitions)
builder.setNumCachedPartitions(info.numCachedPartitions)
builder.setStorageLevel(info.storageLevel)
@@ -51,7 +52,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
if (info.dataDistribution.isDefined) {
info.dataDistribution.get.foreach { dd =>
val dataDistributionBuilder = StoreTypes.RDDDataDistribution.newBuilder()
- dataDistributionBuilder.setAddress(dd.address)
+ setStringField(dd.address, dataDistributionBuilder.setAddress)
dataDistributionBuilder.setMemoryUsed(dd.memoryUsed)
dataDistributionBuilder.setMemoryRemaining(dd.memoryRemaining)
dataDistributionBuilder.setDiskUsed(dd.diskUsed)
@@ -66,8 +67,8 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
if (info.partitions.isDefined) {
info.partitions.get.foreach { p =>
val partitionsBuilder = StoreTypes.RDDPartitionInfo.newBuilder()
- partitionsBuilder.setBlockName(p.blockName)
- partitionsBuilder.setStorageLevel(p.storageLevel)
+ setStringField(p.blockName, partitionsBuilder.setBlockName)
+ setStringField(p.storageLevel, partitionsBuilder.setStorageLevel)
partitionsBuilder.setMemoryUsed(p.memoryUsed)
partitionsBuilder.setDiskUsed(p.diskUsed)
p.executors.foreach(partitionsBuilder.addExecutors)
@@ -81,7 +82,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
private def deserializeRDDStorageInfo(info: StoreTypes.RDDStorageInfo): RDDStorageInfo = {
new RDDStorageInfo(
id = info.getId,
- name = info.getName,
+ name = getStringField(info.hasName, info.getName),
numPartitions = info.getNumPartitions,
numCachedPartitions = info.getNumCachedPartitions,
storageLevel = info.getStorageLevel,
@@ -102,7 +103,7 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
private def deserializeRDDDataDistribution(info: StoreTypes.RDDDataDistribution):
RDDDataDistribution = {
new RDDDataDistribution(
- address = info.getAddress,
+ address = getStringField(info.hasAddress, info.getAddress),
memoryUsed = info.getMemoryUsed,
memoryRemaining = info.getMemoryRemaining,
diskUsed = info.getDiskUsed,
@@ -117,8 +118,8 @@ class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrappe
private def deserializeRDDPartitionInfo(info: StoreTypes.RDDPartitionInfo):
RDDPartitionInfo = {
new RDDPartitionInfo(
- blockName = info.getBlockName,
- storageLevel = info.getStorageLevel,
+ blockName = getStringField(info.hasBlockName, info.getBlockName),
+ storageLevel = getStringField(info.hasStorageLevel, () => weakIntern(info.getStorageLevel)),
memoryUsed = info.getMemoryUsed,
diskUsed = info.getDiskUsed,
executors = info.getExecutorsList.asScala
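A note on the `weakIntern` call above: storage-level strings repeat across many partitions, so interning them dedupes identical values in memory. Passing the getter as a thunk (`() => ...`) also keeps `weakIntern` from ever seeing a null, since `getStringField` only evaluates the thunk when the field is present. A minimal sketch of the interning idea, assuming a Guava-style weak interner (the real helper is `org.apache.spark.util.Utils.weakIntern`):

```scala
import com.google.common.collect.Interners

// Weak interner: identical strings share one canonical instance, and
// entries are garbage-collected once no longer referenced elsewhere.
private val stringInterner = Interners.newWeakInterner[String]()

def weakIntern(s: String): String = stringInterner.intern(s)
```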
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index e51b2f5d012..e9dc9e00f67 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -386,7 +386,16 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
onHeapMemoryUsed = Some(101),
offHeapMemoryUsed = Some(102),
onHeapMemoryRemaining = Some(103),
- offHeapMemoryRemaining = Some(104))
+ offHeapMemoryRemaining = Some(104)),
+ new RDDDataDistribution(
+ address = null,
+ memoryUsed = 60,
+ memoryRemaining = 80,
+ diskUsed = 1000,
+ onHeapMemoryUsed = Some(1010),
+ offHeapMemoryUsed = Some(1020),
+ onHeapMemoryRemaining = Some(1030),
+ offHeapMemoryRemaining = Some(1040))
)
val rddPartitionInfo = Seq(
new RDDPartitionInfo(
@@ -394,7 +403,13 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
storageLevel = "IN_MEM",
memoryUsed = 105,
diskUsed = 106,
- executors = Seq("exec_0", "exec_1"))
+ executors = Seq("exec_0", "exec_1")),
+ new RDDPartitionInfo(
+ blockName = null,
+ storageLevel = null,
+ memoryUsed = 105,
+ diskUsed = 106,
+ executors = Seq("exec_2", "exec_3"))
)
val inputs = Seq(
new RDDStorageInfoWrapper(
@@ -422,6 +437,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
dataDistribution = None,
partitions = Some(Seq.empty)
)
+ ),
+ new RDDStorageInfoWrapper(
+ info = new RDDStorageInfo(
+ id = 3,
+ name = null,
+ numPartitions = 8,
+ numCachedPartitions = 5,
+ storageLevel = "IN_MEMORY",
+ memoryUsed = 100,
+ diskUsed = 2560,
+ dataDistribution = None,
+ partitions = Some(Seq.empty)
+ )
)
)
inputs.foreach { input =>
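For readers skimming the truncated hunk: the suite round-trips each input through the serializer and compares every field, including the deliberately null ones. A hypothetical condensed version of that pattern (the actual suite asserts field by field, and the serialize/deserialize signatures are assumed from this suite's usage):

```scala
// Hypothetical condensed round-trip check; not copied from the suite.
val serializer = new KVStoreProtobufSerializer()
inputs.foreach { input =>
  val bytes = serializer.serialize(input)
  val result = serializer.deserialize(bytes, classOf[RDDStorageInfoWrapper])
  // A null name (or address / blockName / storageLevel) must survive the
  // round trip as null rather than becoming "" or throwing.
  assert(result.info.name === input.info.name)
}
```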