This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ce1c983e7a0b2fb34270a12d33a7c300433aabc3
Author: Prashant Wason <[email protected]>
AuthorDate: Fri Oct 16 17:12:09 2020 -0700

    [RFC-15] Using distributed metrics registry in metadata code.
---
 .../apache/hudi/metadata/HoodieMetadataWriter.java | 36 +++++-----
 .../apache/hudi/metadata/TestHoodieMetadata.java   |  2 +-
 .../hudi/common/table/log/HoodieLogFileReader.java |  5 +-
 .../apache/hudi/metadata/HoodieMetadataReader.java | 79 +++++++++++++---------
 4 files changed, 70 insertions(+), 52 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
index 295f15f..5491451 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriter.java
@@ -45,6 +45,7 @@ import org.apache.hudi.client.utils.ClientUtils;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -68,6 +69,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metrics.DistributedRegistry;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -95,12 +97,6 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
   private static Map<String, HoodieMetadataWriter> instances = new HashMap<>();
 
   public static HoodieMetadataWriter instance(Configuration conf, 
HoodieWriteConfig writeConfig) {
-    try {
-      return new HoodieMetadataWriter(conf, writeConfig);
-    } catch (IOException e) {
-      throw new HoodieMetadataException("Could not initialize 
HoodieMetadataWriter", e);
-    }
-    /*
     return instances.computeIfAbsent(writeConfig.getBasePath(), k -> {
       try {
         return new HoodieMetadataWriter(conf, writeConfig);
@@ -108,12 +104,11 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
         throw new HoodieMetadataException("Could not initialize 
HoodieMetadataWriter", e);
       }
     });
-    */
   }
 
   HoodieMetadataWriter(Configuration hadoopConf, HoodieWriteConfig 
writeConfig) throws IOException {
     super(hadoopConf, writeConfig.getBasePath(), 
writeConfig.getSpillableMapBasePath(),
-        writeConfig.useFileListingMetadata(), 
writeConfig.getFileListingMetadataVerify());
+        writeConfig.useFileListingMetadata(), 
writeConfig.getFileListingMetadataVerify(), false);
 
     if (writeConfig.useFileListingMetadata()) {
       this.tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
@@ -126,6 +121,14 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
       // Metadata Table cannot have its metadata optimized
       ValidationUtils.checkArgument(this.config.shouldAutoCommit(), "Auto 
commit is required for Metadata Table");
       ValidationUtils.checkArgument(!this.config.useFileListingMetadata(), 
"File listing cannot be used for Metadata Table");
+
+      if (config.isMetricsOn()) {
+        if (config.isExecutorMetricsEnabled()) {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata", 
DistributedRegistry.class.getName());
+        } else {
+          metricsRegistry = Registry.getRegistry("HoodieMetadata");
+        }
+      }
     } else {
       enabled = false;
     }
@@ -162,6 +165,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
     if (writeConfig.isMetricsOn()) {
       HoodieMetricsConfig.Builder metricsConfig = 
HoodieMetricsConfig.newBuilder()
           .withReporterType(writeConfig.getMetricsReporterType().toString())
+          .withExecutorMetrics(writeConfig.isExecutorMetricsEnabled())
           .on(true);
       switch (writeConfig.getMetricsReporterType()) {
         case GRAPHITE:
@@ -215,6 +219,10 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
    */
   public void initialize(JavaSparkContext jsc) {
     try {
+      if (metricsRegistry instanceof DistributedRegistry) {
+        ((DistributedRegistry) metricsRegistry).register(jsc);
+      }
+
       if (enabled) {
         initializeAndSync(jsc);
       }
@@ -261,12 +269,6 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
     } else {
       updateMetrics(INITIALIZE_STR, durationInMs);
     }
-
-    // Total size of the metadata and count of base/log files
-    Map<String, String> stats = getStats(false);
-    updateMetrics(Long.valueOf(stats.get(STAT_TOTAL_BASE_FILE_SIZE)),
-        Long.valueOf(stats.get(STAT_TOTAL_LOG_FILE_SIZE)), 
Integer.valueOf(stats.get(STAT_COUNT_BASE_FILES)),
-        Integer.valueOf(stats.get(STAT_COUNT_LOG_FILES)));
   }
 
   /**
@@ -287,7 +289,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
         HoodieFileFormat.HFILE.toString());
 
     // List all partitions in the basePath of the containing dataset
-    FileSystem fs = FSUtils.getFs(datasetBasePath, hadoopConf.get());
+    FileSystem fs = datasetMetaClient.getFs();
     List<String> partitions = FSUtils.getAllPartitionPaths(fs, 
datasetBasePath, false);
     LOG.info("Initializing metadata table by using file listings in " + 
partitions.size() + " partitions");
 
@@ -297,7 +299,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
     int parallelism =  Math.min(partitions.size(), jsc.defaultParallelism()) + 
1; // +1 to prevent 0 parallelism
     JavaPairRDD<String, FileStatus[]> partitionFileListRDD = 
jsc.parallelize(partitions, parallelism)
         .mapToPair(partition -> {
-          FileSystem fsys = FSUtils.getFs(dbasePath, serializedConf.get());
+          FileSystem fsys = datasetMetaClient.getFs();
           FileStatus[] statuses = FSUtils.getAllDataFilesInPartition(fsys, new 
Path(dbasePath, partition));
           return new Tuple2<>(partition, statuses);
         });
@@ -473,7 +475,7 @@ public class HoodieMetadataWriter extends 
HoodieMetadataReader implements Serial
       return;
     }
 
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    HoodieActiveTimeline timeline = metaClient.reloadActiveTimeline();
     long cnt = timeline.filterCompletedInstants().getInstants().filter(i -> 
i.getTimestamp().equals(instantTime)).count();
     if (cnt == 1) {
       LOG.info("Ignoring update from cleaner plan for already completed 
instant " + instantTime);
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java 
b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
index a901916..9395d5f 100644
--- a/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
+++ b/hudi-client/src/test/java/org/apache/hudi/metadata/TestHoodieMetadata.java
@@ -762,6 +762,6 @@ public class TestHoodieMetadata extends 
HoodieClientTestHarness {
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withUseFileListingMetadata(useFileListingMetadata)
         
.withMetricsConfig(HoodieMetricsConfig.newBuilder().withReporterType("CONSOLE").on(enableMetrics)
-                           .usePrefix("unit-test").build());
+                           
.withExecutorMetrics(true).usePrefix("unit-test").build());
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 5d2e185..d7ef2d3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.table.log;
 
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.SizeAwareFSDataInputStream;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
@@ -72,8 +73,8 @@ public class HoodieLogFileReader implements 
HoodieLogFormat.Reader {
       boolean readBlockLazily, boolean reverseReader) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), 
bufferSize);
     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize));
+      this.inputStream = new SizeAwareFSDataInputStream(logFile.getPath(), new 
FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
      // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
      // need to wrap in another BufferedFSInputStream to make bufferSize 
work?
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
index 6ca0831..fed5516 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataReader.java
@@ -99,6 +99,7 @@ public class HoodieMetadataReader implements Serializable {
   public static final String VALIDATE_FILES_STR = "validate_files";
   public static final String VALIDATE_ERRORS_STR = "validate_errors";
   public static final String SCAN_STR = "scan";
+  public static final String BASEFILE_READ_STR = "basefile_read";
 
   // Stats names
   public static final String STAT_TOTAL_BASE_FILE_SIZE = 
"totalBaseFileSizeInBytes";
@@ -116,7 +117,7 @@ public class HoodieMetadataReader implements Serializable {
   protected final SerializableConfiguration hadoopConf;
   protected final String datasetBasePath;
   protected final String metadataBasePath;
-  protected transient Registry metricsRegistry;
+  protected Registry metricsRegistry;
   protected HoodieTableMetaClient metaClient;
   protected boolean enabled;
   private final boolean validateLookups;
@@ -138,6 +139,17 @@ public class HoodieMetadataReader implements Serializable {
    */
   public HoodieMetadataReader(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
                               boolean enabled, boolean validateLookups) {
+    this(conf, datasetBasePath, spillableMapDirectory, enabled, 
validateLookups, false);
+  }
+
+  /**
+   * Create the Metadata Table in read-only mode.
+   *
+   * @param conf {@code Configuration}
+   * @param datasetBasePath The basePath for the dataset
+   */
+  public HoodieMetadataReader(Configuration conf, String datasetBasePath, 
String spillableMapDirectory,
+                              boolean enabled, boolean validateLookups, 
boolean enableMetrics) {
     this.hadoopConf = new SerializableConfiguration(conf);
     this.datasetBasePath = datasetBasePath;
     this.metadataBasePath = getMetadataTableBasePath(datasetBasePath);
@@ -158,6 +170,10 @@ public class HoodieMetadataReader implements Serializable {
       LOG.info("Metadata table is disabled.");
     }
 
+    if (enableMetrics) {
+      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+    }
+
     this.enabled = enabled;
   }
 
@@ -334,10 +350,12 @@ public class HoodieMetadataReader implements Serializable 
{
     // Retrieve record from base file
     HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
     if (basefileReader != null) {
+      long t1 = System.currentTimeMillis();
       Option<GenericRecord> baseRecord = basefileReader.getRecordByKey(key);
       if (baseRecord.isPresent()) {
         hoodieRecord = 
SpillableMapUtils.convertToHoodieRecordPayload((GenericRecord) baseRecord.get(),
             metaClient.getTableConfig().getPayloadClass());
+        updateMetrics(BASEFILE_READ_STR, System.currentTimeMillis() - t1);
       }
     }
 
@@ -406,7 +424,7 @@ public class HoodieMetadataReader implements Serializable {
 
     // TODO: The below code may open the metadata to include incomplete 
instants on the dataset
     logRecordScanner =
-        new 
HoodieMetadataMergedLogRecordScanner(FSUtils.getFs(datasetBasePath, 
hadoopConf.get()), metadataBasePath,
+        new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), 
metadataBasePath,
             logFilePaths, schema, latestMetaInstantTimestamp, 
maxMemorySizeInBytes, bufferSize,
             spillableMapDirectory, null);
 
@@ -512,25 +530,29 @@ public class HoodieMetadataReader implements Serializable 
{
   }
 
   public Map<String, String> getStats(boolean detailed) throws IOException {
+    metaClient.reloadActiveTimeline();
+    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+    return getStats(fsView, detailed);
+  }
+
+  private Map<String, String> getStats(HoodieTableFileSystemView fsView, 
boolean detailed) throws IOException {
     Map<String, String> stats = new HashMap<>();
-    FileSystem fs = FSUtils.getFs(metadataBasePath, hadoopConf.get());
 
     // Total size of the metadata and count of base/log files
     long totalBaseFileSizeInBytes = 0;
     long totalLogFileSizeInBytes = 0;
     int baseFileCount = 0;
     int logFileCount = 0;
-    HoodieTableFileSystemView fsView = new 
HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
     List<FileSlice> latestSlices = 
fsView.getLatestFileSlices(METADATA_PARTITION_NAME).collect(Collectors.toList());
 
     for (FileSlice slice : latestSlices) {
       if (slice.getBaseFile().isPresent()) {
-        totalBaseFileSizeInBytes += fs.getFileStatus(new 
Path(slice.getBaseFile().get().getPath())).getLen();
+        totalBaseFileSizeInBytes += 
slice.getBaseFile().get().getFileStatus().getLen();
         ++baseFileCount;
       }
       Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
       while (it.hasNext()) {
-        totalLogFileSizeInBytes += 
fs.getFileStatus(it.next().getPath()).getLen();
+        totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
         ++logFileCount;
       }
     }
@@ -550,40 +572,33 @@ public class HoodieMetadataReader implements Serializable 
{
   }
 
   protected void updateMetrics(String action, long durationInMs) {
-    String countKey = action + ".count";
-    String durationKey = action + ".duration";
-    Registry registry = getMetricsRegistry();
-
-    // Update average for duration and total for count
-    long existingCount = registry.getAllCounts().getOrDefault(countKey, 0L);
-    long existingDuration = registry.getAllCounts().getOrDefault(durationKey, 
0L);
-    long avgDuration = (long)Math.ceil((existingDuration * existingCount + 
durationInMs) / (existingCount + 1));
+    if (metricsRegistry == null) {
+      return;
+    }
 
-    registry.add(countKey, 1);
-    registry.add(durationKey, avgDuration - existingDuration);
+    // Update sum of duration and total for count
+    String countKey = action + ".count";
+    String durationKey = action + ".totalDuration";
+    metricsRegistry.add(countKey, 1);
+    metricsRegistry.add(durationKey, durationInMs);
 
-    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=%d)", 
durationKey, avgDuration, countKey,
-        existingCount + 1));
+    LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", 
durationKey, durationInMs, countKey));
   }
 
   protected void updateMetrics(long totalBaseFileSizeInBytes, long 
totalLogFileSizeInBytes, int baseFileCount,
-                            int logFileCount) {
-    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, 
logfile.size=%d, basefile.count=%d, "
-        + "logfile.count=%d)", totalBaseFileSizeInBytes, 
totalLogFileSizeInBytes, baseFileCount, logFileCount));
-
-    Registry registry = getMetricsRegistry();
-    registry.add("basefile.size", totalBaseFileSizeInBytes);
-    registry.add("logfile.size", totalLogFileSizeInBytes);
-    registry.add("basefile.count", baseFileCount);
-    registry.add("logfile.count", logFileCount);
-  }
-
-  private Registry getMetricsRegistry() {
+                               int logFileCount) {
     if (metricsRegistry == null) {
-      metricsRegistry = Registry.getRegistry("HoodieMetadata");
+      return;
     }
 
-    return metricsRegistry;
+    // Update sizes and count for metadata table's data files
+    metricsRegistry.add("basefile.size", totalBaseFileSizeInBytes);
+    metricsRegistry.add("logfile.size", totalLogFileSizeInBytes);
+    metricsRegistry.add("basefile.count", baseFileCount);
+    metricsRegistry.add("logfile.count", logFileCount);
+
+    LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, 
logfile.size=%d, basefile.count=%d, "
+        + "logfile.count=%d)", totalBaseFileSizeInBytes, 
totalLogFileSizeInBytes, baseFileCount, logFileCount));
   }
 
   /**

Reply via email to