codope commented on code in PR #11817:
URL: https://github.com/apache/hudi/pull/11817#discussion_r1743507702
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -155,4 +155,11 @@ public class CloudSourceConfig extends HoodieConfig {
       .withDocumentation("Max time in secs to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, "
           + "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever "
           + "to consume MAX_MESSAGES_CONF messages, but time out and move on further.");
+
+  public static final ConfigProperty<Boolean> SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.reader.coalesce.aliases")
+      .defaultValue(false)
Review Comment:
I think we should enable this by default. Any particular reason for not
doing so?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/config/CloudSourceConfig.java:
##########
@@ -155,4 +155,11 @@ public class CloudSourceConfig extends HoodieConfig {
       .withDocumentation("Max time in secs to consume " + MAX_NUM_MESSAGES_PER_SYNC.key() + " messages from cloud queue. Cloud event queues like SQS, "
           + "PubSub can return empty responses even when messages are available the queue, this config ensures we don't wait forever "
           + "to consume MAX_MESSAGES_CONF messages, but time out and move on further.");
+
+  public static final ConfigProperty<Boolean> SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS = ConfigProperty
+      .key(STREAMER_CONFIG_PREFIX + "source.cloud.data.reader.coalesce.aliases")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("0.15.0")
Review Comment:
0.15.0 is already released. Current master version is 1.0.0.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -316,6 +343,197 @@ private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartit
     return dataset;
   }
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema sourceSchema) {
+    return getBooleanWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+        && Objects.nonNull(sourceSchema)
+        && hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Recursively checks if an Avro schema or any of its nested fields contain aliases.
+   *
+   * @param schema The Avro schema to check.
+   * @return True if the schema or any of its fields contain aliases, false otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
+    // If the schema is a record, check its fields recursively
+    if (isNestedRecord(schema)) {
+      for (Schema.Field field : getRecordFields(schema)) {
+        // Check if the field has aliases
+        if (!field.aliases().isEmpty()) {
+          return true;
+        }
+        // Recursively check the field's schema for aliases
+        if (hasFieldWithAliases(field.schema())) {
Review Comment:
This method, along with `getNestedFields` and the other recursive methods, has the potential to become a performance bottleneck with very large, deeply nested schemas. Consider testing these methods on complex, deeply nested schemas to ensure performance remains acceptable. If you can do a performance profile that would be great. In particular, if any of these methods is called per row, it's going to be a bottleneck.
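
If profiling does show this in a hot path, one cheap mitigation would be to compute the answer once per distinct schema and memoize it, since the schema is fixed for a batch. Rough stdlib-only sketch of the idea — `SchemaNode` here is a hypothetical stand-in for `org.apache.avro.Schema` (with aliases hung directly on the node to keep the sketch short), not a Hudi or Avro type:

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class AliasCheckSketch {
  // Hypothetical stand-in for org.apache.avro.Schema: a node with optional
  // aliases and nested children (real Avro puts aliases on fields).
  static class SchemaNode {
    final List<String> aliases = new ArrayList<>();
    final List<SchemaNode> children = new ArrayList<>();
  }

  // Identity-keyed cache: schemas are immutable once built, so caching per
  // instance is safe. Not thread-safe; concurrent callers would need a
  // ConcurrentHashMap-backed variant.
  static final Map<SchemaNode, Boolean> CACHE = new IdentityHashMap<>();

  static boolean hasFieldWithAliases(SchemaNode schema) {
    Boolean cached = CACHE.get(schema);
    if (cached != null) {
      return cached;
    }
    boolean result = !schema.aliases.isEmpty()
        || schema.children.stream().anyMatch(AliasCheckSketch::hasFieldWithAliases);
    CACHE.put(schema, result);
    return result;
  }

  // Builds a chain of `depth` nested records with one alias at the bottom.
  static SchemaNode deepSchema(int depth) {
    SchemaNode root = new SchemaNode();
    SchemaNode cur = root;
    for (int i = 0; i < depth; i++) {
      SchemaNode child = new SchemaNode();
      cur.children.add(child);
      cur = child;
    }
    cur.aliases.add("legacy_name");
    return root;
  }

  public static void main(String[] args) {
    SchemaNode deep = deepSchema(1000);
    System.out.println(hasFieldWithAliases(deep)); // walks the chain once
    System.out.println(hasFieldWithAliases(deep)); // repeat calls are cache hits
  }
}
```

The same trick applies to `isCoalesceRequired`: evaluate it once per sync, never anywhere that could end up on the per-row path.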
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -316,6 +343,197 @@ private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartit
     return dataset;
   }
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema sourceSchema) {
+    return getBooleanWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+        && Objects.nonNull(sourceSchema)
+        && hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Recursively checks if an Avro schema or any of its nested fields contain aliases.
+   *
+   * @param schema The Avro schema to check.
+   * @return True if the schema or any of its fields contain aliases, false otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
+    // If the schema is a record, check its fields recursively
+    if (isNestedRecord(schema)) {
+      for (Schema.Field field : getRecordFields(schema)) {
+        // Check if the field has aliases
+        if (!field.aliases().isEmpty()) {
+          return true;
+        }
+        // Recursively check the field's schema for aliases
+        if (hasFieldWithAliases(field.schema())) {
+          return true;
+        }
+      }
+    }
+    // No aliases found
+    return false;
+  }
+
+  private static StructType addAliasesToRowSchema(Schema avroSchema, StructType rowSchema) {
+    Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
+        .collect(Collectors.toMap(StructField::name, Function.identity()));
+
+    StructField[] modifiedFields = getRecordFields(avroSchema).stream()
+        .flatMap(avroField -> generateRowFieldsWithAliases(avroField, rowFieldsMap.get(avroField.name())).stream())
+        .toArray(StructField[]::new);
+
+    return new StructType(modifiedFields);
+  }
+
+  private static List<Schema.Field> getRecordFields(Schema schema) {
+    if (schema.getType() == Schema.Type.RECORD) {
+      return schema.getFields();
+    }
+
+    if (schema.getType() == Schema.Type.UNION) {
+      return schema.getTypes().stream()
+          .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+          .findFirst()
+          .map(Schema::getFields)
+          .orElse(Collections.emptyList());
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Generates a list of StructFields with aliases applied based on the provided Avro field schema.
+   * <p>
+   * This method processes a given Avro field and its corresponding Spark SQL StructField, handling
+   * nested records and aliases. If the Avro field contains nested records, the method recursively
+   * updates the schema for these records and applies any aliases defined in the Avro schema.
+   * If the Avro field has aliases, they are added as new fields with nullable set to true and
+   * appropriate metadata in the returned list. If no aliases or nesting are present, the original
+   * StructField is returned unchanged.
+   *
+   * @param avroField The Avro field schema to process.
+   * @param rowField The corresponding Spark SQL StructField to map the Avro field to.
+   * @return A list of StructFields with aliases applied as per the Avro schema.
+   */
+  private static List<StructField> generateRowFieldsWithAliases(Schema.Field avroField, StructField rowField) {
+    List<StructField> fieldList = new ArrayList<>();
+
+    // Handle nested records
+    if (isNestedRecord(avroField.schema())) {
+      StructType updatedSchema = addAliasesToRowSchema(avroField.schema(), (StructType) rowField.dataType());
+
+      if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+        // Add the original field with the updated schema and add aliases if present
+        addFieldWithAliases(fieldList, avroField.name(), updatedSchema, rowField.metadata(), avroField.aliases());
+      } else {
+        fieldList.add(rowField);
+      }
+    } else if (!avroField.aliases().isEmpty()) {
+      // If the field has aliases, add them to the schema
+      addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(), rowField.metadata(), avroField.aliases());
+    } else {
+      // No aliases or nesting, return the original field
+      fieldList.add(rowField);
+    }
+    return fieldList;
+  }
+
+  private static void addFieldWithAliases(List<StructField> fieldList, String fieldName, DataType dataType, Metadata metadata, Set<String> aliases) {
+    fieldList.add(new StructField(fieldName, dataType, true, metadata));
+    aliases.forEach(alias -> fieldList.add(new StructField(alias, dataType, true, metadata)));
+  }
+
+  private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema sourceSchema) {
+    return coalesceNestedAliases(coalesceTopLevelAliases(dataset, sourceSchema), sourceSchema);
+  }
+
+  /**
+   * Merges top-level fields with their aliases in the dataset.
+   * <p>
+   * This method goes through the top-level fields in the Avro schema, and for any field that has aliases,
+   * it combines them in the dataset using a coalesce operation. This ensures that if a field is null,
+   * the value from its alias is used instead.
+   *
+   * @param dataset The dataset to process.
+   * @param sourceSchema The Avro schema defining the fields and their aliases.
+   * @return A dataset with fields merged with their aliases.
+   */
+  private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, Schema sourceSchema) {
+    return getRecordFields(sourceSchema).stream()
+        .filter(field -> !field.aliases().isEmpty())
+        .reduce(dataset,
+            (ds, field) -> coalesceAndDropAliasFields(ds, field.name(), field.aliases()), (ds1, ds2) -> ds1);
+  }
+
+  private static Dataset<Row> coalesceAndDropAliasFields(Dataset<Row> dataset, String fieldName, Set<String> aliases) {
+    List<Column> columns = new ArrayList<>();
+    columns.add(dataset.col(fieldName));
+    aliases.forEach(alias -> columns.add(dataset.col(alias)));
+
+    return dataset.withColumn(fieldName, functions.coalesce(columns.toArray(new Column[0])))
+        .drop(aliases.toArray(new String[0]));
+  }
+
+  /**
+   * Merges nested fields with their aliases in the dataset.
+   * <p>
+   * This method iterates through the fields of the provided Avro schema and checks if they represent
+   * nested records. For each nested record, it verifies if there are any alias fields present. If
+   * aliases are found, the method generates a list of nested fields, coalescing them with their aliases,
+   * and creates a new column in the dataset with the merged data.
+   *
+   * @param dataset The dataset to process.
+   * @param sourceSchema The Avro schema defining the structure and aliases of the data.
+   * @return A dataset with nested fields merged with their aliases.
+   */
+  private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, Schema sourceSchema) {
+    for (Schema.Field field : getRecordFields(sourceSchema)) {
+      // check if this is a nested record and contains an alias field within
+      if (isNestedRecord(field.schema()) && hasFieldWithAliases(field.schema())) {
+        dataset = dataset.withColumn(field.name(), functions.struct(getNestedFields("", field, dataset)));
+      }
+    }
+    return dataset;
+  }
+
+  private static Column[] getNestedFields(String parentField, Schema.Field field, Dataset<Row> dataset) {
+    return getRecordFields(field.schema()).stream()
+        .map(avroField -> {
+          List<Column> columns = new ArrayList<>();
+          String newParentField = getFullName(parentField, field.name());
+          if (isNestedRecord(avroField.schema())) {
+            // if field is nested, recursively fetch nested column
+            columns.add(functions.struct(getNestedFields(newParentField, avroField, dataset)));
+          } else {
+            columns.add(dataset.col(getFullName(newParentField, avroField.name())));
+          }
+          avroField.aliases().forEach(alias -> columns.add(dataset.col(getFullName(newParentField, alias))));
+          // if avro field contains aliases, coalesce the column with others matching the aliases otherwise return actual column
+          return avroField.aliases().isEmpty() ? columns.get(0)
+              : functions.coalesce(columns.toArray(new Column[0])).alias(avroField.name());
+        }).toArray(Column[]::new);
+  }
+
+  private static boolean isNestedRecord(Schema schema) {
+    if (schema.getType() == Schema.Type.RECORD) {
+      return true;
+    }
+
+    if (schema.getType() == Schema.Type.UNION) {
+      return schema.getTypes().stream()
+          .anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
+    }
+
+    return false;
+  }
+
+  private static String getFullName(String namespace, String fieldName) {
+    return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
+  }
+
+  private static boolean schemaModifiedOrHasAliases(Schema.Field avroField, StructType modifiedNestedSchema, StructField rowField) {
+    return !modifiedNestedSchema.equals(rowField.dataType()) || !avroField.aliases().isEmpty();
Review Comment:
the equality check `modifiedNestedSchema.equals(rowField.dataType())` may
not always work as expected depending on how `StructType` implements equality.
Ensure that the `StructType` comparison works for all cases (e.g., order of
fields).
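
For instance, if the field order in `modifiedNestedSchema` can legitimately differ from `rowField.dataType()`, an element-wise (order-sensitive) equals would report a difference even when both schemas carry the same fields. A name-keyed comparison sidesteps that; rough stdlib-only sketch, where `Field` is a hypothetical stand-in for Spark's `StructField`, not the real class:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SchemaEqualsSketch {
  // Minimal stand-in for StructField: just a name and a type tag.
  static class Field {
    final String name;
    final String dataType;
    Field(String name, String dataType) {
      this.name = name;
      this.dataType = dataType;
    }
  }

  // Order-sensitive comparison, roughly what a sequence-based equals gives.
  static boolean equalsInOrder(List<Field> a, List<Field> b) {
    if (a.size() != b.size()) {
      return false;
    }
    for (int i = 0; i < a.size(); i++) {
      if (!a.get(i).name.equals(b.get(i).name)
          || !a.get(i).dataType.equals(b.get(i).dataType)) {
        return false;
      }
    }
    return true;
  }

  // Order-insensitive comparison keyed by field name.
  static boolean equalsByName(List<Field> a, List<Field> b) {
    Map<String, String> left = a.stream().collect(Collectors.toMap(f -> f.name, f -> f.dataType));
    Map<String, String> right = b.stream().collect(Collectors.toMap(f -> f.name, f -> f.dataType));
    return left.equals(right);
  }

  public static void main(String[] args) {
    List<Field> s1 = Arrays.asList(new Field("id", "long"), new Field("name", "string"));
    List<Field> s2 = Arrays.asList(new Field("name", "string"), new Field("id", "long"));
    System.out.println(equalsInOrder(s1, s2)); // false: same fields, different order
    System.out.println(equalsByName(s1, s2));  // true
  }
}
```

A unit test that reorders fields (and one that tweaks nullability/metadata) would pin down which semantics `StructType.equals` actually gives here.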
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -316,6 +343,197 @@ private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartit
     return dataset;
   }
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema sourceSchema) {
+    return getBooleanWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+        && Objects.nonNull(sourceSchema)
+        && hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Recursively checks if an Avro schema or any of its nested fields contain aliases.
+   *
+   * @param schema The Avro schema to check.
+   * @return True if the schema or any of its fields contain aliases, false otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
Review Comment:
Is this called per row/record?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -316,6 +343,197 @@ private static Dataset<Row> coalesceOrRepartition(Dataset dataset, int numPartit
     return dataset;
   }
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema sourceSchema) {
+    return getBooleanWithAltKeys(properties, CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+        && Objects.nonNull(sourceSchema)
+        && hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Recursively checks if an Avro schema or any of its nested fields contain aliases.
+   *
+   * @param schema The Avro schema to check.
+   * @return True if the schema or any of its fields contain aliases, false otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
+    // If the schema is a record, check its fields recursively
+    if (isNestedRecord(schema)) {
+      for (Schema.Field field : getRecordFields(schema)) {
+        // Check if the field has aliases
+        if (!field.aliases().isEmpty()) {
+          return true;
+        }
+        // Recursively check the field's schema for aliases
+        if (hasFieldWithAliases(field.schema())) {
+          return true;
+        }
+      }
+    }
+    // No aliases found
+    return false;
+  }
+
+  private static StructType addAliasesToRowSchema(Schema avroSchema, StructType rowSchema) {
+    Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
+        .collect(Collectors.toMap(StructField::name, Function.identity()));
+
+    StructField[] modifiedFields = getRecordFields(avroSchema).stream()
+        .flatMap(avroField -> generateRowFieldsWithAliases(avroField, rowFieldsMap.get(avroField.name())).stream())
+        .toArray(StructField[]::new);
+
+    return new StructType(modifiedFields);
+  }
+
+  private static List<Schema.Field> getRecordFields(Schema schema) {
+    if (schema.getType() == Schema.Type.RECORD) {
+      return schema.getFields();
+    }
+
+    if (schema.getType() == Schema.Type.UNION) {
+      return schema.getTypes().stream()
+          .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+          .findFirst()
+          .map(Schema::getFields)
+          .orElse(Collections.emptyList());
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Generates a list of StructFields with aliases applied based on the provided Avro field schema.
+   * <p>
+   * This method processes a given Avro field and its corresponding Spark SQL StructField, handling
+   * nested records and aliases. If the Avro field contains nested records, the method recursively
+   * updates the schema for these records and applies any aliases defined in the Avro schema.
+   * If the Avro field has aliases, they are added as new fields with nullable set to true and
+   * appropriate metadata in the returned list. If no aliases or nesting are present, the original
+   * StructField is returned unchanged.
+   *
+   * @param avroField The Avro field schema to process.
+   * @param rowField The corresponding Spark SQL StructField to map the Avro field to.
+   * @return A list of StructFields with aliases applied as per the Avro schema.
+   */
+  private static List<StructField> generateRowFieldsWithAliases(Schema.Field avroField, StructField rowField) {
+    List<StructField> fieldList = new ArrayList<>();
+
+    // Handle nested records
+    if (isNestedRecord(avroField.schema())) {
+      StructType updatedSchema = addAliasesToRowSchema(avroField.schema(), (StructType) rowField.dataType());
+
+      if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+        // Add the original field with the updated schema and add aliases if present
+        addFieldWithAliases(fieldList, avroField.name(), updatedSchema, rowField.metadata(), avroField.aliases());
+      } else {
+        fieldList.add(rowField);
+      }
+    } else if (!avroField.aliases().isEmpty()) {
+      // If the field has aliases, add them to the schema
+      addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(), rowField.metadata(), avroField.aliases());
+    } else {
+      // No aliases or nesting, return the original field
+      fieldList.add(rowField);
+    }
+    return fieldList;
+  }
+
+  private static void addFieldWithAliases(List<StructField> fieldList, String fieldName, DataType dataType, Metadata metadata, Set<String> aliases) {
+    fieldList.add(new StructField(fieldName, dataType, true, metadata));
Review Comment:
Should we avoid hard-coded nullability and instead follow the nullability from the source schema?
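
In Avro, a field is nullable exactly when its schema is a union that includes the null type, so the check could be along these lines. Hedged stdlib-only sketch: a `List<String>` of type names stands in for `Schema.getTypes()` so it stays dependency-free; the real version would take a `Schema` and treat a non-union schema as non-nullable.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class NullabilitySketch {
  // In Avro an optional field is declared as a union including "null",
  // e.g. ["null", "string"]. A list of type names stands in for
  // Schema.getTypes() here.
  static boolean isNullable(List<String> unionMemberTypes) {
    return unionMemberTypes.contains("null");
  }

  public static void main(String[] args) {
    System.out.println(isNullable(Arrays.asList("null", "string")));     // true: optional field
    System.out.println(isNullable(Collections.singletonList("string"))); // false: required field
  }
}
```

`addFieldWithAliases` could then pass that through instead of hard-coding `true`, e.g. `new StructField(fieldName, dataType, isNullable(...), metadata)`.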
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.