This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3789840be3d [HUDI-7921] Fixing file system view closures in MDT (#11496)
3789840be3d is described below

commit 3789840be3d041cbcfc6b24786740210e4e6d6ac
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Jul 10 19:25:41 2024 -0700

    [HUDI-7921] Fixing file system view closures in MDT (#11496)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  55 ++--
 .../common/testutils/HoodieMetadataTestTable.java  |   6 +
 .../java/org/apache/hudi/table/TestCleaner.java    | 326 +++++++++++----------
 .../table/functional/TestCleanPlanExecutor.java    | 325 ++++++++++----------
 .../hudi/testutils/HoodieCleanerTestBase.java      |  31 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |   4 +
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  48 +--
 .../hudi/common/testutils/HoodieTestTable.java     |   8 +-
 8 files changed, 440 insertions(+), 363 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 89d21e79b22..c38a68e37cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1081,9 +1081,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
   private HoodieData<HoodieRecord> 
getFunctionalIndexUpdates(HoodieCommitMetadata commitMetadata, String 
indexPartition, String instantTime) throws Exception {
     HoodieIndexDefinition indexDefinition = 
getFunctionalIndexDefinition(indexPartition);
     List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
-    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemView(dataMetaClient);
     commitMetadata.getPartitionToWriteStats().forEach((dataPartition, value) 
-> {
-      List<FileSlice> fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, 
Option.ofNullable(fsView), dataPartition);
+      List<FileSlice> fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), 
dataPartition);
       fileSlices.forEach(fileSlice -> {
         // Filter log files for the instant time and add to this partition 
fileSlice pairs
         List<HoodieLogFile> logFilesForInstant = fileSlice.getLogFiles()
@@ -1411,35 +1410,35 @@ public abstract class 
HoodieBackedTableMetadataWriter<I> implements HoodieTableM
       HoodieData<HoodieRecord>> partitionRecordsMap) {
     // The result set
     HoodieData<HoodieRecord> allPartitionRecords = 
engineContext.emptyHoodieData();
+    try (HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient)) {
+      for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : 
partitionRecordsMap.entrySet()) {
+        final String partitionName = 
HoodieIndexUtils.getPartitionNameFromPartitionType(entry.getKey(), 
dataMetaClient, dataWriteConfig.getIndexingConfig().getIndexName());
+        HoodieData<HoodieRecord> records = entry.getValue();
+
+        List<FileSlice> fileSlices =
+            
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
+        if (fileSlices.isEmpty()) {
+          // scheduling of INDEX only initializes the file group and not add 
commit
+          // so if there are no committed file slices, look for inflight slices
+          fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
+        }
+        final int fileGroupCount = fileSlices.size();
+        ValidationUtils.checkArgument(fileGroupCount > 0, 
String.format("FileGroup count for MDT partition %s should be >0", 
partitionName));
+
+        List<FileSlice> finalFileSlices = fileSlices;
+        HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
+          FileSlice slice = 
finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
+              fileGroupCount));
+          r.unseal();
+          r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
+          r.seal();
+          return r;
+        });
 
-    HoodieTableFileSystemView fsView = 
HoodieTableMetadataUtil.getFileSystemView(metadataMetaClient);
-    for (Map.Entry<MetadataPartitionType, HoodieData<HoodieRecord>> entry : 
partitionRecordsMap.entrySet()) {
-      final String partitionName = 
HoodieIndexUtils.getPartitionNameFromPartitionType(entry.getKey(), 
dataMetaClient, dataWriteConfig.getIndexingConfig().getIndexName());
-      HoodieData<HoodieRecord> records = entry.getValue();
-
-      List<FileSlice> fileSlices =
-          
HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
-      if (fileSlices.isEmpty()) {
-        // scheduling of INDEX only initializes the file group and not add 
commit
-        // so if there are no committed file slices, look for inflight slices
-        fileSlices = 
getPartitionLatestFileSlicesIncludingInflight(metadataMetaClient, 
Option.ofNullable(fsView), partitionName);
+        allPartitionRecords = 
allPartitionRecords.union(rddSinglePartitionRecords);
       }
-      final int fileGroupCount = fileSlices.size();
-      ValidationUtils.checkArgument(fileGroupCount > 0, 
String.format("FileGroup count for MDT partition %s should be >0", 
partitionName));
-
-      List<FileSlice> finalFileSlices = fileSlices;
-      HoodieData<HoodieRecord> rddSinglePartitionRecords = records.map(r -> {
-        FileSlice slice = 
finalFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(),
-            fileGroupCount));
-        r.unseal();
-        r.setCurrentLocation(new 
HoodieRecordLocation(slice.getBaseInstantTime(), slice.getFileId()));
-        r.seal();
-        return r;
-      });
-
-      allPartitionRecords = 
allPartitionRecords.union(rddSinglePartitionRecords);
+      return allPartitionRecords;
     }
-    return allPartitionRecords;
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index 15230b8cb96..57b987d46b1 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -181,4 +181,10 @@ public class HoodieMetadataTestTable extends 
HoodieTestTable {
     return this;
   }
 
