This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 55f64cfa839 [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to
`ProtobufSerDe[T]`
55f64cfa839 is described below
commit 55f64cfa839a77b8fff7c1625281b84cce4c6807
Author: yangjie01 <[email protected]>
AuthorDate: Wed Jan 11 22:21:21 2023 -0800
[SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`
### What changes were proposed in this pull request?
This pr aims refator `ProtobufSerDe` to `ProtobufSerDe[T]`, the main
change of `ProtobufSerDe` as follows:
- Change the definition of `ProtobufSerDe` to `ProtobufSerDe[T]`
- Remove `supportClass` method from `ProtobufSerDe[T]` and use reflection
in `KVStoreProtobufSerializer` to obtain the actual type of `T` as
`serializerMap` key
- Change the input parameter type of `serialize` function from `Any` to `T`
- Change the return value type of `deserialize` function method from `Any`
to `T`
Then, all the subclasses of `ProtobufSerDe[T]` are refactored and code
cleaned in this pr.
### Why are the changes needed?
Refactor `ProtobufSerDe` and code cleanup.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Pass Github Actions
Closes #39487 from LuciferYang/refactor-ProtobufSerDe.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../ApplicationEnvironmentInfoWrapperSerializer.scala | 10 +++-------
.../protobuf/ApplicationInfoWrapperSerializer.scala | 9 ++-------
.../status/protobuf/CachedQuantileSerializer.scala | 6 ++----
.../ExecutorStageSummaryWrapperSerializer.scala | 10 +++-------
.../protobuf/ExecutorSummaryWrapperSerializer.scala | 10 ++--------
.../status/protobuf/JobDataWrapperSerializer.scala | 9 ++-------
.../status/protobuf/KVStoreProtobufSerializer.scala | 15 +++++++++++----
.../protobuf/ProcessSummaryWrapperSerializer.scala | 10 ++--------
.../apache/spark/status/protobuf/ProtobufSerDe.scala | 19 +++++--------------
.../protobuf/RDDOperationGraphWrapperSerializer.scala | 7 ++-----
.../protobuf/RDDStorageInfoWrapperSerializer.scala | 9 ++-------
.../protobuf/ResourceProfileWrapperSerializer.scala | 9 ++-------
.../SpeculationStageSummaryWrapperSerializer.scala | 10 +++-------
.../status/protobuf/StageDataWrapperSerializer.scala | 13 +++++--------
.../status/protobuf/StreamBlockDataSerializer.scala | 6 ++----
.../status/protobuf/TaskDataWrapperSerializer.scala | 9 ++-------
.../protobuf/sql/SQLExecutionUIDataSerializer.scala | 7 ++-----
.../sql/SparkPlanGraphWrapperSerializer.scala | 7 ++-----
.../protobuf/sql/StreamingQueryDataSerializer.scala | 9 +++------
19 files changed, 57 insertions(+), 127 deletions(-)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index 33a18daacbc..b7cf01382e2 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -23,14 +23,10 @@ import org.apache.spark.resource.{ExecutorResourceRequest,
TaskResourceRequest}
import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo,
ResourceProfileInfo, RuntimeInfo}
-class ApplicationEnvironmentInfoWrapperSerializer extends ProtobufSerDe {
+class ApplicationEnvironmentInfoWrapperSerializer
+ extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] {
- override val supportClass: Class[_] =
classOf[ApplicationEnvironmentInfoWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[ApplicationEnvironmentInfoWrapper])
-
- private def serialize(input: ApplicationEnvironmentInfoWrapper): Array[Byte]
= {
+ override def serialize(input: ApplicationEnvironmentInfoWrapper):
Array[Byte] = {
val builder = StoreTypes.ApplicationEnvironmentInfoWrapper.newBuilder()
builder.setInfo(serializeApplicationEnvironmentInfo(input.info))
builder.build().toByteArray
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
index 5a2accb7506..c56b5302cc1 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
@@ -26,14 +26,9 @@ import
org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.protobuf.Utils.getOptional
-class ApplicationInfoWrapperSerializer extends ProtobufSerDe {
+class ApplicationInfoWrapperSerializer extends
ProtobufSerDe[ApplicationInfoWrapper] {
- override val supportClass: Class[_] = classOf[ApplicationInfoWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[ApplicationInfoWrapper])
-
- private def serialize(j: ApplicationInfoWrapper): Array[Byte] = {
+ override def serialize(j: ApplicationInfoWrapper): Array[Byte] = {
val jobData = serializeApplicationInfo(j.info)
val builder = StoreTypes.ApplicationInfoWrapper.newBuilder()
builder.setInfo(jobData)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
index 547cbd86b7a..08b8c8b0a98 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
@@ -19,11 +19,9 @@ package org.apache.spark.status.protobuf
import org.apache.spark.status.CachedQuantile
-class CachedQuantileSerializer extends ProtobufSerDe {
- override val supportClass: Class[_] = classOf[CachedQuantile]
+class CachedQuantileSerializer extends ProtobufSerDe[CachedQuantile] {
- override def serialize(input: Any): Array[Byte] = {
- val data = input.asInstanceOf[CachedQuantile]
+ override def serialize(data: CachedQuantile): Array[Byte] = {
val builder = StoreTypes.CachedQuantile.newBuilder()
.setStageId(data.stageId.toLong)
.setStageAttemptId(data.stageAttemptId)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 71de2fbc81f..4d9d045ed5e 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -19,14 +19,10 @@ package org.apache.spark.status.protobuf
import org.apache.spark.status.ExecutorStageSummaryWrapper
-class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
+class ExecutorStageSummaryWrapperSerializer
+ extends ProtobufSerDe[ExecutorStageSummaryWrapper] {
- override val supportClass: Class[_] = classOf[ExecutorStageSummaryWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
-
- private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
+ override def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
val info = ExecutorStageSummarySerializer.serialize(input.info)
val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
.setStageId(input.stageId.toLong)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 03a810157d7..b008c98e562 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -26,15 +26,9 @@ import org.apache.spark.status.ExecutorSummaryWrapper
import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
import org.apache.spark.status.protobuf.Utils.getOptional
-class ExecutorSummaryWrapperSerializer extends ProtobufSerDe {
+class ExecutorSummaryWrapperSerializer extends
ProtobufSerDe[ExecutorSummaryWrapper] {
- override val supportClass: Class[_] = classOf[ExecutorSummaryWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- serialize(input.asInstanceOf[ExecutorSummaryWrapper])
- }
-
- def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
+ override def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
val info = serializeExecutorSummary(input.info)
val builder = StoreTypes.ExecutorSummaryWrapper.newBuilder()
.setInfo(info)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index e2e2a1a8d89..10e0f125f6c 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -25,14 +25,9 @@ import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.status.protobuf.Utils.getOptional
-class JobDataWrapperSerializer extends ProtobufSerDe {
+class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
- override val supportClass: Class[_] = classOf[JobDataWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[JobDataWrapper])
-
- private def serialize(j: JobDataWrapper): Array[Byte] = {
+ override def serialize(j: JobDataWrapper): Array[Byte] = {
val jobData = serializeJobData(j.info)
val builder = StoreTypes.JobDataWrapper.newBuilder()
builder.setInfo(jobData)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
index d915edf396c..e6bdfa17715 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
@@ -17,6 +17,7 @@
package org.apache.spark.status.protobuf
+import java.lang.reflect.ParameterizedType
import java.util.ServiceLoader
import collection.JavaConverters._
@@ -40,10 +41,16 @@ private[spark] class KVStoreProtobufSerializer extends
KVStoreScalaSerializer {
private[spark] object KVStoreProtobufSerializer {
- private[this] lazy val serializerMap: Map[Class[_], ProtobufSerDe] =
- ServiceLoader.load(classOf[ProtobufSerDe])
- .asScala.map(serDe => serDe.supportClass -> serDe).toMap
+ private[this] lazy val serializerMap: Map[Class[_], ProtobufSerDe[Any]] = {
+ def getGenericsType(klass: Class[_]): Class[_] = {
+ klass.getGenericInterfaces.head.asInstanceOf[ParameterizedType]
+ .getActualTypeArguments.head.asInstanceOf[Class[_]]
+ }
+ ServiceLoader.load(classOf[ProtobufSerDe[Any]]).asScala.map { serDe =>
+ getGenericsType(serDe.getClass) -> serDe
+ }.toMap
+ }
- def getSerializer(klass: Class[_]): Option[ProtobufSerDe] =
+ def getSerializer(klass: Class[_]): Option[ProtobufSerDe[Any]] =
serializerMap.get(klass)
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
index 6d8a8f863ad..a3d13ddd31f 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
@@ -25,15 +25,9 @@ import org.apache.spark.status.ProcessSummaryWrapper
import org.apache.spark.status.api.v1.ProcessSummary
import org.apache.spark.status.protobuf.Utils.getOptional
-class ProcessSummaryWrapperSerializer extends ProtobufSerDe {
+class ProcessSummaryWrapperSerializer extends
ProtobufSerDe[ProcessSummaryWrapper] {
- override val supportClass: Class[_] = classOf[ProcessSummaryWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- serialize(input.asInstanceOf[ProcessSummaryWrapper])
- }
-
- def serialize(input: ProcessSummaryWrapper): Array[Byte] = {
+ override def serialize(input: ProcessSummaryWrapper): Array[Byte] = {
val builder = StoreTypes.ProcessSummaryWrapper.newBuilder()
builder.setInfo(serializeProcessSummary(input.info))
builder.build().toByteArray
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
index 5e0f6263f1a..d6eccb6307a 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
@@ -26,29 +26,20 @@ import org.apache.spark.annotation.{DeveloperApi, Unstable}
* register itself to `org.apache.spark.status.protobuf.ProtobufSerDe` so that
* `KVStoreProtobufSerializer` can use `ServiceLoader` to load and use them.
*
- * TODO: SPARK-41644 How to define `ProtobufSerDe` as `ProtobufSerDe[T]`
- *
* @since 3.4.0
*/
@DeveloperApi
@Unstable
-trait ProtobufSerDe {
-
- /**
- * Specify the data types supported by the current `ProtobufSerDe`
- */
- val supportClass: Class[_]
+trait ProtobufSerDe[T] {
/**
- * Serialize the input data of the type corresponding to `supportClass`
- * to `Array[Byte]`, since the current input parameter type is `Any`,
- * the input type needs to be guaranteed from the code level.
+ * Serialize the input data of the type `T` to `Array[Byte]`.
*/
- def serialize(input: Any): Array[Byte]
+ def serialize(input: T): Array[Byte]
/**
* Deserialize the input `Array[Byte]` to an object of the
- * type corresponding to `supportClass`.
+ * type `T`.
*/
- def deserialize(bytes: Array[Byte]): Any
+ def deserialize(bytes: Array[Byte]): T
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index c0d86ede198..f822ed1889a 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -24,12 +24,9 @@ import org.apache.spark.status.{RDDOperationClusterWrapper,
RDDOperationGraphWra
import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel =>
GDeterministicLevel}
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
-class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+class RDDOperationGraphWrapperSerializer extends
ProtobufSerDe[RDDOperationGraphWrapper] {
- override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- val op = input.asInstanceOf[RDDOperationGraphWrapper]
+ override def serialize(op: RDDOperationGraphWrapper): Array[Byte] = {
val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
builder.setStageId(op.stageId.toLong)
op.edges.foreach { e =>
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index be6fe1f83cd..e59d363243e 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -23,14 +23,9 @@ import org.apache.spark.status.RDDStorageInfoWrapper
import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo,
RDDStorageInfo}
import org.apache.spark.status.protobuf.Utils.getOptional
-class RDDStorageInfoWrapperSerializer extends ProtobufSerDe {
+class RDDStorageInfoWrapperSerializer extends
ProtobufSerDe[RDDStorageInfoWrapper] {
- override val supportClass: Class[_] = classOf[RDDStorageInfoWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[RDDStorageInfoWrapper])
-
- private def serialize(input: RDDStorageInfoWrapper): Array[Byte] = {
+ override def serialize(input: RDDStorageInfoWrapper): Array[Byte] = {
val builder = StoreTypes.RDDStorageInfoWrapper.newBuilder()
builder.setInfo(serializeRDDStorageInfo(input.info))
builder.build().toByteArray
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
index a780ea5d7e9..d9d29cc8d88 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
@@ -19,16 +19,11 @@ package org.apache.spark.status.protobuf
import org.apache.spark.status.ResourceProfileWrapper
-class ResourceProfileWrapperSerializer extends ProtobufSerDe {
+class ResourceProfileWrapperSerializer extends
ProtobufSerDe[ResourceProfileWrapper] {
private val appEnvSerializer = new
ApplicationEnvironmentInfoWrapperSerializer
- override val supportClass: Class[_] = classOf[ResourceProfileWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[ResourceProfileWrapper])
-
- private def serialize(input: ResourceProfileWrapper): Array[Byte] = {
+ override def serialize(input: ResourceProfileWrapper): Array[Byte] = {
val builder = StoreTypes.ResourceProfileWrapper.newBuilder()
builder.setRpInfo(appEnvSerializer.serializeResourceProfileInfo(input.rpInfo))
builder.build().toByteArray
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
index 936e2dd4504..1b9a1ecfce6 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
@@ -20,14 +20,10 @@ package org.apache.spark.status.protobuf
import org.apache.spark.status.SpeculationStageSummaryWrapper
import org.apache.spark.status.api.v1.SpeculationStageSummary
-class SpeculationStageSummaryWrapperSerializer extends ProtobufSerDe {
+class SpeculationStageSummaryWrapperSerializer
+ extends ProtobufSerDe[SpeculationStageSummaryWrapper] {
- override val supportClass: Class[_] = classOf[SpeculationStageSummaryWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[SpeculationStageSummaryWrapper])
-
- private def serialize(s: SpeculationStageSummaryWrapper): Array[Byte] = {
+ override def serialize(s: SpeculationStageSummaryWrapper): Array[Byte] = {
val summary = serializeSpeculationStageSummary(s.info)
val builder = StoreTypes.SpeculationStageSummaryWrapper.newBuilder()
builder.setStageId(s.stageId.toLong)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index f4e5c8104a0..eda4422405e 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -27,16 +27,13 @@ import
org.apache.spark.status.api.v1.{ExecutorMetricsDistributions, ExecutorPea
import org.apache.spark.status.protobuf.Utils.getOptional
import org.apache.spark.util.Utils.weakIntern
-class StageDataWrapperSerializer extends ProtobufSerDe {
+class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] {
- override val supportClass: Class[_] = classOf[StageDataWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- val s = input.asInstanceOf[StageDataWrapper]
+ override def serialize(input: StageDataWrapper): Array[Byte] = {
val builder = StoreTypes.StageDataWrapper.newBuilder()
- builder.setInfo(serializeStageData(s.info))
- s.jobIds.foreach(id => builder.addJobIds(id.toLong))
- s.locality.foreach { entry =>
+ builder.setInfo(serializeStageData(input.info))
+ input.jobIds.foreach(id => builder.addJobIds(id.toLong))
+ input.locality.foreach { entry =>
builder.putLocality(entry._1, entry._2)
}
builder.build().toByteArray
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
index 71bb09d3118..f450bbbfd0c 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
@@ -19,11 +19,9 @@ package org.apache.spark.status.protobuf
import org.apache.spark.status.StreamBlockData
-class StreamBlockDataSerializer extends ProtobufSerDe {
- override val supportClass: Class[_] = classOf[StreamBlockData]
+class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] {
- override def serialize(input: Any): Array[Byte] = {
- val data = input.asInstanceOf[StreamBlockData]
+ override def serialize(data: StreamBlockData): Array[Byte] = {
val builder = StoreTypes.StreamBlockData.newBuilder()
.setName(data.name)
.setExecutorId(data.executorId)
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 7ba4a32d806..3c947c79eab 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -21,14 +21,9 @@ import org.apache.spark.status.TaskDataWrapper
import org.apache.spark.status.protobuf.Utils.getOptional
import org.apache.spark.util.Utils.weakIntern
-class TaskDataWrapperSerializer extends ProtobufSerDe {
+class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
- override val supportClass: Class[_] = classOf[TaskDataWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[TaskDataWrapper])
-
- private def serialize(input: TaskDataWrapper): Array[Byte] = {
+ override def serialize(input: TaskDataWrapper): Array[Byte] = {
val builder = StoreTypes.TaskDataWrapper.newBuilder()
.setTaskId(input.taskId)
.setIndex(input.index)
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index dcce69b803c..80a36e1b02b 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -25,12 +25,9 @@ import org.apache.spark.sql.execution.ui.SQLExecutionUIData
import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer,
ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.Utils.getOptional
-class SQLExecutionUIDataSerializer extends ProtobufSerDe {
+class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
- override val supportClass: Class[_] = classOf[SQLExecutionUIData]
-
- override def serialize(input: Any): Array[Byte] = {
- val ui = input.asInstanceOf[SQLExecutionUIData]
+ override def serialize(ui: SQLExecutionUIData): Array[Byte] = {
val builder = StoreTypes.SQLExecutionUIData.newBuilder()
builder.setExecutionId(ui.executionId)
builder.setRootExecutionId(ui.rootExecutionId)
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index c68466489ce..1df82e3246a 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -23,12 +23,9 @@ import
org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPla
import org.apache.spark.status.protobuf.ProtobufSerDe
import org.apache.spark.status.protobuf.StoreTypes
-class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
+class SparkPlanGraphWrapperSerializer extends
ProtobufSerDe[SparkPlanGraphWrapper] {
- override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper]
-
- override def serialize(input: Any): Array[Byte] = {
- val plan = input.asInstanceOf[SparkPlanGraphWrapper]
+ override def serialize(plan: SparkPlanGraphWrapper): Array[Byte] = {
val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder()
builder.setExecutionId(plan.executionId)
plan.nodes.foreach { node =>
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
index f05b186fea5..70f8bedf91b 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
@@ -23,12 +23,9 @@ import org.apache.spark.sql.streaming.ui.StreamingQueryData
import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.Utils.getOptional
-class StreamingQueryDataSerializer extends ProtobufSerDe {
+class StreamingQueryDataSerializer extends ProtobufSerDe[StreamingQueryData] {
- override val supportClass: Class[_] = classOf[StreamingQueryData]
-
- override def serialize(input: Any): Array[Byte] = {
- val data = input.asInstanceOf[StreamingQueryData]
+ override def serialize(data: StreamingQueryData): Array[Byte] = {
val builder = StoreTypes.StreamingQueryData.newBuilder()
.setId(data.id.toString)
.setRunId(data.runId)
@@ -40,7 +37,7 @@ class StreamingQueryDataSerializer extends ProtobufSerDe {
builder.build().toByteArray
}
- override def deserialize(bytes: Array[Byte]): Any = {
+ override def deserialize(bytes: Array[Byte]): StreamingQueryData = {
val data = StoreTypes.StreamingQueryData.parseFrom(bytes)
val exception =
getOptional(data.hasException, () => data.getException)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]