This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5574071aa62 [SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for
`StreamingQueryData`
5574071aa62 is described below
commit 5574071aa6202da108378c6c3bb9ce0c05197972
Author: yangjie01 <[email protected]>
AuthorDate: Sat Dec 31 15:36:37 2022 -0800
[SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for
`StreamingQueryData`
### What changes were proposed in this pull request?
Add a Protobuf serializer for `StreamingQueryData`.
### Why are the changes needed?
Support fast and compact serialization/deserialization for
`StreamingQueryData` over RocksDB.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new unit test to `KVStoreProtobufSerializerSuite` that round-trips a `StreamingQueryData` instance through the new Protobuf serializer and verifies all fields are preserved.
Closes #39233 from LuciferYang/SPARK-41676.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: Gengliang Wang <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 10 ++++
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 +
.../ui/StreamingQueryStatusListener.scala | 2 +-
.../sql/StreamingQueryDataSerializer.scala | 59 ++++++++++++++++++++++
.../sql/KVStoreProtobufSerializerSuite.scala | 25 +++++++++
5 files changed, 96 insertions(+), 1 deletion(-)
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index ff687331a6a..38b82518ddd 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -464,6 +464,16 @@ message RDDOperationGraphWrapper {
RDDOperationClusterWrapper root_cluster = 5;
}
+message StreamingQueryData {
+ string name = 1;
+ string id = 2;
+ string run_id = 3;
+ bool is_active = 4;
+ optional string exception = 5;
+ int64 start_timestamp = 6;
+ optional int64 end_timestamp = 7;
+}
+
message StageDataWrapper {
StageData info = 1;
repeated int64 job_ids = 2;
diff --git
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 3f0ae5470ce..7beff87d7ec 100644
---
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -17,3 +17,4 @@
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 2e6102b01fa..1bdc5e3f79a 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -115,7 +115,7 @@ private[sql] class StreamingQueryStatusListener(
}
}
-private[sql] class StreamingQueryData(
+private[spark] class StreamingQueryData(
val name: String,
val id: UUID,
@KVIndexParam val runId: String,
diff --git
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
new file mode 100644
index 00000000000..f63ea80c77e
--- /dev/null
+++
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.UUID
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class StreamingQueryDataSerializer extends ProtobufSerDe {
+
+ override val supportClass: Class[_] = classOf[StreamingQueryData]
+
+ override def serialize(input: Any): Array[Byte] = {
+ val data = input.asInstanceOf[StreamingQueryData]
+ val builder = StoreTypes.StreamingQueryData.newBuilder()
+ .setName(data.name)
+ .setId(data.id.toString)
+ .setRunId(data.runId)
+ .setIsActive(data.isActive)
+ data.exception.foreach(builder.setException)
+ builder.setStartTimestamp(data.startTimestamp)
+ data.endTimestamp.foreach(builder.setEndTimestamp)
+ builder.build().toByteArray
+ }
+
+ override def deserialize(bytes: Array[Byte]): Any = {
+ val data = StoreTypes.StreamingQueryData.parseFrom(bytes)
+ val exception =
+ getOptional(data.hasException, () => data.getException)
+ val endTimestamp =
+ getOptional(data.hasEndTimestamp, () => data.getEndTimestamp)
+ new StreamingQueryData(
+ name = data.getName,
+ id = UUID.fromString(data.getId),
+ runId = data.getRunId,
+ isActive = data.getIsActive,
+ exception = exception,
+ startTimestamp = data.getStartTimestamp,
+ endTimestamp = endTimestamp
+ )
+ }
+}
diff --git
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 90d04c3f013..5f1cd812d97 100644
---
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,8 +17,11 @@
package org.apache.spark.status.protobuf.sql
+import java.util.UUID
+
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.execution.ui._
+import org.apache.spark.sql.streaming.ui.StreamingQueryData
import org.apache.spark.status.api.v1.sql.SqlResourceSuite
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
@@ -211,4 +214,26 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite
{
assert(e1.toId == e2.toId)
}
}
+
+ test("StreamingQueryData") {
+ val id = UUID.randomUUID()
+ val input = new StreamingQueryData(
+ name = "some-query",
+ id = id,
+ runId = s"run-id-$id",
+ isActive = false,
+ exception = Some("Some Exception"),
+ startTimestamp = 1L,
+ endTimestamp = Some(2L)
+ )
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[StreamingQueryData])
+ assert(result.name == input.name)
+ assert(result.id == input.id)
+ assert(result.runId == input.runId)
+ assert(result.isActive == input.isActive)
+ assert(result.exception == input.exception)
+ assert(result.startTimestamp == input.startTimestamp)
+ assert(result.endTimestamp == input.endTimestamp)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]