This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d305948cac63e2a88a8d2df5bdf64ffd6fc49f5a
Author: Vinoth Chandar <[email protected]>
AuthorDate: Thu Oct 29 18:33:56 2020 -0700

    [RFC-15] Fixing code review comments
    
    Renames HoodieMetadataWriter.instance() to create(), adds a _PROP suffix to
    the HoodieMetadataConfig property constants, replaces raw
    System.currentTimeMillis() timing with HoodieTimer, and renames
    SizeAwareFSDataInputStream to TimedFSDataInputStream.
---
 .../apache/hudi/cli/commands/MetadataCommand.java  | 11 ++--
 .../apache/hudi/client/AbstractHoodieClient.java   |  5 +-
 .../org/apache/hudi/client/HoodieWriteClient.java  |  2 +-
 .../apache/hudi/config/HoodieMetadataConfig.java   | 61 +++++++++++-----------
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 16 +++---
 .../apache/hudi/metadata/HoodieMetadataWriter.java |  3 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  2 +-
 .../apache/hudi/metadata/TestHoodieMetadata.java   | 16 +++---
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 29 +++++-----
 ...nputStream.java => TimedFSDataInputStream.java} | 18 +++----
 .../apache/hudi/common/metrics/LocalRegistry.java  |  4 +-
 .../org/apache/hudi/common/metrics/Registry.java   | 27 +++++-----
 .../hudi/common/table/log/HoodieLogFileReader.java |  4 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 36 ++++++-------
 14 files changed, 113 insertions(+), 121 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
index 4ecc6a9..a45e9b4 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.utils.SparkUtil;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.config.HoodieMetadataConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.HoodieMetadataReader;
@@ -71,13 +72,11 @@ public class MetadataCommand implements CommandMarker {
       HoodieCLI.fs.mkdirs(metadataPath);
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     HoodieWriteConfig writeConfig = getWriteConfig();
     initJavaSparkContext();
-    HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
-    long t2 = System.currentTimeMillis();
-
-    return String.format("Created Metadata Table in %s (duration=%.2fsec)", 
metadataPath, (t2 - t1) / 1000.0);
+    HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
+    return String.format("Created Metadata Table in %s (duration=%.2f secs)", 
metadataPath, timer.endTimer() / 1000.0);
   }
 
   @CliCommand(value = "metadata delete", help = "Remove the Metadata Table")
@@ -115,7 +114,7 @@ public class MetadataCommand implements CommandMarker {
     } else {
       HoodieWriteConfig writeConfig = getWriteConfig();
       initJavaSparkContext();
-      HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc);
+      HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc);
     }
     long t2 = System.currentTimeMillis();
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index fd02b6d..f1abe7c 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -72,6 +72,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
     startEmbeddedServerView();