+  @Override
+  public void close() throws Exception {
+    if (writer != null) {
+      this.writer.close();
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 49d5ccb7667..fd74d9f652b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -278,24 +278,24 @@ public class TestCleaner extends HoodieCleanerTestBase {
   @Test
   public void testEarliestInstantToRetainForPendingCompaction() throws 
IOException {
     HoodieWriteConfig writeConfig = getConfigBuilder().withPath(basePath)
-            .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
-                    .withEnableBackupForRemoteFileSystemView(false)
-                    .build())
-            .withCleanConfig(HoodieCleanConfig.newBuilder()
-                    .withAutoClean(false)
-                    
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
-                    .retainCommits(1)
-                    .build())
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                    .withInlineCompaction(false)
-                    .withMaxNumDeltaCommitsBeforeCompaction(1)
-                    .compactionSmallFileSize(1024 * 1024 * 1024)
-                    .build())
-            .withArchivalConfig(HoodieArchivalConfig.newBuilder()
-                    .withAutoArchive(false)
-                    .archiveCommitsWith(2,3)
-                    .build())
-            .withEmbeddedTimelineServerEnabled(false).build();
+        .withFileSystemViewConfig(new FileSystemViewStorageConfig.Builder()
+            .withEnableBackupForRemoteFileSystemView(false)
+            .build())
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withAutoClean(false)
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(1)
+            .build())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withInlineCompaction(false)
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .compactionSmallFileSize(1024 * 1024 * 1024)
+            .build())
+        .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+            .withAutoArchive(false)
+            .archiveCommitsWith(2, 3)
+            .build())
+        .withEmbeddedTimelineServerEnabled(false).build();
 
     HoodieTestUtils.init(storageConf, basePath, HoodieTableType.MERGE_ON_READ);
 
@@ -322,7 +322,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
       Option<HoodieCleanerPlan> cleanPlan = table.scheduleCleaning(context, 
instantTime, Option.empty());
       
assertEquals(cleanPlan.get().getFilePathsToBeDeletedPerPartition().get(partition1).size(),
 1);
       assertEquals(earliestInstantToRetain, 
cleanPlan.get().getEarliestInstantToRetain().getTimestamp(),
-              "clean until " + earliestInstantToRetain);
+          "clean until " + earliestInstantToRetain);
       table.getMetaClient().reloadActiveTimeline();
       table.clean(context, instantTime);
 
@@ -357,7 +357,7 @@ public class TestCleaner extends HoodieCleanerTestBase {
       // earliest commit to retain should be earlier than first pending 
compaction in incremental cleaning scenarios.
       instantTime = client.createNewInstantTime();
       cleanPlan = table.scheduleCleaning(context, instantTime, Option.empty());
-      
assertEquals(earliestInstantToRetain,cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
+      assertEquals(earliestInstantToRetain, 
cleanPlan.get().getEarliestInstantToRetain().getTimestamp());
     }
   }
 
@@ -570,10 +570,11 @@ public class TestCleaner extends HoodieCleanerTestBase {
     int instantClean = startInstant;
 
     HoodieTestTable testTable = HoodieTestTable.of(metaClient);
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
+
+    try {
       for (int i = 0; i < commitCount; i++, startInstant++) {
         String commitTime = HoodieTestTable.makeNewCommitTime(startInstant, 
"%09d");
-        commitWithMdt(commitTime, Collections.emptyMap(), testTable, 
metadataWriter);
+        commitWithMdt(commitTime, Collections.emptyMap(), testTable, config);
       }
 
       List<HoodieCleanStat> cleanStats = runCleaner(config);
@@ -601,6 +602,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
           
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterCompletedInstants().countInstants());
       assertFalse(timeline.getTimelineOfActions(
           
CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION)).filterInflightsAndRequested().containsInstant(HoodieTestTable.makeNewCommitTime(--instantClean,
 "%09d")));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -614,88 +617,87 @@ public class TestCleaner extends HoodieCleanerTestBase {
             .retainCommits(2).build())
         .build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
