This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch phase-18-HoodieAvroUtils-removal
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 762b1e3f52b9a445f735fe65f035e22540ad0ac9
Author: voon <[email protected]>
AuthorDate: Tue Dec 23 23:41:27 2025 +0800

    Address comment 1
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  2 +-
 ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 29 ++++++++--------------
 .../SimpleSchemaConflictResolutionStrategy.java    |  3 +--
 .../org/apache/hudi/index/HoodieIndexUtils.java    |  7 ++++++
 ...ConcurrentSchemaEvolutionTableSchemaGetter.java | 10 ++++----
 .../HoodieSparkBootstrapSchemaProvider.java        |  4 +--
 .../org/apache/hudi/avro/AvroRecordContext.java    |  5 ++--
 .../org/apache/hudi/common/util/AvroOrcUtils.java  |  4 +--
 8 files changed, 32 insertions(+), 32 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4c4a1347903e..73be480a8346 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -341,7 +341,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
     if (!historySchemaStr.isEmpty() || 
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
 {
       InternalSchema internalSchema;
-      HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(config.getSchema()), 
config.allowOperationMetadataField());
+      HoodieSchema schema = 
HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), 
config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
         internalSchema = 
SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> 
InternalSchemaConverter.convert(schema));
         internalSchema.setSchemaId(Long.parseLong(instantTime));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
index 0e5feab55e11..a51dbcc396cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -35,7 +35,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.util.Lazy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,12 +58,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
 
   protected final HoodieTableMetaClient metaClient;
 
-  private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> 
tableSchemaCache;
+  private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieSchema>> 
tableSchemaCache;
 
   private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty();
 
   @VisibleForTesting
-  public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() {
+  public ConcurrentHashMap<HoodieInstant, HoodieSchema> getTableSchemaCache() {
     return tableSchemaCache.get();
   }
 
@@ -90,18 +89,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
   }
 
   public Option<HoodieSchema> getTableSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
-    return getTableAvroSchemaFromTimelineWithCache(instant) // Get table 
schema from schema evolution timeline.
-        .map(HoodieSchema::fromAvroSchema)
+    return getTableSchemaFromTimelineWithCache(instant) // Get table schema 
from schema evolution timeline.
         .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read 
create schema from table config.
         .map(tableSchema -> includeMetadataFields ? 
HoodieSchemaUtils.addMetadataFields(tableSchema, false) : 
HoodieSchemaUtils.removeMetadataFields(tableSchema))
         .map(this::handlePartitionColumnsIfNeeded);
   }
 
-  public Option<Schema> getTableAvroSchemaIfPresent(boolean 
includeMetadataFields, Option<HoodieInstant> instant) {
-    return getTableSchemaIfPresent(includeMetadataFields, instant)
-        .map(HoodieSchema::toAvroSchema);
-  }
-
   private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
     return metaClient.getTableConfig().getTableCreateSchema()
         .map(HoodieSchema::fromAvroSchema);
@@ -116,16 +109,16 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
   }
 
   @VisibleForTesting
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> 
instantTime) {
-    return 
getTableAvroSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(),
 instantTime);
