This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 1a9cacb4625 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for `StreamingQueryProgressWrapper`
1a9cacb4625 is described below
commit 1a9cacb4625d461773cc7167958164d56a1b9349
Author: yangjie01 <[email protected]>
AuthorDate: Wed Jan 25 01:01:48 2023 -0800
[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for `StreamingQueryProgressWrapper`
### What changes were proposed in this pull request?
Add a Protobuf serializer for `StreamingQueryProgressWrapper`.
### Why are the changes needed?
Support fast and compact serialization/deserialization for
`StreamingQueryProgressWrapper` over RocksDB.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
- Add new UT
- Manual test of the sql module with `LIVE_UI_LOCAL_STORE_DIR`; all tests passed:
```
build/mvn clean install -DskipTests -pl sql/core -am
export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
build/mvn clean install -pl sql/core
```
There are 4 existing test suite classes that use `StreamingQueryProgressWrapper` (a minimal serializer round trip is sketched after this list):
- StateStoreCoordinatorSuite
- StreamingQueryStatusListenerWithDiskStoreSuite
- UISeleniumSuite
- UISeleniumWithRocksDBBackendSuite
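For reference, a minimal round trip through the new serializer looks roughly like the sketch below. It mirrors the new UT; the sparse `StreamingQueryProgress` values are placeholders, and the sketch assumes it is compiled inside the `org.apache.spark` namespace (as the test is), since the constructors are `private[spark]`:
```scala
import java.util.UUID

import org.apache.spark.sql.streaming.{SinkProgress, StreamingQueryProgress}
import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

val serializer = new KVStoreProtobufSerializer()
// Map-typed fields may be null; the serializer guards them via setJMapField.
val progress = new StreamingQueryProgress(
  id = UUID.randomUUID(), runId = UUID.randomUUID(),
  name = null, timestamp = "2023-01-03T09:14:04.175Z",
  batchId = 1L, batchDuration = 2L,
  durationMs = null, eventTime = null,
  stateOperators = Array.empty, sources = Array.empty,
  sink = new SinkProgress("console", 0L, null),
  observedMetrics = null)
val bytes = serializer.serialize(new StreamingQueryProgressWrapper(progress))
val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
assert(result.progress.runId == progress.runId)
```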
Closes #39642 from LuciferYang/SPARK-41677-2.
Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
(cherry picked from commit 7b93415836057107b9e296fe79cfd67565874551)
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 51 ++++
.../org/apache/spark/status/protobuf/Utils.scala | 8 +
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 +
.../org/apache/spark/sql/streaming/progress.scala | 8 +-
.../ui/StreamingQueryStatusListener.scala | 2 +-
.../protobuf/sql/SinkProgressSerializer.scala | 44 ++++
.../protobuf/sql/SourceProgressSerializer.scala | 64 +++++
.../sql/StateOperatorProgressSerializer.scala | 76 ++++++
.../sql/StreamingQueryProgressSerializer.scala | 104 +++++++++
.../StreamingQueryProgressWrapperSerializer.scala | 36 +++
.../sql/KVStoreProtobufSerializerSuite.scala | 259 ++++++++++++++++++++-
11 files changed, 647 insertions(+), 6 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index aacf49bd401..c4f64f27e4a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -765,3 +765,54 @@ message PoolData {
optional string name = 1;
repeated int64 stage_ids = 2;
}
+
+message StateOperatorProgress {
+ optional string operator_name = 1;
+ int64 num_rows_total = 2;
+ int64 num_rows_updated = 3;
+ int64 all_updates_time_ms = 4;
+ int64 num_rows_removed = 5;
+ int64 all_removals_time_ms = 6;
+ int64 commit_time_ms = 7;
+ int64 memory_used_bytes = 8;
+ int64 num_rows_dropped_by_watermark = 9;
+ int64 num_shuffle_partitions = 10;
+ int64 num_state_store_instances = 11;
+ map<string, int64> custom_metrics = 12;
+}
+
+message SourceProgress {
+ optional string description = 1;
+ optional string start_offset = 2;
+ optional string end_offset = 3;
+ optional string latest_offset = 4;
+ int64 num_input_rows = 5;
+ double input_rows_per_second = 6;
+ double processed_rows_per_second = 7;
+ map<string, string> metrics = 8;
+}
+
+message SinkProgress {
+ optional string description = 1;
+ int64 num_output_rows = 2;
+ map<string, string> metrics = 3;
+}
+
+message StreamingQueryProgress {
+ optional string id = 1;
+ optional string run_id = 2;
+ optional string name = 3;
+ optional string timestamp = 4;
+ int64 batch_id = 5;
+ int64 batch_duration = 6;
+ map<string, int64> duration_ms = 7;
+ map<string, string> event_time = 8;
+ repeated StateOperatorProgress state_operators = 9;
+ repeated SourceProgress sources = 10;
+ SinkProgress sink = 11;
+ map<string, string> observed_metrics = 12;
+}
+
+message StreamingQueryProgressWrapper {
+ StreamingQueryProgress progress = 1;
+}
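One detail worth noting in the schema above: proto3 `optional` string fields carry explicit presence, so the generated Java classes expose has*/get* pairs, which the Scala serializers below rely on through `getStringField`. A tiny sketch, with an illustrative value:
```scala
import org.apache.spark.status.protobuf.StoreTypes

// description is declared `optional`, so absence is observable:
val sink = StoreTypes.SinkProgress.newBuilder().setNumOutputRows(1L).build()
assert(!sink.hasDescription)
```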
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
index 809f9647d72..cef6df3f569 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.status.protobuf
+import java.util.{Map => JMap}
+
object Utils {
  def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
Some(result())
@@ -35,4 +37,10 @@ object Utils {
} else {
null
}
+
+  def setJMapField[K, V](input: JMap[K, V], putAllFunc: JMap[K, V] => Any): Unit = {
+ if (input != null && !input.isEmpty) {
+ putAllFunc(input)
+ }
+ }
}
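The new `setJMapField` helper is a null/empty guard around the generated `putAll*` builder methods. A self-contained sketch of how the serializers below use it (the metric entry is illustrative):
```scala
import java.util.{HashMap => JHashMap}

import org.apache.spark.status.protobuf.StoreTypes
import org.apache.spark.status.protobuf.Utils.setJMapField

val builder = StoreTypes.SinkProgress.newBuilder()
val metrics = new JHashMap[String, String]()
metrics.put("bytesWritten", "42")
// Equivalent to: if (metrics != null && !metrics.isEmpty) builder.putAllMetrics(metrics)
setJMapField(metrics, builder.putAllMetrics)
```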
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 7beff87d7ec..e907d559349 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,3 +18,4 @@
org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
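This registration is what makes the new serializer discoverable: META-INF/services files are read by `java.util.ServiceLoader`, which instantiates every `ProtobufSerDe` implementation listed. A hedged sketch of that lookup:
```scala
import java.util.ServiceLoader

import org.apache.spark.status.protobuf.ProtobufSerDe

// Each line of the services file names one implementation class.
val serDes = ServiceLoader.load(classOf[ProtobufSerDe[_]])
serDes.forEach(serDe => println(serDe.getClass.getName))
```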
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3d206e7780c..1b755ed70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
 * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress private[spark](
val operatorName: String,
val numRowsTotal: Long,
val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[sql](
* @since 2.1.0
*/
@Evolving
-class StreamingQueryProgress private[sql](
+class StreamingQueryProgress private[spark](
val id: UUID,
val runId: UUID,
val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[sql](
* @since 2.1.0
*/
@Evolving
-class SourceProgress protected[sql](
+class SourceProgress protected[spark](
val description: String,
val startOffset: String,
val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[sql](
* @since 2.1.0
*/
@Evolving
-class SinkProgress protected[sql](
+class SinkProgress protected[spark](
val description: String,
val numOutputRows: Long,
val metrics: ju.Map[String, String] = Map[String, String]().asJava)
extends Serializable {
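The widening from the `sql` to the `spark` scope above is needed because the new serializers live under `org.apache.spark.status.protobuf.sql`, which is inside `org.apache.spark` but outside `org.apache.spark.sql`. A minimal illustration (hypothetical object name):
```scala
package org.apache.spark.status.protobuf.sql

import org.apache.spark.sql.streaming.SinkProgress

object VisibilityExample {
  // Compiles only with the protected[spark] constructor: this package is
  // within org.apache.spark but not within org.apache.spark.sql.
  val sink = new SinkProgress(description = "console", numOutputRows = 0L)
}
```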
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 1bdc5e3f79a..c5ecdb6395a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
}
}
-private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
@JsonIgnore @KVIndex
  private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
new file mode 100644
index 00000000000..eb68a487e2a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap}
+
+import org.apache.spark.sql.streaming.SinkProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+
+private[protobuf] object SinkProgressSerializer {
+
+ def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
+ import org.apache.spark.status.protobuf.Utils.setJMapField
+ val builder = StoreTypes.SinkProgress.newBuilder()
+ setStringField(sink.description, builder.setDescription)
+ builder.setNumOutputRows(sink.numOutputRows)
+ setJMapField(sink.metrics, builder.putAllMetrics)
+ builder.build()
+ }
+
+ def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
+ new SinkProgress(
+      description = getStringField(sink.hasDescription, () => sink.getDescription),
+ numOutputRows = sink.getNumOutputRows,
+ metrics = new JHashMap(sink.getMetricsMap)
+ )
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
new file mode 100644
index 00000000000..9f3dd1af8f2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.spark.sql.streaming.SourceProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setJMapField, setStringField}
+
+private[protobuf] object SourceProgressSerializer {
+
+ def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
+ val builder = StoreTypes.SourceProgress.newBuilder()
+ setStringField(source.description, builder.setDescription)
+ setStringField(source.startOffset, builder.setStartOffset)
+ setStringField(source.endOffset, builder.setEndOffset)
+ setStringField(source.latestOffset, builder.setLatestOffset)
+ builder.setNumInputRows(source.numInputRows)
+ builder.setInputRowsPerSecond(source.inputRowsPerSecond)
+ builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
+ setJMapField(source.metrics, builder.putAllMetrics)
+ builder.build()
+ }
+
+  def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
+ val size = sourceList.size()
+ val result = new Array[SourceProgress](size)
+ var i = 0
+ while (i < size) {
+ result(i) = deserialize(sourceList.get(i))
+ i += 1
+ }
+ result
+ }
+
+  private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
+ new SourceProgress(
+      description = getStringField(source.hasDescription, () => source.getDescription),
+      startOffset = getStringField(source.hasStartOffset, () => source.getStartOffset),
+      endOffset = getStringField(source.hasEndOffset, () => source.getEndOffset),
+      latestOffset = getStringField(source.hasLatestOffset, () => source.getLatestOffset),
+ numInputRows = source.getNumInputRows,
+ inputRowsPerSecond = source.getInputRowsPerSecond,
+ processedRowsPerSecond = source.getProcessedRowsPerSecond,
+ metrics = new JHashMap(source.getMetricsMap)
+ )
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
new file mode 100644
index 00000000000..8b66e8e289b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.spark.sql.streaming.StateOperatorProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+
+object StateOperatorProgressSerializer {
+
+  def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
+ import org.apache.spark.status.protobuf.Utils.setJMapField
+ val builder = StoreTypes.StateOperatorProgress.newBuilder()
+ setStringField(stateOperator.operatorName, builder.setOperatorName)
+ builder.setNumRowsTotal(stateOperator.numRowsTotal)
+ builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
+ builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
+ builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
+ builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
+ builder.setCommitTimeMs(stateOperator.commitTimeMs)
+ builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
+    builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
+ builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
+ builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
+ setJMapField(stateOperator.customMetrics, builder.putAllCustomMetrics)
+ builder.build()
+ }
+
+ def deserializeToArray(
+      stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
+ val size = stateOperatorList.size()
+ val result = new Array[StateOperatorProgress](size)
+ var i = 0
+ while (i < size) {
+ result(i) = deserialize(stateOperatorList.get(i))
+ i += 1
+ }
+ result
+ }
+
+ private def deserialize(
+      stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
+ new StateOperatorProgress(
+ operatorName =
+        getStringField(stateOperator.hasOperatorName, () => stateOperator.getOperatorName),
+ numRowsTotal = stateOperator.getNumRowsTotal,
+ numRowsUpdated = stateOperator.getNumRowsUpdated,
+ allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
+ numRowsRemoved = stateOperator.getNumRowsRemoved,
+ allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
+ commitTimeMs = stateOperator.getCommitTimeMs,
+ memoryUsedBytes = stateOperator.getMemoryUsedBytes,
+ numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
+ numShufflePartitions = stateOperator.getNumShufflePartitions,
+ numStateStoreInstances = stateOperator.getNumStateStoreInstances,
+ customMetrics = new JHashMap(stateOperator.getCustomMetricsMap)
+ )
+ }
+}
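A note on `deserializeToArray` above: the manual while loop fills a preallocated array without going through Scala collection conversions. The more idiomatic but allocation-heavier equivalent would look like the sketch below (it takes the per-element deserializer as a parameter, since `deserialize` is private here):
```scala
import scala.collection.JavaConverters._

import org.apache.spark.sql.streaming.StateOperatorProgress
import org.apache.spark.status.protobuf.StoreTypes

// Idiomatic alternative to the hand-rolled loop; same result, more allocation.
def deserializeToArrayIdiomatic(
    protos: java.util.List[StoreTypes.StateOperatorProgress],
    deserializeOne: StoreTypes.StateOperatorProgress => StateOperatorProgress)
  : Array[StateOperatorProgress] = {
  protos.asScala.map(deserializeOne).toArray
}
```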
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
new file mode 100644
index 00000000000..fc0fd5fa477
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setJMapField, setStringField}
+
+private[protobuf] object StreamingQueryProgressSerializer {
+
+ private val mapper: JsonMapper = JsonMapper.builder()
+ .addModule(DefaultScalaModule)
+ .build()
+
+  def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
+ val builder = StoreTypes.StreamingQueryProgress.newBuilder()
+ if (process.id != null) {
+ builder.setId(process.id.toString)
+ }
+ if (process.runId != null) {
+ builder.setRunId(process.runId.toString)
+ }
+ setStringField(process.name, builder.setName)
+ setStringField(process.timestamp, builder.setTimestamp)
+ builder.setBatchId(process.batchId)
+ builder.setBatchDuration(process.batchDuration)
+ setJMapField(process.durationMs, builder.putAllDurationMs)
+ setJMapField(process.eventTime, builder.putAllEventTime)
+ process.stateOperators.foreach(
+      s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s)))
+ process.sources.foreach(
+ s => builder.addSources(SourceProgressSerializer.serialize(s))
+ )
+ builder.setSink(SinkProgressSerializer.serialize(process.sink))
+ setJMapField(process.observedMetrics, putAllObservedMetrics(builder, _))
+ builder.build()
+ }
+
+  def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
+ val id = if (process.hasId) {
+ UUID.fromString(process.getId)
+ } else null
+    val runId = if (process.hasRunId) {
+ UUID.fromString(process.getRunId)
+ } else null
+ new StreamingQueryProgress(
+ id = id,
+ runId = runId,
+ name = getStringField(process.hasName, () => process.getName),
+      timestamp = getStringField(process.hasTimestamp, () => process.getTimestamp),
+ batchId = process.getBatchId,
+ batchDuration = process.getBatchDuration,
+ durationMs = new JHashMap(process.getDurationMsMap),
+ eventTime = new JHashMap(process.getEventTimeMap),
+ stateOperators =
+        StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList),
+      sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList),
+ sink = SinkProgressSerializer.deserialize(process.getSink),
+ observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap)
+ )
+ }
+
+ private def putAllObservedMetrics(
+ builder: StoreTypes.StreamingQueryProgress.Builder,
+ observedMetrics: JMap[String, Row]): Unit = {
+    // Encode Row as Json to handle it as a string type in protobuf and this way
+    // is simpler than defining a message type corresponding to Row in protobuf.
+ observedMetrics.forEach {
+      case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
+ }
+ }
+
+  private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
+ val observedMetrics = new JHashMap[String, Row](input.size())
+ val classType = classOf[GenericRowWithSchema]
+ input.forEach {
+ case (k, v) =>
+ observedMetrics.put(k, mapper.readValue(v, classType))
+ }
+ observedMetrics
+ }
+}
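The Row handling above deserves a callout: observed-metric values are Rows, and rather than modeling Row as a protobuf message they round-trip through JSON strings via Jackson, exactly as the in-code comment says. A sketch of that round trip, with an illustrative schema and values taken from the new UT:
```scala
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

val mapper = JsonMapper.builder().addModule(DefaultScalaModule).build()
val schema = new StructType().add("c1", "long").add("c2", "double")
val row = new GenericRowWithSchema(Array(1L, 3.0d), schema)
val json = mapper.writeValueAsString(row)                            // Row -> JSON
val restored = mapper.readValue(json, classOf[GenericRowWithSchema]) // JSON -> Row
```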
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
new file mode 100644
index 00000000000..21a0adc26da
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+
+class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe[StreamingQueryProgressWrapper] {
+
+ override def serialize(data: StreamingQueryProgressWrapper): Array[Byte] = {
+ val builder = StoreTypes.StreamingQueryProgressWrapper.newBuilder()
+    builder.setProgress(StreamingQueryProgressSerializer.serialize(data.progress))
+ builder.build().toByteArray
+ }
+
+  override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = {
+    val processWrapper = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes)
+ new StreamingQueryProgressWrapper(
+ StreamingQueryProgressSerializer.deserialize(processWrapper.getProgress))
+ }
+}
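This class is the glue between the KVStore and the message defined in store_types.proto: `ProtobufSerDe[T]` asks for a byte-array serialize/deserialize pair, and everything else delegates to `StreamingQueryProgressSerializer`. Used directly, outside the ServiceLoader path, it would look like this sketch (`wrapper` is a placeholder for any `StreamingQueryProgressWrapper`, e.g. one emitted by the streaming UI listener):
```scala
import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
import org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer

val serDe = new StreamingQueryProgressWrapperSerializer
def roundTrip(wrapper: StreamingQueryProgressWrapper): StreamingQueryProgressWrapper =
  serDe.deserialize(serDe.serialize(wrapper))
```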
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 3c2d2533275..16f5897d2b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,11 +17,18 @@
package org.apache.spark.status.protobuf.sql
+import java.lang.{Long => JLong}
import java.util.UUID
+import scala.collection.JavaConverters._
+
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.status.api.v1.sql.SqlResourceSuite
import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
@@ -271,4 +278,254 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(result.endTimestamp == input.endTimestamp)
}
}
+
+ test("StreamingQueryProgressWrapper") {
+ val normalInput = {
+ val stateOperatorProgress0 = new StateOperatorProgress(
+ operatorName = "op-0",
+ numRowsTotal = 1L,
+ numRowsUpdated = 2L,
+ allUpdatesTimeMs = 3L,
+ numRowsRemoved = 4L,
+ allRemovalsTimeMs = 5L,
+ commitTimeMs = 6L,
+ memoryUsedBytes = 7L,
+ numRowsDroppedByWatermark = 8L,
+ numShufflePartitions = 9L,
+ numStateStoreInstances = 10L,
+ customMetrics = Map(
+ "custom-metrics-00" -> JLong.valueOf("10"),
+ "custom-metrics-01" -> JLong.valueOf("11")).asJava
+ )
+ val stateOperatorProgress1 = new StateOperatorProgress(
+ operatorName = null,
+ numRowsTotal = 11L,
+ numRowsUpdated = 12L,
+ allUpdatesTimeMs = 13L,
+ numRowsRemoved = 14L,
+ allRemovalsTimeMs = 15L,
+ commitTimeMs = 16L,
+ memoryUsedBytes = 17L,
+ numRowsDroppedByWatermark = 18L,
+ numShufflePartitions = 19L,
+ numStateStoreInstances = 20L,
+ customMetrics = Map(
+ "custom-metrics-10" -> JLong.valueOf("20"),
+ "custom-metrics-11" -> JLong.valueOf("21")).asJava
+ )
+ val source0 = new SourceProgress(
+ description = "description-0",
+ startOffset = "startOffset-0",
+ endOffset = "endOffset-0",
+ latestOffset = "latestOffset-0",
+ numInputRows = 10L,
+ inputRowsPerSecond = 11.0,
+ processedRowsPerSecond = 12.0,
+ metrics = Map(
+ "metrics-00" -> "10",
+ "metrics-01" -> "11").asJava
+ )
+ val source1 = new SourceProgress(
+ description = "description-1",
+ startOffset = "startOffset-1",
+ endOffset = "endOffset-1",
+ latestOffset = "latestOffset-1",
+ numInputRows = 20L,
+ inputRowsPerSecond = 21.0,
+ processedRowsPerSecond = 22.0,
+ metrics = Map(
+ "metrics-10" -> "20",
+ "metrics-11" -> "21").asJava
+ )
+ val sink = new SinkProgress(
+ description = "sink-0",
+ numOutputRows = 30,
+ metrics = Map(
+ "metrics-20" -> "30",
+ "metrics-21" -> "31").asJava
+ )
+ val schema1 = new StructType()
+ .add("c1", "long")
+ .add("c2", "double")
+ val schema2 = new StructType()
+ .add("rc", "long")
+ .add("min_q", "string")
+ .add("max_q", "string")
+ val observedMetrics = Map[String, Row](
+ "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+ "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"),
schema2)
+ ).asJava
+ val progress = new StreamingQueryProgress(
+ id = UUID.randomUUID(),
+ runId = UUID.randomUUID(),
+ name = "name-1",
+ timestamp = "2023-01-03T09:14:04.175Z",
+ batchId = 1L,
+ batchDuration = 2L,
+ durationMs = Map(
+ "duration-0" -> JLong.valueOf("10"),
+ "duration-1" -> JLong.valueOf("11")).asJava,
+ eventTime = Map(
+ "eventTime-0" -> "20",
+ "eventTime-1" -> "21").asJava,
+ stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+ sources = Array(source0, source1),
+ sink = sink,
+ observedMetrics = observedMetrics
+ )
+ new StreamingQueryProgressWrapper(progress)
+ }
+
+ val withNullInput = {
+ val stateOperatorProgress0 = new StateOperatorProgress(
+ operatorName = null,
+ numRowsTotal = 1L,
+ numRowsUpdated = 2L,
+ allUpdatesTimeMs = 3L,
+ numRowsRemoved = 4L,
+ allRemovalsTimeMs = 5L,
+ commitTimeMs = 6L,
+ memoryUsedBytes = 7L,
+ numRowsDroppedByWatermark = 8L,
+ numShufflePartitions = 9L,
+ numStateStoreInstances = 10L,
+ customMetrics = null
+ )
+ val stateOperatorProgress1 = new StateOperatorProgress(
+ operatorName = null,
+ numRowsTotal = 11L,
+ numRowsUpdated = 12L,
+ allUpdatesTimeMs = 13L,
+ numRowsRemoved = 14L,
+ allRemovalsTimeMs = 15L,
+ commitTimeMs = 16L,
+ memoryUsedBytes = 17L,
+ numRowsDroppedByWatermark = 18L,
+ numShufflePartitions = 19L,
+ numStateStoreInstances = 20L,
+ customMetrics = null
+ )
+ val source0 = new SourceProgress(
+ description = null,
+ startOffset = null,
+ endOffset = null,
+ latestOffset = null,
+ numInputRows = 10L,
+ inputRowsPerSecond = 11.0,
+ processedRowsPerSecond = 12.0,
+ metrics = null
+ )
+ val source1 = new SourceProgress(
+ description = null,
+ startOffset = null,
+ endOffset = null,
+ latestOffset = null,
+ numInputRows = 10L,
+ inputRowsPerSecond = 11.0,
+ processedRowsPerSecond = 12.0,
+ metrics = null
+ )
+ val sink = new SinkProgress(
+ description = null,
+ numOutputRows = 30,
+ metrics = null
+ )
+ val progress = new StreamingQueryProgress(
+ id = null,
+ runId = null,
+ name = null,
+ timestamp = null,
+ batchId = 1L,
+ batchDuration = 2L,
+ durationMs = null,
+ eventTime = null,
+ stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+ sources = Array(source0, source1),
+ sink = sink,
+ observedMetrics = null
+ )
+ new StreamingQueryProgressWrapper(progress)
+ }
+
+    Seq((false, normalInput), (true, withNullInput)).foreach { case (hasNullValue, input) =>
+ // Do serialization and deserialization
+ val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
+
+ // Assertion results
+ val progress = input.progress
+ val resultProcess = result.progress
+ assert(progress.id == resultProcess.id)
+ assert(progress.runId == resultProcess.runId)
+ assert(progress.name == resultProcess.name)
+ assert(progress.timestamp == resultProcess.timestamp)
+ assert(progress.batchId == resultProcess.batchId)
+ assert(progress.batchDuration == resultProcess.batchDuration)
+ if (hasNullValue) {
+ assert(resultProcess.durationMs.isEmpty)
+ assert(resultProcess.eventTime.isEmpty)
+ } else {
+ assert(progress.durationMs == resultProcess.durationMs)
+ assert(progress.eventTime == resultProcess.eventTime)
+ }
+
+ progress.stateOperators.zip(resultProcess.stateOperators).foreach {
+ case (o1, o2) =>
+ assert(o1.operatorName == o2.operatorName)
+ assert(o1.numRowsTotal == o2.numRowsTotal)
+ assert(o1.numRowsUpdated == o2.numRowsUpdated)
+ assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+ assert(o1.numRowsRemoved == o2.numRowsRemoved)
+ assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+ assert(o1.commitTimeMs == o2.commitTimeMs)
+ assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+ assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+ assert(o1.numShufflePartitions == o2.numShufflePartitions)
+ assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+ if (hasNullValue) {
+ assert(o2.customMetrics.isEmpty)
+ } else {
+ assert(o1.customMetrics == o2.customMetrics)
+ }
+ }
+
+ progress.sources.zip(resultProcess.sources).foreach {
+ case (s1, s2) =>
+ assert(s1.description == s2.description)
+ assert(s1.startOffset == s2.startOffset)
+ assert(s1.endOffset == s2.endOffset)
+ assert(s1.latestOffset == s2.latestOffset)
+ assert(s1.numInputRows == s2.numInputRows)
+ assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+ assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+ if (hasNullValue) {
+ assert(s2.metrics.isEmpty)
+ } else {
+ assert(s1.metrics == s2.metrics)
+ }
+ }
+
+ Seq(progress.sink).zip(Seq(resultProcess.sink)).foreach {
+ case (s1, s2) =>
+ assert(s1.description == s2.description)
+ assert(s1.numOutputRows == s2.numOutputRows)
+ if (hasNullValue) {
+ assert(s2.metrics.isEmpty)
+ } else {
+ assert(s1.metrics == s2.metrics)
+ }
+ }
+
+ val resultObservedMetrics = resultProcess.observedMetrics
+ if (hasNullValue) {
+ assert(resultObservedMetrics.isEmpty)
+ } else {
+ assert(progress.observedMetrics.size() == resultObservedMetrics.size())
+        assert(progress.observedMetrics.keySet() == resultObservedMetrics.keySet())
+ progress.observedMetrics.entrySet().forEach { e =>
+ assert(e.getValue == resultObservedMetrics.get(e.getKey))
+ }
+ }
+ }
+ }
}