the-other-tim-brown commented on code in PR #17573:
URL: https://github.com/apache/hudi/pull/17573#discussion_r2612034841


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala:
##########
@@ -86,10 +85,10 @@ case class AlterHoodieTableChangeColumnCommand(
     Seq.empty[Row]
   }
 
-  private def validateSchema(newSchema: Schema, metaClient: 
HoodieTableMetaClient): Unit = {
+  private def validateSchema(newSchema: HoodieSchema, metaClient: 
HoodieTableMetaClient): Unit = {
     val schemaUtil = new TableSchemaResolver(metaClient)
-    val tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchema(false))
-    if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) {
+    val tableSchema = 
HoodieSchemaUtils.createHoodieWriteSchema(schemaUtil.getTableSchema(false).toString,
 false)

Review Comment:
   Can we avoid converting the schema to string here?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -211,8 +211,9 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: 
String,
     exclusionFields.add("op")
     partitionSchema.fields.foreach(f => exclusionFields.add(f.name))
     val requestedSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
-    val requestedAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName), exclusionFields)
-    val dataAvroSchema = AvroSchemaUtils.pruneDataSchema(avroTableSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName), exclusionFields)
+    //TODO add util for this in HoodieSchema
+    val requestedAvroSchema = 
AvroSchemaUtils.pruneDataSchema(hoodieTableSchema.getAvroSchema, 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName), exclusionFields)

Review Comment:
   `HoodieSchemaUtils` now has a `pruneDataSchema` method that can be used here.



##########
hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/HDFSParquetImporterUtils.java:
##########
@@ -183,7 +183,7 @@ public JavaRDD<HoodieRecord<HoodieRecordPayload>> 
buildHoodieRecordsForImport(Ja
     job.getConfiguration().set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
     // To parallelize reading file status.
     job.getConfiguration().set(FileInputFormat.LIST_STATUS_NUM_THREADS, 
"1024");
-    AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), (new 
Schema.Parser().parse(schemaStr)));
+    AvroReadSupport.setAvroReadSchema(jsc.hadoopConfiguration(), 
(HoodieSchema.parse(schemaStr).getAvroSchema()));

Review Comment:
   nitpick: the outer `()` is unnecessary.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala:
##########
@@ -194,4 +198,19 @@ object HoodieSchemaConversionUtils {
       case _ => dataType
     }
   }
+
+  /**
+   * Creates a converter from GenericRecord to InternalRow using HoodieSchema.
+   * This is equivalent to 
AvroConversionUtils.createAvroToInternalRowConverter() but accepts HoodieSchema.
+   *
+   * @param requiredAvroSchema the HoodieSchema to use for deserialization
+   * @param requiredRowSchema the Spark StructType for the output InternalRow
+   * @return a function that converts GenericRecord to Option[InternalRow]
+   */
+  def createHoodieSchemaToInternalRowConverter(requiredAvroSchema: 
HoodieSchema, requiredRowSchema: StructType): GenericRecord => 
Option[InternalRow] = {

Review Comment:
   I think naming this something like 
`createGenericRecordToInternalRowConverter` is more fitting. We are not 
converting from the HoodieSchema to a row.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -795,29 +793,30 @@ object HoodieBaseRelation extends SparkAdapterSupport {
    * @param tableSchema schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[Schema, InternalSchema], 
requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], 
requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
     tableSchema match {
       case Right(internalSchema) =>
         checkState(!internalSchema.isEmptySchema)
         val prunedInternalSchema = 
InternalSchemaUtils.pruneInternalSchema(internalSchema, 
requiredColumns.toList.asJava)
-        val requiredAvroSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema").toAvroSchema
-        val requiredStructSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
+        val requiredSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema")
+        val requiredStructSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requiredSchema)
 
-        (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
+        (requiredSchema, requiredStructSchema, prunedInternalSchema)
 
-      case Left(avroSchema) =>
-        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
+      case Left(hoodieSchema) =>
+        val fieldMap = hoodieSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
         val requiredFields = requiredColumns.map { col =>
-          val f = fieldMap(col)
-          // We have to create a new [[Schema.Field]] since Avro schemas can't 
share field
-          // instances (and will throw "org.apache.avro.AvroRuntimeException: 
Field already used")
-          createNewSchemaField(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order())

Review Comment:
   I think we'll still need the `createNewSchemaField` logic since the schema is 
backed by Avro, and Avro schemas can't share field instances (reusing one throws 
"Field already used").



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -795,29 +793,30 @@ object HoodieBaseRelation extends SparkAdapterSupport {
    * @param tableSchema schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[Schema, InternalSchema], 
requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], 
requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
     tableSchema match {
       case Right(internalSchema) =>
         checkState(!internalSchema.isEmptySchema)
         val prunedInternalSchema = 
InternalSchemaUtils.pruneInternalSchema(internalSchema, 
requiredColumns.toList.asJava)
-        val requiredAvroSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema").toAvroSchema
-        val requiredStructSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
+        val requiredSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema")
+        val requiredStructSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requiredSchema)
 
-        (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
+        (requiredSchema, requiredStructSchema, prunedInternalSchema)
 
-      case Left(avroSchema) =>
-        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
+      case Left(hoodieSchema) =>
+        val fieldMap = hoodieSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
         val requiredFields = requiredColumns.map { col =>
-          val f = fieldMap(col)
-          // We have to create a new [[Schema.Field]] since Avro schemas can't 
share field
-          // instances (and will throw "org.apache.avro.AvroRuntimeException: 
Field already used")
-          createNewSchemaField(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order())
+          fieldMap(col)
         }.toList
-        val requiredAvroSchema = Schema.createRecord(avroSchema.getName, 
avroSchema.getDoc,
-          avroSchema.getNamespace, avroSchema.isError, requiredFields.asJava)
-        val requiredStructSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
 
-        (requiredAvroSchema, requiredStructSchema, 
InternalSchema.getEmptyInternalSchema)
+        val fieldsJava = new java.util.ArrayList[HoodieSchemaField]()

Review Comment:
   Can we just use `asJava`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to