This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 83eac32573a86262acc9eeb4827995be29ad3103 Author: Vamshi Krishna Kyatham <[email protected]> AuthorDate: Fri Oct 24 22:51:48 2025 -0700 fix: fix partition stats delete properly for downgrade from V9 to V8 (#14138) --- .../apache/hudi/client/BaseHoodieWriteClient.java | 5 +- .../hudi/table/upgrade/UpgradeDowngradeUtils.java | 14 ++- .../upgrade/TestNineToEightDowngradeHandler.java | 120 +++++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index cb4c32049f1c..e5880bd29a54 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -1112,10 +1112,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient HoodieTableMetaClient metaClient = table.getMetaClient(); // For secondary index and expression index with wrong parameters, index definition for the MDT partition is // removed so that such indices are not recreated while initializing the writer. + // Also remove index definitions for col stats and partition stats when they are dropped (e.g., during downgrade). 
+ metadataPartitions.forEach(partition -> { - if (MetadataPartitionType.isExpressionOrSecondaryIndex(partition)) { + metaClient.getIndexForMetadataPartition(partition).ifPresent(indexDef -> { metaClient.deleteIndexDefinition(partition); - } + }); }); Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(dropInstant); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java index c3cc4eef4ed5..b7c51f31c5cf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java @@ -57,6 +57,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.metadata.HoodieIndexVersion; +import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.storage.StoragePath; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -294,12 +295,23 @@ public class UpgradeDowngradeUtils { HoodieTable table, SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) { HoodieTableMetaClient metaClient = table.getMetaClient(); try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(config, context)) { - List<String> mdtPartitions = metaClient.getTableConfig().getMetadataPartitions() + Set<String> metadataPartitions = metaClient.getTableConfig().getMetadataPartitions(); + List<String> mdtPartitions = metadataPartitions .stream() .filter(partition -> metaClient.getIndexForMetadataPartition(partition) .map(indexDef -> HoodieIndexVersion.V1.lowerThan(indexDef.getVersion())) .orElse(false)) .collect(Collectors.toList()); + + // If col stats 
V2 is being deleted and partition stats exists but is not already scheduled for drop, delete partition stats as well (avoids passing it to dropIndex twice) + // This handles the case where partition stats might not have an index definition in index.json + String colStatsPartition = MetadataPartitionType.COLUMN_STATS.getPartitionPath(); + String partitionStatsPartition = MetadataPartitionType.PARTITION_STATS.getPartitionPath(); + if (mdtPartitions.contains(colStatsPartition) + && metadataPartitions.contains(partitionStatsPartition) && !mdtPartitions.contains(partitionStatsPartition)) { + mdtPartitions.add(partitionStatsPartition); + } + LOG.info("Dropping from MDT partitions for {}: {}", operationType, mdtPartitions); if (!mdtPartitions.isEmpty()) { writeClient.dropIndex(mdtPartitions); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java index 4dc4123a9d90..c274c5c845c4 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java @@ -325,4 +325,124 @@ class TestNineToEightDowngradeHandler { assertEquals(new HashSet<>(Arrays.asList("secondary_index_v2")), new HashSet<>(capturedIndexes)); } } + + @Test + void testDowngradeDropsPartitionStatsWhenColStatsV2IsDeleted() { + try (MockedStatic<UpgradeDowngradeUtils> mockedUtils = Mockito.mockStatic(UpgradeDowngradeUtils.class)) { + mockedUtils.when(() -> UpgradeDowngradeUtils.rollbackFailedWritesAndCompact( + any(HoodieTable.class), + any(HoodieEngineContext.class), + any(HoodieWriteConfig.class), + any(SupportsUpgradeDowngrade.class), + anyBoolean(), + any(HoodieTableVersion.class) + )).thenAnswer(invocation -> null); + + mockedUtils.when(() -> 
UpgradeDowngradeUtils.dropNonV1IndexPartitions( + eq(config), + eq(context), + eq(table), + eq(upgradeDowngradeHelper), + any(String.class) 
)).thenCallRealMethod(); + + // Mocking index definitions: col_stats V2 is present but partition_stats is NOT in index.json + Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>(); + indexDefs.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), HoodieIndexDefinition.newBuilder() + .withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath()) + .withIndexType(MetadataPartitionType.COLUMN_STATS.getPartitionPath()) + .withVersion(HoodieIndexVersion.V2) + .build()); + HoodieIndexMetadata metadata = new HoodieIndexMetadata(indexDefs); + + when(table.getMetaClient()).thenReturn(metaClient); + when(metaClient.getIndexMetadata()).thenReturn(Option.of(metadata)); + when(metaClient.getTableConfig()).thenReturn(tableConfig); + + // Mocking metadata partitions to include both col_stats and partition_stats + Set<String> mdtPartitions = new HashSet<>(); + mdtPartitions.add(MetadataPartitionType.COLUMN_STATS.getPartitionPath()); + mdtPartitions.add(MetadataPartitionType.PARTITION_STATS.getPartitionPath()); // partition_stats exists but not in index.json + when(tableConfig.getMetadataPartitions()).thenReturn(mdtPartitions); + + // Mocking getIndexForMetadataPartition to return the index definition for col_stats but empty for partition_stats + when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) + .thenReturn(Option.of(indexDefs.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))); + when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) + .thenReturn(Option.empty()); // partition_stats has no index definition + + BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class); + when(upgradeDowngradeHelper.getWriteClient(config, context)).thenReturn(writeClient); + + handler.downgrade(config, context, "20240101120000", upgradeDowngradeHelper); + + ArgumentCaptor<List<String>> argumentCaptor = ArgumentCaptor.forClass(List.class); + 
verify(writeClient, times(1)).dropIndex(argumentCaptor.capture()); + List<String> capturedIndexes = argumentCaptor.getValue(); + assertEquals(new HashSet<>(Arrays.asList( + MetadataPartitionType.COLUMN_STATS.getPartitionPath(), + MetadataPartitionType.PARTITION_STATS.getPartitionPath() + )), new HashSet<>(capturedIndexes)); + } + } + + @Test + void testDowngradeDoesNotDropV1Indexes() { + try (MockedStatic<UpgradeDowngradeUtils> mockedUtils = Mockito.mockStatic(UpgradeDowngradeUtils.class)) { + mockedUtils.when(() -> UpgradeDowngradeUtils.rollbackFailedWritesAndCompact( + any(HoodieTable.class), + any(HoodieEngineContext.class), + any(HoodieWriteConfig.class), + any(SupportsUpgradeDowngrade.class), + anyBoolean(), + any(HoodieTableVersion.class) + )).thenAnswer(invocation -> null); + + mockedUtils.when(() -> UpgradeDowngradeUtils.dropNonV1IndexPartitions( + eq(config), + eq(context), + eq(table), + eq(upgradeDowngradeHelper), + any(String.class) + )).thenCallRealMethod(); + + // Mocking index definitions: col_stats and partition_stats both have V1 (created in V8, upgraded to V9) + Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>(); + indexDefs.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), HoodieIndexDefinition.newBuilder() + .withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath()) + .withIndexType(MetadataPartitionType.COLUMN_STATS.getPartitionPath()) + .withVersion(HoodieIndexVersion.V1) + .build()); + indexDefs.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), HoodieIndexDefinition.newBuilder() + .withIndexName(MetadataPartitionType.PARTITION_STATS.getPartitionPath()) + .withIndexType(MetadataPartitionType.PARTITION_STATS.getPartitionPath()) + .withVersion(HoodieIndexVersion.V1) + .build()); + HoodieIndexMetadata metadata = new HoodieIndexMetadata(indexDefs); + + when(table.getMetaClient()).thenReturn(metaClient); + when(metaClient.getIndexMetadata()).thenReturn(Option.of(metadata)); + 
when(metaClient.getTableConfig()).thenReturn(tableConfig); + + // Mocking metadata partitions to include both col_stats and partition_stats + Set<String> mdtPartitions = new HashSet<>(); + mdtPartitions.add(MetadataPartitionType.COLUMN_STATS.getPartitionPath()); + mdtPartitions.add(MetadataPartitionType.PARTITION_STATS.getPartitionPath()); + when(tableConfig.getMetadataPartitions()).thenReturn(mdtPartitions); + + // Mocking getIndexForMetadataPartition to return V1 index definitions + when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.COLUMN_STATS.getPartitionPath())) + .thenReturn(Option.of(indexDefs.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))); + when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) + .thenReturn(Option.of(indexDefs.get(MetadataPartitionType.PARTITION_STATS.getPartitionPath()))); + + BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class); + when(upgradeDowngradeHelper.getWriteClient(config, context)).thenReturn(writeClient); + + handler.downgrade(config, context, "20240101120000", upgradeDowngradeHelper); + + // Verify that dropIndex is not called at all since there are no V2 indexes to drop + verify(writeClient, times(0)).dropIndex(any()); + } + } }
