This is an automated email from the ASF dual-hosted git repository.
jonvex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0fe119a0cf1e [HUDI-9566] Ban schema evolution on columns with SI
(#13595)
0fe119a0cf1e is described below
commit 0fe119a0cf1e0b8ef44a2049fd15e56fcb62cfb9
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Fri Jul 25 08:22:16 2025 -0700
[HUDI-9566] Ban schema evolution on columns with SI (#13595)
---
.../java/org/apache/hudi/table/HoodieTable.java | 77 +++++-
.../hudi/table/TestHoodieTableSchemaEvolution.java | 307 +++++++++++++++++++++
.../spark/sql/hudi/command/AlterTableCommand.scala | 1 +
.../functional/TestHiveTableSchemaEvolution.java | 2 +
.../apache/hudi/functional/TestHoodieIndex.java | 4 +-
.../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 252 +++++++++--------
.../sql/hudi/dml/others/TestTimeTravelTable.scala | 97 +++----
.../hudi/feature/index/TestSecondaryIndex.scala | 112 +++++++-
8 files changed, 678 insertions(+), 174 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 82a2cc63be33..f3ba73c6dac5 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.avro.AvroSchemaCompatibility;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
@@ -45,6 +46,8 @@ import org.apache.hudi.common.fs.FailSafeConsistencyGuard;
import org.apache.hudi.common.fs.OptimisticConsistencyGuard;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -102,6 +105,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -112,6 +116,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -928,7 +933,7 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
* When inserting/updating data, we read records using the last used schema
and convert them to the
* GenericRecords with writerSchema. Hence, we need to ensure that this
conversion can take place without errors.
*/
- private void validateSchema() throws HoodieUpsertException,
HoodieInsertException {
+ public void validateSchema() throws HoodieUpsertException,
HoodieInsertException {
boolean shouldValidate = config.shouldValidateAvroSchema();
boolean allowProjection = config.shouldAllowAutoEvolutionColumnDrop();
@@ -949,6 +954,12 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
Schema writerSchema =
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
Schema tableSchema =
HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema,
shouldValidate, allowProjection, getDropPartitionColNames());
+
+ // Check secondary index column compatibility
+ Option<HoodieIndexMetadata> indexMetadata =
metaClient.getIndexMetadata();
+ if (indexMetadata.isPresent()) {
+ validateSecondaryIndexSchemaEvolution(tableSchema, writerSchema,
indexMetadata.get());
+ }
} catch (SchemaCompatibilityException e) {
throw e;
} catch (Exception e) {
@@ -956,6 +967,70 @@ public abstract class HoodieTable<T, I, K, O> implements
Serializable {
}
}
+ /**
+ * Validates that columns with secondary indexes are not evolved in an
incompatible way.
+ *
+ * @param tableSchema the current table schema
+ * @param writerSchema the new writer schema
+ * @param indexMetadata the index metadata containing all index definitions
+ * @throws SchemaCompatibilityException if a secondary index column has
incompatible evolution
+ */
+ static void validateSecondaryIndexSchemaEvolution(
+ Schema tableSchema,
+ Schema writerSchema,
+ HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {
+
+ // Filter for secondary index definitions
+ List<HoodieIndexDefinition> secondaryIndexDefs =
indexMetadata.getIndexDefinitions().values().stream()
+ .filter(indexDef ->
MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
+ .collect(Collectors.toList());
+
+ if (secondaryIndexDefs.isEmpty()) {
+ return;
+ }
+
+ // Create a map from source field to index name for efficient lookup
+ Map<String, String> columnToIndexName = new HashMap<>();
+ for (HoodieIndexDefinition indexDef : secondaryIndexDefs) {
+ String indexName = indexDef.getIndexName();
+ for (String sourceField : indexDef.getSourceFields()) {
+ // Note: If a column is part of multiple indexes, this will use the
last one
+ // This is fine since we just need any index name for error reporting
+ columnToIndexName.put(sourceField, indexName);
+ }
+ }
+
+ // Check each indexed column for schema evolution
+ for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
+ String columnName = entry.getKey();
+ String indexName = entry.getValue();
+
+ Schema.Field tableField = tableSchema.getField(columnName);
+
+ if (tableField == null) {
+ // This shouldn't happen as indexed columns should exist in table
schema
+ LOG.warn("Secondary index '{}' references non-existent column: {}",
indexName, columnName);
+ continue;
+ }
+
+ // Use AvroSchemaCompatibility's field lookup logic to handle aliases
+ Schema.Field writerField =
AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
+
+ if (writerField != null &&
!tableField.schema().equals(writerField.schema())) {
+ // Check if this is just making the field nullable/non-nullable, which
is safe from SI perspective
+ if
(resolveNullableSchema(tableField.schema()).equals(resolveNullableSchema(writerField.schema())))
{
+ continue;
+ }
+
+ String errorMessage = String.format(
+ "Column '%s' has secondary index '%s' and cannot evolve from
schema '%s' to '%s'. "
+ + "Please drop the secondary index before changing the column
type.",
+ columnName, indexName, tableField.schema(), writerField.schema());
+ throw new SchemaCompatibilityException(errorMessage);
+ }
+ }
+ }
+
public void validateUpsertSchema() throws HoodieUpsertException {
if (isMetadataTable) {
return;
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
new file mode 100644
index 000000000000..3143beb6ed0b
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/TestHoodieTableSchemaEvolution.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.exception.SchemaCompatibilityException;
+import org.apache.hudi.metadata.MetadataPartitionType;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieTable#validateSecondaryIndexSchemaEvolution}.
+ */
+public class TestHoodieTableSchemaEvolution {
+
+ private static final String TABLE_SCHEMA = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"test\","
+ + "\"fields\": ["
+ + " {\"name\": \"id\", \"type\": \"int\"},"
+ + " {\"name\": \"name\", \"type\": \"string\"},"
+ + " {\"name\": \"age\", \"type\": \"int\"},"
+ + " {\"name\": \"salary\", \"type\": \"double\"},"
+ + " {\"name\": \"nullable_field\", \"type\": [\"null\", \"string\"],
\"default\": null}"
+ + "]}";
+
+ @Test
+ public void testNoSecondaryIndexes() {
+ // When there are no secondary indexes, any schema evolution should be
allowed
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"",
"\"age\", \"type\": \"long\""));
+
+ HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(new
HashMap<>());
+
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testNoSchemaChange() {
+ // When schema doesn't change, validation should pass even with secondary
indexes
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_age", "age");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testNonIndexedColumnEvolution() {
+ // Evolution of non-indexed columns should be allowed
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new
Schema.Parser().parse(TABLE_SCHEMA.replace("\"name\", \"type\": \"string\"",
"\"name\", \"type\": [\"null\", \"string\"]"));
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_age", "age");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @ParameterizedTest
+ @MethodSource("provideInvalidSchemaEvolutions")
+ public void testIndexedColumnTypeEvolution(String fieldName, String
originalType, String evolvedType) {
+ // Type evolution of indexed columns should fail
+ String originalSchema = TABLE_SCHEMA;
+ String evolvedSchema = TABLE_SCHEMA.replace("\"" + fieldName + "\",
\"type\": \"" + originalType + "\"",
+ "\"" + fieldName + "\",
\"type\": \"" + evolvedType + "\"");
+
+ Schema tableSchema = new Schema.Parser().parse(originalSchema);
+ Schema writerSchema = new Schema.Parser().parse(evolvedSchema);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_" + fieldName, fieldName);
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ SchemaCompatibilityException exception =
assertThrows(SchemaCompatibilityException.class, () ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+
+ assertTrue(exception.getMessage().contains("secondary index"));
+ assertTrue(exception.getMessage().contains(fieldName));
+ assertTrue(exception.getMessage().contains("secondary_index_" +
fieldName));
+ }
+
+ private static Stream<Arguments> provideInvalidSchemaEvolutions() {
+ return Stream.of(
+ Arguments.of("age", "int", "long"), // int to long
+ Arguments.of("age", "int", "double"), // int to double
+ Arguments.of("salary", "double", "float") // double to float
+ );
+ }
+
+ @Test
+ public void testMultipleIndexesOnSameColumn() {
+ // When a column has multiple indexes, error should mention at least one
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"",
"\"age\", \"type\": \"long\""));
+
+ HoodieIndexDefinition indexDef1 =
createSecondaryIndexDefinition("secondary_index_age_1", "age");
+ HoodieIndexDefinition indexDef2 =
createSecondaryIndexDefinition("secondary_index_age_2", "age");
+
+ Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>();
+ indexDefs.put("secondary_index_age_1", indexDef1);
+ indexDefs.put("secondary_index_age_2", indexDef2);
+ HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefs);
+
+ SchemaCompatibilityException exception =
assertThrows(SchemaCompatibilityException.class, () ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+
+ assertTrue(exception.getMessage().contains("age"));
+ // Should contain at least one of the index names
+ assertTrue(exception.getMessage().contains("secondary_index_age_1") ||
exception.getMessage().contains("secondary_index_age_2"));
+ }
+
+ @Test
+ public void testCompoundIndex() {
+ // Test index on multiple columns - if any column evolves, should fail
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"",
"\"age\", \"type\": \"long\""));
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_compound", "name", "age");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ SchemaCompatibilityException exception =
assertThrows(SchemaCompatibilityException.class, () ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+
+ assertTrue(exception.getMessage().contains("age"));
+ assertTrue(exception.getMessage().contains("secondary_index_compound"));
+ }
+
+ @Test
+ public void testFieldWithAlias() {
+ // Test schema evolution with field aliases
+ String tableSchemaStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"test\","
+ + "\"fields\": ["
+ + " {\"name\": \"id\", \"type\": \"int\"},"
+ + " {\"name\": \"old_name\", \"type\": \"string\", \"aliases\":
[\"new_name\"]}"
+ + "]}";
+
+ String writerSchemaStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"test\","
+ + "\"fields\": ["
+ + " {\"name\": \"id\", \"type\": \"int\"},"
+ + " {\"name\": \"new_name\", \"type\": \"string\"}" // Field renamed
using alias
+ + "]}";
+
+ Schema tableSchema = new Schema.Parser().parse(tableSchemaStr);
+ Schema writerSchema = new Schema.Parser().parse(writerSchemaStr);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_name", "old_name");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ // Should pass because the field is found via alias and type hasn't changed
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testNullableFieldEvolution() {
+ // Test evolution from non-nullable to nullable
+ String evolvedSchema = TABLE_SCHEMA.replace("\"name\", \"type\":
\"string\"",
+ "\"name\", \"type\":
[\"null\", \"string\"], \"default\": null");
+
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new Schema.Parser().parse(evolvedSchema);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_name", "name");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ // Making a field nullable is a backward-compatible change and should be
allowed
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testMissingIndexedColumnInTableSchema() {
+ // Edge case: index references a column that doesn't exist in table schema
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_nonexistent",
"nonexistent_column");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ // Should handle gracefully (logs warning and continues)
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testNonSecondaryIndexDefinitions() {
+ // Test that non-secondary index definitions are ignored
+ Schema tableSchema = new Schema.Parser().parse(TABLE_SCHEMA);
+ Schema writerSchema = new
Schema.Parser().parse(TABLE_SCHEMA.replace("\"age\", \"type\": \"int\"",
"\"age\", \"type\": \"long\""));
+
+ // Create an expression index (not secondary index)
+ HoodieIndexDefinition expressionIndexDef =
HoodieIndexDefinition.newBuilder()
+
.withIndexName(MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath() +
"_expr_idx")
+
.withIndexType(MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath())
+ .withSourceFields(Collections.singletonList("age"))
+ .build();
+
+ HoodieIndexMetadata indexMetadata =
createIndexMetadata(expressionIndexDef);
+
+ // Should not throw because it's not a secondary index
+ assertDoesNotThrow(() ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchema,
writerSchema, indexMetadata));
+ }
+
+ @Test
+ public void testFixedTypeEvolution() {
+ // Test fixed type size changes
+ String fixed8Schema = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"test\","
+ + "\"fields\": ["
+ + " {\"name\": \"id\", \"type\": \"int\"},"
+ + " {\"name\": \"fixed_field\", \"type\": {\"type\": \"fixed\",
\"name\": \"FixedField\", \"size\": 8}}"
+ + "]}";
+
+ String fixed16Schema = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"test\","
+ + "\"fields\": ["
+ + " {\"name\": \"id\", \"type\": \"int\"},"
+ + " {\"name\": \"fixed_field\", \"type\": {\"type\": \"fixed\",
\"name\": \"FixedField\", \"size\": 16}}"
+ + "]}";
+
+ Schema tableSchema = new Schema.Parser().parse(fixed8Schema);
+ Schema writerSchema = new Schema.Parser().parse(fixed16Schema);
+
+ HoodieIndexDefinition indexDef =
createSecondaryIndexDefinition("secondary_index_fixed", "fixed_field");
+ HoodieIndexMetadata indexMetadata = createIndexMetadata(indexDef);
+
+ final Schema tableSchemaFixed1 = tableSchema;
+ final Schema writerSchemaFixed1 = writerSchema;
+ final HoodieIndexMetadata indexMetadataFixed = indexMetadata;
+ SchemaCompatibilityException exception =
assertThrows(SchemaCompatibilityException.class, () ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchemaFixed1,
writerSchemaFixed1, indexMetadataFixed));
+
+ assertTrue(exception.getMessage().contains("fixed_field"));
+ assertTrue(exception.getMessage().contains("secondary index"));
+
+ // Fixed size decrease
+ tableSchema = new Schema.Parser().parse(fixed16Schema);
+ writerSchema = new Schema.Parser().parse(fixed8Schema);
+
+ final Schema tableSchemaFixed2 = tableSchema;
+ final Schema writerSchemaFixed2 = writerSchema;
+ exception = assertThrows(SchemaCompatibilityException.class, () ->
+ HoodieTable.validateSecondaryIndexSchemaEvolution(tableSchemaFixed2,
writerSchemaFixed2, indexMetadataFixed));
+
+ assertTrue(exception.getMessage().contains("fixed_field"));
+ assertTrue(exception.getMessage().contains("secondary index"));
+ }
+
+ private HoodieIndexDefinition createSecondaryIndexDefinition(String
indexName, String... sourceFields) {
+ return HoodieIndexDefinition.newBuilder()
+
.withIndexName(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath() + "_" +
indexName)
+
.withIndexType(MetadataPartitionType.SECONDARY_INDEX.getPartitionPath())
+ .withSourceFields(Arrays.asList(sourceFields))
+ .build();
+ }
+
+ private HoodieIndexMetadata createIndexMetadata(HoodieIndexDefinition...
indexDefs) {
+ Map<String, HoodieIndexDefinition> indexDefMap = new HashMap<>();
+ for (HoodieIndexDefinition indexDef : indexDefs) {
+ indexDefMap.put(indexDef.getIndexName(), indexDef);
+ }
+ return new HoodieIndexMetadata(indexDefMap);
+ }
+}
\ No newline at end of file
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index 40132b5e54b7..65a25b6ab019 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -277,6 +277,7 @@ object AlterTableCommand extends Logging {
client.setOperationType(WriteOperationType.ALTER_SCHEMA)
val hoodieTable = HoodieSparkTable.create(client.getConfig,
client.getEngineContext)
+ hoodieTable.validateSchema()
val timeLine = hoodieTable.getActiveTimeline
val instantGenerator = metaClient.getTimelineLayout.getInstantGenerator
val requested = instantGenerator.createNewInstant(State.REQUESTED,
commitActionType, instantTime)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 8d3c925f3740..ecc0a2afbf5c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -99,6 +99,7 @@ public class TestHiveTableSchemaEvolution {
String path = new
Path(basePath.toAbsolutePath().toString()).toUri().toString();
spark.sql("set hoodie.schema.on.read.enable=true");
+ spark.sql("set
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true");
spark.sql(String.format("create table %s (col0 int, col1 float, col2
string, col3 timestamp) using hudi "
+ "tblproperties (type='mor', primaryKey='col0',
preCombineField='col1', "
@@ -145,6 +146,7 @@ public class TestHiveTableSchemaEvolution {
String path = new
Path(basePath.toAbsolutePath().toString()).toUri().toString();
spark.sql("set hoodie.schema.on.read.enable=true");
+ spark.sql("set
hoodie.datasource.write.schema.allow.auto.evolution.column.drop=true");
spark.sql(String.format("create table %s (col0 int, col1 float, col2
string) using hudi "
+ "tblproperties (type='%s', primaryKey='col0',
preCombineField='col1', "
+
"hoodie.compaction.payload.class='org.apache.hudi.common.model.OverwriteWithLatestAvroPayload')
location '%s'",
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
index 846b10897b79..6109bf8cc9cf 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieIndex.java
@@ -560,7 +560,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase
{
assertTrue(HoodieIndexUtils.checkIfValidCommit(timeline,
instantTimestampSec));
// With a sec format instant time checkIfValid should return false
assuming its > first entry in the active timeline
- Thread.sleep(1010); // sleep required so that new timestamp differs in the
seconds rather than msec
+ Thread.sleep(2010); // sleep required so that new timestamp differs in the
seconds rather than msec
instantTimestamp = WriteClientTestUtils.createNewInstantTime();
instantTimestampSec = instantTimestamp.substring(0,
instantTimestamp.length() -
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
assertFalse(timeline.empty());
@@ -572,7 +572,7 @@ public class TestHoodieIndex extends TestHoodieMetadataBase
{
// Timestamp contain in inflight timeline, checkContainsInstant() should
return true
String checkInstantTimestampSec = instantTimestamp.substring(0,
instantTimestamp.length() -
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
String checkInstantTimestamp = checkInstantTimestampSec +
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT;
- Thread.sleep(1010); // sleep required so that new timestamp differs in the
seconds rather than msec
+ Thread.sleep(2010); // sleep required so that new timestamp differs in the
seconds rather than msec
String newTimestamp = WriteClientTestUtils.createNewInstantTime();
String newTimestampSec = newTimestamp.substring(0, newTimestamp.length() -
HoodieInstantTimeGenerator.DEFAULT_MILLIS_EXT.length());
final HoodieInstant instant5 =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.DELTA_COMMIT_ACTION, newTimestamp);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 1a5e17893fa1..df3382b356de 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -534,50 +534,53 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test Chinese table ") {
withRecordType()(withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql("set " +
DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key + "=upsert")
- spark.sql(
- s"""
- |create table $tableName (
- | id int, comb int, `名字` string, col9 string, `成绩` int, `身高`
float, `体重` double, `上次更新时间` date, par date
- |) using hudi
- | location '$tablePath'
- | options (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'comb'
- | )
- | partitioned by (par)
+ withSparkSqlSessionConfig(
+ "hoodie.schema.on.read.enable" -> "true",
+ "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" ->
"true",
+ DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key ->
"upsert") {
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int, comb int, `名字` string, col9 string, `成绩` int, `身高`
float, `体重` double, `上次更新时间` date, par date
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'comb'
+ | )
+ | partitioned by (par)
""".stripMargin)
- spark.sql(
- s"""
- | insert into $tableName values
- | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-25',
DATE'2021-12-26')
- |""".stripMargin)
- spark.sql(s"alter table $tableName rename column col9 to `爱好_Best`")
-
- // update current table to produce log files for mor
- spark.sql(
- s"""
- | insert into $tableName values
- | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-26',
DATE'2021-12-26')
- |""".stripMargin)
-
- // alter date to string
- spark.sql(s"alter table $tableName alter column `上次更新时间` type string ")
- checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
- Seq("2021-12-26")
- )
- // alter string to date
- spark.sql(s"alter table $tableName alter column `上次更新时间` type date ")
- spark.sql(s"select `上次更新时间` from $tableName").collect()
- checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
- Seq(java.sql.Date.valueOf("2021-12-26"))
- )
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-25',
DATE'2021-12-26')
+ |""".stripMargin)
+ spark.sql(s"alter table $tableName rename column col9 to `爱好_Best`")
+
+ // update current table to produce log files for mor
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1,3,'李明', '读书', 100,180.0001,99.0001,DATE'2021-12-26',
DATE'2021-12-26')
+ |""".stripMargin)
+
+ // alter date to string
+ spark.sql(s"alter table $tableName alter column `上次更新时间` type string
")
+ checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+ Seq("2021-12-26")
+ )
+ // alter string to date
+ spark.sql(s"alter table $tableName alter column `上次更新时间` type date ")
+ spark.sql(s"select `上次更新时间` from $tableName").collect()
+ checkAnswer(spark.sql(s"select `上次更新时间` from $tableName").collect())(
+ Seq(java.sql.Date.valueOf("2021-12-26"))
+ )
+ }
+
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
}
-
spark.sessionState.conf.unsetConf(DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION.key)
})
}
@@ -585,61 +588,64 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column by add rename and drop") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | location '$tablePath'
- | options (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ withSparkSqlSessionConfig(
+ "hoodie.schema.on.read.enable" -> "true",
+ "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" ->
"true") {
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(s"show create table ${tableName}").show(false)
- spark.sql(s"insert into ${tableName} values (1, 'jack', 0.9, 1000)")
- spark.sql(s"update ${tableName} set price = 1.9 where id = 1")
+ spark.sql(s"show create table ${tableName}").show(false)
+ spark.sql(s"insert into ${tableName} values (1, 'jack', 0.9, 1000)")
+ spark.sql(s"update ${tableName} set price = 1.9 where id = 1")
- spark.sql(s"alter table ${tableName} alter column id type long")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", 1.9, 1000)
- )
- // test add action, include position change
- spark.sql(s"alter table ${tableName} add columns(ext1 string comment
'add ext1' after name)")
- spark.sql(s"insert into ${tableName} values (2, 'jack', 'exx1', 0.9,
1000)")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
- )
- // test rename
- spark.sql(s"alter table ${tableName} rename column price to newprice")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
- )
- spark.sql(s"update ${tableName} set ext1 = 'haha' where id = 1 ")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack","exx1", 0.9, 1000)
- )
- var maxColumnId = getMaxColumnId(tablePath)
- // drop column newprice
- spark.sql(s"alter table ${tableName} drop column newprice")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", "haha", 1000), Seq(2, "jack","exx1", 1000)
- )
- validateInternalSchema(tablePath, isDropColumn = true,
currentMaxColumnId = maxColumnId)
- maxColumnId = getMaxColumnId(tablePath)
- // add newprice back
- spark.sql(s"alter table ${tableName} add columns(newprice string
comment 'add newprice back' after ext1)")
- checkAnswer(createTestResult(tableName))(
- Seq(1, "jack", "haha", null, 1000), Seq(2, "jack","exx1", null, 1000)
- )
- validateInternalSchema(tablePath, isDropColumn = false,
currentMaxColumnId = maxColumnId)
+ spark.sql(s"alter table ${tableName} alter column id type long")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", 1.9, 1000)
+ )
+ // test add action, include position change
+ spark.sql(s"alter table ${tableName} add columns(ext1 string comment
'add ext1' after name)")
+ spark.sql(s"insert into ${tableName} values (2, 'jack', 'exx1', 0.9,
1000)")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack", "exx1", 0.9, 1000)
+ )
+ // test rename
+ spark.sql(s"alter table ${tableName} rename column price to
newprice")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", null, 1.9, 1000), Seq(2, "jack", "exx1", 0.9, 1000)
+ )
+ spark.sql(s"update ${tableName} set ext1 = 'haha' where id = 1 ")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", "haha", 1.9, 1000), Seq(2, "jack", "exx1", 0.9,
1000)
+ )
+ var maxColumnId = getMaxColumnId(tablePath)
+ // drop column newprice
+ spark.sql(s"alter table ${tableName} drop column newprice")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", "haha", 1000), Seq(2, "jack", "exx1", 1000)
+ )
+ validateInternalSchema(tablePath, isDropColumn = true,
currentMaxColumnId = maxColumnId)
+ maxColumnId = getMaxColumnId(tablePath)
+ // add newprice back
+ spark.sql(s"alter table ${tableName} add columns(newprice string
comment 'add newprice back' after ext1)")
+ checkAnswer(createTestResult(tableName))(
+ Seq(1, "jack", "haha", null, 1000), Seq(2, "jack", "exx1", null,
1000)
+ )
+ validateInternalSchema(tablePath, isDropColumn = false,
currentMaxColumnId = maxColumnId)
+ }
}
}
}
@@ -691,35 +697,38 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column multiple times") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
- val tableName = generateTableName
- val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
- spark.sql("set hoodie.schema.on.read.enable=true")
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | col1 string,
- | col2 string,
- | ts long
- |) using hudi
- | location '$tablePath'
- | options (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
+ withSparkSqlSessionConfig(
+ "hoodie.schema.on.read.enable" -> "true",
+ "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" ->
"true") {
+ val tableName = generateTableName
+ val tablePath = s"${new Path(tmp.getCanonicalPath,
tableName).toUri.toString}"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | col1 string,
+ | col2 string,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
""".stripMargin)
- spark.sql(s"show create table ${tableName}").show(false)
- spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
+ spark.sql(s"show create table ${tableName}").show(false)
+ spark.sql(s"insert into ${tableName} values (1, 'aaa', 'bbb', 1000)")
- // Rename to a previously existing column name + insert
- spark.sql(s"alter table ${tableName} drop column col1")
- spark.sql(s"alter table ${tableName} rename column col2 to col1")
+ // Rename to a previously existing column name + insert
+ spark.sql(s"alter table ${tableName} drop column col1")
+ spark.sql(s"alter table ${tableName} rename column col2 to col1")
- spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
- checkAnswer(spark.sql(s"select col1 from ${tableName} order by
id").collect())(
- Seq("bbb"), Seq("aaa")
- )
+ spark.sql(s"insert into ${tableName} values (2, 'aaa', 1000)")
+ checkAnswer(spark.sql(s"select col1 from ${tableName} order by
id").collect())(
+ Seq("bbb"), Seq("aaa")
+ )
+ }
}
}
}
@@ -727,6 +736,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
test("Test alter column with complex schema") {
withTempDir { tmp =>
withSQLConf(s"${DataSourceWriteOptions.SPARK_SQL_INSERT_INTO_OPERATION}"
-> "upsert",
+ "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" ->
"true",
"hoodie.schema.on.read.enable" -> "true",
"spark.sql.parquet.enableNestedColumnVectorizedReader" -> "false") {
val tableName = generateTableName
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
index 855bfdace0f5..e6ddc1ed724f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestTimeTravelTable.scala
@@ -278,55 +278,58 @@ class TestTimeTravelTable extends HoodieSparkSqlTestBase {
test("Test Time Travel With Schema Evolution") {
withRecordType()(withTempDir { tmp =>
- spark.sql("set hoodie.schema.on.read.enable=true")
- val tableName = generateTableName
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price double,
- | ts long
- |) using hudi
- | tblproperties (
- | primaryKey = 'id',
- | preCombineField = 'ts'
- | )
- | location '${tmp.getCanonicalPath}/$tableName'
+ withSparkSqlSessionConfig("hoodie.schema.on.read.enable" -> "true",
+
"hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> "true") {
+ spark.sql("set hoodie.schema.on.read.enable=true")
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName'
""".stripMargin)
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
-
- val metaClient = createMetaClient(spark,
s"${tmp.getCanonicalPath}/$tableName")
- val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
- .lastInstant().get().requestedTime
-
- // add column
- spark.sql(s"alter table $tableName add columns (company string)")
- spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
- val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
- .lastInstant().get().requestedTime
-
- // drop column
- spark.sql(s"alter table $tableName drop column price")
-
- val result1 = spark.sql(s"select * from ${tableName} timestamp as of
$instant1 order by id")
- .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
- checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
-
- val result2 = spark.sql(s"select * from ${tableName} timestamp as of
$instant2 order by id")
- .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
- checkAnswer(result2)(
- Seq(1, "a1", 10.0, 1000, null),
- Seq(2, "a2", 11.0, 1100, "hudi")
- )
-
- val result3 = spark.sql(s"select * from ${tableName} order by id")
- .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
- checkAnswer(result3)(
- Seq(1, "a1", 1000, null),
- Seq(2, "a2", 1100, "hudi")
- )
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+ val metaClient = createMetaClient(spark,
s"${tmp.getCanonicalPath}/$tableName")
+ val instant1 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+ .lastInstant().get().requestedTime
+
+ // add column
+ spark.sql(s"alter table $tableName add columns (company string)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 11, 1100, 'hudi')")
+ val instant2 = metaClient.reloadActiveTimeline().getAllCommitsTimeline
+ .lastInstant().get().requestedTime
+
+ // drop column
+ spark.sql(s"alter table $tableName drop column price")
+
+ val result1 = spark.sql(s"select * from ${tableName} timestamp as of
$instant1 order by id")
+ .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+ checkAnswer(result1)(Seq(1, "a1", 10.0, 1000))
+
+ val result2 = spark.sql(s"select * from ${tableName} timestamp as of
$instant2 order by id")
+ .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+ checkAnswer(result2)(
+ Seq(1, "a1", 10.0, 1000, null),
+ Seq(2, "a2", 11.0, 1100, "hudi")
+ )
+
+ val result3 = spark.sql(s"select * from ${tableName} order by id")
+ .drop("_hoodie_commit_time", "_hoodie_commit_seqno",
"_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name").collect()
+ checkAnswer(result3)(
+ Seq(1, "a1", 1000, null),
+ Seq(2, "a2", 1100, "hudi")
+ )
+ }
})
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
index 5e3a304a2640..a25a748dd953 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestSecondaryIndex.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.{DataSourceReadOptions,
DataSourceWriteOptions, HoodieSpa
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
import org.apache.hudi.common.model.WriteOperationType
-import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator,
HoodieTestUtils}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig,
HoodieWriteConfig}
@@ -35,7 +35,7 @@ import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse,
assertNotNull, assertTrue}
import java.util.concurrent.atomic.AtomicInteger
@@ -714,6 +714,22 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
String.format("%03d", new Integer(instantTime.incrementAndGet()))
}
+ private def validateFieldType(basePath: String, fieldName: String,
expectedType: String): Unit = {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+ val schemaResolver = new TableSchemaResolver(metaClient)
+ val tableSchema = schemaResolver.getTableAvroSchema(false)
+ val field = tableSchema.getField(fieldName)
+ assertNotNull(field, s"$fieldName field should exist in table schema")
+ val fieldType = field.schema()
+ assertTrue(
+ fieldType.toString.contains(expectedType),
+ s"$fieldName field should be of type $expectedType, but got:
${fieldType.toString}"
+ )
+ }
+
private def createTempTableAndInsert(tableName: String, basePath: String) = {
spark.sql(
s"""
@@ -749,7 +765,6 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
)
}
-
/**
* Test secondary index with nullable columns covering comprehensive
scenarios for both COW and MOR:
* - Initial creation with null values
@@ -1078,4 +1093,95 @@ class TestSecondaryIndex extends HoodieSparkSqlTestBase {
.map(_.getString(0))
.filter(_.contains(SECONDARY_INDEX_RECORD_KEY_SEPARATOR))
}
+
+ test("Test Secondary Index with Schema Evolution - Column Type Change Should
Fail") {
+ import spark.implicits._
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ // Helper function to validate schema evolution exception
+ def validateSchemaEvolutionException(exception: Exception, columnName:
String, indexName: String): Unit = {
+ assertTrue(
+ exception.getMessage.contains("Failed upsert schema compatibility
check") ||
+ exception.getMessage.contains(s"Column '$columnName' has secondary
index '$indexName'") ||
+ (exception.getCause != null &&
+ exception.getCause.getMessage.contains(s"Column '$columnName' has
secondary index '$indexName' and cannot evolve")),
+ s"Got unexpected exception message: ${exception.getMessage}"
+ )
+ }
+
+ // Create table with initial schema where quantity1 and quantity2 are int
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | quantity1 int,
+ | quantity2 int,
+ | ts long
+ |) using hudi
+ | options (
+ | primaryKey ='id',
+ | type = 'cow',
+ | preCombineField = 'ts',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.metadata.record.index.enable = 'true'
+ | )
+ | location '$basePath'
+ """.stripMargin)
+
+ // Insert initial data with integer quantities
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 100, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 20, 200, 1001)")
+
+ // Create secondary indexes on both quantity columns
+ spark.sql(s"create index idx_quantity1 on $tableName (quantity1)")
+ spark.sql(s"create index idx_quantity2 on $tableName (quantity2)")
+ spark.sql(s"set hoodie.schema.on.read.enable = true")
+
+ // Verify indexes exist
+ checkAnswer(s"show indexes from default.$tableName")(
+ Seq("column_stats", "column_stats", ""),
+ Seq("record_index", "record_index", ""),
+ Seq("secondary_index_idx_quantity1", "secondary_index", "quantity1"),
+ Seq("secondary_index_idx_quantity2", "secondary_index", "quantity2")
+ )
+
+ // Validate that both quantity fields are initially int type
+ validateFieldType(basePath, "quantity1", "int")
+ validateFieldType(basePath, "quantity2", "int")
+
+ // Try SQL ALTER on quantity2 - should fail because of secondary index
+ val caughtAlterException = intercept[Exception] {
+ spark.sql(s"ALTER TABLE $tableName ALTER COLUMN quantity2 TYPE double")
+ }
+ validateSchemaEvolutionException(caughtAlterException, "quantity2",
"secondary_index_idx_quantity2")
+
+ // Try data source write evolution on quantity1 - should fail because of
secondary index
+ val caughtDSWriteException = intercept[Exception] {
+ val evolvedData = Seq(
+ (3, "a3", 30.5, 300, 1002L) // Note: quantity1 is now double (30.5)
instead of int
+ ).toDF("id", "name", "quantity1", "quantity2", "ts")
+
+ evolvedData.write
+ .format("hudi")
+ .option("hoodie.table.name", tableName)
+ .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
+ .option("hoodie.datasource.write.recordkey.field", "id")
+ .option("hoodie.datasource.write.precombine.field", "ts")
+ .option("hoodie.datasource.write.operation", "upsert")
+ .option("hoodie.schema.on.read.enable", "true")
+ .mode("append")
+ .save(basePath)
+ }
+ validateSchemaEvolutionException(caughtDSWriteException, "quantity1",
"secondary_index_idx_quantity1")
+
+ // Verify that the original data is still intact
+ checkAnswer(s"SELECT id, name, quantity1, quantity2, ts FROM $tableName
ORDER BY id")(
+ Seq(1, "a1", 10, 100, 1000),
+ Seq(2, "a2", 20, 200, 1001)
+ )
+ }
+ }
}