This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1313b396c3c2 fix: fix downgrade to not delete unintended partitions in MDT (#14162)
1313b396c3c2 is described below
commit 1313b396c3c229d5a67b8991b08cf4858e35fafc
Author: vamsikarnika <[email protected]>
AuthorDate: Tue Oct 28 10:28:51 2025 +0530
fix: fix downgrade to not delete unintended partitions in MDT (#14162)
---------
Co-authored-by: Vamsi <[email protected]>
Co-authored-by: sivabalan <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 2 +-
.../java/org/apache/hudi/table/HoodieTable.java | 12 +--
.../hudi/table/upgrade/UpgradeDowngrade.java | 2 +-
.../org/apache/hudi/table/HoodieFlinkTable.java | 3 +-
.../org/apache/hudi/table/HoodieJavaTable.java | 9 ++-
.../hudi/client/StreamingMetadataWriteHandler.java | 2 +-
.../org/apache/hudi/table/HoodieSparkTable.java | 7 +-
.../hudi/table/upgrade/TestUpgradeDowngrade.java | 91 ++++++++++++++++++++--
8 files changed, 106 insertions(+), 22 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index e5880bd29a54..cc19970fb60c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1119,7 +1119,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
});
});
- Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant);
+ Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant, false, false);
// first update table config. Metadata writer initializes the inflight metadata
// partitions so we need to first remove the metadata before creating the writer
// Also the partitions need to be removed after creating the metadata writer since the writer
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 1438b7e59712..4ae51ac75298 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -1085,7 +1085,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* @return instance of {@link HoodieTableMetadataWriter}
*/
public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp) {
- return getMetadataWriter(triggeringInstantTimestamp, false);
+ return getMetadataWriter(triggeringInstantTimestamp, false, true);
}
/**
@@ -1094,8 +1094,8 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter}
*/
- public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites) {
-   return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites);
+ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp, boolean streamingWrites, boolean autoDetectAndDeleteMetadataPartitions) {
+   return getMetadataWriter(triggeringInstantTimestamp, EAGER, streamingWrites, autoDetectAndDeleteMetadataPartitions);
}
/**
@@ -1105,7 +1105,7 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* @return An instance of {@link HoodieTableMetadataWriter}.
*/
public Option<HoodieTableMetadataWriter> getIndexingMetadataWriter(String triggeringInstantTimestamp) {
- return getMetadataWriter(triggeringInstantTimestamp, LAZY, false);
+ return getMetadataWriter(triggeringInstantTimestamp, LAZY, false, false);
}
/**
@@ -1121,12 +1121,14 @@ public abstract class HoodieTable<T, I, K, O> implements Serializable {
* @param triggeringInstantTimestamp The instant that is triggering this metadata write
* @param failedWritesCleaningPolicy Cleaning policy on failed writes
* @param streamingWrites Whether streaming write is enabled
+ * @param autoDetectAndDeleteMetadataPartitions true when metadata partitions could be deleted based on incoming write config properties.
* @return instance of {@link HoodieTableMetadataWriter}
*/
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
- boolean streamingWrites) {
+ boolean streamingWrites,
+ boolean autoDetectAndDeleteMetadataPartitions) {
// Each engine is expected to override this and
// provide the actual metadata writer, if enabled.
return Option.empty();
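For reference, the entry points above now default the new flag differently; here is a sketch of the overload chain, derived from the hunks in this diff (the annotations are editorial, not code from the commit):

    // getMetadataWriter(ts)
    //     -> getMetadataWriter(ts, streamingWrites=false, autoDetect=true)        // regular writes keep auto-detection
    // getMetadataWriter(ts, streamingWrites, autoDetect)
    //     -> getMetadataWriter(ts, EAGER, streamingWrites, autoDetect)
    // getIndexingMetadataWriter(ts)
    //     -> getMetadataWriter(ts, LAZY, streamingWrites=false, autoDetect=false) // async indexing never auto-deletes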
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index bc643313b99a..581452afa747 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -269,7 +269,7 @@ public class UpgradeDowngrade {
HoodieTable table = upgradeDowngradeHelper.getTable(updatedConfig, context);
String newInstant = table.getMetaClient().createNewInstantTime(false);
- Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant);
+ Option<HoodieTableMetadataWriter> mdtWriterOpt = table.getMetadataWriter(newInstant, false, false);
mdtWriterOpt.ifPresent(mdtWriter -> {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
commitMetadata.setOperationType(WriteOperationType.UPSERT);
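The downgrade path passes false for autoDetectAndDeleteMetadataPartitions because the write config handed to UpgradeDowngrade need not describe the table's actual indexes. A minimal sketch of the scenario this guards against, using a hypothetical basePath and mirroring the new test added below:

    // The table has the record_index MDT partition, but this ad-hoc downgrade
    // config does not enable the record index:
    HoodieWriteConfig downgradeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath) // hypothetical path, for illustration only
        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withEnableRecordIndex(false)
            .build())
        .build();
    // Before this fix, creating the metadata writer here would auto-detect
    // record_index as "disabled" and delete its partition; with
    // autoDetectAndDeleteMetadataPartitions=false the partition is left intact.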
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 403526d23aba..910a66a2c3f0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -100,7 +100,8 @@ public abstract class HoodieFlinkTable<T>
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
- boolean streamingWrites) {
+ boolean streamingWrites,
+ boolean autoDetectAndDeleteMetadataPartitions) {
if (isMetadataTable()) {
return Option.empty();
}
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
index 77200c69cf5e..32d6e7d281ec 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaTable.java
@@ -81,7 +81,8 @@ public abstract class HoodieJavaTable<T>
@Override
protected Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
- boolean streamingWrites) {
+ boolean streamingWrites,
+ boolean autoDetectAndDeleteMetadataPartitions) {
if (isMetadataTable()) {
return Option.empty();
}
@@ -93,8 +94,10 @@ public abstract class HoodieJavaTable<T>
getContext().getStorageConf(), config, failedWritesCleaningPolicy, getContext(), Option.of(triggeringInstantTimestamp));
// even with metadata enabled, some index could have been disabled
- // delete metadata partitions corresponding to such indexes
- deleteMetadataIndexIfNecessary();
+ // delete metadata partitions corresponding to such indexes if autoDetectAndDeleteMetadataPartitions is enabled
+ if (autoDetectAndDeleteMetadataPartitions) {
+ deleteMetadataIndexIfNecessary();
+ }
try {
if (isMetadataTableExists || metaClient.getStorage().exists(
HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath()))) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
index 93cf377cb6be..470f10013ccb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
@@ -137,7 +137,7 @@ public class StreamingMetadataWriteHandler {
return this.metadataWriterMap.get(triggeringInstant);
}
- Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true);
+ Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(triggeringInstant, true, true);
metadataWriterMap.put(triggeringInstant, metadataWriterOpt); // populate this for every new instant time.
// if metadata table does not exist, the map will contain an entry, with value Option.empty.
// if not, it will contain the metadata writer instance.
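Note that the streaming path keeps auto-detection on (the second true above), presumably because a live writer's config is authoritative for which indexes are enabled. A comment-form summary of the flag at each call site touched by this commit:

    // StreamingMetadataWriteHandler -> getMetadataWriter(instant, true,  true)   // streaming writes
    // BaseHoodieWriteClient (drop)  -> getMetadataWriter(instant, false, false)  // explicit index drop
    // UpgradeDowngrade (downgrade)  -> getMetadataWriter(instant, false, false)  // table downgrade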
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index a9276fd37902..fa66af392713 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -96,14 +96,17 @@ public abstract class HoodieSparkTable<T>
protected Option<HoodieTableMetadataWriter> getMetadataWriter(
String triggeringInstantTimestamp,
HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
- boolean streamingWrites) {
+ boolean streamingWrites,
+ boolean autoDetectAndDeleteMetadataPartitions) {
if (isMetadataTable()) {
return Option.empty();
}
if (config.isMetadataTableEnabled()) {
// if any partition is deleted, we need to reload the metadata table writer so that new table configs are picked up
// to reflect the deleted mdt partitions.
- deleteMetadataIndexIfNecessary();
+ if (autoDetectAndDeleteMetadataPartitions) {
+ deleteMetadataIndexIfNecessary();
+ }
// Create the metadata table writer. First time after the upgrade this creation might trigger
// metadata table bootstrapping. Bootstrapping process could fail and checking the table
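Putting the Spark-side hunk together, the guarded section reads roughly as follows (reconstructed from the diff context above; elided parts are marked with comments):

    protected Option<HoodieTableMetadataWriter> getMetadataWriter(
        String triggeringInstantTimestamp,
        HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
        boolean streamingWrites,
        boolean autoDetectAndDeleteMetadataPartitions) {
      if (isMetadataTable()) {
        return Option.empty();
      }
      if (config.isMetadataTableEnabled()) {
        // Only reconcile (delete) MDT partitions of disabled indexes when the
        // caller's write config is authoritative for index settings.
        if (autoDetectAndDeleteMetadataPartitions) {
          deleteMetadataIndexIfNecessary();
        }
        // ... metadata writer creation continues as before (elided)
      }
      // ... (elided)
    }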
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index 5ae4fc222a1c..032eda704eb7 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -18,8 +18,13 @@
package org.apache.hudi.table.upgrade;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -27,15 +32,18 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.Disabled;
@@ -49,6 +57,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -57,6 +66,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -98,31 +108,31 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
boolean isUpgrade = fromVersion.lesserThan(toVersion);
String operation = isUpgrade ? "upgrade" : "downgrade";
LOG.info("Testing {} from version {} to {}", operation, fromVersion, toVersion);
-
+
HoodieTableMetaClient originalMetaClient = loadFixtureTable(fromVersion, suffix);
assertEquals(fromVersion, originalMetaClient.getTableConfig().getTableVersion(),
    "Fixture table should be at expected version");
-
+
HoodieWriteConfig config = createWriteConfig(originalMetaClient, true);
-
+
int initialPendingCommits = originalMetaClient.getCommitsTimeline().filterPendingExcludingCompaction().countInstants();
int initialCompletedCommits = originalMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants();
-
+
Dataset<Row> originalData = readTableData(originalMetaClient, "before " + operation);
-
+
// Confirm that there are log files before rollback and compaction operations
if (isRollbackAndCompactTransition(fromVersion, toVersion)) {
validateLogFilesCount(originalMetaClient, operation, suffix.equals("-mor"));
}
-
+
new UpgradeDowngrade(originalMetaClient, config, context(), SparkUpgradeDowngradeHelper.getInstance())
.run(toVersion, null);
-
+
HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
.setConf(storageConf().newInstance())
.setBasePath(originalMetaClient.getBasePath())
.build();
-
+
assertTableVersionOnDataAndMetadataTable(resultMetaClient, toVersion);
validateVersionSpecificProperties(resultMetaClient, toVersion);
validateDataConsistency(originalData, resultMetaClient, "after " + operation);
@@ -376,6 +386,62 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
}
}
+ @ParameterizedTest
+ @MethodSource("testMdtValidationDowngrade")
+ public void testMdtPartitionNotDroppedWhenDowngradedFromTableVersionNine(HoodieTableType tableType, boolean mdtEnabled) throws Exception {
+ HoodieTableVersion fromVersion = HoodieTableVersion.NINE;
+ HoodieTableVersion toVersion = HoodieTableVersion.EIGHT;
+
+ Properties props = new Properties();
+ props.put(HoodieTableConfig.TYPE.key(), tableType.name());
+ HoodieTableMetaClient metaClient =
+   getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), props);
+
+ HoodieWriteConfig writeConfig = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withWriteTableVersion(fromVersion.versionCode())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .withEnableRecordIndex(true).build())
+ .withProps(props)
+ .build();
+
+ SparkRDDWriteClient writeClient = new SparkRDDWriteClient(context(), writeConfig);
+ String partitionPath = "2021/09/11";
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});
+
+ String instant1 = getCommitTimeAtUTC(1);
+ List<HoodieRecord> records = dataGenerator.generateInserts(instant1, 100);
+ JavaRDD<HoodieRecord> dataset = jsc().parallelize(records, 2);
+
+ WriteClientTestUtils.startCommitWithTime(writeClient, instant1);
+ writeClient.commit(instant1, writeClient.insert(dataset, instant1));
+ metaClient.reloadTableConfig();
+
+ // verify record index partition exists before downgrade
+ assertTrue(metaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+
+ HoodieWriteConfig.Builder upgradeWriteConfig = HoodieWriteConfig.newBuilder()
+ .withPath(metaClient.getBasePath())
+ .withProps(props);
+ if (mdtEnabled) {
+   upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(false).build());
+ } else {
+   upgradeWriteConfig.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build());
+ }
+
+ new UpgradeDowngrade(metaClient, upgradeWriteConfig.build(), context(), SparkUpgradeDowngradeHelper.getInstance())
+ .run(toVersion, null);
+
+ HoodieTableMetaClient resultMetaClient = HoodieTableMetaClient.builder()
+ .setConf(storageConf().newInstance())
+ .setBasePath(metaClient.getBasePath())
+ .build();
+
+ resultMetaClient.reloadTableConfig();
+ // verify record index partition exists after downgrade
+ assertTrue(resultMetaClient.getTableConfig().getMetadataPartitions().contains(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
+ }
+
/**
* Load a fixture table from resources and copy it to a temporary location for testing.
*/
@@ -536,6 +602,15 @@ public class TestUpgradeDowngrade extends SparkClientFunctionalTestHarness {
);
}
+ private static Stream<Arguments> testMdtValidationDowngrade() {
+ return Stream.of(
+ Arguments.of(HoodieTableType.COPY_ON_WRITE, true),
+ Arguments.of(HoodieTableType.COPY_ON_WRITE, false),
+ Arguments.of(HoodieTableType.MERGE_ON_READ, true),
+ Arguments.of(HoodieTableType.MERGE_ON_READ, false)
+ );
+ }
+
private static Stream<Arguments> testArgsPayloadUpgradeDowngrade() {
String[] payloadTypes = {
"default", "overwrite", "partial", "postgres", "mysql",