This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 88fc48f5e7e [SPARK-41431][CORE][SQL][UI] Protobuf serializer for
`SQLExecutionUIData`
88fc48f5e7e is described below
commit 88fc48f5e7e907c25d082a7b35231744ccef2c7e
Author: yangjie01 <[email protected]>
AuthorDate: Fri Dec 23 15:53:40 2022 -0800
[SPARK-41431][CORE][SQL][UI] Protobuf serializer for `SQLExecutionUIData`
### What changes were proposed in this pull request?
Add Protobuf serializer for `SQLExecutionUIData`
### Why are the changes needed?
Support fast and compact serialization/deserialization for
`SQLExecutionUIData` over RocksDB.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add new UT
Closes #39139 from LuciferYang/SPARK-41431.
Authored-by: yangjie01 <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 21 +++++
sql/core/pom.xml | 5 ++
.../org.apache.spark.status.protobuf.ProtobufSerDe | 18 +++++
.../sql/SQLExecutionUIDataSerializer.scala | 90 ++++++++++++++++++++++
.../protobuf/sql/SQLPlanMetricSerializer.scala | 36 +++++++++
.../sql/KVStoreProtobufSerializerSuite.scala | 88 +++++++++++++++++++++
6 files changed, 258 insertions(+)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 7cf5c2921cb..cb0dea540bd 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -355,3 +355,24 @@ message ExecutorSummary {
message ExecutorSummaryWrapper {
ExecutorSummary info = 1;
}
+
+message SQLPlanMetric {
+ string name = 1;
+ int64 accumulator_id = 2;
+ string metric_type = 3;
+}
+
+message SQLExecutionUIData {
+ int64 execution_id = 1;
+ string description = 2;
+ string details = 3;
+ string physical_plan_description = 4;
+ map<string, string> modified_configs = 5;
+ repeated SQLPlanMetric metrics = 6;
+ int64 submission_time = 7;
+ optional int64 completion_time = 8;
+ optional string error_message = 9;
+ map<int64, JobExecutionStatus> jobs = 10;
+ repeated int64 stages = 11;
+ map<int64, string> metric_values = 12;
+}
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index cfcf7455ad0..71c57f8a7f7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -147,6 +147,11 @@
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm9-shaded</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
new file mode 100644
index 00000000000..de5f2c2d05c
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
new file mode 100644
index 00000000000..8dc28517ff0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.Date
+
+import collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class SQLExecutionUIDataSerializer extends ProtobufSerDe {
+
+ override val supportClass: Class[_] = classOf[SQLExecutionUIData]
+
+ override def serialize(input: Any): Array[Byte] = {
+ val ui = input.asInstanceOf[SQLExecutionUIData]
+ val builder = StoreTypes.SQLExecutionUIData.newBuilder()
+ builder.setExecutionId(ui.executionId)
+ builder.setDescription(ui.description)
+ builder.setDetails(ui.details)
+ builder.setPhysicalPlanDescription(ui.physicalPlanDescription)
+ ui.modifiedConfigs.foreach {
+ case (k, v) => builder.putModifiedConfigs(k, v)
+ }
+ ui.metrics.foreach(m => builder.addMetrics(SQLPlanMetricSerializer.serialize(m)))
+ builder.setSubmissionTime(ui.submissionTime)
+ ui.completionTime.foreach(ct => builder.setCompletionTime(ct.getTime))
+ ui.errorMessage.foreach(builder.setErrorMessage)
+ ui.jobs.foreach {
+ case (id, status) =>
+ builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
+ }
+ ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
+ val metricValues = ui.metricValues
+ if (metricValues != null) {
+ metricValues.foreach {
+ case (k, v) => builder.putMetricValues(k, v)
+ }
+ }
+ builder.build().toByteArray
+ }
+
+ override def deserialize(bytes: Array[Byte]): SQLExecutionUIData = {
+ val ui = StoreTypes.SQLExecutionUIData.parseFrom(bytes)
+ val completionTime =
+ getOptional(ui.hasCompletionTime, () => new Date(ui.getCompletionTime))
+ val errorMessage = getOptional(ui.hasErrorMessage, () => ui.getErrorMessage)
+ val metrics =
+ ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)).toSeq
+ val jobs = ui.getJobsMap.asScala.map {
+ case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
+ }.toMap
+ val metricValues = ui.getMetricValuesMap.asScala.map {
+ case (k, v) => k.toLong -> v
+ }.toMap
+
+ new SQLExecutionUIData(
+ executionId = ui.getExecutionId,
+ description = ui.getDescription,
+ details = ui.getDetails,
+ physicalPlanDescription = ui.getPhysicalPlanDescription,
+ modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
+ metrics = metrics,
+ submissionTime = ui.getSubmissionTime,
+ completionTime = completionTime,
+ errorMessage = errorMessage,
+ jobs = jobs,
+ stages = ui.getStagesList.asScala.map(_.toInt).toSet,
+ metricValues = metricValues
+ )
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
new file mode 100644
index 00000000000..8886bba2f92
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
+import org.apache.spark.status.protobuf.StoreTypes
+
+object SQLPlanMetricSerializer {
+
+ def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
+ StoreTypes.SQLPlanMetric.newBuilder()
+ .setName(metric.name)
+ .setAccumulatorId(metric.accumulatorId)
+ .setMetricType(metric.metricType)
+ .build()
+ }
+
+ def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
+ SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
new file mode 100644
index 00000000000..9d6a938c3fe
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.api.v1.sql.SqlResourceSuite
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
+
+class KVStoreProtobufSerializerSuite extends SparkFunSuite {
+
+ private val serializer = new KVStoreProtobufSerializer()
+
+ test("SQLExecutionUIData") {
+ val input = SqlResourceSuite.sqlExecutionUIData
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
+ assert(result.executionId == input.executionId)
+ assert(result.description == input.description)
+ assert(result.details == input.details)
+ assert(result.physicalPlanDescription == input.physicalPlanDescription)
+ assert(result.modifiedConfigs == input.modifiedConfigs)
+ assert(result.metrics == input.metrics)
+ assert(result.submissionTime == input.submissionTime)
+ assert(result.completionTime == input.completionTime)
+ assert(result.errorMessage == input.errorMessage)
+ assert(result.jobs == input.jobs)
+ assert(result.stages == input.stages)
+ assert(result.metricValues == input.metricValues)
+ }
+
+ test("SQLExecutionUIData with metricValues is empty map and null") {
+ val templateData = SqlResourceSuite.sqlExecutionUIData
+
+ val input1 = new SQLExecutionUIData(
+ executionId = templateData.executionId,
+ description = templateData.description,
+ details = templateData.details,
+ physicalPlanDescription = templateData.physicalPlanDescription,
+ modifiedConfigs = templateData.modifiedConfigs,
+ metrics = templateData.metrics,
+ submissionTime = templateData.submissionTime,
+ completionTime = templateData.completionTime,
+ errorMessage = templateData.errorMessage,
+ jobs = templateData.jobs,
+ stages = templateData.stages,
+ metricValues = Map.empty
+ )
+ val bytes1 = serializer.serialize(input1)
+ val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
+ // input.metricValues is empty map, result.metricValues is empty map.
+ assert(result1.metricValues.isEmpty)
+
+ val input2 = new SQLExecutionUIData(
+ executionId = templateData.executionId,
+ description = templateData.description,
+ details = templateData.details,
+ physicalPlanDescription = templateData.physicalPlanDescription,
+ modifiedConfigs = templateData.modifiedConfigs,
+ metrics = templateData.metrics,
+ submissionTime = templateData.submissionTime,
+ completionTime = templateData.completionTime,
+ errorMessage = templateData.errorMessage,
+ jobs = templateData.jobs,
+ stages = templateData.stages,
+ metricValues = null
+ )
+ val bytes2 = serializer.serialize(input2)
+ val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
+ // input.metricValues is null, result.metricValues is also empty map.
+ assert(result2.metricValues.isEmpty)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]