alexeykudinkin commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r806235304
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,44 +89,48 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
log.debug(s" buildScan filters = ${filters.mkString(",")}")
+ // NOTE: In case list of requested columns doesn't contain the Primary Key
one, we
+ // have to add it explicitly so that
+ // - Merging could be performed correctly
+ // - In case 0 columns are to be fetched (for ex, when doing
{@code count()} on Spark's [[Dataset]],
+ // Spark still fetches all the rows to execute the query correctly
+ //
+ // It's okay to return columns that have not been requested by the
caller, as those nevertheless will be
+ // filtered out upstream
+ val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
val (requiredAvroSchema, requiredStructSchema) =
- HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+ HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
val fileIndex = buildFileIndex(filters)
- val hoodieTableState = HoodieMergeOnReadTableState(
- tableStructSchema,
- requiredStructSchema,
- tableAvroSchema.toString,
- requiredAvroSchema.toString,
- fileIndex,
- preCombineField,
- recordKeyFieldOpt
- )
- val fullSchemaParquetReader =
HoodieDataSourceHelper.buildHoodieParquetReader(
- sparkSession = sqlContext.sparkSession,
- dataSchema = tableStructSchema,
+ val tableSchemas = HoodieTableSchemas(
+ tableSchema = tableStructSchema,
partitionSchema = StructType(Nil),
- requiredSchema = tableStructSchema,
- filters = Seq.empty,
+ requiredSchema = requiredStructSchema,
+ tableAvroSchema = tableAvroSchema.toString,
+ requiredAvroSchema = requiredAvroSchema.toString
+ )
+ val tableState = HoodieMergeOnReadTableState(tableSchemas, fileIndex,
recordKeyField, preCombineFieldOpt)
+ val fullSchemaParquetReader = createBaseFileReader(
+ spark = sqlContext.sparkSession,
+ tableSchemas = tableSchemas,
+ filters = filters,
options = optParams,
- hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ hadoopConf = conf
)
-
- val requiredSchemaParquetReader =
HoodieDataSourceHelper.buildHoodieParquetReader(
Review comment:
Good catch! Fixed the reference to use the method from `DataSourceHelper`.
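
For readers following the NOTE in the diff above, here is a minimal sketch (not the PR's actual implementation) of what appending mandatory columns amounts to; the helper name mirrors `appendMandatoryColumns` from the diff, while the parameter list and body are illustrative assumptions:

```scala
// Illustrative sketch only: ensure the record key (and, if configured, the
// precombine field) are always fetched, so that merging works and queries
// requesting zero columns (e.g. count()) still materialize rows. Columns the
// caller didn't ask for are pruned by Spark's projection upstream.
object MandatoryColumnsSketch {
  def appendMandatoryColumns(requestedColumns: Array[String],
                             recordKeyField: String,
                             preCombineFieldOpt: Option[String]): Array[String] = {
    val mandatory = Seq(recordKeyField) ++ preCombineFieldOpt.toSeq
    requestedColumns ++ mandatory.filterNot(requestedColumns.contains)
  }

  def main(args: Array[String]): Unit = {
    // e.g. a count() pushes down zero columns, yet we still fetch key + precombine
    println(appendMandatoryColumns(Array.empty[String], "uuid", Some("ts")).mkString(","))  // uuid,ts
  }
}
```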
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, "blah").getLeft
Review comment:
> can we fix "blah"? -> "files" we are going to fetch records only from the "files" partition, right? Or from any partition?

Yeah, we're currently fetching all partitions, and we need to figure out how we want to expose these partitions, since in practice they have different schemas.
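
To make the open question concrete, a hedged sketch of the reviewer's suggestion, reusing the `metadataTable` and `split` values from the diff above (the call shape follows the diff; whether restricting to a single partition is the right contract is exactly what's being discussed):

```scala
// Hypothetical follow-up, not part of the PR: scope the scanner to the
// metadata table's "files" partition instead of the "blah" placeholder.
// Today the code fetches log records across partitions, which is awkward
// because the metadata partitions effectively carry different schemas.
val filesPartitionScanner =
  metadataTable.getLogRecordScanner(split.logPaths.get.asJava, "files").getLeft
```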
##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -82,16 +76,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
   private val fileIndex = if (commitsToReturn.isEmpty) List() else buildFileIndex()
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))
Review comment:
They refer to the same key definition.
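
For anyone verifying that claim, an illustrative snippet (not part of the PR): both option constants resolve to the same underlying key string, which is why swapping `READ_PRE_COMBINE_FIELD` for `PRECOMBINE_FIELD` in the `orElse` fallback doesn't change which user-supplied option gets read:

```scala
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}

// Illustrative check only: the read-side and write-side constants share one
// key definition, so optParams.get(...) sees the same entry either way.
object PreCombineKeySanityCheck extends App {
  assert(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key ==
    DataSourceWriteOptions.PRECOMBINE_FIELD.key)
  println(s"precombine option key: ${DataSourceWriteOptions.PRECOMBINE_FIELD.key}")
}
```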
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]