codope commented on code in PR #12622: URL: https://github.com/apache/hudi/pull/12622#discussion_r1920078322
########## hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark33OrcReader.scala: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType} +import org.apache.hudi.common.util +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.orc.mapred.OrcStruct +import org.apache.orc.mapreduce.OrcInputFormat +import org.apache.orc.{OrcConf, OrcFile, TypeDescription} +import org.apache.spark.TaskContext +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.HoodieSpark33SchemaUtils.toAttributes +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection +import org.apache.spark.sql.execution.datasources.orc._ +import 
org.apache.spark.sql.execution.datasources.parquet._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.Utils + +class Spark33OrcReader(enableVectorizedReader: Boolean, Review Comment: please add a javadoc for the class. In the javadoc, please also include the permalink of the Spark code from which this has been copied. Would be helpful for future maintainability. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala: ########## @@ -277,7 +279,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState, } } - private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkParquetReader, requestedSchema: StructType, + private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkFileReader, requestedSchema: StructType, Review Comment: This is what I had in mind: we can rename this class to `HoodieFileGroupReaderBasedFileFormat` and make this method extensible. From this class, we have two child classes for HoodieFileGroupReaderBasedParquetFileFormat and HoodieFileGroupReaderBasedOrcFileFormat, which only need to override `readBaseFile`. That way we retain all the functionalities. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieFileGroupReaderBasedFileFormat.scala: ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job +import org.apache.hudi.avro.AvroSchemaUtils +import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit} +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.read.HoodieFileGroupReader +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.storage.StorageConfiguration +import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage} +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkFileReader} +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} +import 
org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} +import org.apache.spark.util.SerializableConfiguration + +import java.io.Closeable +import scala.collection.mutable + +trait HoodieFormatTrait { + + // Used so that the planner only projects once and does not stack overflow + var isProjected: Boolean = false + def getRequiredFilters: Seq[Filter] +} + +/** + * This class utilizes {@link HoodieFileGroupReader} and its related classes to support reading + * from Parquet formatted base files and their log files. + */ +class HoodieFileGroupReaderBasedFileFormat(tableState: HoodieTableState, + tableSchema: HoodieTableSchema, + tableName: String, + mergeType: String, + mandatoryFields: Seq[String], + isMOR: Boolean, + isBootstrap: Boolean, + isIncremental: Boolean, + isCDC: Boolean, + validCommits: String, + shouldUseRecordPosition: Boolean, + requiredFilters: Seq[Filter]) + extends FileFormat with SparkAdapterSupport with HoodieFormatTrait with Serializable { + /** + * Support batch needs to remain consistent, even if one side of a bootstrap merge can support + * while the other side can't + */ + private var supportBatchCalled = false + private var supportBatchResult = false + + def getRequiredFilters: Seq[Filter] = requiredFilters + + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (!supportBatchCalled || supportBatchResult) { + supportBatchCalled = true + supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema) + } + supportBatchResult + } + + override def isSplitable(sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = false + + //for partition columns that we read from the file, we don't want them to be constant column vectors so we + //modify the vector types in this scenario + override def 
vectorTypes(requiredSchema: StructType, + partitionSchema: StructType, + sqlConf: SQLConf): Option[Seq[String]] = { + val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) + if (mandatoryFields.isEmpty) { + originalVectorTypes + } else { + val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) { + classOf[OnHeapColumnVector].getName + } else { + classOf[OffHeapColumnVector].getName + } + originalVectorTypes.map { + o: Seq[String] => o.zipWithIndex.map(a => { + if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) { + regularVectorType + } else { + a._1 + } + }) + } + } + } + + private val sanitizedTableName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName) + + private lazy val internalSchemaOpt: org.apache.hudi.common.util.Option[InternalSchema] = if (tableSchema.internalSchema.isEmpty) { + org.apache.hudi.common.util.Option.empty() + } else { + org.apache.hudi.common.util.Option.of(tableSchema.internalSchema.get) + } + + override def buildReaderWithPartitionValues(spark: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental + val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline + setSchemaEvolutionConfigs(augmentedStorageConf) + val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = partitionSchema.fields.toSeq.zipWithIndex.filter(p => !mandatoryFields.contains(p._1.name)).unzip + + // The schema of the partition cols we want to append the value instead of reading from the file + val remainingPartitionSchema = StructType(remainingPartitionSchemaArr) + + // index positions of the 
remainingPartitionSchema fields in partitionSchema + val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet + + // schema that we want fg reader to output to us + val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) + val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName) + val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName) + val broadcastedParquetFileReader = spark.sparkContext.broadcast( + sparkAdapter.createParquetFileReader(supportBatchResult, spark.sessionState.conf, options, augmentedStorageConf.unwrap())) + val broadcastedOrcFileReader = spark.sparkContext.broadcast( + sparkAdapter.createOrcFileReader(supportBatchCalled, spark.sessionState.conf, options, augmentedStorageConf.unwrap())) Review Comment: Can you please run a test in cluster mode to ensure everything being broadcasted here can actually be broadcasted? All our tests run locally, so we won't be able to catch any serde issues due to broadcast. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieFileGroupReaderBasedFileFormat.scala: ########## @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapreduce.Job +import org.apache.hudi.avro.AvroSchemaUtils +import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit} +import org.apache.hudi.client.utils.SparkInternalSchemaConverter +import org.apache.hudi.common.config.TypedProperties +import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.table.read.HoodieFileGroupReader +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.internal.schema.InternalSchema +import org.apache.hudi.storage.StorageConfiguration +import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage} +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.JoinedRow +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkFileReader} +import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector} +import 
org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils} +import org.apache.spark.util.SerializableConfiguration + +import java.io.Closeable +import scala.collection.mutable + +trait HoodieFormatTrait { + + // Used so that the planner only projects once and does not stack overflow + var isProjected: Boolean = false + def getRequiredFilters: Seq[Filter] +} + +/** + * This class utilizes {@link HoodieFileGroupReader} and its related classes to support reading + * from Parquet formatted base files and their log files. + */ +class HoodieFileGroupReaderBasedFileFormat(tableState: HoodieTableState, + tableSchema: HoodieTableSchema, + tableName: String, + mergeType: String, + mandatoryFields: Seq[String], + isMOR: Boolean, + isBootstrap: Boolean, + isIncremental: Boolean, + isCDC: Boolean, + validCommits: String, + shouldUseRecordPosition: Boolean, + requiredFilters: Seq[Filter]) + extends FileFormat with SparkAdapterSupport with HoodieFormatTrait with Serializable { + /** + * Support batch needs to remain consistent, even if one side of a bootstrap merge can support + * while the other side can't + */ + private var supportBatchCalled = false + private var supportBatchResult = false + + def getRequiredFilters: Seq[Filter] = requiredFilters + + override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = { + if (!supportBatchCalled || supportBatchResult) { + supportBatchCalled = true + supportBatchResult = !isMOR && !isIncremental && !isBootstrap && super.supportBatch(sparkSession, schema) + } + supportBatchResult + } + + override def isSplitable(sparkSession: SparkSession, + options: Map[String, String], + path: Path): Boolean = false + + //for partition columns that we read from the file, we don't want them to be constant column vectors so we + //modify the vector types in this scenario + override def 
vectorTypes(requiredSchema: StructType, + partitionSchema: StructType, + sqlConf: SQLConf): Option[Seq[String]] = { + val originalVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf) + if (mandatoryFields.isEmpty) { + originalVectorTypes + } else { + val regularVectorType = if (!sqlConf.offHeapColumnVectorEnabled) { + classOf[OnHeapColumnVector].getName + } else { + classOf[OffHeapColumnVector].getName + } + originalVectorTypes.map { + o: Seq[String] => o.zipWithIndex.map(a => { + if (a._2 >= requiredSchema.length && mandatoryFields.contains(partitionSchema.fields(a._2 - requiredSchema.length).name)) { + regularVectorType + } else { + a._1 + } + }) + } + } + } + + private val sanitizedTableName = AvroSchemaUtils.getAvroRecordQualifiedName(tableName) + + private lazy val internalSchemaOpt: org.apache.hudi.common.util.Option[InternalSchema] = if (tableSchema.internalSchema.isEmpty) { + org.apache.hudi.common.util.Option.empty() + } else { + org.apache.hudi.common.util.Option.of(tableSchema.internalSchema.get) + } + + override def buildReaderWithPartitionValues(spark: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) + val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental + val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline + setSchemaEvolutionConfigs(augmentedStorageConf) + val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = partitionSchema.fields.toSeq.zipWithIndex.filter(p => !mandatoryFields.contains(p._1.name)).unzip + + // The schema of the partition cols we want to append the value instead of reading from the file + val remainingPartitionSchema = StructType(remainingPartitionSchemaArr) + + // index positions of the 
remainingPartitionSchema fields in partitionSchema + val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet + + // schema that we want fg reader to output to us + val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name))) + val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName) + val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName) + val broadcastedParquetFileReader = spark.sparkContext.broadcast( + sparkAdapter.createParquetFileReader(supportBatchResult, spark.sessionState.conf, options, augmentedStorageConf.unwrap())) + val broadcastedOrcFileReader = spark.sparkContext.broadcast( + sparkAdapter.createOrcFileReader(supportBatchCalled, spark.sessionState.conf, options, augmentedStorageConf.unwrap())) + val broadcastedStorageConf = spark.sparkContext.broadcast(new SerializableConfiguration(augmentedStorageConf.unwrap())) + val fileIndexProps: TypedProperties = HoodieFileIndex.getConfigProperties(spark, options, null) + + (file: PartitionedFile) => { + val storageConf = new HadoopStorageConfiguration(broadcastedStorageConf.value.value) + file.partitionValues match { + // Snapshot or incremental queries. 
+ case fileSliceMapping: HoodiePartitionFileSliceMapping => + val filegroupName = FSUtils.getFileIdFromFilePath(sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)) + fileSliceMapping.getSlice(filegroupName) match { + case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty || fileSlice.getLogFiles.findAny().isPresent) => + val fileReaders = new java.util.HashMap[String, SparkFileReader]() + fileReaders.put("orc", broadcastedOrcFileReader.value) + fileReaders.put("parquet", broadcastedParquetFileReader.value) + val readerContext = new SparkFileFormatInternalRowReaderContext(fileReaders, filters, requiredFilters) + val metaClient: HoodieTableMetaClient = HoodieTableMetaClient + .builder().setConf(storageConf).setBasePath(tableState.tablePath).build + val props = metaClient.getTableConfig.getProps + options.foreach(kv => props.setProperty(kv._1, kv._2)) + val reader = new HoodieFileGroupReader[InternalRow]( + readerContext, + new HoodieHadoopStorage(metaClient.getBasePath, storageConf), + tableState.tablePath, + tableState.latestCommitTimestamp.get, + fileSlice, + dataAvroSchema, + requestedAvroSchema, + internalSchemaOpt, + metaClient, + props, + file.start, + file.length, + shouldUseRecordPosition) + reader.initRecordIterators() + // Append partition values to rows and project to output schema + appendPartitionAndProject( + reader.getClosableIterator, + requestedSchema, + remainingPartitionSchema, + outputSchema, + fileSliceMapping.getPartitionValues, + fixedPartitionIndexes) + + case _ => + val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file) + val baseFileFormat = detectFileFormat(filePath.toString) + baseFileFormat match { + case "parquet" => readBaseFile(file, broadcastedParquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes, + requiredSchema, partitionSchema, outputSchema, filters, storageConf) + case "orc" => readBaseFile(file, broadcastedOrcFileReader.value, 
requestedSchema, remainingPartitionSchema, fixedPartitionIndexes, + requiredSchema, partitionSchema, outputSchema, filters, storageConf) Review Comment: I think this is the main part which differs from the `HoodieFileGroupReaderBasedParquetFileFormat`. If so, can we not make `HoodieFileGroupReaderBasedParquetFileFormat` a subclass of this class and just implement the `buildReaderWithPartitionValues`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
