This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e3f93425cd3b [HUDI-9439] Add helpers for reading select fields from 
schema in FileGroupReader (#13485)
e3f93425cd3b is described below

commit e3f93425cd3b5d5fc07d1b39f6396ab403f9b76d
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Jul 1 17:33:37 2025 +0530

    [HUDI-9439] Add helpers for reading select fields from schema in 
FileGroupReader (#13485)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   4 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 143 +++++++++++++++++--
 .../hudi/metadata/HoodieTableMetadataUtil.java     |   4 +-
 .../SecondaryIndexRecordGenerationUtils.java       |  24 +++-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 154 ++++++++++++++++++++-
 .../org/apache/hudi/common/util/ParquetUtils.java  |   2 +-
 .../functional/TestSecondaryIndexPruning.scala     |  19 +--
 .../apache/hudi/utilities/TestHoodieIndexer.java   |   8 +-
 .../indexer-secondary-index.properties             |   4 +-
 9 files changed, 327 insertions(+), 35 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 85416d329c26..ea3256321f5f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -93,6 +93,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -780,7 +781,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
       final String fileId = fileSlice.getFileId();
       HoodieReaderContext<T> readerContext = readerContextFactory.getContext();
       Schema dataSchema = 
AvroSchemaCache.intern(HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(dataWriteConfig.getWriteSchema()), 
dataWriteConfig.allowOperationMetadataField()));
-      Schema requestedSchema = 
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema() : 
dataSchema;
+      Schema requestedSchema = 
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
+          : HoodieAvroUtils.projectSchema(dataSchema, 
Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new 
String[0])));
       Option<InternalSchema> internalSchemaOption = 
SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
       HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
           .withReaderContext(readerContext)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index f1595d8100d8..53bef9cb238b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -84,8 +84,10 @@ import java.time.LocalDate;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -96,6 +98,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.TreeMap;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -427,19 +430,136 @@ public class HoodieAvroUtils {
   }
 
   /**
-   * Fetch schema for record key and partition path.
+   * Fetches the projected schema given a list of fields to project. A field can be nested,
+   * in the format `a.b.c`, where `a` is the top-level field, `b` is at the second level, and so on.
    */
-  public static Schema getSchemaForFields(Schema fileSchema, List<String> 
fields) {
-    List<Schema.Field> toBeAddedFields = new ArrayList<>();
-    Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "", 
false);
+  public static Schema projectSchema(Schema fileSchema, List<String> fields) {
+    List<LinkedList<String>> fieldPathLists = new ArrayList<>();
+    for (String path : fields) {
+      fieldPathLists.add(new LinkedList<>(Arrays.asList(path.split("\\."))));
+    }
+
+    Schema result = Schema.createRecord(fileSchema.getName(), 
fileSchema.getDoc(), fileSchema.getNamespace(), fileSchema.isError());
+    result.setFields(projectFields(fileSchema, fieldPathLists));
+    return result;
+  }
 
