aditiwari01 commented on a change in pull request #2765:
URL: https://github.com/apache/hudi/pull/2765#discussion_r615864702



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -46,10 +48,67 @@ object AvroConversionUtils {
     }
   }
 
+  /**
+    *
+    * Returns avro schema from spark StructType.
+    *
+    * @param structType       Dataframe Struct Type.
+    * @param structName       Avro record name.
+    * @param recordNamespace  Avro record namespace.
+    * @return                 Avro schema corresponding to given struct type.
+    */
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    SchemaConverters.toAvroType(structType, nullable = false, structName, 
recordNamespace)
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable 
= false, structName, recordNamespace))
+  }
+
+  /**
+    *
+    * Method to add default value of null to nullable fields in given avro 
schema
+    *
+    * @param schema     input avro schema
+    * @return           Avro schema with null default set to nullable fields
+    */
+  def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+
+    schema.getType match {
+      case Schema.Type.RECORD => {
+
+        val modifiedFields = schema.getFields.map(field => {
+          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          field.schema().getType match {
+            case Schema.Type.UNION => {
+              val innerFields = newSchema.getTypes
+              val containsNullSchema = 
innerFields.foldLeft(false)((nullFieldEncountered, schema) => 
nullFieldEncountered | schema.getType == Schema.Type.NULL)
+              if(containsNullSchema) {
+                // Need to re shuffle the fields in list because to set null 
as default, null schema must be head in union schema
+                val restructuredNewSchema = 
Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ 
innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+                new Schema.Field(field.name(), restructuredNewSchema, 
field.doc(), JsonProperties.NULL_VALUE)

Review comment:
       This method is used to get the Avro schema from a Spark DataFrame's struct type. 
We cannot assign default values in a StructType, so it does not make sense 
to have a specific value as the default here. A column is either present in the 
DataFrame or it is not. If it is not present and it is nullable, the default must be 
set to null.
   
   Any default-value logic that users want must go in their business 
logic before calling df.write, since Spark itself does not handle default values.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to