hudi-agent commented on code in PR #18770:
URL: https://github.com/apache/hudi/pull/18770#discussion_r3262459209


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     iter.map(mapper.apply(_))
   }
 
+  // executor — fast path for SELECT count(*).
+  // Reads only the parquet footer (no row group decoding, no vectorized 
reader). The downstream
+  // count aggregator counts rows in the produced batches/rows; column 
contents (partition
+  // values) are populated as constants from file.partitionValues so codegen 
that touches
+  // column[i] still sees valid data. Tracking: apache/hudi#18769.
+  private def readCountFromFooter(file: PartitionedFile,
+                                  partitionSchema: StructType,
+                                  storageConf: 
StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+    val storagePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+    val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+    val footer = ParquetFileReader.readFooter(storageConf.unwrap(), 
hadoopPath, ParquetMetadataConverter.NO_FILTER)
+    val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + 
b.getRowCount)
+
+    // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying 
InternalRow with

Review Comment:
   🤖 I think this over-counts when a parquet file is large enough to be split. 
`isSplitable` returns true for COW parquet (line 239: `!isMOR && !isIncremental 
&& !isBootstrap && !isLance && superSplitable`), which is exactly the set where 
`isCount` fires — so files larger than `spark.sql.files.maxPartitionBytes` 
(128MB default) are handed to this function as multiple `PartitionedFile`s with 
different `start`/`length` ranges, and each split sums every row group in the 
footer. Could you filter blocks by `file.start <= block.getStartingPos < 
file.start + file.length` (matching what `VectorizedParquetRecordReader` does 
internally)? Otherwise count(*) on tables with >128MB files would be inflated 
by the number of splits.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     iter.map(mapper.apply(_))
   }
 
+  // executor — fast path for SELECT count(*).
+  // Reads only the parquet footer (no row group decoding, no vectorized 
reader). The downstream
+  // count aggregator counts rows in the produced batches/rows; column 
contents (partition
+  // values) are populated as constants from file.partitionValues so codegen 
that touches
+  // column[i] still sees valid data. Tracking: apache/hudi#18769.
+  private def readCountFromFooter(file: PartitionedFile,
+                                  partitionSchema: StructType,
+                                  storageConf: 
StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+    val storagePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+    val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+    val footer = ParquetFileReader.readFooter(storageConf.unwrap(), 
hadoopPath, ParquetMetadataConverter.NO_FILTER)
+    val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + 
b.getRowCount)

Review Comment:
   🤖 This unconditionally calls `ParquetFileReader.readFooter`, but `isCount` 
(line 252) is `requiredSchema.isEmpty && !isMOR && !isIncremental` — it doesn't 
gate on file format. `buildBaseFileReader` shows COW supports ORC and Lance 
base files too, and `isMultipleBaseFileFormatsEnabled` allows mixed formats. 
Running `SELECT count(*)` on an ORC or Lance COW table would hit this path and 
throw. Could you also require `hoodieFileFormat == HoodieFileFormat.PARQUET && 
!isMultipleBaseFileFormatsEnabled` (or fall back to `readBaseFile` for the 
non-parquet case)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     iter.map(mapper.apply(_))
   }
 
+  // executor — fast path for SELECT count(*).
+  // Reads only the parquet footer (no row group decoding, no vectorized 
reader). The downstream
+  // count aggregator counts rows in the produced batches/rows; column 
contents (partition
+  // values) are populated as constants from file.partitionValues so codegen 
that touches
+  // column[i] still sees valid data. Tracking: apache/hudi#18769.
+  private def readCountFromFooter(file: PartitionedFile,
+                                  partitionSchema: StructType,
+                                  storageConf: 
StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+    val storagePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+    val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+    val footer = ParquetFileReader.readFooter(storageConf.unwrap(), 
hadoopPath, ParquetMetadataConverter.NO_FILTER)
+    val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + 
b.getRowCount)
+
+    // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying 
InternalRow with
+    // partition values, which we replicate into each output batch/row.
+    val partitionValues: InternalRow = file.partitionValues match {
+      case m: HoodiePartitionFileSliceMapping => m.getPartitionValues
+      case row: InternalRow => row
+    }
+
+    if (supportReturningBatch) {
+      val batchSize = 4096

Review Comment:
   🤖 nit: `4096` is a magic number here — could it be a named constant (e.g. 
`FOOTER_BATCH_SIZE`) defined at the class or companion level, ideally with a 
brief note that it matches Spark's default vectorized reader batch size?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -294,7 +297,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     (file: PartitionedFile) => {
       // executor
       val storageConf = new 
HadoopStorageConfiguration(broadcastedStorageConf.value.value)
-      val iter = file.partitionValues match {
+      val iter = if (isCount) {
+        // Fast path for SELECT count(*): sum row counts from the parquet 
footer
+        // without opening the vectorized reader or decoding column data.
+        // Tracking: apache/hudi#18769.

Review Comment:
   🤖 nit: issue-tracker references in inline source comments tend to go stale 
once the issue is resolved — would it be worth dropping the `Tracking: 
apache/hudi#18769.` line and keeping that reference in the commit message 
instead? (same applies to line ~504).
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     iter.map(mapper.apply(_))
   }
 
+  // executor — fast path for SELECT count(*).
+  // Reads only the parquet footer (no row group decoding, no vectorized 
reader). The downstream
+  // count aggregator counts rows in the produced batches/rows; column 
contents (partition
+  // values) are populated as constants from file.partitionValues so codegen 
that touches
+  // column[i] still sees valid data. Tracking: apache/hudi#18769.
+  private def readCountFromFooter(file: PartitionedFile,
+                                  partitionSchema: StructType,
+                                  storageConf: 
StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+    val storagePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+    val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+    val footer = ParquetFileReader.readFooter(storageConf.unwrap(), 
hadoopPath, ParquetMetadataConverter.NO_FILTER)
+    val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + 
b.getRowCount)
+
+    // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying 
InternalRow with
+    // partition values, which we replicate into each output batch/row.
+    val partitionValues: InternalRow = file.partitionValues match {

Review Comment:
   🤖 Minor: `case row: InternalRow => row` won't match if 
`file.partitionValues` is `null` (Scala type patterns reject null), so this 
would throw `MatchError` rather than fall through to a sensible default. In 
practice Spark passes an empty `InternalRow` for unpartitioned tables, so this 
may be defensive only — but worth considering an explicit `case null => 
InternalRow.empty` or just defaulting to `file.partitionValues` directly.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to