hudi-agent commented on code in PR #18688: URL: https://github.com/apache/hudi/pull/18688#discussion_r3195007009
########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaChangeApplier.java: ########## @@ -0,0 +1,786 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.exception.HoodieSchemaException; +import org.apache.hudi.exception.SchemaCompatibilityException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * HoodieSchema-shaped applier for column-level schema evolution operations + * (ADD / DELETE / RENAME / UPDATE / REORDER). + * + * <p>Each method returns a new {@link HoodieSchema}; the input is not mutated. + * Field ids are preserved end-to-end so subsequent callers can resolve renamed + * columns by id. 
+ */ +public class HoodieSchemaChangeApplier { + + private final HoodieSchema latestSchema; + + public HoodieSchemaChangeApplier(HoodieSchema latestSchema) { + this.latestSchema = latestSchema; + } + + /** + * Add a column to the table. For nested fields, use a dot-separated full path + * in {@code colName}. Position can be FIRST (no reference), or AFTER/BEFORE a + * sibling of the same parent. The new column is always nullable (Hudi + * convention for added columns). + */ + public HoodieSchema applyAddChange(String colName, + HoodieSchema colType, + String doc, + String position, + ColumnPositionType positionType) { + if (positionType == null) { + throw new IllegalArgumentException("positionType should be specified"); + } + String parentName = parentOf(colName); + String leafName = leafOf(colName); + if (positionType == ColumnPositionType.AFTER || positionType == ColumnPositionType.BEFORE) { + if (position == null || position.isEmpty()) { + throw new IllegalArgumentException( + "position should not be null/empty_string when specify positionChangeType as after/before"); + } + if (!parentName.equals(parentOf(position))) { + throw new IllegalArgumentException("cannot reorder two columns which has different parent"); + } + } + // Duplicate-name check runs before any position guard because the legacy + // applier added the column first and only validated position afterwards; + // tests assert the duplicate message wins for `add columns(existing first)`. 
+ if (parentName.isEmpty()) { + if (hasColumnInsensitive(latestSchema, leafName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' because it already exists in the schema", leafName)); + } + } else if (hasColumnInsensitive(latestSchema, colName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' to parent '%s' because the column already exists at path '%s'", + leafName, parentName, colName)); + } + if (positionType == ColumnPositionType.FIRST) { + // For ADD, the legacy guard fired unconditionally because srcId == -1; + // forbid here for parity. The new column would otherwise land at the + // head of a record and shift the meta-column block when applied at + // top level. + throw new IllegalArgumentException( + "forbid adjust top-level columns position by using through first syntax"); + } + checkForbiddenAfterMetaColumn(positionType, position); + + int newId = nextColumnId(latestSchema); + HoodieSchema effectiveType = colType.isNullable() ? colType.getNonNullType() : colType; + // Added columns are always nullable, so stamp NULL_VALUE as the default to + // match Avro's [null, T] convention (the legacy convert() back leg did this + // automatically; the applier produces it explicitly). + HoodieSchemaField newField = HoodieSchemaField.of( + leafName, HoodieSchema.createNullable(effectiveType), doc, HoodieSchema.NULL_VALUE); + newField.addProp(HoodieSchema.FIELD_ID_PROP, newId); + String refLeaf = (position != null && !position.isEmpty()) ? leafOf(position) : null; + + HoodieSchema result = rewriteRecordAt(latestSchema, parentName, + parent -> insertFieldInRecord(parent, newField, refLeaf, positionType)); + // The new field's id raised the max — bump the schema's tracker so callers + // that consume {@link HoodieSchema#maxColumnId()} (notably the SerDe) see + // the up-to-date ceiling. 
+ if (result.maxColumnId() < newId) { + result.setMaxColumnId(newId); + } + result.invalidateIdIndex(); + return result; + } + + /** + * Delete one or more columns. For nested fields, use dot-separated full paths. + * Each name is resolved to its column id and the corresponding field is + * removed wherever it appears in the schema tree (top-level, in a nested + * record, in a record under a map value, etc.). Throws when any name + * doesn't resolve. + */ + public HoodieSchema applyDeleteChange(String... colNames) { + HoodieSchema result = latestSchema; + for (String colName : colNames) { + int targetId = result.findIdByName(colName); + if (targetId < 0) { + throw new IllegalArgumentException( + String.format("cannot delete col: %s which does not exist in schema", colName)); + } + result = deleteFieldById(result, targetId); + } + return result; + } + + /** + * Rename a column without changing its id, type, or position. + * {@code newName} is a leaf name — the parent path is taken from {@code colName}. + */ + public HoodieSchema applyRenameChange(String colName, String newName) { + if (newName == null || newName.isEmpty()) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to empty or null name. New name must be non-empty", colName)); + } + if (hasColumnInsensitive(latestSchema, newName)) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to '%s' because a column with name '%s' already exists in the schema", + colName, newName, newName)); + } + return rewriteFieldByName(latestSchema, colName, + field -> rebuildField(field, newName, field.schema(), field.doc().orElse(null))); + } + + /** + * Toggle a column's nullability. Only required → optional is allowed by + * default; the inverse must be forced explicitly via the underlying applier. 
+ */ + public HoodieSchema applyColumnNullabilityChange(String colName, boolean nullable) { + return rewriteFieldByName(latestSchema, colName, field -> { + HoodieSchema fieldSchema = field.schema(); + HoodieSchema effective = fieldSchema.isNullable() ? fieldSchema.getNonNullType() : fieldSchema; + HoodieSchema newSchema = nullable ? HoodieSchema.createNullable(effective) : effective; + return rebuildField(field, field.name(), newSchema, field.doc().orElse(null)); + }); + } + + /** + * Promote a column to a wider type. Type-promotion legality is decided by + * the caller — this applier does not enforce compatibility. + */ + public HoodieSchema applyColumnTypeChange(String colName, HoodieSchema newType) { + return rewriteFieldByName(latestSchema, colName, field -> { + HoodieSchema fieldSchema = field.schema(); + boolean wasNullable = fieldSchema.isNullable(); + HoodieSchema effectiveNewType = newType.isNullable() ? newType.getNonNullType() : newType; + HoodieSchema finalSchema = wasNullable ? HoodieSchema.createNullable(effectiveNewType) : effectiveNewType; + return rebuildField(field, field.name(), finalSchema, field.doc().orElse(null)); + }); + } + + /** + * Update a column's documentation string. + */ + public HoodieSchema applyColumnCommentChange(String colName, String doc) { + return rewriteFieldByName(latestSchema, colName, + field -> rebuildField(field, field.name(), field.schema(), doc)); + } + + /** + * Reorder a column relative to a sibling within the same enclosing struct. + * FIRST is allowed without a reference; AFTER/BEFORE require a reference + * column with the same parent. 
+ */ + public HoodieSchema applyReOrderColPositionChange(String colName, + String referColName, + ColumnPositionType positionType) { + if (positionType == null) { + throw new IllegalArgumentException("positionType should be specified"); + } + String parentName = parentOf(colName); + String colLeaf = leafOf(colName); + if (positionType != ColumnPositionType.FIRST) { + if (referColName == null || referColName.isEmpty()) { + throw new IllegalArgumentException( + "referColName should not be null/empty when positionType is " + positionType); + } + if (!parentName.equals(parentOf(referColName))) { + throw new IllegalArgumentException("cannot reorder two columns which has different parent"); + } + } else if (parentName.isEmpty()) { + // Reordering a top-level column to FIRST would shift the meta columns; + // forbid it (matches the legacy InternalSchemaChangeApplier guard). + throw new IllegalArgumentException( + "forbid adjust top-level columns position by using through first syntax"); + } + checkForbiddenAfterMetaColumn(positionType, referColName); + String refLeaf = (referColName != null && !referColName.isEmpty()) ? leafOf(referColName) : null; + return rewriteRecordAt(latestSchema, parentName, + parent -> moveFieldInRecord(parent, colLeaf, refLeaf, positionType)); + } + + /** + * Case-insensitive check for whether {@code name} matches any full column + * path in {@code schema}. Mirrors the legacy {@code InternalSchema.hasColumn( + * name, false)} contract used by add/rename validation, including the + * loose semantics where a leaf name supplied for a top-level operation Review Comment: 🤖 Could `checkForbiddenAfterMetaColumn` also need to fire on `BEFORE`? Legacy `BaseColumnChange.addPositionChange` had `case BEFORE: checkColModifyIsLegal(dsrName); break;`, which threw if `dsrName` was *any* of the 5 meta columns. 
With this guard only handling AFTER, `ADD COLUMN foo … BEFORE _hoodie_commit_seqno` (or REORDER … BEFORE _hoodie_record_key) now succeeds and inserts an ordinary column inside the meta-column block — same hazard the AFTER check is trying to prevent. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaChangeApplier.java: ########## @@ -0,0 +1,786 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.exception.HoodieSchemaException; +import org.apache.hudi.exception.SchemaCompatibilityException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * HoodieSchema-shaped applier for column-level schema evolution operations + * (ADD / DELETE / RENAME / UPDATE / REORDER). 
+ * + * <p>Each method returns a new {@link HoodieSchema}; the input is not mutated. + * Field ids are preserved end-to-end so subsequent callers can resolve renamed + * columns by id. + */ +public class HoodieSchemaChangeApplier { + + private final HoodieSchema latestSchema; + + public HoodieSchemaChangeApplier(HoodieSchema latestSchema) { + this.latestSchema = latestSchema; + } + + /** + * Add a column to the table. For nested fields, use a dot-separated full path + * in {@code colName}. Position can be FIRST (no reference), or AFTER/BEFORE a + * sibling of the same parent. The new column is always nullable (Hudi + * convention for added columns). + */ + public HoodieSchema applyAddChange(String colName, + HoodieSchema colType, + String doc, + String position, + ColumnPositionType positionType) { + if (positionType == null) { + throw new IllegalArgumentException("positionType should be specified"); + } + String parentName = parentOf(colName); + String leafName = leafOf(colName); + if (positionType == ColumnPositionType.AFTER || positionType == ColumnPositionType.BEFORE) { + if (position == null || position.isEmpty()) { + throw new IllegalArgumentException( + "position should not be null/empty_string when specify positionChangeType as after/before"); + } + if (!parentName.equals(parentOf(position))) { + throw new IllegalArgumentException("cannot reorder two columns which has different parent"); + } + } + // Duplicate-name check runs before any position guard because the legacy + // applier added the column first and only validated position afterwards; + // tests assert the duplicate message wins for `add columns(existing first)`. 
+ if (parentName.isEmpty()) { + if (hasColumnInsensitive(latestSchema, leafName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' because it already exists in the schema", leafName)); + } + } else if (hasColumnInsensitive(latestSchema, colName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' to parent '%s' because the column already exists at path '%s'", + leafName, parentName, colName)); + } + if (positionType == ColumnPositionType.FIRST) { + // For ADD, the legacy guard fired unconditionally because srcId == -1; + // forbid here for parity. The new column would otherwise land at the + // head of a record and shift the meta-column block when applied at + // top level. + throw new IllegalArgumentException( + "forbid adjust top-level columns position by using through first syntax"); + } + checkForbiddenAfterMetaColumn(positionType, position); + + int newId = nextColumnId(latestSchema); + HoodieSchema effectiveType = colType.isNullable() ? colType.getNonNullType() : colType; + // Added columns are always nullable, so stamp NULL_VALUE as the default to + // match Avro's [null, T] convention (the legacy convert() back leg did this + // automatically; the applier produces it explicitly). + HoodieSchemaField newField = HoodieSchemaField.of( + leafName, HoodieSchema.createNullable(effectiveType), doc, HoodieSchema.NULL_VALUE); + newField.addProp(HoodieSchema.FIELD_ID_PROP, newId); + String refLeaf = (position != null && !position.isEmpty()) ? leafOf(position) : null; + + HoodieSchema result = rewriteRecordAt(latestSchema, parentName, + parent -> insertFieldInRecord(parent, newField, refLeaf, positionType)); + // The new field's id raised the max — bump the schema's tracker so callers + // that consume {@link HoodieSchema#maxColumnId()} (notably the SerDe) see + // the up-to-date ceiling. 
+ if (result.maxColumnId() < newId) { + result.setMaxColumnId(newId); + } + result.invalidateIdIndex(); + return result; + } + + /** + * Delete one or more columns. For nested fields, use dot-separated full paths. + * Each name is resolved to its column id and the corresponding field is + * removed wherever it appears in the schema tree (top-level, in a nested + * record, in a record under a map value, etc.). Throws when any name + * doesn't resolve. + */ + public HoodieSchema applyDeleteChange(String... colNames) { + HoodieSchema result = latestSchema; + for (String colName : colNames) { + int targetId = result.findIdByName(colName); + if (targetId < 0) { + throw new IllegalArgumentException( + String.format("cannot delete col: %s which does not exist in schema", colName)); + } + result = deleteFieldById(result, targetId); + } + return result; + } + + /** + * Rename a column without changing its id, type, or position. + * {@code newName} is a leaf name — the parent path is taken from {@code colName}. + */ + public HoodieSchema applyRenameChange(String colName, String newName) { + if (newName == null || newName.isEmpty()) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to empty or null name. New name must be non-empty", colName)); + } + if (hasColumnInsensitive(latestSchema, newName)) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to '%s' because a column with name '%s' already exists in the schema", + colName, newName, newName)); + } + return rewriteFieldByName(latestSchema, colName, + field -> rebuildField(field, newName, field.schema(), field.doc().orElse(null))); + } + + /** + * Toggle a column's nullability. Only required → optional is allowed by + * default; the inverse must be forced explicitly via the underlying applier. 
+ */ + public HoodieSchema applyColumnNullabilityChange(String colName, boolean nullable) { + return rewriteFieldByName(latestSchema, colName, field -> { + HoodieSchema fieldSchema = field.schema(); + HoodieSchema effective = fieldSchema.isNullable() ? fieldSchema.getNonNullType() : fieldSchema; + HoodieSchema newSchema = nullable ? HoodieSchema.createNullable(effective) : effective; + return rebuildField(field, field.name(), newSchema, field.doc().orElse(null)); + }); + } + + /** + * Promote a column to a wider type. Type-promotion legality is decided by + * the caller — this applier does not enforce compatibility. + */ + public HoodieSchema applyColumnTypeChange(String colName, HoodieSchema newType) { + return rewriteFieldByName(latestSchema, colName, field -> { + HoodieSchema fieldSchema = field.schema(); + boolean wasNullable = fieldSchema.isNullable(); + HoodieSchema effectiveNewType = newType.isNullable() ? newType.getNonNullType() : newType; + HoodieSchema finalSchema = wasNullable ? HoodieSchema.createNullable(effectiveNewType) : effectiveNewType; + return rebuildField(field, field.name(), finalSchema, field.doc().orElse(null)); + }); + } + + /** + * Update a column's documentation string. + */ + public HoodieSchema applyColumnCommentChange(String colName, String doc) { + return rewriteFieldByName(latestSchema, colName, + field -> rebuildField(field, field.name(), field.schema(), doc)); + } + + /** + * Reorder a column relative to a sibling within the same enclosing struct. + * FIRST is allowed without a reference; AFTER/BEFORE require a reference + * column with the same parent. 
+ */ + public HoodieSchema applyReOrderColPositionChange(String colName, + String referColName, + ColumnPositionType positionType) { + if (positionType == null) { + throw new IllegalArgumentException("positionType should be specified"); + } + String parentName = parentOf(colName); + String colLeaf = leafOf(colName); + if (positionType != ColumnPositionType.FIRST) { + if (referColName == null || referColName.isEmpty()) { + throw new IllegalArgumentException( + "referColName should not be null/empty when positionType is " + positionType); + } + if (!parentName.equals(parentOf(referColName))) { + throw new IllegalArgumentException("cannot reorder two columns which has different parent"); + } + } else if (parentName.isEmpty()) { + // Reordering a top-level column to FIRST would shift the meta columns; + // forbid it (matches the legacy InternalSchemaChangeApplier guard). + throw new IllegalArgumentException( + "forbid adjust top-level columns position by using through first syntax"); + } + checkForbiddenAfterMetaColumn(positionType, referColName); + String refLeaf = (referColName != null && !referColName.isEmpty()) ? leafOf(referColName) : null; + return rewriteRecordAt(latestSchema, parentName, + parent -> moveFieldInRecord(parent, colLeaf, refLeaf, positionType)); + } + + /** + * Case-insensitive check for whether {@code name} matches any full column + * path in {@code schema}. Mirrors the legacy {@code InternalSchema.hasColumn( + * name, false)} contract used by add/rename validation, including the + * loose semantics where a leaf name supplied for a top-level operation + * matches against any full path that equals it case-insensitively. 
+ */ + private static boolean hasColumnInsensitive(HoodieSchema schema, String name) { + if (name == null || name.isEmpty()) { + return false; + } + String lowered = name.toLowerCase(Locale.ROOT); + for (String fullName : schema.getAllColsFullName()) { + if (fullName.toLowerCase(Locale.ROOT).equals(lowered)) { + return true; + } + } + return false; + } + + /** + * Forbids AFTER referencing one of the leading meta columns + * ({@code HOODIE_META_COLUMNS} minus the last two — i.e., + * {@code _hoodie_commit_time}, {@code _hoodie_commit_seqno}, + * {@code _hoodie_record_key}). Inserting an ordinary column there would + * split the meta-column block. + */ + private static void checkForbiddenAfterMetaColumn(ColumnPositionType positionType, String position) { + if (positionType != ColumnPositionType.AFTER || position == null || position.isEmpty()) { + return; + } + List<String> leadingMetaCols = HoodieRecord.HOODIE_META_COLUMNS + .subList(0, HoodieRecord.HOODIE_META_COLUMNS.size() - 2); + if (leadingMetaCols.stream().anyMatch(f -> f.equalsIgnoreCase(position))) { + throw new IllegalArgumentException( + "forbid adjust the position of ordinary columns between meta columns"); + } + } Review Comment: 🤖 The legacy guard used `subList(0, size - 2)` (indices 0..2) too, but reasoning forward: AFTER `_hoodie_partition_path` would still split the meta block, putting the new column between `_hoodie_partition_path` and `_hoodie_file_name`. Was this gap intentional in the legacy code, or is it worth widening to all 5 meta cols here while we're cleaning this up? The current scope (3 leading cols) is parity, just flagging in case it matters. <sub><i>- AI-generated; verify before applying. 
React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java: ########## @@ -663,6 +680,43 @@ public static HoodieSchema createDecimal(String name, String namespace, String d return new HoodieSchema.Decimal(decimalSchema); } + /** + * Creates a FIXED-backed decimal schema with the minimum byte size needed to + * hold {@code precision} digits. Equivalent to {@code Types.DecimalType.get(p, s)} — + * use this when the target schema is being chosen for type-evolution promotion + * from an integral type, where {@code DECIMAL_BYTES} is not a compatible target. + * + * <p>The Avro fixed name is derived from the spec ({@code decimal_p<P>_s<S>_b<B>}) + * so two callers minting decimals with the same precision/scale/size produce + * structurally equal Avro fixed schemas — Avro's {@code Schema.toString} dedupes + * those without complaint. Different specs get different names, so they never + * collide either, even when both end up reachable from the same enclosing + * record (which can happen after a type-promote rebuild leaves the old fixed + * still referenced from {@code latest_schema} history). + */ + public static HoodieSchema createDecimalFixed(int precision, int scale) { + int size = computeMinBytesForDecimalPrecision(precision); + return createDecimal(decimalFixedName(precision, scale, size), null, null, precision, scale, size); + } + + /** + * Spec-derived Avro fixed name for {@link #createDecimalFixed}. Centralized so + * the SerDe (which materializes legacy {@code decimal_fixed(p,s)[size]} type + * strings) can use the same convention and produce equal Avro types when two Review Comment: 🤖 Could you clarify the read-side compatibility story here? Old Parquet/Avro data was written with the Avro fixed name `"decimal"`/`"fixed"`; with this change new schemas mint `"decimal_p<P>_s<S>_b<B>"`/`"fixed_b<size>"`. 
Do we have a reconciliation path where a HoodieSchema parsed from an old file (name `"decimal"`) can still match a HoodieSchema produced by `createDecimalFixed(p, s)` (name `"decimal_p_s_b"`)? Avro fixed equality includes the full name, so a strict comparison would now reject what used to match. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaChangeApplier.java: ########## @@ -0,0 +1,786 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.exception.HoodieSchemaException; +import org.apache.hudi.exception.SchemaCompatibilityException; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * HoodieSchema-shaped applier for column-level schema evolution operations + * (ADD / DELETE / RENAME / UPDATE / REORDER). 
+ * + * <p>Each method returns a new {@link HoodieSchema}; the input is not mutated. + * Field ids are preserved end-to-end so subsequent callers can resolve renamed + * columns by id. + */ +public class HoodieSchemaChangeApplier { + + private final HoodieSchema latestSchema; + + public HoodieSchemaChangeApplier(HoodieSchema latestSchema) { + this.latestSchema = latestSchema; + } + + /** + * Add a column to the table. For nested fields, use a dot-separated full path + * in {@code colName}. Position can be FIRST (no reference), or AFTER/BEFORE a + * sibling of the same parent. The new column is always nullable (Hudi + * convention for added columns). + */ + public HoodieSchema applyAddChange(String colName, + HoodieSchema colType, + String doc, + String position, + ColumnPositionType positionType) { + if (positionType == null) { + throw new IllegalArgumentException("positionType should be specified"); + } + String parentName = parentOf(colName); + String leafName = leafOf(colName); + if (positionType == ColumnPositionType.AFTER || positionType == ColumnPositionType.BEFORE) { + if (position == null || position.isEmpty()) { + throw new IllegalArgumentException( + "position should not be null/empty_string when specify positionChangeType as after/before"); + } + if (!parentName.equals(parentOf(position))) { + throw new IllegalArgumentException("cannot reorder two columns which has different parent"); + } + } + // Duplicate-name check runs before any position guard because the legacy + // applier added the column first and only validated position afterwards; + // tests assert the duplicate message wins for `add columns(existing first)`. 
+ if (parentName.isEmpty()) { + if (hasColumnInsensitive(latestSchema, leafName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' because it already exists in the schema", leafName)); + } + } else if (hasColumnInsensitive(latestSchema, colName)) { + throw new HoodieSchemaException( + String.format("Cannot add column '%s' to parent '%s' because the column already exists at path '%s'", + leafName, parentName, colName)); + } + if (positionType == ColumnPositionType.FIRST) { + // For ADD, the legacy guard fired unconditionally because srcId == -1; + // forbid here for parity. The new column would otherwise land at the + // head of a record and shift the meta-column block when applied at + // top level. + throw new IllegalArgumentException( + "forbid adjust top-level columns position by using through first syntax"); + } + checkForbiddenAfterMetaColumn(positionType, position); + + int newId = nextColumnId(latestSchema); + HoodieSchema effectiveType = colType.isNullable() ? colType.getNonNullType() : colType; + // Added columns are always nullable, so stamp NULL_VALUE as the default to + // match Avro's [null, T] convention (the legacy convert() back leg did this + // automatically; the applier produces it explicitly). + HoodieSchemaField newField = HoodieSchemaField.of( + leafName, HoodieSchema.createNullable(effectiveType), doc, HoodieSchema.NULL_VALUE); + newField.addProp(HoodieSchema.FIELD_ID_PROP, newId); + String refLeaf = (position != null && !position.isEmpty()) ? leafOf(position) : null; + + HoodieSchema result = rewriteRecordAt(latestSchema, parentName, + parent -> insertFieldInRecord(parent, newField, refLeaf, positionType)); + // The new field's id raised the max — bump the schema's tracker so callers + // that consume {@link HoodieSchema#maxColumnId()} (notably the SerDe) see + // the up-to-date ceiling. 
+ if (result.maxColumnId() < newId) { + result.setMaxColumnId(newId); + } + result.invalidateIdIndex(); + return result; + } + + /** + * Delete one or more columns. For nested fields, use dot-separated full paths. + * Each name is resolved to its column id and the corresponding field is + * removed wherever it appears in the schema tree (top-level, in a nested + * record, in a record under a map value, etc.). Throws when any name + * doesn't resolve. + */ + public HoodieSchema applyDeleteChange(String... colNames) { + HoodieSchema result = latestSchema; + for (String colName : colNames) { + int targetId = result.findIdByName(colName); + if (targetId < 0) { + throw new IllegalArgumentException( + String.format("cannot delete col: %s which does not exist in schema", colName)); + } + result = deleteFieldById(result, targetId); + } + return result; + } + + /** + * Rename a column without changing its id, type, or position. + * {@code newName} is a leaf name — the parent path is taken from {@code colName}. + */ + public HoodieSchema applyRenameChange(String colName, String newName) { + if (newName == null || newName.isEmpty()) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to empty or null name. New name must be non-empty", colName)); + } + if (hasColumnInsensitive(latestSchema, newName)) { + throw new SchemaCompatibilityException( + String.format("Cannot rename column '%s' to '%s' because a column with name '%s' already exists in the schema", + colName, newName, newName)); + } + return rewriteFieldByName(latestSchema, colName, + field -> rebuildField(field, newName, field.schema(), field.doc().orElse(null))); + } Review Comment: 🤖 Legacy `ColumnUpdateChange.renameColumn` started with `checkColModifyIsLegal(name)` to forbid renaming any of the 5 meta columns. 
The new validation here only guards `newName` (non-empty, no duplicate) — it doesn't guard `colName`, so `ALTER TABLE … RENAME COLUMN _hoodie_commit_time TO foo` would now succeed and break Hudi's expected meta-column layout. The same gap exists in `applyColumnNullabilityChange` / `applyColumnTypeChange` / `applyColumnCommentChange` below — the legacy code called `checkColModifyIsLegal(name)` in each of them. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
