aditiwari01 commented on a change in pull request #2765:
URL: https://github.com/apache/hudi/pull/2765#discussion_r615864702
##########
File path:
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -46,10 +48,67 @@ object AvroConversionUtils {
}
}
+ /**
+ *
+ * Returns avro schema from spark StructType.
+ *
+ * @param structType Dataframe Struct Type.
+ * @param structName Avro record name.
+ * @param recordNamespace Avro record namespace.
+ * @return Avro schema corresponding to given struct type.
+ */
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
- SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+ getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
+ }
+
+ /**
+ *
+ * Method to add default value of null to nullable fields in given avro schema
+ *
+ * @param schema input avro schema
+ * @return Avro schema with null default set to nullable fields
+ */
+ def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+
+ schema.getType match {
+ case Schema.Type.RECORD => {
+
+ val modifiedFields = schema.getFields.map(field => {
+ val newSchema = getAvroSchemaWithDefaults(field.schema())
+ field.schema().getType match {
+ case Schema.Type.UNION => {
+ val innerFields = newSchema.getTypes
+ val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
+ if(containsNullSchema) {
+ // Need to re shuffle the fields in list because to set null as default, null schema must be head in union schema
+ val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+ new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
Review comment:
This method derives an Avro schema from a Spark DataFrame's StructType.
A StructType cannot carry default values, so it does not make sense to assign
a specific value as the default here. A column is either present in the
DataFrame or it is not. If it is absent and nullable, its default must be
null.
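To illustrate the point about null defaults, here is a minimal sketch, not the code from this PR. It assumes Avro is on the classpath; the object name `NullDefaultSketch`, the helper `nullFirst`, and the field and record names are hypothetical. It relies on the Avro rule that a union field's default must match the first branch of the union, which is why the null branch is moved to the front before the null default is attached.

```scala
import org.apache.avro.{JsonProperties, Schema}
import scala.collection.JavaConverters._

object NullDefaultSketch {

  // Hypothetical helper: put the NULL branch first, keep the other branches in order.
  def nullFirst(union: Schema): Schema = {
    val (nulls, others) = union.getTypes.asScala.partition(_.getType == Schema.Type.NULL)
    Schema.createUnion((nulls ++ others).asJava)
  }

  def main(args: Array[String]): Unit = {
    // A nullable string written as ["string", "null"], i.e. null is NOT the first branch.
    val original = Schema.createUnion(
      List(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)).asJava)

    // Reorder to ["null", "string"], then attach null as the field default.
    val field = new Schema.Field(
      "city", nullFirst(original), "optional city", JsonProperties.NULL_VALUE)

    val record = Schema.createRecord("Example", null, "example.ns", false)
    record.setFields(List(field).asJava)

    // Prints the record schema as pretty JSON, including the field's default.
    println(record.toString(true))
  }
}
```

Partitioning instead of filtering keeps the non-null branches in their original order, so only the position of the null branch changes.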
Any default logic that users actually want must go in their business logic
before calling df.write, since Spark itself does not handle default values.
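As a rough sketch of what "business logic before df.write" could look like, the example below fills a missing value with an application-chosen default using Spark's `DataFrameNaFunctions.fill`. It assumes Spark SQL is on the classpath; the object name `FillDefaultsSketch`, the helper `withCityDefault`, the column names, and the default value "unknown" are hypothetical and only for illustration. The actual write call is omitted.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object FillDefaultsSketch {

  // Hypothetical business rule: a missing city becomes the string "unknown".
  def withCityDefault(df: DataFrame): DataFrame =
    df.na.fill("unknown", Seq("city"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("fill-defaults-sketch")
      .getOrCreate()
    import spark.implicits._

    // The second row has no city, so the column is nullable in the DataFrame.
    val df = Seq(("a1", Some("Pune")), ("a2", None)).toDF("id", "city")

    // Apply the default first; the resulting DataFrame is what would be passed to df.write.
    val prepared = withCityDefault(df)
    prepared.show()

    spark.stop()
  }
}
```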