hudi-agent commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3180187386


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java:
##########
@@ -675,15 +673,14 @@ private Pair<ClosableIterator<HoodieRecord>, 
HoodieSchema> getRecordsIterator(
    */
   private Option<Pair<Function<HoodieRecord, HoodieRecord>, HoodieSchema>> 
composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
-    if (internalSchema.isEmptySchema()) {
+    if (evolutionSchema.isEmptySchema()) {
       return Option.empty();
     }
 
     long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
-    InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
internalSchema,
+    HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);

Review Comment:
   πŸ€– Subtle behavior change: legacy used `readerSchema.getFullName()` as the 
merged HoodieSchema's record name, but the new 
`HoodieSchemaMerger.mergeSchema()` uses `querySchema.getFullName()` (i.e. 
`evolutionSchema.getFullName()`). For the `BaseHoodieLogRecordReader` path the 
two match, but for any direct `withEvolutionSchema(...)` callers that pass a 
schema with a different record name, the merged schema name β€” and the 
downstream `rewriteRecordWithNewSchema` output β€” will differ from before. Could 
you confirm all migration callers stamp `readerSchema.getFullName()` onto the 
evolution schema, or alternatively explicitly thread 
`readerSchema.getFullName()` through here?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##########
@@ -64,14 +62,9 @@ public HoodieAvroParquetReader(InputSplit inputSplit, 
Configuration conf, Option
       MessageType messageType = fileFooter.getFileMetaData().getSchema();
       baseSchema = getAvroSchemaConverter(conf).convert(messageType);
 
-      if (internalSchemaOption.isPresent()) {
+      if (evolutionSchemaOption.isPresent()) {
         // do schema reconciliation in case there exists read column which is 
not in the file schema.
-        InternalSchema mergedInternalSchema = new InternalSchemaMerger(
-            InternalSchemaConverter.convert(baseSchema),
-            internalSchemaOption.get(),
-            true,
-            true).mergeSchema();
-        baseSchema = InternalSchemaConverter.convert(mergedInternalSchema, 
baseSchema.getFullName());
+        baseSchema = new HoodieSchemaMerger(baseSchema, 
evolutionSchemaOption.get(), true, true).mergeSchema();

Review Comment:
   πŸ€– The merged `baseSchema` here will now carry the querySchema's full name β€” 
which in the `HoodieParquetInputFormat` callsite is the hardcoded `"schema"` 
from `getEvolutionSchemaOption("schema")`. The legacy code did 
`InternalSchemaConverter.convert(merged, baseSchema.getFullName())`, 
deliberately preserving the file's full name. Could you confirm parquet-avro's 
read path / `avroToArrayWritable` / any downstream consumer doesn't depend on 
the file schema's record name being preserved through the merge?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -198,15 +184,14 @@ object HoodieSchemaUtils {
   private def deduceWriterSchemaWithReconcile(sourceSchema: HoodieSchema,
                                               canonicalizedSourceSchema: 
HoodieSchema,
                                               latestTableSchema: HoodieSchema,
-                                              internalSchemaOpt: 
Option[InternalSchema],
+                                              tableEvolutionSchemaOpt: 
Option[HoodieSchema],
                                               opts: Map[String, String]): 
HoodieSchema = {
-    internalSchemaOpt match {
-      case Some(internalSchema) =>
+    tableEvolutionSchemaOpt match {
+      case Some(tableEvolutionSchema) =>
         // Apply schema evolution, by auto-merging write schema and read schema
         val setNullForMissingColumns = 
opts.getOrElse(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(),
           
HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue()).toBoolean
-        val mergedInternalSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(),
 internalSchema, setNullForMissingColumns)
-        val evolvedSchema = 
InternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)
+        val evolvedSchema = 
HoodieSchemaEvolutionUtils.reconcileSchemaStructural(canonicalizedSourceSchema, 
tableEvolutionSchema, setNullForMissingColumns)

Review Comment:
   πŸ€– The result schema's record full name source has shifted: the old code 
wrote `InternalSchemaConverter.convert(mergedInternalSchema, 
latestTableSchema.getFullName)`, while `reconcileSchemaStructural` returns 
`HoodieSchema.fromAvroSchema(reconciled)` whose name comes from 
`AvroSchemaEvolutionUtils.reconcileSchema` β†’ `oldTableSchema.getFullName()` 
(i.e. `tableEvolutionSchema`'s name, not `latestTableSchema`'s). Is it 
intentional that the writer schema's record name now follows the evolution 
schema rather than being forced to the latest table schema? In normal Hudi 
writes both should equal `getRecordQualifiedName(table)`, but if an evolution 
schema was ever persisted with a divergent name it would now flow into 
SCHEMA_KEY (since `BaseHoodieWriteClient.saveInternalSchema` re-stamps 
SCHEMA_KEY with the writer's getFullName).
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped faΓ§ade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit 
metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type}, 
{@code fields}, etc.).
+ * Old tables must remain readable, so this faΓ§ade delegates to {@link 
SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way 
out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface β€” but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+  /**
+   * Commit-metadata key under which the latest schema's JSON is stored. 
Carried
+   * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading 
old
+   * commit metadata pick up the same blob.
+   */
+  public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+  /**
+   * JSON object key that wraps the array of historical schemas. Same as
+   * {@link SerDeHelper#SCHEMAS}.
+   */
+  public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+  private HoodieSchemaSerDe() {
+  }
+
+  /**
+   * Serializes a single schema to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+   */
+  public static String toJson(HoodieSchema schema) {
+    return 
SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+  }
+
+  /**
+   * Serializes a history of schemas to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(List)} byte for byte.
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    List<InternalSchema> converted = new ArrayList<>(schemas.size());
+    for (HoodieSchema s : schemas) {
+      converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+    }
+    return SerDeHelper.toJson(converted);
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema} 
value
+   * from commit metadata). Returns empty if the input is null/empty so callers
+   * can pass through optional commit metadata fields.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
defaultRecordName(internal.get())));
+  }
+
+  /**
+   * Variant of {@link #fromJson(String)} that lets the caller fix the record 
name
+   * on the resulting HoodieSchema. Equivalent to the legacy
+   * {@code SerDeHelper.fromJson(...).map(is -> 
InternalSchemaConverter.convert(is, recordName))}
+   * pattern, collapsed into a single call.
+   */
+  public static Option<HoodieSchema> fromJson(String json, String recordName) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
recordName));
+  }
+
+  /**
+   * Parses the history-schemas JSON layout (array of versioned schemas) and
+   * returns them keyed by {@code version_id}. Field ids are preserved on each
+   * returned HoodieSchema so the resulting map is directly usable by the
+   * read/merge path.
+   */
+  public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {
+    TreeMap<Long, InternalSchema> internals = SerDeHelper.parseSchemas(json);
+    TreeMap<Long, HoodieSchema> out = new TreeMap<>();
+    for (Long versionId : internals.keySet()) {
+      InternalSchema is = internals.get(versionId);
+      HoodieSchema hs = HoodieSchemaInternalSchemaBridge.toHoodieSchema(is, 
defaultRecordName(is));
+      out.put(versionId, hs);
+    }
+    return out;
+  }
+
+  /**
+   * Appends a freshly-evolved schema to an existing serialized history blob 
and
+   * returns the new blob. Mirrors {@link 
SerDeHelper#inheritSchemas(InternalSchema, String)}
+   * β€” the {@code oldHistoryJson} is the prior {@code .hoodie/.schema/} 
contents
+   * (or empty for the first commit).
+   */
+  public static String inheritHistory(HoodieSchema newSchema, String 
oldHistoryJson) {
+    return SerDeHelper.inheritSchemas(
+        HoodieSchemaInternalSchemaBridge.toInternalSchema(newSchema), 
oldHistoryJson);
+  }
+
+  /**
+   * Resolves the schema-history entry that applies to a given version id β€” 
exact
+   * match if present, else the largest entry strictly less than {@code 
versionId},
+   * else {@code null}. HoodieSchema-shaped replacement for
+   * {@link 
org.apache.hudi.internal.schema.utils.InternalSchemaUtils#searchSchema}.
+   *
+   * <p>Note: legacy returned {@code InternalSchema.getEmptyInternalSchema()} 
on
+   * miss; this returns {@code null} so callers can choose their own empty
+   * sentinel via {@link HoodieSchema#empty()}. Most callers null-check + fall
+   * back, so the change is benign.</p>
+   */
+  public static HoodieSchema searchSchema(long versionId, 
java.util.TreeMap<Long, HoodieSchema> history) {
+    if (history.containsKey(versionId)) {
+      return history.get(versionId);
+    }
+    java.util.SortedMap<Long, HoodieSchema> headMap = 
history.headMap(versionId);
+    return headMap.isEmpty() ? null : headMap.get(headMap.lastKey());

Review Comment:
   πŸ€– This is a contract change worth verifying carefully: legacy 
`InternalSchemaUtils.searchSchema` returned 
`InternalSchema.getEmptyInternalSchema()` on miss; the new method returns 
`null`. The note says "most callers null-check + fall back", but if any 
migrated caller does the legacy idiom `result.isEmptySchema()`, that becomes an 
NPE. Could you audit existing call sites of the equivalent 
`InternalSchemaUtils.searchSchema` to confirm none rely on the empty-sentinel 
behavior, or alternatively return `HoodieSchema.empty()` here for parity?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java:
##########
@@ -173,7 +173,9 @@ protected BaseHoodieLogRecordReader(HoodieReaderContext<T> 
readerContext, Hoodie
     this.instantRange = instantRange;
     this.withOperationField = withOperationField;
     this.forceFullScan = forceFullScan;
-    this.internalSchema = readerContext.getSchemaHandler() != null ? 
readerContext.getSchemaHandler().getInternalSchema() : null;
+    this.evolutionSchema = readerContext.getSchemaHandler() != null && 
readerContext.getSchemaHandler().getInternalSchema() != null
+        ? 
HoodieSchemaInternalSchemaBridge.toHoodieSchema(readerContext.getSchemaHandler().getInternalSchema(),
 readerSchema != null ? readerSchema.getFullName() : null)

Review Comment:
   πŸ€– The defensive `readerSchema != null ? readerSchema.getFullName() : null` 
could pass a null recordName through to `InternalSchemaConverter.convert(..., 
null)`. In `visitInternalSchemaToBuildHoodieSchema` line 447, that becomes 
`nestedRecordName = "null" + "." + f.name()` for nested records β€” a corrupted 
name string rather than a clear failure. If readerSchema is guaranteed non-null 
at this point, the conditional adds noise; if it can be null, this should fail 
loudly or use a sensible default.
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.common.schema.types.Type;
+import org.apache.hudi.common.schema.types.Types;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that 
preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema β†’ HoodieSchema migration. The 
existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code 
InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + 
value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only β€” Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+  private HoodieSchemaInternalSchemaBridge() {
+  }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving 
column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the faΓ§ade 
can
+   * round-trip a HoodieSchema through the legacy applier without renumbering 
ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly 
parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh 
ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    // Preserve the empty-schema marker end-to-end: callers (e.g. 
FileGroupReader's
+    // schema handler) short-circuit on isEmptySchema() so the round-trip must 
not
+    // resurrect an "empty" HoodieSchema as a non-empty InternalSchema with the
+    // default versionId of 0.
+    if (hoodieSchema == null || hoodieSchema.isEmptySchema()) {
+      return InternalSchema.getEmptyInternalSchema();
+    }
+    // Take the structurally-correct InternalSchema produced by the existing 
converter,
+    // then walk both schemas in parallel and overwrite the InternalSchema's 
freshly-minted
+    // ids with the ids carried as Avro properties on the HoodieSchema (where 
present).
+    InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, 
hoodieSchema.getNameToPosition());
+    Types.RecordType originalRecord = fresh.getRecord();
+    Types.RecordType reidentified = (Types.RecordType) 
reidentify(hoodieSchema, originalRecord);
+    InternalSchema result = (originalRecord == reidentified)
+        ? fresh
+        : new InternalSchema(reidentified);
+    long schemaId = hoodieSchema.schemaId();
+    if (schemaId >= 0) {
+      result.setSchemaId(schemaId);
+    }
+    int maxColumnId = hoodieSchema.maxColumnId();
+    if (maxColumnId >= 0) {
+      result.setMaxColumnId(maxColumnId);
+    }
+    return result;
+  }
+
+  /**
+   * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in 
parallel
+   * and produces a {@link Type} where each addressable id matches the 
HoodieSchema's
+   * Avro custom property (when present). Returns the original {@code 
internalType}
+   * unchanged when no overrides apply, so callers can short-circuit.
+   */
+  private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) 
{
+    HoodieSchema effective = hoodieSchema.isNullable() ? 
hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (internalType.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) internalType;
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return internalType;
+        }
+        List<Types.Field> originalFields = record.fields();
+        List<Types.Field> rebuilt = new ArrayList<>(originalFields.size());
+        boolean anyChange = false;
+        for (int i = 0; i < originalFields.size(); i++) {
+          Types.Field original = originalFields.get(i);
+          HoodieSchemaField hf = effective.getFields().get(i);
+          int overrideId = hf.fieldId();
+          Type childType = reidentify(hf.schema(), original.type());
+          int finalId = overrideId >= 0 ? overrideId : original.fieldId();
+          if (finalId == original.fieldId() && childType == original.type()) {
+            rebuilt.add(original);
+          } else {
+            rebuilt.add(Types.Field.get(finalId, original.isOptional(), 
original.name(), childType, original.doc()));
+            anyChange = true;
+          }
+        }
+        return anyChange ? Types.RecordType.get(rebuilt, record.name()) : 
record;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) internalType;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return internalType;
+        }
+        int overrideElementId = 
readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP),
 -1);