+    initWrapperFSMetrics();
   }
 
   /**
@@ -123,7 +124,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
     return config;
   }
 
-  protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
+  private void initWrapperFSMetrics() {
     if (config.isMetricsOn()) {
       Registry registry;
       Registry registryMeta;
@@ -143,7 +144,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl
 
       HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
     }
+  }
+
+  protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) {
     return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad);
   }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 614f0dc..c180a88 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -124,7 +124,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     this.rollbackPending = rollbackPending;
 
     // Initialize Metadata Table
-    HoodieMetadataWriter.instance(hadoopConf, writeConfig).initialize(jsc);
+    HoodieMetadataWriter.create(hadoopConf, writeConfig).initialize(jsc);
   }
 
   /**
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
index ca9c723..53142b1 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.config;
 
 import org.apache.hudi.common.config.DefaultHoodieConfig;
-import org.apache.hudi.config.HoodieCompactionConfig.Builder;
 
 import javax.annotation.concurrent.Immutable;
 
@@ -37,33 +36,33 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
   public static final String METADATA_PREFIX = "hoodie.metadata";
 
   // Enable the internal Metadata Table which saves file listings
-  public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable";
+  public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable";
   public static final boolean DEFAULT_METADATA_ENABLE = false;
 
   // Validate contents of Metadata Table on each access against the actual filesystem
-  public static final String METADATA_VALIDATE = METADATA_PREFIX + ".validate";
+  public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate";
   public static final boolean DEFAULT_METADATA_VALIDATE = false;
 
   // Parallelism for inserts
-  public static final String INSERT_PARALLELISM = METADATA_PREFIX + ".insert.parallelism";
-  public static final int DEFAULT_INSERT_PARALLELISM = 1;
+  public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism";
+  public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1;
 
   // Async clean
-  public static final String ASYNC_CLEAN = METADATA_PREFIX + ".clean.async";
-  public static final boolean DEFAULT_ASYNC_CLEAN = false;
+  public static final String METADATA_ASYNC_CLEAN_PROP = METADATA_PREFIX + ".clean.async";
+  public static final boolean DEFAULT_METADATA_ASYNC_CLEAN = false;
 
   // Maximum delta commits before compaction occurs
-  public static final String COMPACT_NUM_DELTA_COMMITS = METADATA_PREFIX + ".compact.max.delta.commits";
-  public static final int DEFAULT_COMPACT_NUM_DELTA_COMMITS = 24;
+  public static final String METADATA_COMPACT_NUM_DELTA_COMMITS_PROP = METADATA_PREFIX + ".compact.max.delta.commits";
+  public static final int DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS = 24;
 
   // Archival settings
-  public static final String MIN_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.min.commits";
+  public static final String MIN_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.min.commits";
   public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20;
-  public static final String MAX_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.max.commits";
+  public static final String MAX_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.max.commits";
   public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30;
 
   // Cleaner commits retained
-  public static final String CLEANER_COMMITS_RETAINED = METADATA_PREFIX + ".cleaner.commits.retained";
+  public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
 
   private HoodieMetadataConfig(Properties props) {
@@ -91,58 +90,58 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig {
     }
 
     public Builder enable(boolean enable) {
-      props.setProperty(METADATA_ENABLE, String.valueOf(enable));
+      props.setProperty(METADATA_ENABLE_PROP, String.valueOf(enable));
       return this;
     }
 
     public Builder validate(boolean validate) {
-      props.setProperty(METADATA_VALIDATE, String.valueOf(validate));
+      props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate));
       return this;
     }
 
     public Builder withInsertParallelism(int parallelism) {
-      props.setProperty(INSERT_PARALLELISM, String.valueOf(parallelism));
+      props.setProperty(METADATA_INSERT_PARALLELISM_PROP, String.valueOf(parallelism));
       return this;
     }
 
     public Builder withAsyncClean(boolean asyncClean) {
-      props.setProperty(ASYNC_CLEAN, String.valueOf(asyncClean));
+      props.setProperty(METADATA_ASYNC_CLEAN_PROP, String.valueOf(asyncClean));
       return this;
     }
 
    public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) {
-      props.setProperty(COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
+      props.setProperty(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction));
       return this;
     }
 
     public Builder archiveCommitsWith(int minToKeep, int maxToKeep) {
-      props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep));
-      props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep));
+      props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep));
+      props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep));
       return this;
     }
 
     public Builder retainCommits(int commitsRetained) {
-      props.setProperty(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained));
+      props.setProperty(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained));
       return this;
     }
 
     public HoodieMetadataConfig build() {
       HoodieMetadataConfig config = new HoodieMetadataConfig(props);
-      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE), METADATA_ENABLE,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP,
           String.valueOf(DEFAULT_METADATA_ENABLE));
-      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE), METADATA_VALIDATE,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP,
           String.valueOf(DEFAULT_METADATA_VALIDATE));
-      setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
-          String.valueOf(DEFAULT_INSERT_PARALLELISM));
-      setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN), ASYNC_CLEAN,
-          String.valueOf(DEFAULT_ASYNC_CLEAN));
-      setDefaultOnCondition(props, !props.containsKey(COMPACT_NUM_DELTA_COMMITS),
-          COMPACT_NUM_DELTA_COMMITS, String.valueOf(DEFAULT_COMPACT_NUM_DELTA_COMMITS));
-      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED), CLEANER_COMMITS_RETAINED,
+      setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP,
+          String.valueOf(DEFAULT_METADATA_INSERT_PARALLELISM));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_ASYNC_CLEAN_PROP), METADATA_ASYNC_CLEAN_PROP,
+          String.valueOf(DEFAULT_METADATA_ASYNC_CLEAN));
+      setDefaultOnCondition(props, !props.containsKey(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP),
+          METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS));
+      setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP,
           String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED));
-      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP,
           String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP));
-      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP,
+      setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP,
           String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP));
 
       return config;
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index b499ac9..293c3c0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -763,35 +763,35 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
    * File listing metadata configs.
    */
  public boolean useFileListingMetadata() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP));
  }
 
  public boolean getFileListingMetadataVerify() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP));
  }
 
  public int getMetadataInsertParallelism() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.INSERT_PARALLELISM));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP));
  }
 
  public int getMetadataCompactDeltaCommitMax() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP));
  }
 
  public boolean isMetadataAsyncClean() {
-    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.ASYNC_CLEAN));
+    return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP));
  }
 
  public int getMetadataMaxCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP));
  }
 
  public int getMetadataMinCommitsToKeep() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP));
  }
 
  public int getMetadataCleanerCommitsRetained() {
-    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED));
+    return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP));
   }
 
   public static class Builder {
diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 655890b..08a785d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -99,7 +99,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
   private String tableName;
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
-  public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) {
+  public static HoodieMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig) {
     String key = writeConfig.getBasePath();
     if (instances.containsKey(key)) {
       if (instances.get(key).enabled() != writeConfig.useFileListingMetadata()) {
@@ -314,7 +314,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       }
     }
     String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP;
-
     LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime);
     metaClient = HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataBasePath.toString(),
         HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(),
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 286e6db..471fc8d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -634,7 +634,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
 
   public HoodieMetadataWriter metadata() {
     if (metadataWriter == null) {
-      metadataWriter = HoodieMetadataWriter.instance(hadoopConfiguration.get(), config);
+      metadataWriter = HoodieMetadataWriter.create(hadoopConfiguration.get(), config);
     }
 
     return metadataWriter;
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index 98675e4..62962d5 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -30,7 +30,6 @@ import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -54,6 +53,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -73,6 +73,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 public class TestHoodieMetadata extends HoodieClientTestHarness {
   private static final Logger LOG = LogManager.getLogger(TestHoodieMetadata.class);
@@ -473,13 +474,12 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
    * Instants on Metadata Table should be archived as per config.
    * Metadata Table should be automatically compacted as per config.
    */
-  @Test
-  public void testArchivingAndCompaction() throws Exception {
+  @ParameterizedTest
+  @ValueSource(booleans =  {true, false})
+  public void testArchivingAndCompaction(boolean asyncClean) throws Exception {
     init(HoodieTableType.COPY_ON_WRITE);
 
     final int maxDeltaCommitsBeforeCompaction = 6;
-    // Test autoClean and asyncClean based on this flag which is randomly chosen.
-    boolean asyncClean = new Random().nextBoolean();
     HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
             .archiveCommitsWith(2, 4).retainCommits(1)
@@ -645,7 +645,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
       return;
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
 
     // Validate write config for metadata table
     HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig();
@@ -751,11 +751,11 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
           + numFileVersions + " but was " + fsView.getAllFileSlices(partition).count());
     });
 
-    LOG.info("Validation time=" + (System.currentTimeMillis() - t1));
+    LOG.info("Validation time=" + timer.endTimer());
   }
 
   private HoodieMetadataWriter metadata(HoodieWriteClient client) {
-    return HoodieMetadataWriter.instance(hadoopConf, client.getConfig());
+    return HoodieMetadataWriter.create(hadoopConf, client.getConfig());
   }
 
   // TODO: this can be moved to TestHarness after merge from master
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index cdda082..f9c2296 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -70,12 +71,19 @@ public class HoodieWrapperFileSystem extends FileSystem {
     create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write
   }
 
