This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 25e0b75 [HUDI-723] Register avro schema if inferred from SQL
transformation (#1518)
25e0b75 is described below
commit 25e0b75b3d03b6d460dc18d1a5fce7b881b0e019
Author: Alexander Filipchik <[email protected]>
AuthorDate: Fri May 15 12:44:03 2020 -0700
[HUDI-723] Register avro schema if inferred from SQL transformation (#1518)
* Register avro schema if inferred from SQL transformation
* Make HoodieWriteClient creation always lazy. Handle setting the
schema provider and Avro schemas correctly when using the SQL transformer
Co-authored-by: Alex Filipchik <[email protected]>
Co-authored-by: Balaji Varadarajan <[email protected]>
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 39 +++++++++--------
.../utilities/schema/DelegatingSchemaProvider.java | 51 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 17 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 210c948..fd051ed 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -43,6 +43,7 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
+import org.apache.hudi.utilities.schema.DelegatingSchemaProvider;
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
@@ -98,6 +99,11 @@ public class DeltaSync implements Serializable {
private transient SourceFormatAdapter formatAdapter;
/**
+ * User Provided Schema Provider.
+ */
+ private transient SchemaProvider userProvidedSchemaProvider;
+
+ /**
* Schema provider that supplies the command for reading the input and
writing out the target table.
*/
private transient SchemaProvider schemaProvider;
@@ -162,20 +168,18 @@ public class DeltaSync implements Serializable {
this.fs = fs;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
- this.schemaProvider = schemaProvider;
+ this.userProvidedSchemaProvider = schemaProvider;
refreshTimeline();
+ // Register User Provided schema first
+ registerAvroSchemas(schemaProvider);
this.transformer =
UtilHelpers.createTransformer(cfg.transformerClassNames);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
this.formatAdapter = new SourceFormatAdapter(
UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
sparkSession, schemaProvider));
-
this.conf = conf;
-
- // If schemaRegistry already resolved, setup write-client
- setupWriteClient();
}
/**
@@ -218,8 +222,7 @@ public class DeltaSync implements Serializable {
if (null != srcRecordsWithCkpt) {
// this is the first input batch. If schemaProvider not set, use it and
register Avro Schema and start
// compactor
- if (null == schemaProvider) {
- // Set the schemaProvider if not user-provided
+ if (null == writeClient) {
this.schemaProvider = srcRecordsWithCkpt.getKey();
// Setup HoodieWriteClient and compaction now that we decided on schema
setupWriteClient();
@@ -280,26 +283,28 @@ public class DeltaSync implements Serializable {
Option<Dataset<Row>> transformed =
dataAndCheckpoint.getBatch().map(data ->
transformer.get().apply(jssc, sparkSession, data, props));
checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
- if (this.schemaProvider != null && this.schemaProvider.getTargetSchema()
!= null) {
+ if (this.userProvidedSchemaProvider != null &&
this.userProvidedSchemaProvider.getTargetSchema() != null) {
// If the target schema is specified through Avro schema,
// pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and Row schema
avroRDDOptional = transformed
.map(t -> AvroConversionUtils.createRdd(
- t, this.schemaProvider.getTargetSchema(),
+ t, this.userProvidedSchemaProvider.getTargetSchema(),
HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE).toJavaRDD());
+ schemaProvider = this.userProvidedSchemaProvider;
} else {
+ // Use Transformed Row's schema if not overridden. If target schema is
not specified
+ // default to RowBasedSchemaProvider
+ schemaProvider =
+ transformed
+ .map(r -> (SchemaProvider) new DelegatingSchemaProvider(props,
jssc,
+ dataAndCheckpoint.getSchemaProvider(),
+ new RowBasedSchemaProvider(r.schema())))
+ .orElse(dataAndCheckpoint.getSchemaProvider());
avroRDDOptional = transformed
.map(t -> AvroConversionUtils.createRdd(
t, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE).toJavaRDD());
}
-
- // Use Transformed Row's schema if not overridden. If target schema is
not specified
- // default to RowBasedSchemaProvider
- schemaProvider = this.schemaProvider == null ||
this.schemaProvider.getTargetSchema() == null
- ? transformed.map(r -> (SchemaProvider) new
RowBasedSchemaProvider(r.schema())).orElse(
- dataAndCheckpoint.getSchemaProvider())
- : this.schemaProvider;
} else {
// Pull the data from the source & prepare the write
InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -458,7 +463,7 @@ public class DeltaSync implements Serializable {
* SchemaProvider creation is a precursor to HoodieWriteClient and
AsyncCompactor creation. This method takes care of
* this constraint.
*/
- public void setupWriteClient() {
+ private void setupWriteClient() {
LOG.info("Setting up Hoodie Write Client");
if ((null != schemaProvider) && (null == writeClient)) {
registerAvroSchemas(schemaProvider);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
new file mode 100644
index 0000000..43c64d0
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/DelegatingSchemaProvider.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * SchemaProvider which uses separate Schema Providers for source and target.
+ */
+public class DelegatingSchemaProvider extends SchemaProvider {
+
+ private final SchemaProvider sourceSchemaProvider;
+ private final SchemaProvider targetSchemaProvider;
+
+ public DelegatingSchemaProvider(TypedProperties props,
+ JavaSparkContext jssc,
+ SchemaProvider sourceSchemaProvider, SchemaProvider
targetSchemaProvider) {
+ super(props, jssc);
+ this.sourceSchemaProvider = sourceSchemaProvider;
+ this.targetSchemaProvider = targetSchemaProvider;
+ }
+
+ @Override
+ public Schema getSourceSchema() {
+ return sourceSchemaProvider.getSourceSchema();
+ }
+
+ @Override
+ public Schema getTargetSchema() {
+ return targetSchemaProvider.getTargetSchema();
+ }
+}