+        Type newElement = reidentify(effective.getElementType(), 
array.elementType());
+        int finalElementId = overrideElementId >= 0 ? overrideElementId : 
array.elementId();
+        if (finalElementId == array.elementId() && newElement == 
array.elementType()) {
+          return array;
+        }
+        return Types.ArrayType.get(finalElementId, array.isElementOptional(), 
newElement);
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) internalType;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return internalType;
+        }
+        int overrideKeyId = 
readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), 
-1);
+        int overrideValueId = 
readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP),
 -1);
+        Type newValue = reidentify(effective.getValueType(), map.valueType());
+        int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId();
+        int finalValueId = overrideValueId >= 0 ? overrideValueId : 
map.valueId();
+        if (finalKeyId == map.keyId() && finalValueId == map.valueId() && 
newValue == map.valueType()) {
+          return map;
+        }
+        return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), 
newValue, map.isValueOptional());
+      }
+      default:
+        return internalType;
+    }
+  }
+
+  private static int readIntProp(Object raw, int fallback) {
+    return raw instanceof Number ? ((Number) raw).intValue() : fallback;
+  }
+
+  /**
+   * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps 
every
+   * sub-schema with the corresponding field id from the source. The 
schema-level
+   * version id and max column id are also propagated.
+   */
+  public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, 
String recordName) {
+    HoodieSchema hoodieSchema = 
InternalSchemaConverter.convert(internalSchema, recordName);

Review Comment:
   πŸ€– When `internalSchema` is empty (`getEmptyInternalSchema()` has 
`schemaId=-1`, `getMaxColumnId()=-1`), this method still calls through 
`InternalSchemaConverter.convert` and `stampIds` rather than short-circuiting. 
The result is a HoodieSchema with empty fields, schemaId=-1 (so 
`isEmptySchema()` returns true) β€” which is technically correct, but it's 
wasteful and asymmetric with `toInternalSchema` which does early-return for 
empty. Have you considered an early-return `return HoodieSchema.empty()` for 
parity? Also worth confirming: when `internalSchema.getRecord().name()` is null 
(which I believe is the case for the empty sentinel), what does the 
`recordName` parameter become β€” and does that propagate correctly through 
`InternalSchemaConverter.convert`?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -139,7 +139,9 @@ public static HoodieFileGroupReader<RowData> 
createFileGroupReader(
         .withFileSlice(fileSlice)
         .withDataSchema(tableSchema)
         .withRequestedSchema(requiredSchema)
-        
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+        .withEvolutionSchema(internalSchemaManager.getQuerySchema() == null

Review Comment:
   πŸ€– nit: `getQuerySchema()` is called twice here β€” could you use a local 
variable or restore the 
`Option.ofNullable(internalSchemaManager.getQuerySchema())` form the original 
code used? The ternary reads fine, but invoking the getter twice is a small 
step backwards from what was there before.
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,113 +65,109 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = 
SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), 
true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, 
addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(),
 names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() 
else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, 
colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur
   }
 
-  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
-    val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
-    deleteChanges.foreach { c =>
-      val originalColName = c.fieldNames().mkString(".")
-      checkSchemaChange(Seq(originalColName), table)
-      deleteChange.deleteColumn(originalColName)
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = {
+    val colNames = deleteChanges.map { c =>
+      val name = c.fieldNames().mkString(".")
+      checkSchemaChange(Seq(name), table)
+      name
+    }.toArray
+    if (colNames.isEmpty) {
+      oldSchema
+    } else {
+      val newSchema = new 
HoodieSchemaChangeApplier(oldSchema).applyDeleteChange(colNames: _*)
+      // delete action should not change the getMaxColumnId field
+      newSchema.setMaxColumnId(oldSchema.maxColumnId())
+      newSchema
     }
-    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
deleteChange)
-    // delete action should not change the getMaxColumnId field
-    newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
-    newSchema
   }
 
 
   def applyAddAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, oldSchema, 
changes.map(_.asInstanceOf[AddColumn]))
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column add finished")
   }
 
   def applyDeleteAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema, 
