This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3789840be3d [HUDI-7921] Fixing file system view closures in MDT
(#11496)
3789840be3d is described below
commit 3789840be3d041cbcfc6b24786740210e4e6d6ac
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jul 10 19:25:41 2024 -0700
[HUDI-7921] Fixing file system view closures in MDT (#11496)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 55 ++--
.../common/testutils/HoodieMetadataTestTable.java | 6 +
.../java/org/apache/hudi/table/TestCleaner.java | 326 +++++++++++----------
.../table/functional/TestCleanPlanExecutor.java | 325 ++++++++++----------
.../hudi/testutils/HoodieCleanerTestBase.java | 31 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 4 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 48 +--
.../hudi/common/testutils/HoodieTestTable.java | 8 +-
8 files changed, 440 insertions(+), 363 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 89d21e79b22..c38a68e37cf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1081,9 +1081,8 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
private HoodieData<HoodieRecord>
getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String
indexPartition, String instantTime) throws Exception {
HoodieIndexDefinition indexDefinition =
getFunctionalIndexDefinition(indexPartition);
List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
- HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(dataMetaClient);
commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value)
-> {
- List<FileSlice> fileSlices =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient,
Option.ofNullable(fsView), dataPartition);
+ List<FileSlice> fileSlices =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(),
dataPartition);
fileSlices.forEach(fileSlice -> {
// Filter log files for the instant time and add to this partition
fileSlice pairs
List<HoodieLogFile> logFilesForInstant = fileSlice.getLogFiles()
@@ -1411,35 +1410,35 @@ public abstract class
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
HoodieData<HoodieRecord>> partitionRecordsMap) {
// The result set
HoodieData<HoodieRecord> allPartitionRecords =
engineContext.emptyHoodieData();
+ try (HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient)) {
+ for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry :
partitionRecordsMap.entrySet()) {
+ final String partitionName =
HoodieIndexUtils.getPartitionNameFromPartitionType(entry.getKey(),
dataMetaClient, dataWriteConfig.getIndexingConfig().getIndexName());
+ HoodieData<HoodieRecord> records = entry.getValue();
+
+ List<FileSlice> fileSlices =
+
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ if (fileSlices.isEmpty()) {
+ // scheduling of INDEX only initializes the file group and not add
commit
+ // so if there are no committed file slices, look for inflight slices
+ fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ }
+ final int fileGroupCount = fileSlices.size();
+ ValidationUtils.checkArgument(fileGroupCount > 0,
String.format("FileGroup count for MDT partition %s should be >0",
partitionName));
+
+ List<FileSlice> finalFileSlices = fileSlices;
+ HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
+ FileSlice slice =
finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
+ fileGroupCount));
+ r.unseal();
+ r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+ r.seal();
+ return r;
+ });
- HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
- for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry :
partitionRecordsMap.entrySet()) {
- final String partitionName =
HoodieIndexUtils.getPartitionNameFromPartitionType(entry.getKey(),
dataMetaClient, dataWriteConfig.getIndexingConfig().getIndexName());
- HoodieData<HoodieRecord> records = entry.getValue();
-
- List<FileSlice> fileSlices =
-
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
- if (fileSlices.isEmpty()) {
- // scheduling of INDEX only initializes the file group and not add
commit
- // so if there are no committed file slices, look for inflight slices
- fileSlices =
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient,
Option.ofNullable(fsView), partitionName);
+ allPartitionRecords =
allPartitionRecords.union(rddSinglePartitionRecords);
}
- final int fileGroupCount = fileSlices.size();
- ValidationUtils.checkArgument(fileGroupCount > 0,
String.format("FileGroup count for MDT partition %s should be >0",
partitionName));
-
- List<FileSlice> finalFileSlices = fileSlices;
- HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
- FileSlice slice =
finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
- fileGroupCount));
- r.unseal();
- r.setCurrentLocation(new
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
- r.seal();
- return r;
- });
-
- allPartitionRecords =
allPartitionRecords.union(rddSinglePartitionRecords);
+ return allPartitionRecords;
}
- return allPartitionRecords;
}
/**
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 15230b8cb96..57b987d46b1 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -181,4 +181,10 @@ public class HoodieMetadataTestTable extends
HoodieTestTable {
return this;
}
+ @Override
+ public void close() throws Exception {
+ if (writer != null) {
+ this.writer.close();
+ }
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 49d5ccb7667..fd74d9f652b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -278,24 +278,24 @@ public class TestCleaner extends HoodieCleanerTestBase {
@Test
public void testEarliestInstantToRetainForPendingCompaction() throws
IOException {
HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
- .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
- .withEnableBackupForRemoteFileSystemView(false)
- .build())
- .withCleanConfig(HoodieCleanConfig.newBuilder()
- .withAutoClean(false)
-
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
- .retainCommits(1)
- .build())
- .withCompactionConfig(HoodieCompactionConfig.newBuilder()
- .withInlineCompaction(false)
- .withMaxNumDeltaCommitsBeforeCompaction(1)
- .compactionSmallFileSize(1024 * 1024 * 1024)
- .build())
- .withArchivalConfig(HoodieArchivalConfig.newBuilder()
- .withAutoArchive(false)
- .archiveCommitsWith(2,3)
- .build())
- .withEmbeddedTimelineServerEnabled(false).build();
+ .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+ .withEnableBackupForRemoteFileSystemView(false)
+ .build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+ .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+ .retainCommits(1)
+ .build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(1)
+ .compactionSmallFileSize(1024 * 1024 * 1024)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false)
+ .archiveCommitsWith(2, 3)
+ .build())
+ .withEmbeddedTimelineServerEnabled(false).build();
HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ);
@@ -322,7 +322,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context,
instantTime, Option.empty());
assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(),
1);
assertEquals(earliestInstantToRetain,
cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
- "clean until " + earliestInstantToRetain);
+ "clean until " + earliestInstantToRetain);
table.getMetaClient().reloadActiveTimeline();
table.clean(context, instantTime);
@@ -357,7 +357,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
// earliest commit to retain should be earlier than first pending
compaction in incremental cleaning scenarios.
instantTime = client.createNewInstantTime();
cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
-
assertEquals(earliestInstantToRetain,cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+ assertEquals(earliestInstantToRetain,
cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
}
}
@@ -570,10 +570,11 @@ public class TestCleaner extends HoodieCleanerTestBase {
int instantClean = startInstant;
HoodieTestTable testTable = HoodieTestTable.of(metaClient);
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
+
+ try {
for (int i = 0; i < commitCount; i++, startInstant++) {
String commitTime = HoodieTestTable.makeNewCommitTime(startInstant,
"%09d");
- commitWithMdt(commitTime, Collections.emptyMap(), testTable,
metadataWriter);
+ commitWithMdt(commitTime, Collections.emptyMap(), testTable, config);
}
List<HoodieCleanStat> cleanStats = runCleaner(config);
@@ -601,6 +602,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
assertFalse(timeline.getTimelineOfActions(
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(HoodieTestTable.makeNewCommitTime(--instantClean,
"%09d")));
+ } finally {
+ testTable.close();
}
}
@@ -614,88 +617,87 @@ public class TestCleaner extends HoodieCleanerTestBase {
.retainCommits(2).build())
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
- String p0 = "2020/01/01";
- String p1 = "2020/01/02";
-
- // make 1 commit, with 1 file per partition
- String file1P0C0 = UUID.randomUUID().toString();
- String file1P1C0 = UUID.randomUUID().toString();
- Map<String, List<String>> part1ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0));
- put(p1, CollectionUtils.createImmutableList(file1P1C0));
- }
- });
- commitWithMdt("00000000000001", part1ToFileId, testTable,
metadataWriter, true, true);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsOne =
runCleanerWithInstantFormat(config, true);
- assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any
partitions and clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next replacecommit, with 1 clustering operation. logically
delete p0. No change to p1
- // notice that clustering generates empty inflight commit files
- Map<String, String> partitionAndFileId002 =
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
- String file2P0C1 = partitionAndFileId002.get(p0);
- Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata>
replaceMetadata =
- generateReplaceCommitMetadata("00000000000002", p0, file1P0C0,
file2P0C1);
- testTable.addReplaceCommit("00000000000002",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
-
- // run cleaner
- List<HoodieCleanStat> hoodieCleanStatsTwo =
runCleanerWithInstantFormat(config, true);
- assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any
partitions and clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next replacecommit, with 1 clustering operation. Replace data in
p1. No change to p0
- // notice that clustering generates empty inflight commit files
- Map<String, String> partitionAndFileId003 =
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
- String file3P1C2 = partitionAndFileId003.get(p1);
- replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1,
file1P1C0, file3P1C2);
- testTable.addReplaceCommit("00000000000003",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
-
- // run cleaner
- List<HoodieCleanStat> hoodieCleanStatsThree =
runCleanerWithInstantFormat(config, true);
- assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any
partitions and clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next replacecommit, with 1 clustering operation. Replace data in
p0 again
- // notice that clustering generates empty inflight commit files
- Map<String, String> partitionAndFileId004 =
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
- String file4P0C3 = partitionAndFileId004.get(p0);
- replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0,
file2P0C1, file4P0C3);
- testTable.addReplaceCommit("00000000000004",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
-
- // run cleaner
- List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5, true);
- assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
- assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
- assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
- assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- //file1P1C0 still stays because its not replaced until 3 and its the
only version available
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next replacecommit, with 1 clustering operation. Replace all
data in p1. no new files created
- // notice that clustering generates empty inflight commit files
- Map<String, String> partitionAndFileId005 =
testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
- String file4P1C4 = partitionAndFileId005.get(p1);
- replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0,
file3P1C2, file4P1C4);
- testTable.addReplaceCommit("00000000000006",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+
+ // make 1 commit, with 1 file per partition
+ String file1P0C0 = UUID.randomUUID().toString();
+ String file1P1C0 = UUID.randomUUID().toString();
+ Map<String, List<String>> part1ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
+ }
+ });
+ commitWithMdt("00000000000001", part1ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
- List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7, true);
- assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
- assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
- assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
- assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
- }
+ List<HoodieCleanStat> hoodieCleanStatsOne =
runCleanerWithInstantFormat(config, true);
+ assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions
and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next replacecommit, with 1 clustering operation. logically delete
p0. No change to p1
+ // notice that clustering generates empty inflight commit files
+ Map<String, String> partitionAndFileId002 =
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+ String file2P0C1 = partitionAndFileId002.get(p0);
+ Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata>
replaceMetadata =
+ generateReplaceCommitMetadata("00000000000002", p0, file1P0C0,
file2P0C1);
+ testTable.addReplaceCommit("00000000000002",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsTwo =
runCleanerWithInstantFormat(config, true);
+ assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions
and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next replacecommit, with 1 clustering operation. Replace data in
p1. No change to p0
+ // notice that clustering generates empty inflight commit files
+ Map<String, String> partitionAndFileId003 =
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+ String file3P1C2 = partitionAndFileId003.get(p1);
+ replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1,
file1P1C0, file3P1C2);
+ testTable.addReplaceCommit("00000000000003",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsThree =
runCleanerWithInstantFormat(config, true);
+ assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any
partitions and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next replacecommit, with 1 clustering operation. Replace data in
p0 again
+ // notice that clustering generates empty inflight commit files
+ Map<String, String> partitionAndFileId004 =
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+ String file4P0C3 = partitionAndFileId004.get(p0);
+ replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0,
file2P0C1, file4P0C3);
+ testTable.addReplaceCommit("00000000000004",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
+
+ // run cleaner
+ List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5, true);
+ assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ //file1P1C0 still stays because its not replaced until 3 and its the only
version available
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next replacecommit, with 1 clustering operation. Replace all data
in p1. no new files created
+ // notice that clustering generates empty inflight commit files
+ Map<String, String> partitionAndFileId005 =
testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
+ String file4P1C4 = partitionAndFileId005.get(p1);
+ replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0,
file3P1C2, file4P1C4);
+ testTable.addReplaceCommit("00000000000006",
Option.of(replaceMetadata.getKey()), Option.empty(),
replaceMetadata.getValue());
+
+ List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7, true);
+ assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+ assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+ assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
}
private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata>
generateReplaceCommitMetadata(
@@ -1040,8 +1042,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
String p0 = "2020/01/01";
String p1 = "2020/01/02";
@@ -1054,7 +1056,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1C0));
}
});
- commitWithMdt("00000000000001", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt("00000000000001", part1ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);
// make next replacecommit, with 1 clustering operation. logically
delete p0. No change to p1
@@ -1087,6 +1090,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
//file1P1C0 still stays because its not replaced until 3 and its the
only version available
assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+ } finally {
+ testTable.close();
}
}
@@ -1110,10 +1115,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
.withMarkersType(MarkerType.DIRECT.name())
.withPath(basePath)
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- // reload because table configs could have been updated
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ // reload because table configs could have been updated
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
String p1 = "part_1";
String p2 = "part_2";
@@ -1127,9 +1132,9 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1, file2P1));
}
});
- commitWithMdt("10", part1ToFileId, testTable, metadataWriter);
+ commitWithMdt("10", part1ToFileId, testTable, config);
testTable.addClean("15");
- commitWithMdt("20", part1ToFileId, testTable, metadataWriter);
+ commitWithMdt("20", part1ToFileId, testTable, config);
// add clean instant
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new
HoodieActionInstant("", "", ""),
@@ -1146,8 +1151,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p2, CollectionUtils.createImmutableList(file3P2, file4P2));
}
});
- commitWithMdt("30", part2ToFileId, testTable, metadataWriter);
- commitWithMdt("40", part2ToFileId, testTable, metadataWriter);
+ commitWithMdt("30", part2ToFileId, testTable, config);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ commitWithMdt("40", part2ToFileId, testTable, config);
+ testTable = tearDownTestTableAndReinit(testTable, config);
// empty commits
String file5P2 = UUID.randomUUID().toString();
@@ -1157,8 +1164,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
put(p2, CollectionUtils.createImmutableList(file5P2, file6P2));
}
});
- commitWithMdt("50", part2ToFileId, testTable, metadataWriter);
- commitWithMdt("60", part2ToFileId, testTable, metadataWriter);
+ commitWithMdt("50", part2ToFileId, testTable, config);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ commitWithMdt("60", part2ToFileId, testTable, config);
+ testTable = tearDownTestTableAndReinit(testTable, config);
// archive commit 1, 2
new HoodieTimelineArchiver<>(config, HoodieSparkTable.create(config,
context, metaClient))
@@ -1176,6 +1185,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
assertTrue(testTable.baseFileExists(p1, "20", file2P1), "Latest
FileSlice exists");
assertTrue(testTable.baseFileExists(p2, "40", file3P2), "Latest
FileSlice exists");
assertTrue(testTable.baseFileExists(p2, "40", file4P2), "Latest
FileSlice exists");
+ } finally {
+ testTable.close();
}
}
@@ -1190,37 +1201,36 @@ public class TestCleaner extends HoodieCleanerTestBase {
HoodieTableMetaClient metaClient =
HoodieTestUtils.init(storageConf, basePath,
HoodieTableType.MERGE_ON_READ);
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-
- final String partition = "2016/03/15";
- String timePrefix = "00000000000";
- Map<String, String> expFileIdToPendingCompaction = new HashMap<String,
String>() {
- {
- put("fileId2", timePrefix + "004");
- put("fileId3", timePrefix + "006");
- put("fileId4", timePrefix + "008");
- put("fileId5", timePrefix + "010");
- }
- };
- Map<String, String> fileIdToLatestInstantBeforeCompaction = new
HashMap<String, String>() {
- {
- put("fileId1", timePrefix + "000");
- put("fileId2", timePrefix + "000");
- put("fileId3", timePrefix + "001");
- put("fileId4", timePrefix + "003");
- put("fileId5", timePrefix + "005");
- put("fileId6", timePrefix + "009");
- put("fileId7", timePrefix + "013");
- }
- };
+ final String partition = "2016/03/15";
+ String timePrefix = "00000000000";
+ Map<String, String> expFileIdToPendingCompaction = new HashMap<String,
String>() {
+ {
+ put("fileId2", timePrefix + "004");
+ put("fileId3", timePrefix + "006");
+ put("fileId4", timePrefix + "008");
+ put("fileId5", timePrefix + "010");
+ }
+ };
+ Map<String, String> fileIdToLatestInstantBeforeCompaction = new
HashMap<String, String>() {
+ {
+ put("fileId1", timePrefix + "000");
+ put("fileId2", timePrefix + "000");
+ put("fileId3", timePrefix + "001");
+ put("fileId4", timePrefix + "003");
+ put("fileId5", timePrefix + "005");
+ put("fileId6", timePrefix + "009");
+ put("fileId7", timePrefix + "013");
+ }
+ };
- // Generate 7 file-groups. First one has only one slice and no pending
compaction. File Slices (2 - 5) has
- // multiple versions with pending compaction. File Slices (6 - 7) have
multiple file-slices but not under
- // compactions
- // FileIds 2-5 will be under compaction
- // reload because table configs could have been updated
- metaClient = HoodieTableMetaClient.reload(metaClient);
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ // Generate 7 file-groups. First one has only one slice and no pending
compaction. File Slices (2 - 5) has
+ // multiple versions with pending compaction. File Slices (6 - 7) have
multiple file-slices but not under
+ // compactions
+ // FileIds 2-5 will be under compaction
+ // reload because table configs could have been updated
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ try {
testTable.withPartitionMetaFiles(partition);
@@ -1236,56 +1246,56 @@ public class TestCleaner extends HoodieCleanerTestBase {
Map<String, List<String>> part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1,
file4P1, file5P1, file6P1, file7P1));
// all 7 fileIds
- commitWithMdt(timePrefix + "000", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "000", part1ToFileId, testTable, config,
true, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1,
file6P1, file7P1));
// fileIds 3 to 7
- commitWithMdt(timePrefix + "001", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "001", part1ToFileId, testTable, config,
true, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1,
file7P1));
// fileIds 4 to 7
- commitWithMdt(timePrefix + "003", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "003", part1ToFileId, testTable, config,
true, true);
// add compaction
testTable.addRequestedCompaction(timePrefix + "004", new
FileSlice(partition, timePrefix + "000", file2P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file2P1));
- commitWithMdt(timePrefix + "005", part1ToFileId, testTable,
metadataWriter, false, true);
+ commitWithMdt(timePrefix + "005", part1ToFileId, testTable, config,
false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
- commitWithMdt(timePrefix + "0055", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, config,
true, true);
testTable.addRequestedCompaction(timePrefix + "006", new
FileSlice(partition, timePrefix + "001", file3P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file3P1));
- commitWithMdt(timePrefix + "007", part1ToFileId, testTable,
metadataWriter, false, true);
+ commitWithMdt(timePrefix + "007", part1ToFileId, testTable, config,
false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
- commitWithMdt(timePrefix + "0075", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, config,
true, true);
testTable.addRequestedCompaction(timePrefix + "008", new
FileSlice(partition, timePrefix + "003", file4P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file4P1));
- commitWithMdt(timePrefix + "009", part1ToFileId, testTable,
metadataWriter, false, true);
+ commitWithMdt(timePrefix + "009", part1ToFileId, testTable, config,
false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
- commitWithMdt(timePrefix + "0095", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, config,
true, true);
testTable.addRequestedCompaction(timePrefix + "010", new
FileSlice(partition, timePrefix + "005", file5P1));
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file5P1));
- commitWithMdt(timePrefix + "011", part1ToFileId, testTable,
metadataWriter, false, true);
+ commitWithMdt(timePrefix + "011", part1ToFileId, testTable, config,
false, true);
part1ToFileId = new HashMap<>();
part1ToFileId.put(partition, Arrays.asList(file7P1));
- commitWithMdt(timePrefix + "013", part1ToFileId, testTable,
metadataWriter, true, true);
+ commitWithMdt(timePrefix + "013", part1ToFileId, testTable, config,
true, true);
// Clean now
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -1325,6 +1335,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
assertEquals(expNumFilesDeleted, numDeleted, "Correct number of files
deleted");
assertEquals(expNumFilesUnderCompactionDeleted,
numFilesUnderCompactionDeleted,
"Correct number of files under compaction deleted");
+ } finally {
+ testTable.close();
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index 92bb04513d3..4ed5cad00ae 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -39,8 +39,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.testutils.HoodieCleanerTestBase;
import org.junit.jupiter.api.Test;
@@ -118,137 +116,140 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.withMaxCommitsBeforeCleaning(2)
.build()).build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
- String p0 = "2020/01/01";
- String p1 = "2020/01/02";
- Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
-
- // make 1 commit, with 1 file per partition
- String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p0).get(0).getFileId()
- : UUID.randomUUID().toString();
- String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p1).get(0).getFileId()
- : UUID.randomUUID().toString();
- Map<String, List<String>> part1ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0));
- put(p1, CollectionUtils.createImmutableList(file1P1C0));
- }
- });
- commitWithMdt("00000000000001", part1ToFileId, testTable,
metadataWriter, true, true);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsOne =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2,
true);
- assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any
partitions and clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next commit, with 1 insert & 1 update per partition
- Map<String, String> partitionAndFileId002 =
testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0,
p1);
- String file2P0C1 = partitionAndFileId002.get(p0);
- String file2P1C1 = partitionAndFileId002.get(p1);
- Map<String, List<String>> part2ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
- put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
- }
- });
- commitWithMdt("00000000000003", part2ToFileId, testTable,
metadataWriter, true, true);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsTwo =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4,
true);
- assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any
partitions and clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
- assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
- // make next commit, with 2 updates to existing files, and 1 insert
- String file3P0C2 =
testTable.addInflightCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
- Map<String, List<String>> part3ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file3P0C2));
- }
- });
- commitWithMdt("00000000000005", part3ToFileId, testTable,
metadataWriter, true, true);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsThree =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6,
true);
- assertEquals(0, hoodieCleanStatsThree.size(),
- "Must not clean any file. We have to keep 1 version before the
latest commit time to keep");
- assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-
- // make next commit, with 2 updates to existing files, and 1 insert
- String file4P0C3 =
testTable.addInflightCommit("00000000000007").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
- Map<String, List<String>> part4ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file4P0C3));
- }
- });
- commitWithMdt("00000000000007", part4ToFileId, testTable,
metadataWriter);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsFour =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8,
true);
- // enableBootstrapSourceClean would delete the bootstrap base file as
the same time
- HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour,
p0);
-
- assertEquals(3, partitionCleanStat.getSuccessDeleteFiles().size());
- assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
- assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
- assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
- assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
- if (enableBootstrapSourceClean) {
- assertEquals(1,
partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size());
- assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
- p0).get(0).getBootstrapFileStatus().getPath().getUri())));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ String p0 = "2020/01/01";
+ String p1 = "2020/01/02";
+ Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
+
+ // make 1 commit, with 1 file per partition
+ String file1P0C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p0).get(0).getFileId()
+ : UUID.randomUUID().toString();
+ String file1P1C0 = enableBootstrapSourceClean ?
bootstrapMapping.get(p1).get(0).getFileId()
+ : UUID.randomUUID().toString();
+ Map<String, List<String>> part1ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0));
}
+ });
+ commitWithMdt("00000000000001", part1ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsOne =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2,
true);
+ assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions
and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next commit, with 1 insert & 1 update per partition
+ Map<String, String> partitionAndFileId002 =
testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0,
p1);
+ String file2P0C1 = partitionAndFileId002.get(p0);
+ String file2P1C1 = partitionAndFileId002.get(p1);
+ Map<String, List<String>> part2ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+ put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+ }
+ });
+ commitWithMdt("00000000000003", part2ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsTwo =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4,
true);
+ assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions
and clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+ assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+ // make next commit, with 2 updates to existing files, and 1 insert
+ String file3P0C2 =
testTable.addInflightCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+ Map<String, List<String>> part3ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file3P0C2));
+ }
+ });
+ commitWithMdt("00000000000005", part3ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsThree =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6,
true);
+ assertEquals(0, hoodieCleanStatsThree.size(),
+ "Must not clean any file. We have to keep 1 version before the latest
commit time to keep");
+ assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+
+ // make next commit, with 2 updates to existing files, and 1 insert
+ String file4P0C3 =
testTable.addInflightCommit("00000000000007").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+ Map<String, List<String>> part4ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file4P0C3));
+ }
+ });
+ commitWithMdt("00000000000007", part4ToFileId, testTable, config);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsFour =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8,
true);
+ // enableBootstrapSourceClean would delete the bootstrap base file at the
same time
+ HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour,
p0);
+
+ assertEquals(3, partitionCleanStat.getSuccessDeleteFiles().size());
+ assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+ assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+ assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+ assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+ if (enableBootstrapSourceClean) {
+ assertEquals(1,
partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size());
+ assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+ p0).get(0).getBootstrapFileStatus().getPath().getUri())));
+ }
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- String file5P0C4 =
testTable.addInflightCommit("00000000000009").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
- Map<String, List<String>> part5ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
- {
- put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file5P0C4));
- }
- });
- commitWithMdt("00000000000009", part5ToFileId, testTable,
metadataWriter, true, true);
- metaClient = HoodieTableMetaClient.reload(metaClient);
-
- List<HoodieCleanStat> hoodieCleanStatsFive =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
10, true);
-
- assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files
since at least 2 commits are needed from last clean operation before "
- + "clean can be scheduled again");
- assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
- assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
- assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
- // No cleaning on partially written file, with no commit.
- testTable.forCommit("00000000000011").withBaseFilesInPartition(p0,
file3P0C2);
- HoodieCommitMetadata commitMetadata =
generateCommitMetadata("00000000000011", Collections.singletonMap(p0,
- CollectionUtils.createImmutableList(file3P0C2)));
- metaClient.getActiveTimeline().createNewInstant(
- new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "00000000000011"));
- metaClient.getActiveTimeline().transitionRequestedToInflight(
- new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
- serializeCommitMetadata(commitMetadata));
- List<HoodieCleanStat> hoodieCleanStatsFive2 =
- runCleaner(config, simulateFailureRetry, simulateMetadataFailure,
12, true);
- HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
- assertNull(cleanStat, "Must not clean any files");
- assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
- assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
- }
+ String file5P0C4 =
testTable.addInflightCommit("00000000000009").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+ Map<String, List<String>> part5ToFileId = Collections.unmodifiableMap(new
HashMap<String, List<String>>() {
+ {
+ put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1,
file5P0C4));
+ }
+ });
+ commitWithMdt("00000000000009", part5ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
+ List<HoodieCleanStat> hoodieCleanStatsFive =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 10,
true);
+
+ assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files
since at least 2 commits are needed from last clean operation before "
+ + "clean can be scheduled again");
+ assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+ assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+ assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+
+ // No cleaning on partially written file, with no commit.
+ testTable.forCommit("00000000000011").withBaseFilesInPartition(p0,
file3P0C2);
+ HoodieCommitMetadata commitMetadata =
generateCommitMetadata("00000000000011", Collections.singletonMap(p0,
+ CollectionUtils.createImmutableList(file3P0C2)));
+ metaClient.getActiveTimeline().createNewInstant(
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "00000000000011"));
+ metaClient.getActiveTimeline().transitionRequestedToInflight(
+ new HoodieInstant(HoodieInstant.State.REQUESTED,
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
+ serializeCommitMetadata(commitMetadata));
+ List<HoodieCleanStat> hoodieCleanStatsFive2 =
+ runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 12,
true);
+ HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
+ assertNull(cleanStat, "Must not clean any files");
+ assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+ assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
}
/**
@@ -263,9 +264,9 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
final String p0 = "2020/01/01";
final String p1 = "2020/01/02";
@@ -331,6 +332,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+ } finally {
+ testTable.close();
}
}
@@ -345,9 +348,9 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
final String p0 = "2020/01/01";
final String p1 = "2020/01/02";
final Map<String, List<BootstrapFileMapping>> bootstrapMapping =
generateBootstrapIndexAndSourceData(p0, p1);
@@ -431,6 +434,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+ } finally {
+ testTable.close();
}
}
@@ -451,8 +456,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.build()).build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(storageConf,
basePath, HoodieTableType.MERGE_ON_READ);
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 =
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
@@ -461,13 +466,15 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p0, CollectionUtils.createImmutableList(file1P0));
}
});
- commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt("000", part1ToFileId, testTable, config, true, true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
// Make 2 files, one base file and 1 log files associated with base file
testTable.addDeltaCommit("001")
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 3);
- commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt("001", part1ToFileId, testTable, config, true, true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
assertEquals(3,
@@ -477,6 +484,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
assertTrue(testTable.baseFileExists(p0, "001", file1P0));
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
+ } finally {
+ testTable.close();
}
}
@@ -496,8 +505,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.build();
HoodieTableMetaClient metaClient = HoodieTestUtils.init(storageConf,
basePath, HoodieTableType.MERGE_ON_READ);
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
String p0 = "2020/01/01";
// Make 3 files, one base file and 2 log files associated with base file
String file1P0 =
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
@@ -506,19 +515,22 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p0, CollectionUtils.createImmutableList(file1P0));
}
});
- commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt("000", part1ToFileId, testTable, config, true, true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
// Make 2 files, one base file and 1 log files associated with base file
testTable.addDeltaCommit("001")
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 3);
- commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt("001", part1ToFileId, testTable, config, true, true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
// Make 2 files, one base file and 1 log files associated with base file
testTable.addDeltaCommit("002")
.withBaseFilesInPartition(p0, file1P0).getLeft()
.withLogFile(p0, file1P0, 4);
- commitWithMdt("002", part1ToFileId, testTable, metadataWriter, true,
true);
+ commitWithMdt("002", part1ToFileId, testTable, config, true, true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
assertEquals(3,
@@ -530,6 +542,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
assertTrue(testTable.baseFileExists(p0, "002", file1P0));
assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
+ } finally {
+ testTable.close();
}
}
@@ -586,8 +600,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
String file1P2 = UUID.randomUUID().toString();
String file2P2 = UUID.randomUUID().toString();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
testTable.withPartitionMetaFiles(p1, p2);
Map<String, List<String>> part1ToFileId =
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
{
@@ -595,9 +609,11 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p2, CollectionUtils.createImmutableList(file1P2, file2P2));
}
});
- commitWithMdt(commitInstant, part1ToFileId, testTable, metadataWriter,
true, true);
+ commitWithMdt(commitInstant, part1ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
testTable.addDeletePartitionCommit(deleteInstant1, p1,
Arrays.asList(file1P1, file2P1));
+ testTable = tearDownTestTableAndReinit(testTable, config);
testTable.addDeletePartitionCommit(deleteInstant2, p2,
Arrays.asList(file1P2, file2P2));
runCleaner(config);
@@ -613,6 +629,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
assertTrue(testTable.baseFileExists(p2, commitInstant, file1P2), "p2
retained");
assertTrue(testTable.baseFileExists(p2, commitInstant, file2P2), "p2
retained");
}
+ } finally {
+ testTable.close();
}
}
@@ -634,8 +652,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
.build())
.build();
- try (HoodieTableMetadataWriter metadataWriter =
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
- HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
metadataWriter, Option.of(context));
+ HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient,
getMetadataWriter(config), Option.of(context));
+ try {
String p0 = "2020/01/01";
String p1 = "2020/01/02";
Map<String, List<BootstrapFileMapping>> bootstrapMapping =
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
@@ -654,7 +672,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1C0));
}
});
- commitWithMdt(firstCommitTs, part1ToFileId, testTable, metadataWriter,
true, true);
+ commitWithMdt(firstCommitTs, part1ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsOne =
@@ -675,7 +694,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
}
});
- commitWithMdt(secondCommitTs, part2ToFileId, testTable, metadataWriter,
true, true);
+ commitWithMdt(secondCommitTs, part2ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);
// make next commit, with 1 insert per partition
@@ -690,7 +710,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1,
file3P1C1));
}
});
- commitWithMdt(thirdCommitTs, part3ToFileId, testTable, metadataWriter,
true, true);
+ commitWithMdt(thirdCommitTs, part3ToFileId, testTable, config, true,
true);
+ testTable = tearDownTestTableAndReinit(testTable, config);
metaClient = HoodieTableMetaClient.reload(metaClient);
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config,
simulateFailureRetry, simulateMetadataFailure);
@@ -705,6 +726,8 @@ public class TestCleanPlanExecutor extends
HoodieCleanerTestBase {
assertTrue(testTable.baseFileExists(p1, secondCommitTs, file1P1C0));
assertFalse(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
assertFalse(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
+ } finally {
+ testTable.close();
}
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index 83d5e2c54bb..907d9ba9152 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.CleanerUtils;
@@ -39,6 +40,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.storage.StoragePath;
import java.io.File;
@@ -176,12 +178,12 @@ public class HoodieCleanerTestBase extends
HoodieClientTestBase {
}
public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
- HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter) throws Exception {
- commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true,
false);
+ HoodieTestTable testTable, HoodieWriteConfig
config) throws Exception {
+ commitWithMdt(instantTime, partToFileId, testTable, config, true, false);
}
public void commitWithMdt(String instantTime, Map<String, List<String>>
partToFileId,
- HoodieTestTable testTable,
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean
addLogFiles) throws Exception {
+ HoodieTestTable testTable, HoodieWriteConfig
config, boolean addBaseFiles, boolean addLogFiles) throws Exception {
testTable.addInflightCommit(instantTime);
Map<String, List<String>> partToFileIds = new HashMap<>();
partToFileId.forEach((key, value) -> {
@@ -206,12 +208,23 @@ public class HoodieCleanerTestBase extends
HoodieClientTestBase {
}
});
HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime,
partToFileIds);
- metadataWriter.performTableServices(Option.of(instantTime));
- metadataWriter.updateFromWriteStatuses(commitMeta,
context.emptyHoodieData(), instantTime);
- metaClient.getActiveTimeline().saveAsComplete(
- new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime),
- serializeCommitMetadata(commitMeta));
- metaClient = HoodieTableMetaClient.reload(metaClient);
+ try (HoodieTableMetadataWriter metadataWriter = getMetadataWriter(config))
{
+ metadataWriter.performTableServices(Option.of(instantTime));
+ metadataWriter.updateFromWriteStatuses(commitMeta,
context.emptyHoodieData(), instantTime);
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(HoodieInstant.State.INFLIGHT,
HoodieTimeline.COMMIT_ACTION, instantTime),
+ serializeCommitMetadata(commitMeta));
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ }
+ }
+
+ protected HoodieTableMetadataWriter getMetadataWriter(HoodieWriteConfig
config) {
+ return SparkHoodieBackedTableMetadataWriter.create(storageConf, config,
context);
+ }
+
+ protected HoodieTestTable tearDownTestTableAndReinit(HoodieTestTable
testTable, HoodieWriteConfig config) throws Exception {
+ testTable.close();
+ return HoodieMetadataTestTable.of(metaClient, getMetadataWriter(config),
Option.of(context));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 04839c57a94..0ef17e9e233 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -695,6 +695,10 @@ public class HoodieBackedTableMetadata extends
BaseTableMetadata {
public void close() {
closePartitionReaders();
partitionFileSliceMap.clear();
+ if (this.metadataFileSystemView != null) {
+ this.metadataFileSystemView.close();
+ this.metadataFileSystemView = null;
+ }
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index eb95c8e27fc..ab27afd17c4 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1078,21 +1078,28 @@ public class HoodieTableMetadataUtil {
Option<HoodieTableFileSystemView> fileSystemView,
String partition,
boolean
mergeFileSlices) {
- HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() ->
getFileSystemView(metaClient));
- Stream<FileSlice> fileSliceStream;
- if (mergeFileSlices) {
- if
(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent())
{
- fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
- // including pending compaction instant as the last instant so
that the finished delta commits
- // that start earlier than the compaction can be queried.
- partition,
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp());
+ HoodieTableFileSystemView fsView = null;
+ try {
+ fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
+ Stream<FileSlice> fileSliceStream;
+ if (mergeFileSlices) {
+ if
(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent())
{
+ fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+ // including pending compaction instant as the last instant so
that the finished delta commits
+ // that start earlier than the compaction can be queried.
+ partition,
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp());
+ } else {
+ return Collections.emptyList();
+ }
} else {
- return Collections.emptyList();
+ fileSliceStream = fsView.getLatestFileSlices(partition);
+ }
+ return
fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
+ } finally {
+ if (!fileSystemView.isPresent()) {
+ fsView.close();
}
- } else {
- fileSliceStream = fsView.getLatestFileSlices(partition);
}
- return
fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
}
/**
@@ -1106,11 +1113,18 @@ public class HoodieTableMetadataUtil {
public static List<FileSlice>
getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
Option<HoodieTableFileSystemView> fileSystemView,
String partition) {
- HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() ->
getFileSystemView(metaClient));
- Stream<FileSlice> fileSliceStream =
fsView.getLatestFileSlicesIncludingInflight(partition);
- return fileSliceStream
- .sorted(Comparator.comparing(FileSlice::getFileId))
- .collect(Collectors.toList());
+ HoodieTableFileSystemView fsView = null;
+ try {
+ fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
+ Stream<FileSlice> fileSliceStream =
fsView.getLatestFileSlicesIncludingInflight(partition);
+ return fileSliceStream
+ .sorted(Comparator.comparing(FileSlice::getFileId))
+ .collect(Collectors.toList());
+ } finally {
+ if (!fileSystemView.isPresent()) {
+ fsView.close();
+ }
+ }
}
public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index e52949bfddc..e7e442f3ed6 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -138,7 +138,7 @@ import static
org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
/**
* Test Hoodie Table for testing only.
*/
-public class HoodieTestTable {
+public class HoodieTestTable implements AutoCloseable {
public static final String PHONY_TABLE_SCHEMA =
"{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\",
\"name\": \"PhonyRecord\", \"fields\": []}";
@@ -1334,6 +1334,11 @@ public class HoodieTestTable {
return writeStats;
}
+ @Override
+ public void close() throws Exception {
+ // no-op
+ }
+
/**
* Exception for {@link HoodieTestTable}.
*/
@@ -1440,4 +1445,5 @@ public class HoodieTestTable {
return this.commitsToPartitionToLogFileInfoStats.get(commitTime);
}
}
+
}