This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d13310c6c119 perf(table-services): Incremental clean planning (for 
COW) should ignore partitions from instants with only new file groups (#18016)
d13310c6c119 is described below

commit d13310c6c1191a1bf4ed5cc0c9486aa3ab3d4dc5
Author: Krishen <[email protected]>
AuthorDate: Mon Mar 9 18:09:17 2026 -0700

    perf(table-services): Incremental clean planning (for COW) should ignore 
partitions from instants with only new file groups (#18016)
    
    Summary and Changelog:
    Optimizes the incremental clean planner to only target partitions in commit 
instants that have modified existing file groups.
    Changes:
    
    Added new getWritePartitionPathsWithUpdatedFileGroups() API for 
commit/replacecommit metadata which returns the distinct set of partitions that 
the instant left uncleaned files in (due to updating or replacing file groups)
    Updated CleanPlanner#getPartitionsForInstants() to use 
getWritePartitionPathsWithUpdatedFileGroups() instead of returning all 
partitions from write stats
    Added unit tests for getWritePartitionPathsWithUpdatedFileGroups() 
covering insert-only, update-only, and mixed scenarios
    Behavior change: When the clean planner incrementally processes instants since 
the last earliest-commit-to-retain (ECTR), it now only selects partitions where 
file groups were actually updated or replaced. Insert-only operations that 
create new file groups in a partition no longer trigger unnecessary partition 
scans during cleaning.
    Impact
    No public API changes. Internal performance optimization that can reduce 
the number of partitions scanned during incremental cleaning. For workloads 
with many insert-only commits touching thousands of partitions, this 
significantly reduces clean planning overhead.
    
    ---------
    
    Co-authored-by: Krishen Bhan <[email protected]>
---
 .../hudi/table/action/clean/CleanPlanner.java      |  13 ++
 .../java/org/apache/hudi/table/TestCleaner.java    | 199 +++++++++++++++++++++
 .../hudi/common/model/HoodieCommitMetadata.java    |  15 ++
 .../common/model/TestHoodieCommitMetadata.java     | 104 +++++++++++
 4 files changed, 331 insertions(+)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index d5d5b5ebabe2..772f0a236219 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import 
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
@@ -241,6 +243,17 @@ public class CleanPlanner<T, I, K, O> implements 
Serializable {
       } else {
         HoodieCommitMetadata commitMetadata =
             hoodieTable.getActiveTimeline().readCommitMetadata(instant);
+        WriteOperationType operationType = commitMetadata.getOperationType();
+        if ((WriteOperationType.isUpsert(operationType) || 
WriteOperationType.isInsertWithoutReplace(operationType))
+            && HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())
+            && 
hoodieTable.getMetaClient().getTableType().equals(HoodieTableType.COPY_ON_WRITE))
 {
+          // For COW upsert/insert, only check partitions where the write 
updated an existing file slice
+          // (leaving behind an older version to clean). Partitions with only 
new file slices have nothing to clean yet.
+          return 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups().stream();
+        }
+        // For MOR, small file handling during inserts can cause deltacommits 
to create new base files (file slices)
+        // in existing file groups, so their partitions must still be returned.
+        // TODO: filter MOR deltacommit operation types guaranteed to not 
create new file slices for existing file groups
         return commitMetadata.getPartitionToWriteStats().keySet().stream();
       }
     } catch (IOException e) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 34ea53573d88..24fcb656ef1d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -49,6 +50,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -125,6 +127,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
 
   private static final int BIG_BATCH_INSERT_SIZE = 500;
   private static final int PARALLELISM = 10;
