the-other-tim-brown commented on code in PR #14374:
URL: https://github.com/apache/hudi/pull/14374#discussion_r2578352901


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -108,20 +108,20 @@ object HoodieSchemaUtils {
    *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
    * </ul>
    */
-  def deduceWriterSchema(sourceSchema: Schema,
-                         latestTableSchemaOpt: Option[Schema],
+  def deduceWriterSchema(sourceSchema: HoodieSchema,
+                         latestTableSchemaOpt: Option[HoodieSchema],
                          internalSchemaOpt: Option[InternalSchema],
-                         opts: Map[String, String]): Schema = {
+                         opts: Map[String, String]): HoodieSchema = {
     latestTableSchemaOpt match {
       // If table schema is empty, then we use the source schema as a writer's 
schema.
-      case None => 
InternalSchemaConverter.fixNullOrdering(HoodieSchema.fromAvroSchema(sourceSchema)).toAvroSchema
+      case None => InternalSchemaConverter.fixNullOrdering(sourceSchema)
       // Otherwise, we need to make sure we reconcile incoming and latest 
table schemas
       case Some(latestTableSchemaWithMetaFields) =>
         // NOTE: Meta-fields will be unconditionally injected by Hudi writing 
handles, for the sake of deducing proper writer schema
         //       we're stripping them to make sure we can perform proper 
analysis
         // add call to fix null ordering to ensure backwards compatibility
         val latestTableSchema = 
InternalSchemaConverter.fixNullOrdering(HoodieSchema.fromAvroSchema(
-          removeMetadataFields(latestTableSchemaWithMetaFields))).toAvroSchema
+          
removeMetadataFields(latestTableSchemaWithMetaFields.getAvroSchema))).toAvroSchema

Review Comment:
   ```suggestion
           val latestTableSchema = InternalSchemaConverter.fixNullOrdering(
             
org.apache.hudi.common.schema.HoodieSchemaUtils.removeMetadataFields(latestTableSchemaWithMetaFields))
   
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -169,18 +169,18 @@ object HoodieSchemaUtils {
     if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && 
!allowAutoEvolutionColumnDrop) {
       // Default behaviour
       val reconciledSchema = if (setNullForMissingColumns) {
-        AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
latestTableSchema, setNullForMissingColumns)
+        
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
 latestTableSchema.toAvroSchema(), setNullForMissingColumns)
       } else {
-        canonicalizedSourceSchema
+        canonicalizedSourceSchema.toAvroSchema()
       }
-      checkValidEvolution(reconciledSchema, latestTableSchema)
-      reconciledSchema
+      checkValidEvolution(reconciledSchema, latestTableSchema.toAvroSchema())

Review Comment:
   Use `HoodieSchemaCompatibility.checkValidEvolution` here



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -146,7 +146,7 @@ object HoodieSparkSqlWriter {
     Metrics.shutdownAllMetrics()
   }
 
-  def getBulkInsertRowConfig(writerSchema: 
org.apache.hudi.common.util.Option[Schema], hoodieConfig: HoodieConfig,
+  def getBulkInsertRowConfig(writerSchema: 
org.apache.hudi.common.util.Option[HoodieSchema ], hoodieConfig: HoodieConfig,

Review Comment:
   nit: trailing space after `HoodieSchema`. Please fix this throughout the file



##########
hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/TestHoodieSchemaUtils.java:
##########
@@ -325,8 +326,19 @@ private static Schema deduceWriterSchema(Schema 
incomingSchema, Schema latestTab
 
   private static Schema deduceWriterSchema(Schema incomingSchema, Schema 
latestTableSchema, Boolean addNull) {
     
TYPED_PROPERTIES.setProperty(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
 addNull.toString());
-    return HoodieSchemaUtils.deduceWriterSchema(incomingSchema, 
Option.ofNullable(latestTableSchema),
-        Option.empty(), TYPED_PROPERTIES);
+
+    // Convert latestTableSchema to Option<HoodieSchema>
+    Option<HoodieSchema> latestTableSchemaOpt = latestTableSchema != null
+        ? Option.of(HoodieSchema.fromAvroSchema(latestTableSchema))
+        : Option.empty();

Review Comment:
   ```suggestion
       Option<HoodieSchema> latestTableSchemaOpt = 
Option.ofNullable(latestTableSchema).map(HoodieSchema::fromAvroSchema);
   ```



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -230,16 +232,27 @@ object HoodieSchemaUtils {
     }
   }
 
-  def deduceWriterSchema(sourceSchema: Schema,
-                         latestTableSchemaOpt: 
org.apache.hudi.common.util.Option[Schema],
+  def deduceWriterSchema(sourceSchema: HoodieSchema,
+                         latestTableSchemaOpt: 
org.apache.hudi.common.util.Option[HoodieSchema],
                          internalSchemaOpt: 
org.apache.hudi.common.util.Option[InternalSchema],
-                         props: TypedProperties): Schema = {
+                         props: TypedProperties): HoodieSchema = {
     deduceWriterSchema(sourceSchema,
       HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
       HoodieConversionUtils.toScalaOption(internalSchemaOpt),
       HoodieConversionUtils.fromProperties(props))
   }
 
+  /**
+   * Converts an Option of Avro Schema to an Option of HoodieSchema.
+   * This is a convenience method for Java callers working with Hudi's Option 
type.
+   *
+   * @param avroSchemaOpt Optional Avro Schema (Hudi's Option type)
+   * @return Optional HoodieSchema (Hudi's Option type)
+   */
+  def toHoodieSchemaOption(avroSchemaOpt: 
org.apache.hudi.common.util.Option[Schema]): 
org.apache.hudi.common.util.Option[HoodieSchema] = {

Review Comment:
   This looks like it is only used once; can it be inlined instead?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -191,33 +191,35 @@ object HoodieSchemaUtils {
    * Deducing with enabled reconciliation.
    * Marked as Deprecated.
    */
-  private def deduceWriterSchemaWithReconcile(sourceSchema: Schema,
-                                              canonicalizedSourceSchema: 
Schema,
+  private def deduceWriterSchemaWithReconcile(sourceSchema: HoodieSchema,
+                                              canonicalizedSourceSchema: 
HoodieSchema,
                                               latestTableSchema: Schema,

Review Comment:
   Can we change `latestTableSchema` to be a `HoodieSchema` now?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -169,18 +169,18 @@ object HoodieSchemaUtils {
     if (!mergeIntoWrites && !shouldValidateSchemasCompatibility && 
!allowAutoEvolutionColumnDrop) {
       // Default behaviour
       val reconciledSchema = if (setNullForMissingColumns) {
-        AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema, 
latestTableSchema, setNullForMissingColumns)
+        
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
 latestTableSchema.toAvroSchema(), setNullForMissingColumns)
       } else {
-        canonicalizedSourceSchema
+        canonicalizedSourceSchema.toAvroSchema()
       }
-      checkValidEvolution(reconciledSchema, latestTableSchema)
-      reconciledSchema
+      checkValidEvolution(reconciledSchema, latestTableSchema.toAvroSchema())
+      HoodieSchema.fromAvroSchema(reconciledSchema)
     } else {
       // If it's merge into writes, we don't check for projection nor schema 
compatibility. Writers down the line will take care of it.
       // Or it's not merge into writes, and we don't validate schema, but we 
allow to drop columns automatically.
       // Or it's not merge into writes, we validate schema, and schema is 
compatible.
       if (shouldValidateSchemasCompatibility) {
-        checkSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, 
true,
+        checkSchemaCompatible(latestTableSchema.toAvroSchema(), 
canonicalizedSourceSchema.toAvroSchema(), true,

Review Comment:
   Similarly use `HoodieSchemaCompatibility.checkSchemaCompatible`



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -365,16 +365,25 @@ class HoodieSparkSqlWriterInternal {
       // NOTE: We need to make sure that upon conversion of the schemas b/w 
Catalyst's [[StructType]] and
       //       Avro's [[Schema]] we're preserving corresponding "record-name" 
and "record-namespace" that
       //       play crucial role in establishing compatibility b/w schemas
-      val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s 
=> (s.getName, s.getNamespace))
+      val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s =>
+        (s.getName, toScalaOption(s.getNamespace).orNull))
         .getOrElse(getAvroRecordNameAndNamespace(tblName))
 
-      val sourceSchema = convertStructTypeToAvroSchema(df.schema, 
avroRecordName, avroRecordNamespace)
+      val sourceSchema = HoodieSchema.fromAvroSchema(
+        convertStructTypeToAvroSchema(df.schema, avroRecordName, 
avroRecordNamespace)
+      )
       val internalSchemaOpt = 
HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, 
tableMetaClient).orElse {
         // In case we need to reconcile the schema and schema evolution is 
enabled,
         // we will force-apply schema evolution to the writer's schema
         if (shouldReconcileSchema && 
hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED))
 {
           val allowOperationMetaDataField = 
parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), 
"false").toBoolean
-          
Some(InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(HoodieAvroUtils.addMetadataFields(latestTableSchemaOpt.getOrElse(sourceSchema),
 allowOperationMetaDataField))))
+          //TODO need to rename the spark schema class to something else to 
not have this be so confusing

Review Comment:
   Is this in reference to the Utils class?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -270,14 +285,14 @@ object HoodieSchemaUtils {
     // NOTE: By default Hudi doesn't allow automatic schema evolution to drop 
the columns from the target
     //       table. However, when schema reconciliation is turned on, we would 
allow columns to be dropped
     //       in the incoming batch (as these would be reconciled in anyway)
-    if (isCompatibleProjectionOf(tableSchema, newSchema)) {
+    if (isCompatibleProjectionOf(tableSchema.toAvroSchema(), 
newSchema.toAvroSchema())) {

Review Comment:
   Use `HoodieSchemaCompatibility` here as well



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -479,7 +493,7 @@ class HoodieSparkSqlWriterInternal {
 
             // Remove meta columns from writerSchema if isPrepped is true.
             val processedDataSchema = if (preppedSparkSqlWrites || 
preppedSparkSqlMergeInto || preppedWriteOperation) {
-              HoodieAvroUtils.removeMetadataFields(dataFileSchema)
+              
HoodieSchema.fromAvroSchema(HoodieAvroUtils.removeMetadataFields(dataFileSchema.toAvroSchema()))

Review Comment:
   Can you use `HoodieSchemaUtils` directly here?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -640,12 +654,14 @@ class HoodieSparkSqlWriterInternal {
       .toSeq
   }
 
-  def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: 
Schema): Schema = {
+  def generateSchemaWithoutPartitionColumns(partitionParam: String, schema: 
HoodieSchema): HoodieSchema = {
     val partitionColumns = getPartitionColumns(partitionParam)
-    HoodieAvroUtils.removeFields(schema, partitionColumns.toSet.asJava)
+    HoodieSchema.fromAvroSchema(
+      HoodieAvroUtils.removeFields(schema.toAvroSchema(), 
partitionColumns.toSet.asJava)

Review Comment:
   Let's add a method on `HoodieSchema` for removing fields instead of converting to/from Avro.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to