This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.13.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b3fa99685ed923045bf966b0fc5b273a881e157a Author: Sagar Sumit <[email protected]> AuthorDate: Wed Feb 1 18:13:25 2023 +0530 [HUDI-5646] Guard dropping columns by a config, do not allow by default (#7787) * [HUDI-5646] Guard dropping columns by a config, do not allow by default * Replaced superfluous `isSchemaCompatible` override by explicitly specifying whether column drop should be allowed; * Revisited `HoodieSparkSqlWriter` to avoid (unnecessary) schema handling for delete operations * Remove meta-fields from latest table schema during analysis * Disable schema validation when partition columns are dropped --------- Co-authored-by: Alexey Kudinkin <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../org/apache/hudi/config/HoodieWriteConfig.java | 18 +++ .../java/org/apache/hudi/table/HoodieTable.java | 12 +- .../hudi/client/TestTableSchemaEvolution.java | 125 +++++++++++++-------- .../hudi/testutils/HoodieClientTestUtils.java | 19 +++- .../java/org/apache/hudi/avro/AvroSchemaUtils.java | 23 +++- .../hudi/common/table/TableSchemaResolver.java | 36 ------ .../org/apache/hudi/common/util/ParquetUtils.java | 34 +++--- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 68 +++++------ .../AlterHoodieTableChangeColumnCommand.scala | 2 +- .../hudi/TestAvroSchemaResolutionSupport.scala | 19 +++- .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 49 +++++--- .../hudi/functional/TestBasicSchemaEvolution.scala | 21 +++- .../hudi/functional/TestCOWDataSourceStorage.scala | 10 +- .../hudi/functional/TestColumnStatsIndex.scala | 7 +- 14 files changed, 268 insertions(+), 175 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index e187fff4483..e6525a2b1dc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -227,6 +227,15 @@ public class HoodieWriteConfig extends HoodieConfig { .defaultValue("true") .withDocumentation("Validate the schema used for the write against the latest schema, for backwards compatibility."); + public static final ConfigProperty<String> SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP = ConfigProperty + .key("hoodie.datasource.write.schema.allow.auto.evolution.column.drop") + .defaultValue("false") + .sinceVersion("0.13.0") + .withDocumentation("Controls whether table's schema is allowed to automatically evolve when " + + "incoming batch's schema can have any of the columns dropped. By default, Hudi will not " + + "allow this kind of (auto) schema evolution. Set this config to true to allow table's " + + "schema to be updated automatically when columns are dropped from the new incoming batch."); + public static final ConfigProperty<String> INSERT_PARALLELISM_VALUE = ConfigProperty .key("hoodie.insert.shuffle.parallelism") .defaultValue("0") @@ -1086,6 +1095,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(AVRO_SCHEMA_VALIDATE_ENABLE); } + public boolean shouldAllowAutoEvolutionColumnDrop() { + return getBooleanOrDefault(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP); + } + public String getTableName() { return getString(TBL_NAME); } @@ -2451,6 +2464,11 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withAllowAutoEvolutionColumnDrop(boolean shouldAllowDroppedColumns) { + writeConfig.setValue(SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP, String.valueOf(shouldAllowDroppedColumns)); + return this; + } + public Builder forTable(String tableName) { writeConfig.setValue(TBL_NAME, tableName); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index af8d8d23261..591ebc430dc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -800,7 +800,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { */ private void validateSchema() throws HoodieUpsertException, HoodieInsertException { - if (!config.shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) { + if (!shouldValidateAvroSchema() || getActiveTimeline().getCommitsTimeline().filterCompletedInstants().empty()) { // Check not required return; } @@ -812,7 +812,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient()); writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema()); tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaResolver.getTableAvroSchemaWithoutMetadataFields()); - isValid = isSchemaCompatible(tableSchema, writerSchema); + isValid = isSchemaCompatible(tableSchema, writerSchema, config.shouldAllowAutoEvolutionColumnDrop()); } catch (Exception e) { throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e); } @@ -1010,4 +1010,12 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable { public Runnable getPreExecuteRunnable() { return Functions.noop(); } + + private boolean shouldValidateAvroSchema() { + // TODO(HUDI-4772) re-enable validations in case partition columns + // being dropped from the data-file after fixing the write schema + Boolean shouldDropPartitionColumns = metaClient.getTableConfig().shouldDropPartitionColumns(); + + return config.shouldValidateAvroSchema() && !shouldDropPartitionColumns; + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java index f778c7cceac..686563ebfdd 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java @@ -18,8 +18,6 @@ package org.apache.hudi.client; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieAvroRecord; @@ -34,10 +32,17 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieInsertException; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.testutils.HoodieClientTestBase; import org.apache.hudi.testutils.HoodieClientTestUtils; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.List; @@ -79,36 +84,35 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { @Test public void testSchemaCompatibilityBasic() { - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA), + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA, false), "Same schema is compatible"); String reorderedSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + TIP_NESTED_SCHEMA + FARE_NESTED_SCHEMA + MAP_TYPE_SCHEMA + TRIP_SCHEMA_SUFFIX; - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema), + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, reorderedSchema, false), "Reordered fields are compatible"); - assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA), + assertTrue(isSchemaCompatible(reorderedSchema, TRIP_EXAMPLE_SCHEMA, false), "Reordered fields are compatible"); String renamedSchema = TRIP_EXAMPLE_SCHEMA.replace("tip_history", "tip_future"); - // NOTE: That even though renames could be carried over as "column drop" and "column add" - // both of which are legitimate operations, no data carry-over will occur (exactly b/c - // it's an old column being dropped, and the new one being added) - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema), + assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, false), + "Renaming fields is essentially: dropping old field, created a new one"); + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedSchema, true), "Renaming fields is essentially: dropping old field, created a new one"); String renamedRecordSchema = TRIP_EXAMPLE_SCHEMA.replace("triprec", "triprec_renamed"); - assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema), + assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, renamedRecordSchema, false), "Renamed record name is not compatible"); String swappedFieldSchema = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA.replace("city_to_state", "fare") + FARE_NESTED_SCHEMA.replace("fare", "city_to_state") + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX; - assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema), + assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, swappedFieldSchema, false), "Swapped fields are not compatible"); String typeChangeSchemaDisallowed = TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA.replace("string", "boolean") + TRIP_SCHEMA_SUFFIX; - assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed), + assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, typeChangeSchemaDisallowed, false), "Incompatible field type change is not allowed"); // Array of allowed schema field type transitions @@ -119,10 +123,10 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { for (String[] fieldChange : allowedFieldChanges) { String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[0]) + TRIP_SCHEMA_SUFFIX; String toSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA.replace("string", fieldChange[1]) + TRIP_SCHEMA_SUFFIX; - assertTrue(isSchemaCompatible(fromSchema, toSchema), + assertTrue(isSchemaCompatible(fromSchema, toSchema, false), "Compatible field type change is not allowed"); if (!fieldChange[0].equals("byte") && fieldChange[1].equals("byte")) { - assertFalse(isSchemaCompatible(toSchema, fromSchema), + assertFalse(isSchemaCompatible(toSchema, fromSchema, false), "Incompatible field type change is allowed"); } } @@ -130,32 +134,31 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { // Names and aliases should match String fromSchema = TRIP_SCHEMA_PREFIX + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX; String toSchema = TRIP_SCHEMA_PREFIX.replace("triprec", "new_triprec") + EXTRA_FIELD_SCHEMA + TRIP_SCHEMA_SUFFIX; - assertFalse(isSchemaCompatible(fromSchema, toSchema), "Field names should match"); - assertFalse(isSchemaCompatible(toSchema, fromSchema), "Field names should match"); + assertFalse(isSchemaCompatible(fromSchema, toSchema, false), "Field names should match"); + assertFalse(isSchemaCompatible(toSchema, fromSchema, false), "Field names should match"); - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED), + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, false), "Added field with default is compatible (Evolved Schema)"); String multipleAddedFieldSchema = TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_SCHEMA + EXTRA_FIELD_SCHEMA.replace("new_field", "new_new_field") + TRIP_SCHEMA_SUFFIX; - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema), + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, multipleAddedFieldSchema, false), "Multiple added fields with defaults are compatible"); - assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, - TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA - + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX), + assertFalse(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + EXTRA_FIELD_WITHOUT_DEFAULT_SCHEMA + TRIP_SCHEMA_SUFFIX, false), "Added field without default and not nullable is not compatible (Evolved Schema)"); - assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, - TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA - + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA), + assertTrue(isSchemaCompatible(TRIP_EXAMPLE_SCHEMA, TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX + EXTRA_FIELD_NULLABLE_SCHEMA, false), "Added nullable field is compatible (Evolved Schema)"); } - @Test - public void testMORTable() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMORTable(boolean shouldAllowDroppedColumns) throws Exception { tableType = HoodieTableType.MERGE_ON_READ; // Create the table @@ -165,7 +168,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { .setTimelineLayoutVersion(VERSION_1) .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); - HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA); + HoodieWriteConfig hoodieWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA, shouldAllowDroppedColumns); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); // Initial inserts with TRIP_EXAMPLE_SCHEMA @@ -194,20 +197,26 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { checkReadRecords("000", numRecords); // Insert with evolved schema (column dropped) is allowed - HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED); + HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns); client = getHoodieWriteClient(hoodieDevolvedWriteConfig); final List<HoodieRecord> failedRecords = generateInsertsWithSchema("005", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED); // We cannot use insertBatch directly here because we want to insert records // with a evolved schema and insertBatch inserts records using the TRIP_EXAMPLE_SCHEMA. - writeBatch(client, "005", "004", Option.empty(), "003", numRecords, - (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false); + try { + writeBatch(client, "005", "004", Option.empty(), "003", numRecords, + (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, false, numRecords, 2 * numRecords, 5, false); + assertTrue(shouldAllowDroppedColumns); + } catch (HoodieInsertException e) { + assertFalse(shouldAllowDroppedColumns); + return; + } - // Update with evolved schema (column dropped) is allowed + // Update with evolved schema (column dropped) might be allowed depending on config set. updateBatch(hoodieDevolvedWriteConfig, client, "006", "005", Option.empty(), initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 2 * numRecords, 0); // Insert with an evolved scheme is allowed - HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED); + HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns); client = getHoodieWriteClient(hoodieEvolvedWriteConfig); // We cannot use insertBatch directly here because we want to insert records @@ -230,19 +239,28 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { // Now try updating w/ the original schema (should succeed) client = getHoodieWriteClient(hoodieWriteConfig); - updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(), - initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9); + try { + updateBatch(hoodieWriteConfig, client, "009", "008", Option.empty(), + initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, false, numUpdateRecords, 4 * numRecords, 9); + assertTrue(shouldAllowDroppedColumns); + } catch (HoodieUpsertException e) { + assertFalse(shouldAllowDroppedColumns); + } } - @Test - public void testCopyOnWriteTable() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testCopyOnWriteTable(boolean shouldAllowDroppedColumns) throws Exception { // Create the table HoodieTableMetaClient.withPropertyBuilder() .fromMetaClient(metaClient) .setTimelineLayoutVersion(VERSION_1) .initTable(metaClient.getHadoopConf(), metaClient.getBasePath()); - HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA).withRollbackUsingMarkers(false).build(); + HoodieWriteConfig hoodieWriteConfig = getWriteConfigBuilder(TRIP_EXAMPLE_SCHEMA) + .withRollbackUsingMarkers(false) + .withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns) + .build(); SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig); // Initial inserts with TRIP_EXAMPLE_SCHEMA @@ -266,11 +284,17 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { checkReadRecords("000", numRecords); // Inserting records w/ new evolved schema (w/ tip column dropped) - HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED); + HoodieWriteConfig hoodieDevolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED, shouldAllowDroppedColumns); client = getHoodieWriteClient(hoodieDevolvedWriteConfig); final List<HoodieRecord> failedRecords = generateInsertsWithSchema("004", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_DROPPED); - writeBatch(client, "004", "003", Option.empty(), "003", numRecords, - (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false); + try { + writeBatch(client, "004", "003", Option.empty(), "003", numRecords, + (String s, Integer a) -> failedRecords, SparkRDDWriteClient::insert, true, numRecords, numRecords * 2, 1, false); + assertTrue(shouldAllowDroppedColumns); + } catch (HoodieInsertException e) { + assertFalse(shouldAllowDroppedColumns); + return; + } // Updating records w/ new evolved schema updateBatch(hoodieDevolvedWriteConfig, client, "005", "004", Option.empty(), @@ -278,7 +302,7 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { numUpdateRecords, 2 * numRecords, 5); // Inserting with evolved schema is allowed - HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED); + HoodieWriteConfig hoodieEvolvedWriteConfig = getWriteConfig(TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED, shouldAllowDroppedColumns); client = getHoodieWriteClient(hoodieEvolvedWriteConfig); final List<HoodieRecord> evolvedRecords = generateInsertsWithSchema("006", numRecords, TRIP_EXAMPLE_SCHEMA_EVOLVED_COL_ADDED); // We cannot use insertBatch directly here because we want to insert records @@ -299,9 +323,14 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { // Now try updating w/ the original schema (should succeed) client = getHoodieWriteClient(hoodieWriteConfig); - updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(), - initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true, - numUpdateRecords, 3 * numRecords, 8); + try { + updateBatch(hoodieWriteConfig, client, "008", "007", Option.empty(), + initCommitTime, numUpdateRecords, SparkRDDWriteClient::upsert, false, true, + numUpdateRecords, 3 * numRecords, 8); + assertTrue(shouldAllowDroppedColumns); + } catch (HoodieUpsertException e) { + assertFalse(shouldAllowDroppedColumns); + } } private void checkReadRecords(String instantTime, int numExpectedRecords) throws IOException { @@ -362,8 +391,8 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { }).collect(Collectors.toList()); } - private HoodieWriteConfig getWriteConfig(String schema) { - return getWriteConfigBuilder(schema).build(); + private HoodieWriteConfig getWriteConfig(String schema, boolean shouldAllowDroppedColumns) { + return getWriteConfigBuilder(schema).withAllowAutoEvolutionColumnDrop(shouldAllowDroppedColumns).build(); } private HoodieWriteConfig.Builder getWriteConfigBuilder(String schema) { @@ -373,8 +402,8 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase { .withAvroSchemaValidate(true); } - private static boolean isSchemaCompatible(String oldSchema, String newSchema) { - return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); + private static boolean isSchemaCompatible(String oldSchema, String newSchema, boolean shouldAllowDroppedColumns) { + return AvroSchemaUtils.isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema), shouldAllowDroppedColumns); } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java index 609fdb0bd5c..09447965b2c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestUtils.java @@ -37,6 +37,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.storage.HoodieHFileUtils; @@ -96,11 +97,17 @@ public class HoodieClientTestUtils { .setMaster("local[4]") .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") - .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") .set("spark.sql.shuffle.partitions", "4") .set("spark.default.parallelism", "4"); - if (HoodieSparkUtils.gteqSpark3_2()) { + // NOTE: This utility is used in modules where this class might not be present, therefore + // to avoid littering output w/ [[ClassNotFoundException]]s we will skip adding it + // in case this utility is used in the module not providing it + if (canLoadClass("org.apache.spark.sql.hudi.HoodieSparkSessionExtension")) { + sparkConf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"); + } + + if (canLoadClass("org.apache.spark.sql.hudi.catalog.HoodieCatalog") && HoodieSparkUtils.gteqSpark3_2()) { sparkConf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog"); } @@ -326,4 +333,12 @@ public class HoodieClientTestUtils { throw new HoodieException("Failed to read schema from commit metadata", e); } } + + private static boolean canLoadClass(String className) { + try { + return ReflectionUtils.getClass(className) != null; + } catch (Exception e) { + return false; + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java index 395fc100bf1..545acdcf309 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java @@ -20,6 +20,7 @@ package org.apache.hudi.avro; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; +import org.apache.avro.SchemaCompatibility; import java.util.List; import java.util.Objects; @@ -36,10 +37,10 @@ public class AvroSchemaUtils { private AvroSchemaUtils() {} /** - * See {@link #isSchemaCompatible(Schema, Schema, boolean)} doc for more details + * See {@link #isSchemaCompatible(Schema, Schema, boolean, boolean)} doc for more details */ - public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema) { - return isSchemaCompatible(prevSchema, newSchema, true); + public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean allowProjection) { + return isSchemaCompatible(prevSchema, newSchema, true, allowProjection); } /** @@ -50,10 +51,22 @@ public class AvroSchemaUtils { * @param newSchema new instance of the schema * @param checkNaming controls whether schemas fully-qualified names should be checked */ - public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming) { + public static boolean isSchemaCompatible(Schema prevSchema, Schema newSchema, boolean checkNaming, boolean allowProjection) { // NOTE: We're establishing compatibility of the {@code prevSchema} and {@code newSchema} // as following: {@code newSchema} is considered compatible to {@code prevSchema}, // iff data written using {@code prevSchema} could be read by {@code newSchema} + + // In case schema projection is not allowed, new schema has to have all the same fields as the + // old schema + if (!allowProjection) { + // Check that each field in the oldSchema can be populated in the newSchema + if (prevSchema.getFields().stream() + .map(oldSchemaField -> SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField)) + .anyMatch(Objects::isNull)) { + return false; + } + } + AvroSchemaCompatibility.SchemaPairCompatibility result = AvroSchemaCompatibility.checkReaderWriterCompatibility(newSchema, prevSchema, checkNaming); return result.getType() == AvroSchemaCompatibility.SchemaCompatibilityType.COMPATIBLE; @@ -88,7 +101,7 @@ public class AvroSchemaUtils { private static boolean isAtomicSchemasCompatible(Schema oneAtomicType, Schema anotherAtomicType) { // NOTE: Checking for compatibility of atomic types, we should ignore their // corresponding fully-qualified names (as irrelevant) - return isSchemaCompatible(oneAtomicType, anotherAtomicType, false); + return isSchemaCompatible(oneAtomicType, anotherAtomicType, false, true); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 4eddd6df031..03ed542bd60 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.table; -import org.apache.hudi.avro.AvroSchemaUtils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFileFormat; @@ -31,7 +30,6 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.util.Functions.Function1; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; @@ -315,40 +313,6 @@ public class TableSchemaResolver { return Option.empty(); } - /** - * Get latest schema either from incoming schema or table schema. - * @param writeSchema incoming batch's write schema. - * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise. - * @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required. - * @return the latest schema. - * - * @deprecated will be removed (HUDI-4472) - */ - @Deprecated - public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace, - Function1<Schema, Schema> converterFn) { - Schema latestSchema = writeSchema; - try { - if (metaClient.isTimelineNonEmpty()) { - Schema tableSchema = getTableAvroSchemaWithoutMetadataFields(); - if (convertTableSchemaToAddNamespace && converterFn != null) { - tableSchema = converterFn.apply(tableSchema); - } - if (writeSchema.getFields().size() < tableSchema.getFields().size() && AvroSchemaUtils.isSchemaCompatible(writeSchema, tableSchema)) { - // if incoming schema is a subset (old schema) compared to table schema. For eg, one of the - // ingestion pipeline is still producing events in old schema - latestSchema = tableSchema; - LOG.debug("Using latest table schema to rewrite incoming records " + tableSchema.toString()); - } - } - } catch (IllegalArgumentException | InvalidTableException e) { - LOG.warn("Could not find any commits, falling back to using incoming batch's write schema"); - } catch (Exception e) { - LOG.warn("Unknown exception thrown " + e.getMessage() + ", Falling back to using incoming batch's write schema"); - } - return latestSchema; - } - private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException { LOG.info("Reading schema from " + parquetFilePath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java index bc736090d5e..d0ef867a2d4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java @@ -37,6 +37,7 @@ import org.apache.log4j.Logger; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -314,20 +315,25 @@ public class ParquetUtils extends BaseFileUtils { .flatMap(blockMetaData -> blockMetaData.getColumns().stream() .filter(f -> cols.contains(f.getPath().toDotString())) - .map(columnChunkMetaData -> - HoodieColumnRangeMetadata.<Comparable>create( - parquetFilePath.getName(), - columnChunkMetaData.getPath().toDotString(), - convertToNativeJavaType( - columnChunkMetaData.getPrimitiveType(), - columnChunkMetaData.getStatistics().genericGetMin()), - convertToNativeJavaType( - columnChunkMetaData.getPrimitiveType(), - columnChunkMetaData.getStatistics().genericGetMax()), - columnChunkMetaData.getStatistics().getNumNulls(), - columnChunkMetaData.getValueCount(), - columnChunkMetaData.getTotalSize(), - columnChunkMetaData.getTotalUncompressedSize())) + .map(columnChunkMetaData -> { + Statistics stats = columnChunkMetaData.getStatistics(); + return HoodieColumnRangeMetadata.<Comparable>create( + parquetFilePath.getName(), + columnChunkMetaData.getPath().toDotString(), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + stats.genericGetMin()), + convertToNativeJavaType( + columnChunkMetaData.getPrimitiveType(), + stats.genericGetMax()), + // NOTE: In case when column contains only nulls Parquet won't be creating + // stats for it instead returning stubbed (empty) object. In that case + // we have to equate number of nulls to the value count ourselves + stats.isEmpty() ? columnChunkMetaData.getValueCount() : stats.getNumNulls(), + columnChunkMetaData.getValueCount(), + columnChunkMetaData.getTotalSize(), + columnChunkMetaData.getTotalUncompressedSize()); + }) ) .collect(groupingByCollector); diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 0092019dad5..764f9474ee0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -26,7 +26,8 @@ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible} -import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils} +import org.apache.hudi.avro.HoodieAvroUtils +import org.apache.hudi.avro.HoodieAvroUtils.removeMetadataFields import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} import org.apache.hudi.common.config.{ConfigProperty, HoodieCommonConfig, HoodieConfig, HoodieMetadataConfig, TypedProperties} @@ -230,31 +231,6 @@ object HoodieSparkSqlWriter { } } - // NOTE: Target writer's schema is deduced based on - // - Source's schema - // - Existing table's schema (including its Hudi's [[InternalSchema]] representation) - val writerSchema = deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters) - - validateSchemaForHoodieIsDeleted(writerSchema) - - // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING THIS - // We have to register w/ Kryo all of the Avro schemas that might potentially be used to decode - // records into Avro format. Otherwise, Kryo wouldn't be able to apply an optimization allowing - // it to avoid the need to ser/de the whole schema along _every_ Avro record - val targetAvroSchemas = sourceSchema +: writerSchema +: latestTableSchemaOpt.toSeq - registerAvroSchemasWithKryo(sparkContext, targetAvroSchemas: _*) - - log.info(s"Registered Avro schemas: ${targetAvroSchemas.map(_.toString(true)).mkString("\n")}") - - // Short-circuit if bulk_insert via row is enabled. - // scalastyle:off - if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) { - val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName, - basePath, path, instantTime, writerSchema, tableConfig.isTablePartitioned) - return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) - } - // scalastyle:on - val (writeResult, writeClient: SparkRDDWriteClient[_]) = operation match { case WriteOperationType.DELETE => @@ -312,8 +288,24 @@ object HoodieSparkSqlWriter { client.startCommitWithTime(instantTime, commitActionType) val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime) (writeStatuses, client) + + // Here all other (than DELETE, DELETE_PARTITION) write operations are handled case _ => - // Here all other (than DELETE, DELETE_PARTITION) write operations are handled + // NOTE: Target writer's schema is deduced based on + // - Source's schema + // - Existing table's schema (including its Hudi's [[InternalSchema]] representation) + val writerSchema = deduceWriterSchema(sourceSchema, latestTableSchemaOpt, internalSchemaOpt, parameters) + + validateSchemaForHoodieIsDeleted(writerSchema) + + // Short-circuit if bulk_insert via row is enabled. + // scalastyle:off + if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) && operation == WriteOperationType.BULK_INSERT) { + val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, hoodieConfig, df, tblName, + basePath, path, instantTime, writerSchema, tableConfig.isTablePartitioned) + return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig) + } + // scalastyle:on // Check whether partition columns should be persisted w/in the data-files, or should // be instead omitted from them and simply encoded into the partition path (which is Spark's @@ -404,7 +396,11 @@ object HoodieSparkSqlWriter { // writer's schema. No additional handling is required case None => sourceSchema // Otherwise, we need to make sure we reconcile incoming and latest table schemas - case Some(latestTableSchema) => + case Some(latestTableSchemaWithMetaFields) => + // NOTE: Meta-fields will be unconditionally injected by Hudi writing handles, for the sake of + // deducing proper writer schema we're stripping them to make sure we can perform proper + // analysis + val latestTableSchema = removeMetadataFields(latestTableSchemaWithMetaFields) // Before validating whether schemas are compatible, we need to "canonicalize" source's schema // relative to the table's one, by doing a (minor) reconciliation of the nullability constraints: // for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable @@ -417,6 +413,9 @@ object HoodieSparkSqlWriter { sourceSchema } + val allowAutoEvolutionColumnDrop = opts.getOrDefault(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key, + HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.defaultValue).toBoolean + if (shouldReconcileSchema) { internalSchemaOpt match { case Some(internalSchema) => @@ -428,7 +427,8 @@ object HoodieSparkSqlWriter { case None => // In case schema reconciliation is enabled we will employ (legacy) reconciliation // strategy to produce target writer's schema (see definition below) - val (reconciledSchema, isCompatible) = reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema) + val (reconciledSchema, isCompatible) = + reconcileSchemasLegacy(latestTableSchema, canonicalizedSourceSchema) // NOTE: In some cases we need to relax constraint of incoming dataset's schema to be compatible // w/ the table's one and allow schemas to diverge. This is required in cases where @@ -455,7 +455,7 @@ object HoodieSparkSqlWriter { // w/ the table's one and allow schemas to diverge. This is required in cases where // partial updates will be performed (for ex, `MERGE INTO` Spark SQL statement) and as such // only incoming dataset's projection has to match the table's schema, and not the whole one - if (!shouldValidateSchemasCompatibility || AvroSchemaUtils.isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema)) { + if (!shouldValidateSchemasCompatibility || isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema, allowAutoEvolutionColumnDrop)) { canonicalizedSourceSchema } else { log.error( @@ -550,14 +550,18 @@ object HoodieSparkSqlWriter { // the other one (schema A contains schema B, if schema B is a projection of A). This enables us, // to always "extend" the schema during schema evolution and hence never lose the data (when, for ex // existing column is being dropped in a new batch) + // + // NOTE: By default Hudi doesn't allow automatic schema evolution to drop the columns from the target + // table. However, when schema reconciliation is turned on, we would allow columns to be dropped + // in the incoming batch (as these would be reconciled in anyway) if (isCompatibleProjectionOf(tableSchema, newSchema)) { // Picking table schema as a writer schema we need to validate that we'd be able to // rewrite incoming batch's data (written in new schema) into it - (tableSchema, isSchemaCompatible(newSchema, tableSchema)) + (tableSchema, isSchemaCompatible(newSchema, tableSchema, true)) } else { // Picking new schema as a writer schema we need to validate that we'd be able to // rewrite table's data into it - (newSchema, isSchemaCompatible(tableSchema, newSchema)) + (newSchema, isSchemaCompatible(tableSchema, newSchema, true)) } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala index dd2aae06bce..cb5a3f6fa75 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableChangeColumnCommand.scala @@ -97,7 +97,7 @@ case class AlterHoodieTableChangeColumnCommand( private def validateSchema(newSchema: Schema, metaClient: HoodieTableMetaClient): Unit = { val schemaUtil = new TableSchemaResolver(metaClient) val tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields) - if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema)) { + if (!AvroSchemaUtils.isSchemaCompatible(tableSchema, newSchema, true)) { throw new HoodieException("Failed schema compatibility check for newSchema :" + newSchema + ", origin table schema :" + tableSchema + ", base path :" + metaClient.getBasePath) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala index ad476fb38f3..cd6396f08fb 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroSchemaResolutionSupport.scala @@ -20,6 +20,7 @@ package org.apache.hudi import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.exception.SchemaCompatibilityException import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row, SparkSession} @@ -33,7 +34,7 @@ import scala.language.postfixOps * Test cases to validate Hudi's support for writing and reading when evolving schema implicitly via Avro's Schema Resolution * Note: Test will explicitly write into different partitions to ensure that a Hudi table will have multiple filegroups with different schemas. */ -class TestAvroSchemaResolutionSupport extends HoodieClientTestBase { +class TestAvroSchemaResolutionSupport extends HoodieClientTestBase with ScalaAssertionSupport { var spark: SparkSession = _ val commonOpts: Map[String, String] = Map( @@ -82,12 +83,13 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase { .save(saveDir) } - def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true): Unit = { - val opts = if (isCow) { + def upsertData(df: DataFrame, saveDir: String, isCow: Boolean = true, shouldAllowDroppedColumns: Boolean = false): Unit = { + var opts = if (isCow) { commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE") } else { commonOpts ++ Map(DataSourceWriteOptions.TABLE_TYPE.key -> "MERGE_ON_READ") } + opts = opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString) df.write.format("hudi") .options(opts) @@ -228,7 +230,11 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase { upsertDf.show(false) // upsert - upsertData(upsertDf, tempRecordPath, isCow) + assertThrows(classOf[SchemaCompatibilityException]) { + upsertData(upsertDf, tempRecordPath, isCow) + } + + upsertData(upsertDf, tempRecordPath, isCow, shouldAllowDroppedColumns = true) // read out the table val readDf = spark.read.format("hudi").load(tempRecordPath) @@ -776,7 +782,10 @@ class TestAvroSchemaResolutionSupport extends HoodieClientTestBase { df6 = df6.withColumn("userid", df6.col("userid").cast("float")) df6.printSchema() df6.show(false) - upsertData(df6, tempRecordPath) + assertThrows(classOf[SchemaCompatibilityException]) { + upsertData(df6, tempRecordPath) + } + upsertData(df6, tempRecordPath, shouldAllowDroppedColumns = true) // 7. Rearrange column position var df7 = Seq((7, "newcol1", 700, newPartition)).toDF("id", "newcol1", "userid", "name") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 657c7762c38..232883b71be 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -30,7 +30,7 @@ import org.apache.hudi.common.model._ import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.config.{HoodieBootstrapConfig, HoodieIndexConfig, HoodieWriteConfig} -import org.apache.hudi.exception.HoodieException +import org.apache.hudi.exception.{HoodieException, SchemaCompatibilityException} import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode import org.apache.hudi.functional.TestBootstrap import org.apache.hudi.keygen.{ComplexKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator} @@ -46,7 +46,7 @@ import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.Arguments.arguments -import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, ValueSource} +import org.junit.jupiter.params.provider.{Arguments, CsvSource, EnumSource, MethodSource, ValueSource} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} import org.scalatest.Assertions.assertThrows @@ -657,13 +657,21 @@ class TestHoodieSparkSqlWriter { * @param tableType Type of table */ @ParameterizedTest - @ValueSource(strings = Array("COPY_ON_WRITE", "MERGE_ON_READ")) - def testSchemaEvolutionForTableType(tableType: String): Unit = { + @CsvSource(value = Array( + "COPY_ON_WRITE,true", + "COPY_ON_WRITE,false", + "MERGE_ON_READ,true", + "MERGE_ON_READ,false" + )) + def testSchemaEvolutionForTableType(tableType: String, allowColumnDrop: Boolean): Unit = { + val opts = getCommonParams(tempPath, hoodieFooTableName, tableType) ++ Map( + HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> allowColumnDrop.toString + ) + // Create new table // NOTE: We disable Schema Reconciliation by default (such that Writer's // schema is favored over existing Table's schema) - val noReconciliationOpts = getCommonParams(tempPath, hoodieFooTableName, tableType) - .updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false") + val noReconciliationOpts = opts.updated(DataSourceWriteOptions.RECONCILE_SCHEMA.key, "false") // Generate 1st batch val schema = DataSourceTestUtils.getStructTypeExampleSchema @@ -748,22 +756,29 @@ class TestHoodieSparkSqlWriter { recordsSeq = convertRowListToSeq(records) val df5 = spark.createDataFrame(sc.parallelize(recordsSeq), structType) - HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5) - val snapshotDF5 = spark.read.format("org.apache.hudi") - .load(tempBasePath + "/*/*/*/*") + if (allowColumnDrop) { + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5) - assertEquals(35, snapshotDF5.count()) + val snapshotDF5 = spark.read.format("org.apache.hudi") + .load(tempBasePath + "/*/*/*/*") - assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0) + assertEquals(35, snapshotDF5.count()) - val fifthBatchActualSchema = fetchActualSchema() - val fifthBatchExpectedSchema = { - val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName) - AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace) - } + assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0) + + val fifthBatchActualSchema = fetchActualSchema() + val fifthBatchExpectedSchema = { + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(hoodieFooTableName) + AvroConversionUtils.convertStructTypeToAvroSchema(df5.schema, structName, nameSpace) + } - assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema) + assertEquals(fifthBatchExpectedSchema, fifthBatchActualSchema) + } else { + assertThrows[SchemaCompatibilityException] { + HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df5) + } + } } /** diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala index bc0585a01e5..ccbd04a45b6 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala @@ -107,11 +107,11 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS DataSourceWriteOptions.OPERATION.key -> opType ) - def appendData(schema: StructType, batch: Seq[Row]): Unit = { + def appendData(schema: StructType, batch: Seq[Row], shouldAllowDroppedColumns: Boolean = false): Unit = { HoodieUnsafeUtils.createDataFrameFromRows(spark, batch, schema) .write .format("org.apache.hudi") - .options(opts) + .options(opts ++ Map(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> shouldAllowDroppedColumns.toString)) .mode(SaveMode.Append) .save(basePath) } @@ -202,7 +202,8 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS } // - // 3. Write 3d batch with another schema (w/ omitted a _nullable_ column `second_name`, expected to succeed) + // 3. Write 3d batch with another schema (w/ omitted a _nullable_ column `second_name`, expected to succeed if + // col drop is enabled) // val thirdSchema = StructType( @@ -217,7 +218,14 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS Row("8", "Ron", "14", 1, 1), Row("9", "Germiona", "16", 1, 1)) - appendData(thirdSchema, thirdBatch) + if (shouldReconcileSchema) { + appendData(thirdSchema, thirdBatch) + } else { + assertThrows(classOf[SchemaCompatibilityException]) { + appendData(thirdSchema, thirdBatch) + } + appendData(thirdSchema, thirdBatch, shouldAllowDroppedColumns = true) + } val (tableSchemaAfterThirdBatch, rowsAfterThirdBatch) = loadTable() // NOTE: In case schema reconciliation is ENABLED, Hudi would prefer the table's schema over the new batch @@ -270,7 +278,10 @@ class TestBasicSchemaEvolution extends HoodieClientTestBase with ScalaAssertionS appendData(fourthSchema, fourthBatch) } } else { - appendData(fourthSchema, fourthBatch) + assertThrows(classOf[SchemaCompatibilityException]) { + appendData(fourthSchema, fourthBatch) + } + appendData(fourthSchema, fourthBatch, shouldAllowDroppedColumns = true) val (latestTableSchema, rows) = loadTable() assertEquals(fourthSchema, latestTableSchema) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala index 15b6751328c..cb807319d94 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala @@ -68,10 +68,12 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness { "false|org.apache.hudi.keygen.TimestampBasedKeyGenerator|_row_key" ), delimiter = '|') def testCopyOnWriteStorage(isMetadataEnabled: Boolean, keyGenClass: String, recordKeys: String): Unit = { - var options: Map[String, String] = commonOpts + - (HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled)) + - (DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> keyGenClass) + - (DataSourceWriteOptions.RECORDKEY_FIELD.key() -> recordKeys) + var options: Map[String, String] = commonOpts ++ Map( + HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled), + DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key -> keyGenClass, + DataSourceWriteOptions.RECORDKEY_FIELD.key -> recordKeys, + HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key -> "true") + val isTimestampBasedKeyGen: Boolean = classOf[TimestampBasedKeyGenerator].getName.equals(keyGenClass) if (isTimestampBasedKeyGen) { options += DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "_row_key" diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala index 0d3b09b4c17..9e674db3541 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.fs.{LocatedFileStatus, Path} import org.apache.hudi.ColumnStatsIndexSupport.composeIndexSchema import org.apache.hudi.DataSourceWriteOptions.{PRECOMBINE_FIELD, RECORDKEY_FIELD} import org.apache.hudi.HoodieConversionUtils.toProperties -import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieStorageConfig} +import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig, HoodieStorageConfig} import org.apache.hudi.common.model.HoodieTableType import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.ParquetUtils @@ -31,7 +31,6 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.functional.TestColumnStatsIndex.ColumnStatsTestCase import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceWriteOptions} - import org.apache.spark.sql._ import org.apache.spark.sql.functions.typedLit import org.apache.spark.sql.types._ @@ -42,7 +41,6 @@ import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource, V import java.math.BigInteger import java.sql.{Date, Timestamp} - import scala.collection.JavaConverters._ import scala.util.Random @@ -193,7 +191,8 @@ class TestColumnStatsIndex extends HoodieClientTestBase { HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", RECORDKEY_FIELD.key -> "c1", PRECOMBINE_FIELD.key -> "c1", - HoodieTableConfig.POPULATE_META_FIELDS.key -> "true" + HoodieTableConfig.POPULATE_META_FIELDS.key -> "true", + HoodieCommonConfig.RECONCILE_SCHEMA.key -> "true" ) ++ metadataOpts val sourceJSONTablePath = getClass.getClassLoader.getResource("index/colstats/input-table-json").toString