changes.map(_.asInstanceOf[DeleteColumn]))
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column delete finished")
   }
 
   def applyUpdateAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
-    val updateChange = TableChanges.ColumnUpdateChange.get(oldSchema)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
+    var cur = oldSchema
     changes.foreach { change =>
-      change match {
+      val applier = new HoodieSchemaChangeApplier(cur)

Review Comment:
   πŸ€– I think this changes ALTER semantics from batch to sequential. The old 
code accumulated every change into a single `ColumnUpdateChange` and resolved 
each against the *original* schema (so `RENAME a→b` + `UPDATE a TYPE int` in 
the same alter merged on `a`'s fieldId and both succeeded). With a fresh 
`HoodieSchemaChangeApplier(cur)` per iteration, the second change resolves 
against the post-rename schema and `findField("a")` would return null. Could 
you confirm whether multi-change alters that target the same column (Spark V2 
catalog `alterTable(... TableChange...)`) are still supported, or add a test 
that exercises rename-then-update in one call? @yihua
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaEvolutionUtils.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.types.Type;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * HoodieSchema-shaped faΓ§ade for write-path schema evolution: reconciling an
+ * incoming write schema against the table's current schema, including missing
+ * columns, added columns, type promotions, and nullability adjustments.
+ *
+ * <p>Mirrors the two entry points of {@link AvroSchemaEvolutionUtils} but 
consumes
+ * and produces {@link HoodieSchema} so callers can stay off direct
+ * {@code InternalSchema} usage. During the InternalSchema β†’ HoodieSchema 
migration
+ * this delegates to the legacy implementation via
+ * {@link HoodieSchemaInternalSchemaBridge}, which preserves field ids 
end-to-end.
+ * Phase 5 rewrites the implementation in pure HoodieSchema terms behind this 
stable
+ * interface.</p>
+ */
+public final class HoodieSchemaEvolutionUtils {
+
+  private HoodieSchemaEvolutionUtils() {
+  }
+
+  /**
+   * Reconciles an incoming write schema against the existing table schema, 
adding
+   * any new columns, promoting types where allowed, and (optionally) marking
+   * missing columns as nullable.
+   *
+   * <p>Semantics match {@link 
AvroSchemaEvolutionUtils#reconcileSchema(org.apache.avro.Schema, 
InternalSchema, boolean)}:
+   * the incoming schema is assumed to have <i>missing</i> columns rather than
+   * <i>deleted</i> columns. Renames and explicit deletes are not inferred 
here β€”
+   * those are handled by the explicit DDL path through
+   * {@link HoodieSchemaChangeApplier}.</p>
+   *
+   * @param incomingSchema             incoming write schema
+   * @param oldTableSchema             current table schema (with field ids)
+   * @param makeMissingFieldsNullable  when true, table fields absent from the
+   *                                   incoming schema are marked nullable in 
the
+   *                                   reconciled result
+   * @return reconciled HoodieSchema with field ids preserved on unchanged 
columns
+   */
+  public static HoodieSchema reconcileSchema(HoodieSchema incomingSchema,
+                                             HoodieSchema oldTableSchema,
+                                             boolean 
makeMissingFieldsNullable) {
+    InternalSchema oldInternal = 
HoodieSchemaInternalSchemaBridge.toInternalSchema(oldTableSchema);
+    InternalSchema reconciled = AvroSchemaEvolutionUtils.reconcileSchema(
+        incomingSchema.getAvroSchema(), oldInternal, 
makeMissingFieldsNullable);
+    return HoodieSchemaInternalSchemaBridge.toHoodieSchema(reconciled, 
oldTableSchema.getFullName());
+  }
+
+  /**
+   * Avro-only sibling of {@link #reconcileSchema(HoodieSchema, HoodieSchema, 
boolean)}
+   * that does <em>not</em> route through the InternalSchema bridge β€” field 
ids are
+   * neither read from the inputs nor stamped on the output. Use this from the
+   * write-path's structural reconciliation (e.g. {@code deduceWriterSchema}) 
where
+   * carrying ids over from the table's evolution-schema would leak them into
+   * commit metadata and Parquet writes that historically didn't include them.
+   */
+  public static HoodieSchema reconcileSchemaStructural(HoodieSchema 
incomingSchema,
+                                                       HoodieSchema 
oldTableSchema,
+                                                       boolean 
makeMissingFieldsNullable) {
+    org.apache.avro.Schema reconciled = 
AvroSchemaEvolutionUtils.reconcileSchema(
+        incomingSchema.getAvroSchema(), oldTableSchema.getAvroSchema(), 
makeMissingFieldsNullable);
+    return HoodieSchema.fromAvroSchema(reconciled);
+  }
+
+  /**
+   * Reconciles nullability and type-promotion requirements between a source
+   * (incoming) schema and a target (existing) schema, adjusting the source to 
be
+   * in line with the target's nullability and promotable types.
+   *
+   * <p>Semantics match
+   * {@link 
AvroSchemaEvolutionUtils#reconcileSchemaRequirements(org.apache.avro.Schema, 
org.apache.avro.Schema, boolean)}.
+   * If {@code shouldReorderColumns} is true, the source's fields are ordered 
to match
+   * the target's positional layout, preserving inter-commit field 
ordering.</p>
+   *
+   * @param sourceSchema         incoming source schema to be reconciled
+   * @param targetSchema         target schema to reconcile against
+   * @param shouldReorderColumns if true, fields in the result follow the 
target's order
+   * @return source-shaped HoodieSchema with nullability and types reconciled
+   */
+  public static HoodieSchema reconcileSchemaRequirements(HoodieSchema 
sourceSchema,
+                                                         HoodieSchema 
targetSchema,
+                                                         boolean 
shouldReorderColumns) {
+    org.apache.avro.Schema reconciled = 
AvroSchemaEvolutionUtils.reconcileSchemaRequirements(
+        sourceSchema == null ? null : sourceSchema.getAvroSchema(),
+        targetSchema == null ? null : targetSchema.getAvroSchema(),
+        shouldReorderColumns);
+    return HoodieSchema.fromAvroSchema(reconciled);
+  }
+
+  /**

Review Comment:
   πŸ€– nit: this Javadoc block (Collects renamed columns) is orphaned β€” it 
immediately precedes fixNullOrdering's own Javadoc, so Javadoc tooling 
associates the collect-renamed-columns description with fixNullOrdering 
instead. Could you move it down to sit directly above the collectRenameCols 
method (which currently has no doc attached)?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -801,16 +800,16 @@ Pair<InputBatch, Boolean> 
fetchNextBatchFromSource(Option<Checkpoint> resumeChec
   @VisibleForTesting
   SchemaProvider getDeducedSchemaProvider(HoodieSchema incomingSchema, 
SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
     Option<HoodieSchema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage, 
cfg.targetBasePath, metaClient);
-    Option<InternalSchema> internalSchemaOpt = 
HoodieConversionUtils.toJavaOption(
-        HoodieSchemaUtils.getLatestTableInternalSchema(
+    Option<HoodieSchema> evolutionSchemaOpt = 
HoodieConversionUtils.toJavaOption(
+        HoodieSchemaUtils.getLatestTableEvolutionSchema(
             HoodieStreamer.Config.getProps(conf, cfg), metaClient));
     // Deduce proper target (writer's) schema for the input dataset, 
reconciling its
     // schema w/ the table's one
 
-    HoodieSchema targetSchema = HoodieSchemaUtils.deduceWriterSchema(
+    HoodieSchema targetSchema = 
HoodieSchemaUtils.deduceWriterSchemaWithEvolution(

Review Comment:
   πŸ€– nit: the `WithEvolution` suffix implies there's a sibling 
`deduceWriterSchema` without evolution support, but the test below calls this 
same method with all-empty evolution options β€” suggesting it handles both 
cases. Could the method just be `deduceWriterSchema` to avoid that implication?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                                                                                
          HoodieBaseFile baseFile,
                                                                                
          HoodieWriteConfig writeConfig,
                                                                                
          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
         try {
-          fileSchema = 
InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);
         } catch (Exception e) {
           throw new HoodieException(String.format("Failed to get 
InternalSchema for given versionId: %s", commitInstantTime), e);

Review Comment:
   πŸ€– nit: writeInternalSchema still carries the legacy Internal label even 
though it is now typed as HoodieSchema. Could you rename it to something like 
fileWriteSchema to match the naming direction of this PR?
   
   <sub><i>- AI-generated; verify before applying. React πŸ‘/πŸ‘Ž to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to