alexeykudinkin commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r806235304



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,44 +89,48 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
 
+    // NOTE: In case list of requested columns doesn't contain the Primary Key 
one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing 
{@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the 
caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
     val (requiredAvroSchema, requiredStructSchema) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
     val fileIndex = buildFileIndex(filters)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField,
-      recordKeyFieldOpt
-    )
-    val fullSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
+    val tableSchemas = HoodieTableSchemas(
+      tableSchema = tableStructSchema,
       partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = Seq.empty,
+      requiredSchema = requiredStructSchema,
+      tableAvroSchema = tableAvroSchema.toString,
+      requiredAvroSchema = requiredAvroSchema.toString
+    )
+    val tableState = HoodieMergeOnReadTableState(tableSchemas, fileIndex, 
recordKeyField, preCombineFieldOpt)
+    val fullSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      tableSchemas = tableSchemas,
+      filters = filters,
       options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = conf
     )
-
-    val requiredSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(

Review comment:
       Good catch! Fixed the ref to use method from `DataSourceHelper`

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = 
getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new 
HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, 
"blah").getLeft

Review comment:
       > can we fix "blah" ? -> "files" we are going to fetch records only from 
"files" partition right? or even for any partition ?
   
   Yeah, we're fetching all partitions currently and we need to figure out a 
way how we want to expose these partitions since they're practically have 
different schemas

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -82,16 +76,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   private val fileIndex = if (commitsToReturn.isEmpty) List() else 
buildFileIndex()
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = 
metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which 
have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))

Review comment:
       They refer to the same key def




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to