+  private static Registry METRICS_REGISTRY_DATA;
+  private static Registry METRICS_REGISTRY_META;
+
+  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
+    METRICS_REGISTRY_DATA = registry;
+    METRICS_REGISTRY_META = registryMeta;
+  }
+
+
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private static Registry metricsRegistry;
-  private static Registry metricsRegistryMetaFolder;
 
   @FunctionalInterface
   public interface CheckedFunction<R> {
@@ -84,17 +92,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   private static Registry getMetricRegistryForPath(Path p) {
     return ((p != null) && (p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
-        ? metricsRegistryMetaFolder : metricsRegistry;
+        ? METRICS_REGISTRY_META : METRICS_REGISTRY_DATA;
   }
 
   protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException {
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     R res = func.get();
 
     Registry registry = getMetricRegistryForPath(p);
     if (registry != null) {
       registry.increment(metricName);
-      registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1);
+      registry.add(metricName + ".totalDuration", timer.endTimer());
+      registry.add(metricName + ".totalDuration", timer.endTimer());
     }
 
     return res;
@@ -110,11 +118,6 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return executeFuncWithTimeMetrics(metricName, p, func);
   }
 
-  public static void setMetricsRegistry(Registry registry, Registry registryMeta) {
-    metricsRegistry = registry;
-    metricsRegistryMetaFolder = registryMeta;
-  }
-
   public HoodieWrapperFileSystem() {}
 
   public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) {
@@ -206,12 +209,10 @@ public class HoodieWrapperFileSystem extends FileSystem {
   }
 
   private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException {
-    if (fsDataInputStream instanceof SizeAwareFSDataInputStream) {
+    if (fsDataInputStream instanceof TimedFSDataInputStream) {
       return fsDataInputStream;
     }
-
-    SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, fsDataInputStream);
-    return os;
+    return new TimedFSDataInputStream(path, fsDataInputStream);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
similarity index 82%
rename from hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
rename to hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
index f5adb0d..c621111 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java
@@ -28,14 +28,14 @@ import java.nio.ByteBuffer;
 import java.util.EnumSet;
 
 /**
- * Wrapper over <code>FSDataInputStream</code> to keep track of the size of the written bytes.
+ * Wrapper over <code>FSDataInputStream</code> that also times the operations.
  */
-public class SizeAwareFSDataInputStream extends FSDataInputStream {
+public class TimedFSDataInputStream extends FSDataInputStream {
 
   // Path
   private final Path path;
 
-  public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException {
+  public TimedFSDataInputStream(Path path, FSDataInputStream in) {
     super(in);
     this.path = path;
   }
@@ -43,26 +43,20 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream {
   @Override
   public int read(ByteBuffer buf) throws IOException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, 0, () -> {
-            return super.read(buf);
-      });
+        path, 0, () -> super.read(buf));
   }
 
   @Override
   public int read(long position, byte[] buffer, int offset, int length) throws IOException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, length, () -> {
-            return super.read(position, buffer, offset, length);
-      });
+        path, length, () -> super.read(position, buffer, offset, length));
   }
 
   @Override
   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts)
           throws IOException, UnsupportedOperationException {
     return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
-        path, maxLength, () -> {
-          return super.read(bufferPool, maxLength, opts);
-      });
+        path, maxLength, () -> super.read(bufferPool, maxLength, opts));
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 36aeab9..9383223 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -23,11 +23,11 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Registry that tracks metrics local to a single jvm process.
  */
 public class LocalRegistry implements Registry {
   ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  private String name;
+  private final String name;
 
   public LocalRegistry(String name) {
     this.name = name;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
index 0a56297..19822fb 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -30,7 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils;
  * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
 public interface Registry extends Serializable {
-  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new ConcurrentHashMap<>();
+
+  ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>();
 
   /**
    * Get (or create) the registry for a provided name.
@@ -39,7 +40,7 @@ public interface Registry extends Serializable {
    *
    * @param registryName Name of the registry
    */
-  public static Registry getRegistry(String registryName) {
+  static Registry getRegistry(String registryName) {
     return getRegistry(registryName, LocalRegistry.class.getName());
   }
 
@@ -49,13 +50,13 @@ public interface Registry extends Serializable {
    * @param registryName Name of the registry.
    * @param clazz The fully qualified name of the registry class to create.
    */
-  public static Registry getRegistry(String registryName, String clazz) {
+  static Registry getRegistry(String registryName, String clazz) {
     synchronized (Registry.class) {
-      if (!REGISTRYMAP.containsKey(registryName)) {
+      if (!REGISTRY_MAP.containsKey(registryName)) {
         Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName);
-        REGISTRYMAP.put(registryName, registry);
+        REGISTRY_MAP.put(registryName, registry);
       }
-      return REGISTRYMAP.get(registryName);
+      return REGISTRY_MAP.get(registryName);
     }
   }
 
@@ -66,10 +67,10 @@ public interface Registry extends Serializable {
    * @param prefixWithRegistryName prefix each metric name with the registry name.
    * @return
    */
-  public static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
+  static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) {
     synchronized (Registry.class) {
       HashMap<String, Long> allMetrics = new HashMap<>();
-      REGISTRYMAP.forEach((registryName, registry) -> {
+      REGISTRY_MAP.forEach((registryName, registry) -> {
         allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
         if (flush) {
           registry.clear();
@@ -82,14 +83,14 @@ public interface Registry extends Serializable {
   /**
    * Clear all metrics.
    */
-  public void clear();
+  void clear();
 
   /**
    * Increment the metric.
    *
    * @param name Name of the metric to increment.
    */
-  public void increment(String name);
+  void increment(String name);
 
   /**
    * Add value to the metric.
@@ -97,12 +98,12 @@ public interface Registry extends Serializable {
    * @param name Name of the metric.
    * @param value The value to add to the metrics.
    */
-  public void add(String name, long value);
+  void add(String name, long value);
 
   /**
    * Get all Counter type metrics.
    */
-  public default Map<String, Long> getAllCounts() {
+  default Map<String, Long> getAllCounts() {
     return getAllCounts(false);
   }
 
@@ -111,5 +112,5 @@ public interface Registry extends Serializable {
    *
    * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry.
    */
-  public abstract Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
+  Map<String, Long> getAllCounts(boolean prefixWithRegistryName);
 }
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index d7ef2d3..7c96a0f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,7 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.fs.SizeAwareFSDataInputStream;
+import org.apache.hudi.common.fs.TimedFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -73,7 +73,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
       // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
index fed5516..c14f402 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SpillableMapUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -133,9 +134,6 @@ public class HoodieMetadataReader implements Serializable {
 
   /**
    * Create a the Metadata Table in read-only mode.
-   *
-   * @param hadoopConf {@code Configuration}
-   * @param basePath The basePath for the dataset
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups) {
@@ -144,9 +142,6 @@ public class HoodieMetadataReader implements Serializable {
 
   /**
    * Create a the Metadata Table in read-only mode.
-   *
-   * @param hadoopConf {@code Configuration}
-   * @param basePath The basePath for the dataset
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups, boolean enableMetrics) {
@@ -230,9 +225,9 @@ public class HoodieMetadataReader implements Serializable {
    * Returns a list of all partitions.
    */
   protected List<String> getAllPartitionPaths() throws IOException {
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
-    updateMetrics(LOOKUP_PARTITIONS_STR, System.currentTimeMillis() - t1);
+    updateMetrics(LOOKUP_PARTITIONS_STR, timer.endTimer());
 
     List<String> partitions = Collections.emptyList();
     if (hoodieRecord.isPresent()) {
@@ -251,9 +246,9 @@ public class HoodieMetadataReader implements Serializable {
 
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
-      t1 = System.currentTimeMillis();
+      timer.startTimer();
       List<String> actualPartitions  = getAllPartitionPathsByListing(metaClient.getFs(), datasetBasePath, false);
-      updateMetrics(VALIDATE_PARTITIONS_STR, System.currentTimeMillis() - t1);
+      updateMetrics(VALIDATE_PARTITIONS_STR, timer.endTimer());
 
       Collections.sort(actualPartitions);
       Collections.sort(partitions);
@@ -284,9 +279,9 @@ public class HoodieMetadataReader implements Serializable {
       partitionName = NON_PARTITIONED_NAME;
     }
 
-    long t1 = System.currentTimeMillis();
+    HoodieTimer timer = new HoodieTimer().startTimer();
     Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
-    updateMetrics(LOOKUP_FILES_STR, System.currentTimeMillis() - t1);
+    updateMetrics(LOOKUP_FILES_STR, timer.endTimer());
 
     FileStatus[] statuses = {};
     if (hoodieRecord.isPresent()) {
@@ -299,20 +294,21 @@ public class HoodieMetadataReader implements Serializable {
 
     if (validateLookups) {
       // Validate the Metadata Table data by listing the partitions from the file system
-      t1 = System.currentTimeMillis();
+      timer.startTimer();
 
       // Ignore partition metadata file
       FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
           p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      updateMetrics(VALIDATE_FILES_STR, System.currentTimeMillis() - t1);
+      updateMetrics(VALIDATE_FILES_STR, timer.endTimer());
 
       List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).collect(Collectors.toList());
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
+
       List<String> metadataFilenames = Arrays.stream(statuses)
-          .map(s -> s.getPath().getName()).collect(Collectors.toList());
+          .map(s -> s.getPath().getName()).sorted()
+          .collect(Collectors.toList());
 
-      Collections.sort(metadataFilenames);
-      Collections.sort(directFilenames);
       if (!metadataFilenames.equals(directFilenames)) {
         LOG.error("Validation of metadata file listing for partition " + 
partitionName + " failed.");
         LOG.error("File list from metadata: " + 
Arrays.toString(metadataFilenames.toArray()));
@@ -350,12 +346,12 @@ public class HoodieMetadataReader implements Serializable 
{
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
     if (basefileReader != null) {
-      long t1 = System.currentTimeMillis();
+      HoodieTimer timer = new HoodieTimer().startTimer();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
-        updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1);
+        updateMetrics(BASEFILE_READ_STR, timer.endTimer());
       }
     }
 
