This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3c4806c8fee [HUDI-9249] Support displaying
InsertIntoHoodieTableCommand metrics in Spark Web UI (#13068)
3c4806c8fee is described below
commit 3c4806c8fee0d942124594996243f8a1288d6853
Author: wangyinsheng <[email protected]>
AuthorDate: Fri May 9 21:13:57 2025 +0800
[HUDI-9249] Support displaying InsertIntoHoodieTableCommand metrics in
Spark Web UI (#13068)
* Note: this would induce additional overhead for listing the timeline
and decoding the commit metadata.
---------
Co-authored-by: wangyinsheng <[email protected]>
---
.../command/CreateHoodieTableAsSelectCommand.scala | 6 +-
.../sql/hudi/command/HoodieCommandMetrics.scala | 97 ++++++++++++++++++++++
.../command/InsertIntoHoodieTableCommand.scala | 20 ++++-
.../hudi/command/TestHoodieCommandMetrics.scala | 64 ++++++++++++++
4 files changed, 183 insertions(+), 4 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index cd09df9a7a9..58a9fc1ad3f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.metric.SQLMetric
import scala.collection.JavaConverters._
@@ -45,6 +46,8 @@ case class CreateHoodieTableAsSelectCommand(
query: LogicalPlan) extends DataWritingCommand {
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+ override lazy val metrics: Map[String, SQLMetric] =
HoodieCommandMetrics.metrics
+
override def run(sparkSession: SparkSession, plan: SparkPlan): Seq[Row] = {
checkState(table.tableType != CatalogTableType.VIEW)
checkState(table.provider.isDefined)
@@ -106,7 +109,8 @@ case class CreateHoodieTableAsSelectCommand(
)
val partitionSpec = updatedTable.partitionColumnNames.map((_,
None)).toMap
val success = InsertIntoHoodieTableCommand.run(sparkSession,
updatedTable, plan, partitionSpec,
- mode == SaveMode.Overwrite, refreshTable = false, extraOptions =
options)
+ mode == SaveMode.Overwrite, refreshTable = false, extraOptions =
options, metrics = metrics)
+ DataWritingCommand.propogateMetrics(sparkSession.sparkContext, this,
metrics)
if (success) {
// If write success, create the table in catalog if it has not synced
to the
// catalog by the meta sync.
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
new file mode 100644
index 00000000000..2b356bf3c56
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, InstantComparison}
+import org.apache.hudi.common.util.VisibleForTesting
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+import scala.collection.JavaConverters._
+
+object HoodieCommandMetrics {
+
+ def updateInsertMetrics(metrics: Map[String, SQLMetric], metaClient:
HoodieTableMetaClient, commitInstantTime: String): Unit = {
+ val timeline = metaClient.getActiveTimeline.reload().getCommitsTimeline()
+ val commitInstant = timeline.getInstants.asScala
+ .filter(instant =>
InstantComparison.EQUALS.test(instant.requestedTime(), commitInstantTime))
+ commitInstant.map { commit: HoodieInstant =>
+ val metadata = timeline.readCommitMetadata(commit)
+ updateInsertMetrics(metrics, metadata)
+ }
+ }
+
+ def updateInsertMetrics(metrics: Map[String, SQLMetric], metadata:
HoodieCommitMetadata): Unit = {
+ updateInsertMetric(metrics, NUM_PARTITION_KEY,
metadata.fetchTotalPartitionsWritten())
+ updateInsertMetric(metrics, NUM_INSERT_FILE_KEY,
metadata.fetchTotalFilesInsert())
+ updateInsertMetric(metrics, NUM_UPDATE_FILE_KEY,
metadata.fetchTotalFilesUpdated())
+ updateInsertMetric(metrics, NUM_WRITE_ROWS_KEY,
metadata.fetchTotalRecordsWritten())
+ updateInsertMetric(metrics, NUM_UPDATE_ROWS_KEY,
metadata.fetchTotalUpdateRecordsWritten())
+ updateInsertMetric(metrics, NUM_INSERT_ROWS_KEY,
metadata.fetchTotalInsertRecordsWritten())
+ updateInsertMetric(metrics, NUM_DELETE_ROWS_KEY,
metadata.getTotalRecordsDeleted())
+ updateInsertMetric(metrics, NUM_OUTPUT_BYTES_KEY,
metadata.fetchTotalBytesWritten())
+ updateInsertMetric(metrics, INSERT_TIME, metadata.getTotalCreateTime())
+ updateInsertMetric(metrics, UPSERT_TIME, metadata.getTotalUpsertTime())
+ }
+
+ private def updateInsertMetric(metrics: Map[String, SQLMetric], name:
String, value: Long): Unit = {
+ val metric = metrics.get(name)
+ metric.foreach(_.set(value))
+ }
+
+
+ @VisibleForTesting
+ val NUM_PARTITION_KEY = "number of written partitions"
+ @VisibleForTesting
+ val NUM_INSERT_FILE_KEY = "number of inserted files"
+ @VisibleForTesting
+ val NUM_UPDATE_FILE_KEY = "number of updated files"
+ @VisibleForTesting
+ val NUM_WRITE_ROWS_KEY = "number of written rows"
+ @VisibleForTesting
+ val NUM_UPDATE_ROWS_KEY = "number of updated rows"
+ @VisibleForTesting
+ val NUM_INSERT_ROWS_KEY = "number of inserted rows"
+ @VisibleForTesting
+ val NUM_DELETE_ROWS_KEY = "number of deleted rows"
+ @VisibleForTesting
+ val NUM_OUTPUT_BYTES_KEY = "output size in bytes"
+ @VisibleForTesting
+ val INSERT_TIME = "total insert time"
+ @VisibleForTesting
+ val UPSERT_TIME = "total upsert time"
+
+ def metrics: Map[String, SQLMetric] = {
+ val sparkContext = SparkContext.getActive.get
+ Map(
+ NUM_PARTITION_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_PARTITION_KEY),
+ NUM_INSERT_FILE_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_INSERT_FILE_KEY),
+ NUM_UPDATE_FILE_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_UPDATE_FILE_KEY),
+ NUM_WRITE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_WRITE_ROWS_KEY),
+ NUM_UPDATE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_UPDATE_ROWS_KEY),
+ NUM_INSERT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_INSERT_ROWS_KEY),
+ NUM_DELETE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext,
NUM_DELETE_ROWS_KEY),
+ NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext,
NUM_OUTPUT_BYTES_KEY),
+ INSERT_TIME -> SQLMetrics.createTimingMetric(sparkContext, INSERT_TIME),
+ UPSERT_TIME -> SQLMetrics.createTimingMetric(sparkContext, UPSERT_TIME)
+ )
+ }
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index a7eafb6b331..baa263e7b0b 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -18,8 +18,11 @@
package org.apache.spark.sql.hudi.command
import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.timeline.InstantComparison
import org.apache.hudi.exception.HoodieException
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
@@ -29,8 +32,10 @@ import
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateInsertMetrics
import
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -56,6 +61,8 @@ case class InsertIntoHoodieTableCommand(logicalRelation:
LogicalRelation,
overwrite: Boolean)
extends DataWritingCommand {
override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+ val sparkContext = SparkContext.getActive.get
+ override lazy val metrics: Map[String, SQLMetric] =
HoodieCommandMetrics.metrics
override def outputColumnNames: Seq[String] = {
query.output.map(_.name)
@@ -65,7 +72,8 @@ case class InsertIntoHoodieTableCommand(logicalRelation:
LogicalRelation,
assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
val table = logicalRelation.catalogTable.get
- InsertIntoHoodieTableCommand.run(sparkSession, table, plan, partitionSpec,
overwrite)
+ InsertIntoHoodieTableCommand.run(sparkSession, table, plan, partitionSpec,
overwrite, metrics = metrics)
+ DataWritingCommand.propogateMetrics(sparkContext, this, metrics)
Seq.empty[Row]
}
@@ -94,7 +102,8 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
partitionSpec: Map[String, Option[String]],
overwrite: Boolean,
refreshTable: Boolean = true,
- extraOptions: Map[String, String] = Map.empty): Boolean = {
+ extraOptions: Map[String, String] = Map.empty,
+ metrics: Map[String, SQLMetric]): Boolean = {
val catalogTable = new HoodieCatalogTable(sparkSession, table)
val (mode, isOverWriteTable, isOverWritePartition,
staticOverwritePartitionPathOpt) = if (overwrite) {
@@ -105,12 +114,17 @@ object InsertIntoHoodieTableCommand extends Logging with
ProvidesHoodieConfig wi
val config = buildHoodieInsertConfig(catalogTable, sparkSession,
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions,
staticOverwritePartitionPathOpt)
val df = sparkSession.internalCreateDataFrame(query.execute(),
query.schema)
- val (success, _, _, _, _, _) =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+ val (success, commitInstantTime, _, _, _, _) =
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+
if (!success) {
throw new HoodieException("Insert Into to Hudi table failed")
}
+ if (success && commitInstantTime.isPresent) {
+ updateInsertMetrics(metrics, catalogTable.metaClient,
commitInstantTime.get())
+ }
+
if (success && refreshTable) {
sparkSession.catalog.refreshTable(table.identifier.unquotedString)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
new file mode 100644
index 00000000000..39f425528e8
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.model.HoodieCommitMetadata
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.mockito.Mockito.{mock, when}
+
+class TestHoodieCommandMetrics extends HoodieSparkSqlTestBase {
+
+ override def beforeAll(): Unit = {
+ spark.sparkContext
+ }
+
+ test("test HoodieCommandMetrics metrics") {
+ val metrics = HoodieCommandMetrics.metrics
+ assert(metrics != null)
+ assert(metrics.size == 10)
+
+ HoodieCommandMetrics.updateInsertMetrics(metrics, mockedCommitMetadata())
+ assertResult(1L)(metrics(HoodieCommandMetrics.NUM_PARTITION_KEY).value)
+ assertResult(2L)(metrics(HoodieCommandMetrics.NUM_INSERT_FILE_KEY).value)
+ assertResult(3L)(metrics(HoodieCommandMetrics.NUM_UPDATE_FILE_KEY).value)
+ assertResult(4L)(metrics(HoodieCommandMetrics.NUM_WRITE_ROWS_KEY).value)
+ assertResult(5L)(metrics(HoodieCommandMetrics.NUM_UPDATE_ROWS_KEY).value)
+ assertResult(6L)(metrics(HoodieCommandMetrics.NUM_INSERT_ROWS_KEY).value)
+ assertResult(7L)(metrics(HoodieCommandMetrics.NUM_DELETE_ROWS_KEY).value)
+ assertResult(8L)(metrics(HoodieCommandMetrics.NUM_OUTPUT_BYTES_KEY).value)
+ assertResult(9L)(metrics(HoodieCommandMetrics.INSERT_TIME).value)
+ assertResult(10L)(metrics(HoodieCommandMetrics.UPSERT_TIME).value)
+ }
+
+
+ private def mockedCommitMetadata(): HoodieCommitMetadata = {
+ val metadata: HoodieCommitMetadata = mock(classOf[HoodieCommitMetadata])
+ when(metadata.fetchTotalPartitionsWritten()).thenReturn(1L)
+ when(metadata.fetchTotalFilesInsert()).thenReturn(2L)
+ when(metadata.fetchTotalFilesUpdated()).thenReturn(3L)
+ when(metadata.fetchTotalRecordsWritten()).thenReturn(4L)
+ when(metadata.fetchTotalUpdateRecordsWritten()).thenReturn(5L)
+ when(metadata.fetchTotalInsertRecordsWritten()).thenReturn(6L)
+ when(metadata.getTotalRecordsDeleted()).thenReturn(7L)
+ when(metadata.fetchTotalBytesWritten()).thenReturn(8L)
+ when(metadata.getTotalCreateTime()).thenReturn(9L)
+ when(metadata.getTotalUpsertTime()).thenReturn(10L)
+ metadata
+ }
+}