This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 53583f12c30d3df6ab8cd05edd6237542907dc34 Author: Lin Liu <[email protected]> AuthorDate: Mon Nov 3 19:31:55 2025 -0800 fix: Avoid deleting metadata table with MOR during upgrade / downgrade (#14191) --------- Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Lokesh Jain <[email protected]> Co-authored-by: Y Ethan Guo <[email protected]> Co-authored-by: sivabalan <[email protected]> --- .../hudi/table/upgrade/UpgradeDowngradeUtils.java | 103 +++++++++----- .../table/upgrade/TestUpgradeDowngradeUtils.java | 155 ++++++++++++++++++++- .../functional/TestNewHoodieParquetFileFormat.java | 2 +- .../hudi/table/upgrade/TestUpgradeDowngrade.java | 16 +++ .../hudi/functional/TestEightToNineUpgrade.scala | 2 +- .../TestUpgradeOrDowngradeProcedure.scala | 45 +++--- 6 files changed, 265 insertions(+), 58 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java index b7c51f31c5cf..9d08375cfebe 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java @@ -28,7 +28,8 @@ import org.apache.hudi.common.model.AWSDmsAvroPayload; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; import org.apache.hudi.common.model.EventTimeAvroPayload; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; -import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieIndexMetadata; import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; @@ -47,6 +48,7 @@ import org.apache.hudi.common.table.timeline.TimelineFactory; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.VisibleForTesting; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieCleanConfig; import org.apache.hudi.config.HoodieCompactionConfig; @@ -72,6 +74,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -84,6 +87,8 @@ import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI public class UpgradeDowngradeUtils { private static final Logger LOG = LoggerFactory.getLogger(UpgradeDowngradeUtils.class); + static final String FALSE = "false"; + static final String TRUE = "true"; // Map of actions that were renamed in table version 8 static final Map<String, String> SIX_TO_EIGHT_TIMELINE_ACTION_MAP = CollectionUtils.createImmutableMap( @@ -100,36 +105,6 @@ public class UpgradeDowngradeUtils { PartialUpdateAvroPayload.class.getName(), PostgresDebeziumAvroPayload.class.getName())); - /** - * Utility method to run compaction for MOR table as part of downgrade step. - * - * @Deprecated Use {@link UpgradeDowngradeUtils#rollbackFailedWritesAndCompact(HoodieTable, HoodieEngineContext, HoodieWriteConfig, SupportsUpgradeDowngrade, boolean, HoodieTableVersion)} instead. - */ - public static void runCompaction(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config, - SupportsUpgradeDowngrade upgradeDowngradeHelper) { - try { - if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { - // set required configs for scheduling compaction. - HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone()); - HoodieWriteConfig compactionConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build(); - compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); - compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1"); - compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(), CompactionTriggerStrategy.NUM_COMMITS.name()); - compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName()); - compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false"); - try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) { - Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty()); - if (compactionInstantOpt.isPresent()) { - HoodieWriteMetadata result = writeClient.compact(compactionInstantOpt.get()); - writeClient.commitCompaction(compactionInstantOpt.get(), result, Option.of(table)); - } - } - } - } catch (Exception e) { - throw new HoodieException(e); - } - } - /** * See HUDI-6040. */ @@ -214,6 +189,13 @@ public class UpgradeDowngradeUtils { if (table.isMetadataTable() && tableVersion.equals(HoodieTableVersion.NINE)) { properties.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name()); } + + // Set properties based on existing and inflight metadata partitions. + Set<String> metadataPartitions = table.getMetaClient().getTableConfig().getMetadataPartitions(); + metadataPartitions.addAll(table.getMetaClient().getTableConfig().getMetadataPartitionsInflight()); + setPropertiesBasedOnMetadataPartitions(properties, metadataPartitions, table); + + // Construct rollback config. HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder() .withProps(properties) .withWriteTableVersion(tableVersion.versionCode()) @@ -231,8 +213,8 @@ public class UpgradeDowngradeUtils { } else { rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"); } - rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false"); + // Do the rollback and compact. try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) { writeClient.rollbackFailedWrites(table.getMetaClient()); if (shouldCompact) { @@ -248,12 +230,61 @@ public class UpgradeDowngradeUtils { } } + @VisibleForTesting + public static void setPropertiesBasedOnMetadataPartitions(TypedProperties properties, + Set<String> metadataPartitions, + HoodieTable table) { + if (metadataPartitions.isEmpty()) { + properties.put(HoodieMetadataConfig.ENABLE.key(), FALSE); + return; + } + // Read index definitions if any. + Option<HoodieIndexMetadata> indexMetadataOpt = table.getMetaClient().getIndexMetadata(); + Map<String, HoodieIndexDefinition> indexDefinitions = indexMetadataOpt.isEmpty() + ? Collections.emptyMap() + : indexMetadataOpt.get().getIndexDefinitions(); + // Enable metadata table. + properties.put(HoodieMetadataConfig.ENABLE.key(), TRUE); + // Enable individual index. + for (String partition : metadataPartitions) { + switch (partition) { + case HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS: + case HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS: + properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), TRUE); + if (indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)) { + List<String> sourceFields = indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS).getSourceFields(); + if (!sourceFields.isEmpty()) { + properties.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(), String.join(",", sourceFields)); + } + } + break; + case HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS: + properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), TRUE); + break; + case HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX: + if (indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)) { + Map<String, String> options = indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).getIndexOptions(); + if (options.getOrDefault("isPartitioned", FALSE).equals(TRUE)) { + properties.put(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key(), TRUE); + } else { + properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE); + } + } else { + properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE); + } + break; + default: + // No op. + } + } + } + // If the metadata table is enabled for the data table, and // existing metadata table is behind the data table, then delete it. static void checkAndHandleMetadataTable(HoodieEngineContext context, - HoodieTable table, - HoodieWriteConfig config, - HoodieTableMetaClient metaClient, boolean checkforMetadataLagging) { + HoodieTable table, + HoodieWriteConfig config, + HoodieTableMetaClient metaClient, boolean checkforMetadataLagging) { if (!table.isMetadataTable() && config.isMetadataTableEnabled() && (!checkforMetadataLagging || isMetadataTableBehindDataTable(config, metaClient))) { @@ -262,7 +293,7 @@ public class UpgradeDowngradeUtils { } static boolean isMetadataTableBehindDataTable(HoodieWriteConfig config, - HoodieTableMetaClient metaClient) { + HoodieTableMetaClient metaClient) { // if metadata table does not exist, then it is not behind if (!metaClient.getTableConfig().isMetadataTableAvailable()) { return false; diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java index a90a57542b3f..1f1ec23b804e 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java @@ -18,19 +18,51 @@ package org.apache.hudi.table.upgrade; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieIndexDefinition; +import org.apache.hudi.common.model.HoodieIndexMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.table.HoodieTable; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.junit.jupiter.MockitoExtension; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.FALSE; +import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.TRUE; import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.convertCompletionTimeToEpoch; +import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.setPropertiesBasedOnMetadataPartitions; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; -public class TestUpgradeDowngradeUtils { +@ExtendWith(MockitoExtension.class) +class TestUpgradeDowngradeUtils { + + private HoodieTable createMockTable(Option<HoodieIndexMetadata> indexMetadataOpt) { + HoodieTable table = mock(HoodieTable.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + when(table.getMetaClient()).thenReturn(metaClient); + when(metaClient.getIndexMetadata()).thenReturn(indexMetadataOpt); + return table; + } @Test void testConvertCompletionTimeToEpoch() { @@ -53,4 +85,125 @@ public class TestUpgradeDowngradeUtils { assertEquals(-1, convertCompletionTimeToEpoch(inValidInstant), "Epoch time for invalid input should be -1."); } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithEmptySet() { + TypedProperties properties = new TypedProperties(); + Set<String> emptyPartitions = new HashSet<>(); + + setPropertiesBasedOnMetadataPartitions(properties, emptyPartitions, null); + assertEquals(FALSE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithBloomFilters() { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS); + HoodieTable table = createMockTable(Option.empty()); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key())); + } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithAnyOtherIndexes() { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + partitions.add("any_other_index"); + HoodieTable table = createMockTable(Option.empty()); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithMultiplePartitions() { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX); + HoodieTable table = createMockTable(Option.empty()); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key())); + } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithColumnStatsAndSourceFields() { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS); + + Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>(); + List<String> sourceFields = Arrays.asList("field1", "field2", "field3"); + HoodieIndexDefinition columnStatsDef = HoodieIndexDefinition.newBuilder() + .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) + .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS) + .withSourceFields(sourceFields) + .build(); + indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, columnStatsDef); + HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions); + HoodieTable table = createMockTable(Option.of(indexMetadata)); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key())); + assertEquals("field1,field2,field3", properties.getString(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key())); + } + + @Test + void testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexPartitioned() { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX); + + Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>(); + Map<String, String> indexOptions = new HashMap<>(); + indexOptions.put("isPartitioned", TRUE); + HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder() + .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + .withIndexOptions(indexOptions) + .build(); + indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, recordIndexDef); + HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions); + HoodieTable table = createMockTable(Option.of(indexMetadata)); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key())); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexNonPartitioned(boolean explicitlySetPartitionedInIndexDef) { + TypedProperties properties = new TypedProperties(); + Set<String> partitions = new HashSet<>(); + partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX); + + Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>(); + Map<String, String> indexOptions = new HashMap<>(); + if (explicitlySetPartitionedInIndexDef) { + indexOptions.put("isPartitioned", FALSE); + } + HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder() + .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX) + .withIndexOptions(indexOptions) + .build(); + indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX, recordIndexDef); + HoodieIndexMetadata indexMetadata = new HoodieIndexMetadata(indexDefinitions); + HoodieTable table = createMockTable(Option.of(indexMetadata)); + + setPropertiesBasedOnMetadataPartitions(properties, partitions, table); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.ENABLE.key())); + assertEquals(TRUE, properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key())); + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java index 0053c3f4a977..f1297643c72f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java @@ -53,7 +53,7 @@ public class TestNewHoodieParquetFileFormat extends TestBootstrapReadBase { @ParameterizedTest @MethodSource("testArgs") - public void testNewParquetFileFormat(HoodieTableType tableType, Integer nPartitions) { + void testNewParquetFileFormat(HoodieTableType tableType, Integer nPartitions) { this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed"; this.dashPartitions = true; this.tableType = tableType; diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java index 032eda704eb7..6ce364a1557a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java @@ -41,6 +41,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.spark.api.java.JavaRDD; @@ -101,6 +103,20 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness { } } + @Test + public void testUpgradeDowngradeUtilsCompaction() throws Exception { + HoodieTableVersion originalVersion = HoodieTableVersion.EIGHT; + HoodieTableMetaClient originalMetaClient = loadFixtureTable(HoodieTableVersion.EIGHT, "-mor"); + HoodieWriteConfig config = createWriteConfig(originalMetaClient, false); + + int numCompactionInstants = originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants(); + HoodieTable table = HoodieSparkTable.create(config, context(), metaClient); + UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(table, context(), config, SparkUpgradeDowngradeHelper.getInstance(), true, originalVersion); + originalMetaClient = HoodieTableMetaClient.reload(originalMetaClient); + assertEquals(numCompactionInstants + 1, originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants()); + assertTrue(originalMetaClient.getTableConfig().isMetadataTableAvailable()); + } + @Disabled @ParameterizedTest @MethodSource("upgradeDowngradeVersionPairs") diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala index 372adbc713a3..d100f35f70b4 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala @@ -129,7 +129,7 @@ class TestEightToNineUpgrade extends RecordLevelIndexTestBase { val payloadClass = classOf[MySqlDebeziumAvroPayload].getName var opts: Map[String, String] = Map( HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClass, - HoodieMetadataConfig.ENABLE.key() -> "false" + HoodieMetadataConfig.ENABLE.key() -> "true" ) val columns = Seq("ts", "key", "rider", "driver", DebeziumConstants.FLATTENED_FILE_COL_NAME, DebeziumConstants.FLATTENED_POS_COL_NAME, DebeziumConstants.ADDED_SEQ_COL_NAME, DebeziumConstants.FLATTENED_OP_COL_NAME) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala index c588b3f4afbb..05ff2514911b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala @@ -21,10 +21,12 @@ import org.apache.hudi.common.config.HoodieConfig import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion} import org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils} +import org.apache.hudi.metadata.MetadataPartitionType import org.apache.hudi.storage.StoragePath import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.NAME_FORMAT_0_X +import org.junit.jupiter.api.Assertions.assertTrue import java.io.IOException import java.time.Instant @@ -148,7 +150,7 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase { } } - test("Test downgrade table from version eight to version seven") { + test("Test downgrade table to version six") { withTempDir { tmp => val tableName = generateTableName val tablePath = s"${tmp.getCanonicalPath}/$tableName" @@ -169,32 +171,37 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase { | ) """.stripMargin) + spark.sql("set hoodie.merge.small.file.group.candidates.limit=0") spark.sql("set hoodie.compact.inline=true") - spark.sql("set hoodie.compact.inline.max.delta.commits=1") + spark.sql("set hoodie.compact.inline.max.delta.commits=4") spark.sql("set hoodie.clean.commits.retained = 2") spark.sql("set hoodie.keep.min.commits = 3") spark.sql("set hoodie.keep.min.commits = 4") + spark.sql("set hoodie.metadata.record.index.enable = true") + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") - spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"update $tableName set name = 'a2' where id = 1") + spark.sql(s"update $tableName set name = 'a3' where id = 1") var metaClient = createMetaClient(spark, tablePath) - // verify hoodie.table.version of the table is EIGHT - if (metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode())) { - // downgrade table from version eight to version seven - checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'SEVEN')""")(Seq(true)) - metaClient = HoodieTableMetaClient.reload(metaClient) - assertResult(HoodieTableVersion.SEVEN.versionCode) { - metaClient.getTableConfig.getTableVersion.versionCode() - } - // Verify whether the naming format of instant files is consistent with 0.x - metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find()) - checkAnswer(s"select id, name, price, ts from $tableName")( - Seq(1, "a1", 10.0, 1000) - ) + val numCompactionInstants = metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants + // Disabling record index should not affect downgrade + spark.sql("set hoodie.metadata.record.index.enable = false") + // downgrade table to version six + checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'SIX')""")(Seq(true)) + metaClient = createMetaClient(spark, tablePath) + assertResult(numCompactionInstants + 1)(metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants) + assertResult(HoodieTableVersion.SIX.versionCode) { + metaClient.getTableConfig.getTableVersion.versionCode() } + // Verify whether the naming format of instant files is consistent with 0.x + metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find()) + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a3", 10.0, 1000) + ) + // Ensure files and record index partition are available after downgrade + assertTrue(metaClient.getTableConfig.isMetadataTableAvailable) + assertTrue(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) } }