-      String p0 = "2020/01/01";
-      String p1 = "2020/01/02";
-
-      // make 1 commit, with 1 file per partition
-      String file1P0C0 = UUID.randomUUID().toString();
-      String file1P1C0 = UUID.randomUUID().toString();
-      Map<String, List<String>> part1ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0));
-          put(p1, CollectionUtils.createImmutableList(file1P1C0));
-        }
-      });
-      commitWithMdt("00000000000001", part1ToFileId, testTable, 
metadataWriter, true, true);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsOne = 
runCleanerWithInstantFormat(config, true);
-      assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any 
partitions and clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next replacecommit, with 1 clustering operation. logically 
delete p0. No change to p1
-      // notice that clustering generates empty inflight commit files
-      Map<String, String> partitionAndFileId002 = 
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
-      String file2P0C1 = partitionAndFileId002.get(p0);
-      Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
replaceMetadata =
-          generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, 
file2P0C1);
-      testTable.addReplaceCommit("00000000000002", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
-
-      // run cleaner
-      List<HoodieCleanStat> hoodieCleanStatsTwo = 
runCleanerWithInstantFormat(config, true);
-      assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any 
partitions and clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next replacecommit, with 1 clustering operation. Replace data in 
p1. No change to p0
-      // notice that clustering generates empty inflight commit files
-      Map<String, String> partitionAndFileId003 = 
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
-      String file3P1C2 = partitionAndFileId003.get(p1);
-      replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, 
file1P1C0, file3P1C2);
-      testTable.addReplaceCommit("00000000000003", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
-
-      // run cleaner
-      List<HoodieCleanStat> hoodieCleanStatsThree = 
runCleanerWithInstantFormat(config, true);
-      assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any 
partitions and clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next replacecommit, with 1 clustering operation. Replace data in 
p0 again
-      // notice that clustering generates empty inflight commit files
-      Map<String, String> partitionAndFileId004 = 
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
-      String file4P0C3 = partitionAndFileId004.get(p0);
-      replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, 
file2P0C1, file4P0C3);
-      testTable.addReplaceCommit("00000000000004", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
-
-      // run cleaner
-      List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5, true);
-      assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
-      assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
-      assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
-      assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      //file1P1C0 still stays because its not replaced until 3 and its the 
only version available
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next replacecommit, with 1 clustering operation. Replace all 
data in p1. no new files created
-      // notice that clustering generates empty inflight commit files
-      Map<String, String> partitionAndFileId005 = 
testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
-      String file4P1C4 = partitionAndFileId005.get(p1);
-      replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, 
file3P1C2, file4P1C4);
-      testTable.addReplaceCommit("00000000000006", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = UUID.randomUUID().toString();
+    String file1P1C0 = UUID.randomUUID().toString();
+    Map<String, List<String>> part1ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0));
+      }
+    });
+    commitWithMdt("00000000000001", part1ToFileId, testTable, config, true, 
true);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
-      List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7, true);
-      assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
-      assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
-      assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
-      assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-    }
+    List<HoodieCleanStat> hoodieCleanStatsOne = 
runCleanerWithInstantFormat(config, true);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. logically delete 
p0. No change to p1
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId002 = 
testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
replaceMetadata =
+        generateReplaceCommitMetadata("00000000000002", p0, file1P0C0, 
file2P0C1);
+    testTable.addReplaceCommit("00000000000002", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsTwo = 
runCleanerWithInstantFormat(config, true);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in 
p1. No change to p0
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId003 = 
testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
+    String file3P1C2 = partitionAndFileId003.get(p1);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000003", p1, 
file1P1C0, file3P1C2);
+    testTable.addReplaceCommit("00000000000003", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsThree = 
runCleanerWithInstantFormat(config, true);
+    assertEquals(0, hoodieCleanStatsThree.size(), "Must not scan any 
partitions and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace data in 
p0 again
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId004 = 
testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
+    String file4P0C3 = partitionAndFileId004.get(p0);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000004", p0, 
file2P0C1, file4P0C3);
+    testTable.addReplaceCommit("00000000000004", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
+
+    // run cleaner
+    List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config, 5, true);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    //file1P1C0 still stays because its not replaced until 3 and its the only 
version available
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next replacecommit, with 1 clustering operation. Replace all data 
in p1. no new files created
+    // notice that clustering generates empty inflight commit files
+    Map<String, String> partitionAndFileId005 = 
testTable.forReplaceCommit("00000000000006").getFileIdsWithBaseFilesInPartitions(p1);
+    String file4P1C4 = partitionAndFileId005.get(p1);
+    replaceMetadata = generateReplaceCommitMetadata("00000000000006", p0, 
file3P1C2, file4P1C4);
+    testTable.addReplaceCommit("00000000000006", 
Option.of(replaceMetadata.getKey()), Option.empty(), 
replaceMetadata.getValue());
+
+    List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 7, true);
+    assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
+    assertTrue(testTable.baseFileExists(p0, "00000000000002", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file3P1C2));
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
   }
 
   private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> 
generateReplaceCommitMetadata(
@@ -1040,8 +1042,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
             
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS).retainCommits(2).build())
         .build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
       String p0 = "2020/01/01";
       String p1 = "2020/01/02";
 
@@ -1054,7 +1056,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1C0));
         }
       });
-      commitWithMdt("00000000000001", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt("00000000000001", part1ToFileId, testTable, config, true, 
true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       // make next replacecommit, with 1 clustering operation. logically 
delete p0. No change to p1
@@ -1087,6 +1090,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
       assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
       //file1P1C0 still stays because its not replaced until 3 and its the 
only version available
       assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -1110,10 +1115,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
         .withMarkersType(MarkerType.DIRECT.name())
         .withPath(basePath)
         .build();
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      // reload because table configs could have been updated
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    // reload because table configs could have been updated
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
 
       String p1 = "part_1";
       String p2 = "part_2";
@@ -1127,9 +1132,9 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1, file2P1));
         }
       });
-      commitWithMdt("10", part1ToFileId, testTable, metadataWriter);
+      commitWithMdt("10", part1ToFileId, testTable, config);
       testTable.addClean("15");
-      commitWithMdt("20", part1ToFileId, testTable, metadataWriter);
+      commitWithMdt("20", part1ToFileId, testTable, config);
 
       // add clean instant
       HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new 
HoodieActionInstant("", "", ""),
@@ -1146,8 +1151,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p2, CollectionUtils.createImmutableList(file3P2, file4P2));
         }
       });
-      commitWithMdt("30", part2ToFileId, testTable, metadataWriter);
-      commitWithMdt("40", part2ToFileId, testTable, metadataWriter);
+      commitWithMdt("30", part2ToFileId, testTable, config);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+      commitWithMdt("40", part2ToFileId, testTable, config);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       // empty commits
       String file5P2 = UUID.randomUUID().toString();
@@ -1157,8 +1164,10 @@ public class TestCleaner extends HoodieCleanerTestBase {
           put(p2, CollectionUtils.createImmutableList(file5P2, file6P2));
         }
       });
