hudi-agent commented on code in PR #18277: URL: https://github.com/apache/hudi/pull/18277#discussion_r3350318180
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieScanBuilder.scala: ########## @@ -0,0 +1,641 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.v2 + +import org.apache.hudi.{DataSourceReadOptions, HoodieBaseRelation, HoodieDataSourceHelper, HoodieFileIndex, HoodieSparkUtils, SparkAdapterSupport} +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.config.HoodieMetadataConfig +import org.apache.hudi.common.engine.HoodieLocalEngineContext +import org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE +import org.apache.hudi.common.schema.HoodieSchema +import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} +import org.apache.hudi.common.table.timeline.TimelineLayout +import org.apache.hudi.common.util.{Option => HOption} +import org.apache.hudi.common.util.collection.Pair +import org.apache.hudi.config.HoodieBootstrapConfig +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.internal.schema.utils.SerDeHelper +import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType} +import org.apache.hudi.stats.HoodieColumnRangeMetadata +import org.apache.hudi.util.SparkConfigUtils + +import org.apache.spark.sql.HoodieCatalystExpressionUtils +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow} +import org.apache.spark.sql.connector.expressions.NamedReference +import org.apache.spark.sql.connector.expressions.aggregate.{Aggregation, Count, CountStar, Max, Min} +import org.apache.spark.sql.connector.read.{InputPartition, Scan, ScanBuilder, SupportsPushDownAggregates, SupportsPushDownFilters, SupportsPushDownRequiredColumns} +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils +import org.apache.spark.sql.hudi.v2.HoodieScanBuilder.{CountStarAbsent, CountStarColumn, CountStarMissingTarget, CountStarWithColumn} +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.SerializableConfiguration +import org.slf4j.LoggerFactory + +import scala.collection.JavaConverters._ + +/** + * Scan builder for DSv2 CoW snapshot reads. + */ +class HoodieScanBuilder(spark: SparkSession, + metaClient: HoodieTableMetaClient, + tableSchema: StructType, + options: Map[String, String]) extends ScanBuilder + with SupportsPushDownFilters + with SupportsPushDownRequiredColumns + with HoodiePartialLimitPushDown + with SupportsPushDownAggregates + with SparkAdapterSupport { + + private val log = LoggerFactory.getLogger(getClass) + + private var requiredSchema: StructType = tableSchema + private var _pushedFilters: Array[Filter] = Array.empty + private var partitionFilterExprs: Seq[Expression] = Seq.empty + private var dataFilterExprs: Seq[Expression] = Seq.empty + private var hasPostScanFilters: Boolean = false + + private var pushedLimit: Option[Int] = None + private var pushedAggregation: Option[Aggregation] = None + private var aggregateResult: Option[Array[InternalRow]] = None + + private lazy val fileIndex = HoodieFileIndex(spark, metaClient, Some(tableSchema), options, + includeLogFiles = false, shouldEmbedFileSlices = false) + + override def pushFilters(filters: Array[Filter]): Array[Filter] = { + val (pushed, postScan) = filters.partition { f => + HoodieCatalystExpressionUtils.convertToCatalystExpression(f, tableSchema).isDefined + } + + val expressions = pushed.flatMap(f => + HoodieCatalystExpressionUtils.convertToCatalystExpression(f, tableSchema)) + + val (partFilters, datFilters) = HoodieCatalystExpressionUtils + .splitPartitionAndDataPredicates(spark, expressions, fileIndex.partitionSchema.fieldNames) + + // Mirror DSv1's MergeOnReadSnapshotRelation.collectFileSplits: convert partition filters + // for TimestampBasedKeyGenerator before pruning. HoodieFileIndex only applies this + // conversion internally when shouldEmbedFileSlices=true; this scan builder sets it + // false, so without an explicit conversion `dt = '2024-01-01'` would be matched against + // formatted path values like "2024/01/01" and prune away the matching partition. + partitionFilterExprs = HoodieFileIndex + .convertFilterForTimestampKeyGenerator(metaClient, partFilters).toSeq + dataFilterExprs = datFilters.toSeq + + val partFieldNames = fileIndex.partitionSchema.fieldNames.toSet + // Split pushed filters into: + // - pushedDataFilters (references only data columns): forwarded to Parquet for + // row-group pruning AND returned so Spark re-applies them row-wise (Parquet stats + // only prune row-groups). + // - partitionReferencingFilters (references at least one partition column, including + // partition-only and mixed filters): must NOT be forwarded to Parquet because the + // partition column may be absent from the base file (drop_partition_columns) or, + // when present, may hold a value that disagrees with the path-derived value — + // e.g. for TimestampBased/Custom key generators where path = "2024/01/01" but the + // stored column value is "2024-01-01 12:00:00". Row-group stats would then prune + // row-groups containing matching rows. They MUST be returned so Spark re-applies + // them row-wise; partition pruning narrows the file set but does not evaluate + // predicates against actual column values. + val (pushedDataFilters, partitionReferencingFilters) = pushed.partition(f => + f.references.nonEmpty && f.references.forall(r => !partFieldNames.contains(r))) + + _pushedFilters = pushedDataFilters + hasPostScanFilters = postScan.nonEmpty || partitionReferencingFilters.nonEmpty + + postScan ++ partitionReferencingFilters ++ pushedDataFilters + } + + override def pushedFilters(): Array[Filter] = _pushedFilters + + override def pruneColumns(requiredSchema: StructType): Unit = { + this.requiredSchema = requiredSchema + } + + override def pushLimit(limit: Int): Boolean = { + // Spark 3.3's planner does not honor SupportsPushDownLimit.isPartiallyPushed — any + // successful pushdown is treated as COMPLETE and the outer LocalLimit is dropped. + // HoodieBatchScan enforces the limit per input partition, so LIMIT N across multiple + // base files could over-return. Refuse pushdown on 3.3; on 3.4+, HoodiePartialLimitPushDown + // keeps the outer LocalLimit in place. + // Otherwise the pushdown is only safe when no filter is re-applied above the scan: + // _pushedFilters are returned to Spark for row-wise re-evaluation (Parquet only + // prunes row-groups via them), and hasPostScanFilters covers filters we couldn't + // convert. Capping rows in the reader before either runs would drop later matching + // rows — e.g. WHERE id > 3 LIMIT 1 could stop at a row with id = 1. Spark pushes + // filters before limits, so both flags are already populated here. + if (!HoodieSparkUtils.gteqSpark3_4) { + false + } else if (_pushedFilters.nonEmpty || hasPostScanFilters) { + false + } else { + pushedLimit = Some(limit) + true + } + } + + override def pushAggregation(aggregation: Aggregation): Boolean = { + if (aggregation.groupByExpressions().nonEmpty) { + false + } else if (!MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(metaClient)) { + false + } else { + val funcs = aggregation.aggregateExpressions() + val allSupported = funcs.forall { + case _: CountStar => true + case c: Count => !c.isDistinct + // Parquet/Hudi column stats exclude NaN from min/max, but Spark SQL + // treats NaN as greater than any non-NaN value. Stats can't distinguish + // "all NaN" from "all null" (both surface as a null min/max), so skip + // MIN/MAX pushdown on float/double columns and let Spark compute from + // the scan. + case m: Min => !isFloatingPointColumn(m.column()) + case m: Max => !isFloatingPointColumn(m.column()) + case _ => false + } + if (!allSupported) { + false + } else { + tryComputeAggregates(aggregation) match { + case Some(rows) => + pushedAggregation = Some(aggregation) + aggregateResult = Some(rows) + true + case None => false + } + } + } + } + + override def supportCompletePushDown(aggregation: Aggregation): Boolean = { + pushedAggregation.contains(aggregation) && dataFilterExprs.isEmpty && !hasPostScanFilters + } + + override def build(): Scan = { + aggregateResult match { + case Some(rows) => + val outputSchema = buildAggregateOutputSchema(pushedAggregation.get) + new HoodieLocalScan(outputSchema, rows) + case None => + buildSnapshotScan() + } + } + + private def buildSnapshotScan(): Scan = { + // Invariant established by HoodieV2ReadSupport.isSupportedByDSv2: + // COW snapshot or MOR read_optimized only, Parquet only, single base-file format. + val queryType = SparkConfigUtils.getStringWithAltKeys(options, DataSourceReadOptions.QUERY_TYPE) + require(metaClient.getTableType == COPY_ON_WRITE || + queryType == DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, + "HoodieScanBuilder supports COW snapshot or read_optimized only; " + + s"got tableType=${metaClient.getTableType}, queryType=$queryType") + + val partFieldNames = fileIndex.partitionSchema.fieldNames.toSet + // Mirror DSv1's HoodieBaseRelation.shouldExtractPartitionValuesFromPartitionPath: only + // strip partition columns from the file schema and supply them from the parsed path + // when the table was written with drop_partition_columns, the user explicitly opted + // in via EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH, or the bootstrap fast-read + // path is requested. Otherwise partition columns are persisted in the base files and + // path-derived values would be wrong for timestamp/custom key generators or encoded + // partition paths. (Bootstrap is currently rejected by HoodieV2ReadSupport, but the + // gate is kept identical for parity.) + val shouldOmitPartitionColumns = + metaClient.getTableConfig.shouldDropPartitionColumns && partFieldNames.nonEmpty + val shouldExtractPartitionValueFromPath = + options.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key, + DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean + val shouldUseBootstrapFastRead = + options.getOrElse(HoodieBootstrapConfig.DATA_QUERIES_ONLY.key, "false").toBoolean + val extractPartitionValuesFromPartitionPath = + shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || shouldUseBootstrapFastRead + + // When partition values are extracted from the path, the partition column types must come + // from the file index. TimestampBasedKeyGenerator (and similar) parse non-DATE_STRING + // partition path values as UTF8String/StringType, so fileIndex.partitionSchema is the only + // place that reflects what HoodieFileIndex puts into HoodieInputPartition.partitionValues. + // Building requiredPartitionSchema from requiredSchema would advertise the original + // Long/Timestamp type and feed string path values into the parquet partition reader, + // producing wrong values or runtime errors. If the file-index type differs from the + // requiredSchema type, reject — DSv2 cannot reconcile that mismatch without also rewriting + // HoodieSparkV2Table.schema(), which is out of scope here. + val fullPartSchema = fileIndex.partitionSchema + val (requiredDataSchema, requiredPartitionSchema) = if (extractPartitionValuesFromPartitionPath) { + val partFieldsByName = fullPartSchema.fields.map(f => f.name -> f).toMap + val partitionFieldsInRequired = requiredSchema.fields + .filter(f => partFieldNames.contains(f.name)) + .map { f => + val fileIdxField = partFieldsByName(f.name) + if (fileIdxField.dataType != f.dataType) { + throw new UnsupportedOperationException( + s"DSv2 read with extractPartitionValuesFromPartitionPath=true does not support " + + s"partition column '${f.name}' whose file-index type ${fileIdxField.dataType.simpleString} " + + s"differs from the table-schema type ${f.dataType.simpleString} (typical for " + + s"TimestampBasedKeyGenerator). Disable extractPartitionValuesFromPartitionPath, " + + s"unset hoodie.datasource.write.drop.partition.columns, or use the DSv1 reader.") + } + fileIdxField + } + (StructType(requiredSchema.filterNot(f => partFieldNames.contains(f.name))), + StructType(partitionFieldsInRequired)) + } else { + (requiredSchema, StructType(Nil)) + } + + val hadoopConf = spark.sessionState.newHadoopConf() + val internalSchemaOpt = fetchInternalSchema() + embedInternalSchema(hadoopConf, internalSchemaOpt) + val tableAvroSchemaOpt = fetchTableAvroSchema() + + val readerOptions = options + (FileFormat.OPTION_RETURNING_BATCH -> "false") + val reader = sparkAdapter.createParquetFileReader(false, spark.sessionState.conf, readerOptions, hadoopConf) + val broadcastReader = spark.sparkContext.broadcast(reader) + val broadcastConf = spark.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + val fileSlicesPerPartition = fileIndex.filterFileSlices(dataFilterExprs, partitionFilterExprs) + + // Mirror the DSv1 split path: break each base file into ranges of at most + // spark.sql.files.maxPartitionBytes so large files don't become single-task + // stragglers. DSv2 only supports COW + MOR read-optimized (see + // HoodieV2ReadSupport.isSupportedByDSv2), so log files never participate and + // splitting base files is always safe. + val partitions = fileSlicesPerPartition.flatMap { case (partitionOpt, fileSlices) => + fileSlices.filter(_.getBaseFile.isPresent).flatMap { fs => + val baseFile = fs.getBaseFile.get() + val baseFilePath = baseFile.getPath + val baseFileLength = baseFile.getFileSize + val allPartValues = partitionOpt.map(_.getValues).getOrElse(Array.empty[AnyRef]) + + val partValues = if (requiredPartitionSchema.isEmpty) { + Array.empty[AnyRef] + } else { + requiredPartitionSchema.fieldNames.map { name => + val idx = fullPartSchema.fieldIndex(name) + allPartValues(idx) + } + } + + HoodieDataSourceHelper.computeSplitRanges(spark, baseFileLength).map { case (start, length) => + HoodieInputPartition(0, baseFilePath, start, length, partValues) + } + } + }.zipWithIndex.map { case (p, i) => p.copy(index = i) }.toArray[InputPartition] + + new HoodieBatchScan( + requiredSchema, + partitions, + broadcastReader, + broadcastConf, + requiredDataSchema, + requiredPartitionSchema, + internalSchemaOpt, + _pushedFilters, + pushedLimit, + tableAvroSchemaOpt) + } + + private def fetchTableAvroSchema(): HOption[HoodieSchema] = { + try { + val resolver = new TableSchemaResolver(metaClient) + // Mirror DSv1's HoodieHadoopFsRelationFactory and HoodieFileGroupReaderBasedFileFormat: + // resolve the Avro schema as of a time-travel instant when supplied so the Parquet + // SchemaRepair sees the column types that match the snapshot being read. Without this, + // SparkXXParquetReader cannot repair logical timestamp annotations and timestamp-millis + // columns surface as their physical long value. + val schema = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) match { + case Some(ts) => resolver.getTableSchema(ts) + case None => resolver.getTableSchema + } + HOption.ofNullable(schema) + } catch { + case e: Exception => + log.warn("Failed to fetch table Avro schema for SchemaRepair", e) + HOption.empty[HoodieSchema]() + } + } + + private def fetchInternalSchema(): HOption[InternalSchema] = { + if (!HoodieBaseRelation.isSchemaEvolutionEnabledOnRead(options, spark)) { + HOption.empty[InternalSchema]() + } else { + try { + val resolver = new TableSchemaResolver(metaClient) + // Mirror DSv1 (HoodieBaseRelation): when a time-travel instant is supplied, fetch + // the internal schema as of that instant so schema-evolved column IDs/types match + // the snapshot being read. Otherwise fall back to the latest internal schema. + options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key) + .map(HoodieSqlCommonUtils.formatQueryInstant) match { + case Some(ts) => resolver.getTableInternalSchemaFromCommitMetadata(ts) + case None => resolver.getTableInternalSchemaFromCommitMetadata + } + } catch { + case e: Exception => + log.warn("Failed to fetch internal schema from commit metadata", e) + HOption.empty[InternalSchema]() + } + } + } + + private def embedInternalSchema(conf: org.apache.hadoop.conf.Configuration, + internalSchemaOpt: HOption[InternalSchema]): Unit = { + if (internalSchemaOpt.isPresent) { + val internalSchema = internalSchemaOpt.get + val instantFileNameGenerator = TimelineLayout.fromVersion(metaClient.getTimelineLayoutVersion) + .getInstantFileNameGenerator + val validCommits = metaClient.getActiveTimeline.getInstants.iterator.asScala + .map(instant => instantFileNameGenerator.getFileName(instant)).mkString(",") + conf.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema)) + conf.set(SparkInternalSchemaConverter.HOODIE_TABLE_PATH, metaClient.getBasePath.toString) + conf.set(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST, validCommits) + } + } + + private def tryComputeAggregates(aggregation: Aggregation): Option[Array[InternalRow]] = { + try { + val fileSlicesPerPartition = fileIndex.filterFileSlices(dataFilterExprs, partitionFilterExprs) + + if (dataFilterExprs.nonEmpty || hasPostScanFilters) { + None + } else { + // Collect base files as (partitionPath, fileName) pairs + val baseFiles = fileSlicesPerPartition.flatMap { case (partOpt, slices) => + slices.filter(_.getBaseFile.isPresent).map { fs => + val partPath = partOpt.map(_.getPath).getOrElse("") + val fileName = fs.getBaseFile.get().getFileName + (partPath, fileName) + } + } + + if (baseFiles.isEmpty) { + Some(Array(buildEmptyAggregateRow(aggregation))) + } else { + computeAggregatesFromStats(aggregation, baseFiles) + } + } + } catch { + case e: Exception => + log.debug("Aggregate pushdown computation failed, falling back to scan", e) + None + } + } + + private def computeAggregatesFromStats( + aggregation: Aggregation, + baseFiles: Seq[(String, String)]): Option[Array[InternalRow]] = { + val aggFuncs = aggregation.aggregateExpressions() + + // Determine which columns need stats; None if unsupported function found + val referencedColumnsOpt = aggFuncs.foldLeft(Option(Seq.empty[String])) { (accOpt, func) => + accOpt.flatMap { acc => + func match { + case _: CountStar => Some(acc) + case c: Count => extractColumnName(c.column()).map(name => acc :+ name) + case m: Min => extractColumnName(m.column()).map(name => acc :+ name) + case m: Max => extractColumnName(m.column()).map(name => acc :+ name) + case _ => None + } + } + } + + referencedColumnsOpt.flatMap { referencedColumns => + val distinctColumns = referencedColumns.distinct + + // For CountStar, prefer a user-defined record key field that's present in the + // user schema: it's an original column from the table's first commit, so its + // column stats are complete even after schema evolution. Fall back to the + // first column for keyless tables or when the record key is meta-field-only. + val countStarColumn: CountStarColumn = if (aggFuncs.exists(_.isInstanceOf[CountStar])) { + val schemaNames = tableSchema.fields.map(_.name).toSet + val recordKeyFields = metaClient.getTableConfig.getRecordKeyFields.orElse(Array.empty[String]) + recordKeyFields.find(schemaNames.contains) + .orElse(tableSchema.fields.headOption.map(_.name)) match { + case Some(name) => CountStarWithColumn(name) + case None => CountStarMissingTarget + } + } else { + CountStarAbsent + } + + countStarColumn match { + case CountStarMissingTarget => None + case resolved => + val countStarColOpt: Option[String] = resolved match { + case CountStarWithColumn(name) => Some(name) + case _ => None + } + val allColumns = (distinctColumns ++ countStarColOpt.toSeq).distinct + if (allColumns.isEmpty) { + None + } else { + queryAndComputeAggregates(aggFuncs, allColumns, countStarColOpt, baseFiles) + } + } + } + } + + private def queryAndComputeAggregates( + aggFuncs: Array[org.apache.spark.sql.connector.expressions.aggregate.AggregateFunc], + allColumns: Seq[String], + countStarColumn: Option[String], + baseFiles: Seq[(String, String)]): Option[Array[InternalRow]] = { + val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build() + val engineContext = new HoodieLocalEngineContext(metaClient.getStorageConf) + val tableMetadata = new HoodieBackedTableMetadata( + engineContext, metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString) + + try { + val filePairs = baseFiles.map { case (part, file) => Pair.of(part, file) }.asJava + + // Query stats for all needed columns; None if any column has incomplete stats + val columnStatsOpt = allColumns.foldLeft( + Option(Map.empty[String, Seq[HoodieColumnRangeMetadata[Comparable[_]]]]) + ) { (accOpt, col) => + accOpt.flatMap { acc => + val statsMap = tableMetadata.getColumnStats(filePairs, col) + if (statsMap.size() != baseFiles.size) { + None + } else { + val allFileStats = statsMap.values().asScala + .map(cs => HoodieColumnRangeMetadata.fromColumnStats(cs)).toSeq + Some(acc + (col -> allFileStats)) + } + } + } + + columnStatsOpt.flatMap { columnStats => + // Compute each aggregate value as Option; None means unsupported + val valuesOpt: Array[Option[Any]] = aggFuncs.map { + case _: CountStar => + val stats = columnStats(countStarColumn.get) + Some(stats.map(s => s.getValueCount).sum: Any) + + case c: Count => + extractColumnName(c.column()).map { colName => + val stats = columnStats(colName) + stats.map(s => s.getValueCount - s.getNullCount).sum: Any + } + + case m: Min => + for { + colName <- extractColumnName(m.column()) + colType <- tableSchema.fields.find(_.name == colName).map(_.dataType) + result <- computeMinMax(columnStats(colName), colType, isMin = true) + } yield result + + case m: Max => + for { + colName <- extractColumnName(m.column()) + colType <- tableSchema.fields.find(_.name == colName).map(_.dataType) + result <- computeMinMax(columnStats(colName), colType, isMin = false) + } yield result + + case _ => None + } + + if (valuesOpt.forall(_.isDefined)) { + Some(Array(new GenericInternalRow(valuesOpt.map(_.get)))) + } else { + None + } + } + } finally { + tableMetadata.close() + } + } + + private def computeMinMax( + allFileStats: Seq[HoodieColumnRangeMetadata[Comparable[_]]], + colType: DataType, + isMin: Boolean): Option[Any] = { + val rawValues = allFileStats.map(s => if (isMin) s.getMinValue else s.getMaxValue).filter(_ != null) + if (rawValues.isEmpty) { + Some(null) + } else { + val sparkValues = rawValues.flatMap(v => convertToSparkValue(v, colType)) + if (sparkValues.size != rawValues.size) { + None + } else { + colType match { + case ByteType => + val vs = sparkValues.map(_.asInstanceOf[Byte]) + Some(if (isMin) vs.min else vs.max) + case ShortType => + val vs = sparkValues.map(_.asInstanceOf[Short]) + Some(if (isMin) vs.min else vs.max) + case IntegerType => + val vs = sparkValues.map(_.asInstanceOf[Int]) + Some(if (isMin) vs.min else vs.max) + case LongType => + val vs = sparkValues.map(_.asInstanceOf[Long]) + Some(if (isMin) vs.min else vs.max) + case FloatType => + val vs = sparkValues.map(_.asInstanceOf[Float]).filterNot(_.isNaN) + if (vs.isEmpty) None else Some(if (isMin) vs.min else vs.max) + case DoubleType => + val vs = sparkValues.map(_.asInstanceOf[Double]).filterNot(_.isNaN) + if (vs.isEmpty) None else Some(if (isMin) vs.min else vs.max) + case StringType => + val vs = sparkValues.map(_.asInstanceOf[UTF8String]) + Some(vs.reduce((a, b) => if ((isMin && a.compareTo(b) <= 0) || (!isMin && a.compareTo(b) >= 0)) a else b)) + case _ => None + } + } + } + } + + private def buildEmptyAggregateRow(aggregation: Aggregation): InternalRow = { + val values = aggregation.aggregateExpressions().map { + case _: CountStar => 0L: Any + case _: Count => 0L: Any + case _ => null: Any + } + new GenericInternalRow(values) + } + + private def buildAggregateOutputSchema(aggregation: Aggregation): StructType = { + val fields = aggregation.aggregateExpressions().zipWithIndex.map { case (func, i) => + func match { + case _: CountStar => + StructField(s"count(*)", LongType, nullable = false) + case c: Count => + val colName = extractColumnName(c.column()).getOrElse(s"col$i") + StructField(s"count($colName)", LongType, nullable = false) + case m: Min => + val colName = extractColumnName(m.column()).getOrElse(s"col$i") + val colType = tableSchema.fields.find(_.name == colName).map(_.dataType).getOrElse(LongType) + StructField(s"min($colName)", colType, nullable = true) + case m: Max => + val colName = extractColumnName(m.column()).getOrElse(s"col$i") + val colType = tableSchema.fields.find(_.name == colName).map(_.dataType).getOrElse(LongType) + StructField(s"max($colName)", colType, nullable = true) + case _ => + StructField(s"agg$i", LongType, nullable = true) + } + } + StructType(fields) + } + + private def extractColumnName( + expr: org.apache.spark.sql.connector.expressions.Expression): Option[String] = { + expr match { + case ref: NamedReference => + val names = ref.fieldNames() + if (names.length == 1) Some(names.head) else None + case _ => None + } + } + + private def isFloatingPointColumn( + expr: org.apache.spark.sql.connector.expressions.Expression): Boolean = { + extractColumnName(expr) + .flatMap(name => tableSchema.fields.find(_.name == name).map(_.dataType)) + .exists { + case FloatType | DoubleType => true + case _ => false + } + } + + private def convertToSparkValue(value: Any, dataType: DataType): Option[Any] = { + if (value == null) { + Some(null) + } else try { + dataType match { + case BooleanType => Some(value.asInstanceOf[Boolean]) + case ByteType => Some(value.asInstanceOf[Number].byteValue()) + case ShortType => Some(value.asInstanceOf[Number].shortValue()) + case IntegerType => Some(value.asInstanceOf[Number].intValue()) + case LongType => Some(value.asInstanceOf[Number].longValue()) + case FloatType => Some(value.asInstanceOf[Number].floatValue()) + case DoubleType => Some(value.asInstanceOf[Number].doubleValue()) + case StringType => Some(UTF8String.fromString(value.toString)) + case _ => None + } + } catch { + case _: Exception => None + } + } +} + +private[v2] object HoodieScanBuilder { + sealed trait CountStarColumn + case object CountStarAbsent extends CountStarColumn + case object CountStarMissingTarget extends CountStarColumn Review Comment: 🤖 nit: `CountStarMissingTarget` is a bit opaque — 'target' isn't defined anywhere nearby. Could you rename it to something like `CountStarNoColumn` or `CountStarColumnUnavailable` to make it immediately clear that this case means 'COUNT(*) is present in the aggregation but no suitable column was found in the table schema'? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/v2/HoodieDataSourceV2.scala: ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hudi.v2 + +import org.apache.hudi.{DataSourceWriteOptions, HoodieEmptyRelation, HoodieSparkSqlWriter} +import org.apache.hudi.exception.HoodieException + +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, SQLContext} +import org.apache.spark.sql.connector.catalog.{Table, TableProvider} +import org.apache.spark.sql.connector.expressions.Transform +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +import java.util + +/** + * DSv2 data source for Hudi, registered with short name "hudi_v2". + * + * Activation via DataFrame API: + * - Read: `spark.read.format("hudi_v2").load(path)` + * - Write: `df.write.format("hudi_v2").save(path)` + * + * The SQL/Catalog path is handled by [[org.apache.spark.sql.hudi.catalog.HoodieCatalog.loadTable]]. + * + * Write via DataFrame API is supported through [[CreatableRelationProvider]], which Spark uses + * as V1 fallback when the data source implements [[TableProvider]] + [[DataSourceRegister]]. + */ +class HoodieDataSourceV2 extends TableProvider with DataSourceRegister with CreatableRelationProvider { + + override def shortName(): String = "hudi_v2" + + override def inferSchema(options: CaseInsensitiveStringMap): StructType = new StructType() + + override def getTable(schema: StructType, + partitioning: Array[Transform], + properties: util.Map[String, String]): Table = { + val options = new CaseInsensitiveStringMap(properties) + val path = options.get("path") + if (path == null) { + throw new HoodieException("'path' cannot be null, missing 'path' from table properties") + } + // Read-side supportability is enforced in HoodieSparkV2Table.newScanBuilder. Throwing + // here would also block the V1-write fallback for existing MOR/bootstrap/non-Parquet + // tables, since DataFrameWriter.save calls getTable before deciding V1 vs V2. + HoodieSparkV2Table(SparkSession.active, path, options = options) + } + + override def createRelation(sqlContext: SQLContext, + mode: SaveMode, + optParams: Map[String, String], + df: DataFrame): BaseRelation = { Review Comment: 🤖 nit: the bootstrap/write success-checking block here is nearly identical to `DefaultSource.createRelation`. If the bootstrap semantics change (e.g. new error codes, new mode handling), both files need to be updated. Could you extract this into a shared helper — say a static method on `HoodieSparkSqlWriter` or a small object — so there's only one place to maintain? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
