This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f0e7c797ed feat(spark): add input records & bytes metrics (#18882)
11f0e7c797ed is described below

commit 11f0e7c797ed3740be20c047892e39ce53627bd8
Author: fhan <[email protected]>
AuthorDate: Tue Jun 2 01:47:54 2026 +0800

    feat(spark): add input records & bytes metrics (#18882)
    
    Co-authored-by: fhan <[email protected]>
---
 .../org/apache/hudi/HoodieMergeOnReadRDDV2.scala   | 45 ++++++++++++++++------
 .../spark/HoodieSparkInputMetricsUtils.scala       | 40 +++++++++++++++++++
 2 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
index 3d9b908c4d31..0104136f8424 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDDV2.scala
@@ -42,7 +42,7 @@ import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
 import org.apache.avro.generic.IndexedRecord
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.{HoodieSparkInputMetricsUtils, Partition, SerializableWritable, SparkContext, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -146,6 +146,7 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
 
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val partition = split.asInstanceOf[HoodieMergeOnReadPartition]
+    val bytesReadCallback = HoodieSparkInputMetricsUtils.getFSBytesReadOnThreadCallback()
 
     val iter: Iterator[InternalRow] = partition.split match {
       case dataFileOnlySplit if dataFileOnlySplit.logFiles.isEmpty =>
@@ -203,15 +204,9 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
         }
     }
 
-    if (iter.isInstanceOf[Closeable]) {
-      // register a callback to close logScanner which will be executed on task completion.
-      // when tasks finished, this method will be called, and release resources.
-      Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.asInstanceOf[Closeable].close()))
-    }
-
     val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
     val needsFiltering = commitTimeMetadataFieldIdx >= 0 && includedInstantTimeSet.isDefined
-    if (needsFiltering) {
+    val resultIter = if (needsFiltering) {
       val filterT: Predicate[InternalRow] = new Predicate[InternalRow] {
         override def test(row: InternalRow): Boolean = {
           val commitTime = row.getString(commitTimeMetadataFieldIdx)
@@ -219,10 +214,11 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
         }
       }
       iter.filter(filterT.test)
-    }
-    else {
+    } else {
       iter
     }
+
+    withInputMetrics(resultIter, iter, context, bytesReadCallback)
   }
 
   override protected def getPartitions: Array[Partition] =
@@ -259,6 +255,34 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
     }
   }
 
+  private def withInputMetrics(iter: Iterator[InternalRow],
+                               closeableIter: Iterator[InternalRow],
+                               context: TaskContext,
+                               bytesReadCallback: () => Long): Iterator[InternalRow] = {
+    val metricIter = new Iterator[InternalRow] with Closeable {
+      override def hasNext: Boolean = iter.hasNext
+
+      override def next(): InternalRow = {
+        val row = iter.next()
+        HoodieSparkInputMetricsUtils.incRecordsRead(context, 1)
+        row
+      }
+
+      override def close(): Unit = {
+        closeableIter match {
+          case closeable: Closeable => closeable.close()
+          case _ =>
+        }
+      }
+    }
+
+    context.addTaskCompletionListener[Unit] { _ =>
+      HoodieSparkInputMetricsUtils.incBytesRead(context, bytesReadCallback())
+      metricIter.close()
+    }
+    metricIter
+  }
+
   private def getPartitionPath(split: HoodieMergeOnReadFileSplit): StoragePath = {
     // Determine partition path as an immediate parent folder of either
     //    - The base file
@@ -273,4 +297,3 @@ class HoodieMergeOnReadRDDV2(@transient sc: SparkContext,
 object HoodieMergeOnReadRDDV2 {
   val CONFIG_INSTANTIATION_LOCK = new Object()
 }
-
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala
new file mode 100644
index 000000000000..4a3210bde126
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/HoodieSparkInputMetricsUtils.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.deploy.SparkHadoopUtil
+
+object HoodieSparkInputMetricsUtils {
+
+  def getFSBytesReadOnThreadCallback(): () => Long = {
+    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+  }
+
+  def incRecordsRead(taskContext: TaskContext, count: Long): Unit = {
+    if (taskContext != null) {
+      taskContext.taskMetrics().inputMetrics.incRecordsRead(count)
+    }
+  }
+
+  def incBytesRead(taskContext: TaskContext, bytes: Long): Unit = {
+    if (taskContext != null) {
+      taskContext.taskMetrics().inputMetrics.incBytesRead(bytes)
+    }
+  }
+}

Reply via email to