This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e8f16989b952588461bd38d1cd5a4679c29e259f
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Oct 16 14:26:07 2020 -0700

    [RFC-15] Added a distributed metrics registry for Spark, which can be used
to collect metrics from executors.
    
    This helps create a stats dashboard which shows the metadata table 
improvements in real-time for production tables.
---
 .../apache/hudi/client/AbstractHoodieClient.java   |  24 ++++-
 .../hudi/client/AbstractHoodieWriteClient.java     |   5 +
 .../org/apache/hudi/client/HoodieWriteClient.java  |  27 +++--
 .../apache/hudi/config/HoodieMetricsConfig.java    |   8 ++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../apache/hudi/metrics/DistributedRegistry.java   | 107 ++++++++++++++++++++
 .../hudi/common/fs/HoodieWrapperFileSystem.java    |  25 +++--
 .../hudi/common/fs/SizeAwareFSDataInputStream.java |   8 --
 .../common/fs/SizeAwareFSDataOutputStream.java     |   7 --
 .../metrics/{Registry.java => LocalRegistry.java}  |  60 +++---------
 .../org/apache/hudi/common/metrics/Registry.java   | 109 ++++++++++++---------
 11 files changed, 250 insertions(+), 134 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java 
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
index cdd125e..fd02b6d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
@@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -122,6 +124,26 @@ public abstract class AbstractHoodieClient implements 
Serializable, AutoCloseabl
   }
 
   protected HoodieTableMetaClient createMetaClient(boolean 
loadActiveTimelineOnLoad) {
+    if (config.isMetricsOn()) {
+      Registry registry;
+      Registry registryMeta;
+
+      if (config.isExecutorMetricsEnabled()) {
+        // Create a distributed registry for HoodieWrapperFileSystem
+        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(),
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registry).register(jsc);
+        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder",
+            DistributedRegistry.class.getName());
+        ((DistributedRegistry)registryMeta).register(jsc);
+      } else {
+        registry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+        registryMeta = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder");
+      }
+
+      HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta);
+    }
+
     return ClientUtils.createMetaClient(hadoopConf, config, 
loadActiveTimelineOnLoad);
   }
 }
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
 
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index 4566fe8..a4015a6 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -232,6 +232,11 @@ public abstract class AbstractHoodieWriteClient<T extends 
HoodieRecordPayload> e
     return table;
   }
 
+  protected HoodieTable<T> getTable() {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    return HoodieTable.create(metaClient, config, hadoopConf);
+  }
+
   /**
    * Sets write schema from last instant since deletes may not have schema set 
in the config.
    */
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java 
b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
index 5fa72d4..a0019b0 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java
@@ -146,7 +146,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> 
hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     Timer.Context indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = 
getIndex().tagLocation(hoodieRecords, jsc, table);
     metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer 
== null ? 0L : indexTimer.stop()));
@@ -169,7 +169,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   protected void rollBackInflightBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline inflightTimeline = 
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option<String> instant = Option.fromJavaOptional(
         
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -438,7 +438,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline 
is empty");
     }
@@ -462,7 +462,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     table.savepoint(jsc, instantTime, user, comment);
   }
 
@@ -474,7 +474,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
@@ -489,7 +489,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @return true if the savepoint was restored to successfully
    */
   public void restoreToSavepoint(String savepointTime) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     SavepointHelpers.validateSavepointPresence(table, savepointTime);
     restoreToInstant(savepointTime);
     SavepointHelpers.validateSavepointRestore(table, savepointTime);
@@ -506,7 +506,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     final String rollbackInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
     final Timer.Context context = this.metrics.getRollbackCtx();
     try {
-      HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+      HoodieTable<T> table = getTable();
       Option<HoodieInstant> commitInstantOpt = 
Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants()
               .filter(instant -> 
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime))
               .findFirst());
