This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d13310c6c119 perf(table-services): Incremental clean planning (for
COW) should ignore partitions from instants with only new file groups (#18016)
d13310c6c119 is described below
commit d13310c6c1191a1bf4ed5cc0c9486aa3ab3d4dc5
Author: Krishen <[email protected]>
AuthorDate: Mon Mar 9 18:09:17 2026 -0700
perf(table-services): Incremental clean planning (for COW) should ignore
partitions from instants with only new file groups (#18016)
Summary and Changelog:
Optimizes the incremental clean planner to only target partitions in commit
instants that have modified existing file groups.
Changes:
Added a new getWritePartitionPathsWithUpdatedFileGroups() API for
commit/replacecommit metadata which returns the distinct set of partitions in
which the instant left uncleaned files (due to updating or replacing file groups)
Updated CleanPlanner#getPartitionsForInstants() to use
getWritePartitionPathsWithUpdatedFileGroups() instead of returning all
partitions from write stats
Added unit tests for getWritePartitionPathsWithUpdatedFileGroups()
covering insert-only, update-only, and mixed scenarios
Behavior change: When the clean planner incrementally processes instants since
the last earliest-commit-to-retain (ECTR), it now only selects partitions where
file groups were actually updated or replaced. Insert-only operations that
create new file groups in a partition no longer trigger unnecessary partition
scans during cleaning.
Impact
No public API changes. Internal performance optimization that can reduce
the number of partitions scanned during incremental cleaning. For workloads
with many insert-only commits touching thousands of partitions, this
significantly reduces clean planning overhead.
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../hudi/table/action/clean/CleanPlanner.java | 13 ++
.../java/org/apache/hudi/table/TestCleaner.java | 199 +++++++++++++++++++++
.../hudi/common/model/HoodieCommitMetadata.java | 15 ++
.../common/model/TestHoodieCommitMetadata.java | 104 +++++++++++
4 files changed, 331 insertions(+)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index d5d5b5ebabe2..772f0a236219 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -32,6 +32,8 @@ import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import
org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV1MigrationHandler;
@@ -241,6 +243,17 @@ public class CleanPlanner<T, I, K, O> implements
Serializable {
} else {
HoodieCommitMetadata commitMetadata =
hoodieTable.getActiveTimeline().readCommitMetadata(instant);
+ WriteOperationType operationType = commitMetadata.getOperationType();
+ if ((WriteOperationType.isUpsert(operationType) ||
WriteOperationType.isInsertWithoutReplace(operationType))
+ && HoodieTimeline.COMMIT_ACTION.equals(instant.getAction())
+ &&
hoodieTable.getMetaClient().getTableType().equals(HoodieTableType.COPY_ON_WRITE))
{
+ // For COW upsert/insert, only check partitions where the write
updated an existing file slice
+ // (leaving behind an older version to clean). Partitions with only
new file slices have nothing to clean yet.
+ return
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups().stream();
+ }
+ // For MOR, small file handling during inserts can cause deltacommits
to create new base files (file slices)
+ // in existing file groups, so their partitions must still be returned.
+ // TODO: filter MOR deltacommit operation types guaranteed to not
create new file slices for existing file groups
return commitMetadata.getPartitionToWriteStats().keySet().stream();
}
} catch (IOException e) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 34ea53573d88..24fcb656ef1d 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -40,6 +40,7 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
@@ -49,6 +50,7 @@ import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -125,6 +127,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
private static final int BIG_BATCH_INSERT_SIZE = 500;
private static final int PARALLELISM = 10;
+ private static final String BASE_FILE_EXTENSION =
HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
/**
* Helper method to do first batch of insert for clean by versions/commits
tests.
@@ -1326,4 +1329,200 @@ public class TestCleaner extends HoodieCleanerTestBase {
HoodieCleanerPlan cleanPlan =
metaClient.reloadActiveTimeline().readCleanerPlan(metaClient.createNewInstant(State.REQUESTED,
HoodieTimeline.CLEAN_ACTION, cleanInstant.get()));
return Pair.of(cleanInstant.orElse(null), cleanPlan);
}
+
+ /**
+ * Test that incremental clean properly scans partitions from different
operation types on a COW table.
+ * This test verifies that with incremental clean enabled and
KEEP_LATEST_COMMITS policy,
+ * CleanPlanner::getPartitionsForInstants correctly identifies partitions to
clean:
+ * - Partitions with only inserts (new file groups) are NOT included in
clean plan
+ * (because getWritePartitionPathsWithUpdatedFileGroups() returns empty
for insert-only partitions)
+ * - Partitions with upsert (existing file groups modified) ARE included in
clean plan
+ */
+ @Test
+ public void
testIncrementalCleanPartitionScanningWithDifferentOperationTypesCOW() throws
Exception {
+ // Setup: Keep latest 1 commit, incremental clean enabled
+ HoodieWriteConfig config = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withIncrementalCleaningMode(true)
+ .withAutoClean(false)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(1)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false)
+ .build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .build();
+
+ // Define partitions for different operation types
+ final String partitionInsertOnly = "2020/01/01"; // Insert only - should
NOT be in clean plan
+ final String partitionUpsert = "2020/01/02"; // Upsert - should be in
clean plan
+
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
+ testTable.withPartitionMetaFiles(partitionInsertOnly, partitionUpsert);
+
+ // File IDs for each partition
+ String fileIdUpsert = UUID.randomUUID().toString();
+ String fileIdInsertOnly = UUID.randomUUID().toString();
+
+ // Commit 1: Initial insert to partitionUpsert using the standard
commitWithMdt pattern
+ Map<String, List<String>> part1ToFileId = new HashMap<>();
+ part1ToFileId.put(partitionUpsert,
Collections.singletonList(fileIdUpsert));
+ commitWithMdt("001", part1ToFileId, testTable, config, true, false);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+
+ // Commit 2: Upsert on partitionUpsert (same fileId = update)
+ part1ToFileId = new HashMap<>();
+ part1ToFileId.put(partitionUpsert,
Collections.singletonList(fileIdUpsert));
+ commitWithMdt("002", part1ToFileId, testTable, config, true, false);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+
+ // Commit 3: Insert to partitionInsertOnly (new file group)
+ Map<String, List<String>> part2ToFileId = new HashMap<>();
+ part2ToFileId.put(partitionInsertOnly,
Collections.singletonList(fileIdInsertOnly));
+ commitWithMdt("003", part2ToFileId, testTable, config, true, false);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+
+ // Commit 4: Upsert on partitionUpsert (creates third file version -
this will be retained)
+ part1ToFileId = new HashMap<>();
+ part1ToFileId.put(partitionUpsert,
Collections.singletonList(fileIdUpsert));
+ commitWithMdt("004", part1ToFileId, testTable, config, true, false);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Run cleaner
+ List<HoodieCleanStat> cleanStats = runCleaner(config);
+
+ // Verify: partitionUpsert should have files cleaned,
partitionInsertOnly should NOT
+ // Check that partitionInsertOnly is not in the cleaned partitions (or
has 0 files cleaned)
+ boolean insertOnlyPartitionCleaned = cleanStats.stream()
+ .anyMatch(stat -> stat.getPartitionPath().equals(partitionInsertOnly)
+ && stat.getSuccessDeleteFiles().size() > 0);
+ assertFalse(insertOnlyPartitionCleaned,
+ "Insert-only partition should NOT have files cleaned as it only has
new file groups with no older versions");
+
+ // Check that partitionUpsert IS in the cleaned partitions (has old file
versions to clean)
+ boolean upsertPartitionCleaned = cleanStats.stream()
+ .anyMatch(stat -> stat.getPartitionPath().equals(partitionUpsert)
+ && stat.getSuccessDeleteFiles().size() > 0);
+ assertTrue(upsertPartitionCleaned,
+ "Upsert partition should be in clean plan as it has updated file
groups with older versions");
+ } finally {
+ testTable.close();
+ }
+ }
+
+ /**
+ * Test that incremental clean properly scans partitions from different
operation types on a MOR table.
+ * This test verifies that with incremental clean enabled and
KEEP_LATEST_COMMITS policy,
+ * CleanPlanner::getPartitionsForInstants correctly identifies partitions to
clean:
+ * - Partitions with only delta commits (not part of compaction) are NOT
included in clean plan
+ * (because getPartitionsForInstants returns Stream.empty() for delta
commits in MOR)
+ * - Partitions that are part of compaction ARE included in clean plan
+ */
+ @Test
+ public void
testIncrementalCleanPartitionScanningWithDifferentOperationTypesMOR() throws
Exception {
+ // Initialize MOR table
+ HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Setup: Keep latest 1 commit, incremental clean enabled
+ // Disable metadata to avoid issues with file size validation
+ HoodieWriteConfig config = getConfigBuilder()
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withIncrementalCleaningMode(true)
+ .withAutoClean(false)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(1)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false)
+ .build())
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(false)
+ .build())
+ .withEmbeddedTimelineServerEnabled(false)
+ .build();
+
+ // Define partitions for different operation types
+ final String partitionDeltaOnly = "2020/01/01"; // Delta commits only -
NOT part of compaction, should NOT be in clean plan
+ final String partitionCompaction = "2020/01/02"; // Delta commits +
compaction - should be in clean plan
+
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ try {
+ testTable.withPartitionMetaFiles(partitionDeltaOnly,
partitionCompaction);
+
+ // File IDs for each partition
+ String fileIdCompaction = UUID.randomUUID().toString();
+ String fileIdDeltaOnly = UUID.randomUUID().toString();
+
+ // Delta commit 1: Insert to partitionCompaction (creates base file)
+ testTable.addDeltaCommit("001")
+ .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+ // Delta commit 2: Upsert to partitionCompaction (creates log files)
+ testTable.addDeltaCommit("002")
+ .withLogFile(partitionCompaction, fileIdCompaction, 1);
+
+ // First compaction on partitionCompaction (creates new base file)
+ testTable.addRequestedCompaction("003", new
FileSlice(partitionCompaction, "001", fileIdCompaction));
+ HoodieCommitMetadata compaction1Metadata = new HoodieCommitMetadata();
+ compaction1Metadata.setOperationType(WriteOperationType.COMPACT);
+ HoodieWriteStat statC1 = new HoodieWriteStat();
+ statC1.setPartitionPath(partitionCompaction);
+ statC1.setFileId(fileIdCompaction);
+ statC1.setPath(partitionCompaction + "/" +
FSUtils.makeBaseFileName("003", "0", fileIdCompaction, BASE_FILE_EXTENSION));
+ compaction1Metadata.addWriteStat(partitionCompaction, statC1);
+ testTable.addCompaction("003", compaction1Metadata)
+ .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+ // Delta commit 3: Upsert to partitionCompaction (creates log files
after compaction)
+ testTable.addDeltaCommit("004")
+ .withLogFile(partitionCompaction, fileIdCompaction, 2);
+
+ // Second compaction on partitionCompaction (creates another base file
version)
+ testTable.addRequestedCompaction("005", new
FileSlice(partitionCompaction, "003", fileIdCompaction));
+ HoodieCommitMetadata compaction2Metadata = new HoodieCommitMetadata();
+ compaction2Metadata.setOperationType(WriteOperationType.COMPACT);
+ HoodieWriteStat statC2 = new HoodieWriteStat();
+ statC2.setPartitionPath(partitionCompaction);
+ statC2.setFileId(fileIdCompaction);
+ statC2.setPath(partitionCompaction + "/" +
FSUtils.makeBaseFileName("005", "0", fileIdCompaction, BASE_FILE_EXTENSION));
+ compaction2Metadata.addWriteStat(partitionCompaction, statC2);
+ testTable.addCompaction("005", compaction2Metadata)
+ .withBaseFilesInPartition(partitionCompaction, fileIdCompaction);
+
+ // Delta commit 4: Insert to partitionDeltaOnly (creates base file -
never compacted)
+ testTable.addDeltaCommit("006")
+ .withBaseFilesInPartition(partitionDeltaOnly, fileIdDeltaOnly);
+
+ // Delta commit 5: Upsert to partitionDeltaOnly (creates log file -
still never compacted)
+ testTable.addDeltaCommit("007")
+ .withLogFile(partitionDeltaOnly, fileIdDeltaOnly, 1);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ // Run cleaner
+ List<HoodieCleanStat> cleanStats = runCleaner(config);
+
+ // Verify: partitionCompaction should have files cleaned,
partitionDeltaOnly should NOT
+ // Check that partitionDeltaOnly is not in the cleaned partitions (or
has 0 files cleaned)
+ boolean deltaOnlyPartitionCleaned = cleanStats.stream()
+ .anyMatch(stat -> stat.getPartitionPath().equals(partitionDeltaOnly)
+ && stat.getSuccessDeleteFiles().size() > 0);
+ assertFalse(deltaOnlyPartitionCleaned,
+ "Delta-only partition should NOT have files cleaned as it was never
part of any compaction");
+
+ // Check that partitionCompaction IS in the cleaned partitions
+ boolean compactionPartitionCleaned = cleanStats.stream()
+ .anyMatch(stat -> stat.getPartitionPath().equals(partitionCompaction)
+ && stat.getSuccessDeleteFiles().size() > 0);
+ assertTrue(compactionPartitionCleaned,
+ "Compaction partition should be in clean plan as it has old base
files from compaction");
+ } finally {
+ testTable.close();
+ }
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 93bb06077265..51171b83b0e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.model;
+import java.util.Set;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.JsonUtils;
import org.apache.hudi.common.util.Option;
@@ -499,4 +500,18 @@ public class HoodieCommitMetadata implements Serializable {
public HashSet<String> getWritePartitionPaths() {
return new HashSet<>(partitionToWriteStats.keySet());
}
+
+ public Set<String> getWritePartitionPathsWithUpdatedFileGroups() {
+ return getPartitionToWriteStats()
+ .entrySet()
+ .stream()
+ .filter(partitionAndWriteStats -> partitionAndWriteStats
+ .getValue()
+ .stream()
+ .anyMatch(writeStat ->
!Option.ofNullable(writeStat.getPrevCommit())
+ .orElse(HoodieWriteStat.NULL_COMMIT)
+ .equalsIgnoreCase(HoodieWriteStat.NULL_COMMIT)))
+ .map(partitionAndWriteStats -> partitionAndWriteStats.getKey())
+ .collect(Collectors.toSet());
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
index 7af4f6d70547..0c4781427f8d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieCommitMetadata.java
@@ -44,8 +44,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.convertMetadataToByteArray;
@@ -274,4 +276,106 @@ public class TestHoodieCommitMetadata {
}
return schema;
}
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_AllInserts() {
+ // When all partitions have only insert stats (prevCommit is "null"), the
result should be empty
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ HoodieWriteStat insertStat1 = createWriteStatWithPrevFileId("partition1",
"file1", "null");
+ HoodieWriteStat insertStat2 = createWriteStatWithPrevFileId("partition2",
"file2", "null");
+
+ commitMetadata.addWriteStat("partition1", insertStat1);
+ commitMetadata.addWriteStat("partition2", insertStat2);
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ assertTrue(result.isEmpty(), "Result should be empty when all stats are
inserts (prevCommit = 'null')");
+ }
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_AllUpdates() {
+ // When all partitions have update stats (prevCommit is a valid commit
time), all partitions should be returned
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ HoodieWriteStat updateStat1 = createWriteStatWithPrevFileId("partition1",
"file1", "20240101120000");
+ HoodieWriteStat updateStat2 = createWriteStatWithPrevFileId("partition2",
"file2", "20240101130000");
+
+ commitMetadata.addWriteStat("partition1", updateStat1);
+ commitMetadata.addWriteStat("partition2", updateStat2);
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ Set<String> expected = new HashSet<>(Arrays.asList("partition1",
"partition2"));
+ assertEquals(expected, result, "Result should contain all partitions with
updates");
+ }
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_MixedInsertsAndUpdates()
{
+ // When some partitions have inserts and some have updates, only the
update partitions should be returned
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ HoodieWriteStat insertStat = createWriteStatWithPrevFileId("partition1",
"file1", "null");
+ HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition2",
"file2", "20240101120000");
+
+ commitMetadata.addWriteStat("partition1", insertStat);
+ commitMetadata.addWriteStat("partition2", updateStat);
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ Set<String> expected = new HashSet<>(Arrays.asList("partition2"));
+ assertEquals(expected, result, "Result should only contain partitions with
updates");
+ }
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_PartitionWithBothInsertAndUpdate()
{
+ // When a partition has both insert and update stats, it should be included
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ HoodieWriteStat insertStat = createWriteStatWithPrevFileId("partition1",
"file1", "null");
+ HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition1",
"file2", "20240101120000");
+
+ commitMetadata.addWriteStat("partition1", insertStat);
+ commitMetadata.addWriteStat("partition1", updateStat);
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ Set<String> expected = new HashSet<>(Arrays.asList("partition1"));
+ assertEquals(expected, result, "Result should contain partition with at
least one update");
+ }
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_NullPrevCommit() {
+ // When prevCommit is null (not the string "null"), the partition should
not be included
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ HoodieWriteStat statWithNullPrevCommit =
createWriteStatWithPrevFileId("partition1", "file1", null);
+ HoodieWriteStat updateStat = createWriteStatWithPrevFileId("partition2",
"file2", "20240101120000");
+
+ commitMetadata.addWriteStat("partition1", statWithNullPrevCommit);
+ commitMetadata.addWriteStat("partition2", updateStat);
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ Set<String> expected = new HashSet<>(Arrays.asList("partition2"));
+ assertEquals(expected, result, "Result should not include partitions where
prevCommit is null");
+ }
+
+ @Test
+ public void
testGetWritePartitionPathsWithExistingFileGroupsModified_EmptyMetadata() {
+ // When metadata is empty, result should be empty
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+
+ Set<String> result =
commitMetadata.getWritePartitionPathsWithUpdatedFileGroups();
+ assertTrue(result.isEmpty(), "Result should be empty for empty metadata");
+ }
+
+ /**
+ * Helper method to create a HoodieWriteStat with specified partition,
fileId, and prevCommit.
+ */
+ private HoodieWriteStat createWriteStatWithPrevFileId(String partitionPath,
String fileId, String prevCommit) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath(partitionPath);
+ writeStat.setFileId(fileId);
+ writeStat.setPrevCommit(prevCommit);
+ writeStat.setPath(partitionPath + "/" + fileId + ".parquet");
+ writeStat.setNumWrites(100);
+ writeStat.setNumDeletes(0);
+ return writeStat;
+ }
}