+  Option<HoodieSchema> 
getTableSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
+    return 
getTableSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(),
 instantTime);
   }
 
   // [HUDI-9112] simplify the logic
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> 
reversedTimelineStream, Option<HoodieInstant> instantTime) {
+  Option<HoodieSchema> 
getTableSchemaFromTimelineWithCache(Stream<HoodieInstant> 
reversedTimelineStream, Option<HoodieInstant> instantTime) {
     // If instantTime is empty it means read the latest one. In that case, get 
the cached instant if there is one.
     boolean fetchFromLastValidCommit = instantTime.isEmpty();
     Option<HoodieInstant> targetInstant = 
instantTime.or(getCachedLatestCommitWithValidSchema());
-    Schema cachedTableSchema = null;
+    HoodieSchema cachedTableSchema = null;
 
     // Try cache first if there is a target instant to fetch for.
     if (!targetInstant.isEmpty()) {
@@ -134,7 +127,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
 
     // Cache miss on either latestCommitWithValidSchema or 
commitMetadataCache. Compute the result.
     if (cachedTableSchema == null) {
-      Option<Pair<HoodieInstant, Schema>> instantWithSchema = 
getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, 
targetInstant);
+      Option<Pair<HoodieInstant, HoodieSchema>> instantWithSchema = 
getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, 
targetInstant);
       if (instantWithSchema.isPresent()) {
         targetInstant = Option.of(instantWithSchema.get().getLeft());
         cachedTableSchema = instantWithSchema.get().getRight();
@@ -163,10 +156,10 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
   }
 
   @VisibleForTesting
-  Option<Pair<HoodieInstant, Schema>> 
getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> 
reversedTimelineStream, Option<HoodieInstant> instant) {
+  Option<Pair<HoodieInstant, HoodieSchema>> 
getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> 
reversedTimelineStream, Option<HoodieInstant> instant) {
     // To find the table schema given an instant time, need to walk backwards 
from the latest instant in
     // the timeline finding a completed instant containing a valid schema.
-    ConcurrentHashMap<HoodieInstant, Schema> tableSchemaAtInstant = new 
ConcurrentHashMap<>();
+    ConcurrentHashMap<HoodieInstant, HoodieSchema> tableSchemaAtInstant = new 
ConcurrentHashMap<>();
     Option<HoodieInstant> instantWithTableSchema = 
Option.fromJavaOptional(reversedTimelineStream
         // If a completion time is specified, find the first eligible instant 
in the schema evolution timeline.
         // Should switch to completion time based.
@@ -183,7 +176,7 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
             String schemaStr = 
metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
             boolean isValidSchemaStr = !StringUtils.isNullOrEmpty(schemaStr);
             if (isValidSchemaStr) {
-              tableSchemaAtInstant.putIfAbsent(s, new 
Schema.Parser().parse(schemaStr));
+              tableSchemaAtInstant.putIfAbsent(s, 
HoodieSchema.parse(schemaStr));
             }
             return isValidSchemaStr;
           } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
index 5f72ce11bdf2..0ae986cd0f78 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
@@ -165,8 +165,7 @@ public class SimpleSchemaConflictResolutionStrategy 
implements SchemaConflictRes
 
   private static Option<HoodieSchema> 
getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter 
schemaResolver, HoodieInstant instant) {
     try {
-      return schemaResolver.getTableAvroSchemaIfPresent(false, 
Option.of(instant))
-          .map(HoodieSchema::fromAvroSchema);
+      return schemaResolver.getTableSchemaIfPresent(false, Option.of(instant));
     } catch (Exception ex) {
       log.error("Cannot get table schema for instant {}", instant);
       throw new HoodieException("Unable to get table schema", ex);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 3ec13c4c00ee..46c04c1024a7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -63,6 +63,7 @@ import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataIndexException;
@@ -138,6 +139,9 @@ public class HoodieIndexUtils {
   static boolean validateDataTypeForSecondaryIndex(List<String> sourceFields, 
HoodieSchema tableSchema) {
     return sourceFields.stream().allMatch(fieldToIndex -> {
       Option<Pair<String, HoodieSchemaField>> schema = 
HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex);
+      if (schema.isEmpty()) {
+        throw new HoodieException("Failed to get schema. Not a valid field 
name: " + fieldToIndex);
+      }
       return isSecondaryIndexSupportedType(schema.get().getRight().schema());
     });
   }
@@ -152,6 +156,9 @@ public class HoodieIndexUtils {
   public static boolean 
validateDataTypeForSecondaryOrExpressionIndex(List<String> sourceFields, 
HoodieSchema tableSchema) {
     return sourceFields.stream().anyMatch(fieldToIndex -> {
       Option<Pair<String, HoodieSchemaField>> nestedFieldOpt = 
HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex);
+      if (nestedFieldOpt.isEmpty()) {
+        throw new HoodieException("Failed to get schema. Not a valid field 
name: " + fieldToIndex);
+      }
       HoodieSchema fieldSchema = nestedFieldOpt.get().getRight().schema();
       return fieldSchema.getType() != HoodieSchemaType.RECORD && 
fieldSchema.getType() != HoodieSchemaType.ARRAY && fieldSchema.getType() != 
HoodieSchemaType.MAP;
     });
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
index 7317cd13050a..0e1242a1531c 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -578,7 +578,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     HoodieInstant instant1 = 
metaClient.getCommitsTimeline().filterCompletedInstants().nthInstant(0).get();
 
     // Case 1: First call with empty instant - should fetch from timeline and 
cache
-    Option<HoodieSchema> schemaOption1 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema);
+    Option<HoodieSchema> schemaOption1 = 
resolver.getTableSchemaFromTimelineWithCache(Option.empty());
     assertTrue(schemaOption1.isPresent());
     assertEquals(schema2, schemaOption1.get());
 
@@ -586,7 +586,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 2: Second call with empty instant - should use cache
-    Option<HoodieSchema> schemaOption2 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()).map(HoodieSchema::fromAvroSchema);
+    Option<HoodieSchema> schemaOption2 = 
resolver.getTableSchemaFromTimelineWithCache(Option.empty());
     assertTrue(schemaOption2.isPresent());
     assertEquals(schema2, schemaOption2.get());
 
@@ -594,7 +594,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 3: Call with the latest valid instant - there should be a cache hit
-    Option<HoodieSchema> schemaOption3 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)).map(HoodieSchema::fromAvroSchema);
+    Option<HoodieSchema> schemaOption3 = 
resolver.getTableSchemaFromTimelineWithCache(Option.of(instant2));
     assertTrue(schemaOption3.isPresent());
     assertEquals(schema2, schemaOption3.get());
 
@@ -602,7 +602,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     verify(resolver, 
times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any());
 
     // Case 4: Second call with some other instant - should use cache
-    Option<HoodieSchema> schemaOption4 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)).map(HoodieSchema::fromAvroSchema);
+    Option<HoodieSchema> schemaOption4 = 
resolver.getTableSchemaFromTimelineWithCache(Option.of(instant1));
     assertTrue(schemaOption4.isPresent());
     assertEquals(schema1, schemaOption4.get());
 