+  private static final String BASE_FILE_EXTENSION = 
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
 
   /**
    * Helper method to do first batch of insert for clean by versions/commits 
tests.
@@ -1326,4 +1329,200 @@ public class TestCleaner extends HoodieCleanerTestBase {
     HoodieCleanerPlan cleanPlan = 
metaClient.reloadActiveTimeline().readCleanerPlan(metaClient.createNewInstant(State.REQUESTED,
 HoodieTimeline.CLEAN_ACTION, cleanInstant.get()));
     return Pair.of(cleanInstant.orElse(null), cleanPlan);
   }
+
+  /**
+   * Test that incremental clean properly scans partitions from different 
operation types on a COW table.
+   * This test verifies that with incremental clean enabled and 
KEEP_LATEST_COMMITS policy,
+   * CleanPlanner::getPartitionsForInstants correctly identifies partitions to 
clean:
+   * - Partitions with only inserts (new file groups) are NOT included in 
clean plan
+   *   (because getWritePartitionPathsWithUpdatedFileGroups() returns empty 
for insert-only partitions)
+   * - Partitions with upsert (existing file groups modified) ARE included in 
clean plan
+   */
+  @Test
+  public void 
testIncrementalCleanPartitionScanningWithDifferentOperationTypesCOW() throws 
Exception {
+    // Setup: Keep latest 1 commit, incremental clean enabled
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withIncrementalCleaningMode(true)
+            .withAutoClean(false)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(1)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false)
+            .build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .build();
+
+    // Define partitions for different operation types
+    final String partitionInsertOnly = "2020/01/01";  // Insert only - should 
NOT be in clean plan
+    final String partitionUpsert = "2020/01/02";      // Upsert - should be in 
clean plan
+
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
+      testTable.withPartitionMetaFiles(partitionInsertOnly, partitionUpsert);
+
+      // File IDs for each partition
+      String fileIdUpsert = UUID.randomUUID().toString();
+      String fileIdInsertOnly = UUID.randomUUID().toString();
+
+      // Commit 1: Initial insert to partitionUpsert using the standard 
commitWithMdt pattern
+      Map<String, List<String>> part1ToFileId = new HashMap<>();
+      part1ToFileId.put(partitionUpsert, 
Collections.singletonList(fileIdUpsert));
+      commitWithMdt("001", part1ToFileId, testTable, config, true, false);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+
+      // Commit 2: Upsert on partitionUpsert (same fileId = update)
+      part1ToFileId = new HashMap<>();
+      part1ToFileId.put(partitionUpsert, 
Collections.singletonList(fileIdUpsert));
+      commitWithMdt("002", part1ToFileId, testTable, config, true, false);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+
+      // Commit 3: Insert to partitionInsertOnly (new file group)
+      Map<String, List<String>> part2ToFileId = new HashMap<>();
+      part2ToFileId.put(partitionInsertOnly, 
Collections.singletonList(fileIdInsertOnly));
+      commitWithMdt("003", part2ToFileId, testTable, config, true, false);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+
+      // Commit 4: Upsert on partitionUpsert (creates third file version - 
this will be retained)
+      part1ToFileId = new HashMap<>();
+      part1ToFileId.put(partitionUpsert, 
Collections.singletonList(fileIdUpsert));
+      commitWithMdt("004", part1ToFileId, testTable, config, true, false);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+
+      // Run cleaner
+      List<HoodieCleanStat> cleanStats = runCleaner(config);
+
+      // Verify: partitionUpsert should have files cleaned, 
partitionInsertOnly should NOT
+      // Check that partitionInsertOnly is not in the cleaned partitions (or 
has 0 files cleaned)
+      boolean insertOnlyPartitionCleaned = cleanStats.stream()
+          .anyMatch(stat -> stat.getPartitionPath().equals(partitionInsertOnly)
+              && stat.getSuccessDeleteFiles().size() > 0);
+      assertFalse(insertOnlyPartitionCleaned,
+          "Insert-only partition should NOT have files cleaned as it only has 
new file groups with no older versions");
+
+      // Check that partitionUpsert IS in the cleaned partitions (has old file 
versions to clean)
+      boolean upsertPartitionCleaned = cleanStats.stream()
+          .anyMatch(stat -> stat.getPartitionPath().equals(partitionUpsert)
+              && stat.getSuccessDeleteFiles().size() > 0);
+      assertTrue(upsertPartitionCleaned,
+          "Upsert partition should be in clean plan as it has updated file 
groups with older versions");
+    } finally {
+      testTable.close();
+    }
+  }
+
+  /**
+   * Test that incremental clean properly scans partitions from different 
operation types on a MOR table.
+   * This test verifies that with incremental clean enabled and 
KEEP_LATEST_COMMITS policy,
+   * CleanPlanner::getPartitionsForInstants correctly identifies partitions to 
clean:
+   * - Partitions with only delta commits (not part of compaction) are NOT 
included in clean plan
+   *   (because getPartitionsForInstants returns Stream.empty() for delta 
commits in MOR)
+   * - Partitions that are part of compaction ARE included in clean plan
+   */
+  @Test
+  public void 
testIncrementalCleanPartitionScanningWithDifferentOperationTypesMOR() throws 
Exception {
+    // Initialize MOR table
+    HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Setup: Keep latest 1 commit, incremental clean enabled
+    // Disable metadata to avoid issues with file size validation
+    HoodieWriteConfig config = getConfigBuilder()
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withIncrementalCleaningMode(true)
+            .withAutoClean(false)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(1)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false)
+            .build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(false)
+            .build())
+        .withEmbeddedTimelineServerEnabled(false)
+        .build();
+
+    // Define partitions for different operation types
+    final String partitionDeltaOnly = "2020/01/01";    // Delta commits only - 
NOT part of compaction, should NOT be in clean plan
+    final String partitionCompaction = "2020/01/02";   // Delta commits + 
compaction - should be in clean plan
+
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    try {
+      testTable.withPartitionMetaFiles(partitionDeltaOnly, 
partitionCompaction);
+
+      // File IDs for each partition
+      String fileIdCompaction = UUID.randomUUID().toString();
+      String fileIdDeltaOnly = UUID.randomUUID().toString();
+
+      // Delta commit 1: Insert to partitionCompaction (creates base file)
+      testTable.addDeltaCommit("001")
+          .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+      // Delta commit 2: Upsert to partitionCompaction (creates log files)
+      testTable.addDeltaCommit("002")
+          .withLogFile(partitionCompaction, fileIdCompaction, 1);
+
+      // First compaction on partitionCompaction (creates new base file)
+      testTable.addRequestedCompaction("003", new 
FileSlice(partitionCompaction, "001", fileIdCompaction));
+      HoodieCommitMetadata compaction1Metadata = new HoodieCommitMetadata();
+      compaction1Metadata.setOperationType(WriteOperationType.COMPACT);
+      HoodieWriteStat statC1 = new HoodieWriteStat();
+      statC1.setPartitionPath(partitionCompaction);
+      statC1.setFileId(fileIdCompaction);
+      statC1.setPath(partitionCompaction + "/" + 
FSUtils.makeBaseFileName("003", "0", fileIdCompaction, BASE_FILE_EXTENSION));
+      compaction1Metadata.addWriteStat(partitionCompaction, statC1);
+      testTable.addCompaction("003", compaction1Metadata)
+          .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+      // Delta commit 3: Upsert to partitionCompaction (creates log files 
after compaction)
+      testTable.addDeltaCommit("004")
+          .withLogFile(partitionCompaction, fileIdCompaction, 2);
+
+      // Second compaction on partitionCompaction (creates another base file 
version)
+      testTable.addRequestedCompaction("005", new 
FileSlice(partitionCompaction, "003", fileIdCompaction));
+      HoodieCommitMetadata compaction2Metadata = new HoodieCommitMetadata();
+      compaction2Metadata.setOperationType(WriteOperationType.COMPACT);
+      HoodieWriteStat statC2 = new HoodieWriteStat();
+      statC2.setPartitionPath(partitionCompaction);
+      statC2.setFileId(fileIdCompaction);
+      statC2.setPath(partitionCompaction + "/" + 
FSUtils.makeBaseFileName("005", "0", fileIdCompaction, BASE_FILE_EXTENSION));
+      compaction2Metadata.addWriteStat(partitionCompaction, statC2);
+      testTable.addCompaction("005", compaction2Metadata)
+          .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+      // Delta commit 4: Insert to partitionDeltaOnly (creates base file - 
never compacted)
+      testTable.addDeltaCommit("006")
+          .withBaseFilesInPartition(partitionDeltaOnly, fileIdDeltaOnly);
+
+      // Delta commit 5: Upsert to partitionDeltaOnly (creates log file - 
still never compacted)
+      testTable.addDeltaCommit("007")
+          .withLogFile(partitionDeltaOnly, fileIdDeltaOnly, 1);
+
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+
+      // Run cleaner
+      List<HoodieCleanStat> cleanStats = runCleaner(config);
+
+      // Verify: partitionCompaction should have files cleaned, 
partitionDeltaOnly should NOT
+      // Check that partitionDeltaOnly is not in the cleaned partitions (or 
has 0 files cleaned)
+      boolean deltaOnlyPartitionCleaned = cleanStats.stream()
+          .anyMatch(stat -> stat.getPartitionPath().equals(partitionDeltaOnly)
+              && stat.getSuccessDeleteFiles().size() > 0);
+      assertFalse(deltaOnlyPartitionCleaned,
+          "Delta-only partition should NOT have files cleaned as it was never 
part of any compaction");
+
+      // Check that partitionCompaction IS in the cleaned partitions
+      boolean compactionPartitionCleaned = cleanStats.stream()
+          .anyMatch(stat -> stat.getPartitionPath().equals(partitionCompaction)
+              && stat.getSuccessDeleteFiles().size() > 0);
+      assertTrue(compactionPartitionCleaned,
+          "Compaction partition should be in clean plan as it has old base 
files from compaction");
+    } finally {
+      testTable.close();
+    }
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 93bb06077265..51171b83b0e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import java.util.Set;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
@@ -499,4 +500,18 @@ public class HoodieCommitMetadata implements Serializable {
   public HashSet<String> getWritePartitionPaths() {
     return new HashSet<>(partitionToWriteStats.keySet());
   }
+
+  public Set<String> getWritePartitionPathsWithUpdatedFileGroups() {
+    return getPartitionToWriteStats()
+        .entrySet()
+        .stream()
+        .filter(partitionAndWriteStats -> partitionAndWriteStats
+            .getValue()
+            .stream()
+            .anyMatch(writeStat -> 
!Option.ofNullable(writeStat.getPrevCommit())
+                .orElse(HoodieWriteStat.NULL_COMMIT)
+                .equalsIgnoreCase(HoodieWriteStat.NULL_COMMIT)))
+        .map(partitionAndWriteStats -> partitionAndWriteStats.getKey())
+        .collect(Collectors.toSet());
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
index 7af4f6d70547..0c4781427f8d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
@@ -44,8 +44,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.convertMetadataToByteArray;
@@ -274,4 +276,106 @@ public class TestHoodieCommitMetadata {
     }
     return schema;
   }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_AllInserts() {
+    // When all partitions have only insert stats (prevCommit is "null"), the 
result should be empty
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    HoodieWriteStat insertStat1 = createWriteStatWithPrevFileId("partition1", 
"file1", "null");
+    HoodieWriteStat insertStat2 = createWriteStatWithPrevFileId("partition2", 
"file2", "null");
+
+    commitMetadata.addWriteStat("partition1", insertStat1);
+    commitMetadata.addWriteStat("partition2", insertStat2);
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    assertTrue(result.isEmpty(), "Result should be empty when all stats are 
inserts (prevCommit = 'null')");
+  }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_AllUpdates() {
+    // When all partitions have update stats (prevCommit is a valid commit 
time), all partitions should be returned
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    HoodieWriteStat updateStat1 = createWriteStatWithPrevFileId("partition1", 
"file1", "20240101120000");
+    HoodieWriteStat updateStat2 = createWriteStatWithPrevFileId("partition2", 
"file2", "20240101130000");
+
+    commitMetadata.addWriteStat("partition1", updateStat1);
+    commitMetadata.addWriteStat("partition2", updateStat2);
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    Set<String> expected = new HashSet<>(Arrays.asList("partition1", 
"partition2"));
+    assertEquals(expected, result, "Result should contain all partitions with 
updates");
+  }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_MixedInsertsAndUpdates()
 {
+    // When some partitions have inserts and some have updates, only the 
update partitions should be returned
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    HoodieWriteStat insertStat = createWriteStatWithPrevFileId("partition1", 
"file1", "null");
+    HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition2", 
"file2", "20240101120000");
+
+    commitMetadata.addWriteStat("partition1", insertStat);
+    commitMetadata.addWriteStat("partition2", updateStat);
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    Set<String> expected = new HashSet<>(Arrays.asList("partition2"));
+    assertEquals(expected, result, "Result should only contain partitions with 
updates");
+  }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_PartitionWithBothInsertAndUpdate()
 {
+    // When a partition has both insert and update stats, it should be included
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    HoodieWriteStat insertStat = createWriteStatWithPrevFileId("partition1", 
"file1", "null");
+    HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition1", 
"file2", "20240101120000");
+
+    commitMetadata.addWriteStat("partition1", insertStat);
+    commitMetadata.addWriteStat("partition1", updateStat);
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    Set<String> expected = new HashSet<>(Arrays.asList("partition1"));
+    assertEquals(expected, result, "Result should contain partition with at 
least one update");
+  }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_NullPrevCommit() {
+    // When prevCommit is null (not the string "null"), the partition should 
not be included
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    HoodieWriteStat statWithNullPrevCommit = 
createWriteStatWithPrevFileId("partition1", "file1", null);
+    HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition2", 
"file2", "20240101120000");
+
+    commitMetadata.addWriteStat("partition1", statWithNullPrevCommit);
+    commitMetadata.addWriteStat("partition2", updateStat);
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    Set<String> expected = new HashSet<>(Arrays.asList("partition2"));
+    assertEquals(expected, result, "Result should not include partitions where 
prevCommit is null");
+  }
+
+  @Test
+  public void 
testGetWritePartitionPathsWithExistingFileGroupsModified_EmptyMetadata() {
+    // When metadata is empty, result should be empty
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+    Set<String> result = 
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+    assertTrue(result.isEmpty(), "Result should be empty for empty metadata");
+  }
+
+  /**
+   * Helper method to create a HoodieWriteStat with specified partition, 
fileId, and prevCommit.
+   */
+  private HoodieWriteStat createWriteStatWithPrevFileId(String partitionPath, 
String fileId, String prevCommit) {
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId(fileId);
+    writeStat.setPrevCommit(prevCommit);
+    writeStat.setPath(partitionPath + "/" + fileId + ".parquet");
+    writeStat.setNumWrites(100);
+    writeStat.setNumDeletes(0);
+    return writeStat;
+  }
 }

Reply via email to