alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1024701207


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   > and I also triaged the usages of WriterSchema (master) compared to tableSchema. It's used only in bootstrap cases.
   
   It was also inverted in the case of MERGE INTO (actually this whole dichotomy was introduced to support it)
   
   > I would expect all writer handles (create, append, etc) to use the writerSchema, but I see that the tableSchema is used in these handles (as per master). Do we know why? After this patch it makes sense to me, but wondering if we are missing any case here.
   
   That's exactly the reason it is being addressed in this PR -- it was inconsistent, confusing and in some cases incorrect.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -172,23 +182,63 @@ object HoodieSparkSqlWriter {
       }
 
       val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
-      val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
 
-      // short-circuit if bulk_insert via row is enabled.
+      // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+      sparkContext.getConf.registerKryoClasses(

Review Comment:
   This isn't actually going to have any effect here
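
   One plausible reading of this remark (an assumption on my part, not stated in the thread) is that Kryo class registration only takes effect when done on the `SparkConf` before the `SparkContext` is constructed. A minimal sketch, assuming Spark and Avro on the classpath:

   ```scala
   // Sketch only: SparkContext clones the conf at creation time, so later
   // mutations of sparkContext.getConf are not seen by the serializer.
   import org.apache.spark.SparkConf
   import org.apache.spark.sql.SparkSession

   val conf = new SparkConf()
     .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     .registerKryoClasses(Array(
       classOf[org.apache.avro.generic.GenericData],
       classOf[org.apache.avro.Schema]))

   // Registration above happens before the session/context is created
   val spark = SparkSession.builder().config(conf).master("local[1]").getOrCreate()
   ```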



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -239,83 +289,65 @@ object HoodieSparkSqlWriter {
             client.startCommitWithTime(instantTime, commitActionType)
            val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
            (writeStatuses, client)
-          }
-          case _ => { // any other operation
-            // register classes & schemas
-            val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-            sparkContext.getConf.registerKryoClasses(
-              Array(classOf[org.apache.avro.generic.GenericData],
-                classOf[org.apache.avro.Schema]))
-
-            // TODO(HUDI-4472) revisit and simplify schema handling
-            val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-            val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema)
-
-            val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
-            var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
-
-            val writerSchema: Schema =
-              if (reconcileSchema) {
-                // In case we need to reconcile the schema and schema evolution is enabled,
-                // we will force-apply schema evolution to the writer's schema
-                if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
-                  internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
-                }
 
-                if (internalSchemaOpt.isDefined) {
-                  // Apply schema evolution, by auto-merging write schema and read schema
-                  val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
-                  AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
-                } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
-                  // In case schema reconciliation is enabled and source and latest table schemas
-                  // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]], then we will
-                  // pick latest table's schema as the writer's schema
-                  latestTableSchema
-                } else {
-                  // Otherwise fallback to original source's schema
-                  sourceSchema
-                }
-              } else {
-                // In case reconciliation is disabled, we still have to do nullability attributes
-                // (minor) reconciliation, making sure schema of the incoming batch is in-line with
-                // the data already committed in the table
-                AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
-              }
+          case _ =>
+            // Convert to RDD[HoodieRecord]
+            val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace,
+              Some(writerSchema))
+
+            // Check whether partition columns should be persisted w/in the data-files, or should
+            // be instead omitted from them and simply encoded into the partition path (which is Spark's
+            // behavior by default)
+            // TODO move partition columns handling down into the handlers
+            val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+            val dataFileSchema = if (shouldDropPartitionColumns) {
+              val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema)
+              // NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization
+              //       allowing it to avoid the need to ser/de the whole schema along _every_ Avro record
+              registerAvroSchemasWithKryo(sparkContext, truncatedSchema)
+              truncatedSchema
+            } else {
+              writerSchema
+            }
 
-            validateSchemaForHoodieIsDeleted(writerSchema)
-            sparkContext.getConf.registerAvroSchemas(writerSchema)
-            log.info(s"Registered avro schema : ${writerSchema.toString(true)}")
+            // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework
+            //       (due to containing cyclic refs), therefore we have to convert it to string before
+            //       passing onto the Executor
+            val dataFileSchemaStr = dataFileSchema.toString
 
-            // Convert to RDD[HoodieRecord]
-            val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema,
-              org.apache.hudi.common.util.Option.of(writerSchema))
            val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean ||
              operation.equals(WriteOperationType.UPSERT) ||
              parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(),
                HoodieWriteConfig.COMBINE_BEFORE_INSERT.defaultValue()).toBoolean
-            val hoodieAllIncomingRecords = genericRecords.map(gr => {
-              val processedRecord = getProcessedRecord(partitionColumns, gr, dropPartitionColumns)
-              val hoodieRecord = if (shouldCombine) {
-                val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, hoodieConfig.getString(PRECOMBINE_FIELD), false, parameters.getOrElse(
-                  DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
-                  DataSourceWriteOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()).toBoolean)
-                  .asInstanceOf[Comparable[_]]
-                DataSourceUtils.createHoodieRecord(processedRecord,
-                  orderingVal,
-                  keyGenerator.getKey(gr),
-                  hoodieConfig.getString(PAYLOAD_CLASS_NAME))
-              } else {
-                DataSourceUtils.createHoodieRecord(processedRecord, keyGenerator.getKey(gr), hoodieConfig.getString(PAYLOAD_CLASS_NAME))
+
+            val hoodieRecords = avroRecords.mapPartitions(it => {

Review Comment:
   This is not changing the DAG: there's no diff b/w `map` and `mapPartitions` other than the scopes of the corresponding lambdas (`mapPartitions` allows some work, such as Avro schema parsing for ex, to be performed just once per partition)
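
   The reviewer's point can be sketched as follows (illustrative only; assumes Spark and Avro on the classpath, and `process` is a hypothetical stand-in for the record conversion):

   ```scala
   import org.apache.avro.Schema
   import org.apache.avro.generic.GenericRecord
   import org.apache.spark.rdd.RDD

   // Hypothetical stand-in for the per-record conversion logic
   def process(record: GenericRecord, schema: Schema): GenericRecord = record

   def toHoodieRecords(avroRecords: RDD[GenericRecord], schemaStr: String): RDD[GenericRecord] =
     // Same DAG as `avroRecords.map(...)`; the only difference is that the
     // lambda's scope covers the whole partition, so the (relatively expensive)
     // schema parsing below runs once per partition instead of once per record
     avroRecords.mapPartitions { it =>
       val schema = new Schema.Parser().parse(schemaStr)
       it.map(record => process(record, schema))
     }
   ```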



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -567,36 +682,25 @@ object HoodieSparkSqlWriter {
   }
 
   def bulkInsertAsRow(sqlContext: SQLContext,
-                      parameters: Map[String, String],
+                      hoodieConfig: HoodieConfig,
                       df: DataFrame,
                       tblName: String,
                       basePath: Path,
                       path: String,
                       instantTime: String,
-                      partitionColumns: String): (Boolean, common.util.Option[String]) = {
-    val sparkContext = sqlContext.sparkContext
-    val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(HoodieTableConfig.POPULATE_META_FIELDS.key(),
-      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
-    val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key()).map(_.toBoolean)
-      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
-    // register classes & schemas
-    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-    if (dropPartitionColumns) {
-      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
-    }
-    validateSchemaForHoodieIsDeleted(schema)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-    if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
+                      writerSchema: Schema): (Boolean, common.util.Option[String]) = {
+    if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
-    val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
-    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
-    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
+
+    val writerSchemaStr = writerSchema.toString
+
+    val opts = hoodieConfig.getProps.toMap ++
+      Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
+
+    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts))

Review Comment:
   We need `HoodieWriteConfig` (we actually include the whole `hoodieConfig` into it, L698)



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -347,6 +378,95 @@ object HoodieSparkSqlWriter {
     }
   }
 
+  /**
+   * Deduces writer's schema based on
+   * <ul>
+   *   <li>Source's schema</li>
+   *   <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li>
+   * </ul>
+   */
+  def deduceWriterSchema(sourceSchema: Schema,
+                         latestTableSchemaOpt: Option[Schema],
+                         internalSchemaOpt: Option[InternalSchema],
+                         opts: Map[String, String]): Schema = {
+    val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+    val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key,
+      HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
+
+    latestTableSchemaOpt match {
+      // In case table schema is empty we're just going to use the source schema as a
+      // writer's schema. No additional handling is required
+      case None => sourceSchema
+      // Otherwise, we need to make sure we reconcile incoming and latest table schemas
+      case Some(latestTableSchema) =>
+        // Before validating whether schemas are compatible, we need to "canonicalize" source's schema
+        // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
+        // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
+        // in the table's one we want to proceed aligning nullability constraints w/ the table's schema
+        val shouldCanonicalizeSchema = opts.getOrDefault(DataSourceWriteOptions.CANONICALIZE_SCHEMA.key,
+          DataSourceWriteOptions.CANONICALIZE_SCHEMA.defaultValue.toString).toBoolean
+        val canonicalizedSourceSchema = if (shouldCanonicalizeSchema) {
+          AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
+        } else {
+          sourceSchema
+        }
+
+        if (shouldReconcileSchema) {
+          internalSchemaOpt match {
+            case Some(internalSchema) =>
+              // Apply schema evolution, by auto-merging write schema and read schema
+              val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, internalSchema)
+              AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName)
+
+            case None =>
+              // In case schema reconciliation is enabled and source and latest table schemas
+              // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we
+              // will rebase incoming batch onto the table's latest schema (ie, reconcile them)
+              //
+              // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]]
+              //       we're validating in that order (where [[sourceSchema]] is treated as a reader's schema,
+              //       and [[latestTableSchema]] is treated as a writer's schema)
+              //
+              // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible
+              //       w/ the table's one and allow schemas to diverge. This is required in cases where
+              //       partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such
+              //       only incoming dataset's projection has to match the table's schema, and not the whole one
+              if (!shouldValidateSchemasCompatibility || TableSchemaResolver.isSchemaCompatible(canonicalizedSourceSchema, latestTableSchema)) {

Review Comment:
   This primarily depends on whether reconciliation is enabled or not:
   
    - if **enabled**: we will rewrite the incoming batch into the table's schema (if we can; ie if the target has 4 cols and the incoming batch has 3, we will add the missing one filled w/ nulls)
   
    - if **disabled**: we will rewrite the table into the incoming batch's schema (ie the other way around; note that we'd only rewrite the files that will have records updated in them)
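
   The "enabled" case can be illustrated w/ a small Avro example (a sketch; the schemas below are made up):

   ```scala
   import org.apache.avro.SchemaBuilder

   // Hypothetical target table schema: 4 columns, w/ "d" being nullable
   val tableSchema = SchemaBuilder.record("rec").fields()
     .requiredString("a")
     .requiredString("b")
     .requiredString("c")
     .optionalString("d")   // missing from the incoming batch
     .endRecord()

   // Hypothetical incoming batch schema: only 3 columns
   val incomingSchema = SchemaBuilder.record("rec").fields()
     .requiredString("a")
     .requiredString("b")
     .requiredString("c")
     .endRecord()

   // With reconciliation enabled, the incoming batch would be rewritten into
   // tableSchema, w/ field "d" null-filled for every record; with it disabled,
   // files holding updated records would instead be rewritten into incomingSchema
   ```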
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
