This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a5fde6b5c22 [HUDI-6203] Add support to standalone utility tool to
fetch file size stats for a given table w/ optional partition filters (#8645)
a5fde6b5c22 is described below
commit a5fde6b5c22fed353e5ac4350fa1beed15b802df
Author: Amrish Lal <[email protected]>
AuthorDate: Wed May 17 11:10:53 2023 -0700
[HUDI-6203] Add support to standalone utility tool to fetch file size stats
for a given table w/ optional partition filters (#8645)
Provides file size stats for the latest updates that Hudi is consuming. The
following stats are produced by this class: Number of files, Total table size,
Minimum file size, Maximum file size, Average file size, Median file size, p50
file size, p90 file size, p95 file size, p99 file size.
---
.../org/apache/hudi/utilities/TableSizeStats.java | 466 +++++++++++++++++++++
1 file changed, 466 insertions(+)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
new file mode 100644
index 00000000000..95b4128f8cf
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/TableSizeStats.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewManager;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.UniformReservoir;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.DateTimeParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class provides file size updates for the latest files that hudi is
consuming. These stats are at table level by default, but
+ * specifying --enable-partition-stats will also show stats at the partition
level. If a start date (--start-date parameter) and/or
+ * end date (--end-date parameter) are specified, stats are based on files
that were modified in the half-open interval
+ * [start date (--start-date parameter), end date (--end-date parameter)).
--num-days parameter can be used to select data files over
+ * last --num-days. If --start-date is specified, --num-days will be ignored.
If none of the date parameters are set, stats will be
+ * computed over all data files of all partitions in the table. Note that date
filtering is carried out only if the partition name
+ * has the format '[column name=]yyyy-M-d', '[column name=]yyyy/M/d'.
+ * <br><br>
+ * The following stats are produced by this class:
+ * Number of files.
+ * Total table size.
+ * Minimum file size
+ * Maximum file size
+ * Average file size
+ * Median file size
+ * p50 file size
+ * p90 file size
+ * p95 file size
+ * p99 file size
+ * <br><br>
+ * Sample spark-submit command:
+ * ./bin/spark-submit \
+ * --class org.apache.hudi.utilities.TableSizeStats \
+ *
$HUDI_DIR/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.11-0.14.0-SNAPSHOT.jar
\
+ * --base-path <base-path> \
+ * --num-days <number-of-days>
+ */
+public class TableSizeStats implements Serializable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TableSizeStats.class);
+
+ // Date formatter for parsing partition dates (example: 2023/5/5/ or
2023-5-5).
+ private static final DateTimeFormatter DATE_FORMATTER =
+ (new
DateTimeFormatterBuilder()).appendOptional(DateTimeFormatter.ofPattern("yyyy/M/d")).appendOptional(DateTimeFormatter.ofPattern("yyyy-M-d")).toFormatter();
+
+ // File size stats will be displayed in the units specified below.
+ private static final String[] FILE_SIZE_UNITS = {"B", "KB", "MB", "GB",
"TB"};
+
+ // Spark context
+ private transient JavaSparkContext jsc;
+ // config
+ private Config cfg;
+ // Properties with source, hoodie client, key generator etc.
+ private TypedProperties props;
+
+ public TableSizeStats(JavaSparkContext jsc, Config cfg) {
+ this.jsc = jsc;
+ this.cfg = cfg;
+
+ this.props = cfg.propsFilePath == null
+ ? UtilHelpers.buildProperties(cfg.configs)
+ : readConfigFromFileSystem(jsc, cfg);
+ }
+
+ /**
+ * Reads config from the file system.
+ *
+ * @param jsc {@link JavaSparkContext} instance.
+ * @param cfg {@link Config} instance.
+ * @return the {@link TypedProperties} instance.
+ */
+ private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc,
Config cfg) {
+ return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new
Path(cfg.propsFilePath), cfg.configs)
+ .getProps(true);
+ }
+
+ public static class Config implements Serializable {
+ @Parameter(names = {"--base-path", "-bp"}, description = "Base path for
the table", required = false)
+ public String basePath = null;
+
+ @Parameter(names = {"--num-days", "-nd"}, description = "Consider files
modified within this many days.", required = false)
+ public long numDays = 0;
+
+ @Parameter(names = {"--start-date", "-sd"}, description = "Consider files
modified on or after this date.", required = false)
+ public String startDate = null;
+
+ @Parameter(names = {"--end-date", "-ed"}, description = "Consider files
modified before this date.", required = false)
+ public String endDate = null;
+
+ @Parameter(names = {"--enable-table-stats", "-fs"}, description = "Show
file-level stats.", required = false)
+ public boolean tableStats = false;
+
+ @Parameter(names = {"--enable-partition-stats", "-ps"}, description =
"Show partition-level stats.", required = false)
+ public boolean partitionStats = false;
+
+ @Parameter(names = {"--props-path", "-pp"}, description = "Properties file
containing base paths one per line", required = false)
+ public String propsFilePath = null;
+
+ @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism
for valuation", required = false)
+ public int parallelism = 200;
+
+ @Parameter(names = {"--spark-master", "-ms"}, description = "Spark
master", required = false)
+ public String sparkMaster = null;
+
+ @Parameter(names = {"--spark-memory", "-sm"}, description = "spark memory
to use", required = false)
+ public String sparkMemory = "1g";
+
+ @Parameter(names = {"--hoodie-conf"}, description = "Any configuration
that can be set in the properties file "
+ + "(using the CLI parameter \"--props\") can also be passed command
line using this parameter. This can be repeated",
+ splitter = IdentitySplitter.class)
+ public List<String> configs = new ArrayList<>();
+
+ @Parameter(names = {"--help", "-h"}, help = true)
+ public Boolean help = false;
+
+ @Override
+ public String toString() {
+ return "TableSizeStats {\n"
+ + " --base-path " + basePath + ", \n"
+ + " --num-days " + numDays + ", \n"
+ + " --start-date " + startDate + ", \n"
+ + " --end-date " + endDate + ", \n"
+ + " --enable-table-stats " + tableStats + ", \n"
+ + " --enable-partition-stats " + partitionStats + ", \n"
+ + " --parallelism " + parallelism + ", \n"
+ + " --spark-master " + sparkMaster + ", \n"
+ + " --spark-memory " + sparkMemory + ", \n"
+ + " --props " + propsFilePath + ", \n"
+ + " --hoodie-conf " + configs
+ + "\n}";
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Config config = (Config) o;
+ return basePath.equals(config.basePath)
+ && Objects.equals(numDays, config.numDays)
+ && Objects.equals(startDate, config.startDate)
+ && Objects.equals(endDate, config.endDate)
+ && Objects.equals(tableStats, config.tableStats)
+ && Objects.equals(partitionStats, config.partitionStats)
+ && Objects.equals(parallelism, config.parallelism)
+ && Objects.equals(sparkMaster, config.sparkMaster)
+ && Objects.equals(sparkMemory, config.sparkMemory)
+ && Objects.equals(propsFilePath, config.propsFilePath)
+ && Objects.equals(configs, config.configs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(basePath, numDays, startDate, endDate, tableStats,
partitionStats, parallelism, sparkMaster, sparkMemory, propsFilePath, configs,
help);
+ }
+ }
+
+ public static void main(String[] args) {
+ final Config cfg = new Config();
+ JCommander cmd = new JCommander(cfg, null, args);
+
+ if (cfg.help || args.length == 0) {
+ cmd.usage();
+ System.exit(1);
+ }
+
+ SparkConf sparkConf = UtilHelpers.buildSparkConf("Table-Size-Stats",
cfg.sparkMaster);
+ sparkConf.set("spark.executor.memory", cfg.sparkMemory);
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ try {
+ TableSizeStats tableSizeStats = new TableSizeStats(jsc, cfg);
+ tableSizeStats.run();
+ } catch (TableNotFoundException e) {
+ LOG.warn(String.format("The Hudi data table is not found: [%s].",
cfg.basePath), e);
+ } catch (Throwable throwable) {
+ LOG.error("Failed to get table size stats for " + cfg, throwable);
+ } finally {
+ jsc.stop();
+ }
+ }
+
+ public void run() {
+ try {
+ LOG.info(cfg.toString());
+ LOG.info(" ****** Fetching table size stats ******");
+
+ // Determine starting and ending date intervals for filtering data files.
+ LocalDate[] dateInterval = getUserSpecifiedDateInterval(cfg);
+
+ if (cfg.propsFilePath != null) {
+ List<String> filePaths = getFilePaths(cfg.propsFilePath,
jsc.hadoopConfiguration());
+ for (String filePath : filePaths) {
+ logTableStats(filePath, dateInterval);
+ }
+ } else {
+ if (cfg.basePath == null) {
+ throw new HoodieIOException("Base path needs to be set.");
+ }
+ logTableStats(cfg.basePath, dateInterval);
+ }
+
+ } catch (Exception e) {
+ throw new HoodieException("Unable to do fetch table size stats." +
cfg.basePath, e);
+ }
+ }
+
+ private void logTableStats(String basePath, LocalDate[] dateInterval) throws
IOException {
+
+ LOG.warn("Processing table " + basePath);
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+ .enable(isMetadataEnabled(basePath, jsc))
+ .build();
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieTableMetadata tableMetadata =
HoodieTableMetadata.create(engineContext, metadataConfig, basePath,
+ FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue());
+ SerializableConfiguration serializableConfiguration = new
SerializableConfiguration(jsc.hadoopConfiguration());
+
+ List<String> allPartitions = tableMetadata.getAllPartitionPaths();
+
+ // As a sanity check, throw exception and exit early if date interval is
specified, but the first partition does not have
+ // date.
+ if (dateInterval != null && getPartitionDate(allPartitions.get(0)) ==
null) {
+ throw new HoodieException(
+ "Cannot apply --start-date, --end-date, or --num-days when partition
does not contain date. Interval: " + Arrays.toString(dateInterval) + ",
Partition Name: " + allPartitions.get(0));
+ }
+
+ final Histogram tableHistogram = new Histogram(new
UniformReservoir(1_000_000));
+ allPartitions.forEach(partition -> {
+ LocalDate partitionDate = null;
+ LocalDate startDate = null;
+ LocalDate endDate = null;
+ if (dateInterval != null) {
+ // Date interval is specified, so try to parse date out of partition
name.
+ partitionDate = getPartitionDate(partition);
+ startDate = dateInterval[0];
+ endDate = dateInterval[1];
+ }
+
+ // Compute file size stats for all files in this partition if:
+ // 1. partition date is null (i.e partition name does not contain a date)
+ // 2. both start date and end date are null (not specified).
+ // 3. endDate is null (not specified) and partition date is equal to or
after startDate.
+ // 4. startDate is null (not specified) and partition date is before
endDate.
+ // 5. startDate and endDate are both specified and partition date lies
in the range [startDate, endDate)
+ if (partitionDate == null
+ || (startDate == null && endDate == null)
+ || (endDate == null && (partitionDate.isEqual(startDate) ||
partitionDate.isAfter(startDate)))
+ || (startDate == null && partitionDate.isBefore(endDate))
+ || (startDate != null && endDate != null &&
((partitionDate.isEqual(startDate) || partitionDate.isAfter(startDate)) &&
partitionDate.isBefore(endDate)))) {
+ HoodieTableMetaClient metaClientLocal = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(serializableConfiguration.get()).build();
+ HoodieMetadataConfig metadataConfig1 =
HoodieMetadataConfig.newBuilder()
+ .enable(false)
+ .build();
+ HoodieTableFileSystemView fileSystemView = FileSystemViewManager
+ .createInMemoryFileSystemView(new
HoodieLocalEngineContext(serializableConfiguration.get()),
+ metaClientLocal, metadataConfig1);
+ List<HoodieBaseFile> baseFiles =
fileSystemView.getLatestBaseFiles(partition).collect(Collectors.toList());
+
+ // No need to collect partition level stats if user hasn't requested
partition-level stats or if there are no partitions in this table.
+ final Histogram partitionHistogram = cfg.partitionStats &&
partition.trim().length() > 0 ? new Histogram(new UniformReservoir(1_000_000))
: null;
+ baseFiles.forEach(baseFile -> {
+ // Add file size to histogram since the file was modified within the
specified date range.
+ if (partitionHistogram != null) {
+ partitionHistogram.update(baseFile.getFileSize());
+ }
+
+ tableHistogram.update(baseFile.getFileSize());
+ });
+
+ // Display file size distribution stats for partition
+ if (partitionHistogram != null) {
+ logStats("Partition stats [name: " + partition + (partitionDate !=
null ? ", has date: yes" : "") + "]", partitionHistogram);
+ }
+ }
+ });
+
+ if (cfg.tableStats) {
+ // Display file size distribution stats for entire table.
+ logStats("Table stats [path: " + basePath + "]", tableHistogram);
+ } else {
+ // Display only total talbe size
+ LOG.info("Total size: {}",
getFileSizeUnit(Arrays.stream(tableHistogram.getSnapshot().getValues()).sum()));
+ }
+ }
+
+ private static boolean isMetadataEnabled(String basePath, JavaSparkContext
jsc) {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(basePath)
+ .setConf(jsc.hadoopConfiguration()).build();
+
+ Set<String> partitions =
metaClient.getTableConfig().getMetadataPartitions();
+ return !partitions.isEmpty() && partitions.contains("files");
+ }
+
+ private static List<String> getFilePaths(String propsPath, Configuration
hadoopConf) {
+ List<String> filePaths = new ArrayList<>();
+ FileSystem fs = FSUtils.getFs(
+ propsPath,
+ Option.ofNullable(hadoopConf).orElseGet(Configuration::new)
+ );
+
+ try (BufferedReader reader = new BufferedReader(new
InputStreamReader(fs.open(new Path(propsPath))))) {
+ String line = reader.readLine();
+ while (line != null) {
+ filePaths.add(line);
+ line = reader.readLine();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Error reading in properties from dfs from file." + propsPath);
+ throw new HoodieIOException("Cannot read properties from dfs from file "
+ propsPath, ioe);
+ }
+ return filePaths;
+ }
+
+ private static LocalDate[] getUserSpecifiedDateInterval(Config cfg) {
+ // Set endDate to null by default.
+ LocalDate endDate = null;
+ if (cfg.endDate != null) {
+ try {
+ endDate = LocalDate.parse(cfg.endDate, DATE_FORMATTER);
+ LOG.info("Setting ending date to {}. ", endDate);
+ } catch (DateTimeParseException dtpe) {
+ throw new HoodieException("Unable to parse --end-date. ", dtpe);
+ }
+ } else {
+ LOG.info("End date is not specified: {}.", endDate);
+ }
+
+ // Set startDate to null by default.
+ LocalDate startDate = null;
+
+ // Set startDate to cfg.startDate if specified. cfg.startDate takes
priority over cfg.numDays if both are specified.
+ if (cfg.startDate != null) {
+ startDate = LocalDate.parse(cfg.startDate, DATE_FORMATTER);
+ LOG.info("Setting starting date to {}.", startDate);
+ } else {
+ if (cfg.numDays == 0) {
+ LOG.info("Start date not specified: {}.", startDate);
+ } else if (cfg.numDays > 0) {
+ endDate = LocalDate.now();
+ startDate = endDate.minusDays(cfg.numDays);
+ LOG.info("Setting starting date to {} ({} - {} days). ", startDate,
endDate, cfg.numDays);
+ } else {
+ throw new HoodieException("--num-days must specify a positive value.");
+ }
+ }
+
+ // Check if starting date is before ending date.
+ if (startDate != null && endDate != null && !startDate.isBefore(endDate)) {
+ throw new HoodieException("Starting date must be before ending date.
Start Date: " + startDate + ", End Date: " + endDate);
+ }
+
+ return startDate == null && endDate == null ? null : new
LocalDate[]{startDate, endDate};
+ }
+
+ @Nullable
+ private static LocalDate getPartitionDate(String partition) {
+ // Partition name should conform to date format if startDate and/or
endDate are specified. Otherwise, we don't
+ // need to parse partition name as date.
+ String dateString = partition;
+ if (partition.contains("=")) {
+ // Assume partition date format of "<column>=<date>" and try parsing out
date.
+ String[] parts = partition.split("=");
+ if (parts != null && parts.length == 2) {
+ dateString = parts[1].trim();
+ }
+ }
+
+ LocalDate partitionDate = null;
+ try {
+ return LocalDate.parse(dateString, DATE_FORMATTER);
+ } catch (DateTimeParseException dtpe) {
+ LOG.error("Partition name {} must conform to date format if
--start-date, --end-date, or --num-days are specified. ", partition, dtpe);
+ }
+ return partitionDate;
+ }
+
+ private static String getFileSizeUnit(double size) {
+ int counter = 0;
+ while (size > 1024 && counter < FILE_SIZE_UNITS.length) {
+ size /= 1024;
+ counter++;
+ }
+
+ return String.format("%.2f %s", size, FILE_SIZE_UNITS[counter]);
+ }
+
+ private static void logStats(String header, Histogram histogram) {
+ LOG.info(header);
+ Snapshot snapshot = histogram.getSnapshot();
+ LOG.info("Number of files: {}", snapshot.size());
+ LOG.info("Total size: {}",
getFileSizeUnit(Arrays.stream(snapshot.getValues()).sum()));
+ LOG.info("Minimum file size: {}", getFileSizeUnit(snapshot.getMin()));
+ LOG.info("Maximum file size: {}", getFileSizeUnit(snapshot.getMax()));
+ LOG.info("Average file size: {}", getFileSizeUnit(snapshot.getMean()));
+ LOG.info("Median file size: {}", getFileSizeUnit(snapshot.getMedian()));
+ LOG.info("P50 file size: {}", getFileSizeUnit(snapshot.getValue(0.5)));
+ LOG.info("P90 file size: {}", getFileSizeUnit(snapshot.getValue(0.9)));
+ LOG.info("P95 file size: {}", getFileSizeUnit(snapshot.getValue(0.95)));
+ LOG.info("P99 file size: {}", getFileSizeUnit(snapshot.getValue(0.99)));
+ }
+}
\ No newline at end of file