This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8a137631da8 [HUDI-7518] Fix HoodieMetadataPayload merging logic around
repeated deletes (#10913)
8a137631da8 is described below
commit 8a137631da86c0ee99cf847b317484414ca4b7a2
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Mar 26 19:13:09 2024 -0700
[HUDI-7518] Fix HoodieMetadataPayload merging logic around repeated deletes
(#10913)
---
.../common/testutils/HoodieMetadataTestTable.java | 11 ++
.../functional/TestHoodieBackedTableMetadata.java | 126 ++++++++++++++++++++-
.../hudi/metadata/HoodieMetadataPayload.java | 53 +++++----
.../hudi/common/testutils/HoodieTestTable.java | 13 +++
.../hudi/metadata/TestHoodieMetadataPayload.java | 87 ++++++++++++--
5 files changed, 254 insertions(+), 36 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 3bcba72eb68..b1974ff3e4e 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.testutils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -134,6 +135,16 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
return cleanMetadata;
}
+ @Override
+ public void repeatClean(String cleanCommitTime,
+ HoodieCleanerPlan cleanerPlan,
+ HoodieCleanMetadata cleanMetadata) throws
IOException {
+ super.repeatClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+ if (writer != null) {
+ writer.update(cleanMetadata, cleanCommitTime);
+ }
+ }
+
public HoodieTestTable addCompaction(String instantTime,
HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 1a268675ac7..8ca0d4e16a9 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -19,10 +19,14 @@
package org.apache.hudi.client.functional;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
@@ -32,8 +36,12 @@ import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -43,7 +51,6 @@ import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieMetadataLogRecordReader;
import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.hudi.metadata.HoodieTableMetadataKeyGenerator;
-import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
@@ -66,6 +73,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -76,8 +84,12 @@ import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
+import static org.apache.hudi.common.model.WriteOperationType.BULK_INSERT;
+import static org.apache.hudi.common.model.WriteOperationType.COMPACT;
import static org.apache.hudi.common.model.WriteOperationType.INSERT;
import static org.apache.hudi.common.model.WriteOperationType.UPSERT;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.CLEAN_ACTION;
+import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -285,6 +297,112 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
validateMetadata(testTable);
}
+ /**
+ * This tests the case where the two clean actions delete the same file and
commit
+ * to the metadata table. The metadata table should not contain the deleted
file afterwards.
+ * A new cleaner plan may contain the same file to delete if the previous
cleaner
+ * plan has not been successfully executed before the new one is scheduled.
+ */
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testRepeatedCleanActionsWithMetadataTableEnabled(final
HoodieTableType tableType) throws Exception {
+ initPath();
+ writeConfig = getWriteConfigBuilder(true, true, false)
+ .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+ .enable(true)
+ .withMaxNumDeltaCommitsBeforeCompaction(4)
+ .build())
+ .build();
+ init(tableType, writeConfig);
+ String partition = "p1";
+ // Simulate two bulk insert operations adding two data files in partition
"p1"
+ String instant1 = metaClient.createNewInstantTime();
+ HoodieCommitMetadata commitMetadata1 =
+ testTable.doWriteOperation(instant1, BULK_INSERT, emptyList(),
asList(partition), 1);
+ String instant2 = metaClient.createNewInstantTime();
+ HoodieCommitMetadata commitMetadata2 =
+ testTable.doWriteOperation(instant2, BULK_INSERT, emptyList(),
asList(partition), 1);
+
+ final HoodieTableMetaClient metadataMetaClient =
HoodieTableMetaClient.builder()
+ .setConf(hadoopConf)
+ .setBasePath(metadataTableBasePath)
+ .build();
+ while (getNumCompactions(metadataMetaClient) == 0) {
+ // Write until the compaction happens in the metadata table
+ testTable.doWriteOperation(
+ metaClient.createNewInstantTime(), BULK_INSERT, emptyList(),
asList(partition), 1);
+ metadataMetaClient.reloadActiveTimeline();
+ }
+
+ assertEquals(1, getNumCompactions(metadataMetaClient));
+
+ List<String> fileIdsToReplace = new ArrayList<>();
+
fileIdsToReplace.addAll(commitMetadata1.getFileIdAndRelativePaths().keySet());
+
fileIdsToReplace.addAll(commitMetadata2.getFileIdAndRelativePaths().keySet());
+ // Simulate clustering operation replacing two data files with a new data
file
+ testTable.doCluster(
+ metaClient.createNewInstantTime(),
+ Collections.singletonMap(partition, fileIdsToReplace),
asList(partition), 1);
+ Set<String> fileSetBeforeCleaning = getFilePathsInPartition(partition);
+
+ // Simulate two clean actions deleting the same set of data files
+ // based on the first two commits
+ String cleanInstant = metaClient.createNewInstantTime();
+ HoodieCleanMetadata cleanMetadata =
testTable.doCleanBasedOnCommits(cleanInstant, asList(instant1, instant2));
+ List<String> deleteFileList =
cleanMetadata.getPartitionMetadata().get(partition).getDeletePathPatterns();
+ assertTrue(deleteFileList.size() > 0);
+
+ Set<String> fileSetAfterFirstCleaning = getFilePathsInPartition(partition);
+ validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning,
fileSetAfterFirstCleaning);
+
+ metaClient.reloadActiveTimeline();
+ HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(
+ metaClient, new HoodieInstant(HoodieInstant.State.REQUESTED,
CLEAN_ACTION, cleanInstant));
+ testTable.repeatClean(metaClient.createNewInstantTime(), cleanerPlan,
cleanMetadata);
+
+ // Compaction should not happen after the first compaction in this test
case
+ assertEquals(1, getNumCompactions(metadataMetaClient));
+ Set<String> fileSetAfterSecondCleaning =
getFilePathsInPartition(partition);
+ validateFilesAfterCleaning(deleteFileList, fileSetBeforeCleaning,
fileSetAfterSecondCleaning);
+ }
+
+ private int getNumCompactions(HoodieTableMetaClient metaClient) {
+ HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+ return timeline
+ .filter(s -> {
+ try {
+ return s.getAction().equals(HoodieTimeline.COMMIT_ACTION)
+ && HoodieCommitMetadata.fromBytes(
+ timeline.getInstantDetails(s).get(),
HoodieCommitMetadata.class)
+ .getOperationType().equals(COMPACT);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .countInstants();
+ }
+
+ private Set<String> getFilePathsInPartition(String partition) throws
IOException {
+ HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(
+ new HoodieLocalEngineContext(hadoopConf),
+ HoodieMetadataConfig.newBuilder().enable(true).build(),
+ basePath);
+ return Arrays.stream(tableMetadata.getAllFilesInPartition(new
Path(basePath, partition)))
+ .map(status -> status.getPath().getName()).collect(Collectors.toSet());
+ }
+
+ private void validateFilesAfterCleaning(List<String> deleteFileList,
+ Set<String> fileSetBeforeCleaning,
+ Set<String> fileSetAfterCleaning) {
+ assertEquals(deleteFileList.size(), fileSetBeforeCleaning.size() -
fileSetAfterCleaning.size());
+ for (String deleteFile : deleteFileList) {
+ assertFalse(fileSetAfterCleaning.contains(deleteFile));
+ }
+ for (String file : fileSetAfterCleaning) {
+ assertTrue(fileSetBeforeCleaning.contains(file));
+ }
+ }
+
/**
* Verify the metadata table log files for the record field correctness. On
disk format
* should be based on meta fields and key deduplication config. And the
in-memory merged
@@ -302,7 +420,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
// Compaction should not be triggered yet. Let's verify no base file
// and few log files available.
List<FileSlice> fileSlices = table.getSliceView()
-
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
if (fileSlices.isEmpty()) {
throw new IllegalStateException("LogFile slices are not available!");
}
@@ -377,7 +495,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
.withBasePath(metadataMetaClient.getBasePath())
.withLogFilePaths(logFilePaths)
.withLatestInstantTime(latestCommitTimestamp)
- .withPartition(MetadataPartitionType.FILES.getPartitionPath())
+ .withPartition(FILES.getPartitionPath())
.withReaderSchema(schema)
.withMaxMemorySizeInBytes(100000L)
.withBufferSize(4096)
@@ -401,7 +519,7 @@ public class TestHoodieBackedTableMetadata extends
TestHoodieMetadataBase {
private void verifyMetadataRecordKeyExcludeFromPayloadBaseFiles(HoodieTable
table) throws IOException {
table.getHoodieView().sync();
List<FileSlice> fileSlices = table.getSliceView()
-
.getLatestFileSlices(MetadataPartitionType.FILES.getPartitionPath()).collect(Collectors.toList());
+
.getLatestFileSlices(FILES.getPartitionPath()).collect(Collectors.toList());
if (!fileSlices.get(0).getBaseFile().isPresent()) {
throw new IllegalStateException("Base file not available!");
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 1139d9cdc4f..4d0b8a5d662 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -49,6 +49,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
@@ -99,7 +101,7 @@ import static
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_L
* During compaction on the table, the deletions are merged with additions and
hence records are pruned.
*/
public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadataPayload> {
-
+ private static final Logger LOG =
LoggerFactory.getLogger(HoodieMetadataPayload.class);
/**
* Type of the record. This can be an enum in the schema but Avro1.8
* has a bug - https://issues.apache.org/jira/browse/AVRO-1810
@@ -550,27 +552,34 @@ public class HoodieMetadataPayload implements
HoodieRecordPayload<HoodieMetadata
// - First we merge records from all of the delta
log-files
// - Then we merge records from base-files with the delta
ones (coming as a result
// of the previous step)
- (oldFileInfo, newFileInfo) ->
- // NOTE: We can’t assume that MT update records will be
ordered the same way as actual
- // FS operations (since they are not atomic), therefore
MT record merging should be a
- // _commutative_ & _associative_ operation (ie one that
would work even in case records
- // will get re-ordered), which is
- // - Possible for file-sizes (since file-sizes will
ever grow, we can simply
- // take max of the old and new records)
- // - Not possible for is-deleted flags*
- //
- // *However, we’re assuming that the case of concurrent
write and deletion of the same
- // file is _impossible_ -- it would only be possible
with concurrent upsert and
- // rollback operation (affecting the same log-file),
which is implausible, b/c either
- // of the following have to be true:
- // - We’re appending to failed log-file (then the
other writer is trying to
- // rollback it concurrently, before it’s own write)
- // - Rollback (of completed instant) is running
concurrently with append (meaning
- // that restore is running concurrently with a write,
which is also nut supported
- // currently)
- newFileInfo.getIsDeleted()
- ? null
- : new
HoodieMetadataFileInfo(Math.max(newFileInfo.getSize(), oldFileInfo.getSize()),
false));
+ (oldFileInfo, newFileInfo) -> {
+ // NOTE: We can’t assume that MT update records will be ordered
the same way as actual
+ // FS operations (since they are not atomic), therefore MT
record merging should be a
+ // _commutative_ & _associative_ operation (ie one that
would work even in case records
+ // will get re-ordered), which is
+ // - Possible for file-sizes (since file-sizes will
ever grow, we can simply
+ // take max of the old and new records)
+ // - Not possible for is-deleted flags*
+ //
+ // *However, we’re assuming that the case of concurrent
write and deletion of the same
+ // file is _impossible_ -- it would only be possible with
concurrent upsert and
+ // rollback operation (affecting the same log-file), which
is implausible, b/c either
+ // of the following have to be true:
+ // - We’re appending to failed log-file (then the other
writer is trying to
+ // rollback it concurrently, before its own write)
+ // - Rollback (of completed instant) is running
concurrently with append (meaning
+ // that restore is running concurrently with a write,
which is also not supported
+ // currently)
+ if (newFileInfo.getIsDeleted()) {
+ if (oldFileInfo.getIsDeleted()) {
+ LOG.warn("A file is repeatedly deleted in the files
partition of the metadata table: " + key);
+ return newFileInfo;
+ }
+ return null;
+ }
+ return new HoodieMetadataFileInfo(
+ Math.max(newFileInfo.getSize(), oldFileInfo.getSize()),
false);
+ });
});
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index ca13ff79c52..a23a106b5ca 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -962,6 +962,19 @@ public class HoodieTestTable {
return cleanerMeta.getValue();
}
+ /**
+ * Repeats the same cleaning based on the cleaner plan and clean commit
metadata.
+ *
+ * @param cleanCommitTime new clean commit time to use.
+ * @param cleanerPlan cleaner plan to write to the metadata.
+ * @param cleanMetadata clean metadata in data table to use.
+ */
+ public void repeatClean(String cleanCommitTime,
+ HoodieCleanerPlan cleanerPlan,
+ HoodieCleanMetadata cleanMetadata) throws
IOException {
+ addClean(cleanCommitTime, cleanerPlan, cleanMetadata);
+ }
+
public HoodieCleanMetadata doCleanBasedOnCommits(String cleanCommitTime,
List<String> commitsToClean) throws IOException {
Map<String, Integer> partitionFileCountsToDelete = new HashMap<>();
for (String commitTime : commitsToClean) {
diff --git
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
index cde9341f5cd..941587531a5 100644
---
a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -28,6 +28,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -39,11 +40,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Tests {@link HoodieMetadataPayload}.
*/
public class TestHoodieMetadataPayload extends HoodieCommonTestHarness {
+ public static final String PARTITION_NAME = "2022/10/01";
@Test
public void testFileSystemMetadataPayloadMerging() {
- String partitionName = "2022/10/01";
-
Map<String, Long> firstCommitAddedFiles = createImmutableMap(
Pair.of("file1.parquet", 1000L),
Pair.of("file2.parquet", 2000L),
@@ -51,7 +51,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
);
HoodieRecord<HoodieMetadataPayload> firstPartitionFilesRecord =
- HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
firstCommitAddedFiles, Collections.emptyList());
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
firstCommitAddedFiles, Collections.emptyList());
Map<String, Long> secondCommitAddedFiles = createImmutableMap(
// NOTE: This is an append
@@ -63,13 +63,13 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
List<String> secondCommitDeletedFiles =
Collections.singletonList("file1.parquet");
HoodieRecord<HoodieMetadataPayload> secondPartitionFilesRecord =
- HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
secondCommitAddedFiles, secondCommitDeletedFiles);
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
secondCommitAddedFiles, secondCommitDeletedFiles);
HoodieMetadataPayload combinedPartitionFilesRecordPayload =
secondPartitionFilesRecord.getData().preCombine(firstPartitionFilesRecord.getData());
HoodieMetadataPayload expectedCombinedPartitionedFilesRecordPayload =
- HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
createImmutableMap(
Pair.of("file2.parquet", 2000L),
Pair.of("file3.parquet", 3333L),
@@ -82,9 +82,76 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
assertEquals(expectedCombinedPartitionedFilesRecordPayload,
combinedPartitionFilesRecordPayload);
}
+ @Test
+ public void testFileSystemMetadataPayloadMergingWithDeletions() {
+ Map<String, Long> addedFileMap = createImmutableMap(
+ Pair.of("file1.parquet", 1000L),
+ Pair.of("file2.parquet", 2000L),
+ Pair.of("file3.parquet", 3000L),
+ Pair.of("file4.parquet", 4000L)
+ );
+ HoodieRecord<HoodieMetadataPayload> additionRecord =
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
addedFileMap, Collections.emptyList());
+
+ List<String> deletedFileList1 = new ArrayList<>();
+ deletedFileList1.add("file1.parquet");
+ deletedFileList1.add("file3.parquet");
+ HoodieRecord<HoodieMetadataPayload> deletionRecord1 =
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
Collections.emptyMap(), deletedFileList1);
+
+ List<String> deletedFileList2 = new ArrayList<>();
+ deletedFileList2.add("file1.parquet");
+ deletedFileList2.add("file4.parquet");
+ HoodieRecord<HoodieMetadataPayload> deletionRecord2 =
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
Collections.emptyMap(), deletedFileList2);
+
+ assertEquals(
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+ createImmutableMap(
+ Pair.of("file2.parquet", 2000L),
+ Pair.of("file4.parquet", 4000L)
+ ),
+ Collections.emptyList()
+ ).getData(),
+ deletionRecord1.getData().preCombine(additionRecord.getData())
+ );
+
+ List<String> expectedDeleteFileList = new ArrayList<>();
+ expectedDeleteFileList.add("file1.parquet");
+ expectedDeleteFileList.add("file3.parquet");
+ expectedDeleteFileList.add("file4.parquet");
+
+ assertEquals(
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+ Collections.emptyMap(),
+ expectedDeleteFileList
+ ).getData(),
+ deletionRecord2.getData().preCombine(deletionRecord1.getData())
+ );
+
+ assertEquals(
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+ createImmutableMap(
+ Pair.of("file2.parquet", 2000L)
+ ),
+ Collections.emptyList()
+ ).getData(),
+
deletionRecord2.getData().preCombine(deletionRecord1.getData()).preCombine(additionRecord.getData())
+ );
+
+ assertEquals(
+ HoodieMetadataPayload.createPartitionFilesRecord(PARTITION_NAME,
+ createImmutableMap(
+ Pair.of("file2.parquet", 2000L)
+ ),
+ Collections.singletonList("file1.parquet")
+ ).getData(),
+
deletionRecord2.getData().preCombine(deletionRecord1.getData().preCombine(additionRecord.getData()))
+ );
+ }
+
@Test
public void testColumnStatsPayloadMerging() throws IOException {
- String partitionPath = "2022/10/01";
String fileName = "file.parquet";
String targetColName = "c1";
@@ -92,7 +159,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName,
100, 1000, 5, 1000, 123456, 123456);
HoodieRecord<HoodieMetadataPayload> columnStatsRecord =
- HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
Collections.singletonList(c1Metadata), false)
+ HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME,
Collections.singletonList(c1Metadata), false)
.findFirst().get();
////////////////////////////////////////////////////////////////////////
@@ -105,7 +172,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName,
0, 500, 0, 100, 12345, 12345);
HoodieRecord<HoodieMetadataPayload> updatedColumnStatsRecord =
- HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
Collections.singletonList(c1AppendedBlockMetadata), false)
+ HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME,
Collections.singletonList(c1AppendedBlockMetadata), false)
.findFirst().get();
HoodieMetadataPayload combinedMetadataPayload =
@@ -115,7 +182,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata.<Comparable>create(fileName, targetColName,
0, 1000, 5, 1100, 135801, 135801);
HoodieRecord<HoodieMetadataPayload> expectedColumnStatsRecord =
- HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
Collections.singletonList(expectedColumnRangeMetadata), false)
+ HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME,
Collections.singletonList(expectedColumnRangeMetadata), false)
.findFirst().get();
// Assert combined payload
@@ -135,7 +202,7 @@ public class TestHoodieMetadataPayload extends
HoodieCommonTestHarness {
HoodieColumnRangeMetadata.<Comparable>stub(fileName, targetColName);
HoodieRecord<HoodieMetadataPayload> deletedColumnStatsRecord =
- HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
Collections.singletonList(c1StubbedMetadata), true)
+ HoodieMetadataPayload.createColumnStatsRecords(PARTITION_NAME,
Collections.singletonList(c1StubbedMetadata), true)
.findFirst().get();
// NOTE: In this case, deleted (or tombstone) record will be therefore
deleting