This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d0ee95e  [HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
d0ee95e is described below

commit d0ee95ed16de6c3568f575169cb993b9c10ced3d
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Sat Jan 18 16:40:56 2020 -0800

    [HUDI-552] Fix the schema mismatch in Row-to-Avro conversion (#1246)
---
 .../org/apache/hudi/AvroConversionUtils.scala      |  8 ++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 15 ++++-
 .../deltastreamer/SourceFormatAdapter.java         | 13 +++-
 .../hudi/utilities/TestHoodieDeltaStreamer.java    | 76 +++++++++++++++++++++-
 4 files changed, 104 insertions(+), 8 deletions(-)

diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a27d0ee..16c2d75 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -31,12 +31,14 @@ object AvroConversionUtils {
 
   def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
     val avroSchema = convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema.toString, structName, recordNamespace)
+    createRdd(df, avroSchema, structName, recordNamespace)
   }
 
-  def createRdd(df: DataFrame, avroSchemaAsJsonString: String, structName: String, recordNamespace: String)
+  def createRdd(df: DataFrame, avroSchema: Schema, structName: String, recordNamespace: String)
   : RDD[GenericRecord] = {
-    val dataType = df.schema
+    // Use the Avro schema to derive the StructType which has the correct nullability information
+    val dataType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
+    val avroSchemaAsJsonString = avroSchema.toString
     val encoder = RowEncoder.apply(dataType).resolveAndBind()
     df.queryExecution.toRdd.map(encoder.fromRow)
       .mapPartitions { records =>
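
For context, a minimal sketch (not part of this commit) of why deriving the StructType from the Avro schema matters, assuming Spark 2.4 with the spark-avro SchemaConverters that AvroConversionUtils already uses; the object and field names below are illustrative only:

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    object NullabilitySketch {
      def main(args: Array[String]): Unit = {
        // "id" is non-nullable (no union with null); "note" is nullable.
        val avroJson =
          """{"type":"record","name":"rec","fields":[
            |{"name":"id","type":"string"},
            |{"name":"note","type":["null","string"],"default":null}]}""".stripMargin
        val avroSchema = new Schema.Parser().parse(avroJson)

        // Converting the Avro schema to a Spark type keeps that information:
        // id -> nullable=false, note -> nullable=true. A DataFrame loaded from
        // most sources reports every field as nullable, which is the mismatch
        // the patched createRdd avoids.
        val fromAvro = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
        fromAvro.fields.foreach(f => println(s"${f.name}: nullable=${f.nullable}"))
      }
    }
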
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5f3259a..2dd4138 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -290,8 +290,19 @@ public class DeltaSync implements Serializable {
       Option<Dataset<Row>> transformed =
          dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed
-          .map(t -> AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, this.schemaProvider.getTargetSchema(),
+                HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      } else {
+        avroRDDOptional = transformed
+            .map(t -> AvroConversionUtils.createRdd(
+                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD());
+      }
 
-      // Use Transformed Row's schema if not overridden
+      // Use Transformed Row's schema if not overridden. If target schema is not specified
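
As a hedged illustration of the new branch above (the helper name and parameters are hypothetical, not part of DeltaSync), the call made when a target schema is available boils down to:

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.sql.DataFrame

    object TargetSchemaSketch {
      // structName/namespace stand in for HOODIE_RECORD_STRUCT_NAME and
      // HOODIE_RECORD_NAMESPACE; the schema would come from schemaProvider.getTargetSchema().
      def toAvro(transformed: DataFrame, targetSchema: Schema,
                 structName: String, namespace: String): JavaRDD[GenericRecord] =
        // Passing the Avro schema makes the conversion resolve rows against its
        // nullability instead of the DataFrame's all-nullable schema.
        AvroConversionUtils.createRdd(transformed, targetSchema, structName, namespace).toJavaRDD()
    }
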
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index dd266ed..9c0be88 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities.deltastreamer;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 import org.apache.hudi.utilities.sources.AvroSource;
 import org.apache.hudi.utilities.sources.InputBatch;
 import org.apache.hudi.utilities.sources.JsonSource;
@@ -64,7 +65,17 @@ public final class SourceFormatAdapter {
       case ROW: {
        InputBatch<Dataset<Row>> r = ((RowSource) source).fetchNext(lastCkptStr, sourceLimit);
         return new InputBatch<>(Option.ofNullable(r.getBatch().map(
-            rdd -> (AvroConversionUtils.createRdd(rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()))
+            rdd -> (
+                (r.getSchemaProvider() instanceof FilebasedSchemaProvider)
+                    // If the source schema is specified through Avro schema,
+                    // pass in the schema for the Row-to-Avro conversion
+                    // to avoid nullability mismatch between Avro schema and Row schema
+                    ? AvroConversionUtils.createRdd(
+                        rdd, r.getSchemaProvider().getSourceSchema(),
+                        HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+                    : AvroConversionUtils.createRdd(
+                        rdd, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+                ))
            .orElse(null)), r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
       default:
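
A sketch of the same dispatch in isolation (again with hypothetical names, assuming the classes imported above): only a FilebasedSchemaProvider carries a user-authored Avro source schema worth passing to the conversion, while the default RowBasedSchemaProvider is itself derived from the Row schema and would add no nullability information:

    import org.apache.avro.generic.GenericRecord
    import org.apache.hudi.AvroConversionUtils
    import org.apache.hudi.utilities.schema.{FilebasedSchemaProvider, SchemaProvider}
    import org.apache.spark.api.java.JavaRDD
    import org.apache.spark.sql.DataFrame

    object RowSourceSketch {
      def rowsToAvro(rows: DataFrame, provider: SchemaProvider,
                     structName: String, namespace: String): JavaRDD[GenericRecord] =
        provider match {
          // File-based provider: use its Avro schema for correct nullability.
          case f: FilebasedSchemaProvider =>
            AvroConversionUtils.createRdd(rows, f.getSourceSchema, structName, namespace).toJavaRDD()
          // Anything else: fall back to the DataFrame's own schema.
          case _ =>
            AvroConversionUtils.createRdd(rows, structName, namespace).toJavaRDD()
        }
    }
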
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index 641c47b..cbf9db9 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -44,6 +45,7 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.DistributedTestDataSource;
 import org.apache.hudi.utilities.sources.HoodieIncrSource;
 import org.apache.hudi.utilities.sources.InputBatch;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
 import org.apache.hudi.utilities.sources.TestDataSource;
 import org.apache.hudi.utilities.sources.config.TestSourceConfig;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
@@ -96,8 +98,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  private static final String PROPS_FILENAME_TEST_PARQUET = "test-parquet-dfs-source.properties";
+  private static final String PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+  private static final int PARQUET_NUM_RECORDS = 5;
  private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
+  private static int parquetTestNum = 1;
+
   @BeforeClass
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
@@ -146,6 +153,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
     invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
   }
 
   @AfterClass
@@ -186,17 +195,24 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
         String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass, boolean updatePayloadClass,
                                                  String payloadClassName, String tableType) {
+      return makeConfig(basePath, op, TestDataSource.class.getName(), transformerClassName, propsFilename, enableHiveSync,
+          useSchemaProviderClass, 1000, updatePayloadClass, payloadClassName, tableType);
+    }
+
+    static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String sourceClassName,
+        String transformerClassName, String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass,
+        int sourceLimit, boolean updatePayloadClass, String payloadClassName, String tableType) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
       cfg.tableType = tableType == null ? "COPY_ON_WRITE" : tableType;
-      cfg.sourceClassName = TestDataSource.class.getName();
+      cfg.sourceClassName = sourceClassName;
       cfg.transformerClassName = transformerClassName;
       cfg.operation = op;
       cfg.enableHiveSync = enableHiveSync;
       cfg.sourceOrderingField = "timestamp";
       cfg.propsFilePath = dfsBasePath + "/" + propsFilename;
-      cfg.sourceLimit = 1000;
+      cfg.sourceLimit = sourceLimit;
       if (updatePayloadClass) {
         cfg.payloadClassName = payloadClassName;
       }
@@ -620,6 +636,62 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     Assert.assertEquals(1000, c);
   }
 
+  private static void prepareParquetDFSFiles(int numRecords) throws IOException {
+    String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+        dataGenerator.generateInserts("000", numRecords), dataGenerator), new Path(path));
+  }
+
+  private void prepareParquetDFSSource(boolean useSchemaProvider, boolean hasTransformer) throws IOException {
+    // Properties used for testing delta-streamer with Parquet source
+    TypedProperties parquetProps = new TypedProperties();
+    parquetProps.setProperty("include", "base.properties");
+    parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", "not_there");
+    if (useSchemaProvider) {
+      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+      if (hasTransformer) {
+        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+      }
+    }
+    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", PARQUET_SOURCE_ROOT);
+
+    UtilitiesTestBase.Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_PARQUET);
+  }
+
+  private void testParquetDFSSource(boolean useSchemaProvider, String transformerClassName) throws Exception {
+    prepareParquetDFSSource(useSchemaProvider, transformerClassName != null);
+    String tableBasePath = dfsBasePath + "/test_parquet_table" + parquetTestNum;
+    HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+        TestHelpers.makeConfig(tableBasePath, Operation.INSERT, ParquetDFSSource.class.getName(),
+            transformerClassName, PROPS_FILENAME_TEST_PARQUET, false,
+            useSchemaProvider, 100000, false, null, null), jsc);
+    deltaStreamer.sync();
+    TestHelpers.assertRecordCount(PARQUET_NUM_RECORDS, tableBasePath + "/*/*.parquet", sqlContext);
+    parquetTestNum++;
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndNoTransformer() throws Exception {
+    testParquetDFSSource(false, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithoutSchemaProviderAndTransformer() throws Exception {
+    testParquetDFSSource(false, TripsWithDistanceTransformer.class.getName());
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSourceSchemaFileAndNoTransformer() throws Exception {
+    testParquetDFSSource(true, null);
+  }
+
+  @Test
+  public void testParquetDFSSourceWithSchemaFilesAndTransformer() throws Exception {
+    testParquetDFSSource(true, TripsWithDistanceTransformer.class.getName());
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */
