This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch rfc-15
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 389be1be88aa897fbec3777a7def6abb26e11a35
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Oct 8 22:26:07 2020 -0700

    [RFC-15] Add metrics to track the time for each file system call.
---
 .../hudi/common/fs/HoodieWrapperFileSystem.java    | 273 +++++++++++++--------
 .../hudi/common/fs/SizeAwareFSDataInputStream.java |  93 +++++++
 .../common/fs/SizeAwareFSDataOutputStream.java     |  22 +-
 3 files changed, 280 insertions(+), 108 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
index c3f6189..9cea356 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.common.fs;
 
 import org.apache.hudi.common.metrics.Registry;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 
@@ -65,15 +66,45 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   public static final String HOODIE_SCHEME_PREFIX = "hoodie-";
 
-  private enum MetricName {
-    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, 
listFiles
+  protected enum MetricName {
+    create, rename, delete, listStatus, mkdirs, getFileStatus, globStatus, 
listFiles, read, write
   }
 
   private ConcurrentMap<String, SizeAwareFSDataOutputStream> openStreams = new 
ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
   private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
-  private Registry metricsRegistry = 
Registry.getRegistry(this.getClass().getSimpleName());
+  private static Registry metricsRegistry = 
Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName());
+  private static Registry metricsRegistryMetaFolder =
+      Registry.getRegistry(HoodieWrapperFileSystem.class.getSimpleName() + 
"MetaFolder");
+
+  @FunctionalInterface
+  public interface CheckedFunction<R> {
+    R get() throws IOException;
+  }
+
+  private static Registry getMetricRegistryForPath(Path p) {
+    return ((p != null) && 
(p.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)))
+        ? metricsRegistryMetaFolder : metricsRegistry;
+  }
+
+  protected static <R> R executeFuncWithTimeMetrics(String metricName, Path p, 
CheckedFunction<R> func) throws IOException {
+    Registry registry = getMetricRegistryForPath(p);
+
+    long t1 = System.currentTimeMillis();
+    R res = func.get();
+    registry.increment(metricName);
+    registry.add(metricName + ".totalDuration", System.currentTimeMillis() - 
t1);
+
+    return res;
+  }
+
+  protected static <R> R executeFuncWithTimeAndByteMetrics(String metricName, 
Path p, long byteCount,
+                                                           CheckedFunction<R> 
func) throws IOException {
+    Registry registry = getMetricRegistryForPath(p);
+    registry.add(metricName + ".totalBytes", byteCount);
+    return executeFuncWithTimeMetrics(metricName, p, func);
+  }
 
   public HoodieWrapperFileSystem() {}
 
@@ -140,16 +171,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f), bufferSize);
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f), 
bufferSize));
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, boolean 
overwrite, int bufferSize,
       short replication, long blockSize, Progressable progress) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    final Path translatedPath = convertToDefaultPath(f);
-    return wrapOutputStream(f,
-        fileSystem.create(translatedPath, permission, overwrite, bufferSize, 
replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      final Path translatedPath = convertToDefaultPath(f);
+      return wrapOutputStream(f,
+          fileSystem.create(translatedPath, permission, overwrite, bufferSize, 
replication, blockSize, progress));
+    });
   }
 
   private FSDataOutputStream wrapOutputStream(final Path path, 
FSDataOutputStream fsDataOutputStream)
@@ -164,79 +196,99 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return os;
   }
 
+  private FSDataInputStream wrapInputStream(final Path path, FSDataInputStream 
fsDataInputStream) throws IOException {
+    if (fsDataInputStream instanceof SizeAwareFSDataInputStream) {
+      return fsDataInputStream;
+    }
+
+    SizeAwareFSDataInputStream os = new SizeAwareFSDataInputStream(path, 
fsDataInputStream);
+    return os;
+  }
+
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f)));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, Progressable progress) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
replication));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
replication));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication, Progressable 
progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
replication, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
replication, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) 
throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite, bufferSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite, bufferSize));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
Progressable progress)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite, bufferSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
overwrite, bufferSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
short replication, long blockSize,
       Progressable progress) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, 
replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, 
replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, 
EnumSet<CreateFlag> flags, int bufferSize,
       short replication, long blockSize, Progressable progress) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), permission, flags, 
bufferSize, replication, blockSize, progress));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), permission, flags, 
bufferSize, replication, blockSize, progress));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, 
EnumSet<CreateFlag> flags, int bufferSize,
       short replication, long blockSize, Progressable progress, 
Options.ChecksumOpt checksumOpt) throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
permission, flags, bufferSize, replication,
-        blockSize, progress, checksumOpt));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), 
permission, flags, bufferSize, replication,
+          blockSize, progress, checksumOpt));
+    });
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, 
short replication, long blockSize)
       throws IOException {
-    this.metricsRegistry.increment(MetricName.create.name());
-    return wrapOutputStream(f,
-        fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, 
replication, blockSize));
+    return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
+      return wrapOutputStream(f,
+          fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, 
replication, blockSize));
+    });
   }
 
   @Override
@@ -246,50 +298,53 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    this.metricsRegistry.increment(MetricName.rename.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
-    } catch (TimeoutException e) {
-      throw new HoodieException("Timed out waiting for " + src + " to appear", 
e);
-    }
-
-    boolean success = fileSystem.rename(convertToDefaultPath(src), 
convertToDefaultPath(dst));
-
-    if (success) {
+    return executeFuncWithTimeMetrics(MetricName.rename.name(), src, () -> {
       try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
       } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + dst + " to 
appear", e);
+        throw new HoodieException("Timed out waiting for " + src + " to 
appear", e);
       }
 
-      try {
-        consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + src + " to 
disappear", e);
+      boolean success = fileSystem.rename(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + dst + " to 
appear", e);
+        }
+
+        try {
+          consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + src + " to 
disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
-
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileDisappears(f);
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for " + f + " to 
disappear", e);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
+
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileDisappears(f);
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for " + f + " to 
disappear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -304,27 +359,29 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.getFileStatus.name());
-    try {
-      consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-    } catch (TimeoutException e) {
-      // pass
-    }
-    return fileSystem.getFileStatus(convertToDefaultPath(f));
+    return executeFuncWithTimeMetrics(MetricName.getFileStatus.name(), f, () 
-> {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        // pass
+      }
+      return fileSystem.getFileStatus(convertToDefaultPath(f));
+    });
   }
 
   @Override
@@ -389,7 +446,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f) throws IOException {
-    return fileSystem.open(convertToDefaultPath(f));
+    return wrapInputStream(f, fileSystem.open(convertToDefaultPath(f)));
   }
 
   @Override
@@ -462,8 +519,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.delete.name());
-    return delete(f, true);
+    return executeFuncWithTimeMetrics(MetricName.delete.name(), f, () -> {
+      return delete(f, true);
+    });
   }
 
   @Override
@@ -508,32 +566,37 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException 
{
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), f, () -> {
+      return fileSystem.listStatus(convertToDefaultPath(f), filter);
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files) throws IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files));
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), 
files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files));
+    });
   }
 
   @Override
   public FileStatus[] listStatus(Path[] files, PathFilter filter) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.listStatus.name());
-    return fileSystem.listStatus(convertDefaults(files), filter);
+    return executeFuncWithTimeMetrics(MetricName.listStatus.name(), 
files.length > 0 ? files[0] : null, () -> {
+      return fileSystem.listStatus(convertDefaults(files), filter);
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern) throws IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), 
pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern));
+    });
   }
 
   @Override
   public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws 
IOException {
-    this.metricsRegistry.increment(MetricName.globStatus.name());
-    return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    return executeFuncWithTimeMetrics(MetricName.globStatus.name(), 
pathPattern, () -> {
+      return fileSystem.globStatus(convertToDefaultPath(pathPattern), filter);
+    });
   }
 
   @Override
