This is an automated email from the ASF dual-hosted git repository.

jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fe119a0cf1e [HUDI-9566] Ban schema evolution on columns with SI 
(#13595)
0fe119a0cf1e is described below

commit 0fe119a0cf1e0b8ef44a2049fd15e56fcb62cfb9
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Fri Jul 25 08:22:16 2025 -0700

    [HUDI-9566] Ban schema evolution on columns with SI (#13595)
---
 .../java/org/apache/hudi/table/HoodieTable.java    |  77 +++++-
 .../hudi/table/TestHoodieTableSchemaEvolution.java | 307 +++++++++++++++++++++
 .../spark/sql/hudi/command/AlterTableCommand.scala |   1 +
 .../functional/TestHiveTableSchemaEvolution.java   |   2 +
 .../apache/hudi/functional/TestHoodieIndex.java    |   4 +-
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  | 252 +++++++++--------
 .../sql/hudi/dml/others/TestTimeTravelTable.scala  |  97 +++----
 .../hudi/feature/index/TestSecondaryIndex.scala    | 112 +++++++-
 8 files changed, 678 insertions(+), 174 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 82a2cc63be33..f3ba73c6dac5 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.avro.AvroSchemaCompatibility;
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -45,6 +46,8 @@ import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
 import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -102,6 +105,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -112,6 +116,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
 import static 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
 import static 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
 import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -928,7 +933,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
    * When inserting/updating data, we read records using the last used schema 
and convert them to the
    * GenericRecords with writerSchema. Hence, we need to ensure that this 
conversion can take place without errors.
    */
-  private void validateSchema() throws HoodieUpsertException, 
HoodieInsertException {
+  public void validateSchema() throws HoodieUpsertException, 
HoodieInsertException {
 
     boolean shouldValidate = config.shouldValidateAvroSchema();
     boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
@@ -949,6 +954,12 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
       Schema writerSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
       Schema tableSchema = 
HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
       AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, 
shouldValidate, allowProjection, getDropPartitionColNames());
+      
+      // Check secondary index column compatibility
+      Option<HoodieIndexMetadata> indexMetadata = 
metaClient.getIndexMetadata();
+      if (indexMetadata.isPresent()) {
+        validateSecondaryIndexSchemaEvolution(tableSchema, writerSchema, 
indexMetadata.get());
+      }
     } catch (SchemaCompatibilityException e) {
       throw e;
     } catch (Exception e) {
@@ -956,6 +967,70 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
     }
   }
 
+  /**
+   * Validates that columns with secondary indexes are not evolved in an 
incompatible way.
+   *
+   * @param tableSchema the current table schema
+   * @param writerSchema the new writer schema
+   * @param indexMetadata the index metadata containing all index definitions
+   * @throws SchemaCompatibilityException if a secondary index column has 
incompatible evolution
+   */
+  static void validateSecondaryIndexSchemaEvolution(
+      Schema tableSchema,
+      Schema writerSchema,
+      HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {
+    
+    // Filter for secondary index definitions
+    List<HoodieIndexDefinition> secondaryIndexDefs = 
indexMetadata.getIndexDefinitions().values().stream()
+        .filter(indexDef -> 
MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
+        .collect(Collectors.toList());
+    
+    if (secondaryIndexDefs.isEmpty()) {
+      return;
+    }
+    
+    // Create a map from source field to index name for efficient lookup
+    Map<String, String> columnToIndexName = new HashMap<>();
+    for (HoodieIndexDefinition indexDef : secondaryIndexDefs) {
+      String indexName = indexDef.getIndexName();
+      for (String sourceField : indexDef.getSourceFields()) {
+        // Note: If a column is part of multiple indexes, this will use the 
last one
+        // This is fine since we just need any index name for error reporting
+        columnToIndexName.put(sourceField, indexName);
+      }
+    }
+    
+    // Check each indexed column for schema evolution
+    for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
+      String columnName = entry.getKey();
+      String indexName = entry.getValue();
+      
+      Schema.Field tableField = tableSchema.getField(columnName);
+      
+      if (tableField == null) {
+        // This shouldn't happen, as indexed columns should exist in the table 
schema
+        LOG.warn("Secondary index '{}' references non-existent column: {}", 
indexName, columnName);
+        continue;
+      }
+      
+      // Use AvroSchemaCompatibility's field lookup logic to handle aliases
+      Schema.Field writerField = 
AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
+      
+      if (writerField != null && 
!tableField.schema().equals(writerField.schema())) {
+        // Check if this is just making the field nullable/non-nullable, which 
is safe from the SI perspective
+        if 
(resolveNullableSchema(tableField.schema()).equals(resolveNullableSchema(writerField.schema())))
 {
+          continue;
+        }
+        
+        String errorMessage = String.format(
+            "Column '%s' has secondary index '%s' and cannot evolve from 
schema '%s' to '%s'. "
+            + "Please drop the secondary index before changing the column 
type.",
+            columnName, indexName, tableField.schema(), writerField.schema());
+        throw new SchemaCompatibilityException(errorMessage);
+      }
+    }
+  }
+
   public void validateUpsertSchema() throws HoodieUpsertException {
     if (isMetadataTable) {
       return;
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
new file mode 100644
index 000000000000..3143beb6ed0b
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieTable#validateSecondaryIndexSchemaEvolution}.
+ */
+public class TestHoodieTableSchemaEvolution {
+
+  private static final String TABLE_SCHEMA = "{"
+      + "\"type\": \"record\","
+      + "\"name\": \"test\","
+      + "\"fields\": ["
+      + "  {\"name\": \"id\", \"type\": \"int\"},"
+      + "  {\"name\": \"name\", \"type\": \"string\"},"
+      + "  {\"name\": \"age\", \"type\": \"int\"},"
+      + "  {\"name\": \"salary\", \"type\": \"double\"},"
+      + "  {\"name\": \"nullable_field\", \"type\": [\"null\", \"string\"], 
\"default\": null}"
+      + "]}";
+
+  @Test
+  public void testNoSecondaryIndexes() {
+    // When there are no secondary indexes, any schema evolution should be 
allowed
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new 
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"", 
"\"age\", \"type\": \"long\""));
+    
+    HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(new 
HashMap<>());
+    
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testNoSchemaChange() {
+    // When schema doesn't change, validation should pass even with secondary 
indexes
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_age", "age");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testNonIndexedColumnEvolution() {
+    // Evolution of non-indexed columns should be allowed
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new 
Schema.Parser().parse(TABLE_SCHEMA.replace("\"name\", \"type\": \"string\"", 
"\"name\", \"type\": [\"null\", \"string\"]"));
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_age", "age");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @ParameterizedTest
+  @MethodSource("provideInvalidSchemaEvolutions")
+  public void testIndexedColumnTypeEvolution(String fieldName, String 
originalType, String evolvedType) {
+    // Type evolution of indexed columns should fail
+    String originalSchema = TABLE_SCHEMA;
+    String evolvedSchema = TABLE_SCHEMA.replace("\"" + fieldName + "\", 
\"type\": \"" + originalType + "\"", 
+                                                 "\"" + fieldName + "\", 
\"type\": \"" + evolvedType + "\"");
+    
+    Schema tableSchema = new Schema.Parser().parse(originalSchema);
+    Schema writerSchema = new Schema.Parser().parse(evolvedSchema);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_" + fieldName, fieldName);
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    SchemaCompatibilityException exception = 
assertThrows(SchemaCompatibilityException.class, () -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+    
+    assertTrue(exception.getMessage().contains("secondary index"));
+    assertTrue(exception.getMessage().contains(fieldName));
+    assertTrue(exception.getMessage().contains("secondary_index_" + 
fieldName));
+  }
+
+  private static Stream<Arguments> provideInvalidSchemaEvolutions() {
+    return Stream.of(
+        Arguments.of("age", "int", "long"),       // int to long
+        Arguments.of("age", "int", "double"),     // int to double
+        Arguments.of("salary", "double", "float") // double to float
+    );
+  }
+
+  @Test
+  public void testMultipleIndexesOnSameColumn() {
+    // When a column has multiple indexes, error should mention at least one
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new 
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"", 
"\"age\", \"type\": \"long\""));
+    
+    HoodieIndexDefinition indexDef1 = 
createSecondaryIndexDefinition("secondary_index_age_1", "age");
+    HoodieIndexDefinition indexDef2 = 
createSecondaryIndexDefinition("secondary_index_age_2", "age");
+    
+    Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>();
+    indexDefs.put("secondary_index_age_1", indexDef1);
+    indexDefs.put("secondary_index_age_2", indexDef2);
+    HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefs);
+    
+    SchemaCompatibilityException exception = 
assertThrows(SchemaCompatibilityException.class, () -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+    
+    assertTrue(exception.getMessage().contains("age"));
+    // Should contain at least one of the index names
+    assertTrue(exception.getMessage().contains("secondary_index_age_1") || 
exception.getMessage().contains("secondary_index_age_2"));
+  }
+
+  @Test
+  public void testCompoundIndex() {
+    // Test index on multiple columns - if any column evolves, should fail
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new 
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"", 
"\"age\", \"type\": \"long\""));
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_compound", "name", "age");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    SchemaCompatibilityException exception = 
assertThrows(SchemaCompatibilityException.class, () -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+    
+    assertTrue(exception.getMessage().contains("age"));
+    assertTrue(exception.getMessage().contains("secondary_index_compound"));
+  }
+
+  @Test
+  public void testFieldWithAlias() {
+    // Test schema evolution with field aliases
+    String tableSchemaStr = "{"
+        + "\"type\": \"record\","
+        + "\"name\": \"test\","
+        + "\"fields\": ["
+        + "  {\"name\": \"id\", \"type\": \"int\"},"
+        + "  {\"name\": \"old_name\", \"type\": \"string\", \"aliases\": 
[\"new_name\"]}"
+        + "]}";
+    
+    String writerSchemaStr = "{"
+        + "\"type\": \"record\","
+        + "\"name\": \"test\","
+        + "\"fields\": ["
+        + "  {\"name\": \"id\", \"type\": \"int\"},"
+        + "  {\"name\": \"new_name\", \"type\": \"string\"}"  // Field renamed 
using alias
+        + "]}";
+    
+    Schema tableSchema = new Schema.Parser().parse(tableSchemaStr);
+    Schema writerSchema = new Schema.Parser().parse(writerSchemaStr);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_name", "old_name");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    // Should pass because the field is found via alias and type hasn't changed
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testNullableFieldEvolution() {
+    // Test evolution from non-nullable to nullable
+    String evolvedSchema = TABLE_SCHEMA.replace("\"name\", \"type\": 
\"string\"", 
+                                                 "\"name\", \"type\": 
[\"null\", \"string\"], \"default\": null");
+    
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new Schema.Parser().parse(evolvedSchema);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_name", "name");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    // Making a field nullable is a backward-compatible change and should be 
allowed
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testMissingIndexedColumnInTableSchema() {
+    // Edge case: index references a column that doesn't exist in table schema
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_nonexistent", 
"nonexistent_column");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    // Should handle gracefully (logs a warning and continues)
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testNonSecondaryIndexDefinitions() {
+    // Test that non-secondary index definitions are ignored
+    Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+    Schema writerSchema = new 
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"", 
"\"age\", \"type\": \"long\""));
+    
+    // Create an expression index (not secondary index)
+    HoodieIndexDefinition expressionIndexDef = 
HoodieIndexDefinition.newBuilder()
+        
.withIndexName(MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath() + 
"_expr_idx")
+        
.withIndexType(MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath())
+        .withSourceFields(Collections.singletonList("age"))
+        .build();
+    
+    HoodieIndexMetadata indexMetadata = 
createIndexMetadata(expressionIndexDef);
+    
+    // Should not throw because it's not a secondary index
+    assertDoesNotThrow(() -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema, 
writerSchema, indexMetadata));
+  }
+
+  @Test
+  public void testFixedTypeEvolution() {
+    // Test fixed type size changes
+    String fixed8Schema = "{"
+        + "\"type\": \"record\","
+        + "\"name\": \"test\","
+        + "\"fields\": ["
+        + "  {\"name\": \"id\", \"type\": \"int\"},"
+        + "  {\"name\": \"fixed_field\", \"type\": {\"type\": \"fixed\", 
\"name\": \"FixedField\", \"size\": 8}}"
+        + "]}";
+    
+    String fixed16Schema = "{"
+        + "\"type\": \"record\","
+        + "\"name\": \"test\","
+        + "\"fields\": ["
+        + "  {\"name\": \"id\", \"type\": \"int\"},"
+        + "  {\"name\": \"fixed_field\", \"type\": {\"type\": \"fixed\", 
\"name\": \"FixedField\", \"size\": 16}}"
+        + "]}";
+    
+    Schema tableSchema = new Schema.Parser().parse(fixed8Schema);
+    Schema writerSchema = new Schema.Parser().parse(fixed16Schema);
+    
+    HoodieIndexDefinition indexDef = 
createSecondaryIndexDefinition("secondary_index_fixed", "fixed_field");
+    HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+    
+    final Schema tableSchemaFixed1 = tableSchema;
+    final Schema writerSchemaFixed1 = writerSchema;
+    final HoodieIndexMetadata indexMetadataFixed = indexMetadata;
+    SchemaCompatibilityException exception = 
assertThrows(SchemaCompatibilityException.class, () -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchemaFixed1, 
writerSchemaFixed1, indexMetadataFixed));
+    
+    assertTrue(exception.getMessage().contains("fixed_field"));
+    assertTrue(exception.getMessage().contains("secondary index"));
+    
+    // Fixed size decrease
+    tableSchema = new Schema.Parser().parse(fixed16Schema);
+    writerSchema = new Schema.Parser().parse(fixed8Schema);
+    
+    final Schema tableSchemaFixed2 = tableSchema;
+    final Schema writerSchemaFixed2 = writerSchema;
+    exception = assertThrows(SchemaCompatibilityException.class, () -> 
+        HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchemaFixed2, 
writerSchemaFixed2, indexMetadataFixed));
+    
+    assertTrue(exception.getMessage().contains("fixed_field"));
+    assertTrue(exception.getMessage().contains("secondary index"));
+  }
+
+  private HoodieIndexDefinition createSecondaryIndexDefinition(String 
indexName, String... sourceFields) {
+    return HoodieIndexDefinition.newBuilder()
+        
.withIndexName(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath() + "_" + 
indexName)
+        
.withIndexType(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath())
+        .withSourceFields(Arrays.asList(sourceFields))
+        .build();
+  }
+
+  private HoodieIndexMetadata createIndexMetadata(HoodieIndexDefinition... 
indexDefs) {
+    Map<String, HoodieIndexDefinition> indexDefMap = new HashMap<>();
+    for (HoodieIndexDefinition indexDef : indexDefs) {
+      indexDefMap.put(indexDef.getIndexName(), indexDef);
+    }
+    return new HoodieIndexMetadata(indexDefMap);
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index 40132b5e54b7..65a25b6ab019 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -277,6 +277,7 @@ object AlterTableCommand extends Logging {
     client.setOperationType(WriteOperationType.ALTER_SCHEMA)
 
     val hoodieTable = HoodieSparkTable.create(client.getConfig, 
client.getEngineContext)
+    hoodieTable.validateSchema()
     val timeLine = hoodieTable.getActiveTimeline
     val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
     val requested = instantGenerator.createNewInstant(State.REQUESTED, 
commitActionType, instantTime)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 8d3c925f3740..ecc0a2afbf5c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -99,6 +99,7 @@ public class TestHiveTableSchemaEvolution {
     String path = new 
Path(basePath.toAbsolutePath().toString()).toUri().toString();
 
     spark.sql("set hoodie.schema.on.read.enable=true");
+    spark.sql("set 
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true");
 
     spark.sql(String.format("create table %s (col0 int, col1 float, col2 
string, col3 timestamp) using hudi "
             + "tblproperties (type='mor', primaryKey='col0', 
preCombineField='col1', "
@@ -145,6 +146,7 @@ public class TestHiveTableSchemaEvolution {
     String path = new 
Path(basePath.toAbsolutePath().toString()).toUri().toString();
 
     spark.sql("set hoodie.schema.on.read.enable=true");
+    spark.sql("set 
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true");
     spark.sql(String.format("create table %s (col0 int, col1 float, col2 
string) using hudi "
             + "tblproperties (type='%s', primaryKey='col0', 
preCombineField='col1', "
             + 
"hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload')
 location '%s'",
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 846b10897b79..6109bf8cc9cf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -560,7 +560,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase 
{
     assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline, 
instantTimestampSec));
 
     // With a sec format instant time checkIfValid should return false 
assuming its > first entry in the active timeline
-    Thread.sleep(1010); // sleep required so that new timestamp differs in the 
seconds rather than msec
+    Thread.sleep(2010); // sleep required so that new timestamp differs in the 
seconds rather than msec
     instantTimestamp = WriteClientTestUtils.createNewInstantTime();
     instantTimestampSec = instantTimestamp.substring(0, 
instantTimestamp.length() - 
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
     assertFalse(timeline.empty());
@@ -572,7 +572,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase 
{
     // Timestamp contain in inflight timeline, checkContainsInstant() should 
return true
     String checkInstantTimestampSec = instantTimestamp.substring(0, 
instantTimestamp.length() - 
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
     String checkInstantTimestamp = checkInstantTimestampSec + 
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT;
-    Thread.sleep(1010); // sleep required so that new timestamp differs in the 
seconds rather than msec
+    Thread.sleep(2010); // sleep required so that new timestamp differs in the 
seconds rather than msec
     String newTimestamp = WriteClientTestUtils.createNewInstantTime();
     String newTimestampSec = newTimestamp.substring(0, newTimestamp.length() - 
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
     final HoodieInstant instant5 = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, newTimestamp);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 1a5e17893fa1..df3382b356de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -534,50 +534,53 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   test("Test Chinese table ") {
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
-        spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql("set " + 
DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int, comb int, `名字` string, col9 string, `成绩` int, `身高` 
float, `体重` double, `上次更新时间` date, par date
-             |) using hudi
-             | location '$tablePath'
-             | options (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'comb'
-             | )
-             | partitioned by (par)
+        withSparkSqlSessionConfig(
+          "hoodie.schema.on.read.enable" -> "true",
+          "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> 
"true",
+          DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key -> 
"upsert") {
+          val tableName = generateTableName
+          val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int, comb int, `名字` string, col9 string, `成绩` int, `身高` 
float, `体重` double, `上次更新时间` date, par date
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'comb'
+               | )
+               | partitioned by (par)
              """.stripMargin)
-        spark.sql(
-          s"""
-             | insert into $tableName values
-             | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-25', 
DATE'2021-12-26')
-             |""".stripMargin)
-        spark.sql(s"alter table $tableName rename column col9 to `爱好_Best`")
-
-        // update current table to produce log files for mor
-        spark.sql(
-          s"""
-             | insert into $tableName values
-             | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-26', 
DATE'2021-12-26')
-             |""".stripMargin)
-
-        // alter date to string
-        spark.sql(s"alter table $tableName alter column `上次更新时间` type string ")
-        checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
-          Seq("2021-12-26")
-        )
-        // alter string to date
-        spark.sql(s"alter table $tableName alter column `上次更新时间` type date ")
-        spark.sql(s"select `上次更新时间` from $tableName").collect()
-        checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
-          Seq(java.sql.Date.valueOf("2021-12-26"))
-        )
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-25', 
DATE'2021-12-26')
+               |""".stripMargin)
+          spark.sql(s"alter table $tableName rename column col9 to `爱好_Best`")
+
+          // update current table to produce log files for mor
+          spark.sql(
+            s"""
+               | insert into $tableName values
+               | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-26', 
DATE'2021-12-26')
+               |""".stripMargin)
+
+          // alter date to string
+          spark.sql(s"alter table $tableName alter column `上次更新时间` type string 
")
+          checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+            Seq("2021-12-26")
+          )
+          // alter string to date
+          spark.sql(s"alter table $tableName alter column `上次更新时间` type date ")
+          spark.sql(s"select `上次更新时间` from $tableName").collect()
+          checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+            Seq(java.sql.Date.valueOf("2021-12-26"))
+          )
+        }
+        
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
       }
-      
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
     })
   }
 
@@ -585,61 +588,64 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   test("Test alter column by add rename and drop") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
-        spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  name string,
-             |  price double,
-             |  ts long
-             |) using hudi
-             | location '$tablePath'
-             | options (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+        withSparkSqlSessionConfig(
+          "hoodie.schema.on.read.enable" -> "true",
+          "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> 
"true") {
+          val tableName = generateTableName
+          val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  name string,
+               |  price double,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
              """.stripMargin)
-        spark.sql(s"show create table ${tableName}").show(false)
-        spark.sql(s"insert into ${tableName} values (1, 'jack', 0.9, 1000)")
-        spark.sql(s"update ${tableName} set price = 1.9  where id =  1")
+          spark.sql(s"show create table ${tableName}").show(false)
+          spark.sql(s"insert into ${tableName} values (1, 'jack', 0.9, 1000)")
+          spark.sql(s"update ${tableName} set price = 1.9  where id =  1")
 
-        spark.sql(s"alter table ${tableName} alter column id type long")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", 1.9, 1000)
-        )
-        // test add action, include position change
-        spark.sql(s"alter table ${tableName} add columns(ext1 string comment 
'add ext1' after name)")
-        spark.sql(s"insert into ${tableName} values (2, 'jack', 'exx1', 0.9, 
1000)")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
-        )
-        // test rename
-        spark.sql(s"alter table ${tableName} rename column price to newprice")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
-        )
-        spark.sql(s"update ${tableName} set ext1 =  'haha' where id =  1 ")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
-        )
-        var maxColumnId = getMaxColumnId(tablePath)
-        // drop column newprice
-        spark.sql(s"alter table ${tableName} drop column newprice")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", "haha", 1000), Seq(2, "jack","exx1", 1000)
-        )
-        validateInternalSchema(tablePath, isDropColumn = true, 
currentMaxColumnId = maxColumnId)
-        maxColumnId = getMaxColumnId(tablePath)
-        // add newprice back
-        spark.sql(s"alter table ${tableName} add columns(newprice string 
comment 'add newprice back' after ext1)")
-        checkAnswer(createTestResult(tableName))(
-          Seq(1, "jack", "haha", null, 1000), Seq(2, "jack","exx1", null, 1000)
-        )
-        validateInternalSchema(tablePath, isDropColumn = false, 
currentMaxColumnId = maxColumnId)
+          spark.sql(s"alter table ${tableName} alter column id type long")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", 1.9, 1000)
+          )
+          // test add action, include position change
+          spark.sql(s"alter table ${tableName} add columns(ext1 string comment 
'add ext1' after name)")
+          spark.sql(s"insert into ${tableName} values (2, 'jack', 'exx1', 0.9, 
1000)")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack", "exx1", 0.9, 1000)
+          )
+          // test rename
+          spark.sql(s"alter table ${tableName} rename column price to 
newprice")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack", "exx1", 0.9, 1000)
+          )
+          spark.sql(s"update ${tableName} set ext1 =  'haha' where id =  1 ")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack", "exx1", 0.9, 
1000)
+          )
+          var maxColumnId = getMaxColumnId(tablePath)
+          // drop column newprice
+          spark.sql(s"alter table ${tableName} drop column newprice")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", "haha", 1000), Seq(2, "jack", "exx1", 1000)
+          )
+          validateInternalSchema(tablePath, isDropColumn = true, 
currentMaxColumnId = maxColumnId)
+          maxColumnId = getMaxColumnId(tablePath)
+          // add newprice back
+          spark.sql(s"alter table ${tableName} add columns(newprice string 
comment 'add newprice back' after ext1)")
+          checkAnswer(createTestResult(tableName))(
+            Seq(1, "jack", "haha", null, 1000), Seq(2, "jack", "exx1", null, 
1000)
+          )
+          validateInternalSchema(tablePath, isDropColumn = false, 
currentMaxColumnId = maxColumnId)
+        }
       }
     }
   }
@@ -691,35 +697,38 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   test("Test alter column multiple times") {
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
-        val tableName = generateTableName
-        val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
-        spark.sql("set hoodie.schema.on.read.enable=true")
-        spark.sql(
-          s"""
-             |create table $tableName (
-             |  id int,
-             |  col1 string,
-             |  col2 string,
-             |  ts long
-             |) using hudi
-             | location '$tablePath'
-             | options (
-             |  type = '$tableType',
-             |  primaryKey = 'id',
-             |  preCombineField = 'ts'
-             | )
+        withSparkSqlSessionConfig(
+          "hoodie.schema.on.read.enable" -> "true",
+          "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> 
"true") {
+          val tableName = generateTableName
+          val tablePath = s"${new Path(tmp.getCanonicalPath, 
tableName).toUri.toString}"
+          spark.sql(
+            s"""
+               |create table $tableName (
+               |  id int,
+               |  col1 string,
+               |  col2 string,
+               |  ts long
+               |) using hudi
+               | location '$tablePath'
+               | options (
+               |  type = '$tableType',
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts'
+               | )
              """.stripMargin)
-        spark.sql(s"show create table ${tableName}").show(false)
-        spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
+          spark.sql(s"show create table ${tableName}").show(false)
+          spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
 
-        // Rename to a previously existing column name + insert
-        spark.sql(s"alter table ${tableName} drop column col1")
-        spark.sql(s"alter table ${tableName} rename column col2 to col1")
+          // Rename to a previously existing column name + insert
+          spark.sql(s"alter table ${tableName} drop column col1")
+          spark.sql(s"alter table ${tableName} rename column col2 to col1")
 
-        spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
-        checkAnswer(spark.sql(s"select col1 from ${tableName} order by 
id").collect())(
-          Seq("bbb"), Seq("aaa")
-        )
+          spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
+          checkAnswer(spark.sql(s"select col1 from ${tableName} order by 
id").collect())(
+            Seq("bbb"), Seq("aaa")
+          )
+        }
       }
     }
   }
@@ -727,6 +736,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   test("Test alter column with complex schema") {
     withTempDir { tmp =>
       withSQLConf(s"${DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION}" 
-> "upsert",
+        "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> 
"true",
         "hoodie.schema.on.read.enable" -> "true",
         "spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
         val tableName = generateTableName
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
index 855bfdace0f5..e6ddc1ed724f 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
@@ -278,55 +278,58 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
 
   test("Test Time Travel With Schema Evolution") {
     withRecordType()(withTempDir { tmp =>
-      spark.sql("set hoodie.schema.on.read.enable=true")
-      val tableName = generateTableName
-      spark.sql(
-        s"""
-           |create table $tableName (
-           |  id int,
-           |  name string,
-           |  price double,
-           |  ts long
-           |) using hudi
-           | tblproperties (
-           |  primaryKey = 'id',
-           |  preCombineField = 'ts'
-           | )
-           | location '${tmp.getCanonicalPath}/$tableName'
+      withSparkSqlSessionConfig("hoodie.schema.on.read.enable" -> "true",
+                                
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> "true") {
+        spark.sql("set hoodie.schema.on.read.enable=true")
+        val tableName = generateTableName
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts'
+             | )
+             | location '${tmp.getCanonicalPath}/$tableName'
        """.stripMargin)
 
-      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
-      val metaClient = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName")
-      val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
-        .lastInstant().get().requestedTime
-
-      // add column
-      spark.sql(s"alter table $tableName add columns (company string)")
-      spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
-      val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
-        .lastInstant().get().requestedTime
-
-      // drop column
-      spark.sql(s"alter table $tableName drop column price")
-
-      val result1 = spark.sql(s"select * from ${tableName} timestamp as of 
$instant1 order by id")
-        .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
-      checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
-
-      val result2 = spark.sql(s"select * from ${tableName} timestamp as of 
$instant2 order by id")
-        .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
-      checkAnswer(result2)(
-        Seq(1, "a1", 10.0, 1000, null),
-        Seq(2, "a2", 11.0, 1100, "hudi")
-      )
-
-      val result3 = spark.sql(s"select * from ${tableName} order by id")
-        .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
-      checkAnswer(result3)(
-        Seq(1, "a1", 1000, null),
-        Seq(2, "a2", 1100, "hudi")
-      )
+        spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+        val metaClient = createMetaClient(spark, 
s"${tmp.getCanonicalPath}/$tableName")
+        val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+          .lastInstant().get().requestedTime
+
+        // add column
+        spark.sql(s"alter table $tableName add columns (company string)")
+        spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
+        val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+          .lastInstant().get().requestedTime
+
+        // drop column
+        spark.sql(s"alter table $tableName drop column price")
+
+        val result1 = spark.sql(s"select * from ${tableName} timestamp as of 
$instant1 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
+
+        val result2 = spark.sql(s"select * from ${tableName} timestamp as of 
$instant2 order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result2)(
+          Seq(1, "a1", 10.0, 1000, null),
+          Seq(2, "a2", 11.0, 1100, "hudi")
+        )
+
+        val result3 = spark.sql(s"select * from ${tableName} order by id")
+          .drop("_hoodie_commit_time", "_hoodie_commit_seqno", 
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+        checkAnswer(result3)(
+          Seq(1, "a1", 1000, null),
+          Seq(2, "a2", 1100, "hudi")
+        )
+      }
     })
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 5e3a304a2640..a25a748dd953 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.{DataSourceReadOptions, 
DataSourceWriteOptions, HoodieSpa
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
 import org.apache.hudi.common.model.WriteOperationType
-import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, 
HoodieTestUtils}
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieWriteConfig}
@@ -35,7 +35,7 @@ import org.apache.hudi.storage.StoragePath
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
 
 import java.util.concurrent.atomic.AtomicInteger
 
@@ -714,6 +714,22 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
     String.format("%03d", new Integer(instantTime.incrementAndGet()))
   }
 
+  private def validateFieldType(basePath: String, fieldName: String, 
expectedType: String): Unit = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setBasePath(basePath)
+      .setConf(HoodieTestUtils.getDefaultStorageConf)
+      .build()
+    val schemaResolver = new TableSchemaResolver(metaClient)
+    val tableSchema = schemaResolver.getTableAvroSchema(false)
+    val field = tableSchema.getField(fieldName)
+    assertNotNull(field, s"$fieldName field should exist in table schema")
+    val fieldType = field.schema()
+    assertTrue(
+      fieldType.toString.contains(expectedType),
+      s"$fieldName field should be of type $expectedType, but got: 
${fieldType.toString}"
+    )
+  }
+
   private def createTempTableAndInsert(tableName: String, basePath: String) = {
     spark.sql(
       s"""
@@ -749,7 +765,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
     )
   }
 
-
   /**
    * Test secondary index with nullable columns covering comprehensive 
scenarios for both COW and MOR:
    * - Initial creation with null values
@@ -1078,4 +1093,95 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
       .map(_.getString(0))
       .filter(_.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR))
   }
+
+  test("Test Secondary Index with Schema Evolution - Column Type Change Should 
Fail") {
+    import spark.implicits._
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+      // Helper function to validate schema evolution exception
+      def validateSchemaEvolutionException(exception: Exception, columnName: 
String, indexName: String): Unit = {
+        assertTrue(
+          exception.getMessage.contains("Failed upsert schema compatibility 
check") ||
+          exception.getMessage.contains(s"Column '$columnName' has secondary 
index '$indexName'") ||
+          (exception.getCause != null &&
+            exception.getCause.getMessage.contains(s"Column '$columnName' has 
secondary index '$indexName' and cannot evolve")),
+          s"Got unexpected exception message: ${exception.getMessage}"
+        )
+      }
+
+      // Create table with initial schema where quantity1 and quantity2 are int
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  quantity1 int,
+           |  quantity2 int,
+           |  ts long
+           |) using hudi
+           | options (
+           |  primaryKey ='id',
+           |  type = 'cow',
+           |  preCombineField = 'ts',
+           |  hoodie.metadata.enable = 'true',
+           |  hoodie.metadata.record.index.enable = 'true'
+           | )
+           | location '$basePath'
+   """.stripMargin)
+
+      // Insert initial data with integer quantities
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 100, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 20, 200, 1001)")
+
+      // Create secondary indexes on both quantity columns
+      spark.sql(s"create index idx_quantity1 on $tableName (quantity1)")
+      spark.sql(s"create index idx_quantity2 on $tableName (quantity2)")
+      spark.sql(s"set hoodie.schema.on.read.enable = true")
+
+      // Verify indexes exist
+      checkAnswer(s"show indexes from default.$tableName")(
+        Seq("column_stats", "column_stats", ""),
+        Seq("record_index", "record_index", ""),
+        Seq("secondary_index_idx_quantity1", "secondary_index", "quantity1"),
+        Seq("secondary_index_idx_quantity2", "secondary_index", "quantity2")
+      )
+
+      // Validate that both quantity fields are initially int type
+      validateFieldType(basePath, "quantity1", "int")
+      validateFieldType(basePath, "quantity2", "int")
+
+      // Try SQL ALTER on quantity2 - should fail because of secondary index
+      val caughtAlterException = intercept[Exception] {
+        spark.sql(s"ALTER TABLE $tableName ALTER COLUMN quantity2 TYPE double")
+      }
+      validateSchemaEvolutionException(caughtAlterException, "quantity2", 
"secondary_index_idx_quantity2")
+
+      // Try data source write evolution on quantity1 - should fail because of 
secondary index
+      val caughtDSWriteException = intercept[Exception] {
+        val evolvedData = Seq(
+          (3, "a3", 30.5, 300, 1002L) // Note: quantity1 is now double (30.5) 
instead of int
+        ).toDF("id", "name", "quantity1", "quantity2", "ts")
+
+        evolvedData.write
+          .format("hudi")
+          .option("hoodie.table.name", tableName)
+          .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
+          .option("hoodie.datasource.write.recordkey.field", "id")
+          .option("hoodie.datasource.write.precombine.field", "ts")
+          .option("hoodie.datasource.write.operation", "upsert")
+          .option("hoodie.schema.on.read.enable", "true")
+          .mode("append")
+          .save(basePath)
+      }
+      validateSchemaEvolutionException(caughtDSWriteException, "quantity1", 
"secondary_index_idx_quantity1")
+
+      // Verify that the original data is still intact
+      checkAnswer(s"SELECT id, name, quantity1, quantity2, ts FROM $tableName 
ORDER BY id")(
+        Seq(1, "a1", 10, 100, 1000),
+        Seq(2, "a2", 20, 200, 1001)
+      )
+    }
+  }
 }

Reply via email to