nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r812940541
##########
File path:
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,45 +92,56 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
+ // NOTE: In case list of requested columns doesn't contain the Primary Key one, we
+ // have to add it explicitly so that
+ // - Merging could be performed correctly
+ // - In case 0 columns are to be fetched (for ex, when doing {@code count()} on Spark's [[Dataset]],
+ // Spark still fetches all the rows to execute the query correctly
+ //
+ // It's okay to return columns that have not been requested by the caller, as those nevertheless will be
+ // filtered out upstream
+ val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
val (requiredAvroSchema, requiredStructSchema) =
- HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+ HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val fileIndex = buildFileIndex(filters)
- val hoodieTableState = HoodieMergeOnReadTableState(
- tableStructSchema,
- requiredStructSchema,
- tableAvroSchema.toString,
- requiredAvroSchema.toString,
- fileIndex,
- preCombineField,
- recordKeyFieldOpt
- )
- val fullSchemaParquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sqlContext.sparkSession,
- dataSchema = tableStructSchema,
- partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
+
+ val partitionSchema = StructType(Nil)
+ val tableSchema = HoodieTableSchema(tableStructSchema, tableAvroSchema.toString)
+ val requiredSchema = HoodieTableSchema(requiredStructSchema, requiredAvroSchema.toString)
+
+ val fullSchemaParquetReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ partitionSchema = partitionSchema,
+ tableSchema = tableSchema,
+ requiredSchema = tableSchema,
+ // This file-reader is used to read base file records, subsequently merging them with the records
+ // stored in delta-log files. As such, we have to read _all_ records from the base file, while avoiding
+ // applying any filtering _before_ we complete combining them w/ delta-log records (to make sure that
+ // we combine them correctly)
filters = Seq.empty,
options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
+ // to configure Parquet reader appropriately
+ hadoopConf = new Configuration(conf)
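
A minimal sketch of the mandatory-column idea described in the NOTE above, assuming made-up field values and a hypothetical helper name rather than the actual MergeOnReadSnapshotRelation code: the record key and precombine fields get appended to whatever the query requested, so merging still has its keys even when a query such as count() asks for zero columns.

```scala
// Sketch only: helper name and field values are assumptions for illustration,
// not the actual Hudi MergeOnReadSnapshotRelation implementation.
object MandatoryColumnsSketch {
  private val recordKeyField = "uuid" // assumed record key field
  private val preCombineField = "ts"  // assumed precombine field

  // Append the columns merging depends on to whatever the query asked for,
  // de-duplicating while preserving the requested order.
  def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] =
    (requestedColumns ++ Seq(recordKeyField, preCombineField)).distinct

  def main(args: Array[String]): Unit = {
    // A count()-style query requests zero columns, but the key columns are still
    // fetched so base-file and delta-log records can be combined correctly.
    println(appendMandatoryColumns(Array.empty[String]).mkString(",")) // uuid,ts
    println(appendMandatoryColumns(Array("fare", "ts")).mkString(",")) // fare,ts,uuid
  }
}
```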
Review comment:
I get it. I had mistakenly assumed that sqlContext.sparkSession.sessionState.newHadoopConf() returns an empty Configuration, but I guess it returns a copy, so any modification made to it will not affect the original Hadoop conf.
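
To illustrate that copy behavior with a minimal standalone sketch (plain Hadoop Configuration only; the config key name is made up), the same forking pattern as new Configuration(conf) in the diff above keeps mutations on the fork from showing up on the original:

```scala
import org.apache.hadoop.conf.Configuration

// Minimal sketch: the Configuration copy constructor yields an independent copy,
// so changes made to the fork (e.g. by Spark's Parquet reader setup) never leak
// back into the original conf.
object ForkedConfSketch {
  def main(args: Array[String]): Unit = {
    val original = new Configuration()
    original.set("example.key", "original-value") // "example.key" is a made-up key

    val forked = new Configuration(original) // fork via the copy constructor
    forked.set("example.key", "reader-specific-value")

    println(original.get("example.key")) // prints: original-value
    println(forked.get("example.key"))   // prints: reader-specific-value
  }
}
```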