the-other-tim-brown commented on code in PR #17573:
URL: https://github.com/apache/hudi/pull/17573#discussion_r2620319288
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala:
##########
@@ -53,15 +53,15 @@ trait SparkFileFormatInternalRecordContext extends
BaseSparkInternalRecordContex
val schema = avroRecord.getSchema
val structType = HoodieInternalRowUtils.getCachedSchema(schema)
val deserializer = deserializerMap.getOrElseUpdate(schema, {
- sparkAdapter.createAvroDeserializer(schema, structType)
+ sparkAdapter.createAvroDeserializer(HoodieSchema.fromAvroSchema(schema),
structType)
})
deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
}
override def convertToAvroRecord(record: InternalRow, schema: HoodieSchema):
GenericRecord = {
val structType =
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
val serializer = serializerMap.getOrElseUpdate(schema.toAvroSchema, {
- sparkAdapter.createAvroSerializer(structType, schema.toAvroSchema,
isNullable(schema.toAvroSchema))
+ sparkAdapter.createAvroSerializer(structType, schema,
isNullable(schema.toAvroSchema))
Review Comment:
Can we simplify the `isNullable` check by using the `HoodieSchema`'s method
directly?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java:
##########
@@ -74,7 +74,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMergeHandleWithSparkMerger extends
SparkClientFunctionalTestHarness {
- private static final Schema SCHEMA = getAvroSchema("AvroSchema",
"AvroSchemaNS");
+ private static final HoodieSchema SCHEMA = getSchema("AvroSchema",
"AvroSchemaNS");
Review Comment:
nitpick: remove `Avro` from the name and namespace
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieRelations.scala:
##########
@@ -40,13 +39,13 @@ class TestHoodieRelations {
"{\"name\":\"ts\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}],\"default\":null},"
+
"{\"name\":\"partition\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
- val tableAvroSchema = new Schema.Parser().parse(avroSchemaString)
- val tableStructSchema = convertAvroSchemaToStructType(tableAvroSchema)
+ val tableAvroSchema = HoodieSchema.parse(avroSchemaString)
Review Comment:
Update variable names to drop `avro`, which could cause confusion
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodieMergeHandleWithSparkMerger.java:
##########
@@ -144,11 +144,11 @@ public static List<HoodieKey> getKeys(List<HoodieRecord>
records) {
return records.stream().map(r -> r.getKey()).collect(Collectors.toList());
}
- private static Schema getAvroSchema(String schemaName, String
schemaNameSpace) {
- return
AvroConversionUtils.convertStructTypeToAvroSchema(SparkDatasetTestUtils.STRUCT_TYPE,
schemaName, schemaNameSpace);
+ private static HoodieSchema getSchema(String schemaName, String
schemaNameSpace) {
+ return
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SparkDatasetTestUtils.STRUCT_TYPE,
schemaName, schemaNameSpace);
}
- public HoodieWriteConfig getWriteConfig(Schema avroSchema, String
recordMergerImplClass, String mergeStrategyId, RecordMergeMode recordMergeMode)
{
+ public HoodieWriteConfig getWriteConfig(HoodieSchema hoodieSchema, String
recordMergerImplClass, String mergeStrategyId, RecordMergeMode recordMergeMode)
{
Review Comment:
For consistency, let's make the `hoodieSchema` simply `schema` when possible
in all the classes.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -841,13 +840,13 @@ object HoodieBaseRelation extends SparkAdapterSupport {
val requiredRowSchema = requiredDataSchema.structTypeSchema
// NOTE: Schema has to be parsed at this point, since Avro's [[Schema]]
aren't serializable
// to be passed from driver to executor
- val requiredAvroSchema = new
Schema.Parser().parse(requiredDataSchema.avroSchemaStr)
- val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(requiredAvroSchema,
requiredRowSchema)
+ val requiredSchema = HoodieSchema.parse(requiredDataSchema.avroSchemaStr)
+ //TODO to fill impl for this
Review Comment:
Can you add some more details on the remaining work for this implementation?
Should it be a separate ticket?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -94,13 +95,13 @@ trait SparkAdapter extends Serializable {
* Creates instance of [[HoodieAvroSerializer]] providing for ability to
serialize
* Spark's [[InternalRow]] into Avro payloads
*/
- def createAvroSerializer(rootCatalystType: DataType, rootAvroType: Schema,
nullable: Boolean): HoodieAvroSerializer
+ def createAvroSerializer(rootCatalystType: DataType, rootHoodieType:
HoodieSchema, nullable: Boolean): HoodieAvroSerializer
Review Comment:
Instead of `rootHoodieType`, would it make more sense to have this as
`rootTableType` or simply `rootType`?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestTableSchemaResolverWithSparkSQL.scala:
##########
@@ -122,10 +122,11 @@ class TestTableSchemaResolverWithSparkSQL extends
HoodieSparkWriterTestBase {
metaClient.reloadActiveTimeline()
var tableSchemaResolverParsingException: Exception = null
try {
- val schemaFromData = new
TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile
- val structFromData =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData))
- val schemeDesign = new Schema.Parser().parse(schemaString)
- val structDesign =
AvroConversionUtils.convertAvroSchemaToStructType(schemeDesign)
+ val schemaFromData = HoodieSchema.fromAvroSchema(
+ new TableSchemaResolver(metaClient).getTableAvroSchemaFromDataFile)
+ val structFromData =
AvroConversionUtils.convertAvroSchemaToStructType(HoodieAvroUtils.removeMetadataFields(schemaFromData.toAvroSchema()))
Review Comment:
Can we use `HoodieSchemaConversionUtils.convertHoodieSchemaToStructType`
here?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/avro/TestAvroSerDe.scala:
##########
@@ -51,8 +52,8 @@ class TestAvroSerDe extends SparkAdapterSupport {
val avroSchema = HoodieMetadataColumnStats.SCHEMA$
Review Comment:
Let's update this to be a `HoodieSchema` instead of converting it twice below
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala:
##########
@@ -502,19 +502,19 @@ class TestSparkSqlWithCustomKeyGenerator extends
HoodieSparkSqlTestBase {
testInserts(tableName, tsGenFunc, customPartitionFunc)
// Validate ts field is still of type int in the table
- validateTsFieldSchema(tablePath, "ts", Schema.Type.INT)
+ validateTsFieldSchema(tablePath, "ts", HoodieSchemaType.INT)
val metaClient = createMetaClient(spark, tablePath)
assertEquals(KeyGeneratorType.CUSTOM.getClassName,
metaClient.getTableConfig.getKeyGeneratorClassName)
}
}
}
- private def validateTsFieldSchema(tablePath: String, fieldName: String,
expectedType: Schema.Type): Unit = {
+ private def validateTsFieldSchema(tablePath: String, fieldName: String,
expectedType: HoodieSchemaType): Unit = {
val metaClient = createMetaClient(spark, tablePath)
val schemaResolver = new TableSchemaResolver(metaClient)
- val nullableSchema = Schema.createUnion(Schema.create(Schema.Type.NULL),
Schema.create(expectedType))
- assertEquals(nullableSchema,
schemaResolver.getTableAvroSchema(true).getField(fieldName).schema())
+ val nullableSchema =
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.NULL),
HoodieSchema.create(HoodieSchemaType.valueOf(expectedType.name())))
Review Comment:
There is a helper method for creating a nullable schema that you can use here.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -795,29 +793,30 @@ object HoodieBaseRelation extends SparkAdapterSupport {
* @param tableSchema schema to project (either of [[InternalSchema]] or
Avro's [[Schema]])
* @param requiredColumns required top-level columns to be projected
*/
- def projectSchema(tableSchema: Either[Schema, InternalSchema],
requiredColumns: Array[String]): (Schema, StructType, InternalSchema) = {
+ def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema],
requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
tableSchema match {
case Right(internalSchema) =>
checkState(!internalSchema.isEmptySchema)
val prunedInternalSchema =
InternalSchemaUtils.pruneInternalSchema(internalSchema,
requiredColumns.toList.asJava)
- val requiredAvroSchema =
InternalSchemaConverter.convert(prunedInternalSchema, "schema").toAvroSchema
- val requiredStructSchema =
AvroConversionUtils.convertAvroSchemaToStructType(requiredAvroSchema)
+ val requiredSchema =
InternalSchemaConverter.convert(prunedInternalSchema, "schema")
+ val requiredStructSchema =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(requiredSchema)
- (requiredAvroSchema, requiredStructSchema, prunedInternalSchema)
+ (requiredSchema, requiredStructSchema, prunedInternalSchema)
- case Left(avroSchema) =>
- val fieldMap = avroSchema.getFields.asScala.map(f => f.name() ->
f).toMap
+ case Left(hoodieSchema) =>
+ val fieldMap = hoodieSchema.getFields.asScala.map(f => f.name() ->
f).toMap
val requiredFields = requiredColumns.map { col =>
- val f = fieldMap(col)
- // We have to create a new [[Schema.Field]] since Avro schemas can't
share field
- // instances (and will throw "org.apache.avro.AvroRuntimeException:
Field already used")
- createNewSchemaField(f.name(), f.schema(), f.doc(), f.defaultVal(),
f.order())
Review Comment:
Can we use the util instead?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -58,7 +56,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession, SQLContext}
import
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpression,
generateUnsafeProjection}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.{HoodieSparkAvroSchemaConverters,
HoodieSparkSchemaConverters}
Review Comment:
Can the `HoodieSparkAvroSchemaConverters` be removed from this file as part
of this PR?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala:
##########
@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution.datasources.parquet
-import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSchemaConversionUtils, HoodieSparkUtils, HoodieTableSchema,
SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
Review Comment:
As part of this PR, let's track down the remaining usages of
`AvroConversionUtils` and make them use the `HoodieSchema` equivalent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]