-    for (Schema.Field schemaField : fileSchema.getFields()) {
-      if (fields.contains(schemaField.name())) {
-        toBeAddedFields.add(new Schema.Field(schemaField.name(), 
schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
+  /**
+   * Projects the requested fields in the schema. The fields can be nested in 
format `a.b.c`.
+   *
+   * @param originalSchema Schema to project from
+   * @param fieldPaths List of fields. The field can be nested.
+   *
+   * @return List of projected schema fields
+   */
+  private static List<Schema.Field> projectFields(Schema originalSchema, 
List<LinkedList<String>> fieldPaths) {
+    // We maintain a mapping of top level field to list of nested field paths 
which need to be projected from the field
+    // If the entire field needs to be projected the list is empty
+    // The map is ordered by position of the field in the original schema so 
that projected schema also maintains
+    // the same field ordering
+    Map<Field, List<LinkedList<String>>> groupedByTop = new 
TreeMap<>(Comparator.comparingInt(Field::pos));
+
+    // Group paths by their current level (first element)
+    // Here nested fields are considered as a path. The top level field is the 
first element in the path
+    // second level field is the second element and so on.
+    // We iterate through the input field paths and populate groupedByTop 
described above
+    // For example if f1 is a top level field, and input field paths are 
f1.f2.f3 and f1.f2.f4
+    // we maintain a mapping from f1 to {f2->f3, f2->f4}
+    for (LinkedList<String> originalPath : fieldPaths) {
+      if (originalPath.isEmpty()) {
+        throw new IllegalArgumentException("Field path is empty or malformed: 
" + originalPath);
+      }
+      LinkedList<String> path = new LinkedList<>(originalPath); // Avoid 
mutating the original
+      String head = path.poll(); // Remove first element
+      Field topField = originalSchema.getField(head);
+      if (topField == null) {
+        throw new IllegalArgumentException("Field `" + head + "` not found in 
schema: " + originalSchema.getFields());
       }
+      groupedByTop.compute(topField, (ignored, list) -> {
+        if (path.isEmpty() || (list != null && list.isEmpty())) {
+          // Case 1: where path is empty indicating the entire field needs to 
be included.
+          // No further projection provided for that field
+          // Case 2: if list is empty, it indicates that the top level field 
has already been selected
+          // No more projection possible.
+          return new ArrayList<>();
+        } else {
+          if (list == null) {
+            // if list is null return just the path
+            List<LinkedList<String>> retList = new ArrayList<>();
+            retList.add(path);
+            return retList;
+          } else {
+            // if list has elements, add path to it
+            list.add(path);
+            return list;
+          }
+        }
+      });
     }
-    recordSchema.setFields(toBeAddedFields);
-    return recordSchema;
+
+    List<Schema.Field> projectedFields = new ArrayList<>();
+    for (Map.Entry<Field, List<LinkedList<String>>> entry : 
groupedByTop.entrySet()) {
+      // For every top level field we process the child fields to include in 
the projected schema
+      Field originalField = entry.getKey();
+      List<LinkedList<String>> childPaths = entry.getValue();
+      Schema originalFieldSchema = originalField.schema();
+      Schema nonNullableSchema = unwrapNullable(originalFieldSchema);
+
+      Schema projectedFieldSchema;
+      if (!childPaths.isEmpty()) {
+        // If child paths are present, it indicates there are nested fields 
which need to be projected
+        if (nonNullableSchema.getType() != Schema.Type.RECORD) {
+          throw new IllegalArgumentException("Cannot project nested field from 
non-record field '" + originalField.name()
+              + "' of type: " + nonNullableSchema.getType());
+        }
+
+        // For those nested fields we make a recursive call to project fields 
to fetch the nested schema fields
+        List<Schema.Field> nestedFields = projectFields(nonNullableSchema, 
childPaths);
+        Schema nestedProjected = 
Schema.createRecord(nonNullableSchema.getName(), nonNullableSchema.getDoc(),
+            nonNullableSchema.getNamespace(), nonNullableSchema.isError());
+        nestedProjected.setFields(nestedFields);
+        projectedFieldSchema = wrapNullable(originalFieldSchema, 
nestedProjected);
+      } else {
+        // if child field paths are empty, we need to include the top level 
field itself
+        projectedFieldSchema = originalFieldSchema;
+      }
+
+      projectedFields.add(new Schema.Field(originalField.name(), 
projectedFieldSchema, originalField.doc(), originalField.defaultVal()));
+    }
+
+    return projectedFields;
+  }
+
+  /**
+   * If schema is a union containing a non-null branch (e.g. ["null", X]),
+   * returns the first non-null branch. Otherwise, returns schema as-is.
+   */
+  public static Schema unwrapNullable(Schema schema) {
+    if (schema.getType() == Schema.Type.UNION) {
+      List<Schema> types = schema.getTypes();
+      for (Schema s : types) {
+        if (s.getType() != Schema.Type.NULL) {
+          return s;
+        }
+      }
+    }
+    return schema;
+  }
+
+  /**
+   * Wraps {@code updated} in a ["null", updated] union if {@code original} was a nullable union;
+   * otherwise returns {@code updated} as-is.
+   */
+  public static Schema wrapNullable(Schema original, Schema updated) {
+    if (original.getType() == Schema.Type.UNION) {
+      List<Schema> types = original.getTypes();
+      if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {
+        List<Schema> newUnion = new ArrayList<>();
+        newUnion.add(Schema.create(Schema.Type.NULL));
+        newUnion.add(updated);
+        return Schema.createUnion(newUnion);
+      }
+    }
+    return updated;
   }
 
   public static GenericRecord addHoodieKeyToRecord(GenericRecord record, 
String recordKey, String partitionPath,
@@ -1602,11 +1722,12 @@ public class HoodieAvroUtils {
 
   @VisibleForTesting
   public static Pair<String, Schema.Field> getSchemaForField(Schema schema, 
String fieldName, String prefix) {
+    Schema nonNullableSchema = unwrapNullable(schema);
     if (!fieldName.contains(".")) {
-      return Pair.of(prefix + fieldName, schema.getField(fieldName));
+      return Pair.of(prefix + fieldName, 
nonNullableSchema.getField(fieldName));
     } else {
       int rootFieldIndex = fieldName.indexOf(".");
-      Schema.Field rootField = schema.getField(fieldName.substring(0, 
rootFieldIndex));
+      Schema.Field rootField = 
nonNullableSchema.getField(fieldName.substring(0, rootFieldIndex));
       if (rootField == null) {
         throw new HoodieException("Failed to find " + fieldName + " in the 
table schema ");
       }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index dcee9952eeda..13dd2caf2a3b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -159,7 +159,7 @@ import static java.util.stream.Collectors.toList;
 import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static 
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
-import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
+import static org.apache.hudi.avro.HoodieAvroUtils.projectSchema;
 import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
 import static 
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
@@ -2457,7 +2457,7 @@ public class HoodieTableMetadataUtil {
     List<String> mergedFields = new ArrayList<>(partitionFields.size() + 
sourceFields.size());
     mergedFields.addAll(partitionFields);
     mergedFields.addAll(sourceFields);
-    return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
+    return addMetadataFields(projectSchema(tableSchema, mergedFields));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 11d4ed23cdea..8a8a0ddd549d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.data.HoodieData;
@@ -48,6 +49,7 @@ import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +57,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.metadata.HoodieMetadataPayload.createSecondaryIndexRecord;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.filePath;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
@@ -216,7 +219,8 @@ public class SecondaryIndexRecordGenerationUtils {
                                                                                
                 String instantTime,
                                                                                
                 TypedProperties props,
                                                                                
                 boolean allowInflightInstants) throws IOException {
-    String secondaryKeyField = String.join(".", 
indexDefinition.getSourceFields());
+    String secondaryKeyField = indexDefinition.getSourceFieldsKey();
+    Schema requestedSchema = getRequestedSchemaForSecondaryIndex(metaClient, 
tableSchema, secondaryKeyField);
     HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
         .withReaderContext(readerContext)
         .withFileSlice(fileSlice)
@@ -224,7 +228,7 @@ public class SecondaryIndexRecordGenerationUtils {
         .withProps(props)
         .withLatestCommitTime(instantTime)
         .withDataSchema(tableSchema)
-        .withRequestedSchema(tableSchema)
+        .withRequestedSchema(requestedSchema)
         .withAllowInflightInstants(allowInflightInstants)
         .build();
 
@@ -253,10 +257,10 @@ public class SecondaryIndexRecordGenerationUtils {
         // Loop to find the next valid record or exhaust the iterator.
         while (recordIterator.hasNext()) {
           T record = recordIterator.next();
-          Object secondaryKey = readerContext.getValue(record, tableSchema, 
secondaryKeyField);
+          Object secondaryKey = readerContext.getValue(record, 
requestedSchema, secondaryKeyField);
           if (secondaryKey != null) {
             nextValidRecord = Pair.of(
-                readerContext.getRecordKey(record, tableSchema),
+                readerContext.getRecordKey(record, requestedSchema),
                 secondaryKey.toString()
             );
             return true;
@@ -278,4 +282,16 @@ public class SecondaryIndexRecordGenerationUtils {
       }
     };
   }
+
+  private static Schema 
getRequestedSchemaForSecondaryIndex(HoodieTableMetaClient metaClient, Schema 
tableSchema, String secondaryKeyField) {
+    String[] recordKeyFields;
+    if (tableSchema.getField(RECORD_KEY_METADATA_FIELD) != null) {
+      recordKeyFields = new String[] {RECORD_KEY_METADATA_FIELD};
+    } else {
+      recordKeyFields = 
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
+    }
+    String[] projectionFields = Arrays.copyOf(recordKeyFields, 
recordKeyFields.length + 1);
+    projectionFields[recordKeyFields.length] = secondaryKeyField;
+    return HoodieAvroUtils.projectSchema(tableSchema, 
Arrays.asList(projectionFields));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index b1c8c453f82f..b0444e3de41c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -179,6 +179,10 @@ public class TestHoodieAvroUtils {
       + "{\"name\":\"doubleField\",\"type\":\"double\"},"
       + "{\"name\":\"bytesField\",\"type\":\"bytes\"},"
       + "{\"name\":\"stringField\",\"type\":\"string\"},"
+      + "{\"name\":\"secondLevelField\",\"type\":[\"null\", 
{\"name\":\"secondLevelField\",\"type\":\"record\",\"fields\":["
+      + 
"{\"name\":\"firstname\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + 
"{\"name\":\"lastname\",\"type\":[\"null\",\"string\"],\"default\":null}"
+      + "]}],\"default\":null},"
       // Logical types
       + 
"{\"name\":\"decimalField\",\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":20,\"scale\":5},"
       + 
"{\"name\":\"timeMillisField\",\"type\":\"int\",\"logicalType\":\"time-millis\"},"
@@ -189,7 +193,6 @@ public class TestHoodieAvroUtils {
       + 
"{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
       + "]}";
 
-  private static final Schema SCHEMA_WITH_NESTED_FIELD = new 
Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
   private static final Schema SCHEMA_WITH_AVRO_TYPES = new 
Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
 
   // Define schema with a nested field containing a union type
@@ -218,7 +221,7 @@ public class TestHoodieAvroUtils {
   public static String SCHEMA_WITH_NESTED_FIELD_LARGE_STR = 
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
       + "{\"name\":\"firstname\",\"type\":\"string\"},"
       + "{\"name\":\"lastname\",\"type\":\"string\"},"
-      + "{\"name\":\"nested_field\",\"type\":" + SCHEMA_WITH_AVRO_TYPES_STR + 
"},"
+      + "{\"name\":\"nested_field\",\"type\":[\"null\"," + 
SCHEMA_WITH_AVRO_TYPES_STR + "],\"default\":null},"
       + 
"{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
       + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": 
null}]}}]}";
 
@@ -640,6 +643,8 @@ public class TestHoodieAvroUtils {
     BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0, 
data.length, null);
     GenericRecord deserializedRecord = reader.read(null, decoder);
     Map<String, Object> fieldValueMapping = 
deserializedRecord.getSchema().getFields().stream()
+        // filtering out nested record field
+        .filter(field -> !field.schema().getType().equals(Schema.Type.UNION))
         .collect(Collectors.toMap(
             Schema.Field::name,
             field -> deserializedRecord.get(field.name())
@@ -930,4 +935,149 @@ public class TestHoodieAvroUtils {
     assertEquals(colName, actualColNameAndSchemaFile.getKey());
     assertEquals(schemaType, 
resolveNullableSchema(actualColNameAndSchemaFile.getValue().schema()).getType());
   }
+
+  public static Stream<Arguments> getExpectedSchemaForFields() {
+    // Projection of two nested fields. secondLevelField is entirely projected 
since both its fields are included
+    List<String> fields1 = 
Arrays.asList("nested_field.secondLevelField.firstname", 
"nested_field.secondLevelField.lastname");
+    // Expected schema - top level field and one nested field
+    String expectedSchema1 =
+        "{\n"
+            + "  \"type\": \"record\",\n"
+            + "  \"name\": \"MyClass\",\n"
+            + "  \"doc\": \"\",\n"
+            + "  \"namespace\": \"com.acme.avro\",\n"
+            + "  \"fields\": [\n"
+            + "    { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+            + "      \"type\": \"record\",\n"
+            + "      \"name\": \"TestRecordAvroTypes\",\n"
+            + "      \"fields\": [\n"
+            + "        { \"name\": \"secondLevelField\", \"type\": [\"null\", 
{\n"
+            + "          \"type\": \"record\",\n"
+            + "          \"name\": \"secondLevelField\",\n"
+            + "          \"fields\": [\n"
+            + "            { \"name\": \"firstname\", \"type\": [\"null\", 
\"string\"], \"default\": null },\n"
+            + "            { \"name\": \"lastname\", \"type\": [\"null\", 
\"string\"], \"default\": null }\n"
+            + "          ]\n"
+            + "        }], \"default\": null }\n"
+            + "      ]\n"
+            + "    }], \"default\": null }\n"
+            + "  ]\n"
+            + "}";
+
+    // Projection of first level nested field and top level field which 
contains the nested field
+    // Also include the nested field twice
+    // Expected schema - top level field
+    List<String> fields2 = 
Arrays.asList("nested_field.secondLevelField.lastname", "nested_field",
+        "nested_field.secondLevelField.lastname");
+    String expectedSchema2 =
+        "{\n"
+            + "  \"type\": \"record\",\n"
+            + "  \"name\": \"MyClass\",\n"
+            + "  \"doc\": \"\",\n"
+            + "  \"namespace\": \"com.acme.avro\",\n"
+            + "  \"fields\": [\n"
+            + "    { \"name\": \"nested_field\", \"type\": [\"null\", " + 
SCHEMA_WITH_AVRO_TYPES_STR + "], \"default\": null }\n"
+            + "  ]\n"
+            + "}";
+
+    // Projection of non overlapping nested field and top level field with 
nested fields
+    // Expected schema - top level field and one nested field
+    List<String> fields3 = Arrays.asList("student.lastname", "nested_field");
+    String expectedSchema3 =
+        "{\n"
+            + "  \"type\": \"record\",\n"
+            + "  \"name\": \"MyClass\",\n"
+            + "  \"doc\": \"\",\n"
+            + "  \"namespace\": \"com.acme.avro\",\n"
+            + "  \"fields\": [\n"
+            + "    { \"name\": \"nested_field\", \"type\": [\"null\", " + 
SCHEMA_WITH_AVRO_TYPES_STR + "], \"default\": null },\n"
+            + "    { \"name\": \"student\", \"type\": {\n"
+            + "      \"type\": \"record\",\n"
+            + "      \"name\": \"student\",\n"
+            + "      \"fields\": [\n"
+            + "        { \"name\": \"lastname\", \"type\": [\"null\", 
\"string\"], \"default\": null }\n"
+            + "      ]\n"
+            + "    }}\n"
+            + "  ]\n"
+            + "}";
+
+    // Projection of two nested fields
+    // Expected schema - two nested fields
+    List<String> fields4 = Arrays.asList("student.lastname", 
"nested_field.secondLevelField.lastname");
+    String expectedSchema4 =
+        "{\n"
+            + "  \"type\": \"record\",\n"
+            + "  \"name\": \"MyClass\",\n"
+            + "  \"doc\": \"\",\n"
+            + "  \"namespace\": \"com.acme.avro\",\n"
+            + "  \"fields\": [\n"
+            + "    { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+            + "      \"type\": \"record\",\n"
+            + "      \"name\": \"TestRecordAvroTypes\",\n"
+            + "      \"fields\": [\n"
+            + "        { \"name\": \"secondLevelField\", \"type\": [\"null\", 
{\n"
+            + "          \"type\": \"record\",\n"
+            + "          \"name\": \"secondLevelField\",\n"
+            + "          \"fields\": [\n"
+            + "            { \"name\": \"lastname\", \"type\": [\"null\", 
\"string\"], \"default\": null }\n"
+            + "          ]\n"
+            + "        }], \"default\": null }\n"
+            + "      ]\n"
+            + "    }], \"default\": null },\n"
+            + "    { \"name\": \"student\", \"type\": {\n"
+            + "      \"type\": \"record\",\n"
+            + "      \"name\": \"student\",\n"
+            + "      \"namespace\": \"com.acme.avro\","
+            + "      \"fields\": [\n"
+            + "        { \"name\": \"lastname\", \"type\": [\"null\", 
\"string\"], \"default\": null }\n"
+            + "      ]\n"
+            + "    }}\n"
+            + "  ]\n"
+            + "}";
+
+    // Projection of top level field and nested field column
+    List<String> fields5 = Arrays.asList("firstname", 
"nested_field.secondLevelField.lastname", "nested_field.longField");
+    // Expected schema - top level field and one nested field
+    String expectedSchema5 =
+        "{\n"
+            + "  \"type\": \"record\",\n"
+            + "  \"name\": \"MyClass\",\n"
+            + "  \"doc\": \"\",\n"
+            + "  \"namespace\": \"com.acme.avro\",\n"
+            + "  \"fields\": [\n"
+            + "    { \"name\": \"firstname\", \"type\": \"string\" },\n"
+            + "    { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+            + "      \"type\": \"record\",\n"
+            + "      \"name\": \"TestRecordAvroTypes\",\n"
+            + "      \"fields\": [\n"
+            + "        { \"name\": \"longField\", \"type\": \"long\" },\n"
+            + "        { \"name\": \"secondLevelField\", \"type\": [\"null\", 
{\n"
+            + "          \"type\": \"record\",\n"
+            + "          \"name\": \"secondLevelField\",\n"
+            + "          \"fields\": [\n"
+            + "            { \"name\": \"lastname\", \"type\": [\"null\", 
\"string\"], \"default\": null }\n"
+            + "          ]\n"
+            + "        }], \"default\": null }\n"
+            + "      ]\n"
+            + "    }], \"default\": null }\n"
+            + "  ]\n"
+            + "}";
+
+    Object[][] data = new Object[][] {
+        {fields1, expectedSchema1},
+        {fields2, expectedSchema2},
+        {fields3, expectedSchema3},
+        {fields4, expectedSchema4},
+        {fields5, expectedSchema5}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("getExpectedSchemaForFields")
+  public void testProjectSchemaWithNullableAndNestedFields(List<String> 
projectedFields, String expectedSchemaStr) {
+    Schema expectedSchema = Schema.parse(expectedSchemaStr);
+    Schema projectedSchema = 
HoodieAvroUtils.projectSchema(SCHEMA_WITH_NESTED_FIELD_LARGE, projectedFields);
+    assertEquals(expectedSchema, projectedSchema);
+    assertTrue(AvroSchemaUtils.isSchemaCompatible(projectedSchema, 
expectedSchema, false));
+  }
 }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 586b8367b323..70c89c0df277 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -205,7 +205,7 @@ public class ParquetUtils extends FileFormatUtils {
             List<String> fields = new ArrayList<>();
             fields.addAll(keyGenerator.getRecordKeyFieldNames());
             fields.addAll(keyGenerator.getPartitionPathFields());
-            return HoodieAvroUtils.getSchemaForFields(readAvroSchema(storage, 
filePath), fields);
+            return HoodieAvroUtils.projectSchema(readAvroSchema(storage, 
filePath), fields);
           })
           .orElse(partitionPath.isPresent() ? 
HoodieAvroUtils.getRecordKeySchema() : 
HoodieAvroUtils.getRecordKeyPartitionPathSchema());
       AvroReadSupport.setAvroReadSchema(conf, readSchema);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index a9acac5960cd..8fb2024880bf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -1594,7 +1594,10 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       s"""
          |create table $tableName (
          |  record_key_col string,
-         |  name struct<first_name:string, last_name:string>,
+         |  student struct<
+         |    name: struct<first_name:string, last_name:string>,
+         |    age: int
+         |  >,
          |  ts bigint
          |) using hudi
          | options (
@@ -1608,10 +1611,10 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
          | location '$basePath'
       """.stripMargin)
     // insert initial records
-    spark.sql(s"insert into $tableName values('id1', 
named_struct('first_name', 'John', 'last_name', 'Doe'), 1)")
-    spark.sql(s"insert into $tableName values('id2', 
named_struct('first_name', 'Jane', 'last_name', 'Smith'), 2)")
-    // create secondary index on name.last_name field
-    spark.sql(s"create index idx_last_name on $tableName (name.last_name)")
+    spark.sql(s"insert into $tableName values('id1', named_struct('name', 
named_struct('first_name', 'John', 'last_name', 'Doe'), 'age', 20), 1)")
+    spark.sql(s"insert into $tableName values('id2', named_struct('name', 
named_struct('first_name', 'Jane', 'last_name', 'Smith'), 'age', 25), 2)")
+    // create secondary index on student.name.last_name field
+    spark.sql(s"create index idx_last_name on $tableName 
(student.name.last_name)")
     // validate index creation
     metaClient = HoodieTableMetaClient.builder()
       .setBasePath(basePath)
@@ -1624,18 +1627,18 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
       Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2")
     )
     // verify pruning
-    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName 
where name.last_name = 'Doe'")(
+    checkAnswer(s"select record_key_col, student.name.last_name, ts from 
$tableName where student.name.last_name = 'Doe'")(
       Seq("id1", "Doe", 1)
     )
     // update nested field
-    spark.sql(s"update $tableName set name = named_struct('first_name', 
'John', 'last_name', 'Brown') where record_key_col = 'id1'")
+    spark.sql(s"update $tableName set student = named_struct('name', 
named_struct('first_name', 'John', 'last_name', 'Brown'), 'age', 20) where 
record_key_col = 'id1'")
     // validate updated index records
     checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from 
hudi_metadata('$basePath') where type=7")(
       Seq(s"Brown${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1", false),
       Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2", false)
     )
     // verify pruning
-    checkAnswer(s"select record_key_col, name.last_name, ts from $tableName 
where name.last_name = 'Brown'")(
+    checkAnswer(s"select record_key_col, student.name.last_name, ts from 
$tableName where student.name.last_name = 'Brown'")(
       Seq("id1", "Brown", 1)
     )
   }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index bb105e6b3b1a..7f4cfad32b64 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -199,12 +199,12 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
     
assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
 
     // build RLI with the indexer
-    indexMetadataPartitionsAndAssert(RECORD_INDEX.getPartitionPath(), 
Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[] 
{COLUMN_STATS, BLOOM_FILTERS}), tableName,
+    indexMetadataPartitionsAndAssert(RECORD_INDEX.getPartitionPath(), 
Collections.singletonList(FILES), Arrays.asList(COLUMN_STATS, BLOOM_FILTERS), 
tableName,
         "streamer-config/indexer-record-index.properties");
     // build SI with the indexer
-    String indexName = "idx_name";
-    indexMetadataPartitionsAndAssert(SECONDARY_INDEX.getPartitionPath() + 
indexName, Arrays.asList(new MetadataPartitionType[] {FILES, RECORD_INDEX}),
-        Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS, 
BLOOM_FILTERS}), tableName, 
"streamer-config/indexer-secondary-index.properties");
+    String indexName = "idx_rider";
+    indexMetadataPartitionsAndAssert(SECONDARY_INDEX.getPartitionPath() + 
indexName, Arrays.asList(FILES, RECORD_INDEX),
+        Arrays.asList(COLUMN_STATS, BLOOM_FILTERS), tableName, 
"streamer-config/indexer-secondary-index.properties");
     // validate the secondary index is built
     assertTrue(metadataPartitionExists(basePath(), context(), 
SECONDARY_INDEX.getPartitionPath() + indexName));
   }
diff --git 
a/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
 
b/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
index 7ac253ae769c..3decf0ac4387 100644
--- 
a/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
+++ 
b/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
@@ -19,8 +19,8 @@
 hoodie.metadata.enable=true
 hoodie.metadata.index.async=true
 hoodie.metadata.index.secondary.enable=true
-hoodie.index.name=idx_name
-hoodie.metadata.index.secondary.column=name
+hoodie.index.name=idx_rider
+hoodie.metadata.index.secondary.column=rider
 hoodie.metadata.index.check.timeout.seconds=60
 hoodie.write.concurrency.mode=optimistic_concurrency_control
 
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider

Reply via email to