This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 66f6a63b4ae [SPARK-42130][UI] Handle null string values in AccumulableInfo and ProcessSummary
66f6a63b4ae is described below
commit 66f6a63b4ae00607b7a3d44371d379bbe1374569
Author: Gengliang Wang <[email protected]>
AuthorDate: Fri Jan 20 13:48:15 2023 -0800
[SPARK-42130][UI] Handle null string values in AccumulableInfo and ProcessSummary
### What changes were proposed in this pull request?
After revisiting https://github.com/apache/spark/pull/39416 and
https://github.com/apache/spark/pull/39623, I propose:
* checking nullability of all string fields to avoid NPE
* using `optional string` for the protobuf definition of all string fields.
If the deserialized result is None, then set the string field as null.
Take `AccumulableInfo` as an example: its name can be null when the object is created:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/LiveEntity.scala#L744
The null value makes a difference in the UI logic:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala#L791
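For illustration, the distinction the UI relies on looks roughly like this (a simplified sketch, not the exact `StagePage` code; `accumulatorUpdates` is a stand-in name):

```scala
// Internal accumulators carry a null name; the stage page keeps only named
// accumulators, so null must survive the serialization round trip instead
// of silently turning into "".
val externalAccumulables = accumulatorUpdates.filter(_.name != null)
```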
This PR updates the developer guide and introduces utility methods for serializing/deserializing string fields. It also handles null string values in AccumulableInfo and ProcessSummary to set an example.
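For reference, the new helpers in `Utils.scala` (shown in full in the diff below) boil down to this pattern:

```scala
import com.google.protobuf.MessageOrBuilder

// Serialization: invoke the protobuf setter only when the value is non-null,
// avoiding the NPE that e.g. `builder.setName(null)` would throw.
def setStringField(input: String, f: String => MessageOrBuilder): Unit = {
  if (input != null) {
    f(input)
  }
}

// Deserialization: map an unset `optional string` back to null so the
// original value is preserved rather than replaced by "".
def getStringField(condition: Boolean, result: () => String): String =
  if (condition) result() else null
```

Call sites then read `setStringField(input.name, builder.setName)` on the write path and `name = getStringField(update.hasName, () => weakIntern(update.getName))` on the read path.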
### Why are the changes needed?
* Update the developer guide for better handling of null string values.
* Add utility methods for future development of string serialization/deserialization.
* Properly handle null string values in AccumulableInfo and ProcessSummary, to set an example.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tests null string input values in the AccumulableInfo and ProcessSummary protobuf serializers.
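A condensed sketch of the new round-trip check (assuming `serializer` is the suite's `KVStoreProtobufSerializer` instance; the full version is in the suite diff below):

```scala
import java.util.Date
import org.apache.spark.status.ProcessSummaryWrapper
import org.apache.spark.status.api.v1.ProcessSummary

// A null id must come back as null, not as "". hostPort stays non-null:
// per the test comment below, a null hostPort would cause an NPE.
val input = new ProcessSummaryWrapper(
  info = new ProcessSummary(
    id = null,
    hostPort = "",
    isActive = true,
    totalCores = 4,
    addTime = new Date(1234567L),
    removeTime = Some(new Date(1234568L)),
    processLogs = Map("log1" -> "log/log1")
  )
)
val bytes = serializer.serialize(input)
val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
assert(result.info.id == null)
```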
Closes #39666 from gengliangwang/fixAcc.
Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 15 ++++--
.../protobuf/AccumulableInfoSerializer.scala | 10 ++--
.../protobuf/ProcessSummaryWrapperSerializer.scala | 10 ++--
.../org/apache/spark/status/protobuf/Utils.scala | 14 ++++++
.../protobuf/KVStoreProtobufSerializerSuite.scala | 53 ++++++++++++----------
5 files changed, 64 insertions(+), 38 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 551de95c157..96c78aa001d 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -22,7 +22,12 @@ package org.apache.spark.status.protobuf;
* Developer guides:
* - Coding style: https://developers.google.com/protocol-buffers/docs/style
* - Use int64 for job/stage IDs, in case of future extension in Spark core.
- * - Use `weakIntern` on string values in create new objects during deserialization.
+ * - For string fields:
+ * - use `optional string` for protobuf definition
+ * - on serialization, check if the string is null to avoid NPE
+ * - on deserialization, set a string field as null if it is not set. Also, use `weakIntern` on
+ * string values when creating new objects during deserialization.
+ * - add tests with null string inputs
*/
enum JobExecutionStatus {
@@ -65,9 +70,9 @@ message JobDataWrapper {
message AccumulableInfo {
int64 id = 1;
- string name = 2;
+ optional string name = 2;
optional string update = 3;
- string value = 4;
+ optional string value = 4;
}
message TaskDataWrapper {
@@ -334,8 +339,8 @@ message SpeculationStageSummaryWrapper {
}
message ProcessSummary {
- string id = 1;
- string host_port = 2;
+ optional string id = 1;
+ optional string host_port = 2;
bool is_active = 3;
int32 total_cores = 4;
int64 add_time = 5;
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
index 18f937cecdb..a696203bc52 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -22,7 +22,7 @@ import java.util.{List => JList}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.status.api.v1.AccumulableInfo
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
import org.apache.spark.util.Utils.weakIntern
private[protobuf] object AccumulableInfoSerializer {
@@ -30,8 +30,8 @@ private[protobuf] object AccumulableInfoSerializer {
def serialize(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
val builder = StoreTypes.AccumulableInfo.newBuilder()
.setId(input.id)
- .setName(input.name)
- .setValue(input.value)
+ setStringField(input.name, builder.setName)
+ setStringField(input.value, builder.setValue)
input.update.foreach(builder.setUpdate)
builder.build()
}
@@ -41,9 +41,9 @@ private[protobuf] object AccumulableInfoSerializer {
updates.forEach { update =>
accumulatorUpdates.append(new AccumulableInfo(
id = update.getId,
- name = weakIntern(update.getName),
+ name = getStringField(update.hasName, () => weakIntern(update.getName)),
update = getOptional(update.hasUpdate, update.getUpdate),
- value = update.getValue))
+ value = getStringField(update.hasValue, update.getValue)))
}
accumulatorUpdates
}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
index a3d13ddd31f..3a5d224f41b 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.status.ProcessSummaryWrapper
import org.apache.spark.status.api.v1.ProcessSummary
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrapper] {
@@ -42,8 +42,8 @@ class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrappe
private def serializeProcessSummary(info: ProcessSummary): StoreTypes.ProcessSummary = {
val builder = StoreTypes.ProcessSummary.newBuilder()
- builder.setId(info.id)
- builder.setHostPort(info.hostPort)
+ setStringField(info.id, builder.setId)
+ setStringField(info.hostPort, builder.setHostPort)
builder.setIsActive(info.isActive)
builder.setTotalCores(info.totalCores)
builder.setAddTime(info.addTime.getTime)
@@ -59,8 +59,8 @@ class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrappe
private def deserializeProcessSummary(info: StoreTypes.ProcessSummary): ProcessSummary = {
val removeTime = getOptional(info.hasRemoveTime, () => new Date(info.getRemoveTime))
new ProcessSummary(
- id = info.getId,
- hostPort = info.getHostPort,
+ id = getStringField(info.hasId, info.getId),
+ hostPort = getStringField(info.hasHostPort, info.getHostPort),
isActive = info.getIsActive,
totalCores = info.getTotalCores,
addTime = new Date(info.getAddTime),
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
index d1c85d5f5c3..47e280f4ee9 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
@@ -17,10 +17,24 @@
package org.apache.spark.status.protobuf
+import com.google.protobuf.MessageOrBuilder
+
object Utils {
def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
Some(result())
} else {
None
}
+
+ def setStringField(input: String, f: String => MessageOrBuilder): Unit = {
+ if (input != null) {
+ f(input)
+ }
+ }
+
+ def getStringField(condition: Boolean, result: () => String): String = if (condition) {
+ result()
+ } else {
+ null
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 2c88666332b..0d0d26410ed 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -86,7 +86,8 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
test("Task Data") {
val accumulatorUpdates = Seq(
new AccumulableInfo(1L, "duration", Some("update"), "value1"),
- new AccumulableInfo(2L, "duration2", None, "value2")
+ new AccumulableInfo(2L, "duration2", None, "value2"),
+ new AccumulableInfo(-1L, null, None, null)
)
val input = new TaskDataWrapper(
taskId = 1,
@@ -757,29 +758,34 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
}
test("Process Summary") {
- val input = new ProcessSummaryWrapper(
- info = new ProcessSummary(
- id = "id_1",
- hostPort = "localhost:2020",
- isActive = true,
- totalCores = 4,
- addTime = new Date(1234567L),
- removeTime = Some(new Date(1234568L)),
- processLogs = Map("log1" -> "log/log1", "log2" -> "logs/log2.log")
+ Seq(
+ ("id_1", "localhost:2020"),
+ (null, "") // hostPort can't be null. Otherwise there will be NPE.
+ ).foreach { case(id, hostPort) =>
+ val input = new ProcessSummaryWrapper(
+ info = new ProcessSummary(
+ id = id,
+ hostPort = hostPort,
+ isActive = true,
+ totalCores = 4,
+ addTime = new Date(1234567L),
+ removeTime = Some(new Date(1234568L)),
+ processLogs = Map("log1" -> "log/log1", "log2" -> "logs/log2.log")
+ )
)
- )
- val bytes = serializer.serialize(input)
- val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
- assert(result.info.id == input.info.id)
- assert(result.info.hostPort == input.info.hostPort)
- assert(result.info.isActive == input.info.isActive)
- assert(result.info.totalCores == input.info.totalCores)
- assert(result.info.addTime == input.info.addTime)
- assert(result.info.removeTime == input.info.removeTime)
- assert(result.info.processLogs.size == input.info.processLogs.size)
- result.info.processLogs.keys.foreach { k =>
- assert(input.info.processLogs.contains(k))
- assert(result.info.processLogs(k) == input.info.processLogs(k))
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[ProcessSummaryWrapper])
+ assert(result.info.id == input.info.id)
+ assert(result.info.hostPort == input.info.hostPort)
+ assert(result.info.isActive == input.info.isActive)
+ assert(result.info.totalCores == input.info.totalCores)
+ assert(result.info.addTime == input.info.addTime)
+ assert(result.info.removeTime == input.info.removeTime)
+ assert(result.info.processLogs.size == input.info.processLogs.size)
+ result.info.processLogs.keys.foreach { k =>
+ assert(input.info.processLogs.contains(k))
+ assert(result.info.processLogs(k) == input.info.processLogs(k))
+ }
}
}
@@ -1365,6 +1371,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(a1.name == a2.name)
assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
assert(a1.update == a2.update)
+ assert(a1.value == a2.value)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]