alexeykudinkin commented on a change in pull request #4789:
URL: https://github.com/apache/hudi/pull/4789#discussion_r811428239



##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/ProvidesHoodieConfig.scala
##########
@@ -71,23 +74,25 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> "200",
         SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL
       )
+        .filter { case(_, v) => v != null }

Review comment:
       Filtering out any configs whose values might be null (a Java `Hashtable` 
throws if it is passed a null value)
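
A minimal sketch of why the filter matters, using only the standard library. `NullConfigSketch`, `dropNulls` and `toHashtable` are hypothetical names for illustration and are not part of the PR; the only fact relied on is that `java.util.Hashtable.put` rejects null keys and values.

```scala
import java.util.{Hashtable => JHashtable}

object NullConfigSketch {
  // Drops entries whose value is null, mirroring the `.filter { case (_, v) => v != null }` in the diff
  def dropNulls(configs: Map[String, String]): Map[String, String] =
    configs.filter { case (_, v) => v != null }

  // Copies the entries into a java.util.Hashtable, which throws
  // NullPointerException when given a null key or value
  def toHashtable(configs: Map[String, String]): JHashtable[String, String] = {
    val table = new JHashtable[String, String]()
    configs.foreach { case (k, v) => table.put(k, v) }
    table
  }
}
```

Calling `toHashtable` on the unfiltered map fails as soon as it reaches the null value, while `toHashtable(dropNulls(...))` succeeds.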

##########
File path: hudi-spark-datasource/hudi-spark2/pom.xml
##########
@@ -199,6 +199,15 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
       <version>${spark2.version}</version>
+      <scope>provided</scope>

Review comment:
       Good call

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -96,45 +92,56 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,
     log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
     log.debug(s" buildScan filters = ${filters.mkString(",")}")
 
+    // NOTE: In case list of requested columns doesn't contain the Primary Key 
one, we
+    //       have to add it explicitly so that
+    //          - Merging could be performed correctly
+    //          - In case 0 columns are to be fetched (for ex, when doing 
{@code count()} on Spark's [[Dataset]],
+    //          Spark still fetches all the rows to execute the query correctly
+    //
+    //       It's okay to return columns that have not been requested by the 
caller, as those nevertheless will be
+    //       filtered out upstream
+    val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+
     val (requiredAvroSchema, requiredStructSchema) =
-      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, requiredColumns)
+      HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns)
     val fileIndex = buildFileIndex(filters)
-    val hoodieTableState = HoodieMergeOnReadTableState(
-      tableStructSchema,
-      requiredStructSchema,
-      tableAvroSchema.toString,
-      requiredAvroSchema.toString,
-      fileIndex,
-      preCombineField,
-      recordKeyFieldOpt
-    )
-    val fullSchemaParquetReader = 
HoodieDataSourceHelper.buildHoodieParquetReader(
-      sparkSession = sqlContext.sparkSession,
-      dataSchema = tableStructSchema,
-      partitionSchema = StructType(Nil),
-      requiredSchema = tableStructSchema,
+
+    val partitionSchema = StructType(Nil)
+    val tableSchema = HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString)
+    val requiredSchema = HoodieTableSchema(requiredStructSchema, 
requiredAvroSchema.toString)
+
+    val fullSchemaParquetReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      tableSchema = tableSchema,
+      requiredSchema = tableSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly)
       filters = Seq.empty,
       options = optParams,
-      hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = new Configuration(conf)

Review comment:
       Sorry, I'm not sure I fully understood your point. Previously we were 
actually copying the Hadoop Conf from Spark (with `hadoopConf = 
sqlContext.sparkSession.sessionState.newHadoopConf()`), but the purpose of that 
copy was never explained and I got burnt by it, so I'm fixing the problem at the 
root by adding the comment here.
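
To illustrate the point about forking, here is a small stand-in sketch. It uses `java.util.Properties` in place of Hadoop's `Configuration` so that it runs without extra dependencies; `ForkedConfSketch`, `fork`, `configureReader` and the property key are all hypothetical, and the real code uses the copy constructor `new Configuration(conf)` shown in the diff.

```scala
import java.util.Properties

object ForkedConfSketch {
  // Returns an independent copy, analogous in spirit to `new Configuration(conf)`
  def fork(shared: Properties): Properties = {
    val copy = new Properties()
    copy.putAll(shared)
    copy
  }

  // Hypothetical reader setup that mutates whatever config it is handed,
  // standing in for Spark configuring the Parquet reader
  def configureReader(conf: Properties): Unit =
    conf.setProperty("reader.requested.schema", "id,ts")
}
```

Because `configureReader` only ever sees the fork, the shared config is left untouched and other readers are not affected by the mutation.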

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
##########
@@ -56,7 +57,7 @@ case class InsertIntoHoodieTableCommand(
   }
 }
 
-object InsertIntoHoodieTableCommand extends Logging {
+object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig {

Review comment:
       This simply removes the duplicated, identical method that builds Hudi's 
write config and re-uses the trait instead

##########
File path: 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
##########
@@ -75,84 +77,89 @@ class MergeOnReadIncrementalRelation(sqlContext: SQLContext,
 
   private val fileIndex = if (commitsToReturn.isEmpty) List() else 
buildFileIndex()
 
-  private val preCombineField = {
-    val preCombineFieldFromTableConfig = 
metaClient.getTableConfig.getPreCombineField
-    if (preCombineFieldFromTableConfig != null) {
-      Some(preCombineFieldFromTableConfig)
-    } else {
-      // get preCombineFiled from the options if this is a old table which 
have not store
-      // the field to hoodie.properties
-      optParams.get(DataSourceReadOptions.READ_PRE_COMBINE_FIELD.key)
-    }
-  }
+  private val preCombineFieldOpt = getPrecombineFieldProperty
 
-  override def needConversion: Boolean = false
+  // Record filters making sure that only records w/in the requested bounds 
are being fetched as part of the
+  // scan collected by this relation
+  private lazy val incrementalSpanRecordsFilters: Seq[Filter] = {
+    val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+    val largerThanFilter = 
GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
commitsToReturn.head.getTimestamp)
+    val lessThanFilter = 
LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, 
commitsToReturn.last.getTimestamp)
+    Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
+  }
 
-  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {

Review comment:
       `unhandledFilters` is expected to return a **subset** of the filters that 
were passed to it, namely the ones the relation was not able to handle.
   
   In this case the relation was actually returning additional filters that were 
never passed in, which is very misleading: these filters **will not be** applied, 
yet the previous implementation gave the impression that they would be
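
A minimal sketch of the contract described above. The `Filter` hierarchy here is a simplified, hypothetical stand-in and not Spark's actual `org.apache.spark.sql.sources` classes, and `canHandle` is an assumed capability check chosen only for illustration.

```scala
object UnhandledFiltersSketch {
  // Simplified stand-ins for data source filters (hypothetical, not the Spark classes)
  sealed trait Filter
  final case class IsNotNull(attribute: String) extends Filter
  final case class GreaterThanOrEqual(attribute: String, value: String) extends Filter
  final case class StringContains(attribute: String, value: String) extends Filter

  // Hypothetical capability check: this relation evaluates everything except StringContains
  def canHandle(filter: Filter): Boolean = filter match {
    case _: StringContains => false
    case _                 => true
  }

  // Honors the contract: the result contains only filters that were passed in
  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(canHandle)
}
```

Since the result is produced by filtering the input array, it cannot contain a filter the caller never supplied, which is the property the previous implementation violated.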

##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -133,46 +129,49 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, 
fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean, latestTableSchema:
-  org.apache.hudi.common.util.Option[Schema] = 
org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
-    val dfWriteSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
recordNamespace)
-    var writeSchema : Schema = null;
-    var toReconcileSchema : Schema = null;
-    if (reconcileToLatestSchema && latestTableSchema.isPresent) {
-      // if reconcileToLatestSchema is set to true and latestSchema is 
present, then try to leverage latestTableSchema.
-      // this code path will handle situations where records are serialized in 
odl schema, but callers wish to convert
-      // to Rdd[GenericRecord] using different schema(could be evolved schema 
or could be latest table schema)
-      writeSchema = dfWriteSchema
-      toReconcileSchema = latestTableSchema.get()
-    } else {
-      // there are paths where callers wish to use latestTableSchema to 
convert to Rdd[GenericRecords] and not use
-      // row's schema. So use latestTableSchema if present. if not available, 
fallback to using row's schema.
-      writeSchema = if (latestTableSchema.isPresent) { 
latestTableSchema.get()} else { dfWriteSchema}
-    }
-    createRddInternal(df, writeSchema, toReconcileSchema, structName, 
recordNamespace)
+  /**
+   * @deprecated please use other overload [[createRdd]]
+   */
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
reconcileToLatestSchema: Boolean,
+                latestTableSchema: org.apache.hudi.common.util.Option[Schema] 
= org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+    val latestTableSchemaConverted = if (latestTableSchema.isPresent && 
reconcileToLatestSchema) Some(latestTableSchema.get()) else None
+    createRdd(df, structName, recordNamespace, latestTableSchemaConverted)
   }
 
-  def createRddInternal(df: DataFrame, writeSchema: Schema, latestTableSchema: 
Schema, structName: String, recordNamespace: String)
-  : RDD[GenericRecord] = {
-    // Use the write avro schema to derive the StructType which has the 
correct nullability information
-    val writeDataType = 
AvroConversionUtils.convertAvroSchemaToStructType(writeSchema)
-    val encoder = RowEncoder.apply(writeDataType).resolveAndBind()
-    val deserializer = sparkAdapter.createSparkRowSerDe(encoder)
-    // if records were serialized with old schema, but an evolved schema was 
passed in with latestTableSchema, we need
-    // latestTableSchema equivalent datatype to be passed in to 
AvroConversionHelper.createConverterToAvro()
-    val reconciledDataType =
-      if (latestTableSchema != null) 
AvroConversionUtils.convertAvroSchemaToStructType(latestTableSchema) else 
writeDataType
-    // Note: deserializer.deserializeRow(row) is not capable of handling 
evolved schema. i.e. if Row was serialized in
-    // old schema, but deserializer was created with an encoder with evolved 
schema, deserialization fails.
-    // Hence we always need to deserialize in the same schema as serialized 
schema.
-    df.queryExecution.toRdd.map(row => deserializer.deserializeRow(row))
-      .mapPartitions { records =>
-        if (records.isEmpty) Iterator.empty
-        else {
-          val convertor = 
AvroConversionHelper.createConverterToAvro(reconciledDataType, structName, 
recordNamespace)
-          records.map { x => convertor(x).asInstanceOf[GenericRecord] }
-        }
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, 
readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+    val writerSchema = df.schema
+    val writerAvroSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, 
recordNamespace)
+    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    // We check whether passed in reader schema is identical to writer schema 
to avoid costly serde loop of
+    // making Spark deserialize its internal representation [[InternalRow]] 
into [[Row]] for subsequent conversion
+    // (and back)
+    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+    val (nullable, _) = 
AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
+
+    // NOTE: We have to serialize Avro schema, and then subsequently parse it 
on the executor node, since Spark
+    //       serializer is not able to digest it
+    val readerAvroSchemaStr = readerAvroSchema.toString
+    val writerAvroSchemaStr = writerAvroSchema.toString
+    // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to 
[[Row]] conversion
+    df.queryExecution.toRdd.mapPartitions { rows =>
+      if (rows.isEmpty) {
+        Iterator.empty
+      } else {
+        val transform: GenericRecord => GenericRecord =
+          if (sameSchema) identity
+          else {
+            val readerAvroSchema = new 
Schema.Parser().parse(readerAvroSchemaStr)
+            rewriteRecord(_, readerAvroSchema)

Review comment:
       So this would actually be faster: 
   
   - Before, we were converting `InternalRow` > `Row` > Avro
   - After, we convert `InternalRow` > Avro directly, and then rewrite only if 
the schemas actually diverge
   
   So in the best case (when there's no schema change) we save one full 
hierarchical traversal; in the worst case the two approaches do equivalent work
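
The "rewrite only if divergent" step can be sketched as follows. This is an illustration under loud assumptions: a "schema" is modelled as a list of field names and a "record" as a plain map, instead of Avro's `Schema` and `GenericRecord`, and this `rewriteRecord` is a hypothetical analogue of the helper referenced in the diff.

```scala
object SchemaAwareTransformSketch {
  // Stand-ins: a "schema" is an ordered list of field names, a "record" is a field-to-value map
  type Schema = Seq[String]
  type Record = Map[String, Any]

  // Hypothetical analogue of rewriteRecord: projects a record onto the reader schema,
  // filling fields that are absent from the record with null
  def rewriteRecord(record: Record, readerSchema: Schema): Record =
    readerSchema.map(field => field -> record.getOrElse(field, null)).toMap

  // Picks the transform once up front: identity when schemas match, rewrite otherwise
  def chooseTransform(writerSchema: Schema, readerSchema: Schema): Record => Record =
    if (writerSchema == readerSchema) identity
    else rewriteRecord(_, readerSchema)
}
```

When the schemas match, the chosen transform returns the very same record instance, so no second traversal takes place; only divergent schemas pay for the rewrite.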



