hudi-agent commented on code in PR #18680: URL: https://github.com/apache/hudi/pull/18680#discussion_r3191650510
########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaEvolutionUtils.java: ########## @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.types.Type; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.InternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * HoodieSchema-shaped faΓ§ade for write-path schema evolution: reconciling an + * incoming write schema against the table's current schema, including missing + * columns, added columns, type promotions, and nullability adjustments. + * + * <p>Mirrors the two entry points of {@link AvroSchemaEvolutionUtils} but consumes + * and produces {@link HoodieSchema} so callers can stay off direct + * {@code InternalSchema} usage. 
During the InternalSchema β HoodieSchema migration + * this delegates to the legacy implementation via + * {@link HoodieSchemaInternalSchemaBridge}, which preserves field ids end-to-end. + * Phase 5 rewrites the implementation in pure HoodieSchema terms behind this stable + * interface.</p> + */ +public final class HoodieSchemaEvolutionUtils { + + private HoodieSchemaEvolutionUtils() { + } + + /** + * Reconciles an incoming write schema against the existing table schema, adding + * any new columns, promoting types where allowed, and (optionally) marking + * missing columns as nullable. + * + * <p>Semantics match {@link AvroSchemaEvolutionUtils#reconcileSchema(org.apache.avro.Schema, InternalSchema, boolean)}: + * the incoming schema is assumed to have <i>missing</i> columns rather than + * <i>deleted</i> columns. Renames and explicit deletes are not inferred here β + * those are handled by the explicit DDL path through + * {@link HoodieSchemaChangeApplier}.</p> + * + * @param incomingSchema incoming write schema + * @param oldTableSchema current table schema (with field ids) + * @param makeMissingFieldsNullable when true, table fields absent from the + * incoming schema are marked nullable in the + * reconciled result + * @return reconciled HoodieSchema with field ids preserved on unchanged columns + */ + public static HoodieSchema reconcileSchema(HoodieSchema incomingSchema, + HoodieSchema oldTableSchema, + boolean makeMissingFieldsNullable) { + InternalSchema oldInternal = HoodieSchemaInternalSchemaBridge.toInternalSchema(oldTableSchema); + InternalSchema reconciled = AvroSchemaEvolutionUtils.reconcileSchema( + incomingSchema.getAvroSchema(), oldInternal, makeMissingFieldsNullable); + return HoodieSchemaInternalSchemaBridge.toHoodieSchema(reconciled, oldTableSchema.getFullName()); + } + + /** + * Avro-only sibling of {@link #reconcileSchema(HoodieSchema, HoodieSchema, boolean)} + * that does <em>not</em> route through the InternalSchema bridge β field ids are + * 
neither read from the inputs nor stamped on the output. Use this from the + * write-path's structural reconciliation (e.g. {@code deduceWriterSchema}) where + * carrying ids over from the table's evolution-schema would leak them into + * commit metadata and Parquet writes that historically didn't include them. + */ + public static HoodieSchema reconcileSchemaStructural(HoodieSchema incomingSchema, + HoodieSchema oldTableSchema, + boolean makeMissingFieldsNullable) { + org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchema( + incomingSchema.getAvroSchema(), oldTableSchema.getAvroSchema(), makeMissingFieldsNullable); + return HoodieSchema.fromAvroSchema(reconciled); + } + + /** + * Reconciles nullability and type-promotion requirements between a source + * (incoming) schema and a target (existing) schema, adjusting the source to be + * in line with the target's nullability and promotable types. + * + * <p>Semantics match + * {@link AvroSchemaEvolutionUtils#reconcileSchemaRequirements(org.apache.avro.Schema, org.apache.avro.Schema, boolean)}. + * If {@code shouldReorderColumns} is true, the source's fields are ordered to match + * the target's positional layout, preserving inter-commit field ordering.</p> + * + * @param sourceSchema incoming source schema to be reconciled + * @param targetSchema target schema to reconcile against + * @param shouldReorderColumns if true, fields in the result follow the target's order + * @return source-shaped HoodieSchema with nullability and types reconciled + */ + public static HoodieSchema reconcileSchemaRequirements(HoodieSchema sourceSchema, + HoodieSchema targetSchema, + boolean shouldReorderColumns) { + org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchemaRequirements( + sourceSchema == null ? null : sourceSchema.getAvroSchema(), + targetSchema == null ? 
null : targetSchema.getAvroSchema(), + shouldReorderColumns); + return HoodieSchema.fromAvroSchema(reconciled); + } + + /** + * Normalizes union ordering so {@code null} sits first within nullable union + * branches, matching the ordering Hudi has historically written to disk. Returns + * {@code HoodieSchema.NULL_SCHEMA} for a {@code null} input and the schema + * unchanged for non-record types. Wraps the legacy + * {@code InternalSchemaConverter.fixNullOrdering} during the migration. + */ + public static HoodieSchema fixNullOrdering(HoodieSchema schema) { + return InternalSchemaConverter.fixNullOrdering(schema); + } + + /** + * Collects top-level columns whose primitive type differs between two schemas, + * keyed by the column's index in {@code schema}. The pair holds (newType, + * oldType) so callers can build a cast plan from {@code oldType} to + * {@code newType}. HoodieSchema-direct replacement for + * {@link InternalSchemaUtils#collectTypeChangedCols(InternalSchema, InternalSchema)}. + * + * <p>Walks ids on the HoodieSchema accessors directly; only converts to + * {@link Type} at the result construction (callers expect Type pairs for the + * cast-plan downstream).</p> + */ + public static Map<Integer, Pair<Type, Type>> collectTypeChangedCols(HoodieSchema schema, HoodieSchema oldSchema) { + Set<Integer> ids = schema.getAllIds(); + Set<Integer> otherIds = oldSchema.getAllIds(); + Map<Integer, Pair<Type, Type>> result = new HashMap<>(); + List<Integer> topLevelIds = schema.getFields().stream() + .map(HoodieSchemaField::fieldId).collect(Collectors.toList()); + for (Integer id : ids) { + if (!otherIds.contains(id)) { + continue; + } + HoodieSchema thisType = schema.findType(id); Review Comment: π€ I think this comparison is now stricter than the legacy `InternalSchemaUtils.collectTypeChangedCols`. Legacy `InternalSchema.findType(id)` returned the unwrapped `Type` (no nullability info), so a field that only flipped nullability (e.g. 
`int?` → `int`) compared equal and was skipped. New `HoodieSchema.findType(id)` returns `field.schema()` per `HoodieSchemaIndex.walk`, which is the `[null, T]` union for nullable fields — so `thisType.equals(otherType)` is now false for a nullability-only change and the column is reported as type-changed. Was that intentional, or should the comparison apply `getNonNullType()` to both sides first to match legacy behavior? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java: ########## @@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataB /** * Get final Read Schema for support evolution. Review Comment: 🤖 Same record-name drift you fixed in `AbstractHoodieLogRecordScanner` / `HoodieMergeHelper` / `HoodieAvroParquetReader` — legacy passed `readerSchema.getFullName()` to `InternalSchemaConverter.convert(...)` here, but `mergeSchemaGetRenamed()` (no-arg) defaults to `querySchema.getFullName()` (i.e. `evolutionSchemaOpt.get().getFullName()`). Could you switch to `mergeSchemaGetRenamed(readerSchema.getFullName())` to preserve the legacy name on the projection target? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java: ########## @@ -132,35 +134,39 @@ public Option<UnaryOperator<T>> getOutputConverter() { } public Pair<HoodieSchema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(StoragePath path) { - if (internalSchema.isEmptySchema()) { + if (!evolutionSchemaOpt.isPresent()) { Review Comment: 🤖 Same record-name drift here — legacy passed `requiredSchema.getFullName()` to `InternalSchemaConverter.convert(...)`, but `mergeSchemaGetRenamed()` (no-arg) defaults to `evolutionSchemaOpt.get().getFullName()`. 
Could you switch to `mergeSchemaGetRenamed(requiredSchema.getFullName())` so the interned merged schema keeps the required-schema's full name? <sub><i>- AI-generated; verify before applying. React π/π to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java: ########## @@ -1687,43 +1694,48 @@ public void updateColumnType(String colName, Type newType) { * @param doc . */ public void updateColumnComment(String colName, String doc) { - Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient(); - InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyColumnCommentChange(colName, doc); - commitTableChange(newSchema, pair.getRight()); + Pair<HoodieSchema, HoodieTableMetaClient> pair = getEvolutionSchemaAndMetaClient(); + HoodieSchema evolved = new HoodieSchemaChangeApplier(pair.getLeft()).applyColumnCommentChange(colName, doc); + commitTableChange(evolved, pair.getRight()); } /** - * reorder the position of col. + * Reorder the position of col. * * @param colName column which need to be reordered. if we want to change col from a nested filed, the fullName should be specified. * @param referColName reference position. * @param orderType col position change type. 
now support three change types: first/after/before */ - public void reOrderColPosition(String colName, String referColName, TableChange.ColumnPositionChange.ColumnPositionType orderType) { + public void reOrderColPosition(String colName, String referColName, ColumnPositionType orderType) { if (colName == null || orderType == null || referColName == null) { return; } - //get internalSchema - Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient(); - InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()) + Pair<HoodieSchema, HoodieTableMetaClient> pair = getEvolutionSchemaAndMetaClient(); + HoodieSchema evolved = new HoodieSchemaChangeApplier(pair.getLeft()) .applyReOrderColPositionChange(colName, referColName, orderType); - commitTableChange(newSchema, pair.getRight()); + commitTableChange(evolved, pair.getRight()); } - public Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() { + /** + * Resolves the table's evolution schema (or assigns fresh ids to the data schema + * as a fallback) and pairs it with a fresh metaClient. + */ + public Pair<HoodieSchema, HoodieTableMetaClient> getEvolutionSchemaAndMetaClient() { HoodieTableMetaClient metaClient = createMetaClient(true); TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - return Pair.of(getInternalSchema(schemaUtil), metaClient); + return Pair.of(getEvolutionSchema(schemaUtil), metaClient); } - public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) { + /** + * Persists the post-DDL evolution schema to commit metadata and the history file. 
+ */ + public void commitTableChange(HoodieSchema newEvolutionSchema, HoodieTableMetaClient metaClient) { TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet( - () -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), "")); - HoodieSchema schema = InternalSchemaConverter.convert(newSchema, HoodieSchemaUtils.getRecordQualifiedName(config.getTableName())); + () -> HoodieSchemaSerDe.inheritHistory(getEvolutionSchema(schemaUtil), "")); String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType()); String instantTime = startCommit(commitActionType, metaClient); - config.setSchema(schema.toString()); + config.setSchema(newEvolutionSchema.toString()); Review Comment: π€ The legacy `commitTableChange` forcibly renamed the schema to `HoodieSchemaUtils.getRecordQualifiedName(config.getTableName())` before `config.setSchema(...)`. The new code stores `newEvolutionSchema.toString()` directly, so the record name now drifts to whatever the input HoodieSchema carried. You already applied the canonical-name fix in `AlterTableCommand.commitWithSchema` (lines 256-264) β should the same normalization be done here? The Flink `HoodieCatalogUtil.alterTable` path goes through this method and would otherwise diverge from the Spark path. <sub><i>- AI-generated; verify before applying. React π/π to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala: ########## @@ -244,13 +247,22 @@ object AlterTableCommand extends Logging { /** * Generate an commit with new schema to change the table's schema. * - * @param internalSchema new schema after change + * @param evolutionSchema new schema after change * @param historySchemaStr history schemas * @param table The hoodie table. * @param sparkSession The spark session. 
*/ - def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = { - val schema = InternalSchemaConverter.convert(internalSchema, HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table)) + def commitWithSchema(rawEvolutionSchema: HoodieSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = { + // Force the persisted schema's record name to the canonical + // <namespace>.<tableName> form, matching the legacy + // InternalSchemaConverter.convert(merged, getRecordQualifiedName(table.identifier.table)) + // behavior. Without this, an evolution schema parsed from prior commit metadata would + // round-trip its existing name, which can drift from the table-identifier-based name + // and end up persisted in LATEST_SCHEMA / consumed by the write client. + val canonicalName = HoodieCommonSchemaUtils.getRecordQualifiedName(table.identifier.table) + val evolutionSchema = if (rawEvolutionSchema.getFullName == canonicalName) rawEvolutionSchema + else HoodieSchemaInternalSchemaBridge.withRecordName(rawEvolutionSchema, canonicalName) + val schema = evolutionSchema val path = getTableLocation(table, sparkSession) Review Comment: π€ nit: `val schema = evolutionSchema` is a redundant alias β `schema` isn't used anywhere that `evolutionSchema` couldn't be used directly. Could you drop this line and use `evolutionSchema` consistently throughout the method? <sub><i>- AI-generated; verify before applying. React π/π to flag quality.</i></sub> ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java: ########## @@ -200,26 +207,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt LOG.warn("Expected realtime split for mor table. 
Found split: {}", split); return; } - if (internalSchemaOption.isPresent()) { + if (evolutionSchemaOption.isPresent()) { HoodieSchema tableSchema = getSchemaFromCache(); List<String> requiredColumns = getRequireColumn(job); - InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), - requiredColumns); + HoodieSchema writerSchema = HoodieSchemaInternalSchemaBridge.withRecordName( + evolutionSchemaOption.get(), tableSchema.getName()); + HoodieSchema prunedEvolutionSchema = HoodieSchemaInternalSchemaBridge.pruneByLeafNames( Review Comment: π€ I think the reader schema's record name drifts from the writer schema's here. Legacy did `InternalSchemaConverter.convert(prunedInternalSchema, tableSchema.getName())` for both, but `pruneByLeafNames` preserves the source's full name β and `HoodieSchemaSerDe.fromJson` defaults that to `"hoodieSchema"` (see `defaultRecordName`), so `prunedEvolutionSchema.getFullName()` will typically be `"hoodieSchema"` while the writer is `tableSchema.getName()`. Could you wrap the pruned result in `HoodieSchemaInternalSchemaBridge.withRecordName(..., tableSchema.getName())` to match the writer (same fix you applied in `HoodieAvroParquetReader` via `mergeSchema(baseSchema.getFullName())`)? <sub><i>- AI-generated; verify before applying. React π/π to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaHistoryCache.java: ########## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.TimelineLayout; +import org.apache.hudi.common.util.InternalSchemaCache; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.storage.HoodieStorage; + +/** + * HoodieSchema-shaped faΓ§ade over the global schema-history cache. Delegates to + * {@link InternalSchemaCache} so the underlying segment-locked Caffeine cache is + * shared across the legacy and new code paths during migration β there's only one + * cache per JVM, regardless of which API the caller used. + * + * <p>String-based entry points (commit-metadata blob lookup) pass through + * unchanged; methods that previously returned {@link InternalSchema} now return + * {@link HoodieSchema} via {@link HoodieSchemaInternalSchemaBridge}, with field + * ids preserved.</p> + */ +public final class HoodieSchemaHistoryCache { + + private HoodieSchemaHistoryCache() { + } + + /** + * Resolves a schema by version id, hitting the cache when possible and falling + * back to the schema-history files on disk. Returns null when no schema with + * that id exists for the table. 
+ */ + public static HoodieSchema searchSchemaAndCache(long versionId, HoodieTableMetaClient metaClient) { + InternalSchema internal = InternalSchemaCache.searchSchemaAndCache(versionId, metaClient); + return wrap(internal); + } Review Comment: 🤖 nit: this method's name still says `InternalSchema` even though the new class deliberately drops that vocabulary — could you rename to something like `getEvolutionSchemaAndAvroSchemaForClusteringAndCompaction` (and update the one caller in `RunCompactionActionExecutor`) so the façade reads consistently? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java: ########## @@ -212,27 +208,30 @@ private static void collectColNamesFromStructType(DataType sparkType, Deque<Stri private static void addFullName(DataType sparkType, String name, Deque<String> fieldNames, List<String> resultSet) { if (!(sparkType instanceof StructType) && !(sparkType instanceof ArrayType) && !(sparkType instanceof MapType)) { - resultSet.add(InternalSchemaUtils.createFullName(name, fieldNames)); + if (fieldNames.isEmpty()) { Review Comment: 🤖 nit: this inlined `addFullName` re-implements what `HoodieSchemaIndex.createFullName(String, Deque<String>)` already does. Could you call that helper (or extract a shared util) instead of duplicating the descending-iterator logic? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java: ########## @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; +import org.apache.hudi.common.schema.types.Type; +import org.apache.hudi.common.schema.types.Types; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.InternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that preserves + * column ids by stamping them as Avro custom properties on the HoodieSchema's + * underlying schema tree. + * + * <p>This exists during the InternalSchema β HoodieSchema migration. The existing + * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a + * structurally-correct HoodieSchema but discards field ids. 
Downstream code in + * the new evolution layer relies on {@code field-id} / {@code element-id} / + * {@code key-id} / {@code value-id} properties being present, so we walk the + * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p> + * + * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema} + * (record fields in declared order; array element after array; map key + value after map), + * so positional pairing is exact.</p> + * + * <p>Public for the migration period only β Phase 4 callsite migrations across + * different packages need access to the conversion. Once Phase 5 rewrites the + * action algebra on pure HoodieSchema, this bridge and its dependency on + * {@code InternalSchema} go away.</p> + */ +public final class HoodieSchemaInternalSchemaBridge { + + private HoodieSchemaInternalSchemaBridge() { + } + + /** + * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column + * ids carried as {@code field-id} / {@code element-id} / {@code key-id} / + * {@code value-id} Avro custom properties. This is the inverse of + * {@link #toHoodieSchema(InternalSchema, String)} and exists so the faΓ§ade can + * round-trip a HoodieSchema through the legacy applier without renumbering ids on + * every call. + * + * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly parsed + * input), this falls back to the existing + * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p> + */ + public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) { + if (hoodieSchema == null || hoodieSchema.isEmptySchema()) { + return InternalSchema.getEmptyInternalSchema(); + } + // Take the structurally-correct InternalSchema produced by the existing converter, + // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted + // ids with the ids carried as Avro properties on the HoodieSchema (where present). 
+ InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition()); + Types.RecordType originalRecord = fresh.getRecord(); + Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord); + InternalSchema result = (originalRecord == reidentified) + ? fresh + : new InternalSchema(reidentified); + long schemaId = hoodieSchema.schemaId(); + if (schemaId >= 0) { + result.setSchemaId(schemaId); + } + int maxColumnId = hoodieSchema.maxColumnId(); + if (maxColumnId >= 0) { + result.setMaxColumnId(maxColumnId); + } + return result; + } + + /** + * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel + * and produces a {@link Type} where each addressable id matches the HoodieSchema's + * Avro custom property (when present). Returns the original {@code internalType} + * unchanged when no overrides apply, so callers can short-circuit. + */ + private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) { + HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema; + switch (internalType.typeId()) { + case RECORD: { + Types.RecordType record = (Types.RecordType) internalType; + if (effective.getType() != HoodieSchemaType.RECORD) { + return internalType; + } + List<Types.Field> originalFields = record.fields(); + List<Types.Field> rebuilt = new ArrayList<>(originalFields.size()); + boolean anyChange = false; + for (int i = 0; i < originalFields.size(); i++) { + Types.Field original = originalFields.get(i); + HoodieSchemaField hf = effective.getFields().get(i); + int overrideId = hf.fieldId(); + Type childType = reidentify(hf.schema(), original.type()); + int finalId = overrideId >= 0 ? 
overrideId : original.fieldId(); + if (finalId == original.fieldId() && childType == original.type()) { + rebuilt.add(original); + } else { + rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc())); + anyChange = true; + } + } + return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record; + } + case ARRAY: { + Types.ArrayType array = (Types.ArrayType) internalType; + if (effective.getType() != HoodieSchemaType.ARRAY) { + return internalType; + } + int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1); + Type newElement = reidentify(effective.getElementType(), array.elementType()); + int finalElementId = overrideElementId >= 0 ? overrideElementId : array.elementId(); + if (finalElementId == array.elementId() && newElement == array.elementType()) { + return array; + } + return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement); + } + case MAP: { + Types.MapType map = (Types.MapType) internalType; + if (effective.getType() != HoodieSchemaType.MAP) { + return internalType; + } + int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1); + int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1); + Type newValue = reidentify(effective.getValueType(), map.valueType()); + int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId(); + int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId(); + if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) { + return map; + } + return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional()); + } + default: + return internalType; + } + } + + private static int readIntProp(Object raw, int fallback) { + return raw instanceof Number ? 
((Number) raw).intValue() : fallback; + } + + /** + * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every + * sub-schema with the corresponding field id from the source. The schema-level + * version id and max column id are also propagated. + */ + public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) { + if (internalSchema == null || internalSchema.isEmptySchema()) { + return HoodieSchema.empty(); + } + HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName); + stampIds(hoodieSchema, internalSchema.getRecord()); + hoodieSchema.setSchemaId(internalSchema.schemaId()); + hoodieSchema.setMaxColumnId(internalSchema.getMaxColumnId()); + hoodieSchema.invalidateIdIndex(); + return hoodieSchema; + } + + /** + * Prunes {@code source} to the supplied leaf-name list, preserving field ids. + * The returned HoodieSchema's record name is taken from {@code source}. + * + * <p>Single entry point for the bridge round-trip pattern that several call + * sites had open-coded: bridge to {@link InternalSchema}, prune via + * {@link InternalSchemaUtils#pruneInternalSchema}, then bridge back.</p> + */ + public static HoodieSchema pruneByLeafNames(HoodieSchema source, List<String> leafNames) { + InternalSchema pruned = InternalSchemaUtils.pruneInternalSchema(toInternalSchema(source), leafNames); + return toHoodieSchema(pruned, source.getFullName()); + } + + /** + * Prunes {@code source} down to the leaves of {@code requiredSchema}, preserving + * field ids. Returns a HoodieSchema named after {@code requiredSchema}. 
+ */ + public static HoodieSchema pruneByRequiredSchema(HoodieSchema source, HoodieSchema requiredSchema) { + InternalSchema pruned = InternalSchemaUtils.pruneInternalSchema( + toInternalSchema(source), HoodieSchemaUtils.collectLeafNames(requiredSchema)); + return toHoodieSchema(pruned, requiredSchema.getFullName()); + } + + /** + * Returns a HoodieSchema with the same fields and ids as {@code source} but with + * its record name set to {@code recordName}. Implemented as a bridge round-trip + * since {@link HoodieSchema} has no in-place rename API. + */ + public static HoodieSchema withRecordName(HoodieSchema source, String recordName) { + return toHoodieSchema(toInternalSchema(source), recordName); + } + + private static void stampIds(HoodieSchema hoodieSchema, Type type) { + HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema; + switch (type.typeId()) { + case RECORD: { + Types.RecordType record = (Types.RecordType) type; + // The HoodieSchema produced by InternalSchemaConverter preserves the declared + // field order, so positional pairing with InternalSchema is exact. 
+ if (effective.getType() != HoodieSchemaType.RECORD) { + return; + } + for (int i = 0; i < record.fields().size(); i++) { + Types.Field internalField = record.fields().get(i); + HoodieSchemaField hoodieField = effective.getFields().get(i); + stampPropIfAbsent(hoodieField.getAvroField()::getObjectProp, + hoodieField.getAvroField()::addProp, HoodieSchema.FIELD_ID_PROP, internalField.fieldId()); + stampIds(hoodieField.schema(), internalField.type()); + } + return; + } + case ARRAY: { + Types.ArrayType array = (Types.ArrayType) type; + if (effective.getType() != HoodieSchemaType.ARRAY) { + return; + } + stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.ELEMENT_ID_PROP, array.elementId()); + stampIds(effective.getElementType(), array.elementType()); + return; + } + case MAP: { + Types.MapType map = (Types.MapType) type; + if (effective.getType() != HoodieSchemaType.MAP) { + return; + } + stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.KEY_ID_PROP, map.keyId()); + stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.VALUE_ID_PROP, map.valueId()); + stampIds(effective.getValueType(), map.valueType()); + return; + } + default: + // primitives have no addressable child id + } + } + + /** + * Stamps {@code value} under {@code key} only if no value is already present, since + * Avro's JsonProperties is set-once and would throw on a second {@code addProp}. + * The same HoodieSchema instance can be reached more than once when an + * InternalSchema's {@link Types.RecordType} is shared across paths (the + * InternalSchemaConverter caches by Type identity), so this guard makes + * {@link #stampIds} idempotent. 
Mismatched re-stamps surface as a hard failure + * since the field-id is the schema's identity and a divergence here means Review Comment: 🤖 nit: `stampPropIfAbsent` declares its parameters as `java.util.function.Function<...>` / `java.util.function.BiConsumer<...>` inline — could you import these and use unqualified names? It would also let you simplify the call sites (`effective.getAvroSchema()::getObjectProp` etc.) which are a bit dense as-is. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java: ########## @@ -99,72 +100,23 @@ public static HoodieSchema convert(InternalSchema internalSchema, String name) { } public static InternalSchema pruneHoodieSchemaToInternalSchema(HoodieSchema schema, InternalSchema originSchema) { - List<String> pruneNames = collectColNamesFromSchema(schema); - return InternalSchemaUtils.pruneInternalSchema(originSchema, pruneNames); + return InternalSchemaUtils.pruneInternalSchema(originSchema, + HoodieSchemaUtils.collectLeafNames(schema)); } /** * Collect all the leaf nodes names. * + * <p>Delegates to {@link org.apache.hudi.common.schema.HoodieSchemaUtils#collectLeafNames} + * — the HoodieSchema-direct walker that moved out of internal.schema in the + * Phase 5 ramp-up. Kept here for {@link TestInternalSchemaConverter} visibility.</p> Review Comment: 🤖 nit: "Phase 5 ramp-up" is internal migration jargon that won't mean anything to a future reader. Could you drop the phase reference and just say something like "delegates to HoodieSchemaUtils#collectLeafNames; kept here only so TestInternalSchemaConverter can exercise the wrapper"? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java: ########## @@ -64,14 +62,14 @@ public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf, Option MessageType messageType = fileFooter.getFileMetaData().getSchema(); baseSchema = getAvroSchemaConverter(conf).convert(messageType); - if (internalSchemaOption.isPresent()) { + if (evolutionSchemaOption.isPresent()) { // do schema reconciliation in case there exists read column which is not in the file schema. - InternalSchema mergedInternalSchema = new InternalSchemaMerger( - InternalSchemaConverter.convert(baseSchema), - internalSchemaOption.get(), - true, - true).mergeSchema(); - baseSchema = InternalSchemaConverter.convert(mergedInternalSchema, baseSchema.getFullName()); + // Preserve the file schema's record name on the merged result, matching the legacy + // InternalSchemaConverter.convert(merged, baseSchema.getFullName()) behavior — the + // hardcoded "schema" name from getEvolutionSchemaOption("schema") would otherwise leak Review Comment: 🤖 nit: the comment refers to `getEvolutionSchemaOption("schema")`, but there's no such method/call here — I think this is meant to describe the legacy `InternalSchemaConverter.convert(merged, "schema")` default. Could you rephrase so a reader doesn't go looking for that method? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java: ########## @@ -1315,6 +1353,172 @@ public Object getProp(String key) { public void addProp(String key, Object value) { ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty"); avroSchema.addProp(key, value); + this.idIndex = null; + } + + /** + * Factory for an "empty" schema sentinel: an unnamed record with no fields and an + * unset schemaId. 
Equivalent to {@code InternalSchema#getEmptyInternalSchema()} — + * used by callers that need a placeholder when no evolution schema is available + * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with + * {@link #isEmptySchema()} to detect the sentinel. + */ + public static HoodieSchema empty() { + return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList()); + } + + /** + * Returns true if this schema is the "empty" sentinel — i.e. a record with + * no fields (the {@link #empty()} placeholder). Replaces + * {@code InternalSchema#isEmptySchema()}. + * + * <p>Note: the legacy {@code InternalSchema.isEmptySchema()} keyed off + * {@code versionId < 0}, which was effectively a no-fields check (only the + * EMPTY_SCHEMA singleton ever had a negative version id). We mirror the + * structural intent here so any HoodieSchema built via {@code parse()} / + * {@code fromAvroSchema()} / {@code createRecord()} without a subsequent + * {@link #setSchemaId(long)} is not mis-classified as empty.</p> + */ + public boolean isEmptySchema() { + return type != HoodieSchemaType.RECORD || avroSchema.getFields().isEmpty(); + } + + /** + * Returns the schema version id (replaces InternalSchema#schemaId). + * Returns -1 if no version has been assigned. + */ + public long schemaId() { + return schemaId; + } + + /** + * Sets the schema version id. Typically derived from a commit instant timestamp. + * Mutable: callers that need to re-stamp the version (e.g. after an evolution + * operation) can call this multiple times. + */ + public HoodieSchema setSchemaId(long schemaId) { + this.schemaId = schemaId; + return this; + } + + /** + * Returns the highest column id assigned to any sub-schema. If an explicit + * max-column-id has been recorded (e.g. preserved across a column deletion), + * that value is returned; otherwise the highest id currently present in the + * schema is returned. Replaces InternalSchema#maxColumnId. 
+ * + * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p> + */ + public int maxColumnId() { + if (explicitMaxColumnId >= 0) { + return explicitMaxColumnId; + } + return index().maxColumnIdSeen(); + } + + /** + * Records the highest column id explicitly. Useful after column deletions, where + * the highest currently-present id is less than the highest ever assigned and + * subsequent column additions must avoid colliding with previously-used ids. + */ + public HoodieSchema setMaxColumnId(int maxColumnId) { + this.explicitMaxColumnId = maxColumnId; + return this; + } + + /** + * Returns the dot-joined full name of the field that owns column id {@code id}, + * or empty string if none. Replaces InternalSchema#findFullName. + */ + public String findFullName(int id) { + String result = index().idToName().get(id); + return result == null ? "" : result; + } + + /** + * Returns the column id assigned to the field at {@code fullName}, or -1 if not found. + * Replaces InternalSchema#findIdByName. + */ + public int findIdByName(String fullName) { + if (fullName == null || fullName.isEmpty()) { + return -1; + } + Integer id = index().nameToId().get(fullName); + return id == null ? -1 : id; + } + + /** + * Returns all column ids in this schema. Replaces InternalSchema#getAllIds. + */ + public java.util.Set<Integer> getAllIds() { Review Comment: 🤖 nit: the new methods (`getAllIds`, `getAllColsFullName`, `idIndex`, etc.) use fully-qualified `java.util.Set` / `java.util.List` / `java.util.ArrayList` / `java.util.Map` inline. Could you add the imports and drop the prefixes to match the rest of the file? <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala: ########## @@ -791,20 +796,21 @@ } /** - * Projects provided schema by picking only required (projected) top-level columns from it + * Projects provided schema by picking only required (projected) top-level columns from it. * - * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]]) + * @param tableSchema Right is the schema-on-read evolution schema (with field ids); + * Left is the structural HoodieSchema fallback. + * @param requiredColumns required top-level columns to be projected */ - def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = { + def projectSchema(tableSchema: Either[HoodieSchema, HoodieSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, Option[HoodieSchema]) = { Review Comment: 🤖 nit: now that both arms of the `Either` are `HoodieSchema`, this signature is hard to read at the call site — `Either[HoodieSchema, HoodieSchema]` doesn't communicate which arm means what. Could you replace it with two separate parameters (e.g. `evolutionSchema: Option[HoodieSchema], fallbackSchema: HoodieSchema`) or a small sealed wrapper, so the intent doesn't depend on which side of the Either you happen to land on? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
