This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a137631da8 [HUDI-7518] Fix HoodieMetadataPayload merging logic around 
repeated deletes (#10913)
8a137631da8 is described below

commit 8a137631da86c0ee99cf847b317484414ca4b7a2
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 26 19:13:09 2024 -0700

    [HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes 
(#10913)
---
 .../common/testutils/HoodieMetadataTestTable.java  |  11 ++
 .../functional/TestHoodieBackedTableMetadata.java  | 126 ++++++++++++++++++++-
 .../hudi/metadata/HoodieMetadataPayload.java       |  53 +++++----
 .../hudi/common/testutils/HoodieTestTable.java     |  13 +++
 .../hudi/metadata/TestHoodieMetadataPayload.java   |  87 ++++++++++++--
 5 files changed, 254 insertions(+), 36 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 3bcba72eb68..b1974ff3e4e 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.testutils;
 
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -134,6 +135,16 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
     return cleanMetadata;
   }
 
+  @Override
+  public void repeatClean(String cleanCommitTime,
+                          HoodieCleanerPlan cleanerPlan,
+                          HoodieCleanMetadata cleanMetadata) throws 
IOException {
+    super.repeatClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+    if (writer != null) {
+      writer.update(cleanMetadata, cleanCommitTime);
+    }
+  }
+
   public HoodieTestTable addCompaction(String instantTime, 
HoodieCommitMetadata commitMetadata) throws Exception {
     super.addCompaction(instantTime, commitMetadata);
     if (writer != null) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 1a268675ac7..8ca0d4e16a9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -19,10 +19,14 @@
 package org.apache.hudi.client.functional;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -32,8 +36,12 @@ import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +51,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
 import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
-import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 
@@ -66,6 +73,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -76,8 +84,12 @@ import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.emptyList;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
 import static org.apache.hudi.common.model.WriteOperationType.INSERT;
 import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -285,6 +297,112 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     validateMetadata(testTable);
   }
 
+  /**
+   * This tests the case where the two clean actions delete the same file and 
commit
+   * to the metadata table. The metadata table should not contain the deleted 
file afterwards.
+   * A new cleaner plan may contain the same file to delete if the previous 
cleaner
+   * plan has not been successfully executed before the new one is scheduled.
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testRepeatedCleanActionsWithMetadataTableEnabled(final 
HoodieTableType tableType) throws Exception {
+    initPath();
+    writeConfig = getWriteConfigBuilder(true, true, false)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withMaxNumDeltaCommitsBeforeCompaction(4)
+            .build())
+        .build();
+    init(tableType, writeConfig);
+    String partition = "p1";
+    // Simulate two bulk insert operations adding two data files in partition 
"p1"
+    String instant1 = metaClient.createNewInstantTime();
+    HoodieCommitMetadata commitMetadata1 =
+        testTable.doWriteOperation(instant1, BULK_INSERT, emptyList(), 
asList(partition), 1);
+    String instant2 = metaClient.createNewInstantTime();
+    HoodieCommitMetadata commitMetadata2 =
+        testTable.doWriteOperation(instant2, BULK_INSERT, emptyList(), 
asList(partition), 1);
+
+    final HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder()
+        .setConf(hadoopConf)
+        .setBasePath(metadataTableBasePath)
+        .build();
+    while (getNumCompactions(metadataMetaClient) == 0) {
+      // Write until the compaction happens in the metadata table
+      testTable.doWriteOperation(
+          metaClient.createNewInstantTime(), BULK_INSERT, emptyList(), 
asList(partition), 1);
+      metadataMetaClient.reloadActiveTimeline();
+    }
+
+    assertEquals(1, getNumCompactions(metadataMetaClient));
+
+    List<String> fileIdsToReplace = new ArrayList<>();
+    
fileIdsToReplace.addAll(commitMetadata1.getFileIdAndRelativePaths().keySet());
+    
fileIdsToReplace.addAll(commitMetadata2.getFileIdAndRelativePaths().keySet());
+    // Simulate clustering operation replacing two data files with a new data 
file
+    testTable.doCluster(
+        metaClient.createNewInstantTime(),
+        Collections.singletonMap(partition, fileIdsToReplace), 
asList(partition), 1);
+    Set<String> fileSetBeforeCleaning = getFilePathsInPartition(partition);
+
+    // Simulate two clean actions deleting the same set of date files
+    // based on the first two commits
+    String cleanInstant = metaClient.createNewInstantTime();
+    HoodieCleanMetadata cleanMetadata = 
testTable.doCleanBasedOnCommits(cleanInstant, asList(instant1, instant2));
+    List<String> deleteFileList = 
cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns();
+    assertTrue(deleteFileList.size() > 0);
+
+    Set<String> fileSetAfterFirstCleaning = getFilePathsInPartition(partition);
+    validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, 
fileSetAfterFirstCleaning);
+
+    metaClient.reloadActiveTimeline();
+    HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(
+        metaClient, new HoodieInstant(HoodieInstant.State.REQUESTED, 
CLEAN_ACTION, cleanInstant));
+    testTable.repeatClean(metaClient.createNewInstantTime(), cleanerPlan, 
cleanMetadata);
+
+    // Compaction should not happen after the first compaction in this test 
case
+    assertEquals(1, getNumCompactions(metadataMetaClient));
+    Set<String> fileSetAfterSecondCleaning = 
getFilePathsInPartition(partition);
+    validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning, 
fileSetAfterSecondCleaning);
+  }
+
+  private int getNumCompactions(HoodieTableMetaClient metaClient) {
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    return timeline
+        .filter(s -> {
+          try {
+            return s.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+                && HoodieCommitMetadata.fromBytes(
+                    timeline.getInstantDetails(s).get(), 
HoodieCommitMetadata.class)
+                .getOperationType().equals(COMPACT);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        })
+        .countInstants();
+  }
+
+  private Set<String> getFilePathsInPartition(String partition) throws 
IOException {
+    HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
+        new HoodieLocalEngineContext(hadoopConf),
+        HoodieMetadataConfig.newBuilder().enable(true).build(),
+        basePath);
+    return Arrays.stream(tableMetadata.getAllFilesInPartition(new 
Path(basePath, partition)))
+        .map(status -> status.getPath().getName()).collect(Collectors.toSet());
+  }
+
+  private void validateFilesAfterCleaning(List<String> deleteFileList,
+                                          Set<String> fileSetBeforeCleaning,
+                                          Set<String> fileSetAfterCleaning) {
+    assertEquals(deleteFileList.size(), fileSetBeforeCleaning.size() - 
fileSetAfterCleaning.size());
+    for (String deleteFile : deleteFileList) {
+      assertFalse(fileSetAfterCleaning.contains(deleteFile));
+    }
+    for (String file : fileSetAfterCleaning) {
+      assertTrue(fileSetBeforeCleaning.contains(file));
+    }
+  }
+
   /**
    * Verify the metadata table log files for the record field correctness. On 
disk format
    * should be based on meta fields and key deduplication config. And the 
in-memory merged
@@ -302,7 +420,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
     // Compaction should not be triggered yet. Let's verify no base file
     // and few log files available.
     List<FileSlice> fileSlices = table.getSliceView()
-        
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+        
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
     if (fileSlices.isEmpty()) {
       throw new IllegalStateException("LogFile slices are not available!");
     }
@@ -377,7 +495,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
         .withBasePath(metadataMetaClient.getBasePath())
         .withLogFilePaths(logFilePaths)
         .withLatestInstantTime(latestCommitTimestamp)
-        .withPartition(MetadataPartitionType.FILES.getPartitionPath())
+        .withPartition(FILES.getPartitionPath())
         .withReaderSchema(schema)
         .withMaxMemorySizeInBytes(100000L)
         .withBufferSize(4096)
@@ -401,7 +519,7 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable 
table) throws IOException {
     table.getHoodieView().sync();
     List<FileSlice> fileSlices = table.getSliceView()
-        
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+        
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
     if (!fileSlices.get(0).getBaseFile().isPresent()) {
       throw new IllegalStateException("Base file not available!");
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 1139d9cdc4f..4d0b8a5d662 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -99,7 +101,7 @@ import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
  * During compaction on the table, the deletions are merged with additions and 
hence records are pruned.
  */
 public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadataPayload> {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetadataPayload.class);
   /**
    * Type of the record. This can be an enum in the schema but Avro1.8
    * has a bug - https://issues.apache.org/jira/browse/AVRO-1810
@@ -550,27 +552,34 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
             //          - First we merge records from all of the delta 
log-files
             //          - Then we merge records from base-files with the delta 
ones (coming as a result
             //          of the previous step)
-            (oldFileInfo, newFileInfo) ->
-                // NOTE: We can’t assume that MT update records will be 
ordered the same way as actual
-                //       FS operations (since they are not atomic), therefore 
MT record merging should be a
-                //       _commutative_ & _associative_ operation (ie one that 
would work even in case records
-                //       will get re-ordered), which is
-                //          - Possible for file-sizes (since file-sizes will 
ever grow, we can simply
-                //          take max of the old and new records)
-                //          - Not possible for is-deleted flags*
-                //
-                //       *However, we’re assuming that the case of concurrent 
write and deletion of the same
-                //       file is _impossible_ -- it would only be possible 
with concurrent upsert and
-                //       rollback operation (affecting the same log-file), 
which is implausible, b/c either
-                //       of the following have to be true:
-                //          - We’re appending to failed log-file (then the 
other writer is trying to
-                //          rollback it concurrently, before it’s own write)
-                //          - Rollback (of completed instant) is running 
concurrently with append (meaning
-                //          that restore is running concurrently with a write, 
which is also nut supported
-                //          currently)
-                newFileInfo.getIsDeleted()
-                    ? null
-                    : new 
HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), 
false));
+            (oldFileInfo, newFileInfo) -> {
+              // NOTE: We can’t assume that MT update records will be ordered 
the same way as actual
+              //       FS operations (since they are not atomic), therefore MT 
record merging should be a
+              //       _commutative_ & _associative_ operation (ie one that 
would work even in case records
+              //       will get re-ordered), which is
+              //          - Possible for file-sizes (since file-sizes will 
only ever grow, we can simply
+              //          take max of the old and new records)
+              //          - Not possible for is-deleted flags*
+              //
+              //       *However, we’re assuming that the case of concurrent 
write and deletion of the same
+              //       file is _impossible_ -- it would only be possible with 
concurrent upsert and
+              //       rollback operation (affecting the same log-file), which 
is implausible, b/c either
+              //       of the following have to be true:
+              //          - We’re appending to failed log-file (then the other 
writer is trying to
+              //          rollback it concurrently, before its own write)
+              //          - Rollback (of completed instant) is running 
concurrently with append (meaning
+              //          that restore is running concurrently with a write, 
which is also not supported
+              //          currently)
+              if (newFileInfo.getIsDeleted()) {
+                if (oldFileInfo.getIsDeleted()) {
+                  LOG.warn("A file is repeatedly deleted in the files 
partition of the metadata table: " + key);
+                  return newFileInfo;
+                }
+                return null;
+              }
+              return new HoodieMetadataFileInfo(
+                  Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), 
false);
+            });
       });
     }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index ca13ff79c52..a23a106b5ca 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -962,6 +962,19 @@ public class HoodieTestTable {
     return cleanerMeta.getValue();
   }
 
+  /**
+   * Repeats the same cleaning based on the cleaner plan and clean commit 
metadata.
+   *
+   * @param cleanCommitTime new clean commit time to use.
+   * @param cleanerPlan     cleaner plan to write to the metadata.
+   * @param cleanMetadata   clean metadata in data table to use.
+   */
+  public void repeatClean(String cleanCommitTime,
+                          HoodieCleanerPlan cleanerPlan,
+                          HoodieCleanMetadata cleanMetadata) throws 
IOException {
+    addClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+  }
+
   public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime, 
List<String> commitsToClean) throws IOException {
     Map<String, Integer> partitionFileCountsToDelete = new HashMap<>();
     for (String commitTime : commitsToClean) {
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index cde9341f5cd..941587531a5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -39,11 +40,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  * Tests {@link HoodieMetadataPayload}.
  */
 public class TestHoodieMetadataPayload extends HoodieCommonTestHarness {
+  public static final String PARTITION_NAME = "2022/10/01";
 
   @Test
   public void testFileSystemMetadataPayloadMerging() {
-    String partitionName = "2022/10/01";
-
     Map<String, Long> firstCommitAddedFiles = createImmutableMap(
         Pair.of("file1.parquet", 1000L),
         Pair.of("file2.parquet", 2000L),
@@ -51,7 +51,7 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
     );
 
     HoodieRecord<HoodieMetadataPayload> firstPartitionFilesRecord =
-        HoodieMetadataPayload.createPartitionFilesRecord(partitionName, 
firstCommitAddedFiles, Collections.emptyList());
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
firstCommitAddedFiles, Collections.emptyList());
 
     Map<String, Long> secondCommitAddedFiles = createImmutableMap(
         // NOTE: This is an append
@@ -63,13 +63,13 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
     List<String> secondCommitDeletedFiles = 
Collections.singletonList("file1.parquet");
 
     HoodieRecord<HoodieMetadataPayload> secondPartitionFilesRecord =
-        HoodieMetadataPayload.createPartitionFilesRecord(partitionName, 
secondCommitAddedFiles, secondCommitDeletedFiles);
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
secondCommitAddedFiles, secondCommitDeletedFiles);
 
     HoodieMetadataPayload combinedPartitionFilesRecordPayload =
         
secondPartitionFilesRecord.getData().preCombine(firstPartitionFilesRecord.getData());
 
     HoodieMetadataPayload expectedCombinedPartitionedFilesRecordPayload =
-        HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
             createImmutableMap(
                 Pair.of("file2.parquet", 2000L),
                 Pair.of("file3.parquet", 3333L),
@@ -82,9 +82,76 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
     assertEquals(expectedCombinedPartitionedFilesRecordPayload, 
combinedPartitionFilesRecordPayload);
   }
 
+  @Test
+  public void testFileSystemMetadataPayloadMergingWithDeletions() {
+    Map<String, Long> addedFileMap = createImmutableMap(
+        Pair.of("file1.parquet", 1000L),
+        Pair.of("file2.parquet", 2000L),
+        Pair.of("file3.parquet", 3000L),
+        Pair.of("file4.parquet", 4000L)
+    );
+    HoodieRecord<HoodieMetadataPayload> additionRecord =
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
addedFileMap, Collections.emptyList());
+
+    List<String> deletedFileList1 = new ArrayList<>();
+    deletedFileList1.add("file1.parquet");
+    deletedFileList1.add("file3.parquet");
+    HoodieRecord<HoodieMetadataPayload> deletionRecord1 =
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
Collections.emptyMap(), deletedFileList1);
+
+    List<String> deletedFileList2 = new ArrayList<>();
+    deletedFileList2.add("file1.parquet");
+    deletedFileList2.add("file4.parquet");
+    HoodieRecord<HoodieMetadataPayload> deletionRecord2 =
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME, 
Collections.emptyMap(), deletedFileList2);
+
+    assertEquals(
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+            createImmutableMap(
+                Pair.of("file2.parquet", 2000L),
+                Pair.of("file4.parquet", 4000L)
+            ),
+            Collections.emptyList()
+        ).getData(),
+        deletionRecord1.getData().preCombine(additionRecord.getData())
+    );
+
+    List<String> expectedDeleteFileList = new ArrayList<>();
+    expectedDeleteFileList.add("file1.parquet");
+    expectedDeleteFileList.add("file3.parquet");
+    expectedDeleteFileList.add("file4.parquet");
+    
+    assertEquals(
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+            Collections.emptyMap(),
+            expectedDeleteFileList
+        ).getData(),
+        deletionRecord2.getData().preCombine(deletionRecord1.getData())
+    );
+
+    assertEquals(
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+            createImmutableMap(
+                Pair.of("file2.parquet", 2000L)
+            ),
+            Collections.emptyList()
+        ).getData(),
+        
deletionRecord2.getData().preCombine(deletionRecord1.getData()).preCombine(additionRecord.getData())
+    );
+
+    assertEquals(
+        HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+            createImmutableMap(
+                Pair.of("file2.parquet", 2000L)
+            ),
+            Collections.singletonList("file1.parquet")
+        ).getData(),
+        
deletionRecord2.getData().preCombine(deletionRecord1.getData().preCombine(additionRecord.getData()))
+    );
+  }
+
   @Test
   public void testColumnStatsPayloadMerging() throws IOException {
-    String partitionPath = "2022/10/01";
     String fileName = "file.parquet";
     String targetColName = "c1";
 
@@ -92,7 +159,7 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName, 
100, 1000, 5, 1000, 123456, 123456);
 
     HoodieRecord<HoodieMetadataPayload> columnStatsRecord =
