nsivabalan commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r805009590



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
##########
@@ -18,363 +18,30 @@
 
 package org.apache.hudi
 
-import java.nio.ByteBuffer
-import java.sql.{Date, Timestamp}
-import java.time.Instant
-
-import org.apache.avro.Conversions.DecimalConversion
-import org.apache.avro.LogicalTypes.{TimestampMicros, TimestampMillis}
-import org.apache.avro.Schema.Type._
-import org.apache.avro.generic.GenericData.{Fixed, Record}
-import org.apache.avro.generic.{GenericData, GenericFixed, GenericRecord}
-import org.apache.avro.{LogicalTypes, Schema}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.avro.Schema
 import org.apache.spark.sql.types._
 
-import org.apache.hudi.AvroConversionUtils._
-import org.apache.hudi.exception.HoodieIncompatibleSchemaException
-
-import scala.collection.JavaConverters._
-
 object AvroConversionHelper {
 
-  private def createDecimal(decimal: java.math.BigDecimal, precision: Int, 
scale: Int): Decimal = {
-    if (precision <= Decimal.MAX_LONG_DIGITS) {
-      // Constructs a `Decimal` with an unscaled `Long` value if possible.
-      Decimal(decimal.unscaledValue().longValue(), precision, scale)
-    } else {
-      // Otherwise, resorts to an unscaled `BigInteger` instead.
-      Decimal(decimal, precision, scale)
-    }
-  }
-
   /**
-    *
-    * Returns a converter function to convert row in avro format to GenericRow 
of catalyst.
-    *
-    * @param sourceAvroSchema Source schema before conversion inferred from 
avro file by passed in
-    *                         by user.
-    * @param targetSqlType    Target catalyst sql type after the conversion.
-    * @return returns a converter function to convert row in avro format to 
GenericRow of catalyst.
+    * @deprecated please use [[AvroConversionUtils.createRowToAvroConverter]]

Review comment:
       guess you meant createAvroToRowConverter 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -82,16 +76,11 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   private val fileIndex = if (commitsToReturn.isEmpty) List() else 
buildFileIndex()
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = 
metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which 
have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))

Review comment:
       guess this should be READ_PRE_COMBINE_FIELD

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = 
getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new 
HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, 
"blah").getLeft

Review comment:
       can we fix "blah" ? -> "files" we are going to fetch records only from 
"files" partition right? or even for any partition ? 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -41,15 +58,91 @@ abstract class HoodieBaseRelation(
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
-  protected val tableAvroSchema: Schema = {
+  protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    Try 
(schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
+    Try(schemaUtil.getTableAvroSchema).getOrElse(
+      // If there is no commit in the table, we can't get the schema
+      // t/h [[TableSchemaResolver]], fallback to provided the [[userSchema]] 
instead.
+      userSchema match {
+        case Some(s) => SchemaConverters.toAvroType(s)
+        case _ => throw new IllegalArgumentException("User-provided schema is 
required in case the table is empty")
+      }
+    )
   }
 
   protected val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
   protected val partitionColumns: Array[String] = 
metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
 
-  override def schema: StructType = userSchema.getOrElse(tableStructSchema)
+  override def schema: StructType = tableStructSchema
+}
+
+object HoodieBaseRelation {
+
+  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+    HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
 
+  /**
+   * Returns file-reader routine accepting [[PartitionedFile]] and returning 
an [[Iterator]]
+   * over [[InternalRow]]
+   */
+  def createBaseFileReader(spark: SparkSession,
+                           tableSchemas: HoodieTableSchemas,
+                           filters: Array[Filter],
+                           options: Map[String, String],
+                           hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
+    val hfileReader = createHFileReader(

Review comment:
       can we move this to else block in L116. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -155,8 +151,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     // Note: deserializer.deserializeRow(row) is not capable of handling 
evolved schema. i.e. if Row was serialized in
     // old schema, but deserializer was created with an encoder with evolved 
schema, deserialization fails.
     // Hence we always need to deserialize in the same schema as serialized 
schema.
-    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
-      .mapPartitions { records =>
+    df.queryExecution.toRdd.mapPartitions { records =>

Review comment:
       prior to this patch, records were Iterator[Row] and now its 
Iterator[InternalRow]. so the transition did not cause any issues is it? bcoz, 
in L157 
   ```
   AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, 
recordNamespace), 
   ```
   1st arg is 
   ```
   AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema)
   ```
   which refers to StructType. 
   can you help me understand. for Row we have a schema of type StructType, but 
for InternalRow there is no schema as such right. 
   
   
   
   

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
##########
@@ -41,15 +58,91 @@ abstract class HoodieBaseRelation(
 
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
-  protected val tableAvroSchema: Schema = {
+  protected lazy val tableAvroSchema: Schema = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    Try 
(schemaUtil.getTableAvroSchema).getOrElse(SchemaConverters.toAvroType(userSchema.get))
+    Try(schemaUtil.getTableAvroSchema).getOrElse(
+      // If there is no commit in the table, we can't get the schema
+      // t/h [[TableSchemaResolver]], fallback to provided the [[userSchema]] 
instead.
+      userSchema match {
+        case Some(s) => SchemaConverters.toAvroType(s)
+        case _ => throw new IllegalArgumentException("User-provided schema is 
required in case the table is empty")
+      }
+    )
   }
 
   protected val tableStructSchema: StructType = 
AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
 
   protected val partitionColumns: Array[String] = 
metaClient.getTableConfig.getPartitionFields.orElse(Array.empty)
 
-  override def schema: StructType = userSchema.getOrElse(tableStructSchema)
+  override def schema: StructType = tableStructSchema
+}
+
+object HoodieBaseRelation {
+
+  def isMetadataTable(metaClient: HoodieTableMetaClient) =
+    HoodieTableMetadata.isMetadataTable(metaClient.getBasePath)
 
+  /**
+   * Returns file-reader routine accepting [[PartitionedFile]] and returning 
an [[Iterator]]
+   * over [[InternalRow]]
+   */
+  def createBaseFileReader(spark: SparkSession,
+                           tableSchemas: HoodieTableSchemas,
+                           filters: Array[Filter],
+                           options: Map[String, String],
+                           hadoopConf: Configuration): PartitionedFile => 
Iterator[InternalRow] = {
+    val hfileReader = createHFileReader(
+      spark = spark,
+      tableSchemas = tableSchemas,
+      filters = filters,
+      options = options,
+      hadoopConf = hadoopConf
+    )
+    val parquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(

Review comment:
       also, can we move this to L114

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,44 +89,48 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
 
+    // NOTE: In case list of requested columns doesn't contain the Primary Key 
one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing 
{@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the 
caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
     val (requiredAvroSchema, requiredStructSchema) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
     val fileIndex = buildFileIndex(filters)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField,
-      recordKeyFieldOpt
-    )
-    val fullSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
+    val tableSchemas = HoodieTableSchemas(
+      tableSchema = tableStructSchema,
       partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
-      filters = Seq.empty,
+      requiredSchema = requiredStructSchema,
+      tableAvroSchema = tableAvroSchema.toString,
+      requiredAvroSchema = requiredAvroSchema.toString
+    )
+    val tableState = HoodieMergeOnReadTableState(tableSchemas, fileIndex, 
recordKeyField, preCombineFieldOpt)
+    val fullSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      tableSchemas = tableSchemas,
+      filters = filters,
       options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      hadoopConf = conf
     )
-
-    val requiredSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(

Review comment:
       prior to this patch we had some manipulation of how we were reading 
parquet files
   ```
       file: PartitionedFile => {
         val iter = readParquetFile(file)
         val rows = iter.flatMap(_ match {
           case r: InternalRow => Seq(r)
           case b: ColumnarBatch => b.rowIterator().asScala
         })
         rows
       }
   ```
   excerpt from HoodieDataSourceHelper.buildHoodieParquetReader.
   Looks like we need this for vectorized reader. this is very critical for 
performance. May be we can embed this into 
HoodieBaseRelation,createBaseFileReader only so that all callers benefit. 

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
##########
@@ -325,24 +329,33 @@ private object HoodieMergeOnReadRDD {
 
   def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: 
Configuration): HoodieMergedLogRecordScanner = {
     val fs = FSUtils.getFs(split.tablePath, config)
-    HoodieMergedLogRecordScanner.newBuilder()
-      .withFileSystem(fs)
-      .withBasePath(split.tablePath)
-      .withLogFilePaths(split.logPaths.get.asJava)
-      .withReaderSchema(logSchema)
-      .withLatestInstantTime(split.latestCommit)
-      .withReadBlocksLazily(
-        
Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
-          
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean)
-          .getOrElse(false))
-      .withReverseReader(false)
-      .withBufferSize(
-        config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
-          HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
-      .withMaxMemorySizeInBytes(split.maxCompactionMemoryInBytes)
-      .withSpillableMapBasePath(
-        config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
-          HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-      .build()
+
+    if (HoodieTableMetadata.isMetadataTable(split.tablePath)) {
+      val metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build()
+      val dataTableBasePath = 
getDataTableBasePathFromMetadataTable(split.tablePath)
+      val metadataTable = new HoodieBackedTableMetadata(new 
HoodieLocalEngineContext(config), metadataConfig, dataTableBasePath, "/tmp")
+
+      metadataTable.getLogRecordScanner(split.logPaths.get.asJava, 
"blah").getLeft

Review comment:
       for last arg, can we do config.getSpillableMapBasePath

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -72,19 +66,18 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
 
   private val maxCompactionMemoryInBytes = 
getMaxCompactionMemoryInBytes(jobConf)
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = 
metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
+  private val recordKeyField = metaClient.getTableConfig.getRecordKeyFieldProp
+  private val preCombineFieldOpt =
+    Option(metaClient.getTableConfig.getPreCombineField)
       // get preCombineFiled from the options if this is a old table which 
have not store
       // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
-  private var recordKeyFieldOpt = Option.empty[String]
-  if (!metaClient.getTableConfig.populateMetaFields()) {
-    recordKeyFieldOpt = Option(metaClient.getTableConfig.getRecordKeyFieldProp)
+      .orElse(optParams.get(DataSourceWriteOptions.PRECOMBINE_FIELD.key))

Review comment:
       DataSourceReadOptions.READ_PRE_COMBINE_FIELD




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to