This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch rfc-15 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e8f16989b952588461bd38d1cd5a4679c29e259f Author: Prashant Wason <[email protected]> AuthorDate: Fri Oct 16 14:26:07 2020 -0700 [RFC-15] Added a distributed metrics registry for spark which can be used to collect metrics from executors. This helps create a stats dashboard which shows the metadata table improvements in real-time for production tables. --- .../apache/hudi/client/AbstractHoodieClient.java | 24 ++++- .../hudi/client/AbstractHoodieWriteClient.java | 5 + .../org/apache/hudi/client/HoodieWriteClient.java | 27 +++-- .../apache/hudi/config/HoodieMetricsConfig.java | 8 ++ .../org/apache/hudi/config/HoodieWriteConfig.java | 4 + .../apache/hudi/metrics/DistributedRegistry.java | 107 ++++++++++++++++++++ .../hudi/common/fs/HoodieWrapperFileSystem.java | 25 +++-- .../hudi/common/fs/SizeAwareFSDataInputStream.java | 8 -- .../common/fs/SizeAwareFSDataOutputStream.java | 7 -- .../metrics/{Registry.java => LocalRegistry.java} | 60 +++--------- .../org/apache/hudi/common/metrics/Registry.java | 109 ++++++++++++--------- 11 files changed, 250 insertions(+), 134 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java index cdd125e..fd02b6d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java @@ -22,10 +22,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.ClientUtils; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; - +import org.apache.hudi.metrics.DistributedRegistry; import org.apache.hadoop.fs.FileSystem; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -122,6 +124,26 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl } protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { + if (config.isMetricsOn()) { + Registry registry; + Registry registryMeta; + + if (config.isExecutorMetricsEnabled()) { + // Create a distributed registry for HoodieWrapperFileSystem + registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName(), + DistributedRegistry.class.getName()); + ((DistributedRegistry)registry).register(jsc); + registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder", + DistributedRegistry.class.getName()); + ((DistributedRegistry)registryMeta).register(jsc); + } else { + registry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName()); + registryMeta = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder"); + } + + HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta); + } + return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 4566fe8..a4015a6 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -232,6 +232,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e return table; } + protected HoodieTable<T> getTable() { + HoodieTableMetaClient metaClient = createMetaClient(true); + return HoodieTable.create(metaClient, config, hadoopConf); + } + /** * Sets write schema from last instant since deletes may not have schema set in the config. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 5fa72d4..a0019b0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -146,7 +146,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo */ public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) { // Create a Hoodie table which encapsulated the commits and files visible - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); Timer.Context indexTimer = metrics.getIndexCtx(); JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, jsc, table); metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop())); @@ -169,7 +169,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo */ protected void rollBackInflightBootstrap() { LOG.info("Rolling back pending bootstrap if present"); - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); Option<String> instant = Option.fromJavaOptional( inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst()); @@ -438,7 +438,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @param comment - Comment for the savepoint */ public void savepoint(String user, String comment) { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); if (table.getCompletedCommitsTimeline().empty()) { throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty"); } @@ -462,7 +462,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @param comment - Comment for the savepoint */ public void savepoint(String instantTime, String user, String comment) { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); table.savepoint(jsc, instantTime, user, comment); } @@ -474,7 +474,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @return true if the savepoint was deleted successfully */ public void deleteSavepoint(String savepointTime) { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); SavepointHelpers.deleteSavepoint(table, savepointTime); } @@ -489,7 +489,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @return true if the savepoint was restored to successfully */ public void restoreToSavepoint(String savepointTime) { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); SavepointHelpers.validateSavepointPresence(table, savepointTime); restoreToInstant(savepointTime); SavepointHelpers.validateSavepointRestore(table, savepointTime); @@ -506,7 +506,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime(); final Timer.Context context = this.metrics.getRollbackCtx(); try { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); Option<HoodieInstant> commitInstantOpt = Option.fromJavaOptional(table.getActiveTimeline().getCommitsTimeline().getInstants() .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), commitInstantTime)) .findFirst()); @@ -537,7 +537,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo final String restoreInstantTime = HoodieActiveTimeline.createNewInstantTime(); Timer.Context context = metrics.getRollbackCtx(); try { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); HoodieRestoreMetadata restoreMetadata = table.restore(jsc, restoreInstantTime, instantTime); if (context != null) { final long durationInMs = metrics.getDurationInMs(context.stop()); @@ -571,7 +571,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException { LOG.info("Cleaner started"); final Timer.Context context = metrics.getCleanCtx(); - HoodieCleanMetadata metadata = HoodieTable.create(config, hadoopConf).clean(jsc, cleanInstantTime); + HoodieCleanMetadata metadata = getTable().clean(jsc, cleanInstantTime); if (context != null && metadata != null) { long durationMs = metrics.getDurationInMs(context.stop()); metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted()); @@ -660,8 +660,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo */ public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException { LOG.info("Scheduling compaction at instant time :" + instantTime); - Option<HoodieCompactionPlan> plan = HoodieTable.create(config, hadoopConf) - .scheduleCompaction(jsc, instantTime, extraMetadata); + Option<HoodieCompactionPlan> plan = getTable().scheduleCompaction(jsc, instantTime, extraMetadata); return plan.isPresent(); } @@ -684,7 +683,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo */ public void commitCompaction(String compactionInstantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) throws IOException { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); HoodieCommitMetadata metadata = CompactHelpers.createCompactionMetadata( table, compactionInstantTime, writeStatuses, config.getSchema()); extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata)); @@ -733,7 +732,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * Cleanup all pending commits. */ private void rollbackPendingCommits() { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction(); List<String> commits = inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp) .collect(Collectors.toList()); @@ -755,7 +754,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo * @return RDD of Write Status */ private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) { - HoodieTable<T> table = HoodieTable.create(config, hadoopConf); + HoodieTable<T> table = getTable(); HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline(); HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime); if (pendingCompactionTimeline.containsInstant(inflightInstant)) { diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java index 800c75f..b6cb6e5 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetricsConfig.java @@ -62,6 +62,9 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { public static final String METRICS_REPORTER_CLASS = METRIC_PREFIX + ".reporter.class"; public static final String DEFAULT_METRICS_REPORTER_CLASS = ""; + // Enable metrics collection from executors + public static final String ENABLE_EXECUTOR_METRICS = METRIC_PREFIX + ".executor.enable"; + private HoodieMetricsConfig(Properties props) { super(props); } @@ -126,6 +129,11 @@ public class HoodieMetricsConfig extends DefaultHoodieConfig { return this; } + public Builder withExecutorMetrics(boolean enable) { + props.setProperty(ENABLE_EXECUTOR_METRICS, String.valueOf(enable)); + return this; + } + public HoodieMetricsConfig build() { HoodieMetricsConfig config = new HoodieMetricsConfig(props); setDefaultOnCondition(props, !props.containsKey(METRICS_ON), METRICS_ON, String.valueOf(DEFAULT_METRICS_ON)); diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 527f18f..82199ff 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -686,6 +686,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(HoodieMetricsPrometheusConfig.PUSHGATEWAY_RANDOM_JOB_NAME_SUFFIX)); } + public boolean isExecutorMetricsEnabled() { + return Boolean.parseBoolean(props.getProperty(HoodieMetricsConfig.ENABLE_EXECUTOR_METRICS, "false")); + } + /** * memory configs. */ diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java new file mode 100644 index 0000000..5197a22 --- /dev/null +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/DistributedRegistry.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metrics; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hudi.common.metrics.Registry; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.util.AccumulatorV2; + +/** + * Lightweight Metrics Registry to track Hudi events. + */ +public class DistributedRegistry extends AccumulatorV2<Map<String, Long>, Map<String, Long>> + implements Registry, Serializable { + private String name; + ConcurrentHashMap<String, Long> counters = new ConcurrentHashMap<>(); + + public DistributedRegistry(String name) { + this.name = name; + } + + public void register(JavaSparkContext jsc) { + if (!isRegistered()) { + jsc.sc().register(this); + } + } + + @Override + public void clear() { + counters.clear(); + } + + @Override + public void increment(String name) { + counters.merge(name, 1L, (oldValue, newValue) -> oldValue + newValue); + } + + @Override + public void add(String name, long value) { + counters.merge(name, value, (oldValue, newValue) -> oldValue + newValue); + } + + /** + * Get all Counter type metrics. + */ + @Override + public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) { + HashMap<String, Long> countersMap = new HashMap<>(); + counters.forEach((k, v) -> { + String key = prefixWithRegistryName ? name + "." + k : k; + countersMap.put(key, v); + }); + return countersMap; + } + + @Override + public void add(Map<String, Long> arg) { + arg.forEach((key, value) -> add(key, value)); + } + + @Override + public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() { + DistributedRegistry registry = new DistributedRegistry(name); + counters.forEach((key, value) -> registry.add(key, value)); + return registry; + } + + @Override + public boolean isZero() { + return counters.isEmpty(); + } + + @Override + public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> acc) { + acc.value().forEach((key, value) -> add(key, value)); + } + + @Override + public void reset() { + counters.clear(); + } + + @Override + public Map<String, Long> value() { + return counters; + } +} \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index 9cea356..cdda082 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -74,9 +74,8 @@ public class HoodieWrapperFileSystem extends FileSystem { private FileSystem fileSystem; private URI uri; private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard(); - private static Registry metricsRegistry = Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName()); - private static Registry metricsRegistryMetaFolder = - Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + "MetaFolder"); + private static Registry metricsRegistry; + private static Registry metricsRegistryMetaFolder; @FunctionalInterface public interface CheckedFunction<R> { @@ -89,12 +88,14 @@ public class HoodieWrapperFileSystem extends FileSystem { } protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException { - Registry registry = getMetricRegistryForPath(p); - long t1 = System.currentTimeMillis(); R res = func.get(); - registry.increment(metricName); - registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1); + + Registry registry = getMetricRegistryForPath(p); + if (registry != null) { + registry.increment(metricName); + registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1); + } return res; } @@ -102,10 +103,18 @@ public class HoodieWrapperFileSystem extends FileSystem { protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, Path p, long byteCount, CheckedFunction<R> func) throws IOException { Registry registry = getMetricRegistryForPath(p); - registry.add(metricName + ".totalBytes", byteCount); + if (registry != null) { + registry.add(metricName + ".totalBytes", byteCount); + } + return executeFuncWithTimeMetrics(metricName, p, func); } + public static void setMetricsRegistry(Registry registry, Registry registryMeta) { + metricsRegistry = registry; + metricsRegistryMetaFolder = registryMeta; + } + public HoodieWrapperFileSystem() {} public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java index 684a625..f5adb0d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.fs; -import org.apache.hudi.common.metrics.Registry; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.ReadOption; @@ -35,8 +34,6 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream { // Path private final Path path; - // Registry or read and write metrics - Registry metricsRegistry; public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException { super(in); @@ -85,9 +82,4 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream { return null; }); } - - public void setMetricRegistry(Registry metricsRegistry) { - this.metricsRegistry = metricsRegistry; - } - } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java index e0607d0..ac07cd7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.fs; -import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.exception.HoodieException; import org.apache.hadoop.fs.FSDataOutputStream; @@ -42,8 +41,6 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream { private final Path path; // Consistency guard private final ConsistencyGuard consistencyGuard; - // Registry or read and write metrics - Registry metricsRegistry; public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, ConsistencyGuard consistencyGuard, Runnable closeCallback) throws IOException { @@ -87,8 +84,4 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream { public long getBytesWritten() { return bytesWritten.get(); } - - public void setMetricRegistry(Registry metricsRegistry) { - this.metricsRegistry = metricsRegistry; - } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java similarity index 60% copy from hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java copy to hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java index 169e8bc..36aeab9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java @@ -22,76 +22,36 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; - /** * Lightweight Metrics Registry to track Hudi events. */ -public class Registry { +public class LocalRegistry implements Registry { ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>(); - final String name; - - private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>(); + private String name; - private Registry(String name) { + public LocalRegistry(String name) { this.name = name; } - /** - * Get (or create) the registry for a provided name. - */ - public static synchronized Registry getRegistry(String registryName) { - if (!registryMap.containsKey(registryName)) { - registryMap.put(registryName, new Registry(registryName)); - } - return registryMap.get(registryName); - } - - /** - * Get all registered metrics. - * @param flush clean all metrics as part of this operation. - * @param prefixWithRegistryName prefix each metric name with the registry name. - * @return - */ - public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) { - HashMap<String, Long> allMetrics = new HashMap<>(); - registryMap.forEach((registryName, registry) -> { - allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName)); - if (flush) { - registry.clear(); - } - }); - return allMetrics; - } - + @Override public void clear() { counters.clear(); } + @Override public void increment(String name) { getCounter(name).increment(); } + @Override public void add(String name, long value) { getCounter(name).add(value); } - private synchronized Counter getCounter(String name) { - if (!counters.containsKey(name)) { - counters.put(name, new Counter()); - } - return counters.get(name); - } - - /** - * Get all Counter type metrics. - */ - public Map<String, Long> getAllCounts() { - return getAllCounts(false); - } - /** * Get all Counter type metrics. */ + @Override public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) { HashMap<String, Long> countersMap = new HashMap<>(); counters.forEach((k, v) -> { @@ -101,4 +61,10 @@ public class Registry { return countersMap; } + private synchronized Counter getCounter(String name) { + if (!counters.containsKey(name)) { + counters.put(name, new Counter()); + } + return counters.get(name); + } } \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java index 169e8bc..0a56297 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java @@ -18,87 +18,98 @@ package org.apache.hudi.common.metrics; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hudi.common.util.ReflectionUtils; + /** - * Lightweight Metrics Registry to track Hudi events. + * Interface which defines a lightweight Metrics Registry to track Hudi events. */ -public class Registry { - ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>(); - final String name; - - private static ConcurrentHashMap<String, Registry> registryMap = new ConcurrentHashMap<>(); +public interface Registry extends Serializable { + static ConcurrentHashMap<String, Registry> REGISTRYMAP = new ConcurrentHashMap<>(); - private Registry(String name) { - this.name = name; + /** + * Get (or create) the registry for a provided name. + * + * This function creates a {@code LocalRegistry}. + * + * @param registryName Name of the registry + */ + public static Registry getRegistry(String registryName) { + return getRegistry(registryName, LocalRegistry.class.getName()); } /** - * Get (or create) the registry for a provided name. + * Get (or create) the registry for a provided name and given class. + * + * @param registryName Name of the registry. + * @param clazz The fully qualified name of the registry class to create. */ - public static synchronized Registry getRegistry(String registryName) { - if (!registryMap.containsKey(registryName)) { - registryMap.put(registryName, new Registry(registryName)); + public static Registry getRegistry(String registryName, String clazz) { + synchronized (Registry.class) { + if (!REGISTRYMAP.containsKey(registryName)) { + Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName); + REGISTRYMAP.put(registryName, registry); + } + return REGISTRYMAP.get(registryName); } - return registryMap.get(registryName); } /** * Get all registered metrics. - * @param flush clean all metrics as part of this operation. + * + * @param flush clear all metrics after this operation. * @param prefixWithRegistryName prefix each metric name with the registry name. * @return */ - public static synchronized Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) { - HashMap<String, Long> allMetrics = new HashMap<>(); - registryMap.forEach((registryName, registry) -> { - allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName)); - if (flush) { - registry.clear(); - } - }); - return allMetrics; - } - - public void clear() { - counters.clear(); + public static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) { + synchronized (Registry.class) { + HashMap<String, Long> allMetrics = new HashMap<>(); + REGISTRYMAP.forEach((registryName, registry) -> { + allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName)); + if (flush) { + registry.clear(); + } + }); + return allMetrics; + } } - public void increment(String name) { - getCounter(name).increment(); - } + /** + * Clear all metrics. + */ + public void clear(); - public void add(String name, long value) { - getCounter(name).add(value); - } + /** + * Increment the metric. + * + * @param name Name of the metric to increment. + */ + public void increment(String name); - private synchronized Counter getCounter(String name) { - if (!counters.containsKey(name)) { - counters.put(name, new Counter()); - } - return counters.get(name); - } + /** + * Add value to the metric. + * + * @param name Name of the metric. + * @param value The value to add to the metrics. + */ + public void add(String name, long value); /** * Get all Counter type metrics. */ - public Map<String, Long> getAllCounts() { + public default Map<String, Long> getAllCounts() { return getAllCounts(false); } /** * Get all Counter type metrics. + * + * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry. */ - public Map<String, Long> getAllCounts(boolean prefixWithRegistryName) { - HashMap<String, Long> countersMap = new HashMap<>(); - counters.forEach((k, v) -> { - String key = prefixWithRegistryName ? name + "." + k : k; - countersMap.put(key, v.getValue()); - }); - return countersMap; - } - + public abstract Map<String, Long> getAllCounts(boolean prefixWithRegistryName); } \ No newline at end of file
