This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 492356d1646 [SPARK-41768][CORE] Refactor the definition of enum to
follow with the code style
492356d1646 is described below
commit 492356d1646a5e7571dad7e3107a11f765ee810a
Author: panbingkun <[email protected]>
AuthorDate: Wed Jan 4 13:15:04 2023 -0800
[SPARK-41768][CORE] Refactor the definition of enum to follow with the code
style
### What changes were proposed in this pull request?
This PR aims to refactor the enum definitions in the `UI protobuf serializer`
to follow the code style.
### Why are the changes needed?
This change follows the protobuf code style documented at:
https://developers.google.com/protocol-buffers/docs/style#enums
<img width="860" alt="image"
src="https://user-images.githubusercontent.com/15246973/209946067-4c541101-be0d-49a6-9812-768ba98423a4.png">
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA.
Existing UTs pass.
Closes #39286 from panbingkun/SPARK-41768.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 21 ++++++-----
.../status/protobuf/JobDataWrapperSerializer.scala | 12 ++----
.../protobuf/JobExecutionStatusSerializer.scala | 43 ++++++++++++++++++++++
.../RDDOperationGraphWrapperSerializer.scala | 35 ++++++++++++++++--
.../status/protobuf/StageStatusSerializer.scala | 26 +++++++++----
.../sql/SQLExecutionUIDataSerializer.scala | 7 ++--
6 files changed, 110 insertions(+), 34 deletions(-)
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 38b82518ddd..6ba1915dfa1 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf;
enum JobExecutionStatus {
JOB_EXECUTION_STATUS_UNSPECIFIED = 0;
- RUNNING = 1;
- SUCCEEDED = 2;
- FAILED = 3;
- UNKNOWN = 4;
+ JOB_EXECUTION_STATUS_RUNNING = 1;
+ JOB_EXECUTION_STATUS_SUCCEEDED = 2;
+ JOB_EXECUTION_STATUS_FAILED = 3;
+ JOB_EXECUTION_STATUS_UNKNOWN = 4;
}
message JobData {
@@ -434,13 +434,14 @@ message RDDOperationEdge {
int32 to_id = 2;
}
+enum DeterministicLevel {
+ DETERMINISTIC_LEVEL_UNSPECIFIED = 0;
+ DETERMINISTIC_LEVEL_DETERMINATE = 1;
+ DETERMINISTIC_LEVEL_UNORDERED = 2;
+ DETERMINISTIC_LEVEL_INDETERMINATE = 3;
+}
+
message RDDOperationNode {
- enum DeterministicLevel {
- UNSPECIFIED = 0;
- DETERMINATE = 1;
- UNORDERED = 2;
- INDETERMINATE = 3;
- }
int32 id = 1;
string name = 2;
bool cached = 3;
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 98ac2d643c9..e2e2a1a8d89 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -17,10 +17,10 @@
package org.apache.spark.status.protobuf
-import collection.JavaConverters._
import java.util.Date
-import org.apache.spark.JobExecutionStatus
+import collection.JavaConverters._
+
import org.apache.spark.status.JobDataWrapper
import org.apache.spark.status.api.v1.JobData
import org.apache.spark.status.protobuf.Utils.getOptional
@@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
val jobDataBuilder = StoreTypes.JobData.newBuilder()
jobDataBuilder.setJobId(jobData.jobId.toLong)
.setName(jobData.name)
- .setStatus(serializeJobExecutionStatus(jobData.status))
+ .setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
.setNumTasks(jobData.numTasks)
.setNumActiveTasks(jobData.numActiveTasks)
.setNumCompletedTasks(jobData.numCompletedTasks)
@@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
getOptional(info.hasSubmissionTime, () => new
Date(info.getSubmissionTime))
val completionTime = getOptional(info.hasCompletionTime, () => new
Date(info.getCompletionTime))
val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
- val status = JobExecutionStatus.valueOf(info.getStatus.toString)
+ val status = JobExecutionStatusSerializer.deserialize(info.getStatus)
new JobData(
jobId = info.getJobId.toInt,
@@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
numFailedStages = info.getNumFailedStages,
killedTasksSummary =
info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
}
-
- private def serializeJobExecutionStatus(j: JobExecutionStatus):
StoreTypes.JobExecutionStatus = {
- StoreTypes.JobExecutionStatus.valueOf(j.toString)
- }
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala
new file mode 100644
index 00000000000..fd07da61a9e
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.protobuf.StoreTypes.{JobExecutionStatus =>
GJobExecutionStatus}
+
+private[protobuf] object JobExecutionStatusSerializer {
+
+ def serialize(input: JobExecutionStatus): GJobExecutionStatus = {
+ input match {
+ case JobExecutionStatus.RUNNING =>
GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING
+ case JobExecutionStatus.SUCCEEDED =>
GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED
+ case JobExecutionStatus.FAILED =>
GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED
+ case JobExecutionStatus.UNKNOWN =>
GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN
+ }
+ }
+
+ def deserialize(binary: GJobExecutionStatus): JobExecutionStatus = {
+ binary match {
+ case GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING =>
JobExecutionStatus.RUNNING
+ case GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED =>
JobExecutionStatus.SUCCEEDED
+ case GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED =>
JobExecutionStatus.FAILED
+ case GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN =>
JobExecutionStatus.UNKNOWN
+ case _ => null
+ }
+ }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index 8975062082c..44622514ac9 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.rdd.DeterministicLevel
import org.apache.spark.status.{RDDOperationClusterWrapper,
RDDOperationGraphWrapper}
+import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel =>
GDeterministicLevel}
import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
@@ -81,8 +82,8 @@ class RDDOperationGraphWrapperSerializer extends
ProtobufSerDe {
}
private def serializeRDDOperationNode(node: RDDOperationNode):
StoreTypes.RDDOperationNode = {
- val outputDeterministicLevel =
StoreTypes.RDDOperationNode.DeterministicLevel
- .valueOf(node.outputDeterministicLevel.toString)
+ val outputDeterministicLevel = DeterministicLevelSerializer.serialize(
+ node.outputDeterministicLevel)
val builder = StoreTypes.RDDOperationNode.newBuilder()
builder.setId(node.id)
builder.setName(node.name)
@@ -100,8 +101,8 @@ class RDDOperationGraphWrapperSerializer extends
ProtobufSerDe {
cached = node.getCached,
barrier = node.getBarrier,
callsite = node.getCallsite,
- outputDeterministicLevel =
- DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
+ outputDeterministicLevel = DeterministicLevelSerializer.deserialize(
+ node.getOutputDeterministicLevel)
)
}
@@ -118,3 +119,29 @@ class RDDOperationGraphWrapperSerializer extends
ProtobufSerDe {
toId = edge.getToId)
}
}
+
+private[protobuf] object DeterministicLevelSerializer {
+
+ def serialize(input: DeterministicLevel.Value): GDeterministicLevel = {
+ input match {
+ case DeterministicLevel.DETERMINATE =>
+ GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE
+ case DeterministicLevel.UNORDERED =>
+ GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED
+ case DeterministicLevel.INDETERMINATE =>
+ GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE
+ }
+ }
+
+ def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = {
+ binary match {
+ case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE =>
+ DeterministicLevel.DETERMINATE
+ case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED =>
+ DeterministicLevel.UNORDERED
+ case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE =>
+ DeterministicLevel.INDETERMINATE
+ case _ => null
+ }
+ }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
index 6014379bb1e..fbd874cf541 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
@@ -17,19 +17,29 @@
package org.apache.spark.status.protobuf
-import org.apache.commons.lang3.StringUtils
-
import org.apache.spark.status.api.v1.StageStatus
+import org.apache.spark.status.protobuf.StoreTypes.{StageStatus =>
GStageStatus}
private[protobuf] object StageStatusSerializer {
- private def PREFIX = "STAGE_STATUS_"
-
- def serialize(input: StageStatus): StoreTypes.StageStatus = {
- StoreTypes.StageStatus.valueOf(PREFIX + input.toString)
+ def serialize(input: StageStatus): GStageStatus = {
+ input match {
+ case StageStatus.ACTIVE => GStageStatus.STAGE_STATUS_ACTIVE
+ case StageStatus.COMPLETE => GStageStatus.STAGE_STATUS_COMPLETE
+ case StageStatus.FAILED => GStageStatus.STAGE_STATUS_FAILED
+ case StageStatus.PENDING => GStageStatus.STAGE_STATUS_PENDING
+ case StageStatus.SKIPPED => GStageStatus.STAGE_STATUS_SKIPPED
+ }
}
- def deserialize(binary: StoreTypes.StageStatus): StageStatus = {
- StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX))
+ def deserialize(binary: GStageStatus): StageStatus = {
+ binary match {
+ case GStageStatus.STAGE_STATUS_ACTIVE => StageStatus.ACTIVE
+ case GStageStatus.STAGE_STATUS_COMPLETE => StageStatus.COMPLETE
+ case GStageStatus.STAGE_STATUS_FAILED => StageStatus.FAILED
+ case GStageStatus.STAGE_STATUS_PENDING => StageStatus.PENDING
+ case GStageStatus.STAGE_STATUS_SKIPPED => StageStatus.SKIPPED
+ case _ => null
+ }
}
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 77b6f8925cb..7a4a3e2a55d 100644
---
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -21,9 +21,8 @@ import java.util.Date
import collection.JavaConverters._
-import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.SQLExecutionUIData
-import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer,
ProtobufSerDe, StoreTypes}
import org.apache.spark.status.protobuf.Utils.getOptional
class SQLExecutionUIDataSerializer extends ProtobufSerDe {
@@ -46,7 +45,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
ui.errorMessage.foreach(builder.setErrorMessage)
ui.jobs.foreach {
case (id, status) =>
- builder.putJobs(id.toLong,
StoreTypes.JobExecutionStatus.valueOf(status.toString))
+ builder.putJobs(id.toLong,
JobExecutionStatusSerializer.serialize(status))
}
ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
val metricValues = ui.metricValues
@@ -66,7 +65,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
val metrics =
ui.getMetricsList.asScala.map(m =>
SQLPlanMetricSerializer.deserialize(m))
val jobs = ui.getJobsMap.asScala.map {
- case (jobId, status) => jobId.toInt ->
JobExecutionStatus.valueOf(status.toString)
+ case (jobId, status) => jobId.toInt ->
JobExecutionStatusSerializer.deserialize(status)
}.toMap
val metricValues = ui.getMetricValuesMap.asScala.map {
case (k, v) => k.toLong -> v
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]