This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ec2334c  [HUDI-1716]: Resolving default values for schema from 
dataframe (#2765)
ec2334c is described below

commit ec2334ceac28463d7586d79f6078c69fb3eb4877
Author: Aditya Tiwari <[email protected]>
AuthorDate: Mon Apr 19 19:35:20 2021 +0530

    [HUDI-1716]: Resolving default values for schema from dataframe (#2765)
    
    - Adding default values and setting null as first entry in UNION data types 
in avro schema.
    
    Co-authored-by: Aditya Tiwari <[email protected]>
---
 .../HoodieSparkBootstrapSchemaProvider.java        |   4 +-
 .../org/apache/hudi/AvroConversionHelper.scala     |   3 +-
 .../org/apache/hudi/AvroConversionUtils.scala      |  61 +++++++-
 .../apache/hudi/testutils/DataSourceTestUtils.java |  31 ++++
 .../src/test/resources/exampleEvolvedSchema.txt    |  40 +++++
 .../org/apache/hudi/TestAvroConversionUtils.scala  | 164 +++++++++++++++++++++
 .../functional/HoodieSparkSqlWriterSuite.scala     |  89 ++++++++++-
 .../hudi/utilities/TestSchemaPostProcessor.java    |   4 +-
 .../delta-streamer-config/source-jdbc.avsc         |  24 ++-
 9 files changed, 405 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index 38f0acd..0dcd744 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.bootstrap;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.avro.SchemaConverters;
 import 
org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
@@ -63,6 +63,6 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
 
-    return SchemaConverters.toAvroType(sparkSchema, false, structName, 
recordNamespace);
+    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, 
structName, recordNamespace);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index db1ca6f..31432ae 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -33,6 +33,7 @@ import 
org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
+import org.apache.hudi.AvroConversionUtils._
 
 import scala.collection.JavaConverters._
 
@@ -340,7 +341,7 @@ object AvroConversionHelper {
           }
         }
       case structType: StructType =>
-        val schema: Schema = SchemaConverters.toAvroType(structType, nullable 
= false, structName, recordNamespace)
+        val schema: Schema = convertStructTypeToAvroSchema(structType, 
structName, recordNamespace)
         val childNameSpace = if (recordNamespace != "") 
