This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/rfc-15 by this push:
     new 7d31f0a  [RFC-15] Adding interfaces for HoodieMetadata, 
HoodieMetadataWriter (#2266)
7d31f0a is described below

commit 7d31f0ab1aff1eed56fcb5c473c32fc7ed83847e
Author: vinoth chandar <[email protected]>
AuthorDate: Tue Dec 1 08:38:07 2020 -0800

    [RFC-15] Adding interfaces for HoodieMetadata, HoodieMetadataWriter (#2266)
    
    - Moved MetadataReader to HoodieBackedTableMetadata, under the HoodieTableMetadata interface
    - Moved MetadataWriter to HoodieBackedTableMetadataWriter, under the HoodieTableMetadataWriter interface
    - Pulled all the metrics into HoodieMetadataMetrics
    - Writer now wraps the metadata, instead of extending it
    - New enum for MetadataPartitionType
    - Streamlined code flow inside HoodieBackedTableMetadataWriter w.r.t. initializing metadata state
---
 .../apache/hudi/cli/commands/MetadataCommand.java  |  64 +++-
 .../hudi/client/AbstractHoodieWriteClient.java     |   2 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |   6 +-
 ...r.java => HoodieBackedTableMetadataWriter.java} | 426 ++++++++++-----------
 .../metadata/HoodieMetadataFileSystemView.java     |   5 +-
 .../hudi/metadata/HoodieTableMetadataWriter.java   |  51 +++
 .../java/org/apache/hudi/table/HoodieTable.java    |  20 +-
 .../hudi/table/HoodieTimelineArchiveLog.java       |   3 +-
 .../table/action/clean/CleanActionExecutor.java    |   2 +-
 .../hudi/table/action/clean/CleanPlanner.java      |   3 +-
 .../action/commit/BaseCommitActionExecutor.java    |   2 +-
 .../action/restore/BaseRestoreActionExecutor.java  |   2 +-
 .../rollback/BaseRollbackActionExecutor.java       |   2 +-
 ...odieMetadata.java => TestHoodieFsMetadata.java} |  63 +--
 ...aReader.java => HoodieBackedTableMetadata.java} | 312 ++++-----------
 .../HoodieMetadataMergedLogRecordScanner.java      |   7 +-
 .../hudi/metadata/HoodieMetadataMetrics.java       | 148 +++++++
 .../hudi/metadata/HoodieMetadataPayload.java       |   7 +-
 .../apache/hudi/metadata/HoodieTableMetadata.java  |  86 +++++
 .../hudi/metadata/MetadataPartitionType.java       |  33 ++
 20 files changed, 724 insertions(+), 520 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index a45e9b4..2eb9988 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -22,12 +22,14 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.metadata.HoodieMetadataReader;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
 import org.apache.spark.api.java.JavaSparkContext;
 import org.springframework.shell.core.CommandMarker;
 import org.springframework.shell.core.annotation.CliCommand;