@@ -537,7 +537,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     final String restoreInstantTime = 
HoodieActiveTimeline.createNewInstantTime();
     Timer.Context context = metrics.getRollbackCtx();
     try {
-      HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+      HoodieTable<T> table = getTable();
       HoodieRestoreMetadata restoreMetadata = table.restore(jsc, 
restoreInstantTime, instantTime);
       if (context != null) {
         final long durationInMs = metrics.getDurationInMs(context.stop());
@@ -571,7 +571,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context context = metrics.getCleanCtx();
-    HoodieCleanMetadata metadata = HoodieTable.create(config, 
hadoopConf).clean(jsc, cleanInstantTime);
+    HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime);
     if (context != null && metadata != null) {
       long durationMs = metrics.getDurationInMs(context.stop());
       metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -660,8 +660,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean scheduleCompactionAtInstant(String instantTime, 
Option<Map<String, String>> extraMetadata) throws HoodieIOException {
     LOG.info("Scheduling compaction at instant time :" + instantTime);
-    Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf)
-        .scheduleCompaction(jsc, instantTime, extraMetadata);
+    Option<HoodieCompactionPlan> plan = getTable().scheduleCompaction(jsc, 
instantTime, extraMetadata);
     return plan.isPresent();
   }
 
@@ -684,7 +683,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public void commitCompaction(String compactionInstantTime, 
JavaRDD<WriteStatus> writeStatuses,
                                Option<Map<String, String>> extraMetadata) 
throws IOException {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata(
         table, compactionInstantTime, writeStatuses, config.getSchema());
     extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
@@ -733,7 +732,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * Cleanup all pending commits.
    */
   private void rollbackPendingCommits() {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline inflightTimeline = 
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     List<String> commits = 
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -755,7 +754,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    * @return RDD of Write Status
    */
   private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean 
shouldComplete) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
+    HoodieTable<T> table = getTable();
     HoodieTimeline pendingCompactionTimeline = 
table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
     if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
index 800c75f..b6cb6e5 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java
@@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig {
   public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + 
".reporter.class";
   public static final String DEFAULT_METRICS_REPORTER_CLASS = "";
 
+  // Enable metrics collection from executors
+  public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + 
".executor.enable";
+
   private HoodieMetricsConfig(Properties props) {
     super(props);
   }
@@ -126,6 +129,11 @@ public class HoodieMetricsConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withExecutorMetrics(boolean enable) {
+      props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable));
+      return this;
+    }
+
     public HoodieMetricsConfig build() {
       HoodieMetricsConfig config = new HoodieMetricsConfig(props);
       setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, 
String.valueOf(DEFAULT_METRICS_ON));
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java 
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 527f18f..82199ff 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -686,6 +686,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX));
   }
 
+  public boolean isExecutorMetricsEnabled() {
+    return 
Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS,
 "false"));
+  }
+
   /**
    * memory configs.
    */
diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java 
b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
new file mode 100644
index 0000000..5197a22
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metrics;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.util.AccumulatorV2;
+
+/**
+ * Lightweight Metrics Registry to track Hudi events.
+ */
+public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, 
Map<String, Long>>
+    implements Registry, Serializable {
+  private String name;
+  ConcurrentHashMap<String, Long> counters = new ConcurrentHashMap<>();
+
+  public DistributedRegistry(String name) {
+    this.name = name;
+  }
+
+  public void register(JavaSparkContext jsc) {
+    if (!isRegistered()) {
+      jsc.sc().register(this);
+    }
+  }
+
+  @Override
+  public void clear() {
+    counters.clear();
+  }
+
+  @Override
+  public void increment(String name) {
+    counters.merge(name,  1L, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  @Override
+  public void add(String name, long value) {
+    counters.merge(name,  value, (oldValue, newValue) -> oldValue + newValue);
+  }
+
+  /**
+   * Get all Counter type metrics.
+   */
+  @Override
+  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
+    HashMap<String, Long> countersMap = new HashMap<>();
+    counters.forEach((k, v) -> {
+      String key = prefixWithRegistryName ? name + "." + k : k;
+      countersMap.put(key, v);
+    });
+    return countersMap;
+  }
+
+  @Override
+  public void add(Map<String, Long> arg) {
+    arg.forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
+    DistributedRegistry registry = new DistributedRegistry(name);
+    counters.forEach((key, value) -> registry.add(key, value));
+    return registry;
+  }
+
+  @Override
+  public boolean isZero() {
+    return counters.isEmpty();
+  }
+
+  @Override
+  public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) {
+    acc.value().forEach((key, value) -> add(key, value));
+  }
+
+  @Override
+  public void reset() {
+    counters.clear();
+  }
+
+  @Override
+  public Map<String, Long> value() {
+    return counters;
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index 9cea356..cdda082 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -74,9 +74,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private static Registry metricsRegistry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
-  private static Registry metricsRegistryMetaFolder =
-      Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder");
+  private static Registry metricsRegistry;
+  private static Registry metricsRegistryMetaFolder;
 
   @FunctionalInterface
   public interface CheckedFunction<R> {
@@ -89,12 +88,14 @@ public class HoodieWrapperFileSystem extends FileSystem {
   }
 
   protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, 
CheckedFunction<R> func) throws IOException {
-    Registry registry = getMetricRegistryForPath(p);
-
     long t1 = System.currentTimeMillis();
     R res = func.get();
-    registry.increment(metricName);
-    registry.add(metricName + ".totalDuration", System.currentTimeMillis() - 
t1);
+
+    Registry registry = getMetricRegistryForPath(p);
+    if (registry != null) {
+      registry.increment(metricName);
+      registry.add(metricName + ".totalDuration", System.currentTimeMillis() - 
t1);
+    }
 
     return res;
   }
@@ -102,10 +103,18 @@ public class HoodieWrapperFileSystem extends FileSystem {
   protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, 
Path p, long byteCount,
                                                            CheckedFunction<R> 
func) throws IOException {
     Registry registry = getMetricRegistryForPath(p);
-    registry.add(metricName + ".totalBytes", byteCount);
+    if (registry != null) {
+      registry.add(metricName + ".totalBytes", byteCount);
+    }
+
     return executeFuncWithTimeMetrics(metricName, p, func);
   }
 
+  public static void setMetricsRegistry(Registry registry, Registry 
registryMeta) {
+    metricsRegistry = registry;
+    metricsRegistryMetaFolder = registryMeta;
+  }
+
   public HoodieWrapperFileSystem() {}
 
   public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard 
consistencyGuard) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
index 684a625..f5adb0d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ReadOption;
@@ -35,8 +34,6 @@ public class SizeAwareFSDataInputStream extends 
FSDataInputStream {
 
   // Path
   private final Path path;
-  // Registry or read and write metrics
-  Registry metricsRegistry;
 
   public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws 
IOException {
     super(in);
@@ -85,9 +82,4 @@ public class SizeAwareFSDataInputStream extends 
FSDataInputStream {
           return null;
         });
   }
-
-  public void setMetricRegistry(Registry metricsRegistry) {
-    this.metricsRegistry = metricsRegistry;
-  }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
index e0607d0..ac07cd7 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.common.fs;
 
-import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,8 +41,6 @@ public class SizeAwareFSDataOutputStream extends 
FSDataOutputStream {
   private final Path path;
   // Consistency guard
   private final ConsistencyGuard consistencyGuard;
-  // Registry or read and write metrics
-  Registry metricsRegistry;
 
   public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, 
ConsistencyGuard consistencyGuard,
       Runnable closeCallback) throws IOException {
@@ -87,8 +84,4 @@ public class SizeAwareFSDataOutputStream extends 
FSDataOutputStream {
   public long getBytesWritten() {
     return bytesWritten.get();
   }
-
-  public void setMetricRegistry(Registry metricsRegistry) {
-    this.metricsRegistry = metricsRegistry;
-  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java 
b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
similarity index 60%
copy from hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
index 169e8bc..36aeab9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java
@@ -22,76 +22,36 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-
 /**
  * Lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
+public class LocalRegistry implements Registry {
   ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new 
ConcurrentHashMap<>();
+  private String name;
 
-  private Registry(String name) {
+  public LocalRegistry(String name) {
     this.name = name;
   }
 
-  /**
-   * Get (or create) the registry for a provided name.
-   */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
-    }
-    return registryMap.get(registryName);
-  }
-
-  /**
-   * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
-   * @param prefixWithRegistryName prefix each metric name with the registry 
name.
-   * @return
-   */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, 
boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
+  @Override
   public void clear() {
     counters.clear();
   }
 
+  @Override
   public void increment(String name) {
     getCounter(name).increment();
   }
 
+  @Override
   public void add(String name, long value) {
     getCounter(name).add(value);
   }
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
-
-  /**
-   * Get all Counter type metrics.
-   */
-  public Map<String, Long> getAllCounts() {
-    return getAllCounts(false);
-  }
-
   /**
    * Get all Counter type metrics.
    */
+  @Override
   public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
     HashMap<String, Long> countersMap = new HashMap<>();
     counters.forEach((k, v) -> {
@@ -101,4 +61,10 @@ public class Registry {
     return countersMap;
   }
 
+  private synchronized Counter getCounter(String name) {
+    if (!counters.containsKey(name)) {
+      counters.put(name, new Counter());
+    }
+    return counters.get(name);
+  }
 }
\ No newline at end of file
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java 
b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
index 169e8bc..0a56297 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java
@@ -18,87 +18,98 @@
 
 package org.apache.hudi.common.metrics;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hudi.common.util.ReflectionUtils;
+
 
 /**
- * Lightweight Metrics Registry to track Hudi events.
+ * Interface which defines a lightweight Metrics Registry to track Hudi events.
  */
-public class Registry {
-  ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
-  final String name;
-
-  private static ConcurrentHashMap<String, Registry> registryMap = new 
ConcurrentHashMap<>();
+public interface Registry extends Serializable {
+  static ConcurrentHashMap<String, Registry> REGISTRYMAP = new 
ConcurrentHashMap<>();
 
-  private Registry(String name) {
-    this.name = name;
+  /**
+   * Get (or create) the registry for a provided name.
+   *
+   * This function creates a {@code LocalRegistry}.
+   *
+   * @param registryName Name of the registry
+   */
+  public static Registry getRegistry(String registryName) {
+    return getRegistry(registryName, LocalRegistry.class.getName());
   }
 
   /**
-   * Get (or create) the registry for a provided name.
+   * Get (or create) the registry for a provided name and given class.
+   *
+   * @param registryName Name of the registry.
+   * @param clazz The fully qualified name of the registry class to create.
    */
-  public static synchronized Registry getRegistry(String registryName) {
-    if (!registryMap.containsKey(registryName)) {
-      registryMap.put(registryName, new Registry(registryName));
+  public static Registry getRegistry(String registryName, String clazz) {
+    synchronized (Registry.class) {
+      if (!REGISTRYMAP.containsKey(registryName)) {
+        Registry registry = (Registry)ReflectionUtils.loadClass(clazz, 
registryName);
+        REGISTRYMAP.put(registryName, registry);
+      }
+      return REGISTRYMAP.get(registryName);
     }
-    return registryMap.get(registryName);
   }
 
   /**
    * Get all registered metrics.
-   * @param flush clean all metrics as part of this operation.
+   *
+   * @param flush clear all metrics after this operation.
    * @param prefixWithRegistryName prefix each metric name with the registry 
name.
    * @return
    */
-  public static synchronized Map<String, Long> getAllMetrics(boolean flush, 
boolean prefixWithRegistryName) {
-    HashMap<String, Long> allMetrics = new HashMap<>();
-    registryMap.forEach((registryName, registry) -> {
-      allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
-      if (flush) {
-        registry.clear();
-      }
-    });
-    return allMetrics;
-  }
-
-  public void clear() {
-    counters.clear();
+  public static Map<String, Long> getAllMetrics(boolean flush, boolean 
prefixWithRegistryName) {
+    synchronized (Registry.class) {
+      HashMap<String, Long> allMetrics = new HashMap<>();
+      REGISTRYMAP.forEach((registryName, registry) -> {
+        allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName));
+        if (flush) {
+          registry.clear();
+        }
+      });
+      return allMetrics;
+    }
   }
 
-  public void increment(String name) {
-    getCounter(name).increment();
-  }
+  /**
+   * Clear all metrics.
+   */
+  public void clear();
 
-  public void add(String name, long value) {
-    getCounter(name).add(value);
-  }
+  /**
+   * Increment the metric.
+   *
+   * @param name Name of the metric to increment.
+   */
+  public void increment(String name);
 
-  private synchronized Counter getCounter(String name) {
-    if (!counters.containsKey(name)) {
-      counters.put(name, new Counter());
-    }
-    return counters.get(name);
-  }
+  /**
+   * Add value to the metric.
+   *
+   * @param name Name of the metric.
+   * @param value The value to add to the metrics.
+   */
+  public void add(String name, long value);
 
   /**
    * Get all Counter type metrics.
    */
-  public Map<String, Long> getAllCounts() {
+  public default Map<String, Long> getAllCounts() {
     return getAllCounts(false);
   }
 
   /**
    * Get all Counter type metrics.
+   *
+   * @param prefixWithRegistryName If true, the names of all metrics are 
prefixed with name of this registry.
    */
-  public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) {
-    HashMap<String, Long> countersMap = new HashMap<>();
-    counters.forEach((k, v) -> {
-      String key = prefixWithRegistryName ? name + "." + k : k;
-      countersMap.put(key, v.getValue());
-    });
-    return countersMap;
-  }
-
+  public abstract Map<String, Long> getAllCounts(boolean 
prefixWithRegistryName);
 }
\ No newline at end of file

Reply via email to