-      commitWithMdt("50", part2ToFileId, testTable, metadataWriter);
-      commitWithMdt("60", part2ToFileId, testTable, metadataWriter);
+      commitWithMdt("50", part2ToFileId, testTable, config);
+      testTable = tearDownTestTableAndReinit(testTable, config);
+      commitWithMdt("60", part2ToFileId, testTable, config);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       // archive commit 1, 2
       new HoodieTimelineArchiver<>(config, HoodieSparkTable.create(config, 
context, metaClient))
@@ -1176,6 +1185,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
       assertTrue(testTable.baseFileExists(p1, "20", file2P1), "Latest 
FileSlice exists");
       assertTrue(testTable.baseFileExists(p2, "40", file3P2), "Latest 
FileSlice exists");
       assertTrue(testTable.baseFileExists(p2, "40", file4P2), "Latest 
FileSlice exists");
+    } finally {
+      testTable.close();
     }
   }
 
@@ -1190,37 +1201,36 @@ public class TestCleaner extends HoodieCleanerTestBase {
     HoodieTableMetaClient metaClient =
         HoodieTestUtils.init(storageConf, basePath, 
HoodieTableType.MERGE_ON_READ);
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-
-      final String partition = "2016/03/15";
-      String timePrefix = "00000000000";
-      Map<String, String> expFileIdToPendingCompaction = new HashMap<String, 
String>() {
-        {
-          put("fileId2", timePrefix + "004");
-          put("fileId3", timePrefix + "006");
-          put("fileId4", timePrefix + "008");
-          put("fileId5", timePrefix + "010");
-        }
-      };
-      Map<String, String> fileIdToLatestInstantBeforeCompaction = new 
HashMap<String, String>() {
-        {
-          put("fileId1", timePrefix + "000");
-          put("fileId2", timePrefix + "000");
-          put("fileId3", timePrefix + "001");
-          put("fileId4", timePrefix + "003");
-          put("fileId5", timePrefix + "005");
-          put("fileId6", timePrefix + "009");
-          put("fileId7", timePrefix + "013");
-        }
-      };
+    final String partition = "2016/03/15";
+    String timePrefix = "00000000000";
+    Map<String, String> expFileIdToPendingCompaction = new HashMap<String, 
String>() {
+      {
+        put("fileId2", timePrefix + "004");
+        put("fileId3", timePrefix + "006");
+        put("fileId4", timePrefix + "008");
+        put("fileId5", timePrefix + "010");
+      }
+    };
+    Map<String, String> fileIdToLatestInstantBeforeCompaction = new 
HashMap<String, String>() {
+      {
+        put("fileId1", timePrefix + "000");
+        put("fileId2", timePrefix + "000");
+        put("fileId3", timePrefix + "001");
+        put("fileId4", timePrefix + "003");
+        put("fileId5", timePrefix + "005");
+        put("fileId6", timePrefix + "009");
+        put("fileId7", timePrefix + "013");
+      }
+    };
 
-      // Generate 7 file-groups. First one has only one slice and no pending 
compaction. File Slices (2 - 5) has
-      // multiple versions with pending compaction. File Slices (6 - 7) have 
multiple file-slices but not under
-      // compactions
-      // FileIds 2-5 will be under compaction
-      // reload because table configs could have been updated
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-      HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    // Generate 7 file-groups. First one has only one slice and no pending 
compaction. File Slices (2 - 5) has
+    // multiple versions with pending compaction. File Slices (6 - 7) have 
multiple file-slices but not under
+    // compactions
+    // FileIds 2-5 will be under compaction
+    // reload because table configs could have been updated
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    try {
 
       testTable.withPartitionMetaFiles(partition);
 
@@ -1236,56 +1246,56 @@ public class TestCleaner extends HoodieCleanerTestBase {
       Map<String, List<String>> part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file1P1, file2P1, file3P1, 
file4P1, file5P1, file6P1, file7P1));
       // all 7 fileIds
-      commitWithMdt(timePrefix + "000", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "000", part1ToFileId, testTable, config, 
true, true);
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file3P1, file4P1, file5P1, 
file6P1, file7P1));
       // fileIds 3 to 7
-      commitWithMdt(timePrefix + "001", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "001", part1ToFileId, testTable, config, 
true, true);
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file4P1, file5P1, file6P1, 
file7P1));
       // fileIds 4 to 7
-      commitWithMdt(timePrefix + "003", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "003", part1ToFileId, testTable, config, 
true, true);
 
       // add compaction
       testTable.addRequestedCompaction(timePrefix + "004", new 
FileSlice(partition, timePrefix + "000", file2P1));
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file2P1));
-      commitWithMdt(timePrefix + "005", part1ToFileId, testTable, 
metadataWriter, false, true);
+      commitWithMdt(timePrefix + "005", part1ToFileId, testTable, config, 
false, true);
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file5P1, file6P1, file7P1));
-      commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "0055", part1ToFileId, testTable, config, 
true, true);
 
       testTable.addRequestedCompaction(timePrefix + "006", new 
FileSlice(partition, timePrefix + "001", file3P1));
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file3P1));
-      commitWithMdt(timePrefix + "007", part1ToFileId, testTable, 
metadataWriter, false, true);
+      commitWithMdt(timePrefix + "007", part1ToFileId, testTable, config, 
false, true);
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-      commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "0075", part1ToFileId, testTable, config, 
true, true);
 
       testTable.addRequestedCompaction(timePrefix + "008", new 
FileSlice(partition, timePrefix + "003", file4P1));
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file4P1));
-      commitWithMdt(timePrefix + "009", part1ToFileId, testTable, 
metadataWriter, false, true);
+      commitWithMdt(timePrefix + "009", part1ToFileId, testTable, config, 
false, true);
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file6P1, file7P1));
-      commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "0095", part1ToFileId, testTable, config, 
true, true);
 
       testTable.addRequestedCompaction(timePrefix + "010", new 
FileSlice(partition, timePrefix + "005", file5P1));
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file5P1));
-      commitWithMdt(timePrefix + "011", part1ToFileId, testTable, 
metadataWriter, false, true);
+      commitWithMdt(timePrefix + "011", part1ToFileId, testTable, config, 
false, true);
 
       part1ToFileId = new HashMap<>();
       part1ToFileId.put(partition, Arrays.asList(file7P1));
