This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 42066ee3af90 fix: Avoid deleting metadata table with MOR during upgrade / downgrade (#14191)
42066ee3af90 is described below
commit 42066ee3af90c521c7c045fad166f09c612d6e6e
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 3 19:31:55 2025 -0800
fix: Avoid deleting metadata table with MOR during upgrade / downgrade (#14191)
---------
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Lokesh Jain <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../hudi/table/upgrade/UpgradeDowngradeUtils.java | 103 +++++++++-----
.../table/upgrade/TestUpgradeDowngradeUtils.java | 155 ++++++++++++++++++++-
.../functional/TestNewHoodieParquetFileFormat.java | 2 +-
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 16 +++
.../hudi/functional/TestEightToNineUpgrade.scala | 2 +-
.../TestUpgradeOrDowngradeProcedure.scala | 45 +++---
6 files changed, 265 insertions(+), 58 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index b7c51f31c5cf..9d08375cfebe 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -28,7 +28,8 @@ import org.apache.hudi.common.model.AWSDmsAvroPayload;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
@@ -47,6 +48,7 @@ import org.apache.hudi.common.table.timeline.TimelineFactory;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
@@ -72,6 +74,7 @@ import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -84,6 +87,8 @@ import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMI
public class UpgradeDowngradeUtils {
private static final Logger LOG =
LoggerFactory.getLogger(UpgradeDowngradeUtils.class);
+ static final String FALSE = "false";
+ static final String TRUE = "true";
// Map of actions that were renamed in table version 8
static final Map<String, String> SIX_TO_EIGHT_TIMELINE_ACTION_MAP =
CollectionUtils.createImmutableMap(
@@ -100,36 +105,6 @@ public class UpgradeDowngradeUtils {
PartialUpdateAvroPayload.class.getName(),
PostgresDebeziumAvroPayload.class.getName()));
- /**
- * Utility method to run compaction for MOR table as part of downgrade step.
- *
- * @Deprecated Use {@link
UpgradeDowngradeUtils#rollbackFailedWritesAndCompact(HoodieTable,
HoodieEngineContext, HoodieWriteConfig, SupportsUpgradeDowngrade, boolean,
HoodieTableVersion)} instead.
- */
- public static void runCompaction(HoodieTable table, HoodieEngineContext
context, HoodieWriteConfig config,
- SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
- try {
- if (table.getMetaClient().getTableType() ==
HoodieTableType.MERGE_ON_READ) {
- // set required configs for scheduling compaction.
-
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
- HoodieWriteConfig compactionConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
- compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
-
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
-
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
-
compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
- compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
- try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
- Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
- if (compactionInstantOpt.isPresent()) {
- HoodieWriteMetadata result =
writeClient.compact(compactionInstantOpt.get());
- writeClient.commitCompaction(compactionInstantOpt.get(), result,
Option.of(table));
- }
- }
- }
- } catch (Exception e) {
- throw new HoodieException(e);
- }
- }
-
/**
* See HUDI-6040.
*/
@@ -214,6 +189,13 @@ public class UpgradeDowngradeUtils {
if (table.isMetadataTable() &&
tableVersion.equals(HoodieTableVersion.NINE)) {
properties.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(),
WriteConcurrencyMode.SINGLE_WRITER.name());
}
+
+ // Set properties based on existing and inflight metadata partitions.
+ Set<String> metadataPartitions =
table.getMetaClient().getTableConfig().getMetadataPartitions();
+
metadataPartitions.addAll(table.getMetaClient().getTableConfig().getMetadataPartitionsInflight());
+ setPropertiesBasedOnMetadataPartitions(properties, metadataPartitions,
table);
+
+ // Construct rollback config.
HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
.withProps(properties)
.withWriteTableVersion(tableVersion.versionCode())
@@ -231,8 +213,8 @@ public class UpgradeDowngradeUtils {
} else {
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"false");
}
- rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
+ // Do the rollback and compact.
try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
writeClient.rollbackFailedWrites(table.getMetaClient());
if (shouldCompact) {
@@ -248,12 +230,61 @@ public class UpgradeDowngradeUtils {
}
}
+ @VisibleForTesting
+ public static void setPropertiesBasedOnMetadataPartitions(TypedProperties
properties,
+ Set<String>
metadataPartitions,
+ HoodieTable table) {
+ if (metadataPartitions.isEmpty()) {
+ properties.put(HoodieMetadataConfig.ENABLE.key(), FALSE);
+ return;
+ }
+ // Read index definitions if any.
+ Option<HoodieIndexMetadata> indexMetadataOpt =
table.getMetaClient().getIndexMetadata();
+ Map<String, HoodieIndexDefinition> indexDefinitions =
indexMetadataOpt.isEmpty()
+ ? Collections.emptyMap()
+ : indexMetadataOpt.get().getIndexDefinitions();
+ // Enable metadata table.
+ properties.put(HoodieMetadataConfig.ENABLE.key(), TRUE);
+ // Enable individual index.
+ for (String partition : metadataPartitions) {
+ switch (partition) {
+ case HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS:
+ case HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS:
+
properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(),
TRUE);
+ if
(indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
{
+ List<String> sourceFields =
indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS).getSourceFields();
+ if (!sourceFields.isEmpty()) {
+
properties.put(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key(),
String.join(",", sourceFields));
+ }
+ }
+ break;
+ case HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS:
+
properties.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(),
TRUE);
+ break;
+ case HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX:
+ if
(indexDefinitions.containsKey(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX))
{
+ Map<String, String> options =
indexDefinitions.get(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX).getIndexOptions();
+ if (options.getOrDefault("isPartitioned", FALSE).equals(TRUE)) {
+
properties.put(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key(),
TRUE);
+ } else {
+
properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE);
+ }
+ } else {
+
properties.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), TRUE);
+ }
+ break;
+ default:
+ // No op.
+ }
+ }
+ }
+
// If the metadata table is enabled for the data table, and
// existing metadata table is behind the data table, then delete it.
static void checkAndHandleMetadataTable(HoodieEngineContext context,
- HoodieTable table,
- HoodieWriteConfig config,
- HoodieTableMetaClient
metaClient, boolean checkforMetadataLagging) {
+ HoodieTable table,
+ HoodieWriteConfig config,
+ HoodieTableMetaClient metaClient,
boolean checkforMetadataLagging) {
if (!table.isMetadataTable()
&& config.isMetadataTableEnabled()
&& (!checkforMetadataLagging || isMetadataTableBehindDataTable(config,
metaClient))) {
@@ -262,7 +293,7 @@ public class UpgradeDowngradeUtils {
}
static boolean isMetadataTableBehindDataTable(HoodieWriteConfig config,
- HoodieTableMetaClient
metaClient) {
+ HoodieTableMetaClient
metaClient) {
// if metadata table does not exist, then it is not behind
if (!metaClient.getTableConfig().isMetadataTableAvailable()) {
return false;
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
index a90a57542b3f..1f1ec23b804e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeUtils.java
@@ -18,19 +18,51 @@
package org.apache.hudi.table.upgrade;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.junit.jupiter.MockitoExtension;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.FALSE;
+import static org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.TRUE;
import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.convertCompletionTimeToEpoch;
+import static
org.apache.hudi.table.upgrade.UpgradeDowngradeUtils.setPropertiesBasedOnMetadataPartitions;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
-public class TestUpgradeDowngradeUtils {
+@ExtendWith(MockitoExtension.class)
+class TestUpgradeDowngradeUtils {
+
+ private HoodieTable createMockTable(Option<HoodieIndexMetadata>
indexMetadataOpt) {
+ HoodieTable table = mock(HoodieTable.class);
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ when(table.getMetaClient()).thenReturn(metaClient);
+ when(metaClient.getIndexMetadata()).thenReturn(indexMetadataOpt);
+ return table;
+ }
@Test
void testConvertCompletionTimeToEpoch() {
@@ -53,4 +85,125 @@ public class TestUpgradeDowngradeUtils {
assertEquals(-1, convertCompletionTimeToEpoch(inValidInstant), "Epoch time
for invalid input should be -1.");
}
+
+ @Test
+ void testSetPropertiesBasedOnMetadataPartitionsWithEmptySet() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> emptyPartitions = new HashSet<>();
+
+ setPropertiesBasedOnMetadataPartitions(properties, emptyPartitions, null);
+ assertEquals(FALSE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ }
+
+ @Test
+ void testSetPropertiesBasedOnMetadataPartitionsWithBloomFilters() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS);
+ HoodieTable table = createMockTable(Option.empty());
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key()));
+ }
+
+ @Test
+ void testSetPropertiesBasedOnMetadataPartitionsWithAnyOtherIndexes() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+ partitions.add("any_other_index");
+ HoodieTable table = createMockTable(Option.empty());
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ }
+
+ @Test
+ void testSetPropertiesBasedOnMetadataPartitionsWithMultiplePartitions() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS);
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+ HoodieTable table = createMockTable(Option.empty());
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key()));
+ }
+
+ @Test
+ void
testSetPropertiesBasedOnMetadataPartitionsWithColumnStatsAndSourceFields() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS);
+
+ Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+ List<String> sourceFields = Arrays.asList("field1", "field2", "field3");
+ HoodieIndexDefinition columnStatsDef = HoodieIndexDefinition.newBuilder()
+ .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS)
+ .withSourceFields(sourceFields)
+ .build();
+ indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS,
columnStatsDef);
+ HoodieIndexMetadata indexMetadata = new
HoodieIndexMetadata(indexDefinitions);
+ HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()));
+ assertEquals("field1,field2,field3",
properties.getString(HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key()));
+ }
+
+ @Test
+ void testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexPartitioned() {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+
+ Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+ Map<String, String> indexOptions = new HashMap<>();
+ indexOptions.put("isPartitioned", TRUE);
+ HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder()
+ .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+ .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+ .withIndexOptions(indexOptions)
+ .build();
+ indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX,
recordIndexDef);
+ HoodieIndexMetadata indexMetadata = new
HoodieIndexMetadata(indexDefinitions);
+ HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.PARTITIONED_RECORD_INDEX_ENABLE_PROP.key()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void
testSetPropertiesBasedOnMetadataPartitionsWithRecordIndexNonPartitioned(boolean
explicitlySetPartitionedInIndexDef) {
+ TypedProperties properties = new TypedProperties();
+ Set<String> partitions = new HashSet<>();
+ partitions.add(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX);
+
+ Map<String, HoodieIndexDefinition> indexDefinitions = new HashMap<>();
+ Map<String, String> indexOptions = new HashMap<>();
+ if (explicitlySetPartitionedInIndexDef) {
+ indexOptions.put("isPartitioned", FALSE);
+ }
+ HoodieIndexDefinition recordIndexDef = HoodieIndexDefinition.newBuilder()
+ .withIndexName(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+ .withIndexType(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX)
+ .withIndexOptions(indexOptions)
+ .build();
+ indexDefinitions.put(HoodieTableMetadataUtil.PARTITION_NAME_RECORD_INDEX,
recordIndexDef);
+ HoodieIndexMetadata indexMetadata = new
HoodieIndexMetadata(indexDefinitions);
+ HoodieTable table = createMockTable(Option.of(indexMetadata));
+
+ setPropertiesBasedOnMetadataPartitions(properties, partitions, table);
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.ENABLE.key()));
+ assertEquals(TRUE,
properties.getString(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key()));
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
index 0053c3f4a977..f1297643c72f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestNewHoodieParquetFileFormat.java
@@ -53,7 +53,7 @@ public class TestNewHoodieParquetFileFormat extends
TestBootstrapReadBase {
@ParameterizedTest
@MethodSource("testArgs")
- public void testNewParquetFileFormat(HoodieTableType tableType, Integer
nPartitions) {
+ void testNewParquetFileFormat(HoodieTableType tableType, Integer
nPartitions) {
this.bootstrapType = nPartitions == 0 ? "metadata" : "mixed";
this.dashPartitions = true;
this.tableType = tableType;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 032eda704eb7..6ce364a1557a 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -41,6 +41,8 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.spark.api.java.JavaRDD;
@@ -101,6 +103,20 @@ public class TestUpgradeDowngrade extends
SparkClientFunctionalTestHarness {
}
}
+ @Test
+ public void testUpgradeDowngradeUtilsCompaction() throws Exception {
+ HoodieTableVersion originalVersion = HoodieTableVersion.EIGHT;
+ HoodieTableMetaClient originalMetaClient =
loadFixtureTable(HoodieTableVersion.EIGHT, "-mor");
+ HoodieWriteConfig config = createWriteConfig(originalMetaClient, false);
+
+ int numCompactionInstants =
originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants();
+ HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
+ UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(table, context(),
config, SparkUpgradeDowngradeHelper.getInstance(), true, originalVersion);
+ originalMetaClient = HoodieTableMetaClient.reload(originalMetaClient);
+ assertEquals(numCompactionInstants + 1,
originalMetaClient.getActiveTimeline().filterCompletedOrMajorOrMinorCompactionInstants().countInstants());
+ assertTrue(originalMetaClient.getTableConfig().isMetadataTableAvailable());
+ }
+
@Disabled
@ParameterizedTest
@MethodSource("upgradeDowngradeVersionPairs")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
index 372adbc713a3..d100f35f70b4 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestEightToNineUpgrade.scala
@@ -129,7 +129,7 @@ class TestEightToNineUpgrade extends
RecordLevelIndexTestBase {
val payloadClass = classOf[MySqlDebeziumAvroPayload].getName
var opts: Map[String, String] = Map(
HoodieWriteConfig.WRITE_PAYLOAD_CLASS_NAME.key() -> payloadClass,
- HoodieMetadataConfig.ENABLE.key() -> "false"
+ HoodieMetadataConfig.ENABLE.key() -> "true"
)
val columns = Seq("ts", "key", "rider", "driver",
DebeziumConstants.FLATTENED_FILE_COL_NAME,
DebeziumConstants.FLATTENED_POS_COL_NAME,
DebeziumConstants.ADDED_SEQ_COL_NAME,
DebeziumConstants.FLATTENED_OP_COL_NAME)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index c588b3f4afbb..05ff2514911b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -21,10 +21,12 @@ import org.apache.hudi.common.config.HoodieConfig
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
import
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR
import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils}
+import org.apache.hudi.metadata.MetadataPartitionType
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.NAME_FORMAT_0_X
+import org.junit.jupiter.api.Assertions.assertTrue
import java.io.IOException
import java.time.Instant
@@ -148,7 +150,7 @@ class TestUpgradeOrDowngradeProcedure extends
HoodieSparkProcedureTestBase {
}
}
- test("Test downgrade table from version eight to version seven") {
+ test("Test downgrade table to version six") {
withTempDir { tmp =>
val tableName = generateTableName
val tablePath = s"${tmp.getCanonicalPath}/$tableName"
@@ -169,32 +171,37 @@ class TestUpgradeOrDowngradeProcedure extends
HoodieSparkProcedureTestBase {
| )
""".stripMargin)
+ spark.sql("set hoodie.merge.small.file.group.candidates.limit=0")
spark.sql("set hoodie.compact.inline=true")
- spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=4")
spark.sql("set hoodie.clean.commits.retained = 2")
spark.sql("set hoodie.keep.min.commits = 3")
spark.sql("set hoodie.keep.min.commits = 4")
+ spark.sql("set hoodie.metadata.record.index.enable = true")
+
spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
- spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"update $tableName set name = 'a2' where id = 1")
+ spark.sql(s"update $tableName set name = 'a3' where id = 1")
var metaClient = createMetaClient(spark, tablePath)
- // verify hoodie.table.version of the table is EIGHT
- if
(metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode()))
{
- // downgrade table from version eight to version seven
- checkAnswer(s"""call downgrade_table(table => '$tableName', to_version
=> 'SEVEN')""")(Seq(true))
- metaClient = HoodieTableMetaClient.reload(metaClient)
- assertResult(HoodieTableVersion.SEVEN.versionCode) {
- metaClient.getTableConfig.getTableVersion.versionCode()
- }
- // Verify whether the naming format of instant files is consistent
with 0.x
-
metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f =>
NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find())
- checkAnswer(s"select id, name, price, ts from $tableName")(
- Seq(1, "a1", 10.0, 1000)
- )
+ val numCompactionInstants =
metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants
+ // Disabling record index should not affect downgrade
+ spark.sql("set hoodie.metadata.record.index.enable = false")
+ // downgrade table to version six
+ checkAnswer(s"""call downgrade_table(table => '$tableName', to_version
=> 'SIX')""")(Seq(true))
+ metaClient = createMetaClient(spark, tablePath)
+ assertResult(numCompactionInstants +
1)(metaClient.getActiveTimeline.filterCompletedOrMajorOrMinorCompactionInstants.countInstants)
+ assertResult(HoodieTableVersion.SIX.versionCode) {
+ metaClient.getTableConfig.getTableVersion.versionCode()
}
+ // Verify whether the naming format of instant files is consistent with
0.x
+
metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f =>
NAME_FORMAT_0_X.matcher(INSTANT_FILE_NAME_GENERATOR.getFileName(f)).find())
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a3", 10.0, 1000)
+ )
+ // Ensure files and record index partition are available after downgrade
+ assertTrue(metaClient.getTableConfig.isMetadataTableAvailable)
+
assertTrue(metaClient.getTableConfig.isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX))
}
}