hudi-agent commented on code in PR #18770:
URL: https://github.com/apache/hudi/pull/18770#discussion_r3262459209
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
iter.map(mapper.apply(_))
}
+ // executor — fast path for SELECT count(*).
+ // Reads only the parquet footer (no row group decoding, no vectorized reader). The downstream
+ // count aggregator counts rows in the produced batches/rows; column contents (partition
+ // values) are populated as constants from file.partitionValues so codegen that touches
+ // column[i] still sees valid data. Tracking: apache/hudi#18769.
+ private def readCountFromFooter(file: PartitionedFile,
+ partitionSchema: StructType,
+ storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+ val storagePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+ val footer = ParquetFileReader.readFooter(storageConf.unwrap(), hadoopPath, ParquetMetadataConverter.NO_FILTER)
+ val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + b.getRowCount)
+
+ // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying InternalRow with
Review Comment:
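🤖 I think this over-counts when a parquet file is split. `isSplitable` returns
true for COW parquet (line 239: `!isMOR && !isIncremental && !isBootstrap &&
!isLance && superSplitable`), which covers the non-bootstrap COW parquet reads
where `isCount` fires. Any such file larger than Spark's max split size (at most
`spark.sql.files.maxPartitionBytes`, 128MB by default, and smaller when the
total input is small relative to the available cores) is handed to this
function as several `PartitionedFile`s with different `start`/`length` ranges,
and each split sums every row group in the footer. Could you restrict the count
to the row groups this split owns, either by filtering blocks on
`file.start <= block.getStartingPos < file.start + file.length`
or by reading the footer with
`ParquetMetadataConverter.range(file.start, file.start + file.length)`
instead of `NO_FILTER`? The latter is the range filter Spark's Parquet readers
apply per split; parquet-mr assigns each row group to the split containing its
midpoint. Otherwise `count(*)` would count each split file's rows once per split.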
- AI-generated; verify before applying.
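To make the split issue concrete, here is a minimal, self-contained model of the start-offset filter suggested above. It assumes a row group can be reduced to a `(startingPos, rowCount)` pair and that a file's splits tile it contiguously; `RowGroup`, `countForSplit`, `countIgnoringSplit` and `splits` are hypothetical names, not parquet-mr or Hudi APIs, so the sketch runs without Parquet on the classpath.

```scala
object SplitAwareFooterCount {
  // Hypothetical stand-in for parquet-mr's BlockMetaData: only the two fields the count needs.
  final case class RowGroup(startingPos: Long, rowCount: Long)

  // Count only row groups whose first byte falls inside [start, start + length),
  // so each row group is attributed to exactly one split of the file.
  def countForSplit(blocks: Seq[RowGroup], start: Long, length: Long): Long =
    blocks
      .filter(b => b.startingPos >= start && b.startingPos < start + length)
      .map(_.rowCount)
      .sum

  // Behaviour of the current fast path: every split sums the whole footer.
  def countIgnoringSplit(blocks: Seq[RowGroup]): Long =
    blocks.map(_.rowCount).sum

  // Cut a file into contiguous (start, length) ranges of at most maxSplitBytes,
  // mirroring how Spark slices a splittable file.
  def splits(fileLength: Long, maxSplitBytes: Long): Seq[(Long, Long)] =
    (0L until fileLength by maxSplitBytes).map { s =>
      (s, math.min(maxSplitBytes, fileLength - s))
    }
}
```

Summing `countForSplit` over all splits of a file counts every row group once, whereas `countIgnoringSplit` multiplies the file's rows by its number of splits. The midpoint rule behind `ParquetMetadataConverter.range` would work equally well for counting, as long as every split of the file applies the same rule.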
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
iter.map(mapper.apply(_))
}
+ // executor — fast path for SELECT count(*).
+ // Reads only the parquet footer (no row group decoding, no vectorized reader). The downstream
+ // count aggregator counts rows in the produced batches/rows; column contents (partition
+ // values) are populated as constants from file.partitionValues so codegen that touches
+ // column[i] still sees valid data. Tracking: apache/hudi#18769.
+ private def readCountFromFooter(file: PartitionedFile,
+ partitionSchema: StructType,
+ storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+ val storagePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+ val footer = ParquetFileReader.readFooter(storageConf.unwrap(), hadoopPath, ParquetMetadataConverter.NO_FILTER)
+ val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + b.getRowCount)
Review Comment:
🤖 This unconditionally calls `ParquetFileReader.readFooter`, but `isCount`
(line 252) is `requiredSchema.isEmpty && !isMOR && !isIncremental` — it doesn't
gate on file format. `buildBaseFileReader` shows COW supports ORC and Lance
base files too, and `isMultipleBaseFileFormatsEnabled` allows mixed formats.
Running `SELECT count(*)` on an ORC or Lance COW table would hit this path and
throw. Could you also require `hoodieFileFormat == HoodieFileFormat.PARQUET &&
!isMultipleBaseFileFormatsEnabled` (or fall back to `readBaseFile` for the
non-parquet case)?
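A sketch of the format guard, assuming the relevant flags can be passed in as plain values. `BaseFormat`, `formatOfPath` and `useFooterCount` are hypothetical stand-ins for `hoodieFileFormat`, `isMultipleBaseFileFormatsEnabled` and whatever per-file format detection the PR would actually use; the extension-based check in particular is an assumption.

```scala
object FooterCountGuard {
  sealed trait BaseFormat
  case object Parquet extends BaseFormat
  case object Orc extends BaseFormat
  case object Lance extends BaseFormat

  // Hypothetical per-file detection, only consulted when mixed base formats are enabled.
  // Unknown extensions yield None and are therefore treated as "not Parquet".
  def formatOfPath(path: String): Option[BaseFormat] = path.toLowerCase match {
    case p if p.endsWith(".parquet") => Some(Parquet)
    case p if p.endsWith(".orc")     => Some(Orc)
    case _                           => None
  }

  // The footer-only count is safe only for a column-less COW snapshot read
  // of a file that is known to be Parquet.
  def useFooterCount(requiredSchemaEmpty: Boolean,
                     isMOR: Boolean,
                     isIncremental: Boolean,
                     tableFormat: BaseFormat,
                     multipleFormatsEnabled: Boolean,
                     path: String): Boolean = {
    val isCount = requiredSchemaEmpty && !isMOR && !isIncremental
    val fileIsParquet =
      if (multipleFormatsEnabled) formatOfPath(path).contains(Parquet)
      else tableFormat == Parquet
    isCount && fileIsParquet
  }
}
```

Deciding per file keeps the fast path for the parquet files of a mixed-format table and sends everything else through the regular `readBaseFile` path.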
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
iter.map(mapper.apply(_))
}
+ // executor — fast path for SELECT count(*).
+ // Reads only the parquet footer (no row group decoding, no vectorized reader). The downstream
+ // count aggregator counts rows in the produced batches/rows; column contents (partition
+ // values) are populated as constants from file.partitionValues so codegen that touches
+ // column[i] still sees valid data. Tracking: apache/hudi#18769.
+ private def readCountFromFooter(file: PartitionedFile,
+ partitionSchema: StructType,
+ storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+ val storagePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+ val footer = ParquetFileReader.readFooter(storageConf.unwrap(), hadoopPath, ParquetMetadataConverter.NO_FILTER)
+ val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + b.getRowCount)
+
+ // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying InternalRow with
+ // partition values, which we replicate into each output batch/row.
+ val partitionValues: InternalRow = file.partitionValues match {
+ case m: HoodiePartitionFileSliceMapping => m.getPartitionValues
+ case row: InternalRow => row
+ }
+
+ if (supportReturningBatch) {
+ val batchSize = 4096
Review Comment:
🤖 nit: `4096` is a magic number here — could it be a named constant (e.g.
`FOOTER_BATCH_SIZE`) defined at the class or companion level, ideally with a
brief note that it matches Spark's default vectorized reader batch size?
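A sketch of the named constant plus one way to cut the footer row count into batch sizes. `FOOTER_BATCH_SIZE` is the name proposed above and `batchSizes` is a hypothetical helper, since the loop that actually emits batches is not shown in this hunk; 4096 is the default of Spark's `spark.sql.parquet.columnarReaderBatchSize`.

```scala
object FooterCountBatching {
  // Matches the default of spark.sql.parquet.columnarReaderBatchSize (4096 rows).
  val FOOTER_BATCH_SIZE: Int = 4096

  // Split rowCount rows into batches of at most batchSize rows; the last batch
  // carries the remainder. Counting down in Long avoids Int overflow on large files.
  def batchSizes(rowCount: Long, batchSize: Int = FOOTER_BATCH_SIZE): Iterator[Int] = {
    require(batchSize > 0, "batchSize must be positive")
    Iterator
      .iterate(rowCount)(_ - batchSize)
      .takeWhile(_ > 0)
      .map(remaining => math.min(remaining, batchSize.toLong).toInt)
  }
}
```

For example, `batchSizes(10000L)` yields batches of 4096, 4096 and 1808 rows, and a footer reporting zero rows yields no batches at all.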
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -294,7 +297,12 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
(file: PartitionedFile) => {
// executor
val storageConf = new HadoopStorageConfiguration(broadcastedStorageConf.value.value)
- val iter = file.partitionValues match {
+ val iter = if (isCount) {
+ // Fast path for SELECT count(*): sum row counts from the parquet footer
+ // without opening the vectorized reader or decoding column data.
+ // Tracking: apache/hudi#18769.
Review Comment:
🤖 nit: issue-tracker references in inline source comments tend to go stale
once the issue is resolved — would it be worth dropping the `Tracking:
apache/hudi#18769.` line and keeping that reference in the commit message
instead? (same applies to line ~504).
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -489,6 +497,59 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
iter.map(mapper.apply(_))
}
+ // executor — fast path for SELECT count(*).
+ // Reads only the parquet footer (no row group decoding, no vectorized reader). The downstream
+ // count aggregator counts rows in the produced batches/rows; column contents (partition
+ // values) are populated as constants from file.partitionValues so codegen that touches
+ // column[i] still sees valid data. Tracking: apache/hudi#18769.
+ private def readCountFromFooter(file: PartitionedFile,
+ partitionSchema: StructType,
+ storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
+ val storagePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+ val hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath)
+ val footer = ParquetFileReader.readFooter(storageConf.unwrap(), hadoopPath, ParquetMetadataConverter.NO_FILTER)
+ val rowCount = footer.getBlocks.asScala.foldLeft(0L)((acc, b) => acc + b.getRowCount)
+
+ // Unwrap Hudi's HoodiePartitionFileSliceMapping to get the underlying InternalRow with
+ // partition values, which we replicate into each output batch/row.
+ val partitionValues: InternalRow = file.partitionValues match {
Review Comment:
🤖 Minor: `case row: InternalRow => row` won't match if
`file.partitionValues` is `null` (Scala type patterns reject null), so this
would throw `MatchError` rather than fall through to a sensible default. In
practice Spark passes an empty `InternalRow` for unpartitioned tables, so this
may be defensive only — but worth considering an explicit `case null =>
InternalRow.empty` or just defaulting to `file.partitionValues` directly.
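A self-contained illustration of the `MatchError` point. `Row` and `SliceMapping` are hypothetical stand-ins for `InternalRow` and `HoodiePartitionFileSliceMapping` (in Spark, `InternalRow.empty` would play the role of `Row.empty`). It shows that a type pattern never matches `null`, even when the static type conforms, and that an explicit `case null` arm avoids the error.

```scala
object PartitionValuesUnwrap {
  // Hypothetical stand-ins: Row for InternalRow, SliceMapping for HoodiePartitionFileSliceMapping.
  class Row(val values: Seq[Any])
  object Row { val empty: Row = new Row(Seq.empty) }
  final class SliceMapping(partitionValues: Row) extends Row(Seq.empty) {
    def getPartitionValues: Row = partitionValues
  }

  // Mirrors the match in the PR: a null input matches neither type pattern,
  // so it throws scala.MatchError at runtime.
  def unwrapStrict(v: Row): Row = v match {
    case m: SliceMapping => m.getPartitionValues
    case row: Row        => row
  }

  // Null-safe variant along the lines suggested in the review.
  def unwrapSafe(v: Row): Row = v match {
    case null            => Row.empty
    case m: SliceMapping => m.getPartitionValues
    case row: Row        => row
  }
}
```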
--
This is an automated message from the Apache Git Service.