-      commitWithMdt(timePrefix + "013", part1ToFileId, testTable, 
metadataWriter, true, true);
+      commitWithMdt(timePrefix + "013", part1ToFileId, testTable, config, 
true, true);
 
       // Clean now
       metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -1325,6 +1335,8 @@ public class TestCleaner extends HoodieCleanerTestBase {
       assertEquals(expNumFilesDeleted, numDeleted, "Correct number of files 
deleted");
       assertEquals(expNumFilesUnderCompactionDeleted, 
numFilesUnderCompactionDeleted,
           "Correct number of files under compaction deleted");
+    } finally {
+      testTable.close();
     }
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
index 92bb04513d3..4ed5cad00ae 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java
@@ -39,8 +39,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieTableMetadataWriter;
-import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.testutils.HoodieCleanerTestBase;
 
 import org.junit.jupiter.api.Test;
@@ -118,137 +116,140 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
             .withMaxCommitsBeforeCleaning(2)
             .build()).build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
-      String p0 = "2020/01/01";
-      String p1 = "2020/01/02";
-      Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
-
-      // make 1 commit, with 1 file per partition
-      String file1P0C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p0).get(0).getFileId()
-          : UUID.randomUUID().toString();
-      String file1P1C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p1).get(0).getFileId()
-          : UUID.randomUUID().toString();
-      Map<String, List<String>> part1ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0));
-          put(p1, CollectionUtils.createImmutableList(file1P1C0));
-        }
-      });
-      commitWithMdt("00000000000001", part1ToFileId, testTable, 
metadataWriter, true, true);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsOne =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, 
true);
-      assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any 
partitions and clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next commit, with 1 insert & 1 update per partition
-      Map<String, String> partitionAndFileId002 = 
testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0,
 p1);
-      String file2P0C1 = partitionAndFileId002.get(p0);
-      String file2P1C1 = partitionAndFileId002.get(p1);
-      Map<String, List<String>> part2ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
-          put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
-        }
-      });
-      commitWithMdt("00000000000003", part2ToFileId, testTable, 
metadataWriter, true, true);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsTwo =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, 
true);
-      assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any 
partitions and clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
-      assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
-
-      // make next commit, with 2 updates to existing files, and 1 insert
-      String file3P0C2 = 
testTable.addInflightCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
-      Map<String, List<String>> part3ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file3P0C2));
-        }
-      });
-      commitWithMdt("00000000000005", part3ToFileId, testTable, 
metadataWriter, true, true);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsThree =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, 
true);
-      assertEquals(0, hoodieCleanStatsThree.size(),
-          "Must not clean any file. We have to keep 1 version before the 
latest commit time to keep");
-      assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-
-      // make next commit, with 2 updates to existing files, and 1 insert
-      String file4P0C3 = 
testTable.addInflightCommit("00000000000007").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
-      Map<String, List<String>> part4ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file4P0C3));
-        }
-      });
-      commitWithMdt("00000000000007", part4ToFileId, testTable, 
metadataWriter);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsFour =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, 
true);
-      // enableBootstrapSourceClean would delete the bootstrap base file as 
the same time
-      HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, 
p0);
-
-      assertEquals(3, partitionCleanStat.getSuccessDeleteFiles().size());
-      assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
-      assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
-      assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
-      assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
-      if (enableBootstrapSourceClean) {
-        assertEquals(1, 
partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size());
-        assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
-            p0).get(0).getBootstrapFileStatus().getPath().getUri())));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    String p0 = "2020/01/01";
+    String p1 = "2020/01/02";
+    Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
+
+    // make 1 commit, with 1 file per partition
+    String file1P0C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p0).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    String file1P1C0 = enableBootstrapSourceClean ? 
bootstrapMapping.get(p1).get(0).getFileId()
+        : UUID.randomUUID().toString();
+    Map<String, List<String>> part1ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0));
       }
+    });
+    commitWithMdt("00000000000001", part1ToFileId, testTable, config, true, 
true);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsOne =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 2, 
true);
+    assertEquals(0, hoodieCleanStatsOne.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next commit, with 1 insert & 1 update per partition
+    Map<String, String> partitionAndFileId002 = 
testTable.addInflightCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p0,
 p1);
+    String file2P0C1 = partitionAndFileId002.get(p0);
+    String file2P1C1 = partitionAndFileId002.get(p1);
+    Map<String, List<String>> part2ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1));
+        put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
+      }
+    });
+    commitWithMdt("00000000000003", part2ToFileId, testTable, config, true, 
true);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsTwo =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 4, 
true);
+    assertEquals(0, hoodieCleanStatsTwo.size(), "Must not scan any partitions 
and clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+    assertTrue(testTable.baseFileExists(p1, "00000000000003", file2P1C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
+
+    // make next commit, with 2 updates to existing files, and 1 insert
+    String file3P0C2 = 
testTable.addInflightCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    Map<String, List<String>> part3ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file3P0C2));
+      }
+    });
+    commitWithMdt("00000000000005", part3ToFileId, testTable, config, true, 
true);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsThree =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 6, 
true);
+    assertEquals(0, hoodieCleanStatsThree.size(),
+        "Must not clean any file. We have to keep 1 version before the latest 
commit time to keep");
+    assertTrue(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+
+    // make next commit, with 2 updates to existing files, and 1 insert
+    String file4P0C3 = 
testTable.addInflightCommit("00000000000007").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    Map<String, List<String>> part4ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file4P0C3));
+      }
+    });
+    commitWithMdt("00000000000007", part4ToFileId, testTable, config);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsFour =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 8, 
true);
+    // enableBootstrapSourceClean would delete the bootstrap base file at the 
same time
+    HoodieCleanStat partitionCleanStat = getCleanStat(hoodieCleanStatsFour, 
p0);
+
+    assertEquals(3, partitionCleanStat.getSuccessDeleteFiles().size());
+    assertFalse(testTable.baseFileExists(p0, "00000000000001", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+    if (enableBootstrapSourceClean) {
+      assertEquals(1, 
partitionCleanStat.getSuccessDeleteBootstrapBaseFiles().size());
+      assertFalse(Files.exists(Paths.get(bootstrapMapping.get(
+          p0).get(0).getBootstrapFileStatus().getPath().getUri())));
+    }
 
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      String file5P0C4 = 
testTable.addInflightCommit("00000000000009").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
-      Map<String, List<String>> part5ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
-        {
-          put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file5P0C4));
-        }
-      });
-      commitWithMdt("00000000000009", part5ToFileId, testTable, 
metadataWriter, true, true);
-      metaClient = HoodieTableMetaClient.reload(metaClient);
-
-      List<HoodieCleanStat> hoodieCleanStatsFive =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
10, true);
-
-      assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files 
since at least 2 commits are needed from last clean operation before "
-          + "clean can be scheduled again");
-      assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
-      assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
-      assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+    metaClient = HoodieTableMetaClient.reload(metaClient);
 
