This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch rfc-15 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d305948cac63e2a88a8d2df5bdf64ffd6fc49f5a Author: Vinoth Chandar <[email protected]> AuthorDate: Thu Oct 29 18:33:56 2020 -0700 [RFC-15] Fixing code review comments --- .../apache/hudi/cli/commands/MetadataCommand.java | 11 ++-- .../apache/hudi/client/AbstractHoodieClient.java | 5 +- .../org/apache/hudi/client/HoodieWriteClient.java | 2 +- .../apache/hudi/config/HoodieMetadataConfig.java | 61 +++++++++++----------- .../org/apache/hudi/config/HoodieWriteConfig.java | 16 +++--- .../apache/hudi/metadata/HoodieMetadataWriter.java | 3 +- .../java/org/apache/hudi/table/HoodieTable.java | 2 +- .../apache/hudi/metadata/TestHoodieMetadata.java | 16 +++--- .../hudi/common/fs/HoodieWrapperFileSystem.java | 29 +++++----- ...nputStream.java => TimedFSDataInputStream.java} | 18 +++---- .../apache/hudi/common/metrics/LocalRegistry.java | 4 +- .../org/apache/hudi/common/metrics/Registry.java | 27 +++++----- .../hudi/common/table/log/HoodieLogFileReader.java | 4 +- .../apache/hudi/metadata/HoodieMetadataReader.java | 36 ++++++------- 14 files changed, 113 insertions(+), 121 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 4ecc6a9..a45e9b4 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.config.HoodieMetadataConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.metadata.HoodieMetadataReader; @@ -71,13 +72,11 @@ public class MetadataCommand implements CommandMarker { HoodieCLI.fs.mkdirs(metadataPath); } - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); HoodieWriteConfig writeConfig = getWriteConfig(); initJavaSparkContext(); - HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc); - long t2 = System.currentTimeMillis(); - - return String.format("Created Metadata Table in %s (duration=%.2fsec)", metadataPath, (t2 - t1) / 1000.0); + HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc); + return String.format("Created Metadata Table in %s (duration=%.2f secs)", metadataPath, timer.endTimer() / 1000.0); } @CliCommand(value = "metadata delete", help = "Remove the Metadata Table") @@ -115,7 +114,7 @@ public class MetadataCommand implements CommandMarker { } else { HoodieWriteConfig writeConfig = getWriteConfig(); initJavaSparkContext(); - HoodieMetadataWriter.instance(HoodieCLI.conf, writeConfig).initialize(jsc); + HoodieMetadataWriter.create(HoodieCLI.conf, writeConfig).initialize(jsc); } long t2 = System.currentTimeMillis(); diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java index fd02b6d..f1abe7c 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java @@ -72,6 +72,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl this.timelineServer = timelineServer; shouldStopTimelineServer = !timelineServer.isPresent(); startEmbeddedServerView(); + initWrapperFSMetrics(); } /** @@ -123,7 +124,7 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl return config; } - protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { + private void initWrapperFSMetrics() { if (config.isMetricsOn()) { Registry registry; Registry registryMeta; @@ -143,7 +144,9 @@ public abstract class AbstractHoodieClient implements Serializable, AutoCloseabl HoodieWrapperFileSystem.setMetricsRegistry(registry, registryMeta); } + } + protected HoodieTableMetaClient createMetaClient(boolean loadActiveTimelineOnLoad) { return ClientUtils.createMetaClient(hadoopConf, config, loadActiveTimelineOnLoad); } } diff --git a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java index 614f0dc..c180a88 100644 --- a/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java @@ -124,7 +124,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo this.rollbackPending = rollbackPending; // Initialize Metadata Table - HoodieMetadataWriter.instance(hadoopConf, writeConfig).initialize(jsc); + HoodieMetadataWriter.create(hadoopConf, writeConfig).initialize(jsc); } /** diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java index ca9c723..53142b1 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieMetadataConfig.java @@ -19,7 +19,6 @@ package org.apache.hudi.config; import org.apache.hudi.common.config.DefaultHoodieConfig; -import org.apache.hudi.config.HoodieCompactionConfig.Builder; import javax.annotation.concurrent.Immutable; @@ -37,33 +36,33 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig { public static final String METADATA_PREFIX = "hoodie.metadata"; // Enable the internal Metadata Table which saves file listings - public static final String METADATA_ENABLE = METADATA_PREFIX + ".enable"; + public static final String METADATA_ENABLE_PROP = METADATA_PREFIX + ".enable"; public static final boolean DEFAULT_METADATA_ENABLE = false; // Validate contents of Metadata Table on each access against the actual filesystem - public static final String METADATA_VALIDATE = METADATA_PREFIX + ".validate"; + public static final String METADATA_VALIDATE_PROP = METADATA_PREFIX + ".validate"; public static final boolean DEFAULT_METADATA_VALIDATE = false; // Parallelism for inserts - public static final String INSERT_PARALLELISM = METADATA_PREFIX + ".insert.parallelism"; - public static final int DEFAULT_INSERT_PARALLELISM = 1; + public static final String METADATA_INSERT_PARALLELISM_PROP = METADATA_PREFIX + ".insert.parallelism"; + public static final int DEFAULT_METADATA_INSERT_PARALLELISM = 1; // Async clean - public static final String ASYNC_CLEAN = METADATA_PREFIX + ".clean.async"; - public static final boolean DEFAULT_ASYNC_CLEAN = false; + public static final String METADATA_ASYNC_CLEAN_PROP = METADATA_PREFIX + ".clean.async"; + public static final boolean DEFAULT_METADATA_ASYNC_CLEAN = false; // Maximum delta commits before compaction occurs - public static final String COMPACT_NUM_DELTA_COMMITS = METADATA_PREFIX + ".compact.max.delta.commits"; - public static final int DEFAULT_COMPACT_NUM_DELTA_COMMITS = 24; + public static final String METADATA_COMPACT_NUM_DELTA_COMMITS_PROP = METADATA_PREFIX + ".compact.max.delta.commits"; + public static final int DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS = 24; // Archival settings - public static final String MIN_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.min.commits"; + public static final String MIN_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.min.commits"; public static final int DEFAULT_MIN_COMMITS_TO_KEEP = 20; - public static final String MAX_COMMITS_TO_KEEP = METADATA_PREFIX + ".keep.max.commits"; + public static final String MAX_COMMITS_TO_KEEP_PROP = METADATA_PREFIX + ".keep.max.commits"; public static final int DEFAULT_MAX_COMMITS_TO_KEEP = 30; // Cleaner commits retained - public static final String CLEANER_COMMITS_RETAINED = METADATA_PREFIX + ".cleaner.commits.retained"; + public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained"; public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3; private HoodieMetadataConfig(Properties props) { @@ -91,58 +90,58 @@ public class HoodieMetadataConfig extends DefaultHoodieConfig { } public Builder enable(boolean enable) { - props.setProperty(METADATA_ENABLE, String.valueOf(enable)); + props.setProperty(METADATA_ENABLE_PROP, String.valueOf(enable)); return this; } public Builder validate(boolean validate) { - props.setProperty(METADATA_VALIDATE, String.valueOf(validate)); + props.setProperty(METADATA_VALIDATE_PROP, String.valueOf(validate)); return this; } public Builder withInsertParallelism(int parallelism) { - props.setProperty(INSERT_PARALLELISM, String.valueOf(parallelism)); + props.setProperty(METADATA_INSERT_PARALLELISM_PROP, String.valueOf(parallelism)); return this; } public Builder withAsyncClean(boolean asyncClean) { - props.setProperty(ASYNC_CLEAN, String.valueOf(asyncClean)); + props.setProperty(METADATA_ASYNC_CLEAN_PROP, String.valueOf(asyncClean)); return this; } public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBeforeCompaction) { - props.setProperty(COMPACT_NUM_DELTA_COMMITS, String.valueOf(maxNumDeltaCommitsBeforeCompaction)); + props.setProperty(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(maxNumDeltaCommitsBeforeCompaction)); return this; } public Builder archiveCommitsWith(int minToKeep, int maxToKeep) { - props.setProperty(MIN_COMMITS_TO_KEEP, String.valueOf(minToKeep)); - props.setProperty(MAX_COMMITS_TO_KEEP, String.valueOf(maxToKeep)); + props.setProperty(MIN_COMMITS_TO_KEEP_PROP, String.valueOf(minToKeep)); + props.setProperty(MAX_COMMITS_TO_KEEP_PROP, String.valueOf(maxToKeep)); return this; } public Builder retainCommits(int commitsRetained) { - props.setProperty(CLEANER_COMMITS_RETAINED, String.valueOf(commitsRetained)); + props.setProperty(CLEANER_COMMITS_RETAINED_PROP, String.valueOf(commitsRetained)); return this; } public HoodieMetadataConfig build() { HoodieMetadataConfig config = new HoodieMetadataConfig(props); - setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE), METADATA_ENABLE, + setDefaultOnCondition(props, !props.containsKey(METADATA_ENABLE_PROP), METADATA_ENABLE_PROP, String.valueOf(DEFAULT_METADATA_ENABLE)); - setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE), METADATA_VALIDATE, + setDefaultOnCondition(props, !props.containsKey(METADATA_VALIDATE_PROP), METADATA_VALIDATE_PROP, String.valueOf(DEFAULT_METADATA_VALIDATE)); - setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, - String.valueOf(DEFAULT_INSERT_PARALLELISM)); - setDefaultOnCondition(props, !props.containsKey(ASYNC_CLEAN), ASYNC_CLEAN, - String.valueOf(DEFAULT_ASYNC_CLEAN)); - setDefaultOnCondition(props, !props.containsKey(COMPACT_NUM_DELTA_COMMITS), - COMPACT_NUM_DELTA_COMMITS, String.valueOf(DEFAULT_COMPACT_NUM_DELTA_COMMITS)); - setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED), CLEANER_COMMITS_RETAINED, + setDefaultOnCondition(props, !props.containsKey(METADATA_INSERT_PARALLELISM_PROP), METADATA_INSERT_PARALLELISM_PROP, + String.valueOf(DEFAULT_METADATA_INSERT_PARALLELISM)); + setDefaultOnCondition(props, !props.containsKey(METADATA_ASYNC_CLEAN_PROP), METADATA_ASYNC_CLEAN_PROP, + String.valueOf(DEFAULT_METADATA_ASYNC_CLEAN)); + setDefaultOnCondition(props, !props.containsKey(METADATA_COMPACT_NUM_DELTA_COMMITS_PROP), + METADATA_COMPACT_NUM_DELTA_COMMITS_PROP, String.valueOf(DEFAULT_METADATA_COMPACT_NUM_DELTA_COMMITS)); + setDefaultOnCondition(props, !props.containsKey(CLEANER_COMMITS_RETAINED_PROP), CLEANER_COMMITS_RETAINED_PROP, String.valueOf(DEFAULT_CLEANER_COMMITS_RETAINED)); - setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP), MAX_COMMITS_TO_KEEP, + setDefaultOnCondition(props, !props.containsKey(MAX_COMMITS_TO_KEEP_PROP), MAX_COMMITS_TO_KEEP_PROP, String.valueOf(DEFAULT_MAX_COMMITS_TO_KEEP)); - setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP), MIN_COMMITS_TO_KEEP, + setDefaultOnCondition(props, !props.containsKey(MIN_COMMITS_TO_KEEP_PROP), MIN_COMMITS_TO_KEEP_PROP, String.valueOf(DEFAULT_MIN_COMMITS_TO_KEEP)); return config; diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index b499ac9..293c3c0 100644 --- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -763,35 +763,35 @@ public class HoodieWriteConfig extends DefaultHoodieConfig { * File listing metadata configs. */ public boolean useFileListingMetadata() { - return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE)); + return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ENABLE_PROP)); } public boolean getFileListingMetadataVerify() { - return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE)); + return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_VALIDATE_PROP)); } public int getMetadataInsertParallelism() { - return Integer.parseInt(props.getProperty(HoodieMetadataConfig.INSERT_PARALLELISM)); + return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_INSERT_PARALLELISM_PROP)); } public int getMetadataCompactDeltaCommitMax() { - return Integer.parseInt(props.getProperty(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS)); + return Integer.parseInt(props.getProperty(HoodieMetadataConfig.METADATA_COMPACT_NUM_DELTA_COMMITS_PROP)); } public boolean isMetadataAsyncClean() { - return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.ASYNC_CLEAN)); + return Boolean.parseBoolean(props.getProperty(HoodieMetadataConfig.METADATA_ASYNC_CLEAN_PROP)); } public int getMetadataMaxCommitsToKeep() { - return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP)); + return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MAX_COMMITS_TO_KEEP_PROP)); } public int getMetadataMinCommitsToKeep() { - return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP)); + return Integer.parseInt(props.getProperty(HoodieMetadataConfig.MIN_COMMITS_TO_KEEP_PROP)); } public int getMetadataCleanerCommitsRetained() { - return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED)); + return Integer.parseInt(props.getProperty(HoodieMetadataConfig.CLEANER_COMMITS_RETAINED_PROP)); } public static class Builder { diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java index 655890b..08a785d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java +++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java @@ -99,7 +99,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial private String tableName; private static Map<String, HoodieMetadataWriter> instances = new HashMap<>(); - public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) { + public static HoodieMetadataWriter create(Configuration conf, HoodieWriteConfig writeConfig) { String key = writeConfig.getBasePath(); if (instances.containsKey(key)) { if (instances.get(key).enabled() != writeConfig.useFileListingMetadata()) { @@ -314,7 +314,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial } } String createInstantTime = latestInstant.isPresent() ? latestInstant.get().getTimestamp() : SOLO_COMMIT_TIMESTAMP; - LOG.info("Creating a new metadata table in " + metadataBasePath + " at instant " + createInstantTime); metaClient = HoodieTableMetaClient.initTableType(hadoopConf.get(), metadataBasePath.toString(), HoodieTableType.MERGE_ON_READ, tableName, "archived", HoodieMetadataPayload.class.getName(), diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java index 286e6db..471fc8d 100644 --- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -634,7 +634,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri public HoodieMetadataWriter metadata() { if (metadataWriter == null) { - metadataWriter = HoodieMetadataWriter.instance(hadoopConfiguration.get(), config); + metadataWriter = HoodieMetadataWriter.create(hadoopConfiguration.get(), config); } return metadataWriter; diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java index 98675e4..62962d5 100644 --- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java +++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java @@ -30,7 +30,6 @@ import java.nio.file.Paths; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Random; import java.util.stream.Collectors; import org.apache.hadoop.fs.FileStatus; @@ -54,6 +53,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -73,6 +73,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; public class TestHoodieMetadata extends HoodieClientTestHarness { private static final Logger LOG = LogManager.getLogger(TestHoodieMetadata.class); @@ -473,13 +474,12 @@ public class TestHoodieMetadata extends HoodieClientTestHarness { * Instants on Metadata Table should be archived as per config. * Metadata Table should be automatically compacted as per config. */ - @Test - public void testArchivingAndCompaction() throws Exception { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testArchivingAndCompaction(boolean asyncClean) throws Exception { init(HoodieTableType.COPY_ON_WRITE); final int maxDeltaCommitsBeforeCompaction = 6; - // Test autoClean and asyncClean based on this flag which is randomly chosen. - boolean asyncClean = new Random().nextBoolean(); HoodieWriteConfig config = getWriteConfigBuilder(true, true, false) .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true) .archiveCommitsWith(2, 4).retainCommits(1) @@ -645,7 +645,7 @@ public class TestHoodieMetadata extends HoodieClientTestHarness { return; } - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); // Validate write config for metadata table HoodieWriteConfig metadataWriteConfig = metadata.getWriteConfig(); @@ -751,11 +751,11 @@ public class TestHoodieMetadata extends HoodieClientTestHarness { + numFileVersions + " but was " + fsView.getAllFileSlices(partition).count()); }); - LOG.info("Validation time=" + (System.currentTimeMillis() - t1)); + LOG.info("Validation time=" + timer.endTimer()); } private HoodieMetadataWriter metadata(HoodieWriteClient client) { - return HoodieMetadataWriter.instance(hadoopConf, client.getConfig()); + return HoodieMetadataWriter.create(hadoopConf, client.getConfig()); } // TODO: this can be moved to TestHarness after merge from master diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java index cdda082..f9c2296 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.fs; import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -70,12 +71,19 @@ public class HoodieWrapperFileSystem extends FileSystem { create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, listFiles, read, write } + private static Registry METRICS_REGISTRY_DATA; + private static Registry METRICS_REGISTRY_META; + + public static void setMetricsRegistry(Registry registry, Registry registryMeta) { + METRICS_REGISTRY_DATA = registry; + METRICS_REGISTRY_META = registryMeta; + } + + private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new ConcurrentHashMap<>(); private FileSystem fileSystem; private URI uri; private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard(); - private static Registry metricsRegistry; - private static Registry metricsRegistryMetaFolder; @FunctionalInterface public interface CheckedFunction<R> { @@ -84,17 +92,17 @@ public class HoodieWrapperFileSystem extends FileSystem { private static Registry getMetricRegistryForPath(Path p) { return ((p != null) && (p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME))) - ? metricsRegistryMetaFolder : metricsRegistry; + ? METRICS_REGISTRY_META : METRICS_REGISTRY_DATA; } protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, CheckedFunction<R> func) throws IOException { - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); R res = func.get(); Registry registry = getMetricRegistryForPath(p); if (registry != null) { registry.increment(metricName); - registry.add(metricName + ".totalDuration", System.currentTimeMillis() - t1); + registry.add(metricName + ".totalDuration", timer.endTimer()); } return res; @@ -110,11 +118,6 @@ public class HoodieWrapperFileSystem extends FileSystem { return executeFuncWithTimeMetrics(metricName, p, func); } - public static void setMetricsRegistry(Registry registry, Registry registryMeta) { - metricsRegistry = registry; - metricsRegistryMetaFolder = registryMeta; - } - public HoodieWrapperFileSystem() {} public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) { @@ -206,12 +209,10 @@ public class HoodieWrapperFileSystem extends FileSystem { } private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream fsDataInputStream) throws IOException { - if (fsDataInputStream instanceof SizeAwareFSDataInputStream) { + if (fsDataInputStream instanceof TimedFSDataInputStream) { return fsDataInputStream; } - - SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, fsDataInputStream); - return os; + return new TimedFSDataInputStream(path, fsDataInputStream); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java similarity index 82% rename from hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java rename to hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java index f5adb0d..c621111 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSDataInputStream.java @@ -28,14 +28,14 @@ import java.nio.ByteBuffer; import java.util.EnumSet; /** - * Wrapper over <code>FSDataInputStream</code> to keep track of the size of the written bytes. + * Wrapper over <code>FSDataInputStream</code> that also times the operations. */ -public class SizeAwareFSDataInputStream extends FSDataInputStream { +public class TimedFSDataInputStream extends FSDataInputStream { // Path private final Path path; - public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws IOException { + public TimedFSDataInputStream(Path path, FSDataInputStream in) { super(in); this.path = path; } @@ -43,26 +43,20 @@ public class SizeAwareFSDataInputStream extends FSDataInputStream { @Override public int read(ByteBuffer buf) throws IOException { return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(), - path, 0, () -> { - return super.read(buf); - }); + path, 0, () -> super.read(buf)); } @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(), - path, length, () -> { - return super.read(position, buffer, offset, length); - }); + path, length, () -> super.read(position, buffer, offset, length)); } @Override public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException { return HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(), - path, maxLength, () -> { - return super.read(bufferPool, maxLength, opts); - }); + path, maxLength, () -> super.read(bufferPool, maxLength, opts)); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java index 36aeab9..9383223 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/LocalRegistry.java @@ -23,11 +23,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** - * Lightweight Metrics Registry to track Hudi events. + * Registry that tracks metrics local to a single jvm process. */ public class LocalRegistry implements Registry { ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>(); - private String name; + private final String name; public LocalRegistry(String name) { this.name = name; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java index 0a56297..19822fb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/metrics/Registry.java @@ -30,7 +30,8 @@ import org.apache.hudi.common.util.ReflectionUtils; * Interface which defines a lightweight Metrics Registry to track Hudi events. */ public interface Registry extends Serializable { - static ConcurrentHashMap<String, Registry> REGISTRYMAP = new ConcurrentHashMap<>(); + + ConcurrentHashMap<String, Registry> REGISTRY_MAP = new ConcurrentHashMap<>(); /** * Get (or create) the registry for a provided name. @@ -39,7 +40,7 @@ public interface Registry extends Serializable { * * @param registryName Name of the registry */ - public static Registry getRegistry(String registryName) { + static Registry getRegistry(String registryName) { return getRegistry(registryName, LocalRegistry.class.getName()); } @@ -49,13 +50,13 @@ public interface Registry extends Serializable { * @param registryName Name of the registry. * @param clazz The fully qualified name of the registry class to create. */ - public static Registry getRegistry(String registryName, String clazz) { + static Registry getRegistry(String registryName, String clazz) { synchronized (Registry.class) { - if (!REGISTRYMAP.containsKey(registryName)) { + if (!REGISTRY_MAP.containsKey(registryName)) { Registry registry = (Registry)ReflectionUtils.loadClass(clazz, registryName); - REGISTRYMAP.put(registryName, registry); + REGISTRY_MAP.put(registryName, registry); } - return REGISTRYMAP.get(registryName); + return REGISTRY_MAP.get(registryName); } } @@ -66,10 +67,10 @@ public interface Registry extends Serializable { * @param prefixWithRegistryName prefix each metric name with the registry name. * @return */ - public static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) { + static Map<String, Long> getAllMetrics(boolean flush, boolean prefixWithRegistryName) { synchronized (Registry.class) { HashMap<String, Long> allMetrics = new HashMap<>(); - REGISTRYMAP.forEach((registryName, registry) -> { + REGISTRY_MAP.forEach((registryName, registry) -> { allMetrics.putAll(registry.getAllCounts(prefixWithRegistryName)); if (flush) { registry.clear(); @@ -82,14 +83,14 @@ public interface Registry extends Serializable { /** * Clear all metrics. */ - public void clear(); + void clear(); /** * Increment the metric. * * @param name Name of the metric to increment. */ - public void increment(String name); + void increment(String name); /** * Add value to the metric. @@ -97,12 +98,12 @@ public interface Registry extends Serializable { * @param name Name of the metric. * @param value The value to add to the metrics. */ - public void add(String name, long value); + void add(String name, long value); /** * Get all Counter type metrics. */ - public default Map<String, Long> getAllCounts() { + default Map<String, Long> getAllCounts() { return getAllCounts(false); } @@ -111,5 +112,5 @@ public interface Registry extends Serializable { * * @param prefixWithRegistryName If true, the names of all metrics are prefixed with name of this registry. */ - public abstract Map<String, Long> getAllCounts(boolean prefixWithRegistryName); + Map<String, Long> getAllCounts(boolean prefixWithRegistryName); } \ No newline at end of file diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index d7ef2d3..7c96a0f 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -19,7 +19,7 @@ package org.apache.hudi.common.table.log; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.fs.SizeAwareFSDataInputStream; +import org.apache.hudi.common.fs.TimedFSDataInputStream; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock; import org.apache.hudi.common.table.log.block.HoodieCommandBlock; @@ -73,7 +73,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader { boolean readBlockLazily, boolean reverseReader) throws IOException { FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize); if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) { - this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new FSDataInputStream( + this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream( new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize))); } else { // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java index fed5516..c14f402 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java @@ -52,6 +52,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.SpillableMapUtils; import org.apache.hudi.common.util.ValidationUtils; @@ -133,9 +134,6 @@ public class HoodieMetadataReader implements Serializable { /** * Create a the Metadata Table in read-only mode. - * - * @param hadoopConf {@code Configuration} - * @param basePath The basePath for the dataset */ public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups) { @@ -144,9 +142,6 @@ public class HoodieMetadataReader implements Serializable { /** * Create a the Metadata Table in read-only mode. - * - * @param hadoopConf {@code Configuration} - * @param basePath The basePath for the dataset */ public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory, boolean enabled, boolean validateLookups, boolean enableMetrics) { @@ -230,9 +225,9 @@ public class HoodieMetadataReader implements Serializable { * Returns a list of all partitions. */ protected List<String> getAllPartitionPaths() throws IOException { - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST); - updateMetrics(LOOKUP_PARTITIONS_STR, System.currentTimeMillis() - t1); + updateMetrics(LOOKUP_PARTITIONS_STR, timer.endTimer()); List<String> partitions = Collections.emptyList(); if (hoodieRecord.isPresent()) { @@ -251,9 +246,9 @@ public class HoodieMetadataReader implements Serializable { if (validateLookups) { // Validate the Metadata Table data by listing the partitions from the file system - t1 = System.currentTimeMillis(); + timer.startTimer(); List<String> actualPartitions = getAllPartitionPathsByListing(metaClient.getFs(), datasetBasePath, false); - updateMetrics(VALIDATE_PARTITIONS_STR, System.currentTimeMillis() - t1); + updateMetrics(VALIDATE_PARTITIONS_STR, timer.endTimer()); Collections.sort(actualPartitions); Collections.sort(partitions); @@ -284,9 +279,9 @@ public class HoodieMetadataReader implements Serializable { partitionName = NON_PARTITIONED_NAME; } - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName); - updateMetrics(LOOKUP_FILES_STR, System.currentTimeMillis() - t1); + updateMetrics(LOOKUP_FILES_STR, timer.endTimer()); FileStatus[] statuses = {}; if (hoodieRecord.isPresent()) { @@ -299,20 +294,21 @@ public class HoodieMetadataReader implements Serializable { if (validateLookups) { // Validate the Metadata Table data by listing the partitions from the file system - t1 = System.currentTimeMillis(); + timer.startTimer(); // Ignore partition metadata file FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath, p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)); - updateMetrics(VALIDATE_FILES_STR, System.currentTimeMillis() - t1); + updateMetrics(VALIDATE_FILES_STR, timer.endTimer()); List<String> directFilenames = Arrays.stream(directStatuses) - .map(s -> s.getPath().getName()).collect(Collectors.toList()); + .map(s -> s.getPath().getName()).sorted() + .collect(Collectors.toList()); + List<String> metadataFilenames = Arrays.stream(statuses) - .map(s -> s.getPath().getName()).collect(Collectors.toList()); + .map(s -> s.getPath().getName()).sorted() + .collect(Collectors.toList()); - Collections.sort(metadataFilenames); - Collections.sort(directFilenames); if (!metadataFilenames.equals(directFilenames)) { LOG.error("Validation of metadata file listing for partition " + partitionName + " failed."); LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray())); @@ -350,12 +346,12 @@ public class HoodieMetadataReader implements Serializable { // Retrieve record from base file HoodieRecord<HoodieMetadataPayload> hoodieRecord = null; if (basefileReader != null) { - long t1 = System.currentTimeMillis(); + HoodieTimer timer = new HoodieTimer().startTimer(); Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key); if (baseRecord.isPresent()) { hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(), metaClient.getTableConfig().getPayloadClass()); - updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1); + updateMetrics(BASEFILE_READ_STR, timer.endTimer()); } }
