This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c4806c8fee [HUDI-9249] Support displaying 
InsertIntoHoodieTableCommand metrics in Spark Web UI (#13068)
3c4806c8fee is described below

commit 3c4806c8fee0d942124594996243f8a1288d6853
Author: wangyinsheng <[email protected]>
AuthorDate: Fri May 9 21:13:57 2025 +0800

    [HUDI-9249] Support displaying InsertIntoHoodieTableCommand metrics in 
Spark Web UI (#13068)
    
    * Note: this would induce additional overhead for listing the timeline and decoding the commit metadata.
    
    ---------
    
    Co-authored-by: wangyinsheng <[email protected]>
---
 .../command/CreateHoodieTableAsSelectCommand.scala |  6 +-
 .../sql/hudi/command/HoodieCommandMetrics.scala    | 97 ++++++++++++++++++++++
 .../command/InsertIntoHoodieTableCommand.scala     | 20 ++++-
 .../hudi/command/TestHoodieCommandMetrics.scala    | 64 ++++++++++++++
 4 files changed, 183 insertions(+), 4 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index cd09df9a7a9..58a9fc1ad3f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
+import org.apache.spark.sql.execution.metric.SQLMetric
 
 import scala.collection.JavaConverters._
 
@@ -45,6 +46,8 @@ case class CreateHoodieTableAsSelectCommand(
    query: LogicalPlan) extends DataWritingCommand {
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
+  override lazy val metrics: Map[String, SQLMetric] = 
HoodieCommandMetrics.metrics
+
   override def run(sparkSession: SparkSession, plan: SparkPlan): Seq[Row] = {
     checkState(table.tableType != CatalogTableType.VIEW)
     checkState(table.provider.isDefined)
@@ -106,7 +109,8 @@ case class CreateHoodieTableAsSelectCommand(
       )
       val partitionSpec = updatedTable.partitionColumnNames.map((_, 
None)).toMap
       val success = InsertIntoHoodieTableCommand.run(sparkSession, 
updatedTable, plan, partitionSpec,
-        mode == SaveMode.Overwrite, refreshTable = false, extraOptions = 
options)
+        mode == SaveMode.Overwrite, refreshTable = false, extraOptions = 
options, metrics = metrics)
+      DataWritingCommand.propogateMetrics(sparkSession.sparkContext, this, 
metrics)
       if (success) {
         // If write success, create the table in catalog if it has not synced 
to the
         // catalog by the meta sync.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
new file mode 100644
index 00000000000..2b356bf3c56
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/HoodieCommandMetrics.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, InstantComparison}
+import org.apache.hudi.common.util.VisibleForTesting
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+
+import scala.collection.JavaConverters._
+
+object HoodieCommandMetrics {
+
+  def updateInsertMetrics(metrics: Map[String, SQLMetric], metaClient: 
HoodieTableMetaClient, commitInstantTime: String): Unit = {
+    val timeline = metaClient.getActiveTimeline.reload().getCommitsTimeline()
+    val commitInstant = timeline.getInstants.asScala
+      .filter(instant => 
InstantComparison.EQUALS.test(instant.requestedTime(), commitInstantTime))
+    commitInstant.map { commit: HoodieInstant =>
+      val metadata = timeline.readCommitMetadata(commit)
+      updateInsertMetrics(metrics, metadata)
+    }
+  }
+
+  def updateInsertMetrics(metrics: Map[String, SQLMetric], metadata: 
HoodieCommitMetadata): Unit = {
+    updateInsertMetric(metrics, NUM_PARTITION_KEY, 
metadata.fetchTotalPartitionsWritten())
+    updateInsertMetric(metrics, NUM_INSERT_FILE_KEY, 
metadata.fetchTotalFilesInsert())
+    updateInsertMetric(metrics, NUM_UPDATE_FILE_KEY, 
metadata.fetchTotalFilesUpdated())
+    updateInsertMetric(metrics, NUM_WRITE_ROWS_KEY, 
metadata.fetchTotalRecordsWritten())
+    updateInsertMetric(metrics, NUM_UPDATE_ROWS_KEY, 
metadata.fetchTotalUpdateRecordsWritten())
+    updateInsertMetric(metrics, NUM_INSERT_ROWS_KEY, 
metadata.fetchTotalInsertRecordsWritten())
+    updateInsertMetric(metrics, NUM_DELETE_ROWS_KEY, 
metadata.getTotalRecordsDeleted())
+    updateInsertMetric(metrics, NUM_OUTPUT_BYTES_KEY, 
metadata.fetchTotalBytesWritten())
+    updateInsertMetric(metrics, INSERT_TIME, metadata.getTotalCreateTime())
+    updateInsertMetric(metrics, UPSERT_TIME, metadata.getTotalUpsertTime())
+  }
+
+  private def updateInsertMetric(metrics: Map[String, SQLMetric], name: 
String, value: Long): Unit = {
+    val metric = metrics.get(name)
+    metric.foreach(_.set(value))
+  }
+
+
+  @VisibleForTesting
+  val NUM_PARTITION_KEY = "number of written partitions"
+  @VisibleForTesting
+  val NUM_INSERT_FILE_KEY = "number of inserted files"
+  @VisibleForTesting
+  val NUM_UPDATE_FILE_KEY = "number of updated files"
+  @VisibleForTesting
+  val NUM_WRITE_ROWS_KEY = "number of written rows"
+  @VisibleForTesting
+  val NUM_UPDATE_ROWS_KEY = "number of updated rows"
+  @VisibleForTesting
+  val NUM_INSERT_ROWS_KEY = "number of inserted rows"
+  @VisibleForTesting
+  val NUM_DELETE_ROWS_KEY = "number of deleted rows"
+  @VisibleForTesting
+  val NUM_OUTPUT_BYTES_KEY = "output size in bytes"
+  @VisibleForTesting
+  val INSERT_TIME = "total insert time"
+  @VisibleForTesting
+  val UPSERT_TIME = "total upsert time"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_PARTITION_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_PARTITION_KEY),
+      NUM_INSERT_FILE_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_INSERT_FILE_KEY),
+      NUM_UPDATE_FILE_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_UPDATE_FILE_KEY),
+      NUM_WRITE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_WRITE_ROWS_KEY),
+      NUM_UPDATE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_UPDATE_ROWS_KEY),
+      NUM_INSERT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_INSERT_ROWS_KEY),
+      NUM_DELETE_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, 
NUM_DELETE_ROWS_KEY),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, 
NUM_OUTPUT_BYTES_KEY),
+      INSERT_TIME -> SQLMetrics.createTimingMetric(sparkContext, INSERT_TIME),
+      UPSERT_TIME -> SQLMetrics.createTimingMetric(sparkContext, UPSERT_TIME)
+    )
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index a7eafb6b331..baa263e7b0b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.hudi.command
 
 import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
