This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e3f93425cd3b [HUDI-9439] Add helpers for reading select fields from
schema in FileGroupReader (#13485)
e3f93425cd3b is described below
commit e3f93425cd3b5d5fc07d1b39f6396ab403f9b76d
Author: Lokesh Jain <[email protected]>
AuthorDate: Tue Jul 1 17:33:37 2025 +0530
[HUDI-9439] Add helpers for reading select fields from schema in
FileGroupReader (#13485)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 4 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 143 +++++++++++++++++--
.../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
.../SecondaryIndexRecordGenerationUtils.java | 24 +++-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 154 ++++++++++++++++++++-
.../org/apache/hudi/common/util/ParquetUtils.java | 2 +-
.../functional/TestSecondaryIndexPruning.scala | 19 +--
.../apache/hudi/utilities/TestHoodieIndexer.java | 8 +-
.../indexer-secondary-index.properties | 4 +-
9 files changed, 327 insertions(+), 35 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 85416d329c26..ea3256321f5f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -93,6 +93,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -780,7 +781,8 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
final String fileId = fileSlice.getFileId();
HoodieReaderContext<T> readerContext = readerContextFactory.getContext();
Schema dataSchema =
AvroSchemaCache.intern(HoodieAvroUtils.addMetadataFields(new
Schema.Parser().parse(dataWriteConfig.getWriteSchema()),
dataWriteConfig.allowOperationMetadataField()));
- Schema requestedSchema =
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema() :
dataSchema;
+ Schema requestedSchema =
metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
+ : HoodieAvroUtils.projectSchema(dataSchema,
Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new
String[0])));
Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(readerContext)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index f1595d8100d8..53bef9cb238b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -84,8 +84,10 @@ import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
@@ -96,6 +98,7 @@ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
+import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -427,19 +430,136 @@ public class HoodieAvroUtils {
}
/**
- * Fetch schema for record key and partition path.
+ * Fetches projected schema given list of fields to project. The field can
be nested in format `a.b.c` where a is
+ * the top level field, b is at second level and so on.
+ *
+ * <p>The returned record reuses the name, doc, namespace and error flag of {@code fileSchema}.
+ * Each entry of {@code fields} is split on {@code "."} into a path and the actual projection is
+ * delegated to {@code projectFields}; projected fields keep the field order of
+ * {@code fileSchema} rather than the order in which they were requested.
+ *
+ * @param fileSchema record schema to project from
+ * @param fields field names or dotted paths (for example {@code a.b.c}) to keep
+ * @return a new record schema containing only the requested fields
+ * @throws IllegalArgumentException raised by {@code projectFields} when a path is empty, a
+ *         path element is not a field of the schema being projected, or a nested path is
+ *         requested on a non-record field
*/
- public static Schema getSchemaForFields(Schema fileSchema, List<String>
fields) {
- List<Schema.Field> toBeAddedFields = new ArrayList<>();
- Schema recordSchema = Schema.createRecord("HoodieRecordKey", "", "",
false);
+ public static Schema projectSchema(Schema fileSchema, List<String> fields) {
+ List<LinkedList<String>> fieldPathLists = new ArrayList<>();
+ for (String path : fields) {
+ fieldPathLists.add(new LinkedList<>(Arrays.asList(path.split("\\."))));
+ }
+
+ Schema result = Schema.createRecord(fileSchema.getName(),
fileSchema.getDoc(), fileSchema.getNamespace(), fileSchema.isError());
+ result.setFields(projectFields(fileSchema, fieldPathLists));
+ return result;
+ }
- for (Schema.Field schemaField : fileSchema.getFields()) {
- if (fields.contains(schemaField.name())) {
- toBeAddedFields.add(new Schema.Field(schemaField.name(),
schemaField.schema(), schemaField.doc(), schemaField.defaultVal()));
+ /**
+ * Projects the requested fields in the schema. The fields can be nested in
format `a.b.c`.
+ *
+ * <p>Paths are grouped by their first element. The returned fields follow the field order of
+ * {@code originalSchema}, not the order of {@code fieldPaths}. If a field is requested both as
+ * a whole and through nested paths, the whole field is kept. For nested paths the field schema
+ * is unwrapped with {@code unwrapNullable}, projected recursively, and re-wrapped with
+ * {@code wrapNullable}. Each incoming path is copied before its head is removed, so the
+ * entries of {@code fieldPaths} are not modified.
+ *
+ * @param originalSchema Schema to project from
+ * @param fieldPaths List of fields. The field can be nested.
+ *
+ * @return List of projected schema fields
+ * @throws IllegalArgumentException if a path is empty, if its first element is not a field of
+ *         {@code originalSchema}, or if a nested path is requested on a field whose
+ *         (unwrapped) type is not a record
+ */
+ private static List<Schema.Field> projectFields(Schema originalSchema,
List<LinkedList<String>> fieldPaths) {
+ // We maintain a mapping of top level field to list of nested field paths
which need to be projected from the field
+ // If the entire field needs to be projected the list is empty
+ // The map is ordered by position of the field in the original schema so
that projected schema also maintains
+ // the same field ordering
+ Map<Field, List<LinkedList<String>>> groupedByTop = new
TreeMap<>(Comparator.comparingInt(Field::pos));
+
+ // Group paths by their current level (first element)
+ // Here nested fields are considered as a path. The top level field is the
first element in the path
+ // second level field is the second element and so on.
+ // We iterate through the input field paths and populate groupedByTop
described above
+ // For example if f1 is a top level field, and input field paths are
f1.f2.f3 and f1.f2.f4
+ // we maintain a mapping from f1 to {f2->f3, f2->f4}
+ for (LinkedList<String> originalPath : fieldPaths) {
+ if (originalPath.isEmpty()) {
+ throw new IllegalArgumentException("Field path is empty or malformed:
" + originalPath);
+ }
+ LinkedList<String> path = new LinkedList<>(originalPath); // Avoid
mutating the original
+ String head = path.poll(); // Remove first element
+ Field topField = originalSchema.getField(head);
+ if (topField == null) {
+ throw new IllegalArgumentException("Field `" + head + "` not found in
schema: " + originalSchema.getFields());
}
+ groupedByTop.compute(topField, (ignored, list) -> {
+ if (path.isEmpty() || (list != null && list.isEmpty())) {
+ // Case 1: where path is empty indicating the entire field needs to
be included.
+ // No further projection provided for that field
+ // Case 2: if list is empty, it indicates that the top level field
has already been selected
+ // No more projection possible.
+ return new ArrayList<>();
+ } else {
+ if (list == null) {
+ // if list is null return just the path
+ List<LinkedList<String>> retList = new ArrayList<>();
+ retList.add(path);
+ return retList;
+ } else {
+ // if list has elements, add path to it
+ list.add(path);
+ return list;
+ }
+ }
+ });
}
- recordSchema.setFields(toBeAddedFields);
- return recordSchema;
+
+ List<Schema.Field> projectedFields = new ArrayList<>();
+ for (Map.Entry<Field, List<LinkedList<String>>> entry :
groupedByTop.entrySet()) {
+ // For every top level field we process the child fields to include in
the projected schema
+ Field originalField = entry.getKey();
+ List<LinkedList<String>> childPaths = entry.getValue();
+ Schema originalFieldSchema = originalField.schema();
+ Schema nonNullableSchema = unwrapNullable(originalFieldSchema);
+
+ Schema projectedFieldSchema;
+ if (!childPaths.isEmpty()) {
+ // If child paths are present, it indicates there are nested fields
which need to be projected
+ if (nonNullableSchema.getType() != Schema.Type.RECORD) {
+ throw new IllegalArgumentException("Cannot project nested field from
non-record field '" + originalField.name()
+ + "' of type: " + nonNullableSchema.getType());
+ }
+
+ // For those nested fields we make a recursive call to project fields
to fetch the nested schema fields
+ List<Schema.Field> nestedFields = projectFields(nonNullableSchema,
childPaths);
+ Schema nestedProjected =
Schema.createRecord(nonNullableSchema.getName(), nonNullableSchema.getDoc(),
+ nonNullableSchema.getNamespace(), nonNullableSchema.isError());
+ nestedProjected.setFields(nestedFields);
+ projectedFieldSchema = wrapNullable(originalFieldSchema,
nestedProjected);
+ } else {
+ // if child field paths are empty, we need to include the top level
field itself
+ projectedFieldSchema = originalFieldSchema;
+ }
+
+ projectedFields.add(new Schema.Field(originalField.name(),
projectedFieldSchema, originalField.doc(), originalField.defaultVal()));
+ }
+
+ return projectedFields;
+ }
+
+ /**
+ * If schema is a union with ["null", X], returns X.
+ * Otherwise, returns schema as-is.
+ *
+ * <p>More precisely, for any union the first branch whose type is not NULL is returned, so a
+ * union with several non-null branches yields only the first of them. A union without any
+ * non-null branch, and any non-union schema, is returned unchanged.
+ *
+ * @param schema schema to unwrap
+ * @return the first non-null branch of a union, otherwise {@code schema} itself
+ */
+ public static Schema unwrapNullable(Schema schema) {
+ if (schema.getType() == Schema.Type.UNION) {
+ List<Schema> types = schema.getTypes();
+ for (Schema s : types) {
+ if (s.getType() != Schema.Type.NULL) {
+ return s;
+ }
+ }
+ }
+ return schema;
+ }
+
+ /**
+ * Wraps schema as nullable if original was a nullable union.
+ *
+ * <p>When {@code original} is a union that contains a NULL branch, the result is always the
+ * two-branch union {@code ["null", updated]}; in every other case {@code updated} is returned
+ * as-is.
+ * NOTE(review): the NULL branch is placed first regardless of its position in
+ * {@code original}, and any further branches of {@code original} are not carried over.
+ * Confirm that callers only pass ["null", X]-shaped unions.
+ *
+ * @param original schema whose nullability decides whether {@code updated} is wrapped
+ * @param updated schema to return, possibly wrapped in a nullable union
+ * @return {@code ["null", updated]} or {@code updated}
+ */
+ public static Schema wrapNullable(Schema original, Schema updated) {
+ if (original.getType() == Schema.Type.UNION) {
+ List<Schema> types = original.getTypes();
+ if (types.stream().anyMatch(s -> s.getType() == Schema.Type.NULL)) {
+ List<Schema> newUnion = new ArrayList<>();
+ newUnion.add(Schema.create(Schema.Type.NULL));
+ newUnion.add(updated);
+ return Schema.createUnion(newUnion);
+ }
+ }
+ return updated;
}
public static GenericRecord addHoodieKeyToRecord(GenericRecord record,
String recordKey, String partitionPath,
@@ -1602,11 +1722,12 @@ public class HoodieAvroUtils {
@VisibleForTesting
public static Pair<String, Schema.Field> getSchemaForField(Schema schema,
String fieldName, String prefix) {
+ Schema nonNullableSchema = unwrapNullable(schema);
if (!fieldName.contains(".")) {
- return Pair.of(prefix + fieldName, schema.getField(fieldName));
+ return Pair.of(prefix + fieldName,
nonNullableSchema.getField(fieldName));
} else {
int rootFieldIndex = fieldName.indexOf(".");
- Schema.Field rootField = schema.getField(fieldName.substring(0,
rootFieldIndex));
+ Schema.Field rootField =
nonNullableSchema.getField(fieldName.substring(0, rootFieldIndex));
if (rootField == null) {
throw new HoodieException("Failed to find " + fieldName + " in the
table schema ");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index dcee9952eeda..13dd2caf2a3b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -159,7 +159,7 @@ import static java.util.stream.Collectors.toList;
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
import static
org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
-import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
+import static org.apache.hudi.avro.HoodieAvroUtils.projectSchema;
import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro;
import static
org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
@@ -2457,7 +2457,7 @@ public class HoodieTableMetadataUtil {
List<String> mergedFields = new ArrayList<>(partitionFields.size() +
sourceFields.size());
mergedFields.addAll(partitionFields);
mergedFields.addAll(sourceFields);
- return addMetadataFields(getSchemaForFields(tableSchema, mergedFields));
+ return addMetadataFields(projectSchema(tableSchema, mergedFields));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
index 11d4ed23cdea..8a8a0ddd549d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/SecondaryIndexRecordGenerationUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.metadata;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -48,6 +49,7 @@ import org.apache.avro.Schema;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -55,6 +57,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static
org.apache.hudi.metadata.HoodieMetadataPayload.createSecondaryIndexRecord;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.filePath;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.getPartitionLatestFileSlicesIncludingInflight;
@@ -216,7 +219,8 @@ public class SecondaryIndexRecordGenerationUtils {
String instantTime,
TypedProperties props,
boolean allowInflightInstants) throws IOException {
- String secondaryKeyField = String.join(".",
indexDefinition.getSourceFields());
+ String secondaryKeyField = indexDefinition.getSourceFieldsKey();
+ Schema requestedSchema = getRequestedSchemaForSecondaryIndex(metaClient,
tableSchema, secondaryKeyField);
HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(readerContext)
.withFileSlice(fileSlice)
@@ -224,7 +228,7 @@ public class SecondaryIndexRecordGenerationUtils {
.withProps(props)
.withLatestCommitTime(instantTime)
.withDataSchema(tableSchema)
- .withRequestedSchema(tableSchema)
+ .withRequestedSchema(requestedSchema)
.withAllowInflightInstants(allowInflightInstants)
.build();
@@ -253,10 +257,10 @@ public class SecondaryIndexRecordGenerationUtils {
// Loop to find the next valid record or exhaust the iterator.
while (recordIterator.hasNext()) {
T record = recordIterator.next();
- Object secondaryKey = readerContext.getValue(record, tableSchema,
secondaryKeyField);
+ Object secondaryKey = readerContext.getValue(record,
requestedSchema, secondaryKeyField);
if (secondaryKey != null) {
nextValidRecord = Pair.of(
- readerContext.getRecordKey(record, tableSchema),
+ readerContext.getRecordKey(record, requestedSchema),
secondaryKey.toString()
);
return true;
@@ -278,4 +282,16 @@ public class SecondaryIndexRecordGenerationUtils {
}
};
}
+
+ private static Schema
getRequestedSchemaForSecondaryIndex(HoodieTableMetaClient metaClient, Schema
tableSchema, String secondaryKeyField) { // Builds the projected read schema: record key field(s) plus the secondary key field.
+ String[] recordKeyFields;
+ if (tableSchema.getField(RECORD_KEY_METADATA_FIELD) != null) { // Use the record key meta field when the table schema contains it.
+ recordKeyFields = new String[] {RECORD_KEY_METADATA_FIELD};
+ } else { // Otherwise use the record key fields from the table config (empty array when none are configured).
+ recordKeyFields =
metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
+ }
+ String[] projectionFields = Arrays.copyOf(recordKeyFields,
recordKeyFields.length + 1); // Copy with one extra trailing slot for the secondary key field.
+ projectionFields[recordKeyFields.length] = secondaryKeyField;
+ return HoodieAvroUtils.projectSchema(tableSchema,
Arrays.asList(projectionFields));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index b1c8c453f82f..b0444e3de41c 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -179,6 +179,10 @@ public class TestHoodieAvroUtils {
+ "{\"name\":\"doubleField\",\"type\":\"double\"},"
+ "{\"name\":\"bytesField\",\"type\":\"bytes\"},"
+ "{\"name\":\"stringField\",\"type\":\"string\"},"
+ + "{\"name\":\"secondLevelField\",\"type\":[\"null\",
{\"name\":\"secondLevelField\",\"type\":\"record\",\"fields\":["
+ +
"{\"name\":\"firstname\",\"type\":[\"null\",\"string\"],\"default\":null},"
+ +
"{\"name\":\"lastname\",\"type\":[\"null\",\"string\"],\"default\":null}"
+ + "]}],\"default\":null},"
// Logical types
+
"{\"name\":\"decimalField\",\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":20,\"scale\":5},"
+
"{\"name\":\"timeMillisField\",\"type\":\"int\",\"logicalType\":\"time-millis\"},"
@@ -189,7 +193,6 @@ public class TestHoodieAvroUtils {
+
"{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
+ "]}";
- private static final Schema SCHEMA_WITH_NESTED_FIELD = new
Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
private static final Schema SCHEMA_WITH_AVRO_TYPES = new
Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR);
// Define schema with a nested field containing a union type
@@ -218,7 +221,7 @@ public class TestHoodieAvroUtils {
public static String SCHEMA_WITH_NESTED_FIELD_LARGE_STR =
"{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":\"string\"},"
+ "{\"name\":\"lastname\",\"type\":\"string\"},"
- + "{\"name\":\"nested_field\",\"type\":" + SCHEMA_WITH_AVRO_TYPES_STR +
"},"
+ + "{\"name\":\"nested_field\",\"type\":[\"null\"," +
SCHEMA_WITH_AVRO_TYPES_STR + "],\"default\":null},"
+
"{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\":
null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\":
null}]}}]}";
@@ -640,6 +643,8 @@ public class TestHoodieAvroUtils {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0,
data.length, null);
GenericRecord deserializedRecord = reader.read(null, decoder);
Map<String, Object> fieldValueMapping =
deserializedRecord.getSchema().getFields().stream()
+ // filtering out nested record field
+ .filter(field -> !field.schema().getType().equals(Schema.Type.UNION))
.collect(Collectors.toMap(
Schema.Field::name,
field -> deserializedRecord.get(field.name())
@@ -930,4 +935,149 @@ public class TestHoodieAvroUtils {
assertEquals(colName, actualColNameAndSchemaFile.getKey());
assertEquals(schemaType,
resolveNullableSchema(actualColNameAndSchemaFile.getValue().schema()).getType());
}
+
+ public static Stream<Arguments> getExpectedSchemaForFields() { // Supplies (fields to project, expected projected schema JSON) argument pairs.
+ // Projection of two nested fields. secondLevelField is entirely projected
since both its fields are included
+ List<String> fields1 =
Arrays.asList("nested_field.secondLevelField.firstname",
"nested_field.secondLevelField.lastname");
+ // Expected schema - nested_field keeping only secondLevelField (with both of its fields)
+ String expectedSchema1 =
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"MyClass\",\n"
+ + " \"doc\": \"\",\n"
+ + " \"namespace\": \"com.acme.avro\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"TestRecordAvroTypes\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"secondLevelField\", \"type\": [\"null\",
{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"secondLevelField\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"firstname\", \"type\": [\"null\",
\"string\"], \"default\": null },\n"
+ + " { \"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null }\n"
+ + " ]\n"
+ + "}";
+
+ // Projection of a nested field (two levels deep) and the top level field which
contains the nested field
+ // Also include the nested field twice
+ // Expected schema - top level field
+ List<String> fields2 =
Arrays.asList("nested_field.secondLevelField.lastname", "nested_field",
+ "nested_field.secondLevelField.lastname");
+ String expectedSchema2 =
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"MyClass\",\n"
+ + " \"doc\": \"\",\n"
+ + " \"namespace\": \"com.acme.avro\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"nested_field\", \"type\": [\"null\", " +
SCHEMA_WITH_AVRO_TYPES_STR + "], \"default\": null }\n"
+ + " ]\n"
+ + "}";
+
+ // Projection of non overlapping nested field and top level field with
nested fields
+ // Expected schema - top level field and one nested field
+ List<String> fields3 = Arrays.asList("student.lastname", "nested_field");
+ String expectedSchema3 =
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"MyClass\",\n"
+ + " \"doc\": \"\",\n"
+ + " \"namespace\": \"com.acme.avro\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"nested_field\", \"type\": [\"null\", " +
SCHEMA_WITH_AVRO_TYPES_STR + "], \"default\": null },\n"
+ + " { \"name\": \"student\", \"type\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"student\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null }\n"
+ + " ]\n"
+ + " }}\n"
+ + " ]\n"
+ + "}";
+
+ // Projection of two nested fields
+ // Expected schema - two nested fields
+ List<String> fields4 = Arrays.asList("student.lastname",
"nested_field.secondLevelField.lastname");
+ String expectedSchema4 =
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"MyClass\",\n"
+ + " \"doc\": \"\",\n"
+ + " \"namespace\": \"com.acme.avro\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"TestRecordAvroTypes\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"secondLevelField\", \"type\": [\"null\",
{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"secondLevelField\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null },\n"
+ + " { \"name\": \"student\", \"type\": {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"student\",\n"
+ + " \"namespace\": \"com.acme.avro\","
+ + " \"fields\": [\n"
+ + " { \"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null }\n"
+ + " ]\n"
+ + " }}\n"
+ + " ]\n"
+ + "}";
+
+ // Projection of top level field and nested field column
+ List<String> fields5 = Arrays.asList("firstname",
"nested_field.secondLevelField.lastname", "nested_field.longField");
+ // Expected schema - firstname plus nested_field keeping longField and secondLevelField.lastname
+ String expectedSchema5 =
+ "{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"MyClass\",\n"
+ + " \"doc\": \"\",\n"
+ + " \"namespace\": \"com.acme.avro\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"firstname\", \"type\": \"string\" },\n"
+ + " { \"name\": \"nested_field\", \"type\": [\"null\", {\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"TestRecordAvroTypes\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"longField\", \"type\": \"long\" },\n"
+ + " { \"name\": \"secondLevelField\", \"type\": [\"null\",
{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"secondLevelField\",\n"
+ + " \"fields\": [\n"
+ + " { \"name\": \"lastname\", \"type\": [\"null\",
\"string\"], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null }\n"
+ + " ]\n"
+ + " }], \"default\": null }\n"
+ + " ]\n"
+ + "}";
+
+ Object[][] data = new Object[][] {
+ {fields1, expectedSchema1},
+ {fields2, expectedSchema2},
+ {fields3, expectedSchema3},
+ {fields4, expectedSchema4},
+ {fields5, expectedSchema5}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest
+ @MethodSource("getExpectedSchemaForFields")
+ public void testProjectSchemaWithNullableAndNestedFields(List<String>
projectedFields, String expectedSchemaStr) { // Projects each supplied field list and compares the result with the expected schema JSON.
+ Schema expectedSchema = Schema.parse(expectedSchemaStr); // NOTE(review): other schemas in this file are parsed with new Schema.Parser().parse(...) - consider the same here.
+ Schema projectedSchema =
HoodieAvroUtils.projectSchema(SCHEMA_WITH_NESTED_FIELD_LARGE, projectedFields);
+ assertEquals(expectedSchema, projectedSchema); // The projected schema must equal the expected schema.
+ assertTrue(AvroSchemaUtils.isSchemaCompatible(projectedSchema,
expectedSchema, false));
+ }
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index 586b8367b323..70c89c0df277 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -205,7 +205,7 @@ public class ParquetUtils extends FileFormatUtils {
List<String> fields = new ArrayList<>();
fields.addAll(keyGenerator.getRecordKeyFieldNames());
fields.addAll(keyGenerator.getPartitionPathFields());
- return HoodieAvroUtils.getSchemaForFields(readAvroSchema(storage,
filePath), fields);
+ return HoodieAvroUtils.projectSchema(readAvroSchema(storage,
filePath), fields);
})
.orElse(partitionPath.isPresent() ?
HoodieAvroUtils.getRecordKeySchema() :
HoodieAvroUtils.getRecordKeyPartitionPathSchema());
AvroReadSupport.setAvroReadSchema(conf, readSchema);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
index a9acac5960cd..8fb2024880bf 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala
@@ -1594,7 +1594,10 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
s"""
|create table $tableName (
| record_key_col string,
- | name struct<first_name:string, last_name:string>,
+ | student struct<
+ | name: struct<first_name:string, last_name:string>,
+ | age: int
+ | >,
| ts bigint
|) using hudi
| options (
@@ -1608,10 +1611,10 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
| location '$basePath'
""".stripMargin)
// insert initial records
- spark.sql(s"insert into $tableName values('id1',
named_struct('first_name', 'John', 'last_name', 'Doe'), 1)")
- spark.sql(s"insert into $tableName values('id2',
named_struct('first_name', 'Jane', 'last_name', 'Smith'), 2)")
- // create secondary index on name.last_name field
- spark.sql(s"create index idx_last_name on $tableName (name.last_name)")
+ spark.sql(s"insert into $tableName values('id1', named_struct('name',
named_struct('first_name', 'John', 'last_name', 'Doe'), 'age', 20), 1)")
+ spark.sql(s"insert into $tableName values('id2', named_struct('name',
named_struct('first_name', 'Jane', 'last_name', 'Smith'), 'age', 25), 2)")
+ // create secondary index on student.name.last_name field
+ spark.sql(s"create index idx_last_name on $tableName
(student.name.last_name)")
// validate index creation
metaClient = HoodieTableMetaClient.builder()
.setBasePath(basePath)
@@ -1624,18 +1627,18 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2")
)
// verify pruning
- checkAnswer(s"select record_key_col, name.last_name, ts from $tableName
where name.last_name = 'Doe'")(
+ checkAnswer(s"select record_key_col, student.name.last_name, ts from
$tableName where student.name.last_name = 'Doe'")(
Seq("id1", "Doe", 1)
)
// update nested field
- spark.sql(s"update $tableName set name = named_struct('first_name',
'John', 'last_name', 'Brown') where record_key_col = 'id1'")
+ spark.sql(s"update $tableName set student = named_struct('name',
named_struct('first_name', 'John', 'last_name', 'Brown'), 'age', 20) where
record_key_col = 'id1'")
// validate updated index records
checkAnswer(s"select key, SecondaryIndexMetadata.isDeleted from
hudi_metadata('$basePath') where type=7")(
Seq(s"Brown${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id1", false),
Seq(s"Smith${SECONDARY_INDEX_RECORD_KEY_SEPARATOR}id2", false)
)
// verify pruning
- checkAnswer(s"select record_key_col, name.last_name, ts from $tableName
where name.last_name = 'Brown'")(
+ checkAnswer(s"select record_key_col, student.name.last_name, ts from
$tableName where student.name.last_name = 'Brown'")(
Seq("id1", "Brown", 1)
)
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index bb105e6b3b1a..7f4cfad32b64 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -199,12 +199,12 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(FILES.getPartitionPath()));
// build RLI with the indexer
- indexMetadataPartitionsAndAssert(RECORD_INDEX.getPartitionPath(),
Collections.singletonList(FILES), Arrays.asList(new MetadataPartitionType[]
{COLUMN_STATS, BLOOM_FILTERS}), tableName,
+ indexMetadataPartitionsAndAssert(RECORD_INDEX.getPartitionPath(),
Collections.singletonList(FILES), Arrays.asList(COLUMN_STATS, BLOOM_FILTERS),
tableName,
"streamer-config/indexer-record-index.properties");
// build SI with the indexer
- String indexName = "idx_name";
- indexMetadataPartitionsAndAssert(SECONDARY_INDEX.getPartitionPath() +
indexName, Arrays.asList(new MetadataPartitionType[] {FILES, RECORD_INDEX}),
- Arrays.asList(new MetadataPartitionType[] {COLUMN_STATS,
BLOOM_FILTERS}), tableName,
"streamer-config/indexer-secondary-index.properties");
+ String indexName = "idx_rider";
+ indexMetadataPartitionsAndAssert(SECONDARY_INDEX.getPartitionPath() +
indexName, Arrays.asList(FILES, RECORD_INDEX),
+ Arrays.asList(COLUMN_STATS, BLOOM_FILTERS), tableName,
"streamer-config/indexer-secondary-index.properties");
// validate the secondary index is built
assertTrue(metadataPartitionExists(basePath(), context(),
SECONDARY_INDEX.getPartitionPath() + indexName));
}
diff --git
a/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
b/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
index 7ac253ae769c..3decf0ac4387 100644
---
a/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
+++
b/hudi-utilities/src/test/resources/streamer-config/indexer-secondary-index.properties
@@ -19,8 +19,8 @@
hoodie.metadata.enable=true
hoodie.metadata.index.async=true
hoodie.metadata.index.secondary.enable=true
-hoodie.index.name=idx_name
-hoodie.metadata.index.secondary.column=name
+hoodie.index.name=idx_rider
+hoodie.metadata.index.secondary.column=rider
hoodie.metadata.index.check.timeout.seconds=60
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider