This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cecfbbddffd [HUDI-8116] Coalesce Row Source Aliases with Schema Fields 
in S3/GCS (#11817)
cecfbbddffd is described below

commit cecfbbddffd2d7bdf105523412aa7c84127f7cfd
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Sep 16 17:57:21 2024 +0530

    [HUDI-8116] Coalesce Row Source Aliases with Schema Fields in S3/GCS 
(#11817)
    
    Co-authored-by: Vamsi <[email protected]>
---
 .../hudi/utilities/config/CloudSourceConfig.java   |   7 +
 .../helpers/CloudObjectsSelectorCommon.java        | 220 ++++++++++++++++++++-
 .../helpers/TestCloudObjectsSelectorCommon.java    |  80 ++++++++
 .../src/test/resources/data/nested_data_1.json     |   1 +
 .../src/test/resources/data/nested_data_2.json     |   1 +
 .../src/test/resources/data/nested_data_3.json     |   1 +
 .../partitioned/country=US/state=TX/old_data.json  |   1 +
 .../test/resources/schema/nested_data_schema.avsc  |  81 ++++++++
 .../test/resources/schema/sample_data_schema.avsc  |   3 +-
 9 files changed, 393 insertions(+), 2 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index e3bdca1a395..d94d26ace5e 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -155,4 +155,11 @@ public class CloudSourceConfig extends HoodieConfig {
       .withDocumentation("Max time in secs to consume " + 
MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event 
queues like SQS, "
           + "PubSub can return empty responses even when messages are 
available the queue, this config ensures we don't wait forever "
           + "to consume MAX_MESSAGES_CONF messages, but time out and move on 
further.");
+
+  /**
+   * Switch controlling whether alias columns are coalesced with the actual schema columns
+   * while source data is read. The key is built from {@code STREAMER_CONFIG_PREFIX}, the
+   * default is {@code true}, and the property is flagged as advanced and available since 1.0.0.
+   */
+  public static final ConfigProperty<Boolean> 
SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + 
"source.cloud.data.reader.coalesce.aliases")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Boolean value to allow coalesce alias columns with 
actual columns while reading from source");
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8aee9d92754..6fc24c22144 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -39,11 +39,17 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.DataFrameReader;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,12 +57,18 @@ import java.io.IOException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
 import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
 import static 
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
@@ -257,12 +269,20 @@ public class CloudObjectsSelectorCommon {
     }
     DataFrameReader reader = spark.read().format(fileFormat);
     String datasourceOpts = getStringWithAltKeys(properties, 
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+
+    StructType rowSchema = null;
     if (schemaProviderOption.isPresent()) {
       Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
       if (sourceSchema != null && 
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
-        reader = 
reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
+        rowSchema = 
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+        if (isCoalesceRequired(properties, sourceSchema)) {
+          reader = reader.schema(addAliasesToRowSchema(sourceSchema, 
rowSchema));
+        } else {
+          reader = reader.schema(rowSchema);
+        }
       }
     }
+
     if (StringUtils.isNullOrEmpty(datasourceOpts)) {
       // fall back to legacy config for BWC. TODO consolidate in HUDI-6020
       datasourceOpts = getStringWithAltKeys(properties, 
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
@@ -291,6 +311,13 @@ public class CloudObjectsSelectorCommon {
       dataset = reader.load(paths.toArray(new 
String[cloudObjectMetadata.size()]));
     }
 
+    if (schemaProviderOption.isPresent()) {
+      Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
+      if (isCoalesceRequired(properties, sourceSchema)) {
+        dataset = spark.createDataFrame(coalesceAliasFields(dataset, 
sourceSchema).rdd(), rowSchema);
+      }
+    }
+
     // add partition column from source path if configured
     if (containsConfigProperty(properties, PATH_BASED_PARTITION_FIELDS)) {
       String[] partitionKeysToAdd = getStringWithAltKeys(properties, 
PATH_BASED_PARTITION_FIELDS).split(",");
@@ -316,6 +343,197 @@ public class CloudObjectsSelectorCommon {
     return dataset;
   }
 
+  /**
+   * Decides whether alias coalescing has to run for the given source schema.
+   * <p>
+   * Coalescing applies only when the reader config enables it, a source schema is available,
+   * and that schema declares an alias on at least one field (at any nesting level).
+   *
+   * @param properties   The properties holding the coalesce-aliases config.
+   * @param sourceSchema The Avro source schema, may be null.
+   * @return True when alias columns must be read and coalesced, false otherwise.
+   */
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema sourceSchema) {
+    boolean coalesceEnabled = getBooleanWithAltKeys(
+        properties, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS);
+    if (!coalesceEnabled || sourceSchema == null) {
+      return false;
+    }
+    return hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Tells whether any field of the given Avro schema, at any nesting depth, declares aliases.
+   * Only record schemas (or unions that contain a record) are descended into.
+   *
+   * @param schema The Avro schema to inspect.
+   * @return True if an aliased field exists in the schema or in one of its nested records,
+   *         false otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
+    if (!isNestedRecord(schema)) {
+      // Not a record (nor a union holding one): there are no fields to look at
+      return false;
+    }
+    // A field matches when it carries aliases itself or when its own schema holds an aliased field
+    return getRecordFields(schema).stream()
+        .anyMatch(field -> !field.aliases().isEmpty() || hasFieldWithAliases(field.schema()));
+  }
+
+  /**
+   * Builds a row schema in which each Avro field is expanded through
+   * {@link #generateRowFieldsWithAliases}, so aliased fields gain one extra column per alias.
+   * Row fields are matched to Avro fields by name.
+   *
+   * @param avroSchema The Avro schema that declares the aliases.
+   * @param rowSchema  The row schema corresponding to the Avro schema.
+   * @return A new row schema containing the expanded fields in Avro field order.
+   */
+  private static StructType addAliasesToRowSchema(Schema avroSchema, StructType rowSchema) {
+    Map<String, StructField> rowFieldsByName = Arrays.stream(rowSchema.fields())
+        .collect(Collectors.toMap(StructField::name, Function.identity()));
+
+    List<StructField> expandedFields = new ArrayList<>();
+    for (Schema.Field avroField : getRecordFields(avroSchema)) {
+      StructField matchingRowField = rowFieldsByName.get(avroField.name());
+      expandedFields.addAll(generateRowFieldsWithAliases(avroField, matchingRowField));
+    }
+
+    return new StructType(expandedFields.toArray(new StructField[0]));
+  }
+
+  /**
+   * Returns the fields of a record schema. For a union, the fields of its first record member
+   * are returned; every other schema type yields an empty list.
+   *
+   * @param schema The Avro schema to read the fields from.
+   * @return The record fields, or an empty list when no record is found.
+   */
+  private static List<Schema.Field> getRecordFields(Schema schema) {
+    switch (schema.getType()) {
+      case RECORD:
+        return schema.getFields();
+      case UNION:
+        // Pick the first record among the union members, if any
+        for (Schema member : schema.getTypes()) {
+          if (member.getType() == Schema.Type.RECORD) {
+            return member.getFields();
+          }
+        }
+        return Collections.emptyList();
+      default:
+        return Collections.emptyList();
+    }
+  }
+
+  /**
+   * Expands one Avro field into the row fields the reader schema needs for it.
+   * <p>
+   * A nested record has its struct type rebuilt recursively so that inner aliases are included.
+   * When the rebuilt struct differs from the original one, or the field itself has aliases, the
+   * field is emitted as nullable with the rebuilt type, followed by one nullable column per alias.
+   * A non-record field with aliases is emitted the same way using its original type. In every
+   * other case the row field is passed through untouched.
+   *
+   * @param avroField The Avro field to expand.
+   * @param rowField  The Spark SQL StructField that corresponds to the Avro field.
+   * @return The row fields to place in the reader schema for this Avro field.
+   */
+  private static List<StructField> generateRowFieldsWithAliases(Schema.Field avroField, StructField rowField) {
+    List<StructField> expanded = new ArrayList<>();
+    Set<String> aliases = avroField.aliases();
+
+    if (isNestedRecord(avroField.schema())) {
+      // Rebuild the nested struct first; it may pick up alias columns from deeper levels
+      StructType rebuiltType = addAliasesToRowSchema(avroField.schema(), (StructType) rowField.dataType());
+      if (schemaModifiedOrHasAliases(avroField, rebuiltType, rowField)) {
+        addFieldWithAliases(expanded, avroField.name(), rebuiltType, rowField.metadata(), aliases);
+      } else {
+        expanded.add(rowField);
+      }
+      return expanded;
+    }
+
+    if (aliases.isEmpty()) {
+      // Plain field without aliases: keep it exactly as it is
+      expanded.add(rowField);
+    } else {
+      addFieldWithAliases(expanded, avroField.name(), rowField.dataType(), rowField.metadata(), aliases);
+    }
+    return expanded;
+  }
+
+  /**
+   * Appends a nullable field named {@code fieldName} to the list, then one nullable field per
+   * alias, all sharing the given data type and metadata.
+   *
+   * @param fieldList The list receiving the new fields.
+   * @param fieldName The name of the actual schema field.
+   * @param dataType  The data type used for the field and each alias.
+   * @param metadata  The metadata used for the field and each alias.
+   * @param aliases   The alias names to add after the field.
+   */
+  private static void addFieldWithAliases(List<StructField> fieldList, String fieldName, DataType dataType,
+                                          Metadata metadata, Set<String> aliases) {
+    fieldList.add(new StructField(fieldName, dataType, true, metadata));
+    for (String alias : aliases) {
+      fieldList.add(new StructField(alias, dataType, true, metadata));
+    }
+  }
+
+  /**
+   * Folds alias columns into their schema fields: top-level fields are handled first, then
+   * fields nested inside records.
+   *
+   * @param dataset      The dataset read with the alias-extended schema.
+   * @param sourceSchema The Avro schema defining the fields and their aliases.
+   * @return The dataset after both coalescing passes.
+   */
+  private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema sourceSchema) {
+    Dataset<Row> topLevelCoalesced = coalesceTopLevelAliases(dataset, sourceSchema);
+    return coalesceNestedAliases(topLevelCoalesced, sourceSchema);
+  }
+
+  /**
+   * Coalesces every aliased top-level field with its alias columns.
+   * <p>
+   * The top-level fields of the Avro schema are visited in order. Each field that declares
+   * aliases is replaced by a coalesce over the field and its alias columns, and the alias
+   * columns are dropped afterwards. Fields without aliases are left as they are.
+   *
+   * @param dataset      The dataset to process.
+   * @param sourceSchema The Avro schema defining the fields and their aliases.
+   * @return A dataset in which aliased top-level fields are merged with their aliases.
+   */
+  private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, Schema sourceSchema) {
+    Dataset<Row> merged = dataset;
+    for (Schema.Field field : getRecordFields(sourceSchema)) {
+      if (field.aliases().isEmpty()) {
+        continue;
+      }
+      merged = coalesceAndDropAliasFields(merged, field.name(), field.aliases());
+    }
+    return merged;
+  }
+
+  /**
+   * Replaces {@code fieldName} with a coalesce over the field column followed by its alias
+   * columns, then drops the alias columns from the dataset.
+   *
+   * @param dataset   The dataset holding the field and alias columns.
+   * @param fieldName The name of the actual schema field.
+   * @param aliases   The alias column names to merge into the field.
+   * @return The dataset with the merged field and without the alias columns.
+   */
+  private static Dataset<Row> coalesceAndDropAliasFields(Dataset<Row> dataset, String fieldName, Set<String> aliases) {
+    // The actual field comes first so that it takes precedence over its aliases
+    Column[] candidates = new Column[aliases.size() + 1];
+    int next = 0;
+    candidates[next++] = dataset.col(fieldName);
+    for (String alias : aliases) {
+      candidates[next++] = dataset.col(alias);
+    }
+
+    Dataset<Row> withMergedField = dataset.withColumn(fieldName, functions.coalesce(candidates));
+    return withMergedField.drop(aliases.toArray(new String[0]));
+  }
+
+  /**
+   * Coalesces aliased fields that live inside nested records.
+   * <p>
+   * Each top-level field of the Avro schema is inspected. When it is a record (or a union
+   * containing one) that holds an aliased field somewhere inside, its column is replaced by a
+   * struct rebuilt from its nested fields, with every aliased nested field coalesced with its
+   * alias columns.
+   *
+   * @param dataset      The dataset to process.
+   * @param sourceSchema The Avro schema defining the structure and aliases of the data.
+   * @return A dataset in which nested fields are merged with their aliases.
+   */
+  private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, Schema sourceSchema) {
+    Dataset<Row> result = dataset;
+    for (Schema.Field field : getRecordFields(sourceSchema)) {
+      Schema fieldSchema = field.schema();
+      boolean needsRebuild = isNestedRecord(fieldSchema) && hasFieldWithAliases(fieldSchema);
+      if (!needsRebuild) {
+        continue;
+      }
+      // Columns are resolved against the dataset as updated by the previous iterations
+      Column rebuiltStruct = functions.struct(getNestedFields("", field, result));
+      result = result.withColumn(field.name(), rebuiltStruct);
+    }
+    return result;
+  }
+
+  /**
+   * Produces one column per nested field of the given record field.
+   * <p>
+   * A nested record child is rebuilt recursively as a struct; any other child is taken from the
+   * dataset by its dotted path. A child that declares aliases is coalesced with the columns found
+   * under its alias names (same parent path) and named after the child; a child without aliases
+   * is returned as resolved.
+   *
+   * @param parentField The dotted path of the enclosing field, empty at the top level.
+   * @param field       The record field whose nested fields are resolved.
+   * @param dataset     The dataset the columns are resolved against.
+   * @return The columns for the nested fields, in Avro field order.
+   */
+  private static Column[] getNestedFields(String parentField, Schema.Field field, Dataset<Row> dataset) {
+    // Dotted path of the enclosing record; it is the same for every child
+    String recordPath = getFullName(parentField, field.name());
+    List<Column> resolved = new ArrayList<>();
+
+    for (Schema.Field child : getRecordFields(field.schema())) {
+      Column primary = isNestedRecord(child.schema())
+          ? functions.struct(getNestedFields(recordPath, child, dataset))
+          : dataset.col(getFullName(recordPath, child.name()));
+
+      if (child.aliases().isEmpty()) {
+        resolved.add(primary);
+        continue;
+      }
+
+      // The actual field goes first, followed by the columns read under each alias name
+      List<Column> candidates = new ArrayList<>();
+      candidates.add(primary);
+      for (String alias : child.aliases()) {
+        candidates.add(dataset.col(getFullName(recordPath, alias)));
+      }
+      resolved.add(functions.coalesce(candidates.toArray(new Column[0])).alias(child.name()));
+    }
+
+    return resolved.toArray(new Column[0]);
+  }
+
+  /**
+   * Tells whether the schema is a record, or a union with at least one record member.
+   *
+   * @param schema The Avro schema to check.
+   * @return True for a record or a union containing a record, false for anything else.
+   */
+  private static boolean isNestedRecord(Schema schema) {
+    Schema.Type type = schema.getType();
+    if (type == Schema.Type.RECORD) {
+      return true;
+    }
+    if (type != Schema.Type.UNION) {
+      return false;
+    }
+    for (Schema member : schema.getTypes()) {
+      if (member.getType() == Schema.Type.RECORD) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Joins a parent path and a field name with a dot; an empty parent path yields the bare
+   * field name.
+   */
+  private static String getFullName(String namespace, String fieldName) {
+    if (namespace.isEmpty()) {
+      return fieldName;
+    }
+    return namespace + "." + fieldName;
+  }
+
+  /**
+   * Tells whether a nested field needs to be re-emitted: either its rebuilt struct type differs
+   * from the type of the original row field, or the Avro field itself declares aliases.
+   */
+  private static boolean schemaModifiedOrHasAliases(Schema.Field avroField, StructType modifiedNestedSchema,
+                                                    StructField rowField) {
+    if (!modifiedNestedSchema.equals(rowField.dataType())) {
+      return true;
+    }
+    return !avroField.aliases().isEmpty();
+  }
+
   private static Option<String> getPropVal(TypedProperties props, 
ConfigProperty<String> configProperty) {
     String value = getStringWithAltKeys(props, configProperty, true);
     if (!StringUtils.isNullOrEmpty(value)) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 4b30bb14b57..2704dc132a2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -18,19 +18,24 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
 
+import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -97,6 +102,24 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     Assertions.assertEquals(Collections.singletonList(expected), 
result.get().collectAsList());
   }
 
+  /**
+   * Loads a single partitioned JSON file whose record only carries the alias column
+   * {@code old_data} and expects its value to come back together with the partition values
+   * taken from the path.
+   */
+  @Test
+  void loadDatasetWithSchemaAndAliasFields() {
+    TypedProperties props = new TypedProperties();
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader()
+        .getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+    props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases", "true");
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    List<CloudObjectMetadata> input = Collections.singletonList(
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/old_data.json", 1));
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(
+        sparkSession, input, "json", Option.of(new FilebasedSchemaProvider(props, jsc)), 1);
+    Assertions.assertTrue(result.isPresent());
+    Assertions.assertEquals(1, result.get().count());
+    Row expected = RowFactory.create("some data", "US", "TX");
+    Assertions.assertEquals(Collections.singletonList(expected), result.get().collectAsList());
+  }
+
   @Test
   public void loadDatasetWithSchemaAndRepartition() {
     TypedProperties props = new TypedProperties();
@@ -120,6 +143,63 @@ public class TestCloudObjectsSelectorCommon extends 
HoodieSparkClientTestHarness
     Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
   }
 
+  /**
+   * Loads three partitioned JSON files, one of which ({@code old_data.json}) only carries the
+   * alias column, and expects every row to come back with the same value plus the partition
+   * values taken from its path.
+   */
+  @Test
+  void loadDatasetWithSchemaAndCoalesceAliases() {
+    TypedProperties props = new TypedProperties();
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader()
+        .getResource("schema/sample_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+    // Setting this config so that dataset repartition happens inside `loadAsDataset`
+    props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+    props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases", "true");
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/old_data.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json", 1000)
+    );
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(
+        sparkSession, input, "json", Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
+    Assertions.assertTrue(result.isPresent());
+    List<Row> expected = Arrays.asList(
+        RowFactory.create("some data", "US", "CA"),
+        RowFactory.create("some data", "US", "TX"),
+        RowFactory.create("some data", "IND", "TS"));
+    List<Row> actual = result.get().collectAsList();
+    // Row order is not guaranteed, so compare as sets
+    Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+  }
+
+  /**
+   * Loads three JSON files that spell the nested fields under different names (actual field
+   * names in the first file, alias names in the other two) and expects all rows to come back
+   * under the source schema, both in values and in the resulting dataset schema.
+   */
+  @Test
+  void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
+    TypedProperties props = new TypedProperties();
+    String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader()
+        .getResource("schema/nested_data_schema.avsc").getPath();
+    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    // Setting this config so that dataset repartition happens inside `loadAsDataset`
+    props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+    props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases", "true");
+    List<CloudObjectMetadata> input = Arrays.asList(
+        new CloudObjectMetadata("src/test/resources/data/nested_data_1.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/nested_data_2.json", 1000),
+        new CloudObjectMetadata("src/test/resources/data/nested_data_3.json", 1000)
+    );
+    CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new CloudObjectsSelectorCommon(props);
+    Option<Dataset<Row>> result = cloudObjectsSelectorCommon.loadAsDataset(
+        sparkSession, input, "json", Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
+    Assertions.assertTrue(result.isPresent());
+    Row address1 = RowFactory.create("123 Main St", "Springfield", "12345", RowFactory.create("India", "IN"));
+    Row person1 = RowFactory.create("John", "Doe", RowFactory.create(1990, 5, 15), address1);
+    Row address2 = RowFactory.create("456 Elm St", "Shelbyville", "67890", RowFactory.create("Spain", "SPN"));
+    Row person2 = RowFactory.create("Jane", "Smith", RowFactory.create(1992, 9, 2), address2);
+    Row address3 = RowFactory.create("789 Maple Ave", "Paris", "98765", RowFactory.create("France", "FRA"));
+    Row person3 = RowFactory.create("John", "James", RowFactory.create(1985, 6, 15), address3);
+    List<Row> expected = Arrays.asList(person1, person2, person3);
+    List<Row> actual = result.get().collectAsList();
+    // Row order is not guaranteed, so compare as sets
+    Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+    Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
+    StructType expectedSchema = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    // assert final output schema matches with the source schema
+    Assertions.assertEquals(expectedSchema, result.get().schema(), "output dataset schema should match source schema");
+  }
+
   @Test
   public void partitionKeyNotPresentInPath() {
     List<CloudObjectMetadata> input = Collections.singletonList(new 
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
 1));
diff --git a/hudi-utilities/src/test/resources/data/nested_data_1.json 
b/hudi-utilities/src/test/resources/data/nested_data_1.json
new file mode 100644
index 00000000000..16ccf142523
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_1.json
@@ -0,0 +1 @@
+{"firstName":"John","lastName":"Doe","birthdate":{"year":1990,"month":5,"day":15},"address":{"street":"123
 Main 
St","city":"Springfield","zipCode":"12345","country":{"name":"India","code":"IN"}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/nested_data_2.json 
b/hudi-utilities/src/test/resources/data/nested_data_2.json
new file mode 100644
index 00000000000..983737fae40
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_2.json
@@ -0,0 +1 @@
+{"firstName":"Jane","lastName":"Smith","dob":{"year":1992,"month":9,"day":2},"residence":{"street":"456
 Elm 
St","city":"Shelbyville","postalCode":"67890","country":{"name":"Spain","countryCode":"SPN"}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/nested_data_3.json 
b/hudi-utilities/src/test/resources/data/nested_data_3.json
new file mode 100644
index 00000000000..009471ceb67
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_3.json
@@ -0,0 +1 @@
+{"firstName":"John","lastName":"James","dateofbirth":{"year":1985,"month":6,"day":15},"residence":{"street":"789
 Maple 
Ave","city":"Paris","postalCode":"98765","country":{"name":"France","countryCode":"FRA"}}}
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
 
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
new file mode 100644
index 00000000000..47007130b76
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
@@ -0,0 +1 @@
+{"old_data": "some data"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc 
b/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc
new file mode 100644
index 00000000000..972ca8c8f4a
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc
@@ -0,0 +1,81 @@
+{
+  "type": "record",
+  "name": "Person",
+  "namespace": "com.example",
+  "fields": [
+    {
+      "name": "firstName",
+      "type": "string"
+    },
+    {
+      "name": "lastName",
+      "type": ["null", "string"],
+      "aliases": ["surname"],
+      "default": null
+    },
+    {
+      "name": "birthdate",
+      "type": {
+        "type": "record",
+        "name": "Birthdate",
+        "fields": [
+          {
+            "name": "year",
+            "type": "int"
+          },
+          {
+            "name": "month",
+            "type": "int"
+          },
+          {
+            "name": "day",
+            "type": "int"
+          }
+        ]
+      },
+      "aliases": ["dob", "dateofbirth"]
+    },
+    {
+      "name": "address",
+      "type": {
+        "type": "record",
+        "name": "Address",
+        "fields": [
+          {
+            "name": "street",
+            "type": "string"
+          },
+          {
+            "name": "city",
+            "type": "string"
+          },
+          {
+            "name": "zipCode",
+            "type": "string",
+            "aliases": ["postalCode"]
+          },
+          {
+            "name": "country",
+            "type": ["null", {
+              "type": "record",
+              "name": "Country",
+              "fields": [
+                {
+                  "name": "name",
+                  "type": "string"
+                },
+                {
+                  "name": "code",
+                  "type": ["null", "string"],
+                  "aliases": ["countryCode"],
+                  "default": null
+                }
+              ]
+            }]
+          }
+        ]
+      },
+      "aliases": ["residence"]
+    }
+  ]
+}
diff --git a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc 
b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
index 13cbcfff4be..be698496cee 100644
--- a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
+++ b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
@@ -21,7 +21,8 @@
   "fields": [
     {
       "name": "data",
-      "type": "string"
+      "type": "string",
+      "aliases": ["old_data"]
     }
   ]
 }

Reply via email to