nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1024051219


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean

Review Comment:
   We can pass this as an argument, since it's already deduced at L191.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {

Review Comment:
   Question about canonicalization. I see a comment from @xiarixiaoyao in the
   Javadocs that Spark SQL could convert nullable to non-nullable. So, even for
   the first commit, don't we need to fix nullability? Neither on master nor in
   this patch do I see us canonicalizing on the first commit.
   
   On a related note, on master we canonicalize only when schema reconciliation
   is disabled. Why is that? Shouldn't we canonicalize in all flows?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -172,23 +182,63 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
-      val dropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
 
-      // short-circuit if bulk_insert via row is enabled.
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[GenericData],
+          classOf[Schema]))
+
+      val shouldReconcileSchema = 
parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+
+      val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, 
tableIdentifier, sparkContext.hadoopConfiguration)
+      // NOTE: We need to make sure that upon conversion of the schemas b/w 
Catalyst's [[StructType]] and
+      //       Avro's [[Schema]] we're preserving corresponding "record-name" 
and "record-namespace" that
+      //       play crucial role in establishing compatibility b/w schemas
+      val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s 
=> (s.getName, s.getNamespace))
+        .getOrElse(getAvroRecordNameAndNamespace(tblName))
+
+      val sourceSchema = convertStructTypeToAvroSchema(df.schema, 
avroRecordName, avroRecordNamespace)
+      val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext).orElse {

Review Comment:
   This is essentially coalesce(target internal schema, source internal schema).
   If the target table's internal schema is available, we pick that; if not,
   a. if reconcile schema and schema evolution are enabled, we pick the internal
      schema derived from the source schema;
   b. else none.
   
   While reviewing the code within deduceWriterSchema, I got confused: I thought
   internalSchemaOpt could only represent the target table's internal schema.
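   
   To spell out how I read the fallback order, here is a minimal, self-contained
   sketch. The names are hypothetical (`resolveInternalSchema`, and plain Strings
   standing in for InternalSchema); it only illustrates the coalesce logic and is
   not the actual code in this PR:
   
   ```scala
   // Hypothetical stand-in: a String plays the role of an InternalSchema here
   def resolveInternalSchema(targetInternalSchemaOpt: Option[String],
                             sourceSchema: String,
                             shouldReconcileSchema: Boolean,
                             schemaEvolutionEnabled: Boolean): Option[String] =
     targetInternalSchemaOpt.orElse {
       // Fall back to the source schema only when both flags are enabled
       if (shouldReconcileSchema && schemaEvolutionEnabled) Some(s"internal($sourceSchema)")
       else None
     }
   ```
   
   If that reading is right, a name that reflects the fallback (or a short
   comment at the call site) would make deduceWriterSchema easier to follow.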



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source 
schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
+        val shouldCanonicalizeSchema = 
opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          
DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read 
schema
+              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled and source and 
latest table schemas
+              // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema 
(ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from 
[[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] 
is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's 
schema)
+              //
+              // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
+              //       w/ the table's one and allow schemas to diverge. This 
is required in cases where
+              //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
+              //       only incoming dataset's projection has to match the 
table's schema, and not the whole one
+              if (!shouldValidateSchemasCompatibility || 
TableSchemaResolver.isSchemaCompatible(canonicalizedSourceSchema, 
latestTableSchema)) {

Review Comment:
   I am not sure the order of the arguments is right.
   ```
     public static boolean isSchemaCompatible(Schema prevSchema, Schema 
newSchema) {
       // NOTE: We're establishing compatibility of the {@code prevSchema} and 
{@code newSchema}
       //       as following: {@code newSchema} is considered compatible to 
{@code prevSchema},
       //       iff data written using {@code prevSchema} could be read by 
{@code newSchema}
       AvroSchemaCompatibility.SchemaPairCompatibility result =
           AvroSchemaCompatibility.checkReaderWriterCompatibility(newSchema, 
prevSchema);
   ```
   
   prevSchema refers to the writer schema with which the data was written.
   newSchema refers to the reader schema with which we are trying to read it.
   So prevSchema should represent the table schema and newSchema should refer to
   the source schema.
   
   Can you help me understand?
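   
   To make the direction concrete, here is a toy model. It is not Avro or Hudi
   code: `Field`, `canRead`, and the simplified rule (a reader field must either
   exist in the writer schema or carry a default) are my own assumptions for
   illustration. `isSchemaCompatible` below only mirrors the argument swap from
   the method quoted above:
   
   ```scala
   // Toy schema model: a field is just a name plus whether it has a default value
   case class Field(name: String, hasDefault: Boolean = false)
   
   // Simplified reader/writer rule: every reader field must either be present
   // in the writer schema or have a default to fall back on
   def canRead(reader: Seq[Field], writer: Seq[Field]): Boolean =
     reader.forall(f => f.hasDefault || writer.exists(_.name == f.name))
   
   // Same argument swap as in the quoted method: newSchema acts as the reader
   def isSchemaCompatible(prevSchema: Seq[Field], newSchema: Seq[Field]): Boolean =
     canRead(reader = newSchema, writer = prevSchema)
   
   val tableSchema  = Seq(Field("a"), Field("b"), Field("c"), Field("d"))
   val sourceSchema = Seq(Field("a"), Field("b"), Field("c"))
   
   // (table, source): the source schema reads table data, extra column "d" is ignored
   val tableThenSource = isSchemaCompatible(tableSchema, sourceSchema)
   // (source, table): the table schema reads source data, "d" is missing w/o default
   val sourceThenTable = isSchemaCompatible(sourceSchema, tableSchema)
   ```
   
   In this model the first call passes and the second fails. The call in the
   patch passes the source schema first and the table schema second, so, if I
   read the quoted method right, the table schema ends up being the reader.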
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema 
evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and 
read schema
-                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
-                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and 
latest table schemas
-                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do 
nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
-                // the data already committed in the table
-                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
-              }
+          case _ =>
+            // Convert to RDD[HoodieRecord]
+            val avroRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+              Some(writerSchema))
+
+            // Check whether partition columns should be persisted w/in the 
data-files, or should
+            // be instead omitted from them and simply encoded into the 
partition path (which is Spark's
+            // behavior by default)
+            // TODO move partition columns handling down into the handlers
+            val shouldDropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+            val dataFileSchema = if (shouldDropPartitionColumns) {
+              val truncatedSchema = 
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+              // NOTE: We have to register this schema w/ Kryo to make sure 
it's able to apply an optimization
+              //       allowing it to avoid the need to ser/de the whole 
schema along _every_ Avro record
+              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+              truncatedSchema
+            } else {
+              writerSchema
+            }
 
-            validateSchemaForHoodieIsDeleted(writerSchema)
-            sparkContext.getConf.registerAvroSchemas(writerSchema)
-            log.info(s"Registered avro schema : 
${writerSchema.toString(true)}")
+            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM 
native serialization framework
+            //       (due to containing cyclic refs), therefore we have to 
convert it to string before
+            //       passing onto the Executor
+            val dataFileSchemaStr = dataFileSchema.toString
 
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(writerSchema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
               
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
                 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, 
dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, 
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+            val hoodieRecords = avroRecords.mapPartitions(it => {
+              val dataFileSchema = new Schema.Parser().parse(dataFileSchemaStr)
+              it.map { avroRecord =>
+                val processedRecord = if (shouldDropPartitionColumns) {
+                  HoodieAvroUtils.rewriteRecord(avroRecord, dataFileSchema)
+                } else {
+                  avroRecord
+                }
+                val hoodieRecord = if (shouldCombine) {
+                  val orderingVal = 
HoodieAvroUtils.getNestedFieldVal(avroRecord, 
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
+                    
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),

Review Comment:
   Deducing the value of KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED can
   be moved to L324, so that we do it once per mapPartitions call.
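   
   Roughly what I have in mind, as a self-contained sketch on a plain Iterator
   instead of an RDD. `toOrderingValues` and the config key string are
   hypothetical placeholders, not names from this PR:
   
   ```scala
   // Hypothetical key, standing in for
   // KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key()
   val ConsistentLogicalTimestampKey = "example.consistent.logical.timestamp.enabled"
   
   def toOrderingValues(records: Iterator[Long],
                        parameters: Map[String, String]): Iterator[String] = {
     // Resolved once per partition, outside of the per-record closure
     val consistentLogicalTimestampEnabled =
       parameters.getOrElse(ConsistentLogicalTimestampKey, "false").toBoolean
   
     records.map { ts =>
       if (consistentLogicalTimestampEnabled) s"ts:$ts" else ts.toString
     }
   }
   ```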



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is 
configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   I also triaged the usages of WriterSchema (on master) compared to
   tableSchema: it's used only in bootstrap cases.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source 
schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
+        val shouldCanonicalizeSchema = 
opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          
DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read 
schema
+              val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled and source and 
latest table schemas
+              // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema 
(ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from 
[[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] 
is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's 
schema)
+              //
+              // NOTE: In some cases we need to relax constraint of incoming 
dataset's schema to be compatible
+              //       w/ the table's one and allow schemas to diverge. This 
is required in cases where
+              //       partial updates will be performed (for ex, `MERGE INTO` 
Spark SQL statement) and as such
+              //       only incoming dataset's projection has to match the 
table's schema, and not the whole one
+              if (!shouldValidateSchemasCompatibility || 
TableSchemaResolver.isSchemaCompatible(canonicalizedSourceSchema, 
latestTableSchema)) {

Review Comment:
   OK, I guess I get it now (what you were explaining yesterday): the order of
   the schema compatibility check will differ between these two cases, L434 and
   L455.
   
   But help me understand it better for the following scenarios.
   
   Reconcile true, no schema evolution:
   scenario1: the target table has 4 cols and the new incoming schema has a
   subset of them (3). In what order do we need to check schema compatibility?
   
   scenario2: the target table has 4 cols and the new incoming schema has more
   cols (5). In what order do we need to check schema compatibility?
   
   Reconcile false, no schema evolution:
   scenario3: the target table has 4 cols and the new incoming schema has a
   subset of them (3). In what order do we need to check schema compatibility?
   
   scenario4: the target table has 4 cols and the new incoming schema has more
   cols (5). In what order do we need to check schema compatibility?
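   
   To frame the question, here is a toy model of the two possible check
   directions for a 3-column and a 5-column incoming schema. It is not Avro or
   Hudi code: schemas are reduced to sets of column names, and a reader is
   assumed to need every one of its columns in the written data (i.e. no
   defaults and no nullable columns):
   
   ```scala
   // Toy rule: a reader without defaults needs all of its columns in the written data
   def canRead(reader: Set[String], writer: Set[String]): Boolean =
     reader.subsetOf(writer)
   
   val table      = Set("a", "b", "c", "d")
   val subsetCols = Set("a", "b", "c")            // scenarios 1 and 3
   val extraCols  = Set("a", "b", "c", "d", "e")  // scenarios 2 and 4
   
   // Incoming schema has fewer columns than the table
   val tableReadsSubset = canRead(reader = table, writer = subsetCols)
   val subsetReadsTable = canRead(reader = subsetCols, writer = table)
   
   // Incoming schema has more columns than the table
   val tableReadsExtra = canRead(reader = table, writer = extraCols)
   val extraReadsTable = canRead(reader = extraCols, writer = table)
   ```
   
   Under these assumptions only subsetReadsTable and tableReadsExtra hold, so
   the direction we pick decides which scenarios pass. With nullable columns or
   defaults the other directions may pass as well, which is why I'd like to
   understand which direction each flow is expected to use.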



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema 
evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and 
read schema
-                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
-                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and 
latest table schemas
-                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do 
nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
-                // the data already committed in the table
-                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
-              }
+          case _ =>
+            // Convert to RDD[HoodieRecord]
+            val avroRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+              Some(writerSchema))
+
+            // Check whether partition columns should be persisted w/in the 
data-files, or should
+            // be instead omitted from them and simply encoded into the 
partition path (which is Spark's
+            // behavior by default)
+            // TODO move partition columns handling down into the handlers
+            val shouldDropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+            val dataFileSchema = if (shouldDropPartitionColumns) {
+              val truncatedSchema = 
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+              // NOTE: We have to register this schema w/ Kryo to make sure 
it's able to apply an optimization
+              //       allowing it to avoid the need to ser/de the whole 
schema along _every_ Avro record
+              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+              truncatedSchema
+            } else {
+              writerSchema
+            }
 
-            validateSchemaForHoodieIsDeleted(writerSchema)
-            sparkContext.getConf.registerAvroSchemas(writerSchema)
-            log.info(s"Registered avro schema : 
${writerSchema.toString(true)}")
+            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM 
native serialization framework
+            //       (due to containing cyclic refs), therefore we have to 
convert it to string before
+            //       passing onto the Executor
+            val dataFileSchemaStr = dataFileSchema.toString
 
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(writerSchema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
               
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
                 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, 
dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, 
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+            val hoodieRecords = avroRecords.mapPartitions(it => {

Review Comment:
   May I know why the DAG was changed? Is it to avoid parsing the schema on
   every map call?
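   
   If that is the intent, the difference would look like this toy sketch on
   plain collections (no Spark). `parseSchema` is a hypothetical stand-in for
   `new Schema.Parser().parse(...)` that just counts its invocations:
   
   ```scala
   var parseCalls = 0
   
   // Hypothetical stand-in for an expensive schema parse
   def parseSchema(schemaStr: String): String = { parseCalls += 1; schemaStr.trim }
   
   val partitions = Seq(Seq(1, 2, 3), Seq(4, 5))
   val schemaStr  = " record "
   
   // map-style: the schema is parsed for every record
   val perRecord = partitions.map(_.map(r => (parseSchema(schemaStr), r)))
   val perRecordCalls = parseCalls
   
   parseCalls = 0
   // mapPartitions-style: the schema is parsed once per partition, then reused
   val perPartition = partitions.map { part =>
     val schema = parseSchema(schemaStr)
     part.map(r => (schema, r))
   }
   val perPartitionCalls = parseCalls
   ```
   
   Here the per-record variant parses once per record and the per-partition
   variant once per partition, while both produce the same output.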



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = 
opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = 
opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source 
schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to 
"canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of 
the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, 
but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability 
constraints w/ the table's schema
+        val shouldCanonicalizeSchema = 
opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,

Review Comment:
   Following up on my previous comment: this makes sense. This is what I was
   expecting, i.e. canonicalizing the source schema in all cases.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -172,23 +182,63 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, 
tableConfig.getTableType)
-      val dropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
 
-      // short-circuit if bulk_insert via row is enabled.
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(

Review Comment:
   In case of DELETE_PARTITION, the incoming df could be empty, which is
   something to be wary about. That's why we had individual calls to
   registerKryo for the different write operations.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema 
evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and 
read schema
-                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
-                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and 
latest table schemas
-                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do 
nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
-                // the data already committed in the table
-                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
-              }
+          case _ =>
+            // Convert to RDD[HoodieRecord]
+            val avroRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+              Some(writerSchema))
+
+            // Check whether partition columns should be persisted w/in the 
data-files, or should
+            // be instead omitted from them and simply encoded into the 
partition path (which is Spark's
+            // behavior by default)
+            // TODO move partition columns handling down into the handlers
+            val shouldDropPartitionColumns = 
hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+            val dataFileSchema = if (shouldDropPartitionColumns) {
+              val truncatedSchema = 
generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+              // NOTE: We have to register this schema w/ Kryo to make sure 
it's able to apply an optimization
+              //       allowing it to avoid the need to ser/de the whole 
schema along _every_ Avro record
+              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+              truncatedSchema
+            } else {
+              writerSchema
+            }
 
-            validateSchemaForHoodieIsDeleted(writerSchema)
-            sparkContext.getConf.registerAvroSchemas(writerSchema)
-            log.info(s"Registered avro schema : 
${writerSchema.toString(true)}")
+            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM 
native serialization framework
+            //       (due to containing cyclic refs), therefore we have to 
convert it to string before
+            //       passing onto the Executor
+            val dataFileSchemaStr = dataFileSchema.toString
 
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = 
HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(writerSchema))
             val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
               operation.equals(WriteOperationType.UPSERT) ||
               
parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
                 
HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, 
dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, 
hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  
DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, 
keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+            val hoodieRecords = avroRecords.mapPartitions(it => {

Review Comment:
   We are changing the DAG here, from map to mapPartitions. If I am not wrong, 
the Spark partition count at this point will be the incoming df's partition 
count, since no shuffle parallelism has been applied yet. 
   Is there a chance of OOMing within executors after this change? Have we 
thought this through? 
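
   To make the concern concrete, here is a minimal sketch in plain Scala (no 
Spark dependency). The hypothetical `convertPartition` stands in for the 
function passed to mapPartitions. As long as that function returns a lazy 
`it.map(...)` and never materializes the iterator (e.g. via `toList`), records 
are still processed one at a time, so memory pressure should be comparable to 
the previous map-based version:

   ```scala
   object MapPartitionsSketch {
     // Stand-in for the per-partition function: setup runs once per partition,
     // individual records are transformed lazily as the caller pulls them.
     def convertPartition(it: Iterator[Int]): Iterator[String] = {
       val prefix = "rec-" // stands in for one-off setup, e.g. parsing a schema string
       it.map(i => prefix + i) // lazy: nothing is consumed until the result is iterated
     }

     def main(args: Array[String]): Unit = {
       var pulled = 0
       // Large lazy input; `pulled` counts how many elements were actually produced
       val input = Iterator.tabulate(1000000) { i => pulled += 1; i }
       val out = convertPartition(input)
       println(out.take(3).toList)
       println(s"elements pulled from input: $pulled")
     }
   }
   ```

   If the partition function buffered the whole iterator instead, the memory 
footprint would scale with the size of the incoming df's partitions.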
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is 
configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   I see that in most cases we can use writerSchema (since internally it falls 
back to the table schema anyway if not explicitly overridden), but help me 
understand one flow. 
   With the MERGE INTO flow, when the writer schema is explicitly set in 
HoodieWriteConfig, I would expect all write handles (create, append, etc.) to 
use the writerSchema. But I see that the table schema is used in these 
handles (as per master). 
   Do we know why? What this patch does makes sense to me, but I am wondering 
if we are missing any case here. 
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, 
nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, 
sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), 
"false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, 
sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema 
evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = 
Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and 
read schema
-                  val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getName)
-                } else if 
(TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and 
latest table schemas
-                  // are compatible (as defined by 
[[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do 
nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming 
batch is in-line with
-                // the data already committed in the table
-                
AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, 
latestTableSchema)
-              }
+          case _ =>

Review Comment:
   Can we add a comment noting that this case covers every other write operation? 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -409,6 +409,14 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[Boolean] = 
HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] = 
ConfigProperty.key("hoodie.datasource.write.schema.canonicalize")
+    .defaultValue(true)
+    .withDocumentation("Controls whether incoming batch's schema's nullability 
constraints should be canonicalized "

Review Comment:
   Let's make this an internal config. I don't think we want to expose this to 
users. 



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -567,36 +682,25 @@ object HoodieSparkSqlWriter {
   }
 
   def bulkInsertAsRow(sqlContext: SQLContext,
-                      parameters: Map[String, String],
+                      hoodieConfig: HoodieConfig,
                       df: DataFrame,
                       tblName: String,
                       basePath: Path,
                       path: String,
                       instantTime: String,
-                      partitionColumns: String): (Boolean, 
common.util.Option[String]) = {
-    val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = 
java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
-    val dropPartitionColumns = 
parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
-      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
-    // register classes & schemas
-    val (structName, nameSpace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, 
structName, nameSpace)
-    if (dropPartitionColumns) {
-      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
-    }
-    validateSchemaForHoodieIsDeleted(schema)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-    if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
+                      writerSchema: Schema): (Boolean, 
common.util.Option[String]) = {
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row 
writer path is not supported yet")
     }
-    val params: mutable.Map[String, String] = 
collection.mutable.Map(parameters.toSeq: _*)
-    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
-    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, 
path, tblName, mapAsJavaMap(params))
+
+    val writerSchemaStr = writerSchema.toString
+
+    val opts = hoodieConfig.getProps.toMap ++
+      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+
+    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, 
path, tblName, mapAsJavaMap(opts))

Review Comment:
   We do have hoodieConfig passed in as an arg to this method w/ this patch. 
Why not re-use it? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

