This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cecfbbddffd [HUDI-8116] Coalesce Row Source Aliases with Schema Fields
in S3/GCS (#11817)
cecfbbddffd is described below
commit cecfbbddffd2d7bdf105523412aa7c84127f7cfd
Author: vamsikarnika <[email protected]>
AuthorDate: Mon Sep 16 17:57:21 2024 +0530
[HUDI-8116] Coalesce Row Source Aliases with Schema Fields in S3/GCS
(#11817)
Co-authored-by: Vamsi <[email protected]>
---
.../hudi/utilities/config/CloudSourceConfig.java | 7 +
.../helpers/CloudObjectsSelectorCommon.java | 220 ++++++++++++++++++++-
.../helpers/TestCloudObjectsSelectorCommon.java | 80 ++++++++
.../src/test/resources/data/nested_data_1.json | 1 +
.../src/test/resources/data/nested_data_2.json | 1 +
.../src/test/resources/data/nested_data_3.json | 1 +
.../partitioned/country=US/state=TX/old_data.json | 1 +
.../test/resources/schema/nested_data_schema.avsc | 81 ++++++++
.../test/resources/schema/sample_data_schema.avsc | 3 +-
9 files changed, 393 insertions(+), 2 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
index e3bdca1a395..d94d26ace5e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java
@@ -155,4 +155,11 @@ public class CloudSourceConfig extends HoodieConfig {
.withDocumentation("Max time in secs to consume " +
MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event
queues like SQS, "
+ "PubSub can return empty responses even when messages are
available the queue, this config ensures we don't wait forever "
+ "to consume MAX_MESSAGES_CONF messages, but time out and move on
further.");
+
+ public static final ConfigProperty<Boolean>
SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS = ConfigProperty
+ .key(STREAMER_CONFIG_PREFIX +
"source.cloud.data.reader.coalesce.aliases")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.0.0")
+ .withDocumentation("Boolean value to allow coalesce alias columns with
actual columns while reading from source");
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index 8aee9d92754..6fc24c22144 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -39,11 +39,17 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,12 +57,18 @@ import java.io.IOException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.CLOUD_DATAFILE_EXTENSION;
import static
org.apache.hudi.utilities.config.CloudSourceConfig.IGNORE_RELATIVE_PATH_PREFIX;
@@ -257,12 +269,20 @@ public class CloudObjectsSelectorCommon {
}
DataFrameReader reader = spark.read().format(fileFormat);
String datasourceOpts = getStringWithAltKeys(properties,
CloudSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
+
+ StructType rowSchema = null;
if (schemaProviderOption.isPresent()) {
Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
if (sourceSchema != null &&
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
- reader =
reader.schema(AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema));
+ rowSchema =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+ if (isCoalesceRequired(properties, sourceSchema)) {
+ reader = reader.schema(addAliasesToRowSchema(sourceSchema,
rowSchema));
+ } else {
+ reader = reader.schema(rowSchema);
+ }
}
}
+
if (StringUtils.isNullOrEmpty(datasourceOpts)) {
// fall back to legacy config for BWC. TODO consolidate in HUDI-6020
datasourceOpts = getStringWithAltKeys(properties,
S3EventsHoodieIncrSourceConfig.SPARK_DATASOURCE_OPTIONS, true);
@@ -291,6 +311,13 @@ public class CloudObjectsSelectorCommon {
dataset = reader.load(paths.toArray(new
String[cloudObjectMetadata.size()]));
}
+ if (schemaProviderOption.isPresent()) {
+ Schema sourceSchema = schemaProviderOption.get().getSourceSchema();
+ if (isCoalesceRequired(properties, sourceSchema)) {
+ dataset = spark.createDataFrame(coalesceAliasFields(dataset,
sourceSchema).rdd(), rowSchema);
+ }
+ }
+
// add partition column from source path if configured
if (containsConfigProperty(properties, PATH_BASED_PARTITION_FIELDS)) {
String[] partitionKeysToAdd = getStringWithAltKeys(properties,
PATH_BASED_PARTITION_FIELDS).split(",");
@@ -316,6 +343,197 @@ public class CloudObjectsSelectorCommon {
return dataset;
}
+ private static boolean isCoalesceRequired(TypedProperties properties, Schema
sourceSchema) {
+ return getBooleanWithAltKeys(properties,
CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+ && Objects.nonNull(sourceSchema)
+ && hasFieldWithAliases(sourceSchema);
+ }
+
+ /**
+ * Recursively checks if an Avro schema or any of its nested fields contain
aliases.
+ *
+ * @param schema The Avro schema to check.
+ * @return True if the schema or any of its fields contain aliases, false
otherwise.
+ */
+ private static boolean hasFieldWithAliases(Schema schema) {
+ // If the schema is a record, check its fields recursively
+ if (isNestedRecord(schema)) {
+ for (Schema.Field field : getRecordFields(schema)) {
+ // Check if the field has aliases
+ if (!field.aliases().isEmpty()) {
+ return true;
+ }
+ // Recursively check the field's schema for aliases
+ if (hasFieldWithAliases(field.schema())) {
+ return true;
+ }
+ }
+ }
+ // No aliases found
+ return false;
+ }
+
+ private static StructType addAliasesToRowSchema(Schema avroSchema,
StructType rowSchema) {
+ Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
+ .collect(Collectors.toMap(StructField::name, Function.identity()));
+
+ StructField[] modifiedFields = getRecordFields(avroSchema).stream()
+ .flatMap(avroField -> generateRowFieldsWithAliases(avroField,
rowFieldsMap.get(avroField.name())).stream())
+ .toArray(StructField[]::new);
+
+ return new StructType(modifiedFields);
+ }
+
+ private static List<Schema.Field> getRecordFields(Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ return schema.getFields();
+ }
+
+ if (schema.getType() == Schema.Type.UNION) {
+ return schema.getTypes().stream()
+ .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .map(Schema::getFields)
+ .orElse(Collections.emptyList());
+ }
+
+ return Collections.emptyList();
+ }
+
+ /**
+ * Generates a list of StructFields with aliases applied based on the
provided Avro field schema.
+ * <p>
+ * This method processes a given Avro field and its corresponding Spark SQL
StructField, handling
+ * nested records and aliases. If the Avro field contains nested records,
the method recursively
+ * updates the schema for these records and applies any aliases defined in
the Avro schema.
+ * If the Avro field has aliases, they are added as new fields with nullable
set to true and
+ * appropriate metadata in the returned list. If no aliases or nesting are
present, the original
+ * StructField is returned unchanged.
+ *
+ * @param avroField The Avro field schema to process.
+ * @param rowField The corresponding Spark SQL StructField to map the Avro
field to.
+ * @return A list of StructFields with aliases applied as per the Avro
schema.
+ */
+ private static List<StructField> generateRowFieldsWithAliases(Schema.Field
avroField, StructField rowField) {
+ List<StructField> fieldList = new ArrayList<>();
+
+ // Handle nested records
+ if (isNestedRecord(avroField.schema())) {
+ StructType updatedSchema = addAliasesToRowSchema(avroField.schema(),
(StructType) rowField.dataType());
+
+ if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+ // Add the original field with the updated schema and add aliases if
present
+ addFieldWithAliases(fieldList, avroField.name(), updatedSchema,
rowField.metadata(), avroField.aliases());
+ } else {
+ fieldList.add(rowField);
+ }
+ } else if (!avroField.aliases().isEmpty()) {
+ // If the field has aliases, add them to the schema
+ addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(),
rowField.metadata(), avroField.aliases());
+ } else {
+ // No aliases or nesting, return the original field
+ fieldList.add(rowField);
+ }
+ return fieldList;
+ }
+
+ private static void addFieldWithAliases(List<StructField> fieldList, String
fieldName, DataType dataType, Metadata metadata, Set<String> aliases) {
+ fieldList.add(new StructField(fieldName, dataType, true, metadata));
+ aliases.forEach(alias -> fieldList.add(new StructField(alias, dataType,
true, metadata)));
+ }
+
+ private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema
sourceSchema) {
+ return coalesceNestedAliases(coalesceTopLevelAliases(dataset,
sourceSchema), sourceSchema);
+ }
+
+ /**
+ * Merges top-level fields with their aliases in the dataset.
+ * <p>
+ * This method goes through the top-level fields in the Avro schema, and for
any field that has aliases,
+ * it combines them in the dataset using a coalesce operation. This ensures
that if a field is null,
+ * the value from its alias is used instead.
+ *
+ * @param dataset The dataset to process.
+ * @param sourceSchema The Avro schema defining the fields and their aliases.
+ * @return A dataset with fields merged with their aliases.
+ */
+ private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset,
Schema sourceSchema) {
+ return getRecordFields(sourceSchema).stream()
+ .filter(field -> !field.aliases().isEmpty())
+ .reduce(dataset,
+ (ds, field) -> coalesceAndDropAliasFields(ds, field.name(),
field.aliases()), (ds1, ds2) -> ds1);
+ }
+
+ private static Dataset<Row> coalesceAndDropAliasFields(Dataset<Row> dataset,
String fieldName, Set<String> aliases) {
+ List<Column> columns = new ArrayList<>();
+ columns.add(dataset.col(fieldName));
+ aliases.forEach(alias -> columns.add(dataset.col(alias)));
+
+ return dataset.withColumn(fieldName,
functions.coalesce(columns.toArray(new Column[0])))
+ .drop(aliases.toArray(new String[0]));
+ }
+
+ /**
+ * Merges nested fields with their aliases in the dataset.
+ * <p>
+ * This method iterates through the fields of the provided Avro schema and
checks if they represent
+ * nested records. For each nested record, it verifies if there are any
alias fields present. If
+ * aliases are found, the method generates a list of nested fields,
coalescing them with their aliases,
+ * and creates a new column in the dataset with the merged data.
+ *
+ * @param dataset The dataset to process.
+ * @param sourceSchema The Avro schema defining the structure and aliases of
the data.
+ * @return A dataset with nested fields merged with their aliases.
+ */
+ private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset,
Schema sourceSchema) {
+ for (Schema.Field field : getRecordFields(sourceSchema)) {
+ // check if this is a nested record and contains an alias field within
+ if (isNestedRecord(field.schema()) &&
hasFieldWithAliases(field.schema())) {
+ dataset = dataset.withColumn(field.name(),
functions.struct(getNestedFields("", field, dataset)));
+ }
+ }
+ return dataset;
+ }
+
+ private static Column[] getNestedFields(String parentField, Schema.Field
field, Dataset<Row> dataset) {
+ return getRecordFields(field.schema()).stream()
+ .map(avroField -> {
+ List<Column> columns = new ArrayList<>();
+ String newParentField = getFullName(parentField, field.name());
+ if (isNestedRecord(avroField.schema())) {
+ // if field is nested, recursively fetch nested column
+ columns.add(functions.struct(getNestedFields(newParentField,
avroField, dataset)));
+ } else {
+ columns.add(dataset.col(getFullName(newParentField,
avroField.name())));
+ }
+ avroField.aliases().forEach(alias ->
columns.add(dataset.col(getFullName(newParentField, alias))));
+ // if avro field contains aliases, coalesce the column with others
matching the aliases; otherwise return the actual column
+ return avroField.aliases().isEmpty() ? columns.get(0)
+ : functions.coalesce(columns.toArray(new
Column[0])).alias(avroField.name());
+ }).toArray(Column[]::new);
+ }
+
+ private static boolean isNestedRecord(Schema schema) {
+ if (schema.getType() == Schema.Type.RECORD) {
+ return true;
+ }
+
+ if (schema.getType() == Schema.Type.UNION) {
+ return schema.getTypes().stream()
+ .anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
+ }
+
+ return false;
+ }
+
+ private static String getFullName(String namespace, String fieldName) {
+ return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
+ }
+
+ private static boolean schemaModifiedOrHasAliases(Schema.Field avroField,
StructType modifiedNestedSchema, StructField rowField) {
+ return !modifiedNestedSchema.equals(rowField.dataType()) ||
!avroField.aliases().isEmpty();
+ }
+
private static Option<String> getPropVal(TypedProperties props,
ConfigProperty<String> configProperty) {
String value = getStringWithAltKeys(props, configProperty, true);
if (!StringUtils.isNullOrEmpty(value)) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 4b30bb14b57..2704dc132a2 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -18,19 +18,24 @@
package org.apache.hudi.utilities.sources.helpers;
+import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -97,6 +102,24 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
Assertions.assertEquals(Collections.singletonList(expected),
result.get().collectAsList());
}
+ @Test
+ void loadDatasetWithSchemaAndAliasFields() {
+ TypedProperties props = new TypedProperties();
+
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+ String schemaFilePath =
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file",
schemaFilePath);
+ props.put("hoodie.deltastreamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
+
props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
"country,state");
+ props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases",
"true");
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/old_data.json",
1));
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)), 1);
+ Assertions.assertTrue(result.isPresent());
+ Assertions.assertEquals(1, result.get().count());
+ Row expected = RowFactory.create("some data", "US", "TX");
+ Assertions.assertEquals(Collections.singletonList(expected),
result.get().collectAsList());
+ }
+
@Test
public void loadDatasetWithSchemaAndRepartition() {
TypedProperties props = new TypedProperties();
@@ -120,6 +143,63 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
}
+ @Test
+ void loadDatasetWithSchemaAndCoalesceAliases() {
+ TypedProperties props = new TypedProperties();
+
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
+ String schemaFilePath =
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file",
schemaFilePath);
+ props.put("hoodie.deltastreamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
+
props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
"country,state");
+ // Setting this config so that dataset repartition happens inside
`loadAsDataset`
+ props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+ props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases",
"true");
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1000),
+ new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=TX/old_data.json",
1000),
+ new
CloudObjectMetadata("src/test/resources/data/partitioned/country=IND/state=TS/data.json",
1000)
+ );
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
+ Assertions.assertTrue(result.isPresent());
+ List<Row> expected = Arrays.asList(RowFactory.create("some data", "US",
"CA"), RowFactory.create("some data", "US", "TX"), RowFactory.create("some
data", "IND", "TS"));
+ List<Row> actual = result.get().collectAsList();
+ Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+ }
+
+ @Test
+ void loadDatasetWithNestedSchemaAndCoalesceAliases() throws IOException {
+ TypedProperties props = new TypedProperties();
+
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/nested_data_schema.avsc");
+ String schemaFilePath =
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/nested_data_schema.avsc").getPath();
+ props.put("hoodie.deltastreamer.schemaprovider.source.schema.file",
schemaFilePath);
+ props.put("hoodie.deltastreamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
+ // Setting this config so that dataset repartition happens inside
`loadAsDataset`
+ props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
+ props.put("hoodie.streamer.source.cloud.data.reader.coalesce.aliases",
"true");
+ List<CloudObjectMetadata> input = Arrays.asList(
+ new CloudObjectMetadata("src/test/resources/data/nested_data_1.json",
1000),
+ new CloudObjectMetadata("src/test/resources/data/nested_data_2.json",
1000),
+ new CloudObjectMetadata("src/test/resources/data/nested_data_3.json",
1000)
+ );
+ CloudObjectsSelectorCommon cloudObjectsSelectorCommon = new
CloudObjectsSelectorCommon(props);
+ Option<Dataset<Row>> result =
cloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, "json",
Option.of(new FilebasedSchemaProvider(props, jsc)), 30);
+ Assertions.assertTrue(result.isPresent());
+ Row address1 = RowFactory.create("123 Main St", "Springfield", "12345",
RowFactory.create("India", "IN"));
+ Row person1 = RowFactory.create("John", "Doe", RowFactory.create(1990, 5,
15), address1);
+ Row address2 = RowFactory.create("456 Elm St", "Shelbyville", "67890",
RowFactory.create("Spain", "SPN"));
+ Row person2 = RowFactory.create("Jane", "Smith", RowFactory.create(1992,
9, 2), address2);
+ Row address3 = RowFactory.create("789 Maple Ave", "Paris", "98765",
RowFactory.create("France", "FRA"));
+ Row person3 = RowFactory.create("John", "James", RowFactory.create(1985,
6, 15), address3);
+ List<Row> expected = Arrays.asList(person1, person2, person3);
+ List<Row> actual = result.get().collectAsList();
+ Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+ Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
+ StructType expectedSchema =
AvroConversionUtils.convertAvroSchemaToStructType(schema);
+ // assert final output schema matches the source schema
+ Assertions.assertEquals(expectedSchema, result.get().schema(), "output
dataset schema should match source schema");
+ }
+
@Test
public void partitionKeyNotPresentInPath() {
List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
diff --git a/hudi-utilities/src/test/resources/data/nested_data_1.json
b/hudi-utilities/src/test/resources/data/nested_data_1.json
new file mode 100644
index 00000000000..16ccf142523
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_1.json
@@ -0,0 +1 @@
+{"firstName":"John","lastName":"Doe","birthdate":{"year":1990,"month":5,"day":15},"address":{"street":"123
Main
St","city":"Springfield","zipCode":"12345","country":{"name":"India","code":"IN"}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/nested_data_2.json
b/hudi-utilities/src/test/resources/data/nested_data_2.json
new file mode 100644
index 00000000000..983737fae40
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_2.json
@@ -0,0 +1 @@
+{"firstName":"Jane","lastName":"Smith","dob":{"year":1992,"month":9,"day":2},"residence":{"street":"456
Elm
St","city":"Shelbyville","postalCode":"67890","country":{"name":"Spain","countryCode":"SPN"}}}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/data/nested_data_3.json
b/hudi-utilities/src/test/resources/data/nested_data_3.json
new file mode 100644
index 00000000000..009471ceb67
--- /dev/null
+++ b/hudi-utilities/src/test/resources/data/nested_data_3.json
@@ -0,0 +1 @@
+{"firstName":"John","lastName":"James","dateofbirth":{"year":1985,"month":6,"day":15},"residence":{"street":"789
Maple
Ave","city":"Paris","postalCode":"98765","country":{"name":"France","countryCode":"FRA"}}}
\ No newline at end of file
diff --git
a/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
new file mode 100644
index 00000000000..47007130b76
--- /dev/null
+++
b/hudi-utilities/src/test/resources/data/partitioned/country=US/state=TX/old_data.json
@@ -0,0 +1 @@
+{"old_data": "some data"}
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc
b/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc
new file mode 100644
index 00000000000..972ca8c8f4a
--- /dev/null
+++ b/hudi-utilities/src/test/resources/schema/nested_data_schema.avsc
@@ -0,0 +1,81 @@
+{
+ "type": "record",
+ "name": "Person",
+ "namespace": "com.example",
+ "fields": [
+ {
+ "name": "firstName",
+ "type": "string"
+ },
+ {
+ "name": "lastName",
+ "type": ["null", "string"],
+ "aliases": ["surname"],
+ "default": null
+ },
+ {
+ "name": "birthdate",
+ "type": {
+ "type": "record",
+ "name": "Birthdate",
+ "fields": [
+ {
+ "name": "year",
+ "type": "int"
+ },
+ {
+ "name": "month",
+ "type": "int"
+ },
+ {
+ "name": "day",
+ "type": "int"
+ }
+ ]
+ },
+ "aliases": ["dob", "dateofbirth"]
+ },
+ {
+ "name": "address",
+ "type": {
+ "type": "record",
+ "name": "Address",
+ "fields": [
+ {
+ "name": "street",
+ "type": "string"
+ },
+ {
+ "name": "city",
+ "type": "string"
+ },
+ {
+ "name": "zipCode",
+ "type": "string",
+ "aliases": ["postalCode"]
+ },
+ {
+ "name": "country",
+ "type": ["null", {
+ "type": "record",
+ "name": "Country",
+ "fields": [
+ {
+ "name": "name",
+ "type": "string"
+ },
+ {
+ "name": "code",
+ "type": ["null", "string"],
+ "aliases": ["countryCode"],
+ "default": null
+ }
+ ]
+ }]
+ }
+ ]
+ },
+ "aliases": ["residence"]
+ }
+ ]
+}
diff --git a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
index 13cbcfff4be..be698496cee 100644
--- a/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
+++ b/hudi-utilities/src/test/resources/schema/sample_data_schema.avsc
@@ -21,7 +21,8 @@
"fields": [
{
"name": "data",
- "type": "string"
+ "type": "string",
+ "aliases": ["old_data"]
}
]
}