This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 25e0b75  [HUDI-723] Register avro schema if inferred from SQL 
transformation (#1518)
25e0b75 is described below

commit 25e0b75b3d03b6d460dc18d1a5fce7b881b0e019
Author: Alexander Filipchik <[email protected]>
AuthorDate: Fri May 15 12:44:03 2020 -0700

    [HUDI-723] Register avro schema if inferred from SQL transformation (#1518)
    
    * Register avro schema if inferred from SQL transformation
    * Make HoodieWriteClient creation done lazily always. Handle setting 
schema-provider and avro-schemas correctly when using SQL transformer
    
    Co-authored-by: Alex Filipchik <[email protected]>
    Co-authored-by: Balaji Varadarajan <[email protected]>
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 39 +++++++++--------
 .../utilities/schema/DelegatingSchemaProvider.java | 51 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 17 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 210c948..fd051ed 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -43,6 +43,7 @@ import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
 import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 import org.apache.hudi.utilities.sources.InputBatch;
@@ -98,6 +99,11 @@ public class DeltaSync implements Serializable {
   private transient SourceFormatAdapter formatAdapter;
 
   /**
+   * User Provided Schema Provider.
+   */
+  private transient SchemaProvider userProvidedSchemaProvider;
+
+  /**
    * Schema provider that supplies the command for reading the input and 
writing out the target table.
    */
   private transient SchemaProvider schemaProvider;
@@ -162,20 +168,18 @@ public class DeltaSync implements Serializable {
     this.fs = fs;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
-    this.schemaProvider = schemaProvider;
+    this.userProvidedSchemaProvider = schemaProvider;
 
     refreshTimeline();
+    // Register User Provided schema first
+    registerAvroSchemas(schemaProvider);
 
     this.transformer = 
UtilHelpers.createTransformer(cfg.transformerClassNames);
     this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
 
     this.formatAdapter = new SourceFormatAdapter(
         UtilHelpers.createSource(cfg.sourceClassName, props, jssc, 
sparkSession, schemaProvider));
-
     this.conf = conf;
-
-    // If schemaRegistry already resolved, setup write-client
-    setupWriteClient();
   }
 
   /**
@@ -218,8 +222,7 @@ public class DeltaSync implements Serializable {
     if (null != srcRecordsWithCkpt) {
       // this is the first input batch. If schemaProvider not set, use it and 
register Avro Schema and start
       // compactor
-      if (null == schemaProvider) {
-        // Set the schemaProvider if not user-provided
+      if (null == writeClient) {
         this.schemaProvider = srcRecordsWithCkpt.getKey();
         // Setup HoodieWriteClient and compaction now that we decided on schema
         setupWriteClient();
@@ -280,26 +283,28 @@ public class DeltaSync implements Serializable {
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> 
transformer.get().apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      if (this.schemaProvider != null && this.schemaProvider.getTargetSchema() 
!= null) {
+      if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
         // If the target schema is specified through Avro schema,
         // pass in the schema for the Row-to-Avro conversion
         // to avoid nullability mismatch between Avro schema and Row schema
         avroRDDOptional = transformed
             .map(t -> AvroConversionUtils.createRdd(
-                t, this.schemaProvider.getTargetSchema(),
+                t, this.userProvidedSchemaProvider.getTargetSchema(),
                 HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD());
+        schemaProvider = this.userProvidedSchemaProvider;
       } else {
+        // Use Transformed Row's schema if not overridden. If target schema is 
not specified
+        // default to RowBasedSchemaProvider
+        schemaProvider =
+            transformed
+                .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props, 
jssc,
+                    dataAndCheckpoint.getSchemaProvider(),
+                    new RowBasedSchemaProvider(r.schema())))
+                .orElse(dataAndCheckpoint.getSchemaProvider());
         avroRDDOptional = transformed
             .map(t -> AvroConversionUtils.createRdd(
                 t, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD());
       }
-
-      // Use Transformed Row's schema if not overridden. If target schema is 
not specified
-      // default to RowBasedSchemaProvider
-      schemaProvider = this.schemaProvider == null || 
this.schemaProvider.getTargetSchema() == null
-          ? transformed.map(r -> (SchemaProvider) new 
RowBasedSchemaProvider(r.schema())).orElse(
-          dataAndCheckpoint.getSchemaProvider())
-          : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
       InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -458,7 +463,7 @@ public class DeltaSync implements Serializable {
    * SchemaProvider creation is a precursor to HoodieWriteClient and 
AsyncCompactor creation. This method takes care of
    * this constraint.
    */
-  public void setupWriteClient() {
+  private void setupWriteClient() {
     LOG.info("Setting up Hoodie Write Client");
     if ((null != schemaProvider) && (null == writeClient)) {
       registerAvroSchemas(schemaProvider);
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
new file mode 100644
index 0000000..43c64d0
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SchemaProvider which uses separate Schema Providers for source and target.
+ */
+public class DelegatingSchemaProvider extends SchemaProvider {
+
+  private final SchemaProvider sourceSchemaProvider;
+  private final SchemaProvider targetSchemaProvider;
+
+  public DelegatingSchemaProvider(TypedProperties props,
+      JavaSparkContext jssc,
+      SchemaProvider sourceSchemaProvider, SchemaProvider 
targetSchemaProvider) {
+    super(props, jssc);
+    this.sourceSchemaProvider = sourceSchemaProvider;
+    this.targetSchemaProvider = targetSchemaProvider;
+  }
+
+  @Override
+  public Schema getSourceSchema() {
+    return sourceSchemaProvider.getSourceSchema();
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    return targetSchemaProvider.getTargetSchema();
+  }
+}

Reply via email to