This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 290d09d1c88 [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
290d09d1c88 is described below
commit 290d09d1c883ece2ecc2f20133c8e8fab3a0c7c4
Author: panbingkun <[email protected]>
AuthorDate: Fri Dec 30 13:49:17 2022 -0800
[SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
### What changes were proposed in this pull request?
Add Protobuf serializer for StageDataWrapper.
### Why are the changes needed?
Support fast and compact serialization/deserialization for StageDataWrapper
over RocksDB.
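As a minimal sketch (assuming the `StoreTypes` classes generated from `store_types.proto` and example field values), a `StageDataWrapper` message can be encoded to the compact binary form stored in RocksDB and parsed back:
```scala
import org.apache.spark.status.protobuf.StoreTypes

// Minimal sketch: build a StageDataWrapper message, encode it to the compact
// protobuf payload that the RocksDB-backed status store persists, and parse it back.
object StageDataWrapperProtoSketch {
  def main(args: Array[String]): Unit = {
    val info = StoreTypes.StageData.newBuilder()
      .setStageId(1L)            // int64 stage_id
      .setAttemptId(0)           // int32 attempt_id
      .setName("example stage")  // example data, not from the PR
      .build()
    val wrapper = StoreTypes.StageDataWrapper.newBuilder()
      .setInfo(info)
      .addJobIds(1L)
      .putLocality("PROCESS_LOCAL", 8L)
      .build()

    val bytes: Array[Byte] = wrapper.toByteArray             // compact binary payload
    val parsed = StoreTypes.StageDataWrapper.parseFrom(bytes)
    assert(parsed.getInfo.getName == "example stage")
  }
}
```
Unset proto3 fields fall back to their defaults, which keeps small records compact.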
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New UT.
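The round-trip pattern the test follows can be sketched as below; `StageDataWrapperRoundTrip` is a hypothetical helper, while `StageDataWrapperSerializer.serialize`/`deserialize` are the methods added in this commit.
```scala
import org.apache.spark.status.StageDataWrapper
import org.apache.spark.status.protobuf.StageDataWrapperSerializer

// Hypothetical helper sketching the round trip exercised by the new UT:
// encode a wrapper to protobuf bytes and decode it back for comparison.
object StageDataWrapperRoundTrip {
  private val serde = new StageDataWrapperSerializer

  def roundTrip(input: StageDataWrapper): StageDataWrapper = {
    val bytes: Array[Byte] = serde.serialize(input) // protobuf-encoded payload
    serde.deserialize(bytes)                        // rebuilt StageDataWrapper
  }
}
```
The new test builds a fully populated StageData with tasks, executor summaries, and metric distributions, as shown in the KVStoreProtobufSerializerSuite diff below.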
Closes #39192 from panbingkun/SPARK-41423.
Authored-by: panbingkun <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 211 +++++++
.../org.apache.spark.status.protobuf.ProtobufSerDe | 1 +
.../scala/org/apache/spark/status/api/v1/api.scala | 4 +-
.../protobuf/AccumulableInfoSerializer.scala | 49 ++
....scala => ExecutorStageSummarySerializer.scala} | 34 +-
.../ExecutorStageSummaryWrapperSerializer.scala | 56 +-
.../protobuf/StageDataWrapperSerializer.scala | 616 +++++++++++++++++++++
.../status/protobuf/StageStatusSerializer.scala | 35 ++
.../protobuf/TaskDataWrapperSerializer.scala | 23 +-
.../protobuf/KVStoreProtobufSerializerSuite.scala | 610 ++++++++++++++++++--
10 files changed, 1494 insertions(+), 145 deletions(-)
diff --git
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index bba3a494083..ff687331a6a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -463,3 +463,214 @@ message RDDOperationGraphWrapper {
repeated RDDOperationEdge incoming_edges = 4;
RDDOperationClusterWrapper root_cluster = 5;
}
+
+message StageDataWrapper {
+ StageData info = 1;
+ repeated int64 job_ids = 2;
+ map<string, int64> locality = 3;
+}
+
+message TaskData {
+ int64 task_id = 1;
+ int32 index = 2;
+ int32 attempt = 3;
+ int32 partition_id = 4;
+ int64 launch_time = 5;
+ optional int64 result_fetch_start = 6;
+ optional int64 duration = 7;
+ string executor_id = 8;
+ string host = 9;
+ string status = 10;
+ string task_locality = 11;
+ bool speculative = 12;
+ repeated AccumulableInfo accumulator_updates = 13;
+ optional string error_message = 14;
+ optional TaskMetrics task_metrics = 15;
+ map<string, string> executor_logs = 16;
+ int64 scheduler_delay = 17;
+ int64 getting_result_time = 18;
+}
+
+enum StageStatus {
+ STAGE_STATUS_UNSPECIFIED = 0;
+ STAGE_STATUS_ACTIVE = 1;
+ STAGE_STATUS_COMPLETE = 2;
+ STAGE_STATUS_FAILED = 3;
+ STAGE_STATUS_PENDING = 4;
+ STAGE_STATUS_SKIPPED = 5;
+}
+
+message StageData {
+ StageStatus status = 1;
+ int64 stage_id = 2;
+ int32 attempt_id = 3;
+ int32 num_tasks = 4;
+ int32 num_active_tasks = 5;
+ int32 num_complete_tasks = 6;
+ int32 num_failed_tasks = 7;
+ int32 num_killed_tasks = 8;
+ int32 num_completed_indices = 9;
+
+ optional int64 submission_time = 10;
+ optional int64 first_task_launched_time = 11;
+ optional int64 completion_time = 12;
+ optional string failure_reason = 13;
+
+ int64 executor_deserialize_time = 14;
+ int64 executor_deserialize_cpu_time = 15;
+ int64 executor_run_time = 16;
+ int64 executor_cpu_time = 17;
+ int64 result_size = 18;
+ int64 jvm_gc_time = 19;
+ int64 result_serialization_time = 20;
+ int64 memory_bytes_spilled = 21;
+ int64 disk_bytes_spilled = 22;
+ int64 peak_execution_memory = 23;
+ int64 input_bytes = 24;
+ int64 input_records = 25;
+ int64 output_bytes = 26;
+ int64 output_records = 27;
+ int64 shuffle_remote_blocks_fetched = 28;
+ int64 shuffle_local_blocks_fetched = 29;
+ int64 shuffle_fetch_wait_time = 30;
+ int64 shuffle_remote_bytes_read = 31;
+ int64 shuffle_remote_bytes_read_to_disk = 32;
+ int64 shuffle_local_bytes_read = 33;
+ int64 shuffle_read_bytes = 34;
+ int64 shuffle_read_records = 35;
+ int64 shuffle_write_bytes = 36;
+ int64 shuffle_write_time = 37;
+ int64 shuffle_write_records = 38;
+
+ string name = 39;
+ optional string description = 40;
+ string details = 41;
+ string scheduling_pool = 42;
+
+ repeated int64 rdd_ids = 43;
+ repeated AccumulableInfo accumulator_updates = 44;
+ map<int64, TaskData> tasks = 45;
+ map<string, ExecutorStageSummary> executor_summary = 46;
+ optional SpeculationStageSummary speculation_summary = 47;
+ map<string, int32> killed_tasks_summary = 48;
+ int32 resource_profile_id = 49;
+ optional ExecutorMetrics peak_executor_metrics = 50;
+ optional TaskMetricDistributions task_metrics_distributions = 51;
+ optional ExecutorMetricsDistributions executor_metrics_distributions = 52;
+}
+
+message TaskMetrics {
+ int64 executor_deserialize_time = 1;
+ int64 executor_deserialize_cpu_time = 2;
+ int64 executor_run_time = 3;
+ int64 executor_cpu_time = 4;
+ int64 result_size = 5;
+ int64 jvm_gc_time = 6;
+ int64 result_serialization_time = 7;
+ int64 memory_bytes_spilled = 8;
+ int64 disk_bytes_spilled = 9;
+ int64 peak_execution_memory = 10;
+ InputMetrics input_metrics = 11;
+ OutputMetrics output_metrics = 12;
+ ShuffleReadMetrics shuffle_read_metrics = 13;
+ ShuffleWriteMetrics shuffle_write_metrics = 14;
+}
+
+message InputMetrics {
+ int64 bytes_read = 1;
+ int64 records_read = 2;
+}
+
+message OutputMetrics {
+ int64 bytes_written = 1;
+ int64 records_written = 2;
+}
+
+message ShuffleReadMetrics {
+ int64 remote_blocks_fetched = 1;
+ int64 local_blocks_fetched = 2;
+ int64 fetch_wait_time = 3;
+ int64 remote_bytes_read = 4;
+ int64 remote_bytes_read_to_disk = 5;
+ int64 local_bytes_read = 6;
+ int64 records_read = 7;
+}
+
+message ShuffleWriteMetrics {
+ int64 bytes_written = 1;
+ int64 write_time = 2;
+ int64 records_written = 3;
+}
+
+message TaskMetricDistributions {
+ repeated double quantiles = 1;
+ repeated double duration = 2;
+ repeated double executor_deserialize_time = 3;
+ repeated double executor_deserialize_cpu_time = 4;
+ repeated double executor_run_time = 5;
+ repeated double executor_cpu_time = 6;
+ repeated double result_size = 7;
+ repeated double jvm_gc_time = 8;
+ repeated double result_serialization_time = 9;
+ repeated double getting_result_time = 10;
+ repeated double scheduler_delay = 11;
+ repeated double peak_execution_memory = 12;
+ repeated double memory_bytes_spilled = 13;
+ repeated double disk_bytes_spilled = 14;
+ InputMetricDistributions input_metrics = 15;
+ OutputMetricDistributions output_metrics = 16;
+ ShuffleReadMetricDistributions shuffle_read_metrics = 17;
+ ShuffleWriteMetricDistributions shuffle_write_metrics = 18;
+}
+
+message InputMetricDistributions {
+ repeated double bytes_read = 1;
+ repeated double records_read = 2;
+}
+
+message OutputMetricDistributions {
+ repeated double bytes_written = 1;
+ repeated double records_written = 2;
+}
+
+message ShuffleReadMetricDistributions {
+ repeated double read_bytes = 1;
+ repeated double read_records = 2;
+ repeated double remote_blocks_fetched = 3;
+ repeated double local_blocks_fetched = 4;
+ repeated double fetch_wait_time = 5;
+ repeated double remote_bytes_read = 6;
+ repeated double remote_bytes_read_to_disk = 7;
+ repeated double total_blocks_fetched = 8;
+}
+
+message ShuffleWriteMetricDistributions {
+ repeated double write_bytes = 1;
+ repeated double write_records = 2;
+ repeated double write_time = 3;
+}
+
+message ExecutorMetricsDistributions {
+ repeated double quantiles = 1;
+
+ repeated double task_time = 2;
+ repeated double failed_tasks = 3;
+ repeated double succeeded_tasks = 4;
+ repeated double killed_tasks = 5;
+ repeated double input_bytes = 6;
+ repeated double input_records = 7;
+ repeated double output_bytes = 8;
+ repeated double output_records = 9;
+ repeated double shuffle_read = 10;
+ repeated double shuffle_read_records = 11;
+ repeated double shuffle_write = 12;
+ repeated double shuffle_write_records = 13;
+ repeated double memory_bytes_spilled = 14;
+ repeated double disk_bytes_spilled = 15;
+ ExecutorPeakMetricsDistributions peak_memory_metrics = 16;
+}
+
+message ExecutorPeakMetricsDistributions {
+ repeated double quantiles = 1;
+ repeated ExecutorMetrics executor_metrics = 2;
+}
diff --git
a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 4e39d9ecdc0..5619f2651d3 100644
---
a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++
b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -28,3 +28,4 @@
org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
+org.apache.spark.status.protobuf.StageDataWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 025943f628b..a58b0b808b1 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -291,8 +291,8 @@ class StageData private[spark](
val details: String,
val schedulingPool: String,
- val rddIds: Seq[Int],
- val accumulatorUpdates: Seq[AccumulableInfo],
+ val rddIds: collection.Seq[Int],
+ val accumulatorUpdates: collection.Seq[AccumulableInfo],
val tasks: Option[Map[Long, TaskData]],
val executorSummary: Option[Map[String, ExecutorStageSummary]],
val speculationSummary: Option[SpeculationStageSummary],
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
new file mode 100644
index 00000000000..8d5046923e9
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.{List => JList}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.status.api.v1.AccumulableInfo
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+private[protobuf] object AccumulableInfoSerializer {
+
+ def serialize(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
+ val builder = StoreTypes.AccumulableInfo.newBuilder()
+ .setId(input.id)
+ .setName(input.name)
+ .setValue(input.value)
+ input.update.foreach(builder.setUpdate)
+ builder.build()
+ }
+
+ def deserialize(updates: JList[StoreTypes.AccumulableInfo]): ArrayBuffer[AccumulableInfo] = {
+ val accumulatorUpdates = new ArrayBuffer[AccumulableInfo](updates.size())
+ updates.forEach { update =>
+ accumulatorUpdates.append(new AccumulableInfo(
+ id = update.getId,
+ name = update.getName,
+ update = getOptional(update.hasUpdate, update.getUpdate),
+ value = update.getValue))
+ }
+ accumulatorUpdates
+ }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
similarity index 68%
copy from
core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
copy to
core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
index 21296e95e51..c304ed8e5e1 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
@@ -17,39 +17,12 @@
package org.apache.spark.status.protobuf
-import org.apache.spark.status.ExecutorStageSummaryWrapper
import org.apache.spark.status.api.v1.ExecutorStageSummary
import org.apache.spark.status.protobuf.Utils.getOptional
-class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
+private[protobuf] object ExecutorStageSummarySerializer {
- override val supportClass: Class[_] = classOf[ExecutorStageSummaryWrapper]
-
- override def serialize(input: Any): Array[Byte] =
- serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
-
- private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
- val info = serializeExecutorStageSummary(input.info)
- val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
- .setStageId(input.stageId.toLong)
- .setStageAttemptId(input.stageAttemptId)
- .setExecutorId(input.executorId)
- .setInfo(info)
- builder.build().toByteArray
- }
-
- def deserialize(bytes: Array[Byte]): ExecutorStageSummaryWrapper = {
- val binary = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(bytes)
- val info = deserializeExecutorStageSummary(binary.getInfo)
- new ExecutorStageSummaryWrapper(
- stageId = binary.getStageId.toInt,
- stageAttemptId = binary.getStageAttemptId,
- executorId = binary.getExecutorId,
- info = info)
- }
-
- private def serializeExecutorStageSummary(
- input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
+ def serialize(input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
val builder = StoreTypes.ExecutorStageSummary.newBuilder()
.setTaskTime(input.taskTime)
.setFailedTasks(input.failedTasks)
@@ -73,8 +46,7 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
builder.build()
}
- def deserializeExecutorStageSummary(
- binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
+ def deserialize(binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
val peakMemoryMetrics =
getOptional(binary.hasPeakMemoryMetrics,
() => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 21296e95e51..71de2fbc81f 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,8 +18,6 @@
package org.apache.spark.status.protobuf
import org.apache.spark.status.ExecutorStageSummaryWrapper
-import org.apache.spark.status.api.v1.ExecutorStageSummary
-import org.apache.spark.status.protobuf.Utils.getOptional
class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
@@ -29,7 +27,7 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
- val info = serializeExecutorStageSummary(input.info)
+ val info = ExecutorStageSummarySerializer.serialize(input.info)
val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
.setStageId(input.stageId.toLong)
.setStageAttemptId(input.stageAttemptId)
@@ -40,61 +38,11 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
def deserialize(bytes: Array[Byte]): ExecutorStageSummaryWrapper = {
val binary = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(bytes)
- val info = deserializeExecutorStageSummary(binary.getInfo)
+ val info = ExecutorStageSummarySerializer.deserialize(binary.getInfo)
new ExecutorStageSummaryWrapper(
stageId = binary.getStageId.toInt,
stageAttemptId = binary.getStageAttemptId,
executorId = binary.getExecutorId,
info = info)
}
-
- private def serializeExecutorStageSummary(
- input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
- val builder = StoreTypes.ExecutorStageSummary.newBuilder()
- .setTaskTime(input.taskTime)
- .setFailedTasks(input.failedTasks)
- .setSucceededTasks(input.succeededTasks)
- .setKilledTasks(input.killedTasks)
- .setInputBytes(input.inputBytes)
- .setInputRecords(input.inputRecords)
- .setOutputBytes(input.outputBytes)
- .setOutputRecords(input.outputRecords)
- .setShuffleRead(input.shuffleRead)
- .setShuffleReadRecords(input.shuffleReadRecords)
- .setShuffleWrite(input.shuffleWrite)
- .setShuffleWriteRecords(input.shuffleWriteRecords)
- .setMemoryBytesSpilled(input.memoryBytesSpilled)
- .setDiskBytesSpilled(input.diskBytesSpilled)
- .setIsBlacklistedForStage(input.isBlacklistedForStage)
- .setIsExcludedForStage(input.isExcludedForStage)
- input.peakMemoryMetrics.map { m =>
- builder.setPeakMemoryMetrics(ExecutorMetricsSerializer.serialize(m))
- }
- builder.build()
- }
-
- def deserializeExecutorStageSummary(
- binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
- val peakMemoryMetrics =
- getOptional(binary.hasPeakMemoryMetrics,
- () => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
- new ExecutorStageSummary(
- taskTime = binary.getTaskTime,
- failedTasks = binary.getFailedTasks,
- succeededTasks = binary.getSucceededTasks,
- killedTasks = binary.getKilledTasks,
- inputBytes = binary.getInputBytes,
- inputRecords = binary.getInputRecords,
- outputBytes = binary.getOutputBytes,
- outputRecords = binary.getOutputRecords,
- shuffleRead = binary.getShuffleRead,
- shuffleReadRecords = binary.getShuffleReadRecords,
- shuffleWrite = binary.getShuffleWrite,
- shuffleWriteRecords = binary.getShuffleWriteRecords,
- memoryBytesSpilled = binary.getMemoryBytesSpilled,
- diskBytesSpilled = binary.getDiskBytesSpilled,
- isBlacklistedForStage = binary.getIsBlacklistedForStage,
- peakMemoryMetrics = peakMemoryMetrics,
- isExcludedForStage = binary.getIsExcludedForStage)
- }
}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
new file mode 100644
index 00000000000..3ab0245898d
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import collection.JavaConverters._
+import org.apache.commons.collections4.MapUtils
+
+import org.apache.spark.status.StageDataWrapper
+import org.apache.spark.status.api.v1.{ExecutorMetricsDistributions, ExecutorPeakMetricsDistributions, InputMetricDistributions, InputMetrics, OutputMetricDistributions, OutputMetrics, ShuffleReadMetricDistributions, ShuffleReadMetrics, ShuffleWriteMetricDistributions, ShuffleWriteMetrics, SpeculationStageSummary, StageData, TaskData, TaskMetricDistributions, TaskMetrics}
+import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
+
+class StageDataWrapperSerializer extends ProtobufSerDe {
+
+ override val supportClass: Class[_] = classOf[StageDataWrapper]
+
+ override def serialize(input: Any): Array[Byte] = {
+ val s = input.asInstanceOf[StageDataWrapper]
+ val builder = StoreTypes.StageDataWrapper.newBuilder()
+ builder.setInfo(serializeStageData(s.info))
+ s.jobIds.foreach(id => builder.addJobIds(id.toLong))
+ s.locality.foreach { entry =>
+ builder.putLocality(entry._1, entry._2)
+ }
+ builder.build().toByteArray
+ }
+
+ private def serializeStageData(stageData: StageData): StoreTypes.StageData = {
+ val stageDataBuilder = StoreTypes.StageData.newBuilder()
+ stageDataBuilder
+ .setStatus(StageStatusSerializer.serialize(stageData.status))
+ .setStageId(stageData.stageId.toLong)
+ .setAttemptId(stageData.attemptId)
+ .setNumTasks(stageData.numTasks)
+ .setNumActiveTasks(stageData.numActiveTasks)
+ .setNumCompleteTasks(stageData.numCompleteTasks)
+ .setNumFailedTasks(stageData.numFailedTasks)
+ .setNumKilledTasks(stageData.numKilledTasks)
+ .setNumCompletedIndices(stageData.numCompletedIndices)
+ .setExecutorDeserializeTime(stageData.executorDeserializeTime)
+ .setExecutorDeserializeCpuTime(stageData.executorDeserializeCpuTime)
+ .setExecutorRunTime(stageData.executorRunTime)
+ .setExecutorCpuTime(stageData.executorCpuTime)
+ .setResultSize(stageData.resultSize)
+ .setJvmGcTime(stageData.jvmGcTime)
+ .setResultSerializationTime(stageData.resultSerializationTime)
+ .setMemoryBytesSpilled(stageData.memoryBytesSpilled)
+ .setDiskBytesSpilled(stageData.diskBytesSpilled)
+ .setPeakExecutionMemory(stageData.peakExecutionMemory)
+ .setInputBytes(stageData.inputBytes)
+ .setInputRecords(stageData.inputRecords)
+ .setOutputBytes(stageData.outputBytes)
+ .setOutputRecords(stageData.outputRecords)
+ .setShuffleRemoteBlocksFetched(stageData.shuffleRemoteBlocksFetched)
+ .setShuffleLocalBlocksFetched(stageData.shuffleLocalBlocksFetched)
+ .setShuffleFetchWaitTime(stageData.shuffleFetchWaitTime)
+ .setShuffleRemoteBytesRead(stageData.shuffleRemoteBytesRead)
+ .setShuffleRemoteBytesReadToDisk(stageData.shuffleRemoteBytesReadToDisk)
+ .setShuffleLocalBytesRead(stageData.shuffleLocalBytesRead)
+ .setShuffleReadBytes(stageData.shuffleReadBytes)
+ .setShuffleReadRecords(stageData.shuffleReadRecords)
+ .setShuffleWriteBytes(stageData.shuffleWriteBytes)
+ .setShuffleWriteTime(stageData.shuffleWriteTime)
+ .setShuffleWriteRecords(stageData.shuffleWriteRecords)
+ .setName(stageData.name)
+ .setDetails(stageData.details)
+ .setSchedulingPool(stageData.schedulingPool)
+ .setResourceProfileId(stageData.resourceProfileId)
+ stageData.submissionTime.foreach { d =>
+ stageDataBuilder.setSubmissionTime(d.getTime)
+ }
+ stageData.firstTaskLaunchedTime.foreach { d =>
+ stageDataBuilder.setFirstTaskLaunchedTime(d.getTime)
+ }
+ stageData.completionTime.foreach { d =>
+ stageDataBuilder.setCompletionTime(d.getTime)
+ }
+ stageData.failureReason.foreach { fr =>
+ stageDataBuilder.setFailureReason(fr)
+ }
+ stageData.description.foreach { d =>
+ stageDataBuilder.setDescription(d)
+ }
+ stageData.rddIds.foreach(id => stageDataBuilder.addRddIds(id.toLong))
+ stageData.accumulatorUpdates.foreach { update =>
+ stageDataBuilder.addAccumulatorUpdates(
+ AccumulableInfoSerializer.serialize(update))
+ }
+ stageData.tasks.foreach { t =>
+ t.foreach { entry =>
+ stageDataBuilder.putTasks(entry._1, serializeTaskData(entry._2))
+ }
+ }
+ stageData.executorSummary.foreach { es =>
+ es.foreach { entry =>
+ stageDataBuilder.putExecutorSummary(entry._1,
+ ExecutorStageSummarySerializer.serialize(entry._2))
+ }
+ }
+ stageData.speculationSummary.foreach { ss =>
+ stageDataBuilder.setSpeculationSummary(serializeSpeculationStageSummary(ss))
+ }
+ stageData.killedTasksSummary.foreach { entry =>
+ stageDataBuilder.putKilledTasksSummary(entry._1, entry._2)
+ }
+ stageData.peakExecutorMetrics.foreach { pem =>
+ stageDataBuilder.setPeakExecutorMetrics(ExecutorMetricsSerializer.serialize(pem))
+ }
+ stageData.taskMetricsDistributions.foreach { tmd =>
+ stageDataBuilder.setTaskMetricsDistributions(serializeTaskMetricDistributions(tmd))
+ }
+ stageData.executorMetricsDistributions.foreach { emd =>
+ stageDataBuilder.setExecutorMetricsDistributions(serializeExecutorMetricsDistributions(emd))
+ }
+ stageDataBuilder.build()
+ }
+
+ private def serializeTaskData(t: TaskData): StoreTypes.TaskData = {
+ val taskDataBuilder = StoreTypes.TaskData.newBuilder()
+ taskDataBuilder
+ .setTaskId(t.taskId)
+ .setIndex(t.index)
+ .setAttempt(t.attempt)
+ .setPartitionId(t.partitionId)
+ .setLaunchTime(t.launchTime.getTime)
+ .setExecutorId(t.executorId)
+ .setHost(t.host)
+ .setStatus(t.status)
+ .setTaskLocality(t.taskLocality)
+ .setSpeculative(t.speculative)
+ .setSchedulerDelay(t.schedulerDelay)
+ .setGettingResultTime(t.gettingResultTime)
+ t.resultFetchStart.foreach { rfs =>
+ taskDataBuilder.setResultFetchStart(rfs.getTime)
+ }
+ t.duration.foreach { d =>
+ taskDataBuilder.setDuration(d)
+ }
+ t.accumulatorUpdates.foreach { update =>
+ taskDataBuilder.addAccumulatorUpdates(
+ AccumulableInfoSerializer.serialize(update))
+ }
+ t.errorMessage.foreach { em =>
+ taskDataBuilder.setErrorMessage(em)
+ }
+ t.taskMetrics.foreach { tm =>
+ taskDataBuilder.setTaskMetrics(serializeTaskMetrics(tm))
+ }
+ t.executorLogs.foreach { entry =>
+ taskDataBuilder.putExecutorLogs(entry._1, entry._2)
+ }
+ taskDataBuilder.build()
+ }
+
+ private def serializeTaskMetrics(tm: TaskMetrics): StoreTypes.TaskMetrics = {
+ val taskMetricsBuilder = StoreTypes.TaskMetrics.newBuilder()
+ taskMetricsBuilder
+ .setExecutorDeserializeTime(tm.executorDeserializeTime)
+ .setExecutorDeserializeCpuTime(tm.executorDeserializeCpuTime)
+ .setExecutorRunTime(tm.executorRunTime)
+ .setExecutorCpuTime(tm.executorCpuTime)
+ .setResultSize(tm.resultSize)
+ .setJvmGcTime(tm.jvmGcTime)
+ .setResultSerializationTime(tm.resultSerializationTime)
+ .setMemoryBytesSpilled(tm.memoryBytesSpilled)
+ .setDiskBytesSpilled(tm.diskBytesSpilled)
+ .setPeakExecutionMemory(tm.peakExecutionMemory)
+ .setInputMetrics(serializeInputMetrics(tm.inputMetrics))
+ .setOutputMetrics(serializeOutputMetrics(tm.outputMetrics))
+ .setShuffleReadMetrics(serializeShuffleReadMetrics(tm.shuffleReadMetrics))
+ .setShuffleWriteMetrics(serializeShuffleWriteMetrics(tm.shuffleWriteMetrics))
+ taskMetricsBuilder.build()
+ }
+
+ private def serializeInputMetrics(im: InputMetrics): StoreTypes.InputMetrics = {
+ StoreTypes.InputMetrics.newBuilder()
+ .setBytesRead(im.bytesRead)
+ .setRecordsRead(im.recordsRead)
+ .build()
+ }
+
+ private def serializeOutputMetrics(om: OutputMetrics): StoreTypes.OutputMetrics = {
+ StoreTypes.OutputMetrics.newBuilder()
+ .setBytesWritten(om.bytesWritten)
+ .setRecordsWritten(om.recordsWritten)
+ .build()
+ }
+
+ private def serializeShuffleReadMetrics(
+ srm: ShuffleReadMetrics): StoreTypes.ShuffleReadMetrics = {
+ StoreTypes.ShuffleReadMetrics.newBuilder()
+ .setRemoteBlocksFetched(srm.remoteBlocksFetched)
+ .setLocalBlocksFetched(srm.localBlocksFetched)
+ .setFetchWaitTime(srm.fetchWaitTime)
+ .setRemoteBytesRead(srm.remoteBytesRead)
+ .setRemoteBytesReadToDisk(srm.remoteBytesReadToDisk)
+ .setLocalBytesRead(srm.localBytesRead)
+ .setRecordsRead(srm.recordsRead)
+ .build()
+ }
+
+ private def serializeShuffleWriteMetrics(
+ swm: ShuffleWriteMetrics): StoreTypes.ShuffleWriteMetrics = {
+ StoreTypes.ShuffleWriteMetrics.newBuilder()
+ .setBytesWritten(swm.bytesWritten)
+ .setWriteTime(swm.writeTime)
+ .setRecordsWritten(swm.recordsWritten)
+ .build()
+ }
+
+ private def serializeSpeculationStageSummary(
+ sss: SpeculationStageSummary): StoreTypes.SpeculationStageSummary = {
+ StoreTypes.SpeculationStageSummary.newBuilder()
+ .setNumTasks(sss.numTasks)
+ .setNumActiveTasks(sss.numActiveTasks)
+ .setNumCompletedTasks(sss.numCompletedTasks)
+ .setNumFailedTasks(sss.numFailedTasks)
+ .setNumKilledTasks(sss.numKilledTasks)
+ .build()
+ }
+
+ private def serializeTaskMetricDistributions(
+ tmd: TaskMetricDistributions): StoreTypes.TaskMetricDistributions = {
+ val builder = StoreTypes.TaskMetricDistributions.newBuilder()
+ tmd.quantiles.foreach(q => builder.addQuantiles(q))
+ tmd.duration.foreach(d => builder.addDuration(d))
+ tmd.executorDeserializeTime.foreach(edt => builder.addExecutorDeserializeTime(edt))
+ tmd.executorDeserializeCpuTime.foreach(edct => builder.addExecutorDeserializeCpuTime(edct))
+ tmd.executorRunTime.foreach(ert => builder.addExecutorRunTime(ert))
+ tmd.executorCpuTime.foreach(ect => builder.addExecutorCpuTime(ect))
+ tmd.resultSize.foreach(rs => builder.addResultSize(rs))
+ tmd.jvmGcTime.foreach(jgt => builder.addJvmGcTime(jgt))
+ tmd.resultSerializationTime.foreach(rst => builder.addResultSerializationTime(rst))
+ tmd.gettingResultTime.foreach(grt => builder.addGettingResultTime(grt))
+ tmd.schedulerDelay.foreach(sd => builder.addSchedulerDelay(sd))
+ tmd.peakExecutionMemory.foreach(pem => builder.addPeakExecutionMemory(pem))
+ tmd.memoryBytesSpilled.foreach(mbs => builder.addMemoryBytesSpilled(mbs))
+ tmd.diskBytesSpilled.foreach(dbs => builder.addDiskBytesSpilled(dbs))
+ builder
+ .setInputMetrics(serializeInputMetricDistributions(tmd.inputMetrics))
+ .setOutputMetrics(serializeOutputMetricDistributions(tmd.outputMetrics))
+ .setShuffleReadMetrics(serializeShuffleReadMetricDistributions(tmd.shuffleReadMetrics))
+ .setShuffleWriteMetrics(serializeShuffleWriteMetricDistributions(tmd.shuffleWriteMetrics))
+ .build()
+ }
+
+ private def serializeInputMetricDistributions(
+ imd: InputMetricDistributions): StoreTypes.InputMetricDistributions = {
+ val builder = StoreTypes.InputMetricDistributions.newBuilder()
+ imd.bytesRead.foreach(br => builder.addBytesRead(br))
+ imd.recordsRead.foreach(rr => builder.addRecordsRead(rr))
+ builder.build()
+ }
+
+ private def serializeOutputMetricDistributions(
+ omd: OutputMetricDistributions): StoreTypes.OutputMetricDistributions = {
+ val builder = StoreTypes.OutputMetricDistributions.newBuilder()
+ omd.bytesWritten.foreach(bw => builder.addBytesWritten(bw))
+ omd.recordsWritten.foreach(rw => builder.addRecordsWritten(rw))
+ builder.build()
+ }
+
+ private def serializeShuffleReadMetricDistributions(
+ srmd: ShuffleReadMetricDistributions): StoreTypes.ShuffleReadMetricDistributions = {
+ val builder = StoreTypes.ShuffleReadMetricDistributions.newBuilder()
+ srmd.readBytes.foreach(rb => builder.addReadBytes(rb))
+ srmd.readRecords.foreach(rr => builder.addReadRecords(rr))
+ srmd.remoteBlocksFetched.foreach(rbf => builder.addRemoteBlocksFetched(rbf))
+ srmd.localBlocksFetched.foreach(lbf => builder.addLocalBlocksFetched(lbf))
+ srmd.fetchWaitTime.foreach(fwt => builder.addFetchWaitTime(fwt))
+ srmd.remoteBytesRead.foreach(rbr => builder.addRemoteBytesRead(rbr))
+ srmd.remoteBytesReadToDisk.foreach(rbrtd => builder.addRemoteBytesReadToDisk(rbrtd))
+ srmd.totalBlocksFetched.foreach(tbf => builder.addTotalBlocksFetched(tbf))
+ builder.build()
+ }
+
+ private def serializeShuffleWriteMetricDistributions(
+ swmd: ShuffleWriteMetricDistributions): StoreTypes.ShuffleWriteMetricDistributions = {
+ val builder = StoreTypes.ShuffleWriteMetricDistributions.newBuilder()
+ swmd.writeBytes.foreach(wb => builder.addWriteBytes(wb))
+ swmd.writeRecords.foreach(wr => builder.addWriteRecords(wr))
+ swmd.writeTime.foreach(wt => builder.addWriteTime(wt))
+ builder.build()
+ }
+
+ private def serializeExecutorMetricsDistributions(
+ emd: ExecutorMetricsDistributions): StoreTypes.ExecutorMetricsDistributions = {
+ val builder = StoreTypes.ExecutorMetricsDistributions.newBuilder()
+ emd.quantiles.foreach(q => builder.addQuantiles(q))
+ emd.taskTime.foreach(tt => builder.addTaskTime(tt))
+ emd.failedTasks.foreach(ft => builder.addFailedTasks(ft))
+ emd.succeededTasks.foreach(st => builder.addSucceededTasks(st))
+ emd.killedTasks.foreach(kt => builder.addKilledTasks(kt))
+ emd.inputBytes.foreach(ib => builder.addInputBytes(ib))
+ emd.inputRecords.foreach(ir => builder.addInputRecords(ir))
+ emd.outputBytes.foreach(ob => builder.addOutputBytes(ob))
+ emd.outputRecords.foreach(or => builder.addOutputRecords(or))
+ emd.shuffleRead.foreach(sr => builder.addShuffleRead(sr))
+ emd.shuffleReadRecords.foreach(srr => builder.addShuffleReadRecords(srr))
+ emd.shuffleWrite.foreach(sw => builder.addShuffleWrite(sw))
+ emd.shuffleWriteRecords.foreach(swr => builder.addShuffleWriteRecords(swr))
+ emd.memoryBytesSpilled.foreach(mbs => builder.addMemoryBytesSpilled(mbs))
+ emd.diskBytesSpilled.foreach(dbs => builder.addDiskBytesSpilled(dbs))
+ builder.setPeakMemoryMetrics(serializeExecutorPeakMetricsDistributions(emd.peakMemoryMetrics))
+ builder.build()
+ }
+
+ private def serializeExecutorPeakMetricsDistributions(
+ epmd: ExecutorPeakMetricsDistributions): StoreTypes.ExecutorPeakMetricsDistributions = {
+ val builder = StoreTypes.ExecutorPeakMetricsDistributions.newBuilder()
+ epmd.quantiles.foreach(q => builder.addQuantiles(q))
+ epmd.executorMetrics.foreach(em => builder.addExecutorMetrics(
+ ExecutorMetricsSerializer.serialize(em)))
+ builder.build()
+ }
+
+ override def deserialize(bytes: Array[Byte]): StageDataWrapper = {
+ val binary = StoreTypes.StageDataWrapper.parseFrom(bytes)
+ val info = deserializeStageData(binary.getInfo)
+ new StageDataWrapper(
+ info = info,
+ jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
+ locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap
+ )
+ }
+
+ private def deserializeStageData(binary: StoreTypes.StageData): StageData = {
+ val status = StageStatusSerializer.deserialize(binary.getStatus)
+ val submissionTime =
+ getOptional(binary.hasSubmissionTime, () => new Date(binary.getSubmissionTime))
+ val firstTaskLaunchedTime =
+ getOptional(binary.hasFirstTaskLaunchedTime, () => new Date(binary.getFirstTaskLaunchedTime))
+ val completionTime =
+ getOptional(binary.hasCompletionTime, () => new Date(binary.getCompletionTime))
+ val failureReason =
+ getOptional(binary.hasFailureReason, () => weakIntern(binary.getFailureReason))
+ val description =
+ getOptional(binary.hasDescription, () => weakIntern(binary.getDescription))
+ val accumulatorUpdates =
+ AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
+ val tasks = if (MapUtils.isNotEmpty(binary.getTasksMap)) {
+ Some(binary.getTasksMap.asScala.map(
+ entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
+ } else None
+ val executorSummary = if (MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
+ Some(binary.getExecutorSummaryMap.asScala.mapValues(
+ ExecutorStageSummarySerializer.deserialize).toMap)
+ } else None
+ val speculationSummary =
+ getOptional(binary.hasSpeculationSummary,
+ () => deserializeSpeculationStageSummary(binary.getSpeculationSummary))
+ val peakExecutorMetrics =
+ getOptional(binary.hasPeakExecutorMetrics,
+ () => ExecutorMetricsSerializer.deserialize(binary.getPeakExecutorMetrics))
+ val taskMetricsDistributions =
+ getOptional(binary.hasTaskMetricsDistributions,
+ () => deserializeTaskMetricDistributions(binary.getTaskMetricsDistributions))
+ val executorMetricsDistributions =
+ getOptional(binary.hasExecutorMetricsDistributions,
+ () => deserializeExecutorMetricsDistributions(binary.getExecutorMetricsDistributions))
+ new StageData(
+ status = status,
+ stageId = binary.getStageId.toInt,
+ attemptId = binary.getAttemptId,
+ numTasks = binary.getNumTasks,
+ numActiveTasks = binary.getNumActiveTasks,
+ numCompleteTasks = binary.getNumCompleteTasks,
+ numFailedTasks = binary.getNumFailedTasks,
+ numKilledTasks = binary.getNumKilledTasks,
+ numCompletedIndices = binary.getNumCompletedIndices,
+ submissionTime = submissionTime,
+ firstTaskLaunchedTime = firstTaskLaunchedTime,
+ completionTime = completionTime,
+ failureReason = failureReason,
+ executorDeserializeTime = binary.getExecutorDeserializeTime,
+ executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTime,
+ executorRunTime = binary.getExecutorRunTime,
+ executorCpuTime = binary.getExecutorCpuTime,
+ resultSize = binary.getResultSize,
+ jvmGcTime = binary.getJvmGcTime,
+ resultSerializationTime = binary.getResultSerializationTime,
+ memoryBytesSpilled = binary.getMemoryBytesSpilled,
+ diskBytesSpilled = binary.getDiskBytesSpilled,
+ peakExecutionMemory = binary.getPeakExecutionMemory,
+ inputBytes = binary.getInputBytes,
+ inputRecords = binary.getInputRecords,
+ outputBytes = binary.getOutputBytes,
+ outputRecords = binary.getOutputRecords,
+ shuffleRemoteBlocksFetched = binary.getShuffleRemoteBlocksFetched,
+ shuffleLocalBlocksFetched = binary.getShuffleLocalBlocksFetched,
+ shuffleFetchWaitTime = binary.getShuffleFetchWaitTime,
+ shuffleRemoteBytesRead = binary.getShuffleRemoteBytesRead,
+ shuffleRemoteBytesReadToDisk = binary.getShuffleRemoteBytesReadToDisk,
+ shuffleLocalBytesRead = binary.getShuffleLocalBytesRead,
+ shuffleReadBytes = binary.getShuffleReadBytes,
+ shuffleReadRecords = binary.getShuffleReadRecords,
+ shuffleWriteBytes = binary.getShuffleWriteBytes,
+ shuffleWriteTime = binary.getShuffleWriteTime,
+ shuffleWriteRecords = binary.getShuffleWriteRecords,
+ name = weakIntern(binary.getName),
+ description = description,
+ details = weakIntern(binary.getDetails),
+ schedulingPool = weakIntern(binary.getSchedulingPool),
+ rddIds = binary.getRddIdsList.asScala.map(_.toInt),
+ accumulatorUpdates = accumulatorUpdates,
+ tasks = tasks,
+ executorSummary = executorSummary,
+ speculationSummary = speculationSummary,
+ killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap,
+ resourceProfileId = binary.getResourceProfileId,
+ peakExecutorMetrics = peakExecutorMetrics,
+ taskMetricsDistributions = taskMetricsDistributions,
+ executorMetricsDistributions = executorMetricsDistributions
+ )
+ }
+
+ private def deserializeSpeculationStageSummary(
+ binary: StoreTypes.SpeculationStageSummary): SpeculationStageSummary = {
+ new SpeculationStageSummary(
+ binary.getNumTasks,
+ binary.getNumActiveTasks,
+ binary.getNumCompletedTasks,
+ binary.getNumFailedTasks,
+ binary.getNumKilledTasks
+ )
+ }
+
+ private def deserializeTaskMetricDistributions(
+ binary: StoreTypes.TaskMetricDistributions): TaskMetricDistributions = {
+ new TaskMetricDistributions(
+ quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+ duration = binary.getDurationList.asScala.map(_.toDouble).toIndexedSeq,
+ executorDeserializeTime = binary.getExecutorDeserializeTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ executorRunTime = binary.getExecutorRunTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ executorCpuTime = binary.getExecutorCpuTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ resultSize = binary.getResultSizeList.asScala.map(_.toDouble).toIndexedSeq,
+ jvmGcTime = binary.getJvmGcTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ resultSerializationTime = binary.getResultSerializationTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ gettingResultTime = binary.getGettingResultTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ schedulerDelay = binary.getSchedulerDelayList.asScala.map(_.toDouble).toIndexedSeq,
+ peakExecutionMemory = binary.getPeakExecutionMemoryList.asScala.map(_.toDouble).toIndexedSeq,
+ memoryBytesSpilled = binary.getMemoryBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+ diskBytesSpilled = binary.getDiskBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+ inputMetrics = deserializeInputMetricDistributions(binary.getInputMetrics),
+ outputMetrics = deserializeOutputMetricDistributions(binary.getOutputMetrics),
+ shuffleReadMetrics = deserializeShuffleReadMetricDistributions(binary.getShuffleReadMetrics),
+ shuffleWriteMetrics = deserializeShuffleWriteMetricDistributions(binary.getShuffleWriteMetrics)
+ )
+ }
+
+ private def deserializeInputMetricDistributions(
+ binary: StoreTypes.InputMetricDistributions): InputMetricDistributions = {
+ new InputMetricDistributions(
+ bytesRead = binary.getBytesReadList.asScala.map(_.toDouble).toIndexedSeq,
+ recordsRead = binary.getRecordsReadList.asScala.map(_.toDouble).toIndexedSeq
+ )
+ }
+
+ private def deserializeOutputMetricDistributions(
+ binary: StoreTypes.OutputMetricDistributions): OutputMetricDistributions = {
+ new OutputMetricDistributions(
+ bytesWritten = binary.getBytesWrittenList.asScala.map(_.toDouble).toIndexedSeq,
+ recordsWritten = binary.getRecordsWrittenList.asScala.map(_.toDouble).toIndexedSeq
+ )
+ }
+
+ private def deserializeShuffleReadMetricDistributions(
+ binary: StoreTypes.ShuffleReadMetricDistributions): ShuffleReadMetricDistributions = {
+ new ShuffleReadMetricDistributions(
+ readBytes = binary.getReadBytesList.asScala.map(_.toDouble).toIndexedSeq,
+ readRecords = binary.getReadRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ remoteBlocksFetched = binary.getRemoteBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq,
+ localBlocksFetched = binary.getLocalBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq,
+ fetchWaitTime = binary.getFetchWaitTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ remoteBytesRead = binary.getRemoteBytesReadList.asScala.map(_.toDouble).toIndexedSeq,
+ remoteBytesReadToDisk = binary.getRemoteBytesReadToDiskList.asScala.map(_.toDouble).toIndexedSeq,
+ totalBlocksFetched = binary.getTotalBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq
+ )
+ }
+
+ private def deserializeShuffleWriteMetricDistributions(
+ binary: StoreTypes.ShuffleWriteMetricDistributions): ShuffleWriteMetricDistributions = {
+ new ShuffleWriteMetricDistributions(
+ writeBytes = binary.getWriteBytesList.asScala.map(_.toDouble).toIndexedSeq,
+ writeRecords = binary.getWriteRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ writeTime = binary.getWriteTimeList.asScala.map(_.toDouble).toIndexedSeq
+ )
+ }
+
+ private def deserializeExecutorMetricsDistributions(
+ binary: StoreTypes.ExecutorMetricsDistributions): ExecutorMetricsDistributions = {
+ new ExecutorMetricsDistributions(
+ quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+ taskTime = binary.getTaskTimeList.asScala.map(_.toDouble).toIndexedSeq,
+ failedTasks = binary.getFailedTasksList.asScala.map(_.toDouble).toIndexedSeq,
+ succeededTasks = binary.getSucceededTasksList.asScala.map(_.toDouble).toIndexedSeq,
+ killedTasks = binary.getKilledTasksList.asScala.map(_.toDouble).toIndexedSeq,
+ inputBytes = binary.getInputBytesList.asScala.map(_.toDouble).toIndexedSeq,
+ inputRecords = binary.getInputRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ outputBytes = binary.getOutputBytesList.asScala.map(_.toDouble).toIndexedSeq,
+ outputRecords = binary.getOutputRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ shuffleRead = binary.getShuffleReadList.asScala.map(_.toDouble).toIndexedSeq,
+ shuffleReadRecords = binary.getShuffleReadRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ shuffleWrite = binary.getShuffleWriteList.asScala.map(_.toDouble).toIndexedSeq,
+ shuffleWriteRecords = binary.getShuffleWriteRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+ memoryBytesSpilled = binary.getMemoryBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+ diskBytesSpilled = binary.getDiskBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+ peakMemoryMetrics = deserializeExecutorPeakMetricsDistributions(binary.getPeakMemoryMetrics)
+ )
+ }
+
+ private def deserializeExecutorPeakMetricsDistributions(
+ binary: StoreTypes.ExecutorPeakMetricsDistributions): ExecutorPeakMetricsDistributions = {
+ new ExecutorPeakMetricsDistributions(
+ quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+ executorMetrics = binary.getExecutorMetricsList.asScala.map(
+ ExecutorMetricsSerializer.deserialize).toIndexedSeq
+ )
+ }
+
+ private def deserializeTaskData(binary: StoreTypes.TaskData): TaskData = {
+ val resultFetchStart = getOptional(binary.hasResultFetchStart,
+ () => new Date(binary.getResultFetchStart))
+ val duration = getOptional(binary.hasDuration, () => binary.getDuration)
+ val accumulatorUpdates =
+ AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
+ val taskMetrics = getOptional(binary.hasTaskMetrics,
+ () => deserializeTaskMetrics(binary.getTaskMetrics))
+ new TaskData(
+ taskId = binary.getTaskId,
+ index = binary.getIndex,
+ attempt = binary.getAttempt,
+ partitionId = binary.getPartitionId,
+ launchTime = new Date(binary.getLaunchTime),
+ resultFetchStart = resultFetchStart,
+ duration = duration,
+ executorId = weakIntern(binary.getExecutorId),
+ host = weakIntern(binary.getHost),
+ status = weakIntern(binary.getStatus),
+ taskLocality = weakIntern(binary.getTaskLocality),
+ speculative = binary.getSpeculative,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = getOptional(binary.hasErrorMessage, () =>
+ weakIntern(binary.getErrorMessage)),
+ taskMetrics = taskMetrics,
+ executorLogs = binary.getExecutorLogsMap.asScala.toMap,
+ schedulerDelay = binary.getSchedulerDelay,
+ gettingResultTime = binary.getGettingResultTime)
+ }
+
+ private def deserializeTaskMetrics(binary: StoreTypes.TaskMetrics): TaskMetrics = {
+ new TaskMetrics(
+ binary.getExecutorDeserializeTime,
+ binary.getExecutorDeserializeCpuTime,
+ binary.getExecutorRunTime,
+ binary.getExecutorCpuTime,
+ binary.getResultSize,
+ binary.getJvmGcTime,
+ binary.getResultSerializationTime,
+ binary.getMemoryBytesSpilled,
+ binary.getDiskBytesSpilled,
+ binary.getPeakExecutionMemory,
+ deserializeInputMetrics(binary.getInputMetrics),
+ deserializeOutputMetrics(binary.getOutputMetrics),
+ deserializeShuffleReadMetrics(binary.getShuffleReadMetrics),
+ deserializeShuffleWriteMetrics(binary.getShuffleWriteMetrics))
+ }
+
+ private def deserializeInputMetrics(binary: StoreTypes.InputMetrics): InputMetrics = {
+ new InputMetrics(binary.getBytesRead, binary.getRecordsRead)
+ }
+
+ private def deserializeOutputMetrics(binary: StoreTypes.OutputMetrics): OutputMetrics = {
+ new OutputMetrics(binary.getBytesWritten, binary.getRecordsWritten)
+ }
+
+ private def deserializeShuffleReadMetrics(
+ binary: StoreTypes.ShuffleReadMetrics): ShuffleReadMetrics = {
+ new ShuffleReadMetrics(
+ binary.getRemoteBlocksFetched,
+ binary.getLocalBlocksFetched,
+ binary.getFetchWaitTime,
+ binary.getRemoteBytesRead,
+ binary.getRemoteBytesReadToDisk,
+ binary.getLocalBytesRead,
+ binary.getRecordsRead)
+ }
+
+ private def deserializeShuffleWriteMetrics(
+ binary: StoreTypes.ShuffleWriteMetrics): ShuffleWriteMetrics = {
+ new ShuffleWriteMetrics(
+ binary.getBytesWritten,
+ binary.getWriteTime,
+ binary.getRecordsWritten)
+ }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
new file mode 100644
index 00000000000..6014379bb1e
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.status.api.v1.StageStatus
+
+private[protobuf] object StageStatusSerializer {
+
+ private def PREFIX = "STAGE_STATUS_"
+
+ def serialize(input: StageStatus): StoreTypes.StageStatus = {
+ StoreTypes.StageStatus.valueOf(PREFIX + input.toString)
+ }
+
+ def deserialize(binary: StoreTypes.StageStatus): StageStatus = {
+ StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX))
+ }
+}
diff --git
a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 80f258aead6..155a0348398 100644
---
a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++
b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -17,10 +17,7 @@
package org.apache.spark.status.protobuf
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.spark.status.TaskDataWrapper
-import org.apache.spark.status.api.v1.AccumulableInfo
import org.apache.spark.status.protobuf.Utils.getOptional
import org.apache.spark.util.Utils.weakIntern
@@ -74,21 +71,14 @@ class TaskDataWrapperSerializer extends ProtobufSerDe {
.setStageAttemptId(input.stageAttemptId)
input.errorMessage.foreach(builder.setErrorMessage)
input.accumulatorUpdates.foreach { update =>
- builder.addAccumulatorUpdates(serializeAccumulableInfo(update))
+ builder.addAccumulatorUpdates(AccumulableInfoSerializer.serialize(update))
}
builder.build().toByteArray
}
def deserialize(bytes: Array[Byte]): TaskDataWrapper = {
val binary = StoreTypes.TaskDataWrapper.parseFrom(bytes)
- val accumulatorUpdates = new ArrayBuffer[AccumulableInfo]()
- binary.getAccumulatorUpdatesList.forEach { update =>
- accumulatorUpdates.append(new AccumulableInfo(
- id = update.getId,
- name = update.getName,
- update = getOptional(update.hasUpdate, update.getUpdate),
- value = update.getValue))
- }
+ val accumulatorUpdates =
+ AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
new TaskDataWrapper(
taskId = binary.getTaskId,
index = binary.getIndex,
@@ -133,13 +123,4 @@ class TaskDataWrapperSerializer extends ProtobufSerDe {
stageAttemptId = binary.getStageAttemptId
)
}
-
- def serializeAccumulableInfo(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
- val builder = StoreTypes.AccumulableInfo.newBuilder()
- .setId(input.id)
- .setName(input.name)
- .setValue(input.value)
- input.update.foreach(builder.setUpdate)
- builder.build()
- }
}
diff --git
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index dab9d9c071f..a9edae711b4 100644
---
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -133,13 +133,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
val bytes = serializer.serialize(input)
val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
- assert(result.accumulatorUpdates.length == input.accumulatorUpdates.length)
- result.accumulatorUpdates.zip(input.accumulatorUpdates).foreach { case (a1, a2) =>
- assert(a1.id == a2.id)
- assert(a1.name == a2.name)
- assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
- assert(a1.update == a2.update)
- }
+ checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
assert(result.taskId == input.taskId)
assert(result.index == input.index)
assert(result.attempt == input.attempt)
@@ -213,27 +207,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(result.stageId == input.stageId)
assert(result.stageAttemptId == input.stageAttemptId)
assert(result.executorId == input.executorId)
- assert(result.info.taskTime == input.info.taskTime)
- assert(result.info.failedTasks == input.info.failedTasks)
- assert(result.info.succeededTasks == input.info.succeededTasks)
- assert(result.info.killedTasks == input.info.killedTasks)
- assert(result.info.inputBytes == input.info.inputBytes)
- assert(result.info.inputRecords == input.info.inputRecords)
- assert(result.info.outputBytes == input.info.outputBytes)
- assert(result.info.outputRecords == input.info.outputRecords)
- assert(result.info.shuffleRead == input.info.shuffleRead)
- assert(result.info.shuffleReadRecords == input.info.shuffleReadRecords)
- assert(result.info.shuffleWrite == input.info.shuffleWrite)
- assert(result.info.shuffleWriteRecords == input.info.shuffleWriteRecords)
- assert(result.info.memoryBytesSpilled == input.info.memoryBytesSpilled)
- assert(result.info.diskBytesSpilled == input.info.diskBytesSpilled)
- assert(result.info.isBlacklistedForStage == input.info.isBlacklistedForStage)
- assert(result.info.isExcludedForStage == input.info.isExcludedForStage)
- assert(result.info.peakMemoryMetrics.isDefined)
- ExecutorMetricType.metricToOffset.foreach { case (name, index) =>
- result.info.peakMemoryMetrics.get.getMetricValue(name) ==
- input.info.peakMemoryMetrics.get.getMetricValue(name)
- }
+ checkAnswer(result.info, input.info)
}
test("Application Environment Info") {
@@ -613,11 +587,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
val result = serializer.deserialize(bytes, classOf[SpeculationStageSummaryWrapper])
assert(result.stageId == input.stageId)
assert(result.stageAttemptId == input.stageAttemptId)
- assert(result.info.numTasks == input.info.numTasks)
- assert(result.info.numActiveTasks == input.info.numActiveTasks)
- assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
- assert(result.info.numFailedTasks == input.info.numFailedTasks)
- assert(result.info.numKilledTasks == input.info.numKilledTasks)
+ checkAnswer(result.info, input.info)
}
test("Executor Summary") {
@@ -718,10 +688,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
assert(result.info.peakMemoryMetrics.isDefined == input.info.peakMemoryMetrics.isDefined)
if (result.info.peakMemoryMetrics.isDefined && input.info.peakMemoryMetrics.isDefined) {
- ExecutorMetricType.metricToOffset.foreach { case (name, index) =>
- result.info.peakMemoryMetrics.get.getMetricValue(name) ==
- input.info.peakMemoryMetrics.get.getMetricValue(name)
- }
+ checkAnswer(result.info.peakMemoryMetrics.get, input.info.peakMemoryMetrics.get)
}
assert(result.info.attributes.size == input.info.attributes.size)
@@ -857,4 +824,573 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
compareClusters(result.rootCluster, input.rootCluster)
}
+
+ test("Stage Data") {
+ val accumulatorUpdates = Seq(
+ new AccumulableInfo(1L, "duration", Some("update"), "value1"),
+ new AccumulableInfo(2L, "duration2", None, "value2")
+ )
+ val inputMetrics = new InputMetrics(
+ bytesRead = 1L,
+ recordsRead = 2L)
+ val outputMetrics = new OutputMetrics(
+ bytesWritten = 1L,
+ recordsWritten = 2L
+ )
+ val shuffleReadMetrics = new ShuffleReadMetrics(
+ remoteBlocksFetched = 1L,
+ localBlocksFetched = 2L,
+ fetchWaitTime = 3L,
+ remoteBytesRead = 4L,
+ remoteBytesReadToDisk = 5L,
+ localBytesRead = 6L,
+ recordsRead = 7L
+ )
+ val shuffleWriteMetrics = new ShuffleWriteMetrics(
+ bytesWritten = 1L,
+ writeTime = 2L,
+ recordsWritten = 3L
+ )
+ val taskMetrics = new TaskMetrics(
+ executorDeserializeTime = 1L,
+ executorDeserializeCpuTime = 2L,
+ executorRunTime = 3L,
+ executorCpuTime = 4L,
+ resultSize = 5L,
+ jvmGcTime = 6L,
+ resultSerializationTime = 7L,
+ memoryBytesSpilled = 8L,
+ diskBytesSpilled = 9L,
+ peakExecutionMemory = 10L,
+ inputMetrics = inputMetrics,
+ outputMetrics = outputMetrics,
+ shuffleReadMetrics = shuffleReadMetrics,
+ shuffleWriteMetrics = shuffleWriteMetrics
+ )
+ val taskData1 = new TaskData(
+ taskId = 1L,
+ index = 2,
+ attempt = 3,
+ partitionId = 4,
+ launchTime = new Date(123456L),
+ resultFetchStart = Some(new Date(223456L)),
+ duration = Some(10000L),
+ executorId = "executor_id_1",
+ host = "host_name_1",
+ status = "SUCCESS",
+ taskLocality = "LOCAL",
+ speculative = true,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = Some("error_1"),
+ taskMetrics = Some(taskMetrics),
+ executorLogs = Map("executor_id_1" -> "executor_log_1"),
+ schedulerDelay = 5L,
+ gettingResultTime = 6L
+ )
+ val taskData2 = new TaskData(
+ taskId = 11L,
+ index = 12,
+ attempt = 13,
+ partitionId = 14,
+ launchTime = new Date(1123456L),
+ resultFetchStart = Some(new Date(1223456L)),
+ duration = Some(110000L),
+ executorId = "executor_id_2",
+ host = "host_name_2",
+ status = "SUCCESS",
+ taskLocality = "LOCAL",
+ speculative = false,
+ accumulatorUpdates = accumulatorUpdates,
+ errorMessage = Some("error_2"),
+ taskMetrics = Some(taskMetrics),
+ executorLogs = Map("executor_id_2" -> "executor_log_2"),
+ schedulerDelay = 15L,
+ gettingResultTime = 16L
+ )
+ val tasks = Some(
+ Map(1L -> taskData1, 2L -> taskData2)
+ )
+ val peakMemoryMetrics =
+ Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+ val executorStageSummary1 = new ExecutorStageSummary(
+ taskTime = 1L,
+ failedTasks = 2,
+ succeededTasks = 3,
+ killedTasks = 4,
+ inputBytes = 5L,
+ inputRecords = 6L,
+ outputBytes = 7L,
+ outputRecords = 8L,
+ shuffleRead = 9L,
+ shuffleReadRecords = 10L,
+ shuffleWrite = 11L,
+ shuffleWriteRecords = 12L,
+ memoryBytesSpilled = 13L,
+ diskBytesSpilled = 14L,
+ isBlacklistedForStage = true,
+ peakMemoryMetrics = peakMemoryMetrics,
+ isExcludedForStage = false)
+ val executorStageSummary2 = new ExecutorStageSummary(
+ taskTime = 11L,
+ failedTasks = 12,
+ succeededTasks = 13,
+ killedTasks = 14,
+ inputBytes = 15L,
+ inputRecords = 16L,
+ outputBytes = 17L,
+ outputRecords = 18L,
+ shuffleRead = 19L,
+ shuffleReadRecords = 110L,
+ shuffleWrite = 111L,
+ shuffleWriteRecords = 112L,
+ memoryBytesSpilled = 113L,
+ diskBytesSpilled = 114L,
+ isBlacklistedForStage = false,
+ peakMemoryMetrics = peakMemoryMetrics,
+ isExcludedForStage = true)
+ val executorSummary = Some(
+ Map("executor_id_1" -> executorStageSummary1, "executor_id_2" ->
executorStageSummary2)
+ )
+ val speculationStageSummary = new SpeculationStageSummary(
+ numTasks = 3,
+ numActiveTasks = 4,
+ numCompletedTasks = 5,
+ numFailedTasks = 6,
+ numKilledTasks = 7
+ )
+ val inputMetricDistributions = new InputMetricDistributions(
+ bytesRead = IndexedSeq(1.001D, 2.001D),
+ recordsRead = IndexedSeq(3.001D, 4.001D)
+ )
+ val outputMetricDistributions = new OutputMetricDistributions(
+ bytesWritten = IndexedSeq(1.001D, 2.001D),
+ recordsWritten = IndexedSeq(3.001D, 4.001D)
+ )
+ val shuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
+ readBytes = IndexedSeq(1.001D, 2.001D),
+ readRecords = IndexedSeq(3.001D, 4.001D),
+ remoteBlocksFetched = IndexedSeq(5.001D, 6.001D),
+ localBlocksFetched = IndexedSeq(7.001D, 8.001D),
+ fetchWaitTime = IndexedSeq(9.001D, 10.001D),
+ remoteBytesRead = IndexedSeq(11.001D, 12.001D),
+ remoteBytesReadToDisk = IndexedSeq(13.001D, 14.001D),
+ totalBlocksFetched = IndexedSeq(15.001D, 16.001D)
+ )
+ val shuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
+ writeBytes = IndexedSeq(1.001D, 2.001D),
+ writeRecords = IndexedSeq(3.001D, 4.001D),
+ writeTime = IndexedSeq(5.001D, 6.001D)
+ )
+ val taskMetricDistributions = new TaskMetricDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ duration = IndexedSeq(3.001D, 4.001D),
+ executorDeserializeTime = IndexedSeq(5.001D, 6.001D),
+ executorDeserializeCpuTime = IndexedSeq(7.001D, 8.001D),
+ executorRunTime = IndexedSeq(9.001D, 10.001D),
+ executorCpuTime = IndexedSeq(11.001D, 12.001D),
+ resultSize = IndexedSeq(13.001D, 14.001D),
+ jvmGcTime = IndexedSeq(15.001D, 16.001D),
+ resultSerializationTime = IndexedSeq(17.001D, 18.001D),
+ gettingResultTime = IndexedSeq(19.001D, 20.001D),
+ schedulerDelay = IndexedSeq(21.001D, 22.001D),
+ peakExecutionMemory = IndexedSeq(23.001D, 24.001D),
+ memoryBytesSpilled = IndexedSeq(25.001D, 26.001D),
+ diskBytesSpilled = IndexedSeq(27.001D, 28.001D),
+ inputMetrics = inputMetricDistributions,
+ outputMetrics = outputMetricDistributions,
+ shuffleReadMetrics = shuffleReadMetricDistributions,
+ shuffleWriteMetrics = shuffleWriteMetricDistributions
+ )
+ val executorPeakMetricsDistributions = new ExecutorPeakMetricsDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ executorMetrics = IndexedSeq(
+ new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+ )
+ val executorMetricsDistributions = new ExecutorMetricsDistributions(
+ quantiles = IndexedSeq(1.001D, 2.001D),
+ taskTime = IndexedSeq(3.001D, 4.001D),
+ failedTasks = IndexedSeq(5.001D, 6.001D),
+ succeededTasks = IndexedSeq(7.001D, 8.001D),
+ killedTasks = IndexedSeq(9.001D, 10.001D),
+ inputBytes = IndexedSeq(11.001D, 12.001D),
+ inputRecords = IndexedSeq(13.001D, 14.001D),
+ outputBytes = IndexedSeq(15.001D, 16.001D),
+ outputRecords = IndexedSeq(17.001D, 18.001D),
+ shuffleRead = IndexedSeq(19.001D, 20.001D),
+ shuffleReadRecords = IndexedSeq(21.001D, 22.001D),
+ shuffleWrite = IndexedSeq(23.001D, 24.001D),
+ shuffleWriteRecords = IndexedSeq(25.001D, 26.001D),
+ memoryBytesSpilled = IndexedSeq(27.001D, 28.001D),
+ diskBytesSpilled = IndexedSeq(29.001D, 30.001D),
+ peakMemoryMetrics = executorPeakMetricsDistributions
+ )
+ val info = new StageData(
+ status = StageStatus.COMPLETE,
+ stageId = 1,
+ attemptId = 2,
+ numTasks = 3,
+ numActiveTasks = 4,
+ numCompleteTasks = 5,
+ numFailedTasks = 6,
+ numKilledTasks = 7,
+ numCompletedIndices = 8,
+ submissionTime = Some(new Date(123456L)),
+ firstTaskLaunchedTime = Some(new Date(234567L)),
+ completionTime = Some(new Date(654321L)),
+ failureReason = Some("failure reason"),
+ executorDeserializeTime = 9L,
+ executorDeserializeCpuTime = 10L,
+ executorRunTime = 11L,
+ executorCpuTime = 12L,
+ resultSize = 13L,
+ jvmGcTime = 14L,
+ resultSerializationTime = 15L,
+ memoryBytesSpilled = 16L,
+ diskBytesSpilled = 17L,
+ peakExecutionMemory = 18L,
+ inputBytes = 19L,
+ inputRecords = 20L,
+ outputBytes = 21L,
+ outputRecords = 22L,
+ shuffleRemoteBlocksFetched = 23L,
+ shuffleLocalBlocksFetched = 24L,
+ shuffleFetchWaitTime = 25L,
+ shuffleRemoteBytesRead = 26L,
+ shuffleRemoteBytesReadToDisk = 27L,
+ shuffleLocalBytesRead = 28L,
+ shuffleReadBytes = 29L,
+ shuffleReadRecords = 30L,
+ shuffleWriteBytes = 31L,
+ shuffleWriteTime = 32L,
+ shuffleWriteRecords = 33L,
+ name = "name",
+ description = Some("test description"),
+ details = "test details",
+ schedulingPool = "test scheduling pool",
+ rddIds = Seq(1, 2, 3, 4, 5, 6),
+ accumulatorUpdates = accumulatorUpdates,
+ tasks = tasks,
+ executorSummary = executorSummary,
+ speculationSummary = Some(speculationStageSummary),
+ killedTasksSummary = Map("task_1" -> 1),
+ resourceProfileId = 34,
+ peakExecutorMetrics = peakMemoryMetrics,
+ taskMetricsDistributions = Some(taskMetricDistributions),
+ executorMetricsDistributions = Some(executorMetricsDistributions)
+ )
+ val input = new StageDataWrapper(
+ info = info,
+ jobIds = Set(1, 2, 3, 4),
+ locality = Map(
+ "PROCESS_LOCAL" -> 1L,
+ "NODE_LOCAL" -> 2L
+ )
+ )
+
+ val bytes = serializer.serialize(input)
+ val result = serializer.deserialize(bytes, classOf[StageDataWrapper])
+
+ assert(result.jobIds == input.jobIds)
+ assert(result.locality == input.locality)
+
+ assert(result.info.status == input.info.status)
+ assert(result.info.stageId == input.info.stageId)
+ assert(result.info.attemptId == input.info.attemptId)
+ assert(result.info.numTasks == input.info.numTasks)
+ assert(result.info.numActiveTasks == input.info.numActiveTasks)
+ assert(result.info.numCompleteTasks == input.info.numCompleteTasks)
+ assert(result.info.numFailedTasks == input.info.numFailedTasks)
+ assert(result.info.numKilledTasks == input.info.numKilledTasks)
+ assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+
+ assert(result.info.submissionTime == input.info.submissionTime)
+ assert(result.info.firstTaskLaunchedTime == input.info.firstTaskLaunchedTime)
+ assert(result.info.completionTime == input.info.completionTime)
+ assert(result.info.failureReason == input.info.failureReason)
+
+ assert(result.info.executorDeserializeTime == input.info.executorDeserializeTime)
+ assert(result.info.executorDeserializeCpuTime == input.info.executorDeserializeCpuTime)
+ assert(result.info.executorRunTime == input.info.executorRunTime)
+ assert(result.info.executorCpuTime == input.info.executorCpuTime)
+ assert(result.info.resultSize == input.info.resultSize)
+ assert(result.info.jvmGcTime == input.info.jvmGcTime)
+ assert(result.info.resultSerializationTime == input.info.resultSerializationTime)
+ assert(result.info.memoryBytesSpilled == input.info.memoryBytesSpilled)
+ assert(result.info.diskBytesSpilled == input.info.diskBytesSpilled)
+ assert(result.info.peakExecutionMemory == input.info.peakExecutionMemory)
+ assert(result.info.inputBytes == input.info.inputBytes)
+ assert(result.info.inputRecords == input.info.inputRecords)
+ assert(result.info.outputBytes == input.info.outputBytes)
+ assert(result.info.outputRecords == input.info.outputRecords)
+ assert(result.info.shuffleRemoteBlocksFetched == input.info.shuffleRemoteBlocksFetched)
+ assert(result.info.shuffleLocalBlocksFetched == input.info.shuffleLocalBlocksFetched)
+ assert(result.info.shuffleFetchWaitTime == input.info.shuffleFetchWaitTime)
+ assert(result.info.shuffleRemoteBytesRead == input.info.shuffleRemoteBytesRead)
+ assert(result.info.shuffleRemoteBytesReadToDisk == input.info.shuffleRemoteBytesReadToDisk)
+ assert(result.info.shuffleLocalBytesRead == input.info.shuffleLocalBytesRead)
+ assert(result.info.shuffleReadBytes == input.info.shuffleReadBytes)
+ assert(result.info.shuffleReadRecords == input.info.shuffleReadRecords)
+ assert(result.info.shuffleWriteBytes == input.info.shuffleWriteBytes)
+ assert(result.info.shuffleWriteTime == input.info.shuffleWriteTime)
+ assert(result.info.shuffleWriteRecords == input.info.shuffleWriteRecords)
+
+ assert(result.info.name == input.info.name)
+ assert(result.info.description == input.info.description)
+ assert(result.info.details == input.info.details)
+ assert(result.info.schedulingPool == input.info.schedulingPool)
+
+ assert(result.info.rddIds == input.info.rddIds)
+ checkAnswer(result.info.accumulatorUpdates, input.info.accumulatorUpdates)
+
+ assert(result.info.tasks.isDefined == input.info.tasks.isDefined)
+ if (result.info.tasks.isDefined && input.info.tasks.isDefined) {
+ checkIdTask(result.info.tasks.get, input.info.tasks.get)
+ }
+
+ assert(result.info.executorSummary.isDefined == input.info.executorSummary.isDefined)
+ if (result.info.executorSummary.isDefined && input.info.executorSummary.isDefined) {
+ checkAnswer(result.info.executorSummary.get, input.info.executorSummary.get)
+ }
+
+ assert(result.info.speculationSummary.isDefined == input.info.speculationSummary.isDefined)
+ if (result.info.speculationSummary.isDefined && input.info.speculationSummary.isDefined) {
+ checkAnswer(result.info.speculationSummary.get, input.info.speculationSummary.get)
+ }
+ assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+ assert(result.info.resourceProfileId == input.info.resourceProfileId)
+ assert(result.info.peakExecutorMetrics.isDefined == input.info.peakExecutorMetrics.isDefined)
+ if (result.info.peakExecutorMetrics.isDefined && input.info.peakExecutorMetrics.isDefined) {
+ checkAnswer(result.info.peakExecutorMetrics.get, input.info.peakExecutorMetrics.get)
+ }
+ assert(result.info.taskMetricsDistributions.isDefined ==
+ input.info.taskMetricsDistributions.isDefined)
+ if (result.info.taskMetricsDistributions.isDefined &&
+ input.info.taskMetricsDistributions.isDefined) {
+ checkAnswer(result.info.taskMetricsDistributions.get, input.info.taskMetricsDistributions.get)
+ }
+ assert(result.info.executorMetricsDistributions.isDefined ==
+ input.info.executorMetricsDistributions.isDefined)
+ if (result.info.executorMetricsDistributions.isDefined &&
+ input.info.executorMetricsDistributions.isDefined) {
+ checkAnswer(result.info.executorMetricsDistributions.get,
+ input.info.executorMetricsDistributions.get)
+ }
+ }
+
+ private def checkAnswer(result: TaskMetrics, expected: TaskMetrics): Unit = {
+ assert(result.executorDeserializeTime == expected.executorDeserializeTime)
+ assert(result.executorDeserializeCpuTime == expected.executorDeserializeCpuTime)
+ assert(result.executorRunTime == expected.executorRunTime)
+ assert(result.executorCpuTime == expected.executorCpuTime)
+ assert(result.resultSize == expected.resultSize)
+ assert(result.jvmGcTime == expected.jvmGcTime)
+ assert(result.resultSerializationTime == expected.resultSerializationTime)
+ assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+ assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+ assert(result.peakExecutionMemory == expected.peakExecutionMemory)
+ checkAnswer(result.inputMetrics, expected.inputMetrics)
+ checkAnswer(result.outputMetrics, expected.outputMetrics)
+ checkAnswer(result.shuffleReadMetrics, expected.shuffleReadMetrics)
+ checkAnswer(result.shuffleWriteMetrics, expected.shuffleWriteMetrics)
+ }
+
+ private def checkAnswer(result: InputMetrics, expected: InputMetrics): Unit = {
+ assert(result.bytesRead == expected.bytesRead)
+ assert(result.recordsRead == expected.recordsRead)
+ }
+
+ private def checkAnswer(result: OutputMetrics, expected: OutputMetrics): Unit = {
+ assert(result.bytesWritten == expected.bytesWritten)
+ assert(result.recordsWritten == expected.recordsWritten)
+ }
+
+ private def checkAnswer(result: ShuffleReadMetrics, expected: ShuffleReadMetrics): Unit = {
+ assert(result.remoteBlocksFetched == expected.remoteBlocksFetched)
+ assert(result.localBlocksFetched == expected.localBlocksFetched)
+ assert(result.fetchWaitTime == expected.fetchWaitTime)
+ assert(result.remoteBytesRead == expected.remoteBytesRead)
+ assert(result.remoteBytesReadToDisk == expected.remoteBytesReadToDisk)
+ assert(result.localBytesRead == expected.localBytesRead)
+ assert(result.recordsRead == expected.recordsRead)
+ }
+
+ private def checkAnswer(result: ShuffleWriteMetrics, expected: ShuffleWriteMetrics): Unit = {
+ assert(result.bytesWritten == expected.bytesWritten)
+ assert(result.writeTime == expected.writeTime)
+ assert(result.recordsWritten == expected.recordsWritten)
+ }
+
+ private def checkAnswer(result: collection.Seq[AccumulableInfo],
+ expected: collection.Seq[AccumulableInfo]): Unit = {
+ assert(result.length == expected.length)
+ result.zip(expected).foreach { case (a1, a2) =>
+ assert(a1.id == a2.id)
+ assert(a1.name == a2.name)
+ assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
+ assert(a1.value == a2.value)
+ }
+ }
+
+ private def checkIdTask(result: Map[Long, TaskData], expected: Map[Long, TaskData]): Unit = {
+ assert(result.size == expected.size)
+ assert(result.keys.size == expected.keys.size)
+ result.keysIterator.foreach { k =>
+ assert(expected.contains(k))
+ checkAnswer(result(k), expected(k))
+ }
+ }
+
+ private def checkAnswer(result: TaskData, expected: TaskData): Unit = {
+ assert(result.taskId == expected.taskId)
+ assert(result.index == expected.index)
+ assert(result.attempt == expected.attempt)
+ assert(result.partitionId == expected.partitionId)
+ assert(result.launchTime == expected.launchTime)
+ assert(result.resultFetchStart == expected.resultFetchStart)
+ assert(result.duration == expected.duration)
+ assert(result.executorId == expected.executorId)
+ assert(result.host == expected.host)
+ assert(result.status == expected.status)
+ assert(result.taskLocality == expected.taskLocality)
+ assert(result.speculative == expected.speculative)
+ checkAnswer(result.accumulatorUpdates, expected.accumulatorUpdates)
+ assert(result.errorMessage == expected.errorMessage)
+ assert(result.taskMetrics.isDefined == expected.taskMetrics.isDefined)
+ if (result.taskMetrics.isDefined && expected.taskMetrics.isDefined) {
+ checkAnswer(result.taskMetrics.get, expected.taskMetrics.get)
+ }
+ }
+
+ private def checkAnswer(result: Map[String, ExecutorStageSummary],
+ expected: Map[String, ExecutorStageSummary]): Unit = {
+ assert(result.size == expected.size)
+ assert(result.keys.size == expected.keys.size)
+ result.keysIterator.foreach { k =>
+ assert(expected.contains(k))
+ checkAnswer(result(k), expected(k))
+ }
+ }
+
+ private def checkAnswer(result: ExecutorStageSummary,
+ expected: ExecutorStageSummary): Unit = {
+ assert(result.taskTime == expected.taskTime)
+ assert(result.failedTasks == expected.failedTasks)
+ assert(result.succeededTasks == expected.succeededTasks)
+ assert(result.killedTasks == expected.killedTasks)
+ assert(result.inputBytes == expected.inputBytes)
+ assert(result.inputRecords == expected.inputRecords)
+ assert(result.outputBytes == expected.outputBytes)
+ assert(result.outputRecords == expected.outputRecords)
+ assert(result.shuffleRead == expected.shuffleRead)
+ assert(result.shuffleReadRecords == expected.shuffleReadRecords)
+ assert(result.shuffleWrite == expected.shuffleWrite)
+ assert(result.shuffleWriteRecords == expected.shuffleWriteRecords)
+ assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+ assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+ assert(result.isBlacklistedForStage == expected.isBlacklistedForStage)
+ assert(result.isExcludedForStage == expected.isExcludedForStage)
+ assert(result.peakMemoryMetrics.isDefined == expected.peakMemoryMetrics.isDefined)
+ if (result.peakMemoryMetrics.isDefined && expected.peakMemoryMetrics.isDefined) {
+ checkAnswer(result.peakMemoryMetrics.get, expected.peakMemoryMetrics.get)
+ }
+ }
+
+ private def checkAnswer(result: SpeculationStageSummary,
+ expected: SpeculationStageSummary): Unit = {
+ assert(result.numTasks == expected.numTasks)
+ assert(result.numActiveTasks == expected.numActiveTasks)
+ assert(result.numCompletedTasks == expected.numCompletedTasks)
+ assert(result.numFailedTasks == expected.numFailedTasks)
+ assert(result.numKilledTasks == expected.numKilledTasks)
+ }
+
+ private def checkAnswer(result: ExecutorMetrics, expected: ExecutorMetrics): Unit = {
+ ExecutorMetricType.metricToOffset.foreach { case (name, _) =>
+ assert(result.getMetricValue(name) == expected.getMetricValue(name))
+ }
+ }
+
+ private def checkAnswer(result: TaskMetricDistributions,
+ expected: TaskMetricDistributions): Unit = {
+ assert(result.quantiles == expected.quantiles)
+ assert(result.duration == expected.duration)
+ assert(result.executorDeserializeTime == expected.executorDeserializeTime)
+ assert(result.executorDeserializeCpuTime == expected.executorDeserializeCpuTime)
+ assert(result.executorRunTime == expected.executorRunTime)
+ assert(result.executorCpuTime == expected.executorCpuTime)
+ assert(result.resultSize == expected.resultSize)
+ assert(result.jvmGcTime == expected.jvmGcTime)
+ assert(result.resultSerializationTime == expected.resultSerializationTime)
+ assert(result.gettingResultTime == expected.gettingResultTime)
+ assert(result.schedulerDelay == expected.schedulerDelay)
+ assert(result.peakExecutionMemory == expected.peakExecutionMemory)
+ assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+ assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+
+ checkAnswer(result.inputMetrics, expected.inputMetrics)
+ checkAnswer(result.outputMetrics, expected.outputMetrics)
+ checkAnswer(result.shuffleReadMetrics, expected.shuffleReadMetrics)
+ checkAnswer(result.shuffleWriteMetrics, expected.shuffleWriteMetrics)
+ }
+
+ private def checkAnswer(result: InputMetricDistributions,
+ expected: InputMetricDistributions): Unit = {
+ assert(result.bytesRead == expected.bytesRead)
+ assert(result.recordsRead == expected.recordsRead)
+ }
+
+ private def checkAnswer(result: OutputMetricDistributions,
+ expected: OutputMetricDistributions): Unit = {
+ assert(result.bytesWritten == expected.bytesWritten)
+ assert(result.recordsWritten == expected.recordsWritten)
+ }
+
+ private def checkAnswer(result: ShuffleReadMetricDistributions,
+ expected: ShuffleReadMetricDistributions): Unit = {
+ assert(result.readBytes == expected.readBytes)
+ assert(result.readRecords == expected.readRecords)
+ assert(result.remoteBlocksFetched == expected.remoteBlocksFetched)
+ assert(result.localBlocksFetched == expected.localBlocksFetched)
+ assert(result.fetchWaitTime == expected.fetchWaitTime)
+ assert(result.remoteBytesRead == expected.remoteBytesRead)
+ assert(result.remoteBytesReadToDisk == expected.remoteBytesReadToDisk)
+ assert(result.totalBlocksFetched == expected.totalBlocksFetched)
+ }
+
+ private def checkAnswer(result: ShuffleWriteMetricDistributions,
+ expected: ShuffleWriteMetricDistributions): Unit = {
+ assert(result.writeBytes == expected.writeBytes)
+ assert(result.writeRecords == expected.writeRecords)
+ assert(result.writeTime == expected.writeTime)
+ }
+
+ private def checkAnswer(result: ExecutorMetricsDistributions,
+ expected: ExecutorMetricsDistributions): Unit = {
+ assert(result.quantiles == expected.quantiles)
+
+ assert(result.taskTime == expected.taskTime)
+ assert(result.failedTasks == expected.failedTasks)
+ assert(result.succeededTasks == expected.succeededTasks)
+ assert(result.killedTasks == expected.killedTasks)
+ assert(result.inputBytes == expected.inputBytes)
+ assert(result.inputRecords == expected.inputRecords)
+ assert(result.outputBytes == expected.outputBytes)
+ assert(result.outputRecords == expected.outputRecords)
+ assert(result.shuffleRead == expected.shuffleRead)
+ assert(result.shuffleReadRecords == expected.shuffleReadRecords)
+ assert(result.shuffleWrite == expected.shuffleWrite)
+ assert(result.shuffleWriteRecords == expected.shuffleWriteRecords)
+ assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+ assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+ checkAnswer(result.peakMemoryMetrics, expected.peakMemoryMetrics)
+ }
+
+ private def checkAnswer(result: ExecutorPeakMetricsDistributions,
+ expected: ExecutorPeakMetricsDistributions): Unit = {
+ assert(result.quantiles == expected.quantiles)
+ assert(result.executorMetrics.size == expected.executorMetrics.size)
+ result.executorMetrics.zip(expected.executorMetrics).foreach { case (a1, a2) =>
+ checkAnswer(a1, a2)
+ }
+ }
}