vamsikarnika commented on code in PR #11817:
URL: https://github.com/apache/hudi/pull/11817#discussion_r1754423260


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -316,6 +343,197 @@ private static Dataset<Row> coalesceOrRepartition(Dataset 
dataset, int numPartit
     return dataset;
   }
 
+  private static boolean isCoalesceRequired(TypedProperties properties, Schema 
sourceSchema) {
+    return getBooleanWithAltKeys(properties, 
CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
+        && Objects.nonNull(sourceSchema)
+        && hasFieldWithAliases(sourceSchema);
+  }
+
+  /**
+   * Recursively checks if an Avro schema or any of its nested fields contain 
aliases.
+   *
+   * @param schema The Avro schema to check.
+   * @return True if the schema or any of its fields contain aliases, false 
otherwise.
+   */
+  private static boolean hasFieldWithAliases(Schema schema) {
+    // If the schema is a record, check its fields recursively
+    if (isNestedRecord(schema)) {
+      for (Schema.Field field : getRecordFields(schema)) {
+        // Check if the field has aliases
+        if (!field.aliases().isEmpty()) {
+          return true;
+        }
+        // Recursively check the field's schema for aliases
+        if (hasFieldWithAliases(field.schema())) {
+          return true;
+        }
+      }
+    }
+    // No aliases found
+    return false;
+  }
+
+  private static StructType addAliasesToRowSchema(Schema avroSchema, 
StructType rowSchema) {
+    Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
+        .collect(Collectors.toMap(StructField::name, Function.identity()));
+
+    StructField[] modifiedFields = getRecordFields(avroSchema).stream()
+        .flatMap(avroField -> generateRowFieldsWithAliases(avroField, 
rowFieldsMap.get(avroField.name())).stream())
+        .toArray(StructField[]::new);
+
+    return new StructType(modifiedFields);
+  }
+
+  private static List<Schema.Field> getRecordFields(Schema schema) {
+    if (schema.getType() == Schema.Type.RECORD) {
+      return schema.getFields();
+    }
+
+    if (schema.getType() == Schema.Type.UNION) {
+      return schema.getTypes().stream()
+          .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+          .findFirst()
+          .map(Schema::getFields)
+          .orElse(Collections.emptyList());
+    }
+
+    return Collections.emptyList();
+  }
+
+  /**
+   * Generates a list of StructFields with aliases applied based on the 
provided Avro field schema.
+   * <p>
+   * This method processes a given Avro field and its corresponding Spark SQL 
StructField, handling
+   * nested records and aliases. If the Avro field contains nested records, 
the method recursively
+   * updates the schema for these records and applies any aliases defined in 
the Avro schema.
+   * If the Avro field has aliases, they are added as new fields with nullable 
set to true and
+   * appropriate metadata in the returned list. If no aliases or nesting are 
present, the original
+   * StructField is returned unchanged.
+   *
+   * @param avroField The Avro field schema to process.
+   * @param rowField  The corresponding Spark SQL StructField to map the Avro 
field to.
+   * @return A list of StructFields with aliases applied as per the Avro 
schema.
+   */
+  private static List<StructField> generateRowFieldsWithAliases(Schema.Field 
avroField, StructField rowField) {
+    List<StructField> fieldList = new ArrayList<>();
+
+    // Handle nested records
+    if (isNestedRecord(avroField.schema())) {
+      StructType updatedSchema = addAliasesToRowSchema(avroField.schema(), 
(StructType) rowField.dataType());
+
+      if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+        // Add the original field with the updated schema and add aliases if 
present
+        addFieldWithAliases(fieldList, avroField.name(), updatedSchema, 
rowField.metadata(), avroField.aliases());
+      } else {
+        fieldList.add(rowField);
+      }
+    } else if (!avroField.aliases().isEmpty()) {
+      // If the field has aliases, add them to the schema
+      addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(), 
rowField.metadata(), avroField.aliases());
+    } else {
+      // No aliases or nesting, return the original field
+      fieldList.add(rowField);
+    }
+    return fieldList;
+  }
+
+  private static void addFieldWithAliases(List<StructField> fieldList, String 
fieldName, DataType dataType, Metadata metadata, Set<String> aliases) {
+    fieldList.add(new StructField(fieldName, dataType, true, metadata));
+    aliases.forEach(alias -> fieldList.add(new StructField(alias, dataType, 
true, metadata)));
+  }
+
+  private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema 
sourceSchema) {
+    return coalesceNestedAliases(coalesceTopLevelAliases(dataset, 
sourceSchema), sourceSchema);
+  }
+
+  /**
+   * Merges top-level fields with their aliases in the dataset.
+   * <p>
+   * This method goes through the top-level fields in the Avro schema, and for 
any field that has aliases,
+   * it combines them in the dataset using a coalesce operation. This ensures 
that if a field is null,
+   * the value from its alias is used instead.
+   *
+   * @param dataset      The dataset to process.
+   * @param sourceSchema The Avro schema defining the fields and their aliases.
+   * @return A dataset with fields merged with their aliases.
+   */
+  private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset, 
Schema sourceSchema) {
+    return getRecordFields(sourceSchema).stream()
+        .filter(field -> !field.aliases().isEmpty())
+        .reduce(dataset,
+            (ds, field) -> coalesceAndDropAliasFields(ds, field.name(), 
field.aliases()), (ds1, ds2) -> ds1);
+  }
+
+  private static Dataset<Row> coalesceAndDropAliasFields(Dataset<Row> dataset, 
String fieldName, Set<String> aliases) {
+    List<Column> columns = new ArrayList<>();
+    columns.add(dataset.col(fieldName));
+    aliases.forEach(alias -> columns.add(dataset.col(alias)));
+
+    return dataset.withColumn(fieldName, 
functions.coalesce(columns.toArray(new Column[0])))
+        .drop(aliases.toArray(new String[0]));
+  }
+
+  /**
+   * Merges nested fields with their aliases in the dataset.
+   * <p>
+   * This method iterates through the fields of the provided Avro schema and 
checks if they represent
+   * nested records. For each nested record, it verifies if there are any 
alias fields present. If
+   * aliases are found, the method generates a list of nested fields, 
coalescing them with their aliases,
+   * and creates a new column in the dataset with the merged data.
+   *
+   * @param dataset      The dataset to process.
+   * @param sourceSchema The Avro schema defining the structure and aliases of 
the data.
+   * @return A dataset with nested fields merged with their aliases.
+   */
+  private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset, 
Schema sourceSchema) {
+    for (Schema.Field field : getRecordFields(sourceSchema)) {
+      // check if this is a nested record and contains an alias field within
+      if (isNestedRecord(field.schema()) && 
hasFieldWithAliases(field.schema())) {
+        dataset = dataset.withColumn(field.name(), 
functions.struct(getNestedFields("", field, dataset)));
+      }
+    }
+    return dataset;
+  }
+
+  private static Column[] getNestedFields(String parentField, Schema.Field 
field, Dataset<Row> dataset) {
+    return getRecordFields(field.schema()).stream()
+        .map(avroField -> {
+          List<Column> columns = new ArrayList<>();
+          String newParentField = getFullName(parentField, field.name());
+          if (isNestedRecord(avroField.schema())) {
+            // if field is nested, recursively fetch nested column
+            columns.add(functions.struct(getNestedFields(newParentField, 
avroField, dataset)));
+          } else {
+            columns.add(dataset.col(getFullName(newParentField, 
avroField.name())));
+          }
+          avroField.aliases().forEach(alias -> 
columns.add(dataset.col(getFullName(newParentField, alias))));
+          // if avro field contains aliases, coalesce the column with others 
matching the aliases otherwise return actual column
+          return avroField.aliases().isEmpty() ? columns.get(0)
+              : functions.coalesce(columns.toArray(new 
Column[0])).alias(avroField.name());
+        }).toArray(Column[]::new);
+  }
+
+  private static boolean isNestedRecord(Schema schema) {
+    if (schema.getType() == Schema.Type.RECORD) {
+      return true;
+    }
+
+    if (schema.getType() == Schema.Type.UNION) {
+      return schema.getTypes().stream()
+          .anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
+    }
+
+    return false;
+  }
+
+  private static String getFullName(String namespace, String fieldName) {
+    return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
+  }
+
+  private static boolean schemaModifiedOrHasAliases(Schema.Field avroField, 
StructType modifiedNestedSchema, StructField rowField) {
+    return !modifiedNestedSchema.equals(rowField.dataType()) || 
!avroField.aliases().isEmpty();

Review Comment:
   It seems StructType's equality check is quite strict. Any change in field 
order, nullability, or metadata causes it to return false. Since schema 
modifications for alias fields don't alter existing fields when no alias is 
present, this shouldn't be an issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to