nsivabalan commented on a change in pull request #2765:
URL: https://github.com/apache/hudi/pull/2765#discussion_r615879081



##########
File path: 
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
##########
@@ -46,10 +48,67 @@ object AvroConversionUtils {
     }
   }
 
+  /**
+    *
+    * Returns avro schema from spark StructType.
+    *
+    * @param structType       Dataframe Struct Type.
+    * @param structName       Avro record name.
+    * @param recordNamespace  Avro record namespace.
+    * @return                 Avro schema corresponding to given struct type.
+    */
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    SchemaConverters.toAvroType(structType, nullable = false, structName, 
recordNamespace)
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable 
= false, structName, recordNamespace))
+  }
+
+  /**
+    *
+    * Method to add default value of null to nullable fields in given avro 
schema
+    *
+    * @param schema     input avro schema
+    * @return           Avro schema with null default set to nullable fields
+    */
+  def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+
+    schema.getType match {
+      case Schema.Type.RECORD => {
+
+        val modifiedFields = schema.getFields.map(field => {
+          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          field.schema().getType match {
+            case Schema.Type.UNION => {
+              val innerFields = newSchema.getTypes
+              val containsNullSchema = 
innerFields.foldLeft(false)((nullFieldEncountered, schema) => 
nullFieldEncountered | schema.getType == Schema.Type.NULL)
+              if(containsNullSchema) {
+                // Need to re shuffle the fields in list because to set null 
as default, null schema must be head in union schema
+                val restructuredNewSchema = 
Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ 
innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+                new Schema.Field(field.name(), restructuredNewSchema, 
field.doc(), JsonProperties.NULL_VALUE)

Review comment:
       sounds good 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to