@@ -45,23 +47,45 @@ import java.util.Map;
  */
 @Component
 public class MetadataCommand implements CommandMarker {
+
   private JavaSparkContext jsc;
+  private static String metadataBaseDirectory;
+
+  /**
+   * Sets the directory to store/read Metadata Table.
+   *
+   * This can be used to store the metadata table away from the dataset 
directory.
+   *  - Useful for testing as well as for using via the HUDI CLI so that the 
actual dataset is not written to.
+   *  - Useful for testing Metadata Table performance and operations on 
existing datasets before enabling.
+   */
+  public static void setMetadataBaseDirectory(String metadataDir) {
+    ValidationUtils.checkState(metadataBaseDirectory == null,
+        "metadataBaseDirectory is already set to " + metadataBaseDirectory);
+    metadataBaseDirectory = metadataDir;
+  }
+
+  public static String getMetadataTableBasePath(String tableBasePath) {
+    if (metadataBaseDirectory != null) {
+      return metadataBaseDirectory;
+    }
+    return HoodieTableMetadata.getMetadataTableBasePath(tableBasePath);
+  }
 
   @CliCommand(value = "metadata set", help = "Set options for Metadata Table")
   public String set(@CliOption(key = {"metadataDir"},
       help = "Directory to read/write metadata table (can be different from 
dataset)", unspecifiedDefaultValue = "")
       final String metadataDir) {
     if (!metadataDir.isEmpty()) {
-      HoodieMetadataReader.setMetadataBaseDirectory(metadataDir);
+      setMetadataBaseDirectory(metadataDir);
     }
 
-    return String.format("Ok");
+    return "Ok";
   }
 
   @CliCommand(value = "metadata create", help = "Create the Metadata Table if 
it does not exist")
   public String create() throws IOException {
     HoodieCLI.getTableMetaClient();
-    Path metadataPath = new 
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
       if (statuses.length > 0) {
@@ -75,15 +99,14 @@ public class MetadataCommand implements CommandMarker {
     HoodieTimer timer = new HoodieTimer().startTimer();
     HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext();
-    HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+    HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
     return String.format("Created Metadata Table in %s (duration=%.2f secs)", 
metadataPath, timer.endTimer() / 1000.0);
   }
 
   @CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
   public String delete() throws Exception {
     HoodieCLI.getTableMetaClient();
-    HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
-    Path metadataPath = new 
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       FileStatus[] statuses = HoodieCLI.fs.listStatus(metadataPath);
       if (statuses.length > 0) {
@@ -100,7 +123,7 @@ public class MetadataCommand implements CommandMarker {
   public String init(@CliOption(key = {"readonly"}, unspecifiedDefaultValue = 
"false",
       help = "Open in read-only mode") final boolean readOnly) throws 
Exception {
     HoodieCLI.getTableMetaClient();
-    Path metadataPath = new 
Path(HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath));
+    Path metadataPath = new Path(getMetadataTableBasePath(HoodieCLI.basePath));
     try {
       HoodieCLI.fs.listStatus(metadataPath);
     } catch (FileNotFoundException e) {
@@ -114,7 +137,7 @@ public class MetadataCommand implements CommandMarker {
     } else {
       HoodieWriteConfig writeConfig = getWriteConfig();
       initJavaSparkContext();
-      HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+      HoodieTableMetadataWriter.create(HoodieCLI.conf, writeConfig, jsc);
     }
     long t2 = System.currentTimeMillis();
 
@@ -125,11 +148,11 @@ public class MetadataCommand implements CommandMarker {
   @CliCommand(value = "metadata stats", help = "Print stats about the 
metadata")
   public String stats() throws IOException {
     HoodieCLI.getTableMetaClient();
-    HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf, 
HoodieCLI.basePath, "/tmp", true, false);
-    Map<String, String> stats = metaReader.getStats(true);
+    HoodieBackedTableMetadata metadata = new 
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, 
false, false);
+    Map<String, String> stats = metadata.stats();
 
     StringBuffer out = new StringBuffer("\n");
-    out.append(String.format("Base path: %s\n", 
HoodieMetadataReader.getMetadataTableBasePath(HoodieCLI.basePath)));
+    out.append(String.format("Base path: %s\n", 
getMetadataTableBasePath(HoodieCLI.basePath)));
     for (Map.Entry<String, String> entry : stats.entrySet()) {
       out.append(String.format("%s: %s\n", entry.getKey(), entry.getValue()));
     }
@@ -140,15 +163,15 @@ public class MetadataCommand implements CommandMarker {
   @CliCommand(value = "metadata list-partitions", help = "Print a list of all 
partitions from the metadata")
   public String listPartitions() throws IOException {
     HoodieCLI.getTableMetaClient();
-    HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf, 
HoodieCLI.basePath, "/tmp", true, false);
+    HoodieBackedTableMetadata metadata = new 
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, 
false, false);
 
     StringBuffer out = new StringBuffer("\n");
-    if (!metaReader.enabled()) {
+    if (!metadata.enabled()) {
       out.append("=== Metadata Table not initilized. Using file listing to get 
list of partitions. ===\n\n");
     }
 
     long t1 = System.currentTimeMillis();
-    List<String> partitions = metaReader.getAllPartitionPaths(HoodieCLI.fs, 
HoodieCLI.basePath, false);
+    List<String> partitions = metadata.getAllPartitionPaths();
     long t2 = System.currentTimeMillis();
 
     int[] count = {0};
@@ -171,16 +194,15 @@ public class MetadataCommand implements CommandMarker {
       @CliOption(key = {"partition"}, help = "Name of the partition to list 
files", mandatory = true)
       final String partition) throws IOException {
     HoodieCLI.getTableMetaClient();
-    HoodieMetadataReader metaReader = new HoodieMetadataReader(HoodieCLI.conf, 
HoodieCLI.basePath, "/tmp", true, false);
+    HoodieBackedTableMetadata metaReader = new 
HoodieBackedTableMetadata(HoodieCLI.conf, HoodieCLI.basePath, "/tmp", true, 
false, false);
 
     StringBuffer out = new StringBuffer("\n");
     if (!metaReader.enabled()) {
-      out.append("=== Metadata Table not initilized. Using file listing to get 
list of files in partition. ===\n\n");
+      out.append("=== Metadata Table not initialized. Using file listing to 
get list of files in partition. ===\n\n");
     }
 
     long t1 = System.currentTimeMillis();
-    FileStatus[] statuses = metaReader.getAllFilesInPartition(HoodieCLI.conf, 
HoodieCLI.basePath,
-        new Path(HoodieCLI.basePath, partition));
+    FileStatus[] statuses = metaReader.getAllFilesInPartition(new 
Path(HoodieCLI.basePath, partition));
     long t2 = System.currentTimeMillis();
 
     Arrays.stream(statuses).sorted((p1, p2) -> 
p2.getPath().getName().compareTo(p1.getPath().getName())).forEach(p -> {
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
 
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index a4015a6..eff299b 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -128,7 +128,7 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload> e
     finalizeWrite(table, instantTime, stats);
 
     try {
-      table.metadata().update(jsc, metadata, instantTime);
+      table.metadataWriter(jsc).update(metadata, instantTime);
 
       activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java 
b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index c180a88..c6310ac 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -44,7 +44,7 @@ import org.apache.hudi.exception.HoodieRestoreException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.exception.HoodieSavepointException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metrics.HoodieMetrics;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.HoodieTimelineArchiveLog;
@@ -124,7 +124,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     this.rollbackPending = rollbackPending;
 
     // Initialize Metadata Table
-    HoodieMetadataWriter.create(hadoopConf, writeConfig).initialize(jsc);
+    HoodieTableMetadataWriter.create(hadoopConf, writeConfig, jsc);
   }
 
   /**
@@ -701,7 +701,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction " + compactionCommitTime + ". Finished 
with result " + metadata);
 
-    table.metadata().update(jsc, metadata, compactionCommitTime);
+    table.metadataWriter(jsc).update(metadata, compactionCommitTime);
 
     CompactHelpers.completeInflightCompaction(table, compactionCommitTime, 
metadata);
 
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
similarity index 68%
rename from 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
rename to 
hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 08a785d..992c240 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,22 +18,6 @@
 
 package org.apache.hudi.metadata;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -41,10 +25,11 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.utils.ClientUtils;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -63,6 +48,7 @@ import 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
 import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
@@ -74,74 +60,100 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 import scala.Tuple2;
 
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.METADATA_TABLE_NAME_SUFFIX;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
+
 /**
- * Writer for Metadata Table.
- *
- * Partition and file listing are saved within an internal MOR table called 
Metadata Table. This table is created
- * by listing files and partitions (first time) and kept in sync using the 
instants on the main dataset.
+ * Writer implementation backed by an internal hudi table. Partition and file 
listing are saved within an internal MOR table
+ * called Metadata Table. This table is created by listing files and 
partitions (first time)
+ * and kept in sync using the instants on the main dataset.
  */
-public class HoodieMetadataWriter extends HoodieMetadataReader implements 
Serializable {
-  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataWriter.class);
+public class HoodieBackedTableMetadataWriter implements 
HoodieTableMetadataWriter {
 
-  // Metric names
-  public static final String INITIALIZE_STR = "initialize";
-  public static final String SYNC_STR = "sync";
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBackedTableMetadataWriter.class);
 
-  private HoodieWriteConfig config;
+  private HoodieWriteConfig metadataWriteConfig;
+  private HoodieWriteConfig datasetWriteConfig;
   private String tableName;
-  private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
-
-  public static HoodieMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig) {
-    String key = writeConfig.getBasePath();
-    if (instances.containsKey(key)) {
-      if (instances.get(key).enabled() != 
writeConfig.useFileListingMetadata()) {
-        // Enabled state has changed. Remove so it is recreated.
-        instances.remove(key);
-      }
-    }
 
-    return instances.computeIfAbsent(key, k -> {
-      try {
-        return new HoodieMetadataWriter(conf, writeConfig);
-      } catch (IOException e) {
-        throw new HoodieMetadataException("Could not initialize 
HoodieMetadataWriter", e);
-      }
-    });
-  }
+  private HoodieBackedTableMetadata metadata;
+  private HoodieTableMetaClient metaClient;
+  private Option<HoodieMetadataMetrics> metrics;
+  private boolean enabled;
+  private SerializableConfiguration hadoopConf;
+  private final transient JavaSparkContext jsc;
 
-  HoodieMetadataWriter(Configuration hadoopConf, HoodieWriteConfig 
writeConfig) throws IOException {
-    super(hadoopConf, writeConfig.getBasePath(), 
writeConfig.getSpillableMapBasePath(),
-        writeConfig.useFileListingMetadata(), 
writeConfig.getFileListingMetadataVerify(), false);
+  HoodieBackedTableMetadataWriter(Configuration hadoopConf, HoodieWriteConfig 
writeConfig, JavaSparkContext jsc) {
+    this.datasetWriteConfig = writeConfig;
+    this.jsc = jsc;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
 
     if (writeConfig.useFileListingMetadata()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
-      this.config = createMetadataWriteConfig(writeConfig);
+      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig);
       enabled = true;
 
       // Inline compaction and auto clean is required as we dont expose this 
table outside
-      ValidationUtils.checkArgument(this.config.isAutoClean(), "Auto clean is 
required for Metadata Compaction config");
-      ValidationUtils.checkArgument(this.config.isInlineCompaction(), "Inline 
compaction is required for Metadata Compaction config");
-      // Metadata Table cannot have its metadata optimized
-      ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto 
commit is required for Metadata Table");
-      ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), 
"File listing cannot be used for Metadata Table");
-
-      if (config.isMetricsOn()) {
-        if (config.isExecutorMetricsEnabled()) {
-          metricsRegistry = Registry.getRegistry("HoodieMetadata", 
DistributedRegistry.class.getName());
+      ValidationUtils.checkArgument(this.metadataWriteConfig.isAutoClean(), 
"Auto clean is required for Metadata Compaction config");
+      
ValidationUtils.checkArgument(this.metadataWriteConfig.isInlineCompaction(), 
"Inline compaction is required for Metadata Compaction config");
+      // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
+      
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), 
"Auto commit is required for Metadata Table");
+      
ValidationUtils.checkArgument(!this.metadataWriteConfig.useFileListingMetadata(),
 "File listing cannot be used for Metadata Table");
+
+      if (metadataWriteConfig.isMetricsOn()) {
+        Registry registry;
+        if (metadataWriteConfig.isExecutorMetricsEnabled()) {
+          registry = Registry.getRegistry("HoodieMetadata", 
DistributedRegistry.class.getName());
         } else {
-          metricsRegistry = Registry.getRegistry("HoodieMetadata");
+          registry = Registry.getRegistry("HoodieMetadata");
         }
+        this.metrics = Option.of(new HoodieMetadataMetrics(registry));
+      } else {
+        this.metrics = Option.empty();
+      }
+
+      HoodieTableMetaClient datasetMetaClient = new 
HoodieTableMetaClient(hadoopConf, datasetWriteConfig.getBasePath());
+      initialize(jsc, datasetMetaClient);
+      if (enabled) {
+        // (re) init the metadata for reading.
+        initTableMetadata();
+
+        // This is always called even in case the table was created for the 
first time. This is because
+        // initFromFilesystem() does file listing and hence may take a long 
time during which some new updates
+        // may have occurred on the table. Hence, calling this always ensures 
that the metadata is brought in sync
+        // with the active timeline.
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        syncFromInstants(jsc, datasetMetaClient);
+        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.SYNC_STR, 
timer.endTimer()));
       }
     } else {
       enabled = false;
+      this.metrics = Option.empty();
     }
   }
 
@@ -149,9 +161,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * Create a {@code HoodieWriteConfig} to use for the Metadata Table.
    *
    * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
-   * @param schemaStr Metadata Table schema
    */
-  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig 
writeConfig) throws IOException {
+  private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig 
writeConfig) {
     int parallelism = writeConfig.getMetadataInsertParallelism();
 
     // Create the write config for the metadata table by borrowing options 
from the main write config.
@@ -168,7 +179,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
         .withAvroSchemaValidate(true)
         .withEmbeddedTimelineServerEnabled(false)
         .withAssumeDatePartitioning(false)
-        .withPath(metadataBasePath)
+        
.withPath(HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath()))
         .withSchema(HoodieMetadataRecord.getClassSchema().toString())
         .forTable(tableName)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
@@ -217,16 +228,11 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
   }
 
   public HoodieWriteConfig getWriteConfig() {
-    return config;
+    return metadataWriteConfig;
   }
 
-  /**
-   * Reload the metadata table by syncing it based on the commits on the 
dataset.
-   */
-  public void reload(JavaSparkContext jsc) throws IOException {
-    if (enabled) {
-      initialize(jsc);
-    }
+  public HoodieTableMetadata metadata() {
+    return metadata;
   }
 
   /**
@@ -240,57 +246,37 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * which are large in size (AVRO or JSON encoded and not compressed) and 
incur considerable IO for de-serialization
    * and decoding.
    */
-  public void initialize(JavaSparkContext jsc) {
+  private void initialize(JavaSparkContext jsc, HoodieTableMetaClient 
datasetMetaClient) {
     try {
-      if (metricsRegistry instanceof DistributedRegistry) {
-        ((DistributedRegistry) metricsRegistry).register(jsc);
-      }
+      metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
+        if (registry instanceof DistributedRegistry) {
+          ((DistributedRegistry) registry).register(jsc);
+        }
+      });
 
       if (enabled) {
-        initializeAndSync(jsc);
+        bootstrapIfNeeded(jsc, datasetMetaClient);
       }
     } catch (IOException e) {
-      LOG.error("Failed to initialize metadata table. Metdata will be 
disabled.", e);
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
       enabled = false;
     }
   }
 
-  private void initializeAndSync(JavaSparkContext jsc) throws IOException {
-    long t1 = System.currentTimeMillis();
-
-    HoodieTableMetaClient datasetMetaClient = new 
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf.get());
-    boolean exists = fs.exists(new Path(metadataBasePath, 
HoodieTableMetaClient.METAFOLDER_NAME));
+  private void initTableMetadata() {
+    this.metadata = new HoodieBackedTableMetadata(hadoopConf.get(), 
datasetWriteConfig.getBasePath(), datasetWriteConfig.getSpillableMapBasePath(),
+        datasetWriteConfig.useFileListingMetadata(), 
datasetWriteConfig.getFileListingMetadataVerify(), false,
+        datasetWriteConfig.shouldAssumeDatePartitioning());
+    this.metaClient = metadata.getMetaClient();
+  }
 
+  private void bootstrapIfNeeded(JavaSparkContext jsc, HoodieTableMetaClient 
datasetMetaClient) throws IOException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    boolean exists = datasetMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
     if (!exists) {
       // Initialize for the first time by listing partitions and files 
directly from the file system
-      initFromFilesystem(jsc, datasetMetaClient);
-    } else {
-      metaClient = ClientUtils.createMetaClient(hadoopConf.get(), config, 
true);
-    }
-
-    /*
-    // TODO: We may not be able to sync in certain cases (instants archived 
etc)
-    //if (!canSync(datasetMetaClient)) {
-      // Need to recreate the table as sync has failed
-      // TODO: delete the table
-    //  initFromFilesystem(datasetMetaClient);
-    //}
-    */
-
-    // This is always called even in case the table was created for the first 
time. This is because
-    // initFromFilesystem() does file listing and hence may take a long time 
during which some new updates
-    // may have occurred on the table. Hence, calling this always ensures that 
the metadata is brought in sync
-    // with the active timeline.
-    syncFromInstants(jsc, datasetMetaClient);
-
-    // Publish some metrics
-    long durationInMs = System.currentTimeMillis() - t1;
-    // Time to initilize and sync
-    if (exists) {
-      updateMetrics(SYNC_STR, durationInMs);
-    } else {
-      updateMetrics(INITIALIZE_STR, durationInMs);
+      bootstrapFromFilesystem(jsc, datasetMetaClient);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
     }
   }
 
@@ -299,7 +285,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  private void initFromFilesystem(JavaSparkContext jsc, HoodieTableMetaClient 
datasetMetaClient) throws IOException {
+  private void bootstrapFromFilesystem(JavaSparkContext jsc, 
HoodieTableMetaClient datasetMetaClient) throws IOException {
     ValidationUtils.checkState(enabled, "Metadata table cannot be initialized 
as it is not enabled");
 
     // If there is no commit on the dataset yet, use the SOLO_COMMIT_TIMESTAMP 
as the instant time for initial commit
@@ -313,24 +299,26 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
         latestInstant = Option.of(instant);
       }
     }
-    String createInstantTime = latestInstant.isPresent() ? 
latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
-    LOG.info("Creating a new metadata table in " + metadataBasePath + " at 
instant " + createInstantTime);
-    metaClient = HoodieTableMetaClient.initTableType(hadoopConf.get(), 
metadataBasePath.toString(),
+
+    String createInstantTime = 
latestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+    LOG.info("Creating a new metadata table in " + 
metadataWriteConfig.getBasePath() + " at instant " + createInstantTime);
+
+    HoodieTableMetaClient.initTableType(hadoopConf.get(), 
metadataWriteConfig.getBasePath(),
         HoodieTableType.MERGE_ON_READ, tableName, "archived", 
HoodieMetadataPayload.class.getName(),
         HoodieFileFormat.HFILE.toString());
+    initTableMetadata();
 
     // List all partitions in the basePath of the containing dataset
     FileSystem fs = datasetMetaClient.getFs();
-    List<String> partitions = FSUtils.getAllPartitionPaths(fs, 
datasetBasePath, false);
+    List<String> partitions = FSUtils.getAllPartitionPaths(fs, 
datasetWriteConfig.getBasePath(), 
datasetWriteConfig.shouldAssumeDatePartitioning());
     LOG.info("Initializing metadata table by using file listings in " + 
partitions.size() + " partitions");
 
     // List all partitions in parallel and collect the files in them
-    final String dbasePath = datasetBasePath;
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 
1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = 
jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
           FileSystem fsys = datasetMetaClient.getFs();
-          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new 
Path(dbasePath, partition));
+          FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new 
Path(datasetWriteConfig.getBasePath(), partition));
           return new Tuple2<>(partition, statuses);
         });
 
@@ -339,12 +327,12 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
 
     // Create a HoodieCommitMetadata with writeStats for all discovered files
     int[] stats = {0};
-    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
 
     partitionFileList.forEach(t -> {
       final String partition = t._1;
       try {
-        if (!fs.exists(new Path(datasetBasePath, partition + Path.SEPARATOR + 
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
+        if (!fs.exists(new Path(datasetWriteConfig.getBasePath(), partition + 
Path.SEPARATOR + HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE))) {
           return;
         }
       } catch (IOException e) {
@@ -367,20 +355,20 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
         writeStat.setPath(partition + Path.SEPARATOR + 
status.getPath().getName());
         writeStat.setPartitionPath(partition);
         writeStat.setTotalWriteBytes(status.getLen());
-        metadata.addWriteStat(partition, writeStat);
+        commitMetadata.addWriteStat(partition, writeStat);
         stats[0] += 1;
       });
 
       // If the partition has no files then create a writeStat with no file 
path
-      if (metadata.getWriteStats(partition) == null) {
+      if (commitMetadata.getWriteStats(partition) == null) {
         HoodieWriteStat writeStat = new HoodieWriteStat();
         writeStat.setPartitionPath(partition);
-        metadata.addWriteStat(partition, writeStat);
+        commitMetadata.addWriteStat(partition, writeStat);
       }
     });
 
     LOG.info("Committing " + partitionFileList.size() + " partitions and " + 
stats[0] + " files to metadata");
-    update(jsc, metadata, createInstantTime);
+    update(commitMetadata, createInstantTime);
   }
 
   /**
@@ -388,71 +376,77 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    *
    * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
    */
-  private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient 
datasetMetaClient) throws IOException {
+  private void syncFromInstants(JavaSparkContext jsc, HoodieTableMetaClient 
datasetMetaClient) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it 
is not enabled");
 
-    List<HoodieInstant> instantsToSync = findInstantsToSync(datasetMetaClient);
-    if (instantsToSync.isEmpty()) {
-      return;
-    }
+    try {
+      List<HoodieInstant> instantsToSync = 
metadata.findInstantsToSync(datasetMetaClient);
+      if (instantsToSync.isEmpty()) {
+        return;
+      }
 
-    LOG.info("Syncing " + instantsToSync.size() + " instants to metadata 
table: " + instantsToSync);
-
-    // Read each instant in order and sync it to metadata table
-    final HoodieActiveTimeline timeline = 
datasetMetaClient.getActiveTimeline();
-    for (HoodieInstant instant : instantsToSync) {
-      LOG.info("Syncing instant " + instant + " to metadata table");
-
-      switch (instant.getAction()) {
-        case HoodieTimeline.CLEAN_ACTION: {
-          // CLEAN is synced from the
-          // - inflight instant which contains the HoodieCleanerPlan, or
-          // - complete instant which contains the HoodieCleanMetadata
-          try {
-            HoodieInstant inflightCleanInstant = new HoodieInstant(true, 
instant.getAction(), instant.getTimestamp());
-            ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
-            HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
-            update(jsc, cleanerPlan, instant.getTimestamp());
-          } catch (HoodieIOException e) {
-            HoodieInstant cleanInstant = new HoodieInstant(false, 
instant.getAction(), instant.getTimestamp());
-            ValidationUtils.checkArgument(cleanInstant.isCompleted());
-            HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
-            update(jsc, cleanMetadata, instant.getTimestamp());
+      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata 
table: " + instantsToSync);
+
+      // Read each instant in order and sync it to metadata table
+      final HoodieActiveTimeline timeline = 
datasetMetaClient.getActiveTimeline();
+      for (HoodieInstant instant : instantsToSync) {
+        LOG.info("Syncing instant " + instant + " to metadata table");
+
+        switch (instant.getAction()) {
+          case HoodieTimeline.CLEAN_ACTION: {
+            // CLEAN is synced from the
+            // - inflight instant which contains the HoodieCleanerPlan, or
+            // - complete instant which contains the HoodieCleanMetadata
+            try {
+              HoodieInstant inflightCleanInstant = new HoodieInstant(true, 
instant.getAction(), instant.getTimestamp());
+              ValidationUtils.checkArgument(inflightCleanInstant.isInflight());
+              HoodieCleanerPlan cleanerPlan = 
CleanerUtils.getCleanerPlan(datasetMetaClient, inflightCleanInstant);
+              update(cleanerPlan, instant.getTimestamp());
+            } catch (HoodieIOException e) {
+              HoodieInstant cleanInstant = new HoodieInstant(false, 
instant.getAction(), instant.getTimestamp());
+              ValidationUtils.checkArgument(cleanInstant.isCompleted());
+              HoodieCleanMetadata cleanMetadata = 
CleanerUtils.getCleanerMetadata(datasetMetaClient, cleanInstant);
+              update(cleanMetadata, instant.getTimestamp());
+            }
+            break;
+          }
+          case HoodieTimeline.DELTA_COMMIT_ACTION:
+          case HoodieTimeline.COMMIT_ACTION:
+          case HoodieTimeline.COMPACTION_ACTION: {
+            ValidationUtils.checkArgument(instant.isCompleted());
+            HoodieCommitMetadata commitMetadata = 
HoodieCommitMetadata.fromBytes(
+                timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+            update(commitMetadata, instant.getTimestamp());
+            break;
+          }
+          case HoodieTimeline.ROLLBACK_ACTION: {
+            ValidationUtils.checkArgument(instant.isCompleted());
+            HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
+                timeline.getInstantDetails(instant).get());
+            update(rollbackMetadata, instant.getTimestamp());
+            break;
+          }
+          case HoodieTimeline.RESTORE_ACTION: {
+            ValidationUtils.checkArgument(instant.isCompleted());
+            HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
+                timeline.getInstantDetails(instant).get());
+            update(restoreMetadata, instant.getTimestamp());
+            break;
+          }
+          case HoodieTimeline.SAVEPOINT_ACTION: {
+            ValidationUtils.checkArgument(instant.isCompleted());
+            // Nothing to be done here
+            break;
+          }
+          default: {
+            throw new HoodieException("Unknown type of action " + 
instant.getAction());
           }
-          break;
-        }
-        case HoodieTimeline.DELTA_COMMIT_ACTION:
-        case HoodieTimeline.COMMIT_ACTION:
-        case HoodieTimeline.COMPACTION_ACTION: {
-          ValidationUtils.checkArgument(instant.isCompleted());
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-              timeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-          update(jsc, commitMetadata, instant.getTimestamp());
-          break;
-        }
-        case HoodieTimeline.ROLLBACK_ACTION: {
-          ValidationUtils.checkArgument(instant.isCompleted());
-          HoodieRollbackMetadata rollbackMetadata = 
TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
-              timeline.getInstantDetails(instant).get());
-          update(jsc, rollbackMetadata, instant.getTimestamp());
-          break;
-        }
-        case HoodieTimeline.RESTORE_ACTION: {
-          ValidationUtils.checkArgument(instant.isCompleted());
-          HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
-              timeline.getInstantDetails(instant).get());
-          update(jsc, restoreMetadata, instant.getTimestamp());
-          break;
-        }
-        case HoodieTimeline.SAVEPOINT_ACTION: {
-          ValidationUtils.checkArgument(instant.isCompleted());
-          // Nothing to be done here
-          break;
-        }
-        default: {
-          throw new HoodieException("Unknown type of action " + 
instant.getAction());
         }
       }
+      // re-init the table metadata, for any future writes.
+      initTableMetadata();
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unable to sync instants from data to 
metadata table.", ioe);
     }
   }
 
@@ -462,7 +456,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * @param commitMetadata {@code HoodieCommitMetadata}
    * @param instantTime Timestamp at which the commit was performed
    */
-  public void update(JavaSparkContext jsc, HoodieCommitMetadata 
commitMetadata, String instantTime) {
+  @Override
+  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     if (!enabled) {
       return;
     }
@@ -494,12 +489,12 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
     });
 
     // New partitions created
-    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new 
ArrayList<String>(allPartitions));
+    HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(new 
ArrayList<>(allPartitions));
     records.add(record);
 
     LOG.info("Updating at " + instantTime + " from Commit/" + 
commitMetadata.getOperationType()
         + ". #partitions_updated=" + records.size());
-    commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME), 
instantTime);
+    commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
   /**
@@ -508,7 +503,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * @param cleanerPlan {@code HoodieCleanerPlan}
    * @param instantTime Timestamp at which the clean plan was generated
    */
-  public void update(JavaSparkContext jsc, HoodieCleanerPlan cleanerPlan, 
String instantTime) {
+  @Override
+  public void update(HoodieCleanerPlan cleanerPlan, String instantTime) {
     if (!enabled) {
       return;
     }
@@ -535,7 +531,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
 
     LOG.info("Updating at " + instantTime + " from CleanerPlan. 
#partitions_updated=" + records.size()
         + ", #files_deleted=" + fileDeleteCount[0]);
-    commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME), 
instantTime);
+    commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
   /**
@@ -544,7 +540,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * @param cleanMetadata {@code HoodieCleanMetadata}
    * @param instantTime Timestamp at which the clean was completed
    */
-  public void update(JavaSparkContext jsc, HoodieCleanMetadata cleanMetadata, 
String instantTime) {
+  @Override
+  public void update(HoodieCleanMetadata cleanMetadata, String instantTime) {
     if (!enabled) {
       return;
     }
@@ -564,7 +561,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
 
     LOG.info("Updating at " + instantTime + " from Clean. 
#partitions_updated=" + records.size()
         + ", #files_deleted=" + fileDeleteCount[0]);
-    commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME), 
instantTime);
+    commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
   /**
@@ -573,7 +570,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * @param restoreMetadata {@code HoodieRestoreMetadata}
    * @param instantTime Timestamp at which the restore was performed
    */
-  public void update(JavaSparkContext jsc, HoodieRestoreMetadata 
restoreMetadata, String instantTime) {
+  @Override
+  public void update(HoodieRestoreMetadata restoreMetadata, String 
instantTime) {
     if (!enabled) {
       return;
     }
@@ -592,7 +590,8 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * @param rollbackMetadata {@code HoodieRollbackMetadata}
    * @param instantTime Timestamp at which the rollback was performed
    */
-  public void update(JavaSparkContext jsc, HoodieRollbackMetadata 
rollbackMetadata, String instantTime) {
+  @Override
+  public void update(HoodieRollbackMetadata rollbackMetadata, String 
instantTime) {
     if (!enabled) {
       return;
     }
@@ -662,7 +661,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
       // Rollbacks deletes instants from timeline. The instant being 
rolled-back may not have been synced to the
       // metadata table. Hence, the deleted filed need to be checked against 
the metadata.
       try {
-        FileStatus[] existingStatuses = getAllFilesInPartition(new 
Path(datasetBasePath, partition));
+        FileStatus[] existingStatuses = metadata.fetchAllFilesInPartition(new 
Path(metadata.getDatasetBasePath(), partition));
         Set<String> currentFiles =
             Arrays.stream(existingStatuses).map(s -> 
s.getPath().getName()).collect(Collectors.toSet());
 
@@ -705,7 +704,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
 
     LOG.info("Updating at " + instantTime + " from " + operation + ". 
#partitions_updated=" + records.size()
         + ", #files_deleted=" + fileChangeCount[0] + ", #files_appended=" + 
fileChangeCount[1]);
-    commit(jsc, prepRecords(jsc, records, METADATA_PARTITION_NAME), 
instantTime);
+    commit(jsc, prepRecords(jsc, records, 
MetadataPartitionType.FILES.partitionPath()), instantTime);
   }
 
   /**
@@ -716,10 +715,9 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    */
   private synchronized void commit(JavaSparkContext jsc, JavaRDD<HoodieRecord> 
recordRDD, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
+    metadata.closeReaders();
 
-    closeReaders();
-
-    try (HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config, 
true)) {
+    try (HoodieWriteClient writeClient = new HoodieWriteClient(jsc, 
metadataWriteConfig, true)) {
       writeClient.startCommitWithTime(instantTime);
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, 
instantTime).collect();
       statuses.forEach(writeStatus -> {
@@ -730,15 +728,17 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
     }
 
     // Update total size of the metadata and count of base/log files
-    Map<String, String> stats;
-    try {
-      stats = getStats(false);
-      updateMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
-          Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), 
Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
-          Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
-    } catch (IOException e) {
-      LOG.error("Could not publish metadata size metrics", e);
-    }
+    metrics.ifPresent(m -> {
+      try {
+        Map<String, String> stats = m.getStats(false, metaClient, metadata);
+        
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
+            
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
+            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
+            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
+      } catch (HoodieIOException e) {
+        LOG.error("Could not publish metadata size metrics", e);
+      }
+    });
   }
 
   /**
@@ -748,16 +748,16 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    * base file.
    */
   private JavaRDD<HoodieRecord> prepRecords(JavaSparkContext jsc, 
List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieTable.create(metaClient, config, 
hadoopConf.get());
+    HoodieTable table = HoodieTable.create(metadataWriteConfig, 
hadoopConf.get());
     SliceView fsView = table.getSliceView();
     List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(s -> s.getBaseFile())
-        .filter(b -> b.isPresent())
-        .map(b -> b.get())
+        .map(FileSlice::getBaseFile)
+        .filter(Option::isPresent)
+        .map(Option::get)
         .collect(Collectors.toList());
 
     // All the metadata fits within a single base file
-    if (partitionName.equals(METADATA_PARTITION_NAME)) {
+    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
       if (baseFiles.size() > 1) {
         throw new HoodieMetadataException("Multiple base files found in 
metadata partition");
       }
@@ -770,10 +770,10 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
       instantTime = baseFiles.get(0).getCommitTime();
     } else {
       // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = 
fsView.getLatestFileSlices(HoodieMetadataWriter.METADATA_PARTITION_NAME)
-          .map(s -> s.getLatestLogFile())
-          .filter(b -> b.isPresent())
-          .map(b -> b.get())
+      List<HoodieLogFile> logFiles = 
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
+          .map(FileSlice::getLatestLogFile)
+          .filter(Option::isPresent)
+          .map(Option::get)
           .collect(Collectors.toList());
       if (logFiles.isEmpty()) {
         // No base and log files. All are new inserts
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index b91c6d4..8c23ea8 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -47,9 +47,6 @@ public class HoodieMetadataFileSystemView extends 
HoodieTableFileSystemView {
    */
   @Override
   protected FileStatus[] listPartition(Path partitionPath) throws IOException {
-    FileStatus[] statuses = 
hoodieTable.metadata().getAllFilesInPartition(metaClient.getHadoopConf(),
-        metaClient.getBasePath(), partitionPath);
-
-    return statuses;
+    return hoodieTable.metadata().getAllFilesInPartition(partitionPath);
   }
 }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
new file mode 100644
index 0000000..dc15e9d
--- /dev/null
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.Serializable;
+
+/**
+ * Interface that supports updating metadata for a given table, as actions 
complete.
+ */
+public interface HoodieTableMetadataWriter extends Serializable {
+
+  static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig, JavaSparkContext jsc) {
+    return new HoodieBackedTableMetadataWriter(conf, writeConfig, jsc);
+  }
+
+  void update(HoodieCommitMetadata commitMetadata, String instantTime);
+
+  void update(HoodieCleanerPlan cleanerPlan, String instantTime);
+
+  void update(HoodieCleanMetadata cleanMetadata, String instantTime);
+
+  void update(HoodieRestoreMetadata restoreMetadata, String instantTime);
+
+  void update(HoodieRollbackMetadata rollbackMetadata, String instantTime);
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 471fc8d..936b04e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -63,7 +63,8 @@ import org.apache.hudi.exception.HoodieInsertException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-import org.apache.hudi.metadata.HoodieMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.table.action.bootstrap.HoodieBootstrapWriteMetadata;
 import org.apache.log4j.LogManager;
@@ -94,7 +95,9 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
 
   private SerializableConfiguration hadoopConfiguration;
   private transient FileSystemViewManager viewManager;
-  private HoodieMetadataWriter metadataWriter;
+
+  private HoodieTableMetadataWriter metadataWriter;
+  private HoodieTableMetadata metadata;
 
   protected final SparkTaskContextSupplier sparkTaskContextSupplier = new 
SparkTaskContextSupplier();
 
@@ -632,11 +635,18 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
     return getBaseFileFormat() == HoodieFileFormat.HFILE;
   }
 
-  public HoodieMetadataWriter metadata() {
-    if (metadataWriter == null) {
-      metadataWriter = HoodieMetadataWriter.create(hadoopConfiguration.get(), 
config);
+  public HoodieTableMetadata metadata() {
+    if (metadata == null) {
+      metadata = HoodieTableMetadata.create(hadoopConfiguration.get(), 
config.getBasePath(), config.getSpillableMapBasePath(),
+          config.useFileListingMetadata(), 
config.getFileListingMetadataVerify(), config.isMetricsOn(), 
config.shouldAssumeDatePartitioning());
     }
+    return metadata;
+  }
 
+  public HoodieTableMetadataWriter metadataWriter(JavaSparkContext jsc) {
+    if (metadataWriter == null) {
+      metadataWriter = 
HoodieTableMetadataWriter.create(hadoopConfiguration.get(), config, jsc);
+    }
     return metadataWriter;
   }
 }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index 2bffcc2..5d9c571 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -53,6 +53,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
 
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -216,7 +217,7 @@ public class HoodieTimelineArchiveLog {
 
     // For metadata tables, ensure commits >= latest compaction commit are 
retained. This is required for
     // metadata table sync.
-    if (table.metadata().isMetadataTable(config.getBasePath())) {
+    if (HoodieTableMetadata.isMetadataTable(config.getBasePath())) {
       Option<HoodieInstant> latestCompactionInstant =
           
table.getActiveTimeline().filterPendingCompactionTimeline().lastInstant();
       if (latestCompactionInstant.isPresent()) {
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index b8d4212..ebc8b66 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -256,7 +256,7 @@ public class CleanActionExecutor extends 
BaseActionExecutor<HoodieCleanMetadata>
           cleanStats
       );
 
-      table.metadata().update(jsc, cleanerPlan, cleanInstant.getTimestamp());
+      table.metadataWriter(jsc).update(cleanerPlan, 
cleanInstant.getTimestamp());
 
       
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
           TimelineMetadataUtils.serializeCleanMetadata(metadata));
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index 0bde91a..8a2168c 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -185,8 +185,7 @@ public class CleanPlanner<T extends HoodieRecordPayload<T>> 
implements Serializa
    */
   private List<String> getPartitionPathsForFullCleaning() throws IOException {
     // Go to brute force mode of scanning all partitions
-    return 
hoodieTable.metadata().getAllPartitionPaths(hoodieTable.getMetaClient().getFs(),
-        hoodieTable.getMetaClient().getBasePath(), 
config.shouldAssumeDatePartitioning());
+    return hoodieTable.metadata().getAllPartitionPaths();
   }
 
   /**
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 535fa40..1d3e469 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -226,7 +226,7 @@ public abstract class BaseCommitActionExecutor<T extends 
HoodieRecordPayload<T>,
       HoodieCommitMetadata metadata = CommitUtils.buildMetadata(writeStats, 
result.getPartitionToReplaceFileIds(),
           extraMetadata, operationType, getSchemaToStoreInCommit(), 
getCommitActionType());
 
-      table.metadata().update(jsc, metadata, instantTime);
+      table.metadataWriter(jsc).update(metadata, instantTime);
 
       activeTimeline.saveAsComplete(new HoodieInstant(true, 
getCommitActionType(), instantTime),
           Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
index 1f768de..1dec11d 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/restore/BaseRestoreActionExecutor.java
@@ -93,7 +93,7 @@ public abstract class BaseRestoreActionExecutor extends 
BaseActionExecutor<Hoodi
     HoodieRestoreMetadata restoreMetadata = 
TimelineMetadataUtils.convertRestoreMetadata(
         instantTime, durationInMs, instantsRolledBack, instantToMetadata);
 
-    table.metadata().update(jsc, restoreMetadata, instantTime);
+    table.metadataWriter(jsc).update(restoreMetadata, instantTime);
 
     table.getActiveTimeline().saveAsComplete(new HoodieInstant(true, 
HoodieTimeline.RESTORE_ACTION, instantTime),
         TimelineMetadataUtils.serializeRestoreMetadata(restoreMetadata));
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
 
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
index 29eb533..4d455c0 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/action/rollback/BaseRollbackActionExecutor.java
@@ -112,7 +112,7 @@ public abstract class BaseRollbackActionExecutor extends 
BaseActionExecutor<Hood
         Collections.singletonList(instantToRollback),
         stats);
     if (!skipTimelinePublish) {
-      table.metadata().update(jsc, rollbackMetadata, instantTime);
+      table.metadataWriter(jsc).update(rollbackMetadata, instantTime);
       finishRollback(rollbackMetadata);
     }
 
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java 
b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
similarity index 93%
rename from 
hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
rename to 
hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
index 62962d5..7dfb67c 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieFsMetadata.java
@@ -21,6 +21,7 @@ package org.apache.hudi.metadata;
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -39,6 +40,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
@@ -75,8 +77,8 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
-public class TestHoodieMetadata extends HoodieClientTestHarness {
-  private static final Logger LOG = 
LogManager.getLogger(TestHoodieMetadata.class);
+public class TestHoodieFsMetadata extends HoodieClientTestHarness {
+  private static final Logger LOG = 
LogManager.getLogger(TestHoodieFsMetadata.class);
 
   @TempDir
   public java.nio.file.Path tempFolder;
@@ -100,10 +102,9 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
       dfs.mkdirs(new Path(basePath));
     }
     initMetaClient();
-
     initTestDataGenerator();
 
-    metadataTableBasePath = 
HoodieMetadataReader.getMetadataTableBasePath(basePath);
+    metadataTableBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(basePath);
   }
 
   @AfterEach
@@ -159,15 +160,15 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
     try (HoodieWriteClient client = new HoodieWriteClient<>(jsc, 
getWriteConfig(true, true))) {
       client.startCommitWithTime("005");
 
-      List<String> partitions = metadata(client).getAllPartitionPaths(dfs, 
basePath, false);
+      List<String> partitions = 
metadataWriter(client).metadata().getAllPartitionPaths();
       assertFalse(partitions.contains(nonPartitionDirectory),
           "Must not contain the non-partition " + nonPartitionDirectory);
       assertTrue(partitions.contains("p1"), "Must contain partition p1");
       assertTrue(partitions.contains("p2"), "Must contain partition p2");
 
-      FileStatus[] statuses = 
metadata(client).getAllFilesInPartition(hadoopConf, basePath, new 
Path(basePath, "p1"));
+      FileStatus[] statuses = metadata(client).getAllFilesInPartition(new 
Path(basePath, "p1"));
       assertTrue(statuses.length == 2);
-      statuses = metadata(client).getAllFilesInPartition(hadoopConf, basePath, 
new Path(basePath, "p2"));
+      statuses = metadata(client).getAllFilesInPartition(new Path(basePath, 
"p2"));
       assertTrue(statuses.length == 5);
     }
   }
@@ -272,6 +273,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
       List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
       client.startCommitWithTime(newCommitTime);
       List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
+      assertNoWriteErrors(writeStatuses);
       validateMetadata(client);
 
       // Rollback of inserts
@@ -475,7 +477,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
    * Metadata Table should be automatically compacted as per config.
    */
   @ParameterizedTest
-  @ValueSource(booleans =  {true, false})
+  @ValueSource(booleans =  {false})
   public void testArchivingAndCompaction(boolean asyncClean) throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
 
@@ -600,7 +602,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
       List<WriteStatus> writeStatuses = 
client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
       validateMetadata(client);
 
-      List<String> metadataPartitions = 
metadata(client).getAllPartitionPaths(dfs, basePath, false);
+      List<String> metadataPartitions = 
metadata(client).getAllPartitionPaths();
       assertTrue(metadataPartitions.contains(""), "Must contain empty 
partition");
     }
   }
@@ -622,9 +624,9 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
       validateMetadata(client);
 
       Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");
-      
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR
 + ".count"));
-      
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataWriter.INITIALIZE_STR
 + ".totalDuration"));
-      
assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataWriter.INITIALIZE_STR
 + ".count"), 1L);
+      
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".count"));
+      
assertTrue(metricsRegistry.getAllCounts().containsKey(HoodieMetadataMetrics.INITIALIZE_STR
 + ".totalDuration"));
+      
assertEquals(metricsRegistry.getAllCounts().get(HoodieMetadataMetrics.INITIALIZE_STR
 + ".count"), 1L);
       assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.size"));
       assertTrue(metricsRegistry.getAllCounts().containsKey("logfile.size"));
       assertTrue(metricsRegistry.getAllCounts().containsKey("basefile.count"));
@@ -639,8 +641,8 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
    */
   private void validateMetadata(HoodieWriteClient client) throws IOException {
     HoodieWriteConfig config = client.getConfig();
-    HoodieMetadataWriter metadata = metadata(client);
-    assertFalse(metadata == null, "MetadataWriter should have been 
initialized");
+    HoodieBackedTableMetadataWriter metadataWriter = metadataWriter(client);
+    assertNotNull(metadataWriter, "MetadataWriter should have been 
initialized");
     if (!config.useFileListingMetadata()) {
       return;
     }
@@ -648,16 +650,16 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
     HoodieTimer timer = new HoodieTimer().startTimer();
 
     // Validate write config for metadata table
-    HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig();
+    HoodieWriteConfig metadataWriteConfig = metadataWriter.getWriteConfig();
     assertFalse(metadataWriteConfig.useFileListingMetadata(), "No metadata 
table for metadata table");
     assertFalse(metadataWriteConfig.getFileListingMetadataVerify(), "No verify 
for metadata table");
 
     // Metadata table should be in sync with the dataset
-    assertTrue(metadata.isInSync());
+    assertTrue(metadata(client).isInSync());
 
     // Partitions should match
     List<String> fsPartitions = 
FSUtils.getAllFoldersWithPartitionMetaFile(dfs, basePath);
-    List<String> metadataPartitions = metadata.getAllPartitionPaths(dfs, 
basePath, false);
+    List<String> metadataPartitions = 
metadataWriter.metadata().getAllPartitionPaths();
 
     Collections.sort(fsPartitions);
     Collections.sort(metadataPartitions);
@@ -679,7 +681,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
           partitionPath = new Path(basePath, partition);
         }
         FileStatus[] fsStatuses = FSUtils.getAllDataFilesInPartition(dfs, 
partitionPath);
-        FileStatus[] metaStatuses = 
metadata.getAllFilesInPartition(hadoopConf, basePath, partitionPath);
+        FileStatus[] metaStatuses = 
metadataWriter.metadata().getAllFilesInPartition(partitionPath);
         List<String> fsFileNames = Arrays.stream(fsStatuses)
             .map(s -> s.getPath().getName()).collect(Collectors.toList());
         List<String> metadataFilenames = Arrays.stream(metaStatuses)
@@ -700,9 +702,9 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
         // FileSystemView should expose the same data
         List<HoodieFileGroup> fileGroups = 
tableView.getAllFileGroups(partition).collect(Collectors.toList());
 
-        fileGroups.stream().forEach(g -> 
LogManager.getLogger(TestHoodieMetadata.class).info(g));
-        fileGroups.stream().forEach(g -> g.getAllBaseFiles().forEach(b -> 
LogManager.getLogger(TestHoodieMetadata.class).info(b)));
-        fileGroups.stream().forEach(g -> g.getAllFileSlices().forEach(s -> 
LogManager.getLogger(TestHoodieMetadata.class).info(s)));
+        fileGroups.forEach(g -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(g));
+        fileGroups.forEach(g -> g.getAllBaseFiles().forEach(b -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(b)));
+        fileGroups.forEach(g -> g.getAllFileSlices().forEach(s -> 
LogManager.getLogger(TestHoodieFsMetadata.class).info(s)));
 
         long numFiles = fileGroups.stream()
             .mapToLong(g -> g.getAllBaseFiles().count() + 
g.getAllFileSlices().mapToLong(s -> s.getLogFiles().count()).sum())
@@ -718,7 +720,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
     HoodieTableMetaClient metadataMetaClient = new 
HoodieTableMetaClient(hadoopConf, metadataTableBasePath);
 
     // Metadata table should be in sync with the dataset
-    assertTrue(metadata.isInSync());
+    assertTrue(metadataWriter.metadata().isInSync());
 
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), 
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
@@ -730,9 +732,9 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
     // Metadata table has a fixed number of partitions
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that 
function filters all directory
     // in the .hoodie folder.
-    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(dfs, 
metadata.getMetadataTableBasePath(basePath),
+    List<String> metadataTablePartitions = FSUtils.getAllPartitionPaths(dfs, 
HoodieTableMetadata.getMetadataTableBasePath(basePath),
         false);
-    assertEquals(HoodieMetadataWriter.METADATA_ALL_PARTITIONS.length, 
metadataTablePartitions.size());
+    assertEquals(MetadataPartitionType.values().length, 
metadataTablePartitions.size());
 
     // Metadata table should automatically compact and clean
     // versions are +1 as autoclean / compaction happens end of commits
@@ -747,6 +749,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
             "Should limit files to num versions configured");
       }
 
+      List<FileSlice> slices = 
fsView.getAllFileSlices(partition).collect(Collectors.toList());
       assertTrue(fsView.getAllFileSlices(partition).count() <= 
numFileVersions, "Should limit file slice to "
           + numFileVersions + " but was " + 
fsView.getAllFileSlices(partition).count());
     });
@@ -754,8 +757,14 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
     LOG.info("Validation time=" + timer.endTimer());
   }
 
-  private HoodieMetadataWriter metadata(HoodieWriteClient client) {
-    return HoodieMetadataWriter.create(hadoopConf, client.getConfig());
+  private HoodieBackedTableMetadataWriter metadataWriter(HoodieWriteClient 
client) {
+    return (HoodieBackedTableMetadataWriter) 
HoodieTableMetadataWriter.create(hadoopConf, client.getConfig(), jsc);
+  }
+
+  private HoodieBackedTableMetadata metadata(HoodieWriteClient client) {
+    HoodieWriteConfig clientConfig = client.getConfig();
+    return (HoodieBackedTableMetadata) HoodieTableMetadata.create(hadoopConf, 
clientConfig.getBasePath(), clientConfig.getSpillableMapBasePath(),
+        clientConfig.useFileListingMetadata(), 
clientConfig.getFileListingMetadataVerify(), false, 
clientConfig.shouldAssumeDatePartitioning());
   }
 
   // TODO: this can be moved to TestHarness after merge from master
@@ -774,7 +783,7 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
                                                           boolean 
enableMetrics) {
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 
2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(true)
+        .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
 * 1024 * 1024)
             
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
             
.withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
similarity index 59%
rename from 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
rename to 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index c14f402..5ce933d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -19,11 +19,9 @@
 package org.apache.hudi.metadata;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -43,7 +41,6 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -64,112 +61,68 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 /**
- * Reader for Metadata Table.
+ * Table metadata provided by an internal DFS backed Hudi metadata table.
  *
  * If the metadata table does not exist, RPC calls are used to retrieve file 
listings from the file system.
  * No updates are applied to the table and it is not synced.
  */
-public class HoodieMetadataReader implements Serializable {
-  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataReader.class);
-
-  // Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
-  private static final String METADATA_TABLE_REL_PATH = 
HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR
-      + "metadata";
-
-  // Table name suffix
-  protected static final String METADATA_TABLE_NAME_SUFFIX = "_metadata";
-
-  // Timestamp for a commit when the base dataset had not had any commits yet.
-  protected static final String SOLO_COMMIT_TIMESTAMP = "00000000000000";
-
-
-  // Name of partition which saves file listings
-  public static final String METADATA_PARTITION_NAME = "metadata_partition";
-  // List of all partitions
-  public static final String[] METADATA_ALL_PARTITIONS = 
{METADATA_PARTITION_NAME};
-  // Key for the record which saves list of all partitions
-  protected static final String RECORDKEY_PARTITION_LIST = 
"__all_partitions__";
-
-  // The partition name used for non-partitioned tables
-  protected static final String NON_PARTITIONED_NAME = ".";
-
-  // Metric names
-  public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
-  public static final String LOOKUP_FILES_STR = "lookup_files";
-  public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
-  public static final String VALIDATE_FILES_STR = "validate_files";
-  public static final String VALIDATE_ERRORS_STR = "validate_errors";
-  public static final String SCAN_STR = "scan";
-  public static final String BASEFILE_READ_STR = "basefile_read";
-
-  // Stats names
-  public static final String STAT_TOTAL_BASE_FILE_SIZE = 
"totalBaseFileSizeInBytes";
-  public static final String STAT_TOTAL_LOG_FILE_SIZE = 
"totalLogFileSizeInBytes";
-  public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
-  public static final String STAT_COUNT_LOG_FILES = "logFileCount";
-  public static final String STAT_COUNT_PARTITION = "partitionCount";
-  public static final String STAT_IN_SYNC = "isInSync";
-  public static final String STAT_LAST_COMPACTION_TIMESTAMP = 
"lastCompactionTimestamp";
-
-  // A base directory where the metadata tables should be saved outside the 
dataset directory.
-  // This is used in tests on existing datasets.
-  private static String metadataBaseDirectory;
-
-  protected final SerializableConfiguration hadoopConf;
-  protected final String datasetBasePath;
-  protected final String metadataBasePath;
-  protected Registry metricsRegistry;
-  protected HoodieTableMetaClient metaClient;
-  protected boolean enabled;
-  private final boolean validateLookups;
-  private long maxMemorySizeInBytes = 1024 * 1024 * 1024; // TODO
-  private int bufferSize = 10 * 1024 * 1024; // TODO
+public class HoodieBackedTableMetadata implements HoodieTableMetadata {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBackedTableMetadata.class);
+  private static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  private static final int BUFFER_SIZE = 10 * 1024 * 1024;
 
-  // Directory used for Splillable Map when merging records
-  private String spillableMapDirectory;
+  private final SerializableConfiguration hadoopConf;
+  private final String datasetBasePath;
+  private final String metadataBasePath;
+  private final Option<HoodieMetadataMetrics> metrics;
+  private HoodieTableMetaClient metaClient;
+
+  private boolean enabled;
+  private final boolean validateLookups;
+  private final boolean assumeDatePartitioning;
+  // Directory used for Spillable Map when merging records
+  private final String spillableMapDirectory;
 
   // Readers for the base and log file which store the metadata
   private transient HoodieFileReader<GenericRecord> basefileReader;
   private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
 
-  /**
-   * Create a the Metadata Table in read-only mode.
-   */
-  public HoodieMetadataReader(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
-                              boolean enabled, boolean validateLookups) {
-    this(conf, datasetBasePath, spillableMapDirectory, enabled, 
validateLookups, false);
+  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
+                                   boolean enabled, boolean validateLookups, 
boolean assumeDatePartitioning) {
+    this(conf, datasetBasePath, spillableMapDirectory, enabled, 
validateLookups, false, assumeDatePartitioning);
   }
 
-  /**
-   * Create a the Metadata Table in read-only mode.
-   */
-  public HoodieMetadataReader(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
-                              boolean enabled, boolean validateLookups, 
boolean enableMetrics) {
+  public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
+                                   boolean enabled, boolean validateLookups, 
boolean enableMetrics,
+                                   boolean assumeDatePartitioning) {
     this.hadoopConf = new SerializableConfiguration(conf);
     this.datasetBasePath = datasetBasePath;
-    this.metadataBasePath = getMetadataTableBasePath(datasetBasePath);
+    this.metadataBasePath = 
HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
     this.validateLookups = validateLookups;
     this.spillableMapDirectory = spillableMapDirectory;
+    this.enabled = enabled;
+    this.assumeDatePartitioning = assumeDatePartitioning;
 
     if (enabled) {
       try {
-        metaClient = new HoodieTableMetaClient(hadoopConf.get(), 
metadataBasePath);
+        this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), 
metadataBasePath);
       } catch (TableNotFoundException e) {
         LOG.error("Metadata table was not found at path " + metadataBasePath);
-        enabled = false;
+        this.enabled = false;
       } catch (Exception e) {
         LOG.error("Failed to initialize metadata table at path " + 
metadataBasePath, e);
-        enabled = false;
+        this.enabled = false;
       }
     } else {
       LOG.info("Metadata table is disabled.");
     }
 
     if (enableMetrics) {
-      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+      this.metrics = Option.of(new 
HoodieMetadataMetrics(Registry.getRegistry("HoodieMetadata")));
+    } else {
+      this.metrics = Option.empty();
     }
-
-    this.enabled = enabled;
   }
 
   /**
@@ -180,21 +133,20 @@ public class HoodieMetadataReader implements Serializable 
{
    *
    * On any errors retrieving the listing from the metadata, defaults to using 
the file system listings.
    *
-   * @param fs The {@code FileSystem}
-   * @param basePath Base path of the dataset
-   * @param assumeDatePartitioning True if the dataset uses date based 
partitioning
    */
-  public List<String> getAllPartitionPaths(FileSystem fs, String basePath, 
boolean assumeDatePartitioning)
+  @Override
+  public List<String> getAllPartitionPaths()
       throws IOException {
     if (enabled) {
       try {
-        return getAllPartitionPaths();
+        return fetchAllPartitionPaths();
       } catch (Exception e) {
-        LOG.error("Failed to retrive list of partition from metadata", e);
+        LOG.error("Failed to retrieve list of partition from metadata", e);
       }
     }
 
-    return getAllPartitionPathsByListing(fs, basePath, assumeDatePartitioning);
+    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+    return FSUtils.getAllPartitionPaths(fs, datasetBasePath, 
assumeDatePartitioning);
   }
 
   /**
@@ -205,29 +157,29 @@ public class HoodieMetadataReader implements Serializable 
{
    *
    * On any errors retrieving the listing from the metadata, defaults to using 
the file system listings.
    *
-   * @param hadoopConf {@code Configuration}
    * @param partitionPath The absolute path of the partition to list
    */
-  public FileStatus[] getAllFilesInPartition(Configuration hadoopConf, String 
basePath, Path partitionPath)
+  @Override
+  public FileStatus[] getAllFilesInPartition(Path partitionPath)
       throws IOException {
     if (enabled) {
       try {
-        return getAllFilesInPartition(partitionPath);
+        return fetchAllFilesInPartition(partitionPath);
       } catch (Exception e) {
        LOG.error("Failed to retrieve files in partition " + partitionPath + " 
from metadata", e);
       }
     }
 
-    return getAllFilesInPartitionByListing(hadoopConf, basePath, 
partitionPath);
+    return FSUtils.getFs(partitionPath.toString(), 
hadoopConf.get()).listStatus(partitionPath);
   }
 
   /**
    * Returns a list of all partitions.
    */
-  protected List<String> getAllPartitionPaths() throws IOException {
+  protected List<String> fetchAllPartitionPaths() throws IOException {
     HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = 
getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    updateMetrics(LOOKUP_PARTITIONS_STR, timer.endTimer());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
 
     List<String> partitions = Collections.emptyList();
     if (hoodieRecord.isPresent()) {
@@ -247,8 +199,8 @@ public class HoodieMetadataReader implements Serializable {
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the 
file system
       timer.startTimer();
-      List<String> actualPartitions  = 
getAllPartitionPathsByListing(metaClient.getFs(), datasetBasePath, false);
-      updateMetrics(VALIDATE_PARTITIONS_STR, timer.endTimer());
+      List<String> actualPartitions  = 
FSUtils.getAllPartitionPaths(metaClient.getFs(), datasetBasePath, false);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, 
timer.endTimer()));
 
       Collections.sort(actualPartitions);
       Collections.sort(partitions);
@@ -257,7 +209,7 @@ public class HoodieMetadataReader implements Serializable {
         LOG.error("Partitions from metadata: " + 
Arrays.toString(partitions.toArray()));
         LOG.error("Partitions from file system: " + 
Arrays.toString(actualPartitions.toArray()));
 
-        updateMetrics(VALIDATE_ERRORS_STR, 0);
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
       }
 
       // Return the direct listing as it should be correct
@@ -273,15 +225,15 @@ public class HoodieMetadataReader implements Serializable 
{
    *
    * @param partitionPath The absolute path of the partition
    */
-  FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException {
+  FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException 
{
     String partitionName = FSUtils.getRelativePartitionPath(new 
Path(datasetBasePath), partitionPath);
-    if (partitionName.equals("")) {
+    if (partitionName.isEmpty()) {
       partitionName = NON_PARTITIONED_NAME;
     }
 
     HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = 
getMergedRecordByKey(partitionName);
-    updateMetrics(LOOKUP_FILES_STR, timer.endTimer());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
     FileStatus[] statuses = {};
     if (hoodieRecord.isPresent()) {
@@ -299,7 +251,7 @@ public class HoodieMetadataReader implements Serializable {
       // Ignore partition metadata file
       FileStatus[] directStatuses = 
metaClient.getFs().listStatus(partitionPath,
           p -> 
!p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      updateMetrics(VALIDATE_FILES_STR, timer.endTimer());
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
 
       List<String> directFilenames = Arrays.stream(directStatuses)
           .map(s -> s.getPath().getName()).sorted()
@@ -314,7 +266,7 @@ public class HoodieMetadataReader implements Serializable {
         LOG.error("File list from metadata: " + 
Arrays.toString(metadataFilenames.toArray()));
         LOG.error("File list from direct listing: " + 
Arrays.toString(directFilenames.toArray()));
 
-        updateMetrics(VALIDATE_ERRORS_STR, 0);
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
       }
 
       // Return the direct listing as it should be correct
@@ -325,16 +277,6 @@ public class HoodieMetadataReader implements Serializable {
     return statuses;
   }
 
-  private FileStatus[] getAllFilesInPartitionByListing(Configuration 
hadoopConf, String basePath, Path partitionPath)
-      throws IOException {
-    return FSUtils.getFs(partitionPath.toString(), 
hadoopConf).listStatus(partitionPath);
-  }
-
-  private List<String> getAllPartitionPathsByListing(FileSystem fs, String 
basePath, boolean assumeDatePartitioning)
-      throws IOException {
-    return FSUtils.getAllPartitionPaths(fs, basePath, assumeDatePartitioning);
-  }
-
   /**
    * Retrieve the merged {@code HoodieRecord} mapped to the given key.
    *
@@ -349,9 +291,9 @@ public class HoodieMetadataReader implements Serializable {
       HoodieTimer timer = new HoodieTimer().startTimer();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
-        hoodieRecord = 
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
+        hoodieRecord = 
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
-        updateMetrics(BASEFILE_READ_STR, timer.endTimer());
+        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
       }
     }
 
@@ -379,19 +321,18 @@ public class HoodieMetadataReader implements Serializable 
{
       return;
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
 
     // Metadata is in sync till the latest completed instant on the dataset
     HoodieTableMetaClient datasetMetaClient = new 
HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
     Option<HoodieInstant> datasetLatestInstant = 
datasetMetaClient.getActiveTimeline().filterCompletedInstants()
         .lastInstant();
-    String latestInstantTime = datasetLatestInstant.isPresent() ? 
datasetLatestInstant.get().getTimestamp()
-        : SOLO_COMMIT_TIMESTAMP;
+    String latestInstantTime = 
datasetLatestInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
     // Find the latest file slice
     HoodieTimeline timeline = metaClient.reloadActiveTimeline();
     HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-    List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
+    List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
     ValidationUtils.checkArgument(latestSlices.size() == 1);
 
     // If the base file is present then create a reader
@@ -399,7 +340,6 @@ public class HoodieMetadataReader implements Serializable {
     if (basefile.isPresent()) {
       String basefilePath = basefile.get().getPath();
       basefileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), 
new Path(basefilePath));
-
       LOG.info("Opened metadata base file from " + basefilePath + " at instant 
" + basefile.get().getCommitTime());
     }
 
@@ -408,26 +348,26 @@ public class HoodieMetadataReader implements Serializable 
{
         .collect(Collectors.toList());
 
     Option<HoodieInstant> lastInstant = 
timeline.filterCompletedInstants().lastInstant();
-    String latestMetaInstantTimestamp = lastInstant.isPresent() ? 
lastInstant.get().getTimestamp()
-        : SOLO_COMMIT_TIMESTAMP;
+    String latestMetaInstantTimestamp = 
lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
     if (!HoodieTimeline.compareTimestamps(latestInstantTime, 
HoodieTimeline.EQUALS, latestMetaInstantTimestamp)) {
-      // TODO: This can be false positive if the metadata table had a 
compaction or clean
+      // TODO(metadata): This can be false positive if the metadata table had 
a compaction or clean
       LOG.warn("Metadata has more recent instant " + 
latestMetaInstantTimestamp + " than dataset " + latestInstantTime);
     }
 
     // Load the schema
     Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
 
-    // TODO: The below code may open the metadata to include incomplete 
instants on the dataset
+    // TODO(metadata): The below code may open the metadata to include 
incomplete instants on the dataset
     logRecordScanner =
         new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), 
metadataBasePath,
-            logFilePaths, schema, latestMetaInstantTimestamp, 
maxMemorySizeInBytes, bufferSize,
+            logFilePaths, schema, latestMetaInstantTimestamp, 
MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
             spillableMapDirectory, null);
 
     LOG.info("Opened metadata log files from " + logFilePaths + " at instant " 
+ latestInstantTime
         + "(dataset instant=" + latestInstantTime + ", metadata instant=" + 
latestMetaInstantTimestamp + ")");
 
-    updateMetrics(SCAN_STR, System.currentTimeMillis() - t1);
+    metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
   }
 
   protected void closeReaders() {
@@ -441,8 +381,8 @@ public class HoodieMetadataReader implements Serializable {
   /**
    * Return {@code True} if all Instants from the dataset have been synced 
with the Metadata Table.
    */
+  @Override
   public boolean isInSync() {
-    // There should not be any instants to sync
     return enabled && findInstantsToSync().isEmpty();
   }
 
@@ -464,19 +404,22 @@ public class HoodieMetadataReader implements Serializable 
{
 
     // If there has not been any compaction then the first delta commit 
instant should be the one at which
     // the metadata table was created. We should not sync any instants before 
that creation time.
+    // FIXME(metadata): or it could be that compaction has not happened for a 
while, right.
     Option<HoodieInstant> oldestMetaInstant = Option.empty();
     if (!compactionTimestamp.isPresent()) {
       oldestMetaInstant = 
metaTimeline.getDeltaCommitTimeline().filterCompletedInstants().firstInstant();
       if (oldestMetaInstant.isPresent()) {
-        // TODO: Ensure this is the instant at which we created the metadata 
table
+        // FIXME(metadata): Ensure this is the instant at which we created the 
metadata table
       }
     }
 
-    String metaSyncTimestamp = compactionTimestamp.isPresent() ? 
compactionTimestamp.get()
-        : oldestMetaInstant.isPresent() ? 
oldestMetaInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
+    String metaSyncTimestamp = compactionTimestamp.orElse(
+        
oldestMetaInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP)
+    );
 
     // Metadata table is updated when an instant is completed except for the 
following:
     //  CLEAN: metadata table is updated during inflight. So for CLEAN we 
accept inflight actions.
+    // FIXME(metadata): This need not be the case, right? It's risky to do 
this?
     List<HoodieInstant> datasetInstants = 
datasetMetaClient.getActiveTimeline().getInstants()
         .filter(i -> i.isCompleted() || 
(i.getAction().equals(HoodieTimeline.CLEAN_ACTION) && i.isInflight()))
         .filter(i -> metaSyncTimestamp.isEmpty()
@@ -495,7 +438,7 @@ public class HoodieMetadataReader implements Serializable {
       if (metadataInstantMap.containsKey(instant.getTimestamp())) {
         // instant already synced to metadata table
         if (!instantsToSync.isEmpty()) {
-          // TODO: async clean and async compaction are not yet handled. They 
have a timestamp which is in the past
+          // FIXME(metadata): async clean and async compaction are not yet 
handled. They have a timestamp which is in the past
           // (when the operation was scheduled) and even on completion they 
retain their old timestamp.
           LOG.warn("Found out-of-order already synced instant " + instant + ". 
Instants to sync=" + instantsToSync);
         }
@@ -509,11 +452,13 @@ public class HoodieMetadataReader implements Serializable 
{
   /**
    * Return the timestamp of the latest compaction instant.
    */
+  @Override
   public Option<String> getLatestCompactionTimestamp() {
     if (!enabled) {
       return Option.empty();
     }
 
+    //FIXME(metadata): should we really reload this?
     HoodieTimeline timeline = metaClient.reloadActiveTimeline();
     Option<HoodieInstant> lastCompactionInstant = 
timeline.filterCompletedInstants()
         .filter(i -> 
i.getAction().equals(HoodieTimeline.COMMIT_ACTION)).lastInstant();
@@ -525,114 +470,23 @@ public class HoodieMetadataReader implements 
Serializable {
     }
   }
 
-  public Map<String, String> getStats(boolean detailed) throws IOException {
-    metaClient.reloadActiveTimeline();
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
-    return getStats(fsView, detailed);
-  }
-
-  private Map<String, String> getStats(HoodieTableFileSystemView fsView, 
boolean detailed) throws IOException {
-    Map<String, String> stats = new HashMap<>();
-
-    // Total size of the metadata and count of base/log files
-    long totalBaseFileSizeInBytes = 0;
-    long totalLogFileSizeInBytes = 0;
-    int baseFileCount = 0;
-    int logFileCount = 0;
-    List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
-
-    for (FileSlice slice : latestSlices) {
-      if (slice.getBaseFile().isPresent()) {
-        totalBaseFileSizeInBytes += 
slice.getBaseFile().get().getFileStatus().getLen();
-        ++baseFileCount;
-      }
-      Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
-      while (it.hasNext()) {
-        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
-        ++logFileCount;
-      }
-    }
-
-    stats.put(STAT_TOTAL_BASE_FILE_SIZE, 
String.valueOf(totalBaseFileSizeInBytes));
-    stats.put(STAT_TOTAL_LOG_FILE_SIZE, 
String.valueOf(totalLogFileSizeInBytes));
-    stats.put(STAT_COUNT_BASE_FILES, String.valueOf(baseFileCount));
-    stats.put(STAT_COUNT_LOG_FILES, String.valueOf(logFileCount));
-
-    if (detailed) {
-      stats.put(STAT_COUNT_PARTITION, 
String.valueOf(getAllPartitionPaths().size()));
-      stats.put(STAT_IN_SYNC, String.valueOf(isInSync()));
-      stats.put(STAT_LAST_COMPACTION_TIMESTAMP, 
getLatestCompactionTimestamp().orElseGet(() -> "none"));
-    }
-
-    return stats;
-  }
-
-  protected void updateMetrics(String action, long durationInMs) {
-    if (metricsRegistry == null) {
-      return;
-    }
-
-    // Update sum of duration and total for count
-    String countKey = action + ".count";
-    String durationKey = action + ".totalDuration";
-    metricsRegistry.add(countKey, 1);
-    metricsRegistry.add(durationKey, durationInMs);
-
-    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", 
durationKey, durationInMs, countKey));
-  }
-
-  protected void updateMetrics(long totalBaseFileSizeInBytes, long 
totalLogFileSizeInBytes, int baseFileCount,
-                               int logFileCount) {
-    if (metricsRegistry == null) {
-      return;
-    }
-
-    // Update sizes and count for metadata table's data files
-    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
-    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
-    metricsRegistry.add("basefile.count", baseFileCount);
-    metricsRegistry.add("logfile.count", logFileCount);
-
-    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, 
logfile.size=%d, basefile.count=%d, "
-        + "logfile.count=%d)", totalBaseFileSizeInBytes, 
totalLogFileSizeInBytes, baseFileCount, logFileCount));
+  public boolean enabled() {
+    return enabled;
   }
 
-  /**
-   * Return the base path of the Metadata Table.
-   *
-   * @param tableBasePath The base path of the dataset
-   */
-  public static String getMetadataTableBasePath(String tableBasePath) {
-    if (metadataBaseDirectory != null) {
-      return metadataBaseDirectory;
-    }
-
-    return tableBasePath + Path.SEPARATOR + METADATA_TABLE_REL_PATH;
+  public SerializableConfiguration getHadoopConf() {
+    return hadoopConf;
   }
 
-  /**
-   * Returns {@code True} if the given path contains a metadata table.
-   *
-   * @param basePath The base path to check
-   */
-  public static boolean isMetadataTable(String basePath) {
-    return basePath.endsWith(METADATA_TABLE_REL_PATH);
+  public String getDatasetBasePath() {
+    return datasetBasePath;
   }
 
-  /**
-   * Sets the directory to store/read Metadata Table.
-   *
-   * This can be used to store the metadata table away from the dataset 
directory.
-   *  - Useful for testing as well as for using via the HUDI CLI so that the 
actual dataset is not written to.
-   *  - Useful for testing Metadata Table performance and operations on 
existing datasets before enabling.
-   */
-  public static void setMetadataBaseDirectory(String metadataDir) {
-    ValidationUtils.checkState(metadataBaseDirectory == null,
-        "metadataBaseDirectory is already set to " + metadataBaseDirectory);
-    metadataBaseDirectory = metadataDir;
+  public HoodieTableMetaClient getMetaClient() {
+    return metaClient;
   }
 
-  public boolean enabled() {
-    return enabled;
+  public Map<String, String> stats() {
+    return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new 
HashMap<>());
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
index 191bbcd..3f245ad 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
@@ -70,11 +70,6 @@ public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordS
    * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
    */
   public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key) {
-    HoodieRecord record = records.get(key);
-    if (record == null) {
-      return Option.empty();
-    }
-
-    return Option.of(record);
+    return Option.ofNullable((HoodieRecord) records.get(key));
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
new file mode 100644
index 0000000..29a2219
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.exception.HoodieIOException;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class HoodieMetadataMetrics implements Serializable {
+
+  // Metric names
+  public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
+  public static final String LOOKUP_FILES_STR = "lookup_files";
+  public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
+  public static final String VALIDATE_FILES_STR = "validate_files";
+  public static final String VALIDATE_ERRORS_STR = "validate_errors";
+  public static final String SCAN_STR = "scan";
+  public static final String BASEFILE_READ_STR = "basefile_read";
+  public static final String INITIALIZE_STR = "initialize";
+  public static final String SYNC_STR = "sync";
+
+  // Stats names
+  public static final String STAT_TOTAL_BASE_FILE_SIZE = 
"totalBaseFileSizeInBytes";
+  public static final String STAT_TOTAL_LOG_FILE_SIZE = 
"totalLogFileSizeInBytes";
+  public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
+  public static final String STAT_COUNT_LOG_FILES = "logFileCount";
+  public static final String STAT_COUNT_PARTITION = "partitionCount";
+  public static final String STAT_IN_SYNC = "isInSync";
+  public static final String STAT_LAST_COMPACTION_TIMESTAMP = 
"lastCompactionTimestamp";
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataMetrics.class);
+
+  private final Registry metricsRegistry;
+
+  public HoodieMetadataMetrics(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
+
+  public Map<String, String> getStats(boolean detailed, HoodieTableMetaClient 
metaClient, HoodieTableMetadata metadata) {
+    try {
+      metaClient.reloadActiveTimeline();
+      HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+      return getStats(fsView, detailed, metadata);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Unable to get metadata stats.", ioe);
+    }
+  }
+
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, 
boolean detailed, HoodieTableMetadata tableMetadata) throws IOException {
+    Map<String, String> stats = new HashMap<>();
+
+    // Total size of the metadata and count of base/log files
+    long totalBaseFileSizeInBytes = 0;
+    long totalLogFileSizeInBytes = 0;
+    int baseFileCount = 0;
+    int logFileCount = 0;
+    List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
+
+    for (FileSlice slice : latestSlices) {
+      if (slice.getBaseFile().isPresent()) {
+        totalBaseFileSizeInBytes += 
slice.getBaseFile().get().getFileStatus().getLen();
+        ++baseFileCount;
+      }
+      Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
+      while (it.hasNext()) {
+        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
+        ++logFileCount;
+      }
+    }
+
+    stats.put(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE, 
String.valueOf(totalBaseFileSizeInBytes));
+    stats.put(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE, 
String.valueOf(totalLogFileSizeInBytes));
+    stats.put(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES, 
String.valueOf(baseFileCount));
+    stats.put(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES, 
String.valueOf(logFileCount));
+
+    if (detailed) {
+      stats.put(HoodieMetadataMetrics.STAT_COUNT_PARTITION, 
String.valueOf(tableMetadata.getAllPartitionPaths().size()));
+      stats.put(HoodieMetadataMetrics.STAT_IN_SYNC, 
String.valueOf(tableMetadata.isInSync()));
+      stats.put(HoodieMetadataMetrics.STAT_LAST_COMPACTION_TIMESTAMP, 
tableMetadata.getLatestCompactionTimestamp().orElseGet(() -> "none"));
+    }
+
+    return stats;
+  }
+
+  protected void updateMetrics(String action, long durationInMs) {
+    if (metricsRegistry == null) {
+      return;
+    }
+
+    // Update sum of duration and total for count
+    String countKey = action + ".count";
+    String durationKey = action + ".totalDuration";
+    metricsRegistry.add(countKey, 1);
+    metricsRegistry.add(durationKey, durationInMs);
+
+    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", 
durationKey, durationInMs, countKey));
+  }
+
+  protected void updateMetrics(long totalBaseFileSizeInBytes, long 
totalLogFileSizeInBytes, int baseFileCount,
+                               int logFileCount) {
+    if (metricsRegistry == null) {
+      return;
+    }
+
+    // Update sizes and count for metadata table's data files
+    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+    metricsRegistry.add("basefile.count", baseFileCount);
+    metricsRegistry.add("logfile.count", logFileCount);
+
+    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, 
logfile.size=%d, basefile.count=%d, "
+        + "logfile.count=%d)", totalBaseFileSizeInBytes, 
totalLogFileSizeInBytes, baseFileCount, logFileCount));
+  }
+
+  public Registry registry() {
+    return metricsRegistry;
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 087984c..d99b342 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -40,8 +40,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static 
org.apache.hudi.metadata.HoodieMetadataReader.METADATA_PARTITION_NAME;
-import static 
org.apache.hudi.metadata.HoodieMetadataReader.RECORDKEY_PARTITION_LIST;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.RECORDKEY_PARTITION_LIST;
 
 /**
  * This is a payload which saves information about a single entry in the 
Metadata Table.
@@ -100,7 +99,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
METADATA_PARTITION_NAME);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
     HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
@@ -120,7 +119,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
     filesDeleted.ifPresent(
         m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
 
-    HoodieKey key = new HoodieKey(partition, METADATA_PARTITION_NAME);
+    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
     HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
new file mode 100644
index 0000000..3a1a7a4
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface that supports querying various pieces of metadata about a hudi 
table.
+ */
+public interface HoodieTableMetadata extends Serializable {
+
+  // Table name suffix
+  String METADATA_TABLE_NAME_SUFFIX = "_metadata";
+  // Timestamp for a commit when the base dataset had not had any commits yet.
+  String SOLO_COMMIT_TIMESTAMP = "00000000000000";
+  // Key for the record which saves list of all partitions
+  String RECORDKEY_PARTITION_LIST = "__all_partitions__";
+  // The partition name used for non-partitioned tables
+  String NON_PARTITIONED_NAME = ".";
+
+  // Base path of the Metadata Table relative to the dataset (.hoodie/metadata)
+  static final String METADATA_TABLE_REL_PATH = 
HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "metadata";
+
+  /**
+   * Return the base path of the Metadata Table.
+   *
+   * @param tableBasePath The base path of the dataset
+   */
+  static String getMetadataTableBasePath(String tableBasePath) {
+    return tableBasePath + Path.SEPARATOR + METADATA_TABLE_REL_PATH;
+  }
+
+  /**
+   * Returns {@code True} if the given path contains a metadata table.
+   *
+   * @param basePath The base path to check
+   */
+  static boolean isMetadataTable(String basePath) {
+    return basePath.endsWith(METADATA_TABLE_REL_PATH);
+  }
+
+  static HoodieTableMetadata create(Configuration conf, String 
datasetBasePath, String spillableMapPath, boolean useFileListingFromMetadata,
+                                    boolean verifyListings, boolean 
enableMetrics, boolean shouldAssumeDatePartitioning) {
+    return new HoodieBackedTableMetadata(conf, datasetBasePath, 
spillableMapPath, useFileListingFromMetadata, verifyListings,
+        enableMetrics, shouldAssumeDatePartitioning);
+  }
+
+  /**
+   * Fetch all the files at the given partition path, per the latest snapshot 
of the metadata.
+   */
+  FileStatus[] getAllFilesInPartition(Path partitionPath) throws IOException;
+
+  /**
+   * Fetch list of all partition paths, per the latest snapshot of the 
metadata.
+   */
+  List<String> getAllPartitionPaths() throws IOException;
+
+  Option<String> getLatestCompactionTimestamp();
+
+  boolean isInSync();
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
new file mode 100644
index 0000000..0436de7
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
/**
 * Types of partitions within the Metadata Table, each mapped to the folder
 * (partition path) under which its records are stored.
 */
public enum MetadataPartitionType {
  FILES("files");

  // Relative folder name under the metadata table's base path.
  private final String path;

  MetadataPartitionType(String path) {
    this.path = path;
  }

  /** Returns the partition path (folder name) backing this partition type. */
  public String partitionPath() {
    return this.path;
  }
}

Reply via email to