the-other-tim-brown commented on code in PR #17573:
URL: https://github.com/apache/hudi/pull/17573#discussion_r2620319288


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -53,15 +53,15 @@ trait SparkFileFormatInternalRecordContext extends 
BaseSparkInternalRecordContex
     val schema = avroRecord.getSchema
     val structType = HoodieInternalRowUtils.getCachedSchema(schema)
     val deserializer = deserializerMap.getOrElseUpdate(schema, {
-      sparkAdapter.createAvroDeserializer(schema, structType)
+      sparkAdapter.createAvroDeserializer(HoodieSchema.fromAvroSchema(schema), 
structType)
     })
     deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
 
   override def convertToAvroRecord(record: InternalRow, schema: HoodieSchema): 
GenericRecord = {
     val structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
     val serializer = serializerMap.getOrElseUpdate(schema.toAvroSchema, {
-      sparkAdapter.createAvroSerializer(structType, schema.toAvroSchema, 
isNullable(schema.toAvroSchema))
+      sparkAdapter.createAvroSerializer(structType, schema, 
isNullable(schema.toAvroSchema))

Review Comment:
   Can we simplify the `isNullable` to just use the `HoodieSchema`'s method 
directly?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java:
##########
@@ -74,7 +74,7 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMergeHandleWithSparkMerger extends 
SparkClientFunctionalTestHarness {
-  private static final Schema SCHEMA = getAvroSchema("AvroSchema", 
"AvroSchemaNS");
+  private static final HoodieSchema SCHEMA = getSchema("AvroSchema", 
"AvroSchemaNS");

Review Comment:
   nitpick: remove `Avro` from the name and namespace



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala:
##########
@@ -40,13 +39,13 @@ class TestHoodieRelations {
       
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},"
 +
       
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
 
-    val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
-    val tableStructSchema = convertAvroSchemaToStructType(tableAvroSchema)
+    val tableAvroSchema = HoodieSchema.parse(avroSchemaString)

Review Comment:
   Update variable names to avoid using `avro` to avoid confusion



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java:
##########
@@ -144,11 +144,11 @@ public static List<HoodieKey> getKeys(List<HoodieRecord> 
records) {
     return records.stream().map(r -> r.getKey()).collect(Collectors.toList());
   }
 
-  private static Schema getAvroSchema(String schemaName, String 
schemaNameSpace) {
-    return 
AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE,
 schemaName, schemaNameSpace);
+  private static HoodieSchema getSchema(String schemaName, String 
schemaNameSpace) {
+    return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SparkDatasetTestUtils.STRUCT_TYPE,
 schemaName, schemaNameSpace);
   }
 
-  public HoodieWriteConfig getWriteConfig(Schema avroSchema, String 
recordMergerImplClass, String mergeStrategyId, RecordMergeMode recordMergeMode) 
{
+  public HoodieWriteConfig getWriteConfig(HoodieSchema hoodieSchema, String 
recordMergerImplClass, String mergeStrategyId, RecordMergeMode recordMergeMode) 
{

Review Comment:
   For consistency, let's make the `hoodieSchema` simply `schema` when possible 
in all the classes.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -841,13 +840,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
       val requiredRowSchema = requiredDataSchema.structTypeSchema
       // NOTE: Schema has to be parsed at this point, since Avro's [[Schema]] 
aren't serializable
       //       to be passed from driver to executor
-      val requiredAvroSchema = new 
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
-      val avroToRowConverter = 
AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema, 
requiredRowSchema)
+      val requiredSchema = HoodieSchema.parse(requiredDataSchema.avroSchemaStr)
+      //TODO to fill impl for this

Review Comment:
   Can you add some more details on the remaining work for this implementation? 
Should it be a separate ticket?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -94,13 +95,13 @@ trait SparkAdapter extends Serializable {
    * Creates instance of [[HoodieAvroSerializer]] providing for ability to 
serialize
    * Spark's [[InternalRow]] into Avro payloads
    */
-  def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, 
nullable: Boolean): HoodieAvroSerializer
+  def createAvroSerializer(rootCatalystType: DataType, rootHoodieType: 
HoodieSchema, nullable: Boolean): HoodieAvroSerializer

Review Comment:
   Instead of `rootHoodieType`, would it make more sense to have this as 
`rootTableType` or simply `rootType`?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala:
##########
@@ -122,10 +122,11 @@ class TestTableSchemaResolverWithSparkSQL extends 
HoodieSparkWriterTestBase {
     metaClient.reloadActiveTimeline()
     var tableSchemaResolverParsingException: Exception = null
     try {
-      val schemaFromData = new 
TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile
-      val structFromData = 
AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData))
-      val schemeDesign = new Schema.Parser().parse(schemaString)
-      val structDesign = 
AvroConversionUtils.convertAvroSchemaToStructType(schemeDesign)
+      val schemaFromData = HoodieSchema.fromAvroSchema(
+        new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile)
+      val structFromData = 
AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData.toAvroSchema()))

Review Comment:
   Can we use `HoodieSchemaConversionUtils.convertHoodieSchemaToStructType` 
here?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala:
##########
@@ -51,8 +52,8 @@ class TestAvroSerDe extends SparkAdapterSupport {
     val avroSchema = HoodieMetadataColumnStats.SCHEMA$

Review Comment:
   Let's update this to be a `HoodieSchema` instead of converting it twice below



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -502,19 +502,19 @@ class TestSparkSqlWithCustomKeyGenerator extends 
HoodieSparkSqlTestBase {
       testInserts(tableName, tsGenFunc, customPartitionFunc)
 
       // Validate ts field is still of type int in the table
-      validateTsFieldSchema(tablePath, "ts", Schema.Type.INT)
+      validateTsFieldSchema(tablePath, "ts", HoodieSchemaType.INT)
 
       val metaClient = createMetaClient(spark, tablePath)
       assertEquals(KeyGeneratorType.CUSTOM.getClassName, 
metaClient.getTableConfig.getKeyGeneratorClassName)
     }
     }
   }
 
-  private def validateTsFieldSchema(tablePath: String, fieldName: String, 
expectedType: Schema.Type): Unit = {
+  private def validateTsFieldSchema(tablePath: String, fieldName: String, 
expectedType: HoodieSchemaType): Unit = {
     val metaClient = createMetaClient(spark, tablePath)
     val schemaResolver = new TableSchemaResolver(metaClient)
-    val nullableSchema = Schema.createUnion(Schema.create(Schema.Type.NULL), 
Schema.create(expectedType))
-    assertEquals(nullableSchema, 
schemaResolver.getTableAvroSchema(true).getField(fieldName).schema())
+    val nullableSchema = 
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.NULL), 
HoodieSchema.create(HoodieSchemaType.valueOf(expectedType.name())))

Review Comment:
   there is a helper method for creating nullable schema that you can use here



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -795,29 +793,30 @@ object HoodieBaseRelation extends SparkAdapterSupport {
    * @param tableSchema schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[Schema, InternalSchema], 
requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], 
requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
     tableSchema match {
       case Right(internalSchema) =>
         checkState(!internalSchema.isEmptySchema)
         val prunedInternalSchema = 
InternalSchemaUtils.pruneInternalSchema(internalSchema, 
requiredColumns.toList.asJava)
-        val requiredAvroSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema").toAvroSchema
-        val requiredStructSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
+        val requiredSchema = 
InternalSchemaConverter.convert(prunedInternalSchema, "schema")
+        val requiredStructSchema = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requiredSchema)
 
-        (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
+        (requiredSchema, requiredStructSchema, prunedInternalSchema)
 
-      case Left(avroSchema) =>
-        val fieldMap = avroSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
+      case Left(hoodieSchema) =>
+        val fieldMap = hoodieSchema.getFields.asScala.map(f => f.name() -> 
f).toMap
         val requiredFields = requiredColumns.map { col =>
-          val f = fieldMap(col)
-          // We have to create a new [[Schema.Field]] since Avro schemas can't 
share field
-          // instances (and will throw "org.apache.avro.AvroRuntimeException: 
Field already used")
-          createNewSchemaField(f.name(), f.schema(), f.doc(), f.defaultVal(), 
f.order())

Review Comment:
   Can we use the util instead?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -58,7 +56,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SparkSession, SQLContext}
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression,
 generateUnsafeProjection}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.{HoodieSparkAvroSchemaConverters, 
HoodieSparkSchemaConverters}

Review Comment:
   Can the `HoodieSparkAvroSchemaConverters` be removed from this file as part 
of this PR?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSchemaConversionUtils, HoodieSparkUtils, HoodieTableSchema, 
SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}

Review Comment:
   As part of this PR, lets track down the remaining usages of the 
`AvroConversionUtils` and make them use the `HoodieSchema` equivalent



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to