This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 83eac32573a86262acc9eeb4827995be29ad3103
Author: Vamshi Krishna Kyatham <[email protected]>
AuthorDate: Fri Oct 24 22:51:48 2025 -0700

    fix: fix partition stats delete properly for downgrade from V9 to V8 (#14138)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   5 +-
 .../hudi/table/upgrade/UpgradeDowngradeUtils.java  |  14 ++-
 .../upgrade/TestNineToEightDowngradeHandler.java   | 120 +++++++++++++++++++++
 3 files changed, 136 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index cb4c32049f1c..e5880bd29a54 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1112,10 +1112,11 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       HoodieTableMetaClient metaClient = table.getMetaClient();
       // For secondary index and expression index with wrong parameters, index definition for the MDT partition is
       // removed so that such indices are not recreated while initializing the writer.
+      // Also remove index definitions for col stats and partition stats when they are dropped (e.g., during downgrade).
       metadataPartitions.forEach(partition -> {
-        if (MetadataPartitionType.isExpressionOrSecondaryIndex(partition)) {
+        metaClient.getIndexForMetadataPartition(partition).ifPresent(indexDef 
-> {
           metaClient.deleteIndexDefinition(partition);
-        }
+        });
       });
 
       Option<HoodieTableMetadataWriter> metadataWriterOpt = 
table.getMetadataWriter(dropInstant);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index c3cc4eef4ed5..b7c51f31c5cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -57,6 +57,7 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -294,12 +295,23 @@ public class UpgradeDowngradeUtils {
                                               HoodieTable table, 
SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
     try (BaseHoodieWriteClient writeClient = 
upgradeDowngradeHelper.getWriteClient(config, context)) {
-      List<String> mdtPartitions = 
metaClient.getTableConfig().getMetadataPartitions()
+      Set<String> metadataPartitions = 
metaClient.getTableConfig().getMetadataPartitions();
+      List<String> mdtPartitions = metadataPartitions
           .stream()
           .filter(partition -> 
metaClient.getIndexForMetadataPartition(partition)
               .map(indexDef -> 
HoodieIndexVersion.V1.lowerThan(indexDef.getVersion()))
               .orElse(false))
           .collect(Collectors.toList());
+
+      // If col stats V2 is being deleted and partition stats exists, delete partition stats as well
+      // This handles the case where partition stats might not have an index definition in index.json
+      String colStatsPartition = 
MetadataPartitionType.COLUMN_STATS.getPartitionPath();
+      String partitionStatsPartition = 
MetadataPartitionType.PARTITION_STATS.getPartitionPath();
+      if (mdtPartitions.contains(colStatsPartition)
+          && metadataPartitions.contains(partitionStatsPartition)) {
+        mdtPartitions.add(partitionStatsPartition);
+      }
+
       LOG.info("Dropping from MDT partitions for {}: {}", operationType, 
mdtPartitions);
       if (!mdtPartitions.isEmpty()) {
         writeClient.dropIndex(mdtPartitions);
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
index 4dc4123a9d90..c274c5c845c4 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestNineToEightDowngradeHandler.java
@@ -325,4 +325,124 @@ class TestNineToEightDowngradeHandler {
       assertEquals(new HashSet<>(Arrays.asList("secondary_index_v2")), new 
HashSet<>(capturedIndexes));
     }
   }
+
+  @Test
+  void testDowngradeDropsPartitionStatsWhenColStatsV2IsDeleted() {
+    try (MockedStatic<UpgradeDowngradeUtils> mockedUtils = 
Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
+      mockedUtils.when(() -> 
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
+          any(HoodieTable.class),
+          any(HoodieEngineContext.class),
+          any(HoodieWriteConfig.class),
+          any(SupportsUpgradeDowngrade.class),
+          anyBoolean(),
+          any(HoodieTableVersion.class)
+      )).thenAnswer(invocation -> null);
+
+      mockedUtils.when(() -> UpgradeDowngradeUtils.dropNonV1IndexPartitions(
+          eq(config),
+          eq(context),
+          eq(table),
+          eq(upgradeDowngradeHelper),
+          any(String.class)
+      )).thenCallRealMethod();
+
+      // Mocking index definitions: col_stats V2 is present but partition_stats is NOT in index.json
+      Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>();
+      indexDefs.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
HoodieIndexDefinition.newBuilder()
+          .withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+          .withIndexType(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+          .withVersion(HoodieIndexVersion.V2)
+          .build());
+      HoodieIndexMetadata metadata = new HoodieIndexMetadata(indexDefs);
+
+      when(table.getMetaClient()).thenReturn(metaClient);
+      when(metaClient.getIndexMetadata()).thenReturn(Option.of(metadata));
+      when(metaClient.getTableConfig()).thenReturn(tableConfig);
+
+      // Mocking metadata partitions to include both col_stats and partition_stats
+      Set<String> mdtPartitions = new HashSet<>();
+      mdtPartitions.add(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+      
mdtPartitions.add(MetadataPartitionType.PARTITION_STATS.getPartitionPath()); // 
partition_stats exists but not in index.json
+      when(tableConfig.getMetadataPartitions()).thenReturn(mdtPartitions);
+
+      // Mocking getIndexForMetadataPartition to return the index definition for col_stats but empty for partition_stats
+      
when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
+          
.thenReturn(Option.of(indexDefs.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath())));
+      
when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.PARTITION_STATS.getPartitionPath()))
+          .thenReturn(Option.empty()); // partition_stats has no index 
definition
+
+      BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+      when(upgradeDowngradeHelper.getWriteClient(config, 
context)).thenReturn(writeClient);
+
+      handler.downgrade(config, context, "20240101120000", 
upgradeDowngradeHelper);
+
+      ArgumentCaptor<List<String>> argumentCaptor = 
ArgumentCaptor.forClass(List.class);
+      verify(writeClient, times(1)).dropIndex(argumentCaptor.capture());
+      List<String> capturedIndexes = argumentCaptor.getValue();
+      assertEquals(new HashSet<>(Arrays.asList(
+          MetadataPartitionType.COLUMN_STATS.getPartitionPath(),
+          MetadataPartitionType.PARTITION_STATS.getPartitionPath()
+      )), new HashSet<>(capturedIndexes));
+    }
+  }
+
+  @Test
+  void testDowngradeDoesNotDropV1Indexes() {
+    try (MockedStatic<UpgradeDowngradeUtils> mockedUtils = 
Mockito.mockStatic(UpgradeDowngradeUtils.class)) {
+      mockedUtils.when(() -> 
UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
+          any(HoodieTable.class),
+          any(HoodieEngineContext.class),
+          any(HoodieWriteConfig.class),
+          any(SupportsUpgradeDowngrade.class),
+          anyBoolean(),
+          any(HoodieTableVersion.class)
+      )).thenAnswer(invocation -> null);
+
+      mockedUtils.when(() -> UpgradeDowngradeUtils.dropNonV1IndexPartitions(
+          eq(config),
+          eq(context),
+          eq(table),
+          eq(upgradeDowngradeHelper),
+          any(String.class)
+      )).thenCallRealMethod();
+
+      // Mocking index definitions: col_stats and partition_stats both have V1 (created in V8, upgraded to V9)
+      Map<String, HoodieIndexDefinition> indexDefs = new HashMap<>();
+      indexDefs.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), 
HoodieIndexDefinition.newBuilder()
+          .withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+          .withIndexType(MetadataPartitionType.COLUMN_STATS.getPartitionPath())
+          .withVersion(HoodieIndexVersion.V1)
+          .build());
+      indexDefs.put(MetadataPartitionType.PARTITION_STATS.getPartitionPath(), 
HoodieIndexDefinition.newBuilder()
+          
.withIndexName(MetadataPartitionType.PARTITION_STATS.getPartitionPath())
+          
.withIndexType(MetadataPartitionType.PARTITION_STATS.getPartitionPath())
+          .withVersion(HoodieIndexVersion.V1)
+          .build());
+      HoodieIndexMetadata metadata = new HoodieIndexMetadata(indexDefs);
+
+      when(table.getMetaClient()).thenReturn(metaClient);
+      when(metaClient.getIndexMetadata()).thenReturn(Option.of(metadata));
+      when(metaClient.getTableConfig()).thenReturn(tableConfig);
+
+      // Mocking metadata partitions to include both col_stats and partition_stats
+      Set<String> mdtPartitions = new HashSet<>();
+      mdtPartitions.add(MetadataPartitionType.COLUMN_STATS.getPartitionPath());
+      
mdtPartitions.add(MetadataPartitionType.PARTITION_STATS.getPartitionPath());
+      when(tableConfig.getMetadataPartitions()).thenReturn(mdtPartitions);
+
+      // Mocking getIndexForMetadataPartition to return V1 index definitions
+      
when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.COLUMN_STATS.getPartitionPath()))
+          
.thenReturn(Option.of(indexDefs.get(MetadataPartitionType.COLUMN_STATS.getPartitionPath())));
+      
when(metaClient.getIndexForMetadataPartition(MetadataPartitionType.PARTITION_STATS.getPartitionPath()))
+          
.thenReturn(Option.of(indexDefs.get(MetadataPartitionType.PARTITION_STATS.getPartitionPath())));
+
+      BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+      when(upgradeDowngradeHelper.getWriteClient(config, 
context)).thenReturn(writeClient);
+
+      handler.downgrade(config, context, "20240101120000", 
upgradeDowngradeHelper);
+
+      // Verify that dropIndex is not called at all since there are no V2 indexes to drop
+      verify(writeClient, times(0)).dropIndex(any());
+    }
+  }
 }

Reply via email to