s"$recordNamespace.$structName" else structName
         val fieldConverters = structType.fields.map(field =>
           createConverterToAvro(
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 8810126..5b87fee 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -19,6 +19,7 @@
 package org.apache.hudi
 
 import org.apache.avro.Schema
+import org.apache.avro.JsonProperties
 import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, 
IndexedRecord}
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.spark.rdd.RDD
@@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 
 import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
 
 object AvroConversionUtils {
 
@@ -46,10 +48,67 @@ object AvroConversionUtils {
     }
   }
 
+  /**
+    * Converts a spark StructType into an avro record schema.
+    *
+    * The struct is first converted with SchemaConverters.toAvroType (as a non-nullable
+    * top-level record) and the result is then passed through getAvroSchemaWithDefaults.
+    *
+    * @param structType       Dataframe Struct Type.
+    * @param structName       Avro record name.
+    * @param recordNamespace  Avro record namespace.
+    * @return                 Avro schema corresponding to given struct type.
+    */
   def convertStructTypeToAvroSchema(structType: StructType,
                                     structName: String,
                                     recordNamespace: String): Schema = {
-    SchemaConverters.toAvroType(structType, nullable = false, structName, 
recordNamespace)
+    getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable 
= false, structName, recordNamespace))
+  }
+
+  /**
+    * Returns a copy of the given avro schema in which nullable record fields default to null.
+    *
+    * Records, unions, map values and array items are rebuilt recursively; every other
+    * schema type is returned as is. When the type of a record field is a union that
+    * contains null, the union is rewritten so that null is its first branch and the field
+    * default is set to null. All other fields keep their existing default. Unions that
+    * are not directly the type of a record field (e.g. map values or array items) are
+    * rebuilt with their branch order unchanged.
+    *
+    * @param schema     input avro schema
+    * @return           Avro schema with null default set to nullable fields
+    */
+  def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+    schema.getType match {
+      case Schema.Type.RECORD =>
+        val modifiedFields = schema.getFields.asScala.map { field =>
+          // The recursive call maps a union to a union, so testing the rebuilt schema is
+          // equivalent to testing the original field schema.
+          val newSchema = getAvroSchemaWithDefaults(field.schema())
+          val isNullableUnion = newSchema.getType == Schema.Type.UNION &&
+            newSchema.getTypes.asScala.exists(_.getType == Schema.Type.NULL)
+          if (isNullableUnion) {
+            // A null default is only accepted when null is the head of the union, so
+            // move the null branch to the front and keep the other branches in order.
+            val nonNullBranches = newSchema.getTypes.asScala.filterNot(_.getType == Schema.Type.NULL)
+            val nullFirstUnion = Schema.createUnion((Schema.create(Schema.Type.NULL) +: nonNullBranches).asJava)
+            new Schema.Field(field.name(), nullFirstUnion, field.doc(), JsonProperties.NULL_VALUE)
+          } else {
+            new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+          }
+        }
+        Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError,
+          modifiedFields.asJava)
+
+      case Schema.Type.UNION =>
+        Schema.createUnion(schema.getTypes.asScala.map(getAvroSchemaWithDefaults).asJava)
+
+      case Schema.Type.MAP =>
+        Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
+
+      case Schema.Type.ARRAY =>
+        Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
+
+      case _ => schema
+    }
   }
 
   def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index b0bb509..ebf2324 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -45,6 +45,10 @@ public class DataSourceTestUtils {
     return new 
Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
   }
 
+  /**
+   * Parses the avro schema stored in the classpath resource {@code /exampleEvolvedSchema.txt}.
+   *
+   * @return the parsed schema
+   * @throws IOException declared for the resource read done by {@code FileIOUtils.readAsUTFString}
+   */
+  public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
+    String evolvedSchemaJson =
+        FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt"));
+    return new Schema.Parser().parse(evolvedSchemaJson);
+  }
+
   public static List<Row> generateRandomRows(int count) {
     Random random = new Random();
     List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public class DataSourceTestUtils {
     }
     return toReturn;
   }