@@ -613,7 +613,7 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter 
extends HoodieCommon
     String nonExistentTime = "9999";
     HoodieInstant nonExistentInstant = 
metaClient.getInstantGenerator().createNewInstant(
         HoodieInstant.State.COMPLETED, COMMIT_ACTION, nonExistentTime, 
nonExistentTime);
-    Option<HoodieSchema> schemaOption5 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)).map(HoodieSchema::fromAvroSchema);
+    Option<HoodieSchema> schemaOption5 = 
resolver.getTableSchemaFromTimelineWithCache(Option.of(nonExistentInstant));
     assertEquals(schema2, schemaOption5.get());
 
     // Verify one more call to timeline for non-existent instant
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index 556866444ee5..c84349c1aafe 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -85,7 +85,7 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
 
-    return 
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema,
 structName, recordNamespace));
+    return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(parquetSchema, 
structName, recordNamespace);
   }
 
   private static HoodieSchema getBootstrapSourceSchemaOrc(HoodieWriteConfig 
writeConfig, HoodieEngineContext context, Path filePath) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
index 947a292196ae..dc1e41cd589c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java
@@ -79,11 +79,12 @@ public class AvroRecordContext extends 
RecordContext<IndexedRecord> {
       if (fieldOpt.isEmpty()) {
         return null;
       }
-      Object value = currentRecord.get(fieldOpt.get().pos());
+      HoodieSchemaField field = fieldOpt.get();
+      Object value = currentRecord.get(field.pos());
       if (i == path.length - 1) {
         return value;
       }
-      currentSchema = fieldOpt.get().schema();
+      currentSchema = field.schema();
       currentRecord = (IndexedRecord) value;
     }
     return null;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
index 7c9780baf623..77445f111580 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java
@@ -813,8 +813,8 @@ public class AvroOrcUtils {
   }
 
   public static HoodieSchema createSchemaWithDefaultValue(TypeDescription 
orcSchema, String recordName, String namespace, boolean nullable) {
-    HoodieSchema hoodieSchema = 
createSchemaWithNamespace(orcSchema,recordName,namespace);
-    List<HoodieSchemaField> fields = new ArrayList<>();
+    HoodieSchema hoodieSchema = createSchemaWithNamespace(orcSchema, 
recordName, namespace);
+    List<HoodieSchemaField> fields = new 
ArrayList<>(hoodieSchema.getFields().size());
     for (HoodieSchemaField field : hoodieSchema.getFields()) {
       HoodieSchema fieldSchema = field.schema();
       HoodieSchema nullableSchema = HoodieSchema.createNullable(fieldSchema);

Reply via email to