+import org.apache.hudi.common.model.HoodieCommitMetadata
+import org.apache.hudi.common.table.timeline.InstantComparison
 import org.apache.hudi.exception.HoodieException
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable}
@@ -29,8 +32,10 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.DataWritingCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.ProvidesHoodieConfig
+import 
org.apache.spark.sql.hudi.command.HoodieCommandMetrics.updateInsertMetrics
 import 
org.apache.spark.sql.hudi.command.HoodieLeafRunnableCommand.stripMetaFieldAttributes
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -56,6 +61,8 @@ case class InsertIntoHoodieTableCommand(logicalRelation: 
LogicalRelation,
                                         overwrite: Boolean)
   extends DataWritingCommand {
   override def innerChildren: Seq[QueryPlan[_]] = Seq(query)
+  val sparkContext = SparkContext.getActive.get
+  override lazy val metrics: Map[String, SQLMetric] = 
HoodieCommandMetrics.metrics
 
   override def outputColumnNames: Seq[String] = {
     query.output.map(_.name)
@@ -65,7 +72,8 @@ case class InsertIntoHoodieTableCommand(logicalRelation: 
LogicalRelation,
     assert(logicalRelation.catalogTable.isDefined, "Missing catalog table")
 
     val table = logicalRelation.catalogTable.get
-    InsertIntoHoodieTableCommand.run(sparkSession, table, plan, partitionSpec, 
overwrite)
+    InsertIntoHoodieTableCommand.run(sparkSession, table, plan, partitionSpec, 
overwrite, metrics = metrics)
+    DataWritingCommand.propogateMetrics(sparkContext, this, metrics)
     Seq.empty[Row]
   }
 
@@ -94,7 +102,8 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig wi
           partitionSpec: Map[String, Option[String]],
           overwrite: Boolean,
           refreshTable: Boolean = true,
-          extraOptions: Map[String, String] = Map.empty): Boolean = {
+          extraOptions: Map[String, String] = Map.empty,
+          metrics: Map[String, SQLMetric]): Boolean = {
     val catalogTable = new HoodieCatalogTable(sparkSession, table)
 
     val (mode, isOverWriteTable, isOverWritePartition, 
staticOverwritePartitionPathOpt) = if (overwrite) {
@@ -105,12 +114,17 @@ object InsertIntoHoodieTableCommand extends Logging with 
ProvidesHoodieConfig wi
     val config = buildHoodieInsertConfig(catalogTable, sparkSession, 
isOverWritePartition, isOverWriteTable, partitionSpec, extraOptions, 
staticOverwritePartitionPathOpt)
 
     val df = sparkSession.internalCreateDataFrame(query.execute(), 
query.schema)
-    val (success, _, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+    val (success, commitInstantTime, _, _, _, _) = 
HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, df)
+
 
     if (!success) {
       throw new HoodieException("Insert Into to Hudi table failed")
     }
 
+    if (success && commitInstantTime.isPresent) {
+      updateInsertMetrics(metrics, catalogTable.metaClient, 
commitInstantTime.get())
+    }
+
     if (success && refreshTable) {
       sparkSession.catalog.refreshTable(table.identifier.unquotedString)
     }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
new file mode 100644
index 00000000000..39f425528e8
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/TestHoodieCommandMetrics.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hudi.common.model.HoodieCommitMetadata
+
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
+import org.mockito.Mockito.{mock, when}
+
+class TestHoodieCommandMetrics extends HoodieSparkSqlTestBase {
+
+  override def beforeAll(): Unit = {
+    spark.sparkContext
+  }
+
+  test("test HoodieCommandMetrics metrics") {
+    val metrics = HoodieCommandMetrics.metrics
+    assert(metrics != null)
+    assert(metrics.size == 10)
+
+    HoodieCommandMetrics.updateInsertMetrics(metrics, mockedCommitMetadata())
+    assertResult(1L)(metrics(HoodieCommandMetrics.NUM_PARTITION_KEY).value)
+    assertResult(2L)(metrics(HoodieCommandMetrics.NUM_INSERT_FILE_KEY).value)
+    assertResult(3L)(metrics(HoodieCommandMetrics.NUM_UPDATE_FILE_KEY).value)
+    assertResult(4L)(metrics(HoodieCommandMetrics.NUM_WRITE_ROWS_KEY).value)
+    assertResult(5L)(metrics(HoodieCommandMetrics.NUM_UPDATE_ROWS_KEY).value)
+    assertResult(6L)(metrics(HoodieCommandMetrics.NUM_INSERT_ROWS_KEY).value)
+    assertResult(7L)(metrics(HoodieCommandMetrics.NUM_DELETE_ROWS_KEY).value)
+    assertResult(8L)(metrics(HoodieCommandMetrics.NUM_OUTPUT_BYTES_KEY).value)
+    assertResult(9L)(metrics(HoodieCommandMetrics.INSERT_TIME).value)
+    assertResult(10L)(metrics(HoodieCommandMetrics.UPSERT_TIME).value)
+  }
+
+
+  private def mockedCommitMetadata(): HoodieCommitMetadata = {
+    val metadata: HoodieCommitMetadata = mock(classOf[HoodieCommitMetadata])
+    when(metadata.fetchTotalPartitionsWritten()).thenReturn(1L)
+    when(metadata.fetchTotalFilesInsert()).thenReturn(2L)
+    when(metadata.fetchTotalFilesUpdated()).thenReturn(3L)
+    when(metadata.fetchTotalRecordsWritten()).thenReturn(4L)
+    when(metadata.fetchTotalUpdateRecordsWritten()).thenReturn(5L)
+    when(metadata.fetchTotalInsertRecordsWritten()).thenReturn(6L)
+    when(metadata.getTotalRecordsDeleted()).thenReturn(7L)
+    when(metadata.fetchTotalBytesWritten()).thenReturn(8L)
+    when(metadata.getTotalCreateTime()).thenReturn(9L)
+    when(metadata.getTotalUpsertTime()).thenReturn(10L)
+    metadata
+  }
+}

Reply via email to