This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e0ab89b  [HUDI-223] Adding a way to infer target schema from the 
dataset after the transformation (#854)
e0ab89b is described below

commit e0ab89b3ac22207ff45cf3cae782d64b8be01bf1
Author: Alexander Filipchik <[email protected]>
AuthorDate: Wed Aug 28 04:48:38 2019 -0700

    [HUDI-223] Adding a way to infer target schema from the dataset after the 
transformation (#854)
    
    - Adding a way to decouple target and source schema providers
    - Adding flattening transformer
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 20 ++++--
 .../schema/NullTargetSchemaRegistryProvider.java   | 40 +++++++++++
 .../utilities/transform/FlatteningTransformer.java | 83 ++++++++++++++++++++++
 .../hudi/utilities/TestFlatteningTransformer.java  | 56 +++++++++++++++
 4 files changed, 194 insertions(+), 5 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 075e1c9..b093010 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -24,7 +24,7 @@ import static 
org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
 import com.codahale.metrics.Timer;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.function.Function;
@@ -282,9 +282,14 @@ public class DeltaSync implements Serializable {
           AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD()
       );
       // Use Transformed Row's schema if not overridden
+      // Use Transformed Row's schema if not overridden. If target schema is 
not specified
+      // default to RowBasedSchemaProvider
       schemaProvider =
-          this.schemaProvider == null ? transformed.map(r -> (SchemaProvider) 
new RowBasedSchemaProvider(r.schema()))
-              .orElse(dataAndCheckpoint.getSchemaProvider()) : 
this.schemaProvider;
+          this.schemaProvider == null || this.schemaProvider.getTargetSchema() 
== null
+              ? transformed
+              .map(r -> (SchemaProvider) new 
RowBasedSchemaProvider(r.schema()))
+              .orElse(dataAndCheckpoint.getSchemaProvider())
+              : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
       InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
@@ -472,7 +477,7 @@ public class DeltaSync implements Serializable {
             .forTable(cfg.targetTableName)
             
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
             .withAutoCommit(false);
-    if (null != schemaProvider) {
+    if (null != schemaProvider && null != schemaProvider.getTargetSchema()) {
       builder = 
builder.withSchema(schemaProvider.getTargetSchema().toString());
     }
 
@@ -487,7 +492,12 @@ public class DeltaSync implements Serializable {
   private void registerAvroSchemas(SchemaProvider schemaProvider) {
     // register the schemas, so that shuffle does not serialize the full 
schemas
     if (null != schemaProvider) {
-      List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), 
schemaProvider.getTargetSchema());
+      List<Schema> schemas = new ArrayList<>();
+      schemas.add(schemaProvider.getSourceSchema());
+      if (schemaProvider.getTargetSchema() != null) {
+        schemas.add(schemaProvider.getTargetSchema());
+      }
+
       log.info("Registering Schema :" + schemas);
       
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java
new file mode 100644
index 0000000..109b499
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/NullTargetSchemaRegistryProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Schema provider that will force DeltaStreamer to infer target schema from 
the dataset.
+ * It can be used with SQL or Flattening transformers to avoid having a target 
schema in the schema
+ * registry.
+ */
+public class NullTargetSchemaRegistryProvider extends SchemaRegistryProvider {
+
+  public NullTargetSchemaRegistryProvider(TypedProperties props, 
JavaSparkContext jssc) {
+    super(props, jssc);
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    return null;
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
new file mode 100644
index 0000000..d029f6c
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.transform;
+
+import java.util.UUID;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Transformer that can flatten nested objects. It currently doesn't unnest 
arrays.
+ */
+public class FlatteningTransformer implements Transformer {
+
+  private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
+  private static volatile Logger log = 
LogManager.getLogger(SqlQueryBasedTransformer.class);
+
+  /** Configs supported */
+  @Override
+  public Dataset<Row> apply(
+      JavaSparkContext jsc,
+      SparkSession sparkSession,
+      Dataset<Row> rowDataset,
+      TypedProperties properties) {
+
+    // tmp table name doesn't like dashes
+    String tmpTable = 
TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
+    log.info("Registering tmp table : " + tmpTable);
+    rowDataset.registerTempTable(tmpTable);
+    return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), 
null)
+        + " from " + tmpTable);
+  }
+
+  public String flattenSchema(StructType schema, String prefix) {
+    final StringBuilder selectSQLQuery = new StringBuilder();
+
+    for (StructField field : schema.fields()) {
+      final String fieldName = field.name();
+
+      // it is also possible to expand arrays by using Spark "expand" function.
+      // As it can increase data size significantly we later pass additional 
property with a
+      // list of arrays to expand.
+      final String colName = prefix == null ? fieldName : (prefix + "." + 
fieldName);
+      if (field.dataType().getClass().equals(StructType.class)) {
+        selectSQLQuery.append(flattenSchema((StructType) field.dataType(), 
colName));
+      } else {
+        selectSQLQuery.append(colName);
+        selectSQLQuery.append(" as ");
+        selectSQLQuery.append(colName.replace(".", "_"));
+      }
+
+      selectSQLQuery.append(",");
+    }
+
+    if (selectSQLQuery.length() > 0) {
+      selectSQLQuery. deleteCharAt(selectSQLQuery.length() - 1);
+    }
+
+    return selectSQLQuery.toString();
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
new file mode 100644
index 0000000..c5a2ab0
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestFlatteningTransformer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hudi.utilities.transform.FlatteningTransformer;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.Test;
+
+public class TestFlatteningTransformer {
+
+  @Test
+  public void testFlatten() {
+    FlatteningTransformer transformer = new FlatteningTransformer();
+
+    // Init
+    StructField[] nestedStructFields = new StructField[]{
+        new StructField("nestedIntColumn", DataTypes.IntegerType, true, 
Metadata.empty()),
+        new StructField("nestedStringColumn", DataTypes.StringType, true, 
Metadata.empty()),
+    };
+
+    StructField[] structFields = new StructField[]{
+        new StructField("intColumn", DataTypes.IntegerType, true, 
Metadata.empty()),
+        new StructField("stringColumn", DataTypes.StringType, true, 
Metadata.empty()),
+        new StructField("nestedStruct", 
DataTypes.createStructType(nestedStructFields), true, Metadata.empty())
+    };
+
+    StructType schema = new StructType(structFields);
+    String flattenedSql = transformer.flattenSchema(schema, null);
+
+    assertEquals("intColumn as intColumn,stringColumn as stringColumn,"
+            + "nestedStruct.nestedIntColumn as nestedStruct_nestedIntColumn,"
+            + "nestedStruct.nestedStringColumn as 
nestedStruct_nestedStringColumn",
+        flattenedSql);
+  }
+}

Reply via email to