This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ec2334c [HUDI-1716]: Resolving default values for schema from dataframe (#2765)
ec2334c is described below
commit ec2334ceac28463d7586d79f6078c69fb3eb4877
Author: Aditya Tiwari <[email protected]>
AuthorDate: Mon Apr 19 19:35:20 2021 +0530
[HUDI-1716]: Resolving default values for schema from dataframe (#2765)
- Adding default values and setting null as the first entry in UNION data types in the Avro schema.
Co-authored-by: Aditya Tiwari <[email protected]>
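
A minimal sketch of the new behavior (the struct and field names below are illustrative, not part of this commit; the entry point is the convertStructTypeToAvroSchema changed in the diff):

    import org.apache.spark.sql.types.{LongType, StringType, StructType}
    import org.apache.hudi.AvroConversionUtils

    // A struct with one required and one nullable field.
    val struct = new StructType()
      .add("_row_key", StringType, nullable = false)
      .add("ts", LongType, nullable = true)

    val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "trip", "example.schema")

    // The nullable field is now emitted as
    //   {"name" : "ts", "type" : ["null", "long"], "default" : null}
    // i.e. null is moved to the head of the union so it can act as the default.
    println(avroSchema.toString(true))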
---
.../HoodieSparkBootstrapSchemaProvider.java | 4 +-
.../org/apache/hudi/AvroConversionHelper.scala | 3 +-
.../org/apache/hudi/AvroConversionUtils.scala | 61 +++++++-
.../apache/hudi/testutils/DataSourceTestUtils.java | 31 ++++
.../src/test/resources/exampleEvolvedSchema.txt | 40 +++++
.../org/apache/hudi/TestAvroConversionUtils.scala | 164 +++++++++++++++++++++
.../functional/HoodieSparkSqlWriterSuite.scala | 89 ++++++++++-
.../hudi/utilities/TestSchemaPostProcessor.java | 4 +-
.../delta-streamer-config/source-jdbc.avsc | 24 ++-
9 files changed, 405 insertions(+), 15 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index 38f0acd..0dcd744 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -20,6 +20,7 @@ package org.apache.hudi.client.bootstrap;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
@@ -29,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.StructType;
@@ -63,6 +63,6 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
String structName = tableName + "_record";
String recordNamespace = "hoodie." + tableName;
- return SchemaConverters.toAvroType(sparkSchema, false, structName, recordNamespace);
+ return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
}
}
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
index db1ca6f..31432ae 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionHelper.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.avro.{IncompatibleSchemaException, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
+import org.apache.hudi.AvroConversionUtils._
import scala.collection.JavaConverters._
@@ -340,7 +341,7 @@ object AvroConversionHelper {
}
}
case structType: StructType =>
- val schema: Schema = SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+ val schema: Schema = convertStructTypeToAvroSchema(structType, structName, recordNamespace)
val childNameSpace = if (recordNamespace != "") s"$recordNamespace.$structName" else structName
val fieldConverters = structType.fields.map(field =>
createConverterToAvro(
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 8810126..5b87fee 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -19,6 +19,7 @@
package org.apache.hudi
import org.apache.avro.Schema
+import org.apache.avro.JsonProperties
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.spark.rdd.RDD
@@ -27,6 +28,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
object AvroConversionUtils {
@@ -46,10 +48,67 @@ object AvroConversionUtils {
}
}
+ /**
+ *
+ * Returns an Avro schema for the given Spark StructType.
+ *
+ * @param structType DataFrame struct type.
+ * @param structName Avro record name.
+ * @param recordNamespace Avro record namespace.
+ * @return Avro schema corresponding to given struct type.
+ */
def convertStructTypeToAvroSchema(structType: StructType,
structName: String,
recordNamespace: String): Schema = {
- SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace)
+ getAvroSchemaWithDefaults(SchemaConverters.toAvroType(structType, nullable = false, structName, recordNamespace))
+ }
+
+ /**
+ *
+ * Adds a default value of null to nullable fields in the given Avro schema.
+ *
+ * @param schema input Avro schema
+ * @return Avro schema with a null default set for nullable fields
+ */
+ def getAvroSchemaWithDefaults(schema: Schema): Schema = {
+
+ schema.getType match {
+ case Schema.Type.RECORD => {
+
+ val modifiedFields = schema.getFields.map(field => {
+ val newSchema = getAvroSchemaWithDefaults(field.schema())
+ field.schema().getType match {
+ case Schema.Type.UNION => {
+ val innerFields = newSchema.getTypes
+ val containsNullSchema = innerFields.foldLeft(false)((nullFieldEncountered, schema) => nullFieldEncountered | schema.getType == Schema.Type.NULL)
+ if(containsNullSchema) {
+ // Reorder the union: to set null as the default, the null schema must be the first entry in the union
+ val restructuredNewSchema = Schema.createUnion(List(Schema.create(Schema.Type.NULL)) ++ innerFields.filter(innerSchema => !(innerSchema.getType == Schema.Type.NULL)))
+ new Schema.Field(field.name(), restructuredNewSchema, field.doc(), JsonProperties.NULL_VALUE)
+ } else {
+ new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+ }
+ }
+ case _ => new Schema.Field(field.name(), newSchema, field.doc(), field.defaultVal())
+ }
+ }).toList
+ Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, modifiedFields)
+ }
+
+ case Schema.Type.UNION => {
+ Schema.createUnion(schema.getTypes.map(innerSchema => getAvroSchemaWithDefaults(innerSchema)))
+ }
+
+ case Schema.Type.MAP => {
+ Schema.createMap(getAvroSchemaWithDefaults(schema.getValueType))
+ }
+
+ case Schema.Type.ARRAY => {
+ Schema.createArray(getAvroSchemaWithDefaults(schema.getElementType))
+ }
+
+ case _ => schema
+ }
}
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index b0bb509..ebf2324 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -45,6 +45,10 @@ public class DataSourceTestUtils {
return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleSchema.txt")));
}
+ public static Schema getStructTypeExampleEvolvedSchema() throws IOException {
+ return new Schema.Parser().parse(FileIOUtils.readAsUTFString(DataSourceTestUtils.class.getResourceAsStream("/exampleEvolvedSchema.txt")));
+ }
+
public static List<Row> generateRandomRows(int count) {
Random random = new Random();
List<Row> toReturn = new ArrayList<>();
@@ -58,4 +62,31 @@ public class DataSourceTestUtils {
}
return toReturn;
}
+
+ public static List<Row> generateUpdates(List<Row> records, int count) {
+ List<Row> toReturn = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ Object[] values = new Object[3];
+ values[0] = records.get(i).getString(0);
+ values[1] = records.get(i).getAs(1);
+ values[2] = new Date().getTime();
+ toReturn.add(RowFactory.create(values));
+ }
+ return toReturn;
+ }
+
+ public static List<Row> generateRandomRowsEvolvedSchema(int count) {
+ Random random = new Random();
+ List<Row> toReturn = new ArrayList<>();
+ List<String> partitions = Arrays.asList(new String[] {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH});
+ for (int i = 0; i < count; i++) {
+ Object[] values = new Object[4];
+ values[0] = UUID.randomUUID().toString();
+ values[1] = partitions.get(random.nextInt(3));
+ values[2] = new Date().getTime();
+ values[3] = UUID.randomUUID().toString();
+ toReturn.add(RowFactory.create(values));
+ }
+ return toReturn;
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
new file mode 100644
index 0000000..5fcddac
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/resources/exampleEvolvedSchema.txt
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ {
+ "namespace": "example.schema",
+ "type": "record",
+ "name": "trip",
+ "fields": [
+ {
+ "name": "_row_key",
+ "type": "string"
+ },
+ {
+ "name": "partition",
+ "type": "string"
+ },
+ {
+ "name": "ts",
+ "type": ["long", "null"]
+ },
+ {
+ "name": "new_field",
+ "type": ["string","null"]
+ }
+ ]
+ }
\ No newline at end of file
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
new file mode 100644
index 0000000..50137c9
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.spark.sql.types.{DataTypes, StructType, StringType, ArrayType}
+import org.scalatest.{FunSuite, Matchers}
+
+class TestAvroConversionUtils extends FunSuite with Matchers {
+
+
+ test("test convertStructTypeToAvroSchema") {
+ val mapType = DataTypes.createMapType(StringType, new StructType().add("mapKey", "string", false).add("mapVal", "integer", true))
+ val arrayType = ArrayType(new StructType().add("arrayKey", "string", false).add("arrayVal", "integer", true))
+ val innerStruct = new StructType().add("innerKey","string",false).add("value", "long", true)
+
+ val struct = new StructType().add("key", "string", false).add("version", "string", true)
+ .add("data1",innerStruct,false).add("data2",innerStruct,true)
+ .add("nullableMap", mapType, true).add("map",mapType,false)
+ .add("nullableArray", arrayType, true).add("array",arrayType,false)
+
+ val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+
+ val expectedSchemaStr = s"""
+ {
+ "type" : "record",
+ "name" : "SchemaName",
+ "namespace" : "SchemaNS",
+ "fields" : [ {
+ "name" : "key",
+ "type" : "string"
+ }, {
+ "name" : "version",
+ "type" : [ "null", "string" ],
+ "default" : null
+ }, {
+ "name" : "data1",
+ "type" : {
+ "type" : "record",
+ "name" : "data1",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "innerKey",
+ "type" : "string"
+ }, {
+ "name" : "value",
+ "type" : [ "null", "long" ],
+ "default" : null
+ } ]
+ }
+ }, {
+ "name" : "data2",
+ "type" : [ "null", {
+ "type" : "record",
+ "name" : "data2",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "innerKey",
+ "type" : "string"
+ }, {
+ "name" : "value",
+ "type" : [ "null", "long" ],
+ "default" : null
+ } ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "nullableMap",
+ "type" : [ "null", {
+ "type" : "map",
+ "values" : [ {
+ "type" : "record",
+ "name" : "nullableMap",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "mapKey",
+ "type" : "string"
+ }, {
+ "name" : "mapVal",
+ "type" : [ "null", "int" ],
+ "default" : null
+ } ]
+ }, "null" ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "map",
+ "type" : {
+ "type" : "map",
+ "values" : [ {
+ "type" : "record",
+ "name" : "map",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "mapKey",
+ "type" : "string"
+ }, {
+ "name" : "mapVal",
+ "type" : [ "null", "int" ],
+ "default" : null
+ } ]
+ }, "null" ]
+ }
+ }, {
+ "name" : "nullableArray",
+ "type" : [ "null", {
+ "type" : "array",
+ "items" : [ {
+ "type" : "record",
+ "name" : "nullableArray",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "arrayKey",
+ "type" : "string"
+ }, {
+ "name" : "arrayVal",
+ "type" : [ "null", "int" ],
+ "default" : null
+ } ]
+ }, "null" ]
+ } ],
+ "default" : null
+ }, {
+ "name" : "array",
+ "type" : {
+ "type" : "array",
+ "items" : [ {
+ "type" : "record",
+ "name" : "array",
+ "namespace" : "SchemaNS.SchemaName",
+ "fields" : [ {
+ "name" : "arrayKey",
+ "type" : "string"
+ }, {
+ "name" : "arrayVal",
+ "type" : [ "null", "int" ],
+ "default" : null
+ } ]
+ }, "null" ]
+ }
+ } ]
+ }
+ """
+ val expectedAvroSchema = new Schema.Parser().parse(expectedSchemaStr)
+
+ assert(avroSchema.equals(expectedAvroSchema))
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index bbaeea1..e9f375e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -39,6 +39,7 @@ import org.mockito.Mockito.{spy, times, verify}
import org.scalatest.{FunSuite, Matchers}
import scala.collection.JavaConversions._
+import org.junit.jupiter.api.Assertions.assertEquals
class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
@@ -300,13 +301,14 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema")
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
val client = spy(DataSourceUtils.createHoodieClient(
new JavaSparkContext(sc),
- schema.toString,
+ modifiedSchema.toString,
path.toAbsolutePath.toString,
hoodieFooTableName,
mapAsJavaMap(fooTableParams)).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]])
@@ -399,6 +401,91 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
}
})
+ List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .foreach(tableType => {
+ test("test schema evolution for " + tableType) {
+ initSparkContext("test_schema_evolution")
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+ try {
+ val hoodieFooTableName = "hoodie_foo_tbl"
+ //create a new table
+ val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> tableType,
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> "org.apache.hudi.keygen.SimpleKeyGenerator")
+ val fooTableParams = HoodieWriterUtils.parametersWithWriteDefaults(fooTableModifier)
+
+ // generate the inserts
+ var schema = DataSourceTestUtils.getStructTypeExampleSchema
+ var structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ var records = DataSourceTestUtils.generateRandomRows(10)
+ var recordsSeq = convertRowListToSeq(records)
+ var df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+ // write to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableParams, df1)
+
+ val snapshotDF1 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(10, snapshotDF1.count())
+
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf1 = snapshotDF1.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+ assert(df1.except(trimmedDf1).count() == 0)
+
+ // issue updates so that log files are created for MOR table
+ var updates = DataSourceTestUtils.generateUpdates(records, 5);
+ var updatesSeq = convertRowListToSeq(updates)
+ var updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
+ // write updates to Hudi
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, updatesDf)
+
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(10, snapshotDF2.count())
+
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf2 = snapshotDF2.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+ // ensure 2nd batch of updates matches.
+ assert(updatesDf.intersect(trimmedDf2).except(updatesDf).count() == 0)
+
+ // getting new schema with new column
+ schema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
+ structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ records = DataSourceTestUtils.generateRandomRowsEvolvedSchema(5)
+ recordsSeq = convertRowListToSeq(records)
+ val df3 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
+ // write to Hudi with new column
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams, df3)
+
+ val snapshotDF3 = spark.read.format("org.apache.hudi")
+ .load(path.toAbsolutePath.toString + "/*/*/*/*")
+ assertEquals(15, snapshotDF3.count())
+
+ // remove metadata columns so that expected and actual DFs can be compared as is
+ val trimmedDf3 = snapshotDF3.drop(HoodieRecord.HOODIE_META_COLUMNS.get(0)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(1))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(2)).drop(HoodieRecord.HOODIE_META_COLUMNS.get(3))
+ .drop(HoodieRecord.HOODIE_META_COLUMNS.get(4))
+
+ // ensure the 3rd batch with the evolved schema matches.
+ assert(df3.intersect(trimmedDf3).except(df3).count() == 0)
+
+ } finally {
+ spark.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
+ }
+ })
+
case class Test(uuid: String, ts: Long)
import scala.collection.JavaConverters
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
index 725743d..b9e9282 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestSchemaPostProcessor.java
@@ -47,8 +47,8 @@ public class TestSchemaPostProcessor extends UtilitiesTestBase {
private static String ORIGINAL_SCHEMA = "{\"name\":\"t3_biz_operation_t_driver\",\"type\":\"record\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],\"default\":null},"
    + "{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
- private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"string\",\"null\"]},"
-     + "{\"name\":\"ums_ts_\",\"type\":[\"string\",\"null\"]}]}";
+ private static String RESULT_SCHEMA = "{\"type\":\"record\",\"name\":\"hoodie_source\",\"namespace\":\"hoodie.source\",\"fields\":[{\"name\":\"ums_id_\",\"type\":[\"null\",\"string\"],"
+     + "\"default\":null},{\"name\":\"ums_ts_\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
@Test
public void testPostProcessor() throws IOException {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
index cb8697d..e7943b0 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source-jdbc.avsc
@@ -26,34 +26,42 @@
},
{
"name": "TIMESTAMP",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
},
{
"name": "RIDER",
- "type": ["string", "null"]
+ "type": ["null", "string"],
+ "default": null
},
{
"name": "DRIVER",
- "type": ["string", "null"]
+ "type": ["null" ,"string"],
+ "default": null
},
{
"name": "BEGIN_LAT",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
},
{
"name": "BEGIN_LON",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
},
{
"name": "END_LAT",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
},
{
"name": "END_LON",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
},
{
"name": "FARE",
- "type": ["double", "null"]
+ "type": ["null", "double"],
+ "default": null
} ]
}
\ No newline at end of file
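
For reference, a minimal sketch of applying getAvroSchemaWithDefaults (added above) directly to a schema in the old shape; the input schema here is hypothetical:

    import org.apache.avro.Schema
    import org.apache.hudi.AvroConversionUtils

    // A record whose nullable field still has the union in the old order
    // ("long" before "null") and no default value.
    val before = new Schema.Parser().parse(
      """{"type":"record","name":"trip","namespace":"example.schema",
        |"fields":[{"name":"ts","type":["long","null"]}]}""".stripMargin)

    // Reorders the union to ["null", "long"] and sets "default": null.
    val after = AvroConversionUtils.getAvroSchemaWithDefaults(before)
    println(after.toString(true))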