+
+  /**
+   * Builds update rows for the first {@code count} entries of {@code records}.
+   *
+   * Each update copies the string at column 0 and the value at column 1 of the source
+   * row and sets column 2 to the current time in epoch milliseconds.
+   *
+   * @param records rows to derive the updates from; must hold at least {@code count} rows
+   * @param count   number of updates to generate
+   * @return one three-column row per update, in the order of {@code records}
+   */
+  public static List<Row> generateUpdates(List<Row> records, int count) {
+    List<Row> updates = new ArrayList<>();
+    for (int idx = 0; idx < count; idx++) {
+      Row source = records.get(idx);
+      Object[] fields = {source.getString(0), source.getAs(1), new Date().getTime()};
+      updates.add(RowFactory.create(fields));
+    }
+    return updates;
+  }
+
+  /**
+   * Generates {@code count} random four-column rows.
+   *
+   * Columns, in order: a random UUID string, one of the three default partition paths
+   * picked at random, the current time in epoch milliseconds, and another random UUID
+   * string.
+   *
+   * @param count number of rows to generate
+   * @return the generated rows
+   */
+  public static List<Row> generateRandomRowsEvolvedSchema(int count) {
+    Random random = new Random();
+    List<Row> toReturn = new ArrayList<>();
+    List<String> partitions = Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH,
+        DEFAULT_THIRD_PARTITION_PATH);
+    for (int i = 0; i < count; i++) {
+      Object[] values = new Object[4];
+      values[0] = UUID.randomUUID().toString();
+      // bound the index by the list size so the partition list can change without touching this line
+      values[1] = partitions.get(random.nextInt(partitions.size()));
+      values[2] = new Date().getTime();
+      values[3] = UUID.randomUUID().toString();
+      toReturn.add(RowFactory.create(values));
+    }
+    return toReturn;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt 
b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
new file mode 100644
index 0000000..5fcddac
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -0,0 +1,40 @@
+/*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ {
+     "namespace": "example.schema",
+     "type": "record",
+     "name": "trip",
+     "fields": [
+         {
+             "name": "_row_key",
+             "type": "string"
+         },
+         {
+             "name": "partition",
+             "type": "string"
+         },
+         {
+             "name": "ts",
+             "type": ["long", "null"]
+         },
+         {
+             "name": "new_field",
+             "type": ["string","null"]
+         }
+     ]
+ }
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
new file mode 100644
index 0000000..50137c9
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.spark.sql.types.{DataTypes, StructType, StringType, 
ArrayType}
+import org.scalatest.{FunSuite, Matchers}
+
+/**
+ * Tests for the struct type to avro schema conversion in AvroConversionUtils.
+ */
+class TestAvroConversionUtils extends FunSuite with Matchers {
+
+  // Converts a struct mixing nullable and non-nullable primitives, nested structs, maps
+  // and arrays, and compares the result with the expected schema below.
+  test("test convertStructTypeToAvroSchema") {
+    val mapValueStruct = new StructType()
+      .add("mapKey", "string", nullable = false)
+      .add("mapVal", "integer", nullable = true)
+    val mapType = DataTypes.createMapType(StringType, mapValueStruct)
+    val arrayItemStruct = new StructType()
+      .add("arrayKey", "string", nullable = false)
+      .add("arrayVal", "integer", nullable = true)
+    val arrayType = ArrayType(arrayItemStruct)
+    val innerStruct = new StructType()
+      .add("innerKey", "string", nullable = false)
+      .add("value", "long", nullable = true)
+
+    val struct = new StructType()
+      .add("key", "string", nullable = false)
+      .add("version", "string", nullable = true)
+      .add("data1", innerStruct, nullable = false)
+      .add("data2", innerStruct, nullable = true)
+      .add("nullableMap", mapType, nullable = true)
+      .add("map", mapType, nullable = false)
+      .add("nullableArray", arrayType, nullable = true)
+      .add("array", arrayType, nullable = false)
+
+    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+
+    // Plain literal: nothing is interpolated, so no s-interpolator is needed.
+    val expectedSchemaStr = """
+       {
+         "type" : "record",
+         "name" : "SchemaName",
+         "namespace" : "SchemaNS",
+         "fields" : [ {
+           "name" : "key",
+           "type" : "string"
+         }, {
+           "name" : "version",
+           "type" : [ "null", "string" ],
+           "default" : null
+         }, {
+           "name" : "data1",
+           "type" : {
+             "type" : "record",
+             "name" : "data1",
+             "namespace" : "SchemaNS.SchemaName",
+             "fields" : [ {
+               "name" : "innerKey",
+               "type" : "string"
+             }, {
+               "name" : "value",
+               "type" : [ "null", "long" ],
+               "default" : null
+             } ]
+           }
+         }, {
+           "name" : "data2",
+           "type" : [ "null", {
+             "type" : "record",
+             "name" : "data2",
+             "namespace" : "SchemaNS.SchemaName",
+             "fields" : [ {
+               "name" : "innerKey",
+               "type" : "string"
+             }, {
+               "name" : "value",
+               "type" : [ "null", "long" ],
+               "default" : null
+             } ]
+           } ],
+           "default" : null
+         }, {
+           "name" : "nullableMap",
+           "type" : [ "null", {
+             "type" : "map",
+             "values" : [ {
+               "type" : "record",
+               "name" : "nullableMap",
+               "namespace" : "SchemaNS.SchemaName",
+               "fields" : [ {
+                 "name" : "mapKey",
+                 "type" : "string"
+               }, {
+                 "name" : "mapVal",
+                 "type" : [ "null", "int" ],
+                 "default" : null
+               } ]
+             }, "null" ]
+           } ],
+           "default" : null
+         }, {
+           "name" : "map",
+           "type" : {
+             "type" : "map",
+             "values" : [ {
+               "type" : "record",
+               "name" : "map",
+               "namespace" : "SchemaNS.SchemaName",
+               "fields" : [ {
+                 "name" : "mapKey",
+                 "type" : "string"
+               }, {
+                 "name" : "mapVal",
+                 "type" : [ "null", "int" ],
+                 "default" : null
+               } ]
+             }, "null" ]
+           }
+         }, {
+           "name" : "nullableArray",
+           "type" : [ "null", {
+             "type" : "array",
+             "items" : [ {
+               "type" : "record",
+               "name" : "nullableArray",
+               "namespace" : "SchemaNS.SchemaName",
+               "fields" : [ {
+                 "name" : "arrayKey",
+                 "type" : "string"
+               }, {
+                 "name" : "arrayVal",
+                 "type" : [ "null", "int" ],
+                 "default" : null
+               } ]
+             }, "null" ]
+           } ],
+           "default" : null
+         }, {
+           "name" : "array",
+           "type" : {
+             "type" : "array",
+             "items" : [ {
+               "type" : "record",
+               "name" : "array",
+               "namespace" : "SchemaNS.SchemaName",
+               "fields" : [ {
+                 "name" : "arrayKey",
+                 "type" : "string"
+               }, {
+                 "name" : "arrayVal",
+                 "type" : [ "null", "int" ],
+                 "default" : null
+               } ]
+             }, "null" ]
+           }
+         } ]
+       }
+    """
+    val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
+
+    // `==` (rather than `.equals`) lets the assert macro report both schemas on failure
+    assert(avroSchema == expectedAvroSchema)
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index bbaeea1..e9f375e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -39,6 +39,7 @@ import org.mockito.Mockito.{spy, times, verify}
 import org.scalatest.{FunSuite, Matchers}
 
 import scala.collection.JavaConversions._
+import org.junit.jupiter.api.Assertions.assertEquals
 
 class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
 
@@ -300,13 +301,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
           // generate the inserts
           val schema = DataSourceTestUtils.getStructTypeExampleSchema
           val structType = 
AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val modifiedSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", 
"example.schema")
           val records = DataSourceTestUtils.generateRandomRows(100)
           val recordsSeq = convertRowListToSeq(records)
           val df = spark.createDataFrame(sc.parallelize(recordsSeq), 
structType)
 
           val client = spy(DataSourceUtils.createHoodieClient(
             new JavaSparkContext(sc),
-            schema.toString,
+            modifiedSchema.toString,
             path.toAbsolutePath.toString,
             hoodieFooTableName,
             
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -399,6 +401,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     })
 
+  List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+    .foreach(tableType => {
+      // For each table type: write an initial batch, then updates with the same schema,
+      // then a batch carrying an extra column, reading a snapshot after every write.
+      test("test schema evolution for " + tableType) {
+        initSparkContext("test_schema_evolution")
+        val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+        val tableGlob = path.toAbsolutePath.toString + "/*/*/*/*"
+
+        // Drops the first five hoodie meta columns so that a snapshot can be compared
+        // as is with the dataframe that was written.
+        def dropMetaColumns(df: org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame =
+          (0 to 4).foldLeft(df)((trimmed, i) => trimmed.drop(HoodieRecord.HOODIE_META_COLUMNS.get(i)))
+
+        try {
+          val hoodieFooTableName = "hoodie_foo_tbl"
+          //create a new table
+          val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+            HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+            "hoodie.insert.shuffle.parallelism" -> "1",
+            "hoodie.upsert.shuffle.parallelism" -> "1",
+            DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+            DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+            DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+            DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+          val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+          // generate the inserts
+          val schema = DataSourceTestUtils.getStructTypeExampleSchema
+          val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+          val records = DataSourceTestUtils.generateRandomRows(10)
+          val df1 = spark.createDataFrame(sc.parallelize(convertRowListToSeq(records)), structType)
+          // write to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+
+          val snapshotDF1 = spark.read.format("org.apache.hudi").load(tableGlob)
+          assertEquals(10, snapshotDF1.count())
+
+          val trimmedDf1 = dropMetaColumns(snapshotDF1)
+          assert(df1.except(trimmedDf1).count() == 0)
+
+          // issue updates so that log files are created for MOR table
+          val updates = DataSourceTestUtils.generateUpdates(records, 5)
+          val updatesDf = spark.createDataFrame(sc.parallelize(convertRowListToSeq(updates)), structType)
+          // write updates to Hudi
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+
+          val snapshotDF2 = spark.read.format("org.apache.hudi").load(tableGlob)
+          assertEquals(10, snapshotDF2.count())
+
+          // compare against the snapshot taken AFTER the updates (this used to trim snapshotDF1)
+          val trimmedDf2 = dropMetaColumns(snapshotDF2)
+
+          // ensure 2nd batch of updates matches.
+          // NOTE(review): a.intersect(b).except(a) is empty for any a and b, so this
+          // assertion cannot fail - confirm whether updatesDf.except(trimmedDf2) was intended.
+          assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+          // getting new schema with new column
+          val evolvedSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+          val evolvedStructType = AvroConversionUtils.convertAvroSchemaToStructType(evolvedSchema)
+          val evolvedRecords = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
+          val df3 = spark.createDataFrame(sc.parallelize(convertRowListToSeq(evolvedRecords)), evolvedStructType)
+          // write to Hudi with new column
+          HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+
+          val snapshotDF3 = spark.read.format("org.apache.hudi").load(tableGlob)
+          assertEquals(15, snapshotDF3.count())
+
+          val trimmedDf3 = dropMetaColumns(snapshotDF3)
+
+          // ensure 3rd batch (written with the evolved schema) matches.
+          // NOTE(review): same always-empty pattern as the 2nd batch check above.
+          assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
+
+        } finally {
+          spark.stop()
+          FileUtils.deleteDirectory(path.toFile)
+        }
+      }
+    })
+
   case class Test(uuid: String, ts: Long)
 
   import scala.collection.JavaConverters
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 725743d..b9e9282 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -47,8 +47,8 @@ public class TestSchemaPostProcessor extends 
UtilitiesTestBase {
   private static String ORIGINAL_SCHEMA = 
"{\"name\":\"t3_biz_operation_t_driver\",\"type\":\"record\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],\"default\":null},"
                                               + 
"{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
 
-  private static String RESULT_SCHEMA = 
"{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"string\",\"null\"]},"
-                                            + 
"{\"name\":\"ums_ts_\",\"type\":[\"string\",\"null\"]}]}";
+  private static String RESULT_SCHEMA = 
"{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],"
+                                              + 
"\"default\":null},{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
 
   @Test
   public void testPostProcessor() throws IOException {
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc 
b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
index cb8697d..e7943b0 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
@@ -26,34 +26,42 @@
   },
   {
        "name": "TIMESTAMP",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   },
   {
        "name": "RIDER",
-       "type": ["string", "null"]
+       "type": ["null", "string"],
+       "default": null
   },
   {
        "name": "DRIVER",
-       "type": ["string", "null"]
+       "type": ["null" ,"string"],
+       "default": null
   },
   {
        "name": "BEGIN_LAT",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   },
   {
        "name": "BEGIN_LON",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   },
   {
        "name": "END_LAT",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   },
   {
        "name": "END_LON",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   },
   {
        "name": "FARE",
-       "type": ["double", "null"]
+       "type": ["null", "double"],
+       "default": null
   } ]
 }
\ No newline at end of file

Reply via email to