-      // No cleaning on partially written file, with no commit.
-      testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, 
file3P0C2);
-      HoodieCommitMetadata commitMetadata = 
generateCommitMetadata("00000000000011", Collections.singletonMap(p0,
-          CollectionUtils.createImmutableList(file3P0C2)));
-      metaClient.getActiveTimeline().createNewInstant(
-          new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "00000000000011"));
-      metaClient.getActiveTimeline().transitionRequestedToInflight(
-          new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
-          serializeCommitMetadata(commitMetadata));
-      List<HoodieCleanStat> hoodieCleanStatsFive2 =
-          runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 
12, true);
-      HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
-      assertNull(cleanStat, "Must not clean any files");
-      assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
-      assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
-    }
+    String file5P0C4 = 
testTable.addInflightCommit("00000000000009").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
+    Map<String, List<String>> part5ToFileId = Collections.unmodifiableMap(new 
HashMap<String, List<String>>() {
+      {
+        put(p0, CollectionUtils.createImmutableList(file1P0C0, file2P0C1, 
file5P0C4));
+      }
+    });
+    commitWithMdt("00000000000009", part5ToFileId, testTable, config, true, 
true);
+    testTable = tearDownTestTableAndReinit(testTable, config);
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    List<HoodieCleanStat> hoodieCleanStatsFive =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 10, 
true);
+
+    assertEquals(0, hoodieCleanStatsFive.size(), "Must not clean any files 
since at least 2 commits are needed from last clean operation before "
+        + "clean can be scheduled again");
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file1P0C0));
+    assertTrue(testTable.baseFileExists(p0, "00000000000003", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file2P0C1));
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
+
+    // No cleaning on partially written file, with no commit.
+    testTable.forCommit("00000000000011").withBaseFilesInPartition(p0, 
file3P0C2);
+    HoodieCommitMetadata commitMetadata = 
generateCommitMetadata("00000000000011", Collections.singletonMap(p0,
+        CollectionUtils.createImmutableList(file3P0C2)));
+    metaClient.getActiveTimeline().createNewInstant(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "00000000000011"));
+    metaClient.getActiveTimeline().transitionRequestedToInflight(
+        new HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, "00000000000011"),
+        serializeCommitMetadata(commitMetadata));
+    List<HoodieCleanStat> hoodieCleanStatsFive2 =
+        runCleaner(config, simulateFailureRetry, simulateMetadataFailure, 12, 
true);
+    HoodieCleanStat cleanStat = getCleanStat(hoodieCleanStatsFive2, p0);
+    assertNull(cleanStat, "Must not clean any files");
+    assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+    assertTrue(testTable.baseFileExists(p0, "00000000000007", file4P0C3));
   }
 
   /**
@@ -263,9 +264,9 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
                 
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
 
+    try {
       final String p0 = "2020/01/01";
       final String p1 = "2020/01/02";
 
@@ -331,6 +332,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
       assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
       assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -345,9 +348,9 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
                 
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainFileVersions(1).build())
             .build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
 
+    try {
       final String p0 = "2020/01/01";
       final String p1 = "2020/01/02";
       final Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
generateBootstrapIndexAndSourceData(p0, p1);
@@ -431,6 +434,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
       assertEquals(0, hoodieCleanStatsFour.size(), "Must not clean any files");
       assertTrue(testTable.baseFileExists(p0, "00000000000005", file3P0C2));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -451,8 +456,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
             .build()).build();
 
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(storageConf, 
basePath, HoodieTableType.MERGE_ON_READ);
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
       String p0 = "2020/01/01";
       // Make 3 files, one base file and 2 log files associated with base file
       String file1P0 = 
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
@@ -461,13 +466,15 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p0, CollectionUtils.createImmutableList(file1P0));
         }
       });
-      commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt("000", part1ToFileId, testTable, config, true, true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       // Make 2 files, one base file and 1 log files associated with base file
       testTable.addDeltaCommit("001")
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 3);
-      commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt("001", part1ToFileId, testTable, config, true, true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
       assertEquals(3,
@@ -477,6 +484,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       assertFalse(testTable.logFilesExist(p0, "000", file1P0, 1, 2));
       assertTrue(testTable.baseFileExists(p0, "001", file1P0));
       assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -496,8 +505,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
         .build();
 
     HoodieTableMetaClient metaClient = HoodieTestUtils.init(storageConf, 
basePath, HoodieTableType.MERGE_ON_READ);
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
       String p0 = "2020/01/01";
       // Make 3 files, one base file and 2 log files associated with base file
       String file1P0 = 
testTable.addDeltaCommit("000").getFileIdsWithBaseFilesInPartitions(p0).get(p0);
@@ -506,19 +515,22 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p0, CollectionUtils.createImmutableList(file1P0));
         }
       });
-      commitWithMdt("000", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt("000", part1ToFileId, testTable, config, true, true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       // Make 2 files, one base file and 1 log files associated with base file
       testTable.addDeltaCommit("001")
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 3);
-      commitWithMdt("001", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt("001", part1ToFileId, testTable, config, true, true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       // Make 2 files, one base file and 1 log files associated with base file
       testTable.addDeltaCommit("002")
           .withBaseFilesInPartition(p0, file1P0).getLeft()
           .withLogFile(p0, file1P0, 4);
-      commitWithMdt("002", part1ToFileId, testTable, metadataWriter, true, 
true);
+      commitWithMdt("002", part1ToFileId, testTable, config, true, true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       List<HoodieCleanStat> hoodieCleanStats = runCleaner(config);
       assertEquals(3,
@@ -530,6 +542,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       assertTrue(testTable.logFileExists(p0, "001", file1P0, 3));
       assertTrue(testTable.baseFileExists(p0, "002", file1P0));
       assertTrue(testTable.logFileExists(p0, "002", file1P0, 4));
+    } finally {
+      testTable.close();
     }
   }
 
@@ -586,8 +600,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
     String file1P2 = UUID.randomUUID().toString();
     String file2P2 = UUID.randomUUID().toString();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
       testTable.withPartitionMetaFiles(p1, p2);
       Map<String, List<String>> part1ToFileId = 
Collections.unmodifiableMap(new HashMap<String, List<String>>() {
         {
@@ -595,9 +609,11 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p2, CollectionUtils.createImmutableList(file1P2, file2P2));
         }
       });
-      commitWithMdt(commitInstant, part1ToFileId, testTable, metadataWriter, 
true, true);
+      commitWithMdt(commitInstant, part1ToFileId, testTable, config, true, 
true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
 
       testTable.addDeletePartitionCommit(deleteInstant1, p1, 
Arrays.asList(file1P1, file2P1));
+      testTable = tearDownTestTableAndReinit(testTable, config);
       testTable.addDeletePartitionCommit(deleteInstant2, p2, 
Arrays.asList(file1P2, file2P2));
 
       runCleaner(config);
@@ -613,6 +629,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
         assertTrue(testTable.baseFileExists(p2, commitInstant, file1P2), "p2 
retained");
         assertTrue(testTable.baseFileExists(p2, commitInstant, file2P2), "p2 
retained");
       }
+    } finally {
+      testTable.close();
     }
   }
 
@@ -634,8 +652,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
             .build())
         .build();
 
-    try (HoodieTableMetadataWriter metadataWriter = 
SparkHoodieBackedTableMetadataWriter.create(storageConf, config, context)) {
-      HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
metadataWriter, Option.of(context));
+    HoodieTestTable testTable = HoodieMetadataTestTable.of(metaClient, 
getMetadataWriter(config), Option.of(context));
+    try {
       String p0 = "2020/01/01";
       String p1 = "2020/01/02";
       Map<String, List<BootstrapFileMapping>> bootstrapMapping = 
enableBootstrapSourceClean ? generateBootstrapIndexAndSourceData(p0, p1) : null;
@@ -654,7 +672,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1C0));
         }
       });
-      commitWithMdt(firstCommitTs, part1ToFileId, testTable, metadataWriter, 
true, true);
+      commitWithMdt(firstCommitTs, part1ToFileId, testTable, config, true, 
true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsOne =
@@ -675,7 +694,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1));
         }
       });
-      commitWithMdt(secondCommitTs, part2ToFileId, testTable, metadataWriter, 
true, true);
+      commitWithMdt(secondCommitTs, part2ToFileId, testTable, config, true, 
true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       // make next commit, with 1 insert per partition
@@ -690,7 +710,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
           put(p1, CollectionUtils.createImmutableList(file1P1C0, file2P1C1, 
file3P1C1));
         }
       });
-      commitWithMdt(thirdCommitTs, part3ToFileId, testTable, metadataWriter, 
true, true);
+      commitWithMdt(thirdCommitTs, part3ToFileId, testTable, config, true, 
true);
+      testTable = tearDownTestTableAndReinit(testTable, config);
       metaClient = HoodieTableMetaClient.reload(metaClient);
 
       List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config, 
simulateFailureRetry, simulateMetadataFailure);
@@ -705,6 +726,8 @@ public class TestCleanPlanExecutor extends 
HoodieCleanerTestBase {
       assertTrue(testTable.baseFileExists(p1, secondCommitTs, file1P1C0));
       assertFalse(testTable.baseFileExists(p0, firstCommitTs, file1P0C0));
       assertFalse(testTable.baseFileExists(p1, firstCommitTs, file1P1C0));
+    } finally {
+      testTable.close();
     }
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index 83d5e2c54bb..907d9ba9152 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.FileCreateUtils;
+import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.CleanerUtils;
@@ -39,6 +40,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
 import org.apache.hudi.storage.StoragePath;
 
 import java.io.File;
@@ -176,12 +178,12 @@ public class HoodieCleanerTestBase extends 
HoodieClientTestBase {
   }
 
   public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
-                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter) throws Exception {
-    commitWithMdt(instantTime, partToFileId, testTable, metadataWriter, true, 
false);
+                            HoodieTestTable testTable, HoodieWriteConfig 
config) throws Exception {
+    commitWithMdt(instantTime, partToFileId, testTable, config, true, false);
   }
 
   public void commitWithMdt(String instantTime, Map<String, List<String>> 
partToFileId,
-                            HoodieTestTable testTable, 
HoodieTableMetadataWriter metadataWriter, boolean addBaseFiles, boolean 
addLogFiles) throws Exception {
+                            HoodieTestTable testTable, HoodieWriteConfig 
config, boolean addBaseFiles, boolean addLogFiles) throws Exception {
     testTable.addInflightCommit(instantTime);
     Map<String, List<String>> partToFileIds = new HashMap<>();
     partToFileId.forEach((key, value) -> {
@@ -206,12 +208,23 @@ public class HoodieCleanerTestBase extends 
HoodieClientTestBase {
       }
     });
     HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime, 
partToFileIds);
-    metadataWriter.performTableServices(Option.of(instantTime));
-    metadataWriter.updateFromWriteStatuses(commitMeta, 
context.emptyHoodieData(), instantTime);
-    metaClient.getActiveTimeline().saveAsComplete(
-        new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, instantTime),
-        serializeCommitMetadata(commitMeta));
-    metaClient = HoodieTableMetaClient.reload(metaClient);
+    try (HoodieTableMetadataWriter metadataWriter = getMetadataWriter(config)) 
{
+      metadataWriter.performTableServices(Option.of(instantTime));
+      metadataWriter.updateFromWriteStatuses(commitMeta, 
context.emptyHoodieData(), instantTime);
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.COMMIT_ACTION, instantTime),
+          serializeCommitMetadata(commitMeta));
+      metaClient = HoodieTableMetaClient.reload(metaClient);
+    }
+  }
+
+  protected HoodieTableMetadataWriter getMetadataWriter(HoodieWriteConfig 
config) {
+    return SparkHoodieBackedTableMetadataWriter.create(storageConf, config, 
context);
+  }
+
+  protected HoodieTestTable tearDownTestTableAndReinit(HoodieTestTable 
testTable, HoodieWriteConfig config) throws Exception {
+    testTable.close();
+    return HoodieMetadataTestTable.of(metaClient, getMetadataWriter(config), 
Option.of(context));
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 04839c57a94..0ef17e9e233 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -695,6 +695,10 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   public void close() {
     closePartitionReaders();
     partitionFileSliceMap.clear();
+    if (this.metadataFileSystemView != null) {
+      this.metadataFileSystemView.close();
+      this.metadataFileSystemView = null;
+    }
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index eb95c8e27fc..ab27afd17c4 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1078,21 +1078,28 @@ public class HoodieTableMetadataUtil {
                                                         
Option<HoodieTableFileSystemView> fileSystemView,
                                                         String partition,
                                                         boolean 
mergeFileSlices) {
-    HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> 
getFileSystemView(metaClient));
-    Stream<FileSlice> fileSliceStream;
-    if (mergeFileSlices) {
-      if 
(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent())
 {
-        fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
-            // including pending compaction instant as the last instant so 
that the finished delta commits
-            // that start earlier than the compaction can be queried.
-            partition, 
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp());
+    HoodieTableFileSystemView fsView = null;
+    try {
+      fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
+      Stream<FileSlice> fileSliceStream;
+      if (mergeFileSlices) {
+        if 
(metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent())
 {
+          fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
+              // including pending compaction instant as the last instant so 
that the finished delta commits
+              // that start earlier than the compaction can be queried.
+              partition, 
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants().lastInstant().get().getTimestamp());
+        } else {
+          return Collections.emptyList();
+        }
       } else {
-        return Collections.emptyList();
+        fileSliceStream = fsView.getLatestFileSlices(partition);
+      }
+      return 
fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
+    } finally {
+      if (!fileSystemView.isPresent()) {
+        fsView.close();
       }
-    } else {
-      fileSliceStream = fsView.getLatestFileSlices(partition);
     }
-    return 
fileSliceStream.sorted(Comparator.comparing(FileSlice::getFileId)).collect(Collectors.toList());
   }
 
   /**
@@ -1106,11 +1113,18 @@ public class HoodieTableMetadataUtil {
   public static List<FileSlice> 
getPartitionLatestFileSlicesIncludingInflight(HoodieTableMetaClient metaClient,
                                                                               
Option<HoodieTableFileSystemView> fileSystemView,
                                                                               
String partition) {
-    HoodieTableFileSystemView fsView = fileSystemView.orElseGet(() -> 
getFileSystemView(metaClient));
-    Stream<FileSlice> fileSliceStream = 
fsView.getLatestFileSlicesIncludingInflight(partition);
-    return fileSliceStream
-        .sorted(Comparator.comparing(FileSlice::getFileId))
-        .collect(Collectors.toList());
+    HoodieTableFileSystemView fsView = null;
+    try {
+      fsView = fileSystemView.orElseGet(() -> getFileSystemView(metaClient));
+      Stream<FileSlice> fileSliceStream = 
fsView.getLatestFileSlicesIncludingInflight(partition);
+      return fileSliceStream
+          .sorted(Comparator.comparing(FileSlice::getFileId))
+          .collect(Collectors.toList());
+    } finally {
+      if (!fileSystemView.isPresent()) {
+        fsView.close();
+      }
+    }
   }
 
   public static HoodieData<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index e52949bfddc..e7e442f3ed6 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -138,7 +138,7 @@ import static 
org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
 /**
  * Test Hoodie Table for testing only.
  */
-public class HoodieTestTable {
+public class HoodieTestTable implements AutoCloseable {
 
   public static final String PHONY_TABLE_SCHEMA =
       "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", 
\"name\": \"PhonyRecord\", \"fields\": []}";
@@ -1334,6 +1334,11 @@ public class HoodieTestTable {
     return writeStats;
   }
 
+  @Override
+  public void close() throws Exception {
+    // no-op
+  }
+
   /**
    * Exception for {@link HoodieTestTable}.
    */
@@ -1440,4 +1445,5 @@ public class HoodieTestTable {
       return this.commitsToPartitionToLogFileInfoStats.get(commitTime);
     }
   }
+
 }

Reply via email to