This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new d0ee95e  [HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
d0ee95e is described below

commit d0ee95ed16de6c3568f575169cb993b9c10ced3d
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sat Jan 18 16:40:56 2020 -0800

    [HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
---
 .../org/apache/hudi/AvroConversionUtils.scala      |  8 ++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 15 ++++-
 .../deltastreamer/SourceFormatAdapter.java         | 13 +++-
 .../hudi/utilities/TestHoodieDeltaStreamer.java    | 76 +++++++++++++++++++++-
 4 files changed, 104 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a27d0ee..16c2d75 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -31,12 +31,14 @@ object AvroConversionUtils {
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
     val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema.toString, structName, recordNamespace)
+    createRdd(df, avroSchema, structName, recordNamespace)
   }
 
-  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
   : RDD[GenericRecord] = {
-    val dataType = df.schema
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val avroSchemaAsJsonString = avroSchema.toString
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5f3259a..2dd4138 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -290,8 +290,19 @@ public class DeltaSync implements Serializable {
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed
-          .map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, this.schemaProvider.getTargetSchema(),
+                HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      } else {
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      }
 
-      // Use Transformed Row's schema if not overridden
+      // Use Transformed Row's schema if not overridden. If target schema is not specified
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index dd266ed..9c0be88 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
@@ -64,7 +65,17 @@ public final class SourceFormatAdapter {
       case ROW: {
         InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
+            rdd -> (
+                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
+                    // If the source schema is specified through Avro schema,
+                    // pass in the schema for the Row-to-Avro conversion
+                    // to avoid nullability mismatch between Avro schema and Row schema
+                    ? AvroConversionUtils.createRdd(
+                        rdd, r.getSchemaProvider().getSourceSchema(),
+                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+                    : AvroConversionUtils.createRdd(
+                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+            ))
             .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       default:
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 641c47b..cbf9db9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -44,6 +45,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
@@ -96,8 +98,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+  private static final int PARQUET_NUM_RECORDS = 5;
   private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
+  private static int parquetTestNum = 1;
+
   @BeforeClass
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
@@ -146,6 +153,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
   }
 
   @AfterClass
@@ -186,17 +195,24 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
        String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
+        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
-      cfg.sourceClassName = TestDataSource.class.getName();
+      cfg.sourceClassName = sourceClassName;
       cfg.transformerClassName = transformerClassName;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
-      cfg.sourceLimit = 1000;
+      cfg.sourceLimit = sourceLimit;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -620,6 +636,62 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     Assert.assertEquals(1000, c);
   }
 
+  private static void prepareParquetDFSFiles(int numRecords) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+        dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    // Properties used for testing delta-streamer with Parquet source
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      if (hasTransformer) {
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+      }
+    }
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            useSchemaProvider, 100000, false, null, null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    parquetTestNum++;
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
+    testParquetDFSSource(false, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
+    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
+    testParquetDFSSource(true, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
+    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */