This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 762b1e3f52b9a445f735fe65f035e22540ad0ac9 Author: voon <[email protected]> AuthorDate: Tue Dec 23 23:41:27 2025 +0800 Address comment 1 --- .../apache/hudi/client/BaseHoodieWriteClient.java | 2 +- ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 29 ++++++++-------------- .../SimpleSchemaConflictResolutionStrategy.java | 3 +-- .../org/apache/hudi/index/HoodieIndexUtils.java | 7 ++++++ ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 10 ++++---- .../HoodieSparkBootstrapSchemaProvider.java | 4 +-- .../org/apache/hudi/avro/AvroRecordContext.java | 5 ++-- .../org/apache/hudi/common/util/AvroOrcUtils.java | 4 +-- 8 files changed, 32 insertions(+), 32 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 4c4a1347903e..73be480a8346 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -341,7 +341,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient()); if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) { InternalSchema internalSchema; - HoodieSchema schema = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(config.getSchema()), config.allowOperationMetadataField()); + HoodieSchema schema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField()); if (historySchemaStr.isEmpty()) { internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(schema)); internalSchema.setSchemaId(Long.parseLong(instantTime)); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java index 0e5feab55e11..a51dbcc396cf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java @@ -35,7 +35,6 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.util.Lazy; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.io.IOException; import java.util.Arrays; @@ -59,12 +58,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { protected final HoodieTableMetaClient metaClient; - private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> tableSchemaCache; + private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieSchema>> tableSchemaCache; private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty(); @VisibleForTesting - public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() { + public ConcurrentHashMap<HoodieInstant, HoodieSchema> getTableSchemaCache() { return tableSchemaCache.get(); } @@ -90,18 +89,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { } public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) { - return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline. - .map(HoodieSchema::fromAvroSchema) + return getTableSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline. .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config. .map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema)) .map(this::handlePartitionColumnsIfNeeded); } - public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) { - return getTableSchemaIfPresent(includeMetadataFields, instant) - .map(HoodieSchema::toAvroSchema); - } - private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() { return metaClient.getTableConfig().getTableCreateSchema() .map(HoodieSchema::fromAvroSchema); @@ -116,16 +109,16 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { } @VisibleForTesting - Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) { - return getTableAvroSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime); + Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) { + return getTableSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime); } // [HUDI-9112] simplify the logic - Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) { + Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) { // If instantTime is empty it means read the latest one. In that case, get the cached instant if there is one. boolean fetchFromLastValidCommit = instantTime.isEmpty(); Option<HoodieInstant> targetInstant = instantTime.or(getCachedLatestCommitWithValidSchema()); - Schema cachedTableSchema = null; + HoodieSchema cachedTableSchema = null; // Try cache first if there is a target instant to fetch for. if (!targetInstant.isEmpty()) { @@ -134,7 +127,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { // Cache miss on either latestCommitWithValidSchema or commitMetadataCache. Compute the result. if (cachedTableSchema == null) { - Option<Pair<HoodieInstant, Schema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant); + Option<Pair<HoodieInstant, HoodieSchema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant); if (instantWithSchema.isPresent()) { targetInstant = Option.of(instantWithSchema.get().getLeft()); cachedTableSchema = instantWithSchema.get().getRight(); @@ -163,10 +156,10 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { } @VisibleForTesting - Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) { + Option<Pair<HoodieInstant, HoodieSchema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) { // To find the table schema given an instant time, need to walk backwards from the latest instant in // the timeline finding a completed instant containing a valid schema. - ConcurrentHashMap<HoodieInstant, Schema> tableSchemaAtInstant = new ConcurrentHashMap<>(); + ConcurrentHashMap<HoodieInstant, HoodieSchema> tableSchemaAtInstant = new ConcurrentHashMap<>(); Option<HoodieInstant> instantWithTableSchema = Option.fromJavaOptional(reversedTimelineStream // If a completion time is specified, find the first eligible instant in the schema evolution timeline. // Should switch to completion time based. @@ -183,7 +176,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter { String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY); boolean isValidSchemaStr = !StringUtils.isNullOrEmpty(schemaStr); if (isValidSchemaStr) { - tableSchemaAtInstant.putIfAbsent(s, new Schema.Parser().parse(schemaStr)); + tableSchemaAtInstant.putIfAbsent(s, HoodieSchema.parse(schemaStr)); } return isValidSchemaStr; } catch (IOException e) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java index 5f72ce11bdf2..0ae986cd0f78 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java @@ -165,8 +165,7 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictRes private static Option<HoodieSchema> getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) { try { - return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant)) - .map(HoodieSchema::fromAvroSchema); + return schemaResolver.getTableSchemaIfPresent(false, Option.of(instant)); } catch (Exception ex) { log.error("Cannot get table schema for instant {}", instant); throw new HoodieException("Unable to get table schema", ex); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index 3ec13c4c00ee..46c04c1024a7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -63,6 +63,7 @@ import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieMetadataIndexException; @@ -138,6 +139,9 @@ public class HoodieIndexUtils { static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().allMatch(fieldToIndex -> { Option<Pair<String, HoodieSchemaField>> schema = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex); + if (schema.isEmpty()) { + throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldToIndex); + } return isSecondaryIndexSupportedType(schema.get().getRight().schema()); }); } @@ -152,6 +156,9 @@ public class HoodieIndexUtils { public static boolean validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().anyMatch(fieldToIndex -> { Option<Pair<String, HoodieSchemaField>> nestedFieldOpt = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex); + if (nestedFieldOpt.isEmpty()) { + throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldToIndex); + } HoodieSchema fieldSchema = nestedFieldOpt.get().getRight().schema(); return fieldSchema.getType() != HoodieSchemaType.RECORD && fieldSchema.getType() != HoodieSchemaType.ARRAY && fieldSchema.getType() != HoodieSchemaType.MAP; }); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java index 7317cd13050a..0e1242a1531c 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java @@ -578,7 +578,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon HoodieInstant instant1 = metaClient.getCommitsTimeline().filterCompletedInstants().nthInstant(0).get(); // Case 1: First call with empty instant - should fetch from timeline and cache - Option<HoodieSchema> schemaOption1 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> schemaOption1 = resolver.getTableSchemaFromTimelineWithCache(Option.empty()); assertTrue(schemaOption1.isPresent()); assertEquals(schema2, schemaOption1.get()); @@ -586,7 +586,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 2: Second call with empty instant - should use cache - Option<HoodieSchema> schemaOption2 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> schemaOption2 = resolver.getTableSchemaFromTimelineWithCache(Option.empty()); assertTrue(schemaOption2.isPresent()); assertEquals(schema2, schemaOption2.get()); @@ -594,7 +594,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 3: Call with the latest valid instant - there should be a cache hit - Option<HoodieSchema> schemaOption3 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> schemaOption3 = resolver.getTableSchemaFromTimelineWithCache(Option.of(instant2)); assertTrue(schemaOption3.isPresent()); assertEquals(schema2, schemaOption3.get()); @@ -602,7 +602,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 4: Second call with some other instant - should use cache - Option<HoodieSchema> schemaOption4 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> schemaOption4 = resolver.getTableSchemaFromTimelineWithCache(Option.of(instant1)); assertTrue(schemaOption4.isPresent()); assertEquals(schema1, schemaOption4.get()); @@ -613,7 +613,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon String nonExistentTime = "9999"; HoodieInstant nonExistentInstant = metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, nonExistentTime, nonExistentTime); - Option<HoodieSchema> schemaOption5 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)).map(HoodieSchema::fromAvroSchema); + Option<HoodieSchema> schemaOption5 = resolver.getTableSchemaFromTimelineWithCache(Option.of(nonExistentInstant)); assertEquals(schema2, schemaOption5.get()); // Verify one more call to timeline for non-existent instant diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java index 556866444ee5..c84349c1aafe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java @@ -18,7 +18,7 @@ package org.apache.hudi.client.bootstrap; -import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.HoodieSchemaConversionUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieFileStatus; import org.apache.hudi.client.common.HoodieSparkEngineContext; @@ -85,7 +85,7 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro String structName = tableName + "_record"; String recordNamespace = "hoodie." + tableName; - return HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema, structName, recordNamespace)); + return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(parquetSchema, structName, recordNamespace); } private static HoodieSchema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) { diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 947a292196ae..dc1e41cd589c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -79,11 +79,12 @@ public class AvroRecordContext extends RecordContext<IndexedRecord> { if (fieldOpt.isEmpty()) { return null; } - Object value = currentRecord.get(fieldOpt.get().pos()); + HoodieSchemaField field = fieldOpt.get(); + Object value = currentRecord.get(field.pos()); if (i == path.length - 1) { return value; } - currentSchema = fieldOpt.get().schema(); + currentSchema = field.schema(); currentRecord = (IndexedRecord) value; } return null; diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index 7c9780baf623..77445f111580 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -813,8 +813,8 @@ public class AvroOrcUtils { } public static HoodieSchema createSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) { - HoodieSchema hoodieSchema = createSchemaWithNamespace(orcSchema,recordName,namespace); - List<HoodieSchemaField> fields = new ArrayList<>(); + HoodieSchema hoodieSchema = createSchemaWithNamespace(orcSchema, recordName, namespace); + List<HoodieSchemaField> fields = new ArrayList<>(hoodieSchema.getFields().size()); for (HoodieSchemaField field : hoodieSchema.getFields()) { HoodieSchema fieldSchema = field.schema(); HoodieSchema nullableSchema = HoodieSchema.createNullable(fieldSchema);
