This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ce1c983e7a0b2fb34270a12d33a7c300433aabc3
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Oct 16 17:12:09 2020 -0700

    [RFC-15] Using distributed metrics registry in metadata code.
---
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 36 +++++-----
 .../apache/hudi/metadata/TestHoodieMetadata.java   |  2 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  5 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 79 +++++++++++---------
 4 files changed, 70 insertions(+), 52 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 295f15f..5491451 100644
--- a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -45,6 +45,7 @@ import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -68,6 +69,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -95,12 +97,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, HoodieWriteConfig writeConfig) {
-    try {
-      return new HoodieMetadataWriter(conf, writeConfig);
-    } catch (IOException e) {
-      throw new HoodieMetadataException("Could not initialize HoodieMetadataWriter", e);
-    }
-    /*
     return instances.computeIfAbsent(writeConfig.getBasePath(), k -> {
       try {
         return new HoodieMetadataWriter(conf, writeConfig);
@@ -108,12 +104,11 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Could not initialize HoodieMetadataWriter", e);
       }
     });
-    */
   }
 
   HoodieMetadataWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig) throws IOException {
     super(hadoopConf, writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(),
-        writeConfig.useFileListingMetadata(), writeConfig.getFileListingMetadataVerify());
+        writeConfig.useFileListingMetadata(), writeConfig.getFileListingMetadataVerify(), false);
 
     if (writeConfig.useFileListingMetadata()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
@@ -126,6 +121,14 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       // Metadata Table cannot have its metadata optimized
       ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto commit is required for Metadata Table");
       ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), "File listing cannot be used for Metadata Table");
+
+      if (config.isMetricsOn()) {
+        if (config.isExecutorMetricsEnabled()) {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName());
+        } else {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata");
+        }
+      }
     } else {
       enabled = false;
     }
@@ -162,6 +165,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     if (writeConfig.isMetricsOn()) {
       HoodieMetricsConfig.Builder metricsConfig = HoodieMetricsConfig.newBuilder()
           .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
           .on(true);
       switch (writeConfig.getMetricsReporterType()) {
         case GRAPHITE:
@@ -215,6 +219,10 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
    */
   public void initialize(JavaSparkContext jsc) {
     try {
+      if (metricsRegistry instanceof DistributedRegistry) {
+        ((DistributedRegistry) metricsRegistry).register(jsc);
+      }
+
       if (enabled) {
         initializeAndSync(jsc);
       }
@@ -261,12 +269,6 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     } else {
       updateMetrics(INITIALIZE_STR, durationInMs);
     }
-
-    // Total size of the metadata and count of base/log files
-    Map<String, String> stats = getStats(false);
-    updateMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
-        Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
-        Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
   }
 
   /**
@@ -287,7 +289,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
         HoodieFileFormat.HFILE.toString());
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+    FileSystem fs = datasetMetaClient.getFs();
     List<String> partitions = FSUtils.getAllPartitionPaths(fs, datasetBasePath, false);
     LOG.info("Initializing metadata table by using file listings in " + partitions.size() + " partitions");
 
@@ -297,7 +299,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
     int parallelism = Math.min(partitions.size(), jsc.defaultParallelism()) + 1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
-          FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get());
+          FileSystem fsys = datasetMetaClient.getFs();
           FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new Path(dbasePath, partition));
           return new Tuple2<>(partition, statuses);
         });
@@ -473,7 +475,7 @@ public class HoodieMetadataWriter extends HoodieMetadataReader implements Serial
       return;
     }
 
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
     long cnt = timeline.filterCompletedInstants().getInstants().filter(i -> i.getTimestamp().equals(instantTime)).count();
     if (cnt == 1) {
       LOG.info("Ignoring update from cleaner plan for already completed instant " + instantTime);
diff --git a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index a901916..9395d5f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -762,6 +762,6 @@ public class TestHoodieMetadata extends HoodieClientTestHarness {
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withUseFileListingMetadata(useFileListingMetadata)
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().withReporterType("CONSOLE").on(enableMetrics)
-            .usePrefix("unit-test").build());
+            .withExecutorMetrics(true).usePrefix("unit-test").build());
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5d2e185..d7ef2d3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,6 +19,7 @@ package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.SizeAwareFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -72,8 +73,8 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize));
+      this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
       // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
       // need to wrap in another BufferedFSInputStream the make bufferSize work?
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
index 6ca0831..fed5516 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
@@ -99,6 +99,7 @@ public class HoodieMetadataReader implements Serializable {
   public static final String VALIDATE_FILES_STR = "validate_files";
   public static final String VALIDATE_ERRORS_STR = "validate_errors";
   public static final String SCAN_STR = "scan";
+  public static final String BASEFILE_READ_STR = "basefile_read";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = "totalBaseFileSizeInBytes";
@@ -116,7 +117,7 @@ public class HoodieMetadataReader implements Serializable {
   protected final SerializableConfiguration hadoopConf;
   protected final String datasetBasePath;
   protected final String metadataBasePath;
-  protected transient Registry metricsRegistry;
+  protected Registry metricsRegistry;
   protected HoodieTableMetaClient metaClient;
   protected boolean enabled;
   private final boolean validateLookups;
@@ -138,6 +139,17 @@ public class HoodieMetadataReader implements Serializable {
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
                               boolean enabled, boolean validateLookups) {
+    this(conf, datasetBasePath, spillableMapDirectory, enabled, validateLookups, false);
+  }
+
+  /**
+   * Create the Metadata Table in read-only mode.
+   *
+   * @param conf {@code Configuration}
+   * @param datasetBasePath The basePath for the dataset
+   */
+  public HoodieMetadataReader(Configuration conf, String datasetBasePath, String spillableMapDirectory,
+                              boolean enabled, boolean validateLookups, boolean enableMetrics) {
     this.hadoopConf = new SerializableConfiguration(conf);
     this.datasetBasePath = datasetBasePath;
     this.metadataBasePath = getMetadataTableBasePath(datasetBasePath);
@@ -158,6 +170,10 @@ public class HoodieMetadataReader implements Serializable {
       LOG.info("Metadata table is disabled.");
     }
 
+    if (enableMetrics) {
+      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+    }
+
     this.enabled = enabled;
   }
 
@@ -334,10 +350,12 @@ public class HoodieMetadataReader implements Serializable {
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
     if (basefileReader != null) {
+      long t1 = System.currentTimeMillis();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
+        updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1);
       }
     }
 
@@ -406,7 +424,7 @@ public class HoodieMetadataReader implements Serializable {
 
     // TODO: The below code may open the metadata to include incomplete instants on the dataset
     logRecordScanner =
-        new HoodieMetadataMergedLogRecordScanner(FSUtils.getFs(datasetBasePath, hadoopConf.get()), metadataBasePath,
+        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
             logFilePaths, schema, latestMetaInstantTimestamp, maxMemorySizeInBytes, bufferSize,
             spillableMapDirectory, null);
 
@@ -512,25 +530,29 @@ public class HoodieMetadataReader implements Serializable {
   }
 
   public Map<String, String> getStats(boolean detailed) throws IOException {
+    metaClient.reloadActiveTimeline();
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+    return getStats(fsView, detailed);
+  }
+
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, boolean detailed) throws IOException {
     Map<String, String> stats = new HashMap<>();
-    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf.get());
 
     // Total size of the metadata and count of base/log files
     long totalBaseFileSizeInBytes = 0;
    long totalLogFileSizeInBytes = 0;
     int baseFileCount = 0;
     int logFileCount = 0;
-    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
     List<FileSlice> latestSlices = fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
     for (FileSlice slice : latestSlices) {
       if (slice.getBaseFile().isPresent()) {
-        totalBaseFileSizeInBytes += fs.getFileStatus(new Path(slice.getBaseFile().get().getPath())).getLen();
+        totalBaseFileSizeInBytes += slice.getBaseFile().get().getFileStatus().getLen();
         ++baseFileCount;
       }
       Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
       while (it.hasNext()) {
-        totalLogFileSizeInBytes += fs.getFileStatus(it.next().getPath()).getLen();
+        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
         ++logFileCount;
       }
     }
 
@@ -550,40 +572,33 @@ public class HoodieMetadataReader implements Serializable {
   }
 
   protected void updateMetrics(String action, long durationInMs) {
-    String countKey = action + ".count";
-    String durationKey = action + ".duration";
-    Registry registry = getMetricsRegistry();
-
-    // Update average for duration and total for count
-    long existingCount = registry.getAllCounts().getOrDefault(countKey, 0L);
-    long existingDuration = registry.getAllCounts().getOrDefault(durationKey, 0L);
-    long avgDuration = (long)Math.ceil((existingDuration * existingCount + durationInMs) / (existingCount + 1));
+    if (metricsRegistry == null) {
+      return;
+    }
 
-    registry.add(countKey, 1);
-    registry.add(durationKey, avgDuration - existingDuration);
+    // Update sum of duration and total for count
+    String countKey = action + ".count";
+    String durationKey = action + ".totalDuration";
+    metricsRegistry.add(countKey, 1);
+    metricsRegistry.add(durationKey, durationInMs);
 
-    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=%d)", durationKey, avgDuration, countKey,
-        existingCount + 1));
+    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", durationKey, durationInMs, countKey));
   }
 
   protected void updateMetrics(long totalBaseFileSizeInBytes, long totalLogFileSizeInBytes, int baseFileCount,
-      int logFileCount) {
-    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
-        + "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
-
-    Registry registry = getMetricsRegistry();
-    registry.add("basefile.size", totalBaseFileSizeInBytes);
-    registry.add("logfile.size", totalLogFileSizeInBytes);
-    registry.add("basefile.count", baseFileCount);
-    registry.add("logfile.count", logFileCount);
-  }
-
-  private Registry getMetricsRegistry() {
+      int logFileCount) {
     if (metricsRegistry == null) {
-      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+      return;
     }
-    return metricsRegistry;
+
+    // Update sizes and count for metadata table's data files
+    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+    metricsRegistry.add("basefile.count", baseFileCount);
+    metricsRegistry.add("logfile.count", logFileCount);
+
+    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
+        + "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
   }
 
   /**
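
Two implementation notes on the metrics changes above, for readers following RFC-15.

First, the rewritten updateMetrics(String, long) stops maintaining a running average. The old code read back getAllCounts() and then adjusted the counter so it held an average, a read-modify-write that becomes racy once updates arrive from many tasks. The new code only ever increments two counters per action, <action>.count and <action>.totalDuration, and leaves the division to whoever consumes the registry. A minimal sketch of that usage pattern, using only the Registry calls visible in this diff (Registry.getRegistry(String) and Registry.add(String, long)) and assuming hudi-common on the classpath; the timed() helper itself is illustrative, not part of Hudi:

    import java.util.function.Supplier;
    import org.apache.hudi.common.metrics.Registry;

    public class MetadataTimingSketch {
      private final Registry metricsRegistry = Registry.getRegistry("HoodieMetadata");

      // Time an action and publish sum-style counters. No existing value is
      // read back, so concurrent callers cannot corrupt a running average;
      // the average is derived downstream as totalDuration / count.
      public <T> T timed(String action, Supplier<T> body) {
        long t1 = System.currentTimeMillis();
        T result = body.get();
        metricsRegistry.add(action + ".count", 1);
        metricsRegistry.add(action + ".totalDuration", System.currentTimeMillis() - t1);
        return result;
      }
    }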

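Second, the registry selection in HoodieMetadataWriter: when executor metrics are enabled (isExecutorMetricsEnabled()), Registry.getRegistry("HoodieMetadata", DistributedRegistry.class.getName()) hands back a registry that can absorb counter updates made on Spark executors, which is why initialize() now calls ((DistributedRegistry) metricsRegistry).register(jsc) on the driver before any work is distributed, mirroring Spark's accumulator contract of registering before use. A rough sketch of the shape such an accumulator-backed registry can take; this is an illustration of the idea under that assumption, not the DistributedRegistry source:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.util.AccumulatorV2;

    // Counters merged across executors via a Spark accumulator (illustrative).
    // Each task works on its own copy(); Spark merges copies on the driver.
    public class AccumulatorRegistrySketch
        extends AccumulatorV2<Map<String, Long>, Map<String, Long>> {
      private final Map<String, Long> counters = new HashMap<>();

      // Must run on the driver before executors touch the accumulator.
      public void register(JavaSparkContext jsc) {
        jsc.sc().register(this, "HoodieMetadata");
      }

      public void add(String name, long value) {
        counters.merge(name, value, Long::sum);
      }

      @Override public boolean isZero() { return counters.isEmpty(); }
      @Override public void reset() { counters.clear(); }
      @Override public void add(Map<String, Long> v) { v.forEach(this::add); }
      @Override public Map<String, Long> value() { return counters; }

      @Override public AccumulatorV2<Map<String, Long>, Map<String, Long>> copy() {
        AccumulatorRegistrySketch copied = new AccumulatorRegistrySketch();
        copied.counters.putAll(counters);
        return copied;
      }

      @Override public void merge(AccumulatorV2<Map<String, Long>, Map<String, Long>> other) {
        other.value().forEach(this::add);
      }
    }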