codope commented on code in PR #12623:
URL: https://github.com/apache/hudi/pull/12623#discussion_r1917854534


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -438,9 +438,8 @@ private void processAppendResult(AppendResult result, List<HoodieRecord> recordL
           .getColumnsToIndex(hoodieTable.getMetaClient().getTableConfig(),
               config.getMetadataConfig(), Lazy.eagerly(Option.of(writeSchemaWithMetaFields)),
               Option.of(this.recordMerger.getRecordType())));
-      final List<Schema.Field> fieldsToIndex = writeSchemaWithMetaFields.getFields().stream()
-          .filter(field -> columnsToIndexSet.contains(field.name()))
-          .collect(Collectors.toList());
+      final List<Pair<String, Schema.Field>> fieldsToIndex = columnsToIndexSet.stream()

Review Comment:
   So the field name in the pair looks like `a.b.c` for a nested field; is the value (i.e. the schema) only for the child `c`, or is it the whole schema for `a.b.c`?
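For context, one plausible reading (an illustrative sketch with hypothetical names, not the PR's actual implementation) is that the pair carries the full dotted path as the key and only the leaf field as the value. A stdlib-only Java analogy over nested maps, standing in for Avro record schemas:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class NestedFieldLookup {
  // Resolve "a.b.c" against a nested map "schema": the returned entry pairs
  // the full dotted name with the *leaf* value only, not the whole subtree.
  static SimpleEntry<String, Object> getSchemaForField(Map<String, Object> schema, String fieldName) {
    int dot = fieldName.indexOf('.');
    if (dot < 0) {
      // Simple field: the key is the name itself, the value is the leaf entry.
      return new SimpleEntry<>(fieldName, schema.get(fieldName));
    }
    String head = fieldName.substring(0, dot);
    @SuppressWarnings("unchecked")
    Map<String, Object> child = (Map<String, Object>) schema.get(head);
    // Recurse into the nested "record", then prefix the resolved name.
    SimpleEntry<String, Object> nested = getSchemaForField(child, fieldName.substring(dot + 1));
    return new SimpleEntry<>(head + "." + nested.getKey(), nested.getValue());
  }

  public static void main(String[] args) {
    Map<String, Object> schema = Map.of("a", Map.of("b", Map.of("c", "long")));
    SimpleEntry<String, Object> result = getSchemaForField(schema, "a.b.c");
    System.out.println(result.getKey() + " -> " + result.getValue()); // a.b.c -> long
  }
}
```

Under this reading, the key is the whole dotted path `a.b.c` while the value describes only the child `c`.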



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -242,18 +241,18 @@ class ColumnStats {
     records.forEach((record) -> {
       // For each column (field) we have to index update corresponding column stats
       // with the values from this record
-      targetFields.forEach(field -> {
-        ColumnStats colStats = allColumnStats.computeIfAbsent(field.name(), ignored -> new ColumnStats());
-        Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(recordSchema, field.name());
+      targetFields.forEach(fieldNameFieldPair -> {
+        ColumnStats colStats = allColumnStats.computeIfAbsent(fieldNameFieldPair.getKey(), ignored -> new ColumnStats());
+        Schema fieldSchema = fieldNameFieldPair.getValue().schema();

Review Comment:
   For readability, we can extract the pair's key and value into separate `fieldName` and `fieldSchema` variables and use those everywhere below.
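The suggested refactor would look roughly like this (a stdlib-only sketch with hypothetical stand-in types; the real code uses Hudi's `Pair` and Avro's `Schema`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExtractPairParts {
  // targetFields stands in for the PR's List<Pair<String, Schema.Field>>,
  // here simplified to (fieldName, fieldSchema) string entries.
  static Map<String, String> collectStats(List<Map.Entry<String, String>> targetFields) {
    Map<String, String> allColumnStats = new HashMap<>();
    targetFields.forEach(fieldNameFieldPair -> {
      // Extracted once up front and reused below -- the readability
      // point of the review comment.
      String fieldName = fieldNameFieldPair.getKey();
      String fieldSchema = fieldNameFieldPair.getValue();
      allColumnStats.computeIfAbsent(fieldName, ignored -> fieldSchema);
    });
    return allColumnStats;
  }

  public static void main(String[] args) {
    System.out.println(collectStats(
        List.of(Map.entry("a.b.c", "long"), Map.entry("d", "string"))));
  }
}
```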



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1541,39 +1533,38 @@ private static List<String> getColumnsToIndex(HoodieTableConfig tableConfig,
    * required meta cols
    *
    * @param metadataConfig       metadata config
-   * @param tableSchema          either a list of the columns in the table, or a lazy option of the table schema
+   * @param tableSchemaLazyOpt   lazy option of the table schema
    * @param isTableInitializing true if table is being initialized.
    * @param recordType           Option of record type. Used to determine which types are valid to index
    * @return list of columns that should be indexed
    */
   private static Stream<String> getColumnsToIndexWithoutRequiredMetaFields(HoodieMetadataConfig metadataConfig,
-                                                                           Either<List<String>, Lazy<Option<Schema>>> tableSchema,
+                                                                           Lazy<Option<Schema>> tableSchemaLazyOpt,
                                                                            boolean isTableInitializing,
                                                                            Option<HoodieRecordType> recordType) {
     List<String> columnsToIndex = metadataConfig.getColumnsEnabledForColumnStatsIndex();
     if (!columnsToIndex.isEmpty()) {
       if (isTableInitializing) {
         return columnsToIndex.stream();
       }
-      // filter for top level fields here.
-      List<String> topLevelEligibleFields = tableSchema.isLeft() ? tableSchema.asLeft() :
-          (tableSchema.asRight().get().map(schema -> schema.getFields().stream()
-                  .filter(field -> isColumnTypeSupported(field.schema(), recordType))
-                  .map(field -> field.name()).collect(toList()))
-              .orElse(new ArrayList<String>()));
-      return columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName) && !fieldName.contains(".")
-          && (topLevelEligibleFields.isEmpty() || topLevelEligibleFields.contains(fieldName)));
-    }
-    if (tableSchema.isLeft()) {
-      return getFirstNFieldNames(tableSchema.asLeft().stream(), metadataConfig.maxColumnsToIndexForColStats());
+      // filter for eligible fields
+      Option<Schema> tableSchema = tableSchemaLazyOpt.get();
+      return columnsToIndex.stream().filter(fieldName -> !META_COL_SET_TO_INDEX.contains(fieldName))
+          .filter(fieldName -> {
+            if (tableSchema.isPresent()) {
+              return isColumnTypeSupported(getSchemaForField(tableSchema.get(), fieldName).getValue().schema(), recordType);
+            } else {
+              return true;

Review Comment:
   Why return true here?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1586,6 +1577,25 @@ private static Stream<String> getFirstNFieldNames(Stream<String> fieldNames, int
     return fieldNames.filter(fieldName -> !HOODIE_META_COLUMNS_WITH_OPERATION.contains(fieldName)).limit(n);
   }
 
+  @VisibleForTesting
+  public static Pair<String, Schema.Field> getSchemaForField(Schema schema, String fieldName) {
+    return getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING);
+  }
+
+  @VisibleForTesting
+  public static Pair<String, Schema.Field> getSchemaForField(Schema schema, String fieldName, String prefix) {

Review Comment:
   Let's move these util methods to `HoodieAvroUtils` in hudi-common. And can we add some direct UTs for the methods covering a simple field, a nested field, and nullable/non-nullable fields?
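The nullable case is the interesting one for such unit tests: in Avro, a nullable field is a union of `null` and the actual type, so a helper like `getSchemaForField` presumably unwraps the union before descending into a nested record. The real tests would build `Schema` objects (e.g. with Avro's `SchemaBuilder`) and call the helper directly; a stdlib-only Java sketch of just the unwrapping step, with the union modeled as a list:

```java
import java.util.List;

public class NullableUnwrapSketch {
  // Model an Avro-style nullable type as a union ["null", actualType].
  // unwrapNullable returns the non-null branch, mirroring what a schema
  // helper would do before recursing into a nested record.
  static Object unwrapNullable(Object type) {
    if (type instanceof List<?> union && union.size() == 2 && "null".equals(union.get(0))) {
      return union.get(1);
    }
    return type;
  }

  public static void main(String[] args) {
    // Non-nullable type: returned as-is.
    System.out.println(unwrapNullable("long"));                    // long
    // Nullable type: the union is unwrapped to the actual type.
    System.out.println(unwrapNullable(List.of("null", "string"))); // string
  }
}
```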



##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -69,6 +73,25 @@ public class DataSourceUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(DataSourceUtils.class);
 
+  @VisibleForTesting
+  public static Pair<String, StructField> getSchemaForField(StructType schema, String fieldName) {
+    return getSchemaForField(schema, fieldName, StringUtils.EMPTY_STRING);
+  }
+
+  @VisibleForTesting
+  public static Pair<String, StructField> getSchemaForField(StructType schema, String fieldName, String prefix) {

Review Comment:
   Let's move these to `HoodieSchemaUtils` in hudi-spark-common.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala:
##########
@@ -101,7 +103,12 @@ class ColumnStatIndexTestBase extends HoodieSparkClientTestBase {
 
     val inputDF = if (addNestedFiled) {
       preInputDF.withColumn("c9",
-        functions.struct("c2").withField("car_brand", lit("abc_brand")))
+        functions.struct("c2").withField("c9_1_car_brand", lit("abc_brand")))
+      .withColumn("c10",
+        functions.struct("c2").withField("c10_1_car_brand", lit("abc_brand"))
+      .withField("c10_1", functions.struct("c2")
+        .withField("c10_2_1_nested_lvl2_field1", lit("random_val1"))
+        .withField("c10_2_1_nested_lvl2_field2", lit("random_val2"))))

Review Comment:
   +1 on adding 2 levels of nesting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