-        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
Collections.singletonList(c1Metadata), false)
+        HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, 
Collections.singletonList(c1Metadata), false)
             .findFirst().get();
 
     ////////////////////////////////////////////////////////////////////////
@@ -105,7 +172,7 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName, 
0, 500, 0, 100, 12345, 12345);
 
     HoodieRecord<HoodieMetadataPayload> updatedColumnStatsRecord =
-        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
Collections.singletonList(c1AppendedBlockMetadata), false)
+        HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, 
Collections.singletonList(c1AppendedBlockMetadata), false)
             .findFirst().get();
 
     HoodieMetadataPayload combinedMetadataPayload =
@@ -115,7 +182,7 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName, 
0, 1000, 5, 1100, 135801, 135801);
 
     HoodieRecord<HoodieMetadataPayload> expectedColumnStatsRecord =
-        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
Collections.singletonList(expectedColumnRangeMetadata), false)
+        HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, 
Collections.singletonList(expectedColumnRangeMetadata), false)
             .findFirst().get();
 
     // Assert combined payload
@@ -135,7 +202,7 @@ public class TestHoodieMetadataPayload extends 
HoodieCommonTestHarness {
         HoodieColumnRangeMetadata.<Comparable>stub(fileName, targetColName);
 
     HoodieRecord<HoodieMetadataPayload> deletedColumnStatsRecord =
-        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, 
Collections.singletonList(c1StubbedMetadata), true)
+        HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME, 
Collections.singletonList(c1StubbedMetadata), true)
             .findFirst().get();
 
     // NOTE: In this case, deleted (or tombstone) record will be therefore 
deleting

Reply via email to