This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/rfc-15 by this push:
     new 11661dc  [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342)
11661dc is described below

commit 11661dc6aa03a8b8d39f823f9991ed8402a35ef1
Author: rmpifer <[email protected]>
AuthorDate: Mon Dec 28 09:52:02 2020 -0800

    [HUDI-1325] [RFC-15] Merge updates of unsynced instants to metadata table (#2342)
    
    Co-authored-by: Ryan Pifer <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 227 +--------------
 .../hudi/client/TestCompactionAdminClient.java     |   6 +
 ...Metadata.java => TestHoodieBackedMetadata.java} | 158 +++++++++--
 .../hudi/table/upgrade/TestUpgradeDowngrade.java   |   6 +
 .../hudi/testutils/HoodieClientTestHarness.java    |   7 +-
 .../hudi/metadata/AbstractHoodieTableMetadata.java | 303 ++++++++++++++++++++
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 205 +------------
 .../HoodieMetadataMergedInstantRecordScanner.java  | 115 ++++++++
 .../hudi/metadata/HoodieTableMetadataUtil.java     | 316 +++++++++++++++++++++
 9 files changed, 906 insertions(+), 437 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index f89a198..c3ba2a9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -42,13 +42,10 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
-import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -56,7 +53,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieMetricsConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
@@ -72,18 +68,14 @@ import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
-import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
 
 /**
@@ -232,7 +224,7 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
     return metadataWriteConfig;
   }
 
-  public HoodieTableMetadata metadata() {
+  public HoodieBackedTableMetadata metadata() {
     return metadata;
   }
 
@@ -413,38 +405,12 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       LOG.info("Syncing " + instantsToSync.size() + " instants to metadata 
table: " + instantsToSync);
 
       // Read each instant in order and sync it to metadata table
-      final HoodieActiveTimeline timeline = 
datasetMetaClient.getActiveTimeline();
       for (HoodieInstant instant : instantsToSync) {
         LOG.info("Syncing instant " + instant + " to metadata table");
-        ValidationUtils.checkArgument(instant.isCompleted(), "Only completed 
instants can be synced.");
-
-        switch (instant.getAction()) {
-          case HoodieTimeline.CLEAN_ACTION:
-            HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
-            update(cleanMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.DELTA_COMMIT_ACTION:
-          case HoodieTimeline.COMMIT_ACTION:
-          case HoodieTimeline.COMPACTION_ACTION:
-            HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
-                timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-            update(commitMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.ROLLBACK_ACTION:
-            HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-                timeline.getInstantDetails(instant).get());
-            update(rollbackMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.RESTORE_ACTION:
-            HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
-                timeline.getInstantDetails(instant).get());
-            update(restoreMetadata, instant.getTimestamp());
-            break;
-          case HoodieTimeline.SAVEPOINT_ACTION:
-            // Nothing to be done here
-            break;
-          default:
-            throw new HoodieException("Unknown type of action " + 
instant.getAction());
+
+        Option<List<HoodieRecord>> records = 
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient, instant, 
metadata.getSyncedInstantTime());
+        if (records.isPresent()) {
+          commit(jsc, prepRecords(jsc, records.get(), 
MetadataPartitionType.FILES.partitionPath()), instant.getTimestamp());
         }
       }
       // re-init the table metadata, for any future writes.
@@ -466,39 +432,7 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       return;
     }
 
-    List<HoodieRecord> records = new LinkedList<>();
-    List<String> allPartitions = new LinkedList<>();
-    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {
-      final String partition = partitionStatName.equals("") ? 
NON_PARTITIONED_NAME : partitionStatName;
-      allPartitions.add(partition);
-
-      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
-      writeStats.forEach(hoodieWriteStat -> {
-        String pathWithPartition = hoodieWriteStat.getPath();
-        if (pathWithPartition == null) {
-          // Empty partition
-          LOG.warn("Unable to find path in write stat to update metadata table 
" + hoodieWriteStat);
-          return;
-        }
-
-        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : 
partition.length() + 1;
-        String filename = pathWithPartition.substring(offset);
-        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate 
files in HoodieCommitMetadata");
-        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
-      });
-
-      // New files added to a partition
-      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
-          partition, Option.of(newFiles), Option.empty());
-      records.add(record);
-    });
-
-    // New partitions created
-    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new 
ArrayList<>(allPartitions));
-    records.add(record);
-
-    LOG.info("Updating at " + instantTime + " from Commit/" + 
commitMetadata.getOperationType()
-        + ". #partitions_updated=" + records.size());
+    List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(commitMetadata, instantTime);
     commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
@@ -514,21 +448,7 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       return;
     }
 
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, 
deletedPathInfo) -> {
-      fileDeleteCount[0] += deletedPathInfo.size();
-
-      // Files deleted from a partition
-      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new 
Path(p.getFilePath()).getName())
-          .collect(Collectors.toList());
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(deletedFilenames));
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from CleanerPlan. 
#partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
+    List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(cleanerPlan, instantTime);
     commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
@@ -544,21 +464,7 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       return;
     }
 
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileDeleteCount = {0};
-
-    cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
-      // Files deleted from a partition
-      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
-          Option.of(new ArrayList<>(deletedFiles)));
-
-      records.add(record);
-      fileDeleteCount[0] += deletedFiles.size();
-    });
-
-    LOG.info("Updating at " + instantTime + " from Clean. 
#partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileDeleteCount[0]);
+    List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(cleanMetadata, instantTime);
     commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
@@ -574,12 +480,8 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       return;
     }
 
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
-      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, 
partitionToAppendedFiles));
-    });
-    commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, 
instantTime, "Restore");
+    List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(restoreMetadata, instantTime, 
metadata.getSyncedInstantTime());
+    commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
   /**
@@ -594,114 +496,7 @@ public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWrite
       return;
     }
 
-    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
-    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
-    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, 
partitionToAppendedFiles);
-    commitRollback(jsc, partitionToDeletedFiles, partitionToAppendedFiles, 
instantTime, "Rollback");
-  }
-
-  /**
-   * Extracts information about the deleted and append files from the {@code 
HoodieRollbackMetadata}.
-   *
-   * During a rollback files may be deleted (COW, MOR) or rollback blocks be 
appended (MOR only) to files. This
-   * function will extract this change file for each partition.
-   *
-   * @param rollbackMetadata {@code HoodieRollbackMetadata}
-   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted 
per partition.
-   * @param partitionToAppendedFiles The {@code Map} to fill with files 
appended per partition and their sizes.
-   */
-  private void processRollbackMetadata(HoodieRollbackMetadata rollbackMetadata,
-                                       Map<String, List<String>> 
partitionToDeletedFiles,
-                                       Map<String, Map<String, Long>> 
partitionToAppendedFiles) {
-    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
-      final String partition = pm.getPartitionPath();
-
-      if (!pm.getSuccessDeleteFiles().isEmpty()) {
-        if (!partitionToDeletedFiles.containsKey(partition)) {
-          partitionToDeletedFiles.put(partition, new ArrayList<>());
-        }
-
-        // Extract deleted file name from the absolute paths saved in 
getSuccessDeleteFiles()
-        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p 
-> new Path(p).getName())
-            .collect(Collectors.toList());
-        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
-      }
-
-      if (!pm.getAppendFiles().isEmpty()) {
-        if (!partitionToAppendedFiles.containsKey(partition)) {
-          partitionToAppendedFiles.put(partition, new HashMap<>());
-        }
-
-        // Extract appended file name from the absolute paths saved in 
getAppendFiles()
-        pm.getAppendFiles().forEach((path, size) -> {
-          partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
-            return size + oldSize;
-          });
-        });
-      }
-    });
-  }
-
-  /**
-   * Create file delete records and commit.
-   *
-   * @param partitionToDeletedFiles {@code Map} of partitions and the deleted 
files
-   * @param instantTime Timestamp at which the deletes took place
-   * @param operation Type of the operation which caused the files to be 
deleted
-   */
-  private void commitRollback(JavaSparkContext jsc, Map<String, List<String>> 
partitionToDeletedFiles,
-                              Map<String, Map<String, Long>> 
partitionToAppendedFiles, String instantTime,
-                              String operation) {
-    List<HoodieRecord> records = new LinkedList<>();
-    int[] fileChangeCount = {0, 0}; // deletes, appends
-
-    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
-      // Rollbacks deletes instants from timeline. The instant being 
rolled-back may not have been synced to the
-      // metadata table. Hence, the deleted filed need to be checked against 
the metadata.
-      try {
-        FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new 
Path(metadata.getDatasetBasePath(), partition));
-        Set<String> currentFiles =
-            Arrays.stream(existingStatuses).map(s -> 
s.getPath().getName()).collect(Collectors.toSet());
-
-        int origCount = deletedFiles.size();
-        deletedFiles.removeIf(f -> !currentFiles.contains(f));
-        if (deletedFiles.size() != origCount) {
-          LOG.warn("Some Files to be deleted as part of " + operation + " at " 
+ instantTime + " were not found in the "
-              + " metadata for partition " + partition
-              + ". To delete = " + origCount + ", found=" + 
deletedFiles.size());
-        }
-
-        fileChangeCount[0] += deletedFiles.size();
-
-        Option<Map<String, Long>> filesAdded = Option.empty();
-        if (partitionToAppendedFiles.containsKey(partition)) {
-          filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
-        }
-
-        HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
-            Option.of(new ArrayList<>(deletedFiles)));
-        records.add(record);
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Failed to commit rollback deletes 
at instant " + instantTime, e);
-      }
-    });
-
-    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
-      fileChangeCount[1] += appendedFileMap.size();
-
-      // Validate that no appended file has been deleted
-      ValidationUtils.checkState(
-          
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition,
 Collections.emptyList())),
-            "Rollback file cannot both be appended and deleted");
-
-      // New files added to a partition
-      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, 
Option.of(appendedFileMap),
-          Option.empty());
-      records.add(record);
-    });
-
-    LOG.info("Updating at " + instantTime + " from " + operation + ". 
#partitions_updated=" + records.size()
-        + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + 
fileChangeCount[1]);
+    List<HoodieRecord> records = 
HoodieTableMetadataUtil.convertMetadataToRecords(rollbackMetadata, instantTime, 
metadata.getSyncedInstantTime());
     commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
index 1200f67..c42110e 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestCompactionAdminClient.java
@@ -37,6 +37,7 @@ import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
@@ -70,6 +71,11 @@ public class TestCompactionAdminClient extends 
HoodieClientTestBase {
     client = new CompactionAdminClient(jsc, basePath);
   }
 
+  @AfterEach
+  public void cleanUp() throws Exception {
+    cleanupResources();
+  }
+
   @Test
   public void testUnscheduleCompactionPlan() throws Exception {
     int numEntriesPerInstant = 10;
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
similarity index 86%
rename from hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
rename to hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
index 2823035..48d07e5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
@@ -73,10 +73,11 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-public class TestHoodieFsMetadata extends HoodieClientTestHarness {
-  private static final Logger LOG = 
LogManager.getLogger(TestHoodieFsMetadata.class);
+public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
+  private static final Logger LOG = 
LogManager.getLogger(TestHoodieBackedMetadata.class);
 
   @TempDir
   public java.nio.file.Path tempFolder;
@@ -91,7 +92,7 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
     initSparkContexts("TestHoodieMetadata");
     initFileSystem();
     fs.mkdirs(new Path(basePath));
-    initMetaClient();
+    initMetaClient(tableType);
     initTestDataGenerator();
     metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath);
   }
@@ -168,6 +169,7 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
   //@ParameterizedTest
   //@EnumSource(HoodieTableType.class)
   //public void testTableOperations(HoodieTableType tableType) throws 
Exception {
+  @Test
   public void testTableOperations() throws Exception {
     //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
     init(HoodieTableType.COPY_ON_WRITE);
@@ -257,6 +259,7 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
   //@ParameterizedTest
   //@EnumSource(HoodieTableType.class)
   //public void testRollbackOperations(HoodieTableType tableType) throws 
Exception {
+  @Test
   public void testRollbackOperations() throws Exception {
     //FIXME(metadata): This is broken for MOR, until HUDI-1434 is fixed
     init(HoodieTableType.COPY_ON_WRITE);
@@ -364,6 +367,40 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
   }
 
   /**
+   * Test that syncing a rollback to the metadata table is essentially a no-op when the commit
+   * being rolled back was never synced to the metadata table.
+   * @throws Exception
+   */
+  @Test
+  public void testRollbackUnsyncedCommit() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true))) {
+      // Initialize table with metadata
+      String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.startCommitWithTime(newCommitTime);
+      List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+
+    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, false))) {
+      // Commit with metadata disabled
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 10);
+      List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 
1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      client.rollback(newCommitTime);
+    }
+
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true))) {
+      validateMetadata(client);
+    }
+  }
+
+  /**
    * Test sync of table operations.
    */
   //@ParameterizedTest
@@ -472,7 +509,6 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
       validateMetadata(client);
       assertTrue(metadata(client).isInSync());
     }
-
   }
 
   /**
@@ -620,31 +656,100 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
   }
 
   /**
+   * Test that reading from a metadata table that is out of sync with the dataset still returns consistent results.
+   */
+  //  @ParameterizedTest
+  //  @EnumSource(HoodieTableType.class)
+  @Test
+  public void testMetadataOutOfSync() throws Exception {
+    init(HoodieTableType.COPY_ON_WRITE);
+
+    HoodieWriteClient unsyncedClient = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true));
+
+    // Enable metadata so table is initialized
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true))) {
+      // Perform Bulk Insert
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+    }
+
+    // Perform commit operations with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, false))) {
+      // Perform Insert
+      String newCommitTime = "002";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+      client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Perform Upsert
+      newCommitTime = "003";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUniqueUpdates(newCommitTime, 20);
+      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      // Compaction
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "004";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+      }
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+
+    // Perform clean operation with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, false))) {
+      // One more commit is needed to trigger the clean, so upsert (and compact for MERGE_ON_READ)
+      String newCommitTime = "005";
+      client.startCommitWithTime(newCommitTime);
+      List<HoodieRecord> records = dataGen.generateUpdates(newCommitTime, 20);
+      client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
+
+      if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
+        newCommitTime = "006";
+        client.scheduleCompactionAtInstant(newCommitTime, Option.empty());
+        client.compact(newCommitTime);
+      }
+
+      // Clean
+      newCommitTime = "007";
+      client.clean(newCommitTime);
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+
+    // Perform restore with metadata disabled
+    try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, false))) {
+      client.restoreToInstant("004");
+    }
+
+    assertFalse(metadata(unsyncedClient).isInSync());
+    validateMetadata(unsyncedClient);
+  }
+
+  /**
    * Validate the metadata tables contents to ensure it matches what is on the 
file system.
    *
    * @throws IOException
    */
   private void validateMetadata(HoodieWriteClient client) throws IOException {
     HoodieWriteConfig config = client.getConfig();
-    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
-    assertNotNull(metadataWriter, "MetadataWriter should have been 
initialized");
+
+    HoodieBackedTableMetadata tableMetadata = metadata(client);
+    assertNotNull(tableMetadata, "MetadataReader should have been 
initialized");
     if (!config.useFileListingMetadata()) {
       return;
     }
 
     HoodieTimer timer = new HoodieTimer().startTimer();
 
-    // Validate write config for metadata table
-    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
-    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata 
table for metadata table");
-    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify 
for metadata table");
-
-    // Metadata table should be in sync with the dataset
-    assertTrue(metadata(client).isInSync());
-
     // Partitions should match
     List<String> fsPartitions = FSUtils.getAllFoldersWithPartitionMetaFile(fs, 
basePath);
-    List<String> metadataPartitions = 
metadataWriter.metadata().getAllPartitionPaths();
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
 
     Collections.sort(fsPartitions);
     Collections.sort(metadataPartitions);
@@ -665,8 +770,9 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
         } else {
           partitionPath = new Path(basePath, partition);
         }
+
         FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(fs, 
partitionPath);
-        FileStatus[] metaStatuses = 
metadataWriter.metadata().getAllFilesInPartition(partitionPath);
+        FileStatus[] metaStatuses = 
tableMetadata.getAllFilesInPartition(partitionPath);
         List<String> fsFileNames = Arrays.stream(fsStatuses)
             .map(s -> s.getPath().getName()).collect(Collectors.toList());
         List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -687,9 +793,9 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
         // FileSystemView should expose the same data
         List<HoodieFileGroup> fileGroups = 
tableView.getAllFileGroups(partition).collect(Collectors.toList());
 
-        fileGroups.forEach(g -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
-        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
-        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
+        fileGroups.forEach(g -> 
LogManager.getLogger(TestHoodieBackedMetadata.class).info(g));
+        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LogManager.getLogger(TestHoodieBackedMetadata.class).info(b)));
+        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> 
LogManager.getLogger(TestHoodieBackedMetadata.class).info(s)));
 
         long numFiles = fileGroups.stream()
             .mapToLong(g -> g.getAllBaseFiles().count() + 
g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -702,10 +808,18 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
       }
     });
 