@@ -543,8 +606,9 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean 
recursive) throws IOException {
-    this.metricsRegistry.increment(MetricName.listFiles.name());
-    return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    return executeFuncWithTimeMetrics(MetricName.listFiles.name(), f, () -> {
+      return fileSystem.listFiles(convertToDefaultPath(f), recursive);
+    });
   }
 
   @Override
@@ -554,16 +618,17 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f) throws IOException {
-    this.metricsRegistry.increment(MetricName.mkdirs.name());
-    boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
-    if (success) {
-      try {
-        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
-      } catch (TimeoutException e) {
-        throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+    return executeFuncWithTimeMetrics(MetricName.mkdirs.name(), f, () -> {
+      boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
+      if (success) {
+        try {
+          consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+        } catch (TimeoutException e) {
+          throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+        }
       }
-    }
-    return success;
+      return success;
+    });
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
new file mode 100644
index 0000000..684a625
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataInputStream.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hudi.common.metrics.Registry;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Wrapper over <code>FSDataInputStream</code> that records timing and 
byte-count metrics for read calls.
+ */
+public class SizeAwareFSDataInputStream extends FSDataInputStream {
+
+  // Path
+  private final Path path;
+  // Registry for read and write metrics
+  Registry metricsRegistry;
+
+  public SizeAwareFSDataInputStream(Path path, FSDataInputStream in) throws 
IOException {
+    super(in);
+    this.path = path;
+  }
+
+  @Override
+  public int read(ByteBuffer buf) throws IOException {
+    return 
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, 0, () -> {
+            return super.read(buf);
+      });
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws 
IOException {
+    return 
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> {
+            return super.read(position, buffer, offset, length);
+      });
+  }
+
+  @Override
+  public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, 
EnumSet<ReadOption> opts)
+          throws IOException, UnsupportedOperationException {
+    return 
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, maxLength, () -> {
+          return super.read(bufferPool, maxLength, opts);
+      });
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer) throws IOException {
+    
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, buffer.length, () -> {
+          super.readFully(position, buffer);
+          return null;
+        });
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) 
throws IOException {
+    
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.read.name(),
+        path, length, () -> {
+          super.readFully(position, buffer, offset, length);
+          return null;
+        });
+  }
+
+  public void setMetricRegistry(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
+
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
index 0b70bed..e0607d0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/fs/SizeAwareFSDataOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.fs;
 
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -41,6 +42,8 @@ public class SizeAwareFSDataOutputStream extends 
FSDataOutputStream {
   private final Path path;
   // Consistency guard
   private final ConsistencyGuard consistencyGuard;
+  // Registry for read and write metrics
+  Registry metricsRegistry;
 
   public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out, 
ConsistencyGuard consistencyGuard,
       Runnable closeCallback) throws IOException {
@@ -52,14 +55,22 @@ public class SizeAwareFSDataOutputStream extends 
FSDataOutputStream {
 
   @Override
   public synchronized void write(byte[] b, int off, int len) throws 
IOException {
-    bytesWritten.addAndGet(len);
-    super.write(b, off, len);
+    
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(),
 path,
+        len, () -> {
+            bytesWritten.addAndGet(len);
+            super.write(b, off, len);
+            return null;
+        });
   }
 
   @Override
   public void write(byte[] b) throws IOException {
-    bytesWritten.addAndGet(b.length);
-    super.write(b);
+    
HoodieWrapperFileSystem.executeFuncWithTimeAndByteMetrics(HoodieWrapperFileSystem.MetricName.write.name(),
 path,
+        b.length, () -> {
+            bytesWritten.addAndGet(b.length);
+            super.write(b);
+            return null;
+        });
   }
 
   @Override
@@ -77,4 +88,7 @@ public class SizeAwareFSDataOutputStream extends 
FSDataOutputStream {
     return bytesWritten.get();
   }
 
+  public void setMetricRegistry(Registry metricsRegistry) {
+    this.metricsRegistry = metricsRegistry;
+  }
 }

Reply via email to