-    HoodieTableMetaClient metadataMetaClient = new 
HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+    assertNotNull(metadataWriter, "MetadataWriter should have been 
initialized");
+
+    // Validate write config for metadata table
+    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
+    assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata 
table for metadata table");
+    assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify 
for metadata table");
 
     // Metadata table should be in sync with the dataset
-    assertTrue(metadataWriter.metadata().isInSync());
+    assertTrue(metadata(client).isInSync());
+
+    HoodieTableMetaClient metadataMetaClient = new 
HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
 
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), 
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
@@ -742,7 +856,7 @@ public class TestHoodieFsMetadata extends 
HoodieClientTestHarness {
 
   private HoodieBackedTableMetadata metadata(HoodieWriteClient client) {
     HoodieWriteConfig clientConfig = client.getConfig();
-    return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, 
clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
+    return (HoodieBackedTableMetadata) 
AbstractHoodieTableMetadata.create(hadoopConf, clientConfig.getBasePath(), 
clientConfig.getSpillableMapBasePath(),
         clientConfig.useFileListingMetadata(), 
clientConfig.getFileListingMetadataVerify(), false, 
clientConfig.shouldAssumeDatePartitioning());
   }
 
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
index e1dc4ce..da5f434 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -91,6 +92,11 @@ public class TestUpgradeDowngrade extends 
HoodieClientTestBase {
     initDFSMetaClient();
   }
 
+  @AfterEach
+  public void cleanUp() throws Exception {
+    cleanupResources(); // per-test teardown; presumably mirrors the @BeforeEach setup (initDFSMetaClient) above
+  }
+
   @Test
   public void testLeftOverUpdatedPropFileCleanup() throws IOException {
     testUpgradeInternal(true, true, HoodieTableType.MERGE_ON_READ);
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index f1e3f17..cf49174 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -21,6 +21,7 @@ import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -189,6 +190,10 @@ public abstract class HoodieClientTestHarness extends 
HoodieCommonTestHarness im
    * @throws IOException
    */
   protected void initMetaClient() throws IOException {
+    initMetaClient(getTableType());
+  }
+
+  protected void initMetaClient(HoodieTableType tableType) throws IOException {
     if (basePath == null) {
       throw new IllegalStateException("The base path has not been 
initialized.");
     }
@@ -197,7 +202,7 @@ public abstract class HoodieClientTestHarness extends 
HoodieCommonTestHarness im
       throw new IllegalStateException("The Spark context has not been 
initialized.");
     }
 
-    metaClient = HoodieTestUtils.init(hadoopConf, basePath, getTableType());
+    metaClient = HoodieTestUtils.init(hadoopConf, basePath, tableType);
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
new file mode 100644
index 0000000..15b9ebf
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/AbstractHoodieTableMetadata.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Base class for {@link HoodieTableMetadata} implementations that serve partition and file listings
+ * from the metadata table.
+ *
+ * Records read from the metadata table are merged with records derived from dataset instants that have
+ * not yet been synced to it. If the metadata table is disabled, or a metadata lookup fails, listings are
+ * taken directly from the file system instead.
+ */
+public abstract class AbstractHoodieTableMetadata implements HoodieTableMetadata {
+
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieTableMetadata.class);
+
+  // Memory budget passed to the scanner that merges records of unsynced instants
+  static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  static final int BUFFER_SIZE = 10 * 1024 * 1024;
+
+  protected final SerializableConfiguration hadoopConf;
+  protected final Option<HoodieMetadataMetrics> metrics;
+  protected final String datasetBasePath;
+
+  // Directory used for Spillable Map when merging records
+  final String spillableMapDirectory;
+
+  protected boolean enabled;
+  private final boolean validateLookups;
+  private final boolean assumeDatePartitioning;
+
+  // Opened lazily by openTimelineScanner() and dropped by closeReaders()
+  private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner;
+
+  protected AbstractHoodieTableMetadata(Configuration hadoopConf, String datasetBasePath, String spillableMapDirectory,
+                                        boolean enabled, boolean validateLookups, boolean enableMetrics,
+                                        boolean assumeDatePartitioning) {
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+    this.datasetBasePath = datasetBasePath;
+    this.spillableMapDirectory = spillableMapDirectory;
+
+    this.enabled = enabled;
+    this.validateLookups = validateLookups;
+    this.assumeDatePartitioning = assumeDatePartitioning;
+
+    if (enableMetrics) {
+      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
+    } else {
+      this.metrics = Option.empty();
+    }
+  }
+
+  /**
+   * Creates the default metadata reader for a dataset, a {@link HoodieBackedTableMetadata}.
+   *
+   * @param conf                         Hadoop configuration
+   * @param datasetBasePath              base path of the dataset
+   * @param spillableMapPath             directory for spillable maps used while merging records
+   * @param useFileListingFromMetadata   whether listings are served from the metadata table
+   * @param verifyListings               whether metadata listings are validated against the file system
+   * @param enableMetrics                whether lookup and validation metrics are recorded
+   * @param shouldAssumeDatePartitioning whether file-system partition listing assumes date partitioning
+   */
+  public static AbstractHoodieTableMetadata create(Configuration conf, String datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata,
+                                                   boolean verifyListings, boolean enableMetrics, boolean shouldAssumeDatePartitioning) {
+    return new HoodieBackedTableMetadata(conf, datasetBasePath, spillableMapPath, useFileListingFromMetadata, verifyListings,
+        enableMetrics, shouldAssumeDatePartitioning);
+  }
+
+  /**
+   * Return the list of files in a partition.
+   *
+   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
+   * files is retrieved directly from the underlying {@code FileSystem}.
+   *
+   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
+   *
+   * @param partitionPath The absolute path of the partition to list
+   */
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllFilesInPartition(partitionPath);
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve files in partition " + partitionPath + " from metadata", e);
+      }
+    }
+    return FSUtils.getFs(partitionPath.toString(), hadoopConf.get()).listStatus(partitionPath);
+  }
+
+  /**
+   * Return the list of partitions in the dataset.
+   *
+   * If the Metadata Table is enabled, the listing is retrieved from the stored metadata. Otherwise, the list of
+   * partitions is retrieved directly from the underlying {@code FileSystem}.
+   *
+   * On any errors retrieving the listing from the metadata, defaults to using the file system listings.
+   */
+  @Override
+  public List<String> getAllPartitionPaths() throws IOException {
+    if (enabled) {
+      try {
+        return fetchAllPartitionPaths();
+      } catch (Exception e) {
+        LOG.error("Failed to retrieve list of partition from metadata", e);
+      }
+    }
+
+    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning).getAllPartitionPaths();
+  }
+
+  /**
+   * Returns the list of all partitions recorded in the metadata table.
+   *
+   * When lookup validation is enabled, the partitions are also listed from the file system; any mismatch is
+   * logged and counted, and the file-system listing is returned.
+   *
+   * @throws HoodieMetadataException if the partition list record contains deletions
+   */
+  protected List<String> fetchAllPartitionPaths() throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
+
+    List<String> partitions = Collections.emptyList();
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata partition list record is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+
+      partitions = hoodieRecord.get().getData().getFilenames();
+      // Partition-less tables have a single empty partition
+      if (partitions.contains(NON_PARTITIONED_NAME)) {
+        partitions.remove(NON_PARTITIONED_NAME);
+        partitions.add("");
+      }
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the partitions from the file system
+      timer.startTimer();
+      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, assumeDatePartitioning);
+      List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
+
+      Collections.sort(actualPartitions);
+      Collections.sort(partitions);
+      if (!actualPartitions.equals(partitions)) {
+        LOG.error("Validation of metadata partition list failed. Lists do not match.");
+        LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
+        LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      partitions = actualPartitions;
+    }
+
+    LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
+    return partitions;
+  }
+
+  /**
+   * Return all the files from the partition.
+   *
+   * When lookup validation is enabled, the partition is also listed from the file system (excluding the
+   * partition metadata file); any mismatch is logged and counted, and the file-system listing is returned.
+   *
+   * @param partitionPath The absolute path of the partition
+   * @throws HoodieMetadataException if the partition's record contains deletions
+   */
+  private FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
+    String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
+    if (partitionName.isEmpty()) {
+      partitionName = NON_PARTITIONED_NAME;
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
+    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
+
+    FileStatus[] statuses = {};
+    if (hoodieRecord.isPresent()) {
+      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
+        throw new HoodieMetadataException("Metadata record for partition " + partitionName + " is inconsistent: "
+            + hoodieRecord.get().getData());
+      }
+      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
+    }
+
+    if (validateLookups) {
+      // Validate the Metadata Table data by listing the files of the partition from the file system
+      timer.startTimer();
+
+      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+      // Ignore partition metadata file
+      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
+          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
+
+      List<String> directFilenames = Arrays.stream(directStatuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      List<String> metadataFilenames = Arrays.stream(statuses)
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
+      if (!metadataFilenames.equals(directFilenames)) {
+        LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
+        LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
+        LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
+
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
+      }
+
+      // Return the direct listing as it should be correct
+      statuses = directStatuses;
+    }
+
+    LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
+    return statuses;
+  }
+
+  /**
+   * Retrieve the merged {@code HoodieRecord} mapped to the given key.
+   *
+   * The record stored in the metadata table is combined with the record built from unsynced dataset instants
+   * by calling {@code preCombine} on the latter's payload; if only one of them exists, it is returned as is.
+   *
+   * @param key The key of the record
+   */
+  private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) throws IOException {
+    openTimelineScanner();
+
+    Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
+    // Retrieve record from unsynced timeline instants
+    Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineRecordScanner.getRecordByKey(key);
+    if (!timelineHoodieRecord.isPresent()) {
+      return metadataHoodieRecord;
+    }
+    if (!metadataHoodieRecord.isPresent()) {
+      return timelineHoodieRecord;
+    }
+
+    // Typed merge (no raw HoodieRecord); the merged record keeps the metadata table record's key
+    HoodieMetadataPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
+    return Option.of(new HoodieRecord<>(metadataHoodieRecord.get().getKey(), mergedPayload));
+  }
+
+  /**
+   * Look up the record for the given key in the metadata table only, ignoring unsynced instants.
+   *
+   * @param key The key of the record
+   */
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+
+  /**
+   * Open the scanner over unsynced dataset instants if it is not already open.
+   *
+   * Synchronized so that concurrent lookups cannot each build a separate scanner.
+   */
+  private synchronized void openTimelineScanner() throws IOException {
+    if (timelineRecordScanner != null) {
+      // Already opened
+      return;
+    }
+
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    List<HoodieInstant> unsyncedInstants = findInstantsToSync(datasetMetaClient);
+    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
+    // The trailing null is the scanner's key filter; presumably null means no keys are filtered out
+    timelineRecordScanner = new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unsyncedInstants,
+        getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
+  }
+
+  /**
+   * Return the dataset instants not yet synced to the metadata table, using a freshly created meta client.
+   */
+  protected List<HoodieInstant> findInstantsToSync() {
+    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
+    return findInstantsToSync(datasetMetaClient);
+  }
+
+  /**
+   * Return the dataset instants not yet synced to the metadata table.
+   *
+   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   */
+  protected abstract List<HoodieInstant> findInstantsToSync(HoodieTableMetaClient datasetMetaClient);
+
+  /**
+   * Return {@code true} if the metadata table is enabled and all dataset instants have been synced to it.
+   */
+  @Override
+  public boolean isInSync() {
+    return enabled && findInstantsToSync().isEmpty();
+  }
+
+  /**
+   * Drop the unsynced-instant scanner so that the next lookup reopens it. Subclasses holding additional
+   * readers override this and must call {@code super.closeReaders()}.
+   *
+   * NOTE(review): the scanner is only dereferenced, not closed; if it has spilled records to disk, confirm
+   * that its spill files are cleaned up.
+   */
+  protected void closeReaders() {
+    timelineRecordScanner = null;
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 3a1c7bf..bf5aa76 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -19,8 +19,6 @@
 package org.apache.hudi.metadata;
 
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,17 +27,13 @@ import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.config.SerializableConfiguration;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
-import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -52,7 +46,6 @@ import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -65,24 +58,13 @@ import org.apache.log4j.Logger;
  * If the metadata table does not exist, RPC calls are used to retrieve file 
listings from the file system.
  * No updates are applied to the table and it is not synced.
  */
-public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+public class HoodieBackedTableMetadata extends AbstractHoodieTableMetadata {
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieBackedTableMetadata.class);
-  private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  private static final int BUFFER_SIZE = 10 * 1024 * 1024;
 
-  private final SerializableConfiguration hadoopConf;
-  private final String datasetBasePath;
   private final String metadataBasePath;
-  private final Option<HoodieMetadataMetrics> metrics;
   private HoodieTableMetaClient metaClient;
 
-  private boolean enabled;
-  private final boolean validateLookups;
-  private final boolean assumeDatePartitioning;
-  // Directory used for Spillable Map when merging records
-  private final String spillableMapDirectory;
-
   // Readers for the base and log file which store the metadata
   private transient HoodieFileReader<GenericRecord> baseFileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
@@ -95,13 +77,8 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
   public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
                                    boolean enabled, boolean validateLookups, 
boolean enableMetrics,
                                    boolean assumeDatePartitioning) {
-    this.hadoopConf = new SerializableConfiguration(conf);
-    this.datasetBasePath = datasetBasePath;
+    super(conf, datasetBasePath, spillableMapDirectory, enabled, 
validateLookups, enableMetrics, assumeDatePartitioning);
     this.metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    this.validateLookups = validateLookups;
-    this.spillableMapDirectory = spillableMapDirectory;
-    this.enabled = enabled;
-    this.assumeDatePartitioning = assumeDatePartitioning;
 
     if (enabled) {
       try {
@@ -116,164 +93,6 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
     } else {
       LOG.info("Metadata table is disabled.");
     }
-
-    if (enableMetrics) {
-      this.metrics = Option.of(new 
HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
-    } else {
-      this.metrics = Option.empty();
-    }
-  }
-
-  /**
-   * Return the list of partitions in the dataset.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the 
stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using 
the file system listings.
-   *
-   */
-  @Override
-  public List<String> getAllPartitionPaths()
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllPartitionPaths();
-      } catch (Exception e) {
-        LOG.error("Failed to retrieve list of partition from metadata", e);
-      }
-    }
-
-    return new FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, 
assumeDatePartitioning).getAllPartitionPaths();
-  }
-
-  /**
-   * Return the list of files in a partition.
-   *
-   * If the Metadata Table is enabled, the listing is retrieved from the 
stored metadata. Otherwise, the list of
-   * partitions is retrieved directly from the underlying {@code FileSystem}.
-   *
-   * On any errors retrieving the listing from the metadata, defaults to using 
the file system listings.
-   *
-   * @param partitionPath The absolute path of the partition to list
-   */
-  @Override
-  public FileStatus[] getAllFilesInPartition(Path partitionPath)
-      throws IOException {
-    if (enabled) {
-      try {
-        return fetchAllFilesInPartition(partitionPath);
-      } catch (Exception e) {
-        LOG.error("Failed to retrive files in partition " + partitionPath + " 
from metadata", e);
-      }
-    }
-
-    return FSUtils.getFs(partitionPath.toString(), 
hadoopConf.get()).listStatus(partitionPath);
-  }
-
-  /**
-   * Returns a list of all partitions.
-   */
-  protected List<String> fetchAllPartitionPaths() throws IOException {
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = 
getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
-
-    List<String> partitions = Collections.emptyList();
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata partition list record is 
inconsistent: "
-            + hoodieRecord.get().getData());
-      }
-
-      partitions = hoodieRecord.get().getData().getFilenames();
-      // Partition-less tables have a single empty partition
-      if (partitions.contains(NON_PARTITIONED_NAME)) {
-        partitions.remove(NON_PARTITIONED_NAME);
-        partitions.add("");
-      }
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the 
file system
-      timer.startTimer();
-      FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new 
FileSystemBackedTableMetadata(hadoopConf, datasetBasePath, 
assumeDatePartitioning);
-      List<String> actualPartitions = 
fileSystemBackedTableMetadata.getAllPartitionPaths();
-      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, 
timer.endTimer()));
-
-      Collections.sort(actualPartitions);
-      Collections.sort(partitions);
-      if (!actualPartitions.equals(partitions)) {
-        LOG.error("Validation of metadata partition list failed. Lists do not 
match.");
-        LOG.error("Partitions from metadata: " + 
Arrays.toString(partitions.toArray()));
-        LOG.error("Partitions from file system: " + 
Arrays.toString(actualPartitions.toArray()));
-
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      partitions = actualPartitions;
-    }
-
-    LOG.info("Listed partitions from metadata: #partitions=" + 
partitions.size());
-    return partitions;
-  }
-
-  /**
-   * Return all the files from the partition.
-   *
-   * @param partitionPath The absolute path of the partition
-   */
-  FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException 
{
-    String partitionName = FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), partitionPath);
-    if (partitionName.isEmpty()) {
-      partitionName = NON_PARTITIONED_NAME;
-    }
-
-    HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = 
getMergedRecordByKey(partitionName);
-    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
-
-    FileStatus[] statuses = {};
-    if (hoodieRecord.isPresent()) {
-      if (!hoodieRecord.get().getData().getDeletions().isEmpty()) {
-        throw new HoodieMetadataException("Metadata record for partition " + 
partitionName + " is inconsistent: "
-              + hoodieRecord.get().getData());
-      }
-      statuses = hoodieRecord.get().getData().getFileStatuses(partitionPath);
-    }
-
-    if (validateLookups) {
-      // Validate the Metadata Table data by listing the partitions from the 
file system
-      timer.startTimer();
-
-      // Ignore partition metadata file
-      FileStatus[] directStatuses = 
metaClient.getFs().listStatus(partitionPath,
-          p -> 
!p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
-      List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      List<String> metadataFilenames = Arrays.stream(statuses)
-          .map(s -> s.getPath().getName()).sorted()
-          .collect(Collectors.toList());
-
-      if (!metadataFilenames.equals(directFilenames)) {
-        LOG.error("Validation of metadata file listing for partition " + 
partitionName + " failed.");
-        LOG.error("File list from metadata: " + 
Arrays.toString(metadataFilenames.toArray()));
-        LOG.error("File list from direct listing: " + 
Arrays.toString(directFilenames.toArray()));
-
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
-      }
-
-      // Return the direct listing as it should be correct
-      statuses = directStatuses;
-    }
-
-    LOG.info("Listed file in partition from metadata: partition=" + 
partitionName + ", #files=" + statuses.length);
-    return statuses;
   }
 
   /**
@@ -281,7 +100,8 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
    *
    * @param key The key of the record
    */
-  private Option<HoodieRecord<HoodieMetadataPayload>> 
getMergedRecordByKey(String key) throws IOException {
+  @Override
+  protected Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKeyFromMetadata(String key) throws IOException {
     openBaseAndLogFiles();
 
     // Retrieve record from base file
@@ -314,7 +134,7 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
   /**
    * Open readers to the base and log files.
    */
-  private synchronized void openBaseAndLogFiles() throws IOException {
+  protected synchronized void openBaseAndLogFiles() throws IOException {
     if (logRecordScanner != null) {
       // Already opened
       return;
@@ -363,7 +183,9 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
     metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
   }
 
+  @Override
   protected void closeReaders() {
+    super.closeReaders();
     if (baseFileReader != null) {
       baseFileReader.close();
       baseFileReader = null;
@@ -372,19 +194,6 @@ public class HoodieBackedTableMetadata implements 
HoodieTableMetadata {
   }
 
   /**
-   * Return {@code True} if all Instants from the dataset have been synced 
with the Metadata Table.
-   */
-  @Override
-  public boolean isInSync() {
-    return enabled && findInstantsToSync().isEmpty();
-  }
-
-  private List<HoodieInstant> findInstantsToSync() {
-    HoodieTableMetaClient datasetMetaClient = new 
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    return findInstantsToSync(datasetMetaClient);
-  }
-
-  /**
    * Return an ordered list of instants which have not been synced to the 
Metadata Table.
 
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
new file mode 100644
index 0000000..c98f48c
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedInstantRecordScanner.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.Schema;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.DefaultSizeEstimator;
+import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Provides functionality to convert timeline instants to table metadata 
records and then merge by key. Specify
+ *  a filter to limit keys that are merged and stored in memory.
+ */
+public class HoodieMetadataMergedInstantRecordScanner {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataMergedInstantRecordScanner.class);
+
+  HoodieTableMetaClient metaClient;
+  private List<HoodieInstant> instants;
+  private Option<String> lastSyncTs;
+  private Set<String> mergeKeyFilter;
+  protected final ExternalSpillableMap<String, HoodieRecord<? extends 
HoodieRecordPayload>> records;
+
+  public HoodieMetadataMergedInstantRecordScanner(HoodieTableMetaClient 
metaClient, List<HoodieInstant> instants,
+                                                  Option<String> lastSyncTs, 
Schema readerSchema, Long maxMemorySizeInBytes,
+                                                  String spillableMapBasePath, 
Set<String> mergeKeyFilter) throws IOException {
+    this.metaClient = metaClient;
+    this.instants = instants;
+    this.lastSyncTs = lastSyncTs;
+    this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : 
Collections.emptySet();
+    this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, 
spillableMapBasePath, new DefaultSizeEstimator(),
+            new HoodieRecordSizeEstimator(readerSchema));
+
+    scan();
+  }
+
+  /**
+   * Converts instants in scanner to metadata table records and processes each 
record.
+   *
+   * @param
+   * @throws IOException
+   */
+  private void scan() {
+    for (HoodieInstant instant : instants) {
+      try {
+        Option<List<HoodieRecord>> records = 
HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient, instant, 
lastSyncTs);
+        if (records.isPresent()) {
+          records.get().forEach(record -> processNextRecord(record));
+        }
+      } catch (Exception e) {
+        LOG.error(String.format("Got exception when processing timeline 
instant %s", instant.getTimestamp()), e);
+        throw new HoodieException(String.format("Got exception when processing 
timeline instant %s", instant.getTimestamp()), e);
+      }
+    }
+  }
+
+  /**
+   * Process metadata table record by merging with existing record if it is a 
part of the key filter.
+   *
+   * @param hoodieRecord
+   */
+  private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> 
hoodieRecord) {
+    String key = hoodieRecord.getRecordKey();
+    if (mergeKeyFilter.isEmpty() || mergeKeyFilter.contains(key)) {
+      if (records.containsKey(key)) {
+        // Merge and store the merged record
+        HoodieRecordPayload combinedValue = 
hoodieRecord.getData().preCombine(records.get(key).getData());
+        records.put(key, new HoodieRecord<>(new HoodieKey(key, 
hoodieRecord.getPartitionPath()), combinedValue));
+      } else {
+        // Put the record as is
+        records.put(key, hoodieRecord);
+      }
+    }
+  }
+
+  /**
+   * Retrieve merged hoodie record for given key.
+   *
+   * @param key of the record to retrieve
+   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
+   */
+  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key) {
+    return Option.ofNullable((HoodieRecord) records.get(key));
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
new file mode 100644
index 0000000..9a25825
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+
+/**
+ * A utility to convert timeline information to metadata table records.
+ */
+public class HoodieTableMetadataUtil {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieTableMetadataUtil.class);
+
+  /**
+   * Converts a timeline instant to metadata table records.
+   *
+   * @param datasetMetaClient The meta client associated with the timeline 
instant
+   * @param instant to fetch and convert to metadata table records
+   * @return a list of metadata table records
+   * @throws IOException
+   */
+  public static Option<List<HoodieRecord>> 
convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient, 
HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
+    HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
+    Option<List<HoodieRecord>> records = Option.empty();
+    ValidationUtils.checkArgument(instant.isCompleted(), "Only completed 
instants can be synced.");
+
+    switch (instant.getAction()) {
+      case HoodieTimeline.CLEAN_ACTION:
+        HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
+        records = Option.of(convertMetadataToRecords(cleanMetadata, 
instant.getTimestamp()));
+        break;
+      case HoodieTimeline.DELTA_COMMIT_ACTION:
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.COMPACTION_ACTION:
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+                timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+        records = Option.of(convertMetadataToRecords(commitMetadata, 
instant.getTimestamp()));
+        break;
+      case HoodieTimeline.ROLLBACK_ACTION:
+        HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+                timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(rollbackMetadata, 
instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.RESTORE_ACTION:
+        HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+                timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(restoreMetadata, 
instant.getTimestamp(), lastSyncTs));
+        break;
+      case HoodieTimeline.SAVEPOINT_ACTION:
+        // Nothing to be done here
+        break;
+      default:
+        throw new HoodieException("Unknown type of action " + 
instant.getAction());
+    }
+
+    return records;
+  }
+
+  /**
+   * Finds all new files/partitions created as part of commit and creates 
metadata table records for them.
+   *
+   * @param commitMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> 
convertMetadataToRecords(HoodieCommitMetadata commitMetadata, String 
instantTime) {
+
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> allPartitions = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {
+      final String partition = partitionStatName.equals("") ? 
NON_PARTITIONED_NAME : partitionStatName;
+      allPartitions.add(partition);
+
+      Map<String, Long> newFiles = new HashMap<>(writeStats.size());
+      writeStats.forEach(hoodieWriteStat -> {
+        String pathWithPartition = hoodieWriteStat.getPath();
+        if (pathWithPartition == null) {
+          // Empty partition
+          LOG.warn("Unable to find path in write stat to update metadata table 
" + hoodieWriteStat);
+          return;
+        }
+
+        int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : 
partition.length() + 1;
+        String filename = pathWithPartition.substring(offset);
+        ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate 
files in HoodieCommitMetadata");
+        newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
+      });
+
+      // New files added to a partition
+      HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
+              partition, Option.of(newFiles), Option.empty());
+      records.add(record);
+    });
+
+    // New partitions created
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new 
ArrayList<>(allPartitions));
+    records.add(record);
+
+    LOG.info("Updating at " + instantTime + " from Commit/" + 
commitMetadata.getOperationType()
+            + ". #partitions_updated=" + records.size());
+    return records;
+  }
+
+  /**
+   * Finds all files that will be deleted as part of a planned clean and 
creates metadata table records for them.
+   *
+   * @param cleanerPlan from timeline to convert
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan 
cleanerPlan, String instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+
+    int[] fileDeleteCount = {0};
+    cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, 
deletedPathInfo) -> {
+      fileDeleteCount[0] += deletedPathInfo.size();
+
+      // Files deleted from a partition
+      List<String> deletedFilenames = deletedPathInfo.stream().map(p -> new 
Path(p.getFilePath()).getName())
+              .collect(Collectors.toList());
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+              Option.of(deletedFilenames));
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from CleanerPlan. 
#partitions_updated=" + records.size()
+            + ", #files_deleted=" + fileDeleteCount[0]);
+
+    return records;
+  }
+
+  /**
+   * Finds all files that were deleted as part of a clean and creates metadata 
table records for them.
+   *
+   * @param cleanMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> 
convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) 
{
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileDeleteCount = {0};
+
+    cleanMetadata.getPartitionMetadata().forEach((partition, 
partitionMetadata) -> {
+      // Files deleted from a partition
+      List<String> deletedFiles = partitionMetadata.getSuccessDeleteFiles();
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, Option.empty(),
+              Option.of(new ArrayList<>(deletedFiles)));
+
+      records.add(record);
+      fileDeleteCount[0] += deletedFiles.size();
+    });
+
+    LOG.info("Found at " + instantTime + " from Clean. #partitions_updated=" + 
records.size()
+            + ", #files_deleted=" + fileDeleteCount[0]);
+
+    return records;
+  }
+
+  /**
+   * Aggregates all files deleted and appended to from all rollbacks 
associated with a restore operation then
+   * creates metadata table records for them.
+   *
+   * @param restoreMetadata
+   * @param instantTime
+   * @return a list of metadata table records
+   */
+  public static List<HoodieRecord> 
convertMetadataToRecords(HoodieRestoreMetadata restoreMetadata, String 
instantTime, Option<String> lastSyncTs) {
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
+      rms.forEach(rm -> processRollbackMetadata(rm, partitionToDeletedFiles, 
partitionToAppendedFiles, lastSyncTs));
+    });
+
+
+    return convertFilesToRecords(partitionToDeletedFiles, 
partitionToAppendedFiles, instantTime, "Restore");
+  }
+
+  public static List<HoodieRecord> 
convertMetadataToRecords(HoodieRollbackMetadata rollbackMetadata, String 
instantTime, Option<String> lastSyncTs) {
+
+    Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
+    Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
+    processRollbackMetadata(rollbackMetadata, partitionToDeletedFiles, 
partitionToAppendedFiles, lastSyncTs);
+    return convertFilesToRecords(partitionToDeletedFiles, 
partitionToAppendedFiles, instantTime, "Rollback");
+  }
+
+  /**
+   * Extracts information about the deleted and append files from the {@code 
HoodieRollbackMetadata}.
+   *
+   * During a rollback files may be deleted (COW, MOR) or rollback blocks be 
appended (MOR only) to files. This
+   * function will extract this change file for each partition.
+   *
+   * @param rollbackMetadata {@code HoodieRollbackMetadata}
+   * @param partitionToDeletedFiles The {@code Map} to fill with files deleted 
per partition.
+   * @param partitionToAppendedFiles The {@code Map} to fill with files 
appended per partition and their sizes.
+   */
+  private static void processRollbackMetadata(HoodieRollbackMetadata 
rollbackMetadata,
+                                       Map<String, List<String>> 
partitionToDeletedFiles,
+                                       Map<String, Map<String, Long>> 
partitionToAppendedFiles,
+                                       Option<String> lastSyncTs) {
+
+    rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
+      // If commit being rolled back has not been synced to metadata table yet 
then there is no need to update metadata
+      if (lastSyncTs.isPresent() && 
HoodieTimeline.compareTimestamps(rollbackMetadata.getCommitsRollback().get(0), 
HoodieTimeline.GREATER_THAN, lastSyncTs.get())) {
+        return;
+      }
+
+      final String partition = pm.getPartitionPath();
+      if (!pm.getSuccessDeleteFiles().isEmpty()) {
+        if (!partitionToDeletedFiles.containsKey(partition)) {
+          partitionToDeletedFiles.put(partition, new ArrayList<>());
+        }
+
+        // Extract deleted file name from the absolute paths saved in 
getSuccessDeleteFiles()
+        List<String> deletedFiles = pm.getSuccessDeleteFiles().stream().map(p 
-> new Path(p).getName())
+                .collect(Collectors.toList());
+        partitionToDeletedFiles.get(partition).addAll(deletedFiles);
+      }
+
+      if (!pm.getAppendFiles().isEmpty()) {
+        if (!partitionToAppendedFiles.containsKey(partition)) {
+          partitionToAppendedFiles.put(partition, new HashMap<>());
+        }
+
+        // Extract appended file name from the absolute paths saved in 
getAppendFiles()
+        pm.getAppendFiles().forEach((path, size) -> {
+          partitionToAppendedFiles.get(partition).merge(new 
Path(path).getName(), size, (oldSize, newSizeCopy) -> {
+            return size + oldSize;
+          });
+        });
+      }
+    });
+  }
+
+  private static List<HoodieRecord> convertFilesToRecords(Map<String, 
List<String>> partitionToDeletedFiles,
+                                                   Map<String, Map<String, 
Long>> partitionToAppendedFiles, String instantTime,
+                                                   String operation) {
+    List<HoodieRecord> records = new LinkedList<>();
+    int[] fileChangeCount = {0, 0}; // deletes, appends
+
+    partitionToDeletedFiles.forEach((partition, deletedFiles) -> {
+      fileChangeCount[0] += deletedFiles.size();
+
+      Option<Map<String, Long>> filesAdded = Option.empty();
+      if (partitionToAppendedFiles.containsKey(partition)) {
+        filesAdded = Option.of(partitionToAppendedFiles.remove(partition));
+      }
+
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, filesAdded,
+              Option.of(new ArrayList<>(deletedFiles)));
+      records.add(record);
+    });
+
+    partitionToAppendedFiles.forEach((partition, appendedFileMap) -> {
+      fileChangeCount[1] += appendedFileMap.size();
+
+      // Validate that no appended file has been deleted
+      ValidationUtils.checkState(
+              
!appendedFileMap.keySet().removeAll(partitionToDeletedFiles.getOrDefault(partition,
 Collections.emptyList())),
+              "Rollback file cannot both be appended and deleted");
+
+      // New files added to a partition
+      HoodieRecord record = 
HoodieMetadataPayload.createPartitionFilesRecord(partition, 
Option.of(appendedFileMap),
+              Option.empty());
+      records.add(record);
+    });
+
+    LOG.info("Found at " + instantTime + " from " + operation + ". 
#partitions_updated=" + records.size()
+            + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" 
+ fileChangeCount[1]);
+
+    return records;
+  }
+
+}

Reply via email to