nsivabalan commented on a change in pull request #2496:
URL: https://github.com/apache/hudi/pull/2496#discussion_r567425453



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/TimedSizeAwareOutputStream.java
##########
@@ -20,19 +20,21 @@
 
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * Wrapper over <code>FSDataOutputStream</code> to keep track of the size of the written bytes. This gives a cheap way
+ * Wrapper over <code>OutputStream</code> to keep track of the size of the written bytes. This gives a cheap way
  * to check on the underlying file size.
  */
-public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
+public class TimedSizeAwareOutputStream extends OutputStream {

Review comment:
       I see we don't have unit tests for these FileSystems. If you plan to write one for the newly introduced FileSystem, could you please try adding tests for these too? If not, could you create a JIRA for the testing gaps and tag it here?
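A test for the size tracking could look roughly like the minimal sketch below. It does not use the real class: `CountingOutputStream` is a hypothetical stand-in for `TimedSizeAwareOutputStream`, built on plain `java.io` and tracking only bytes written, not timings or metrics. The test drives every `write` variant and checks that the counter agrees with what actually reached the sink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for TimedSizeAwareOutputStream: it only counts bytes
// written to the wrapped stream. The real class also records timings/metrics.
class CountingOutputStream extends OutputStream {
  private final OutputStream out;
  private final AtomicLong bytesWritten = new AtomicLong();

  CountingOutputStream(OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    bytesWritten.incrementAndGet();
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
    bytesWritten.addAndGet(len);
  }

  @Override
  public void flush() throws IOException {
    out.flush();
  }

  @Override
  public void close() throws IOException {
    out.close();
  }

  long getBytesWritten() {
    return bytesWritten.get();
  }
}

class CountingOutputStreamTest {
  // Exercises every write variant and checks the counter against the sink.
  static void testCountsAllWriteVariants() throws IOException {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    CountingOutputStream counting = new CountingOutputStream(sink);
    counting.write(7);                        // single byte
    counting.write(new byte[] {1, 2, 3, 4});  // OutputStream.write(byte[]) delegates to write(b, 0, 4)
    counting.write(new byte[10], 2, 5);       // partial array write
    counting.close();
    // 1 + 4 + 5 = 10 bytes must reach both the counter and the sink.
    if (counting.getBytesWritten() != 10) throw new AssertionError("counter: " + counting.getBytesWritten());
    if (sink.size() != 10) throw new AssertionError("sink: " + sink.size());
  }

  public static void main(String[] args) throws IOException {
    testCountsAllWriteVariants();
    System.out.println("ok");
  }
}
```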

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/TimedFSInputStream.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.fs;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * A wrapper over <code>FSInputStream</code> that also times the operations.
+ */
+public class TimedFSInputStream extends FSInputStream {

Review comment:
       Do we have unit tests for this FS?
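A test could at least check that every read is delegated and recorded, as in the minimal sketch below. Hadoop is not assumed to be on the classpath, so `TimedInputStream` is a hypothetical stand-in over plain `java.io.InputStream` rather than `FSInputStream`. The real class also has to delegate `FSInputStream`'s `seek`, `getPos` and `seekToNewSource`, which this sketch omits.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for TimedFSInputStream: records how many read calls
// were made and the total wall-clock time spent inside them.
class TimedInputStream extends InputStream {
  private final InputStream in;
  private final AtomicLong readCalls = new AtomicLong();
  private final AtomicLong readNanos = new AtomicLong();

  TimedInputStream(InputStream in) {
    this.in = in;
  }

  @Override
  public int read() throws IOException {
    long start = System.nanoTime();
    try {
      return in.read();
    } finally {
      record(start);
    }
  }

  @Override
  public int read(byte[] b, int off, int len) throws IOException {
    long start = System.nanoTime();
    try {
      return in.read(b, off, len);
    } finally {
      record(start);
    }
  }

  @Override
  public int available() throws IOException {
    return in.available();
  }

  @Override
  public void close() throws IOException {
    in.close();
  }

  private void record(long startNanos) {
    readCalls.incrementAndGet();
    readNanos.addAndGet(System.nanoTime() - startNanos);
  }

  long getReadCalls() { return readCalls.get(); }

  long getReadNanos() { return readNanos.get(); }
}

class TimedInputStreamTest {
  static void testTimesEveryRead() throws IOException {
    byte[] data = {10, 20, 30, 40, 50};
    try (TimedInputStream timed = new TimedInputStream(new ByteArrayInputStream(data))) {
      if (timed.read() != 10) throw new AssertionError("first byte");
      byte[] buf = new byte[4];
      if (timed.read(buf, 0, 4) != 4) throw new AssertionError("bulk read length");
      if (buf[3] != 50) throw new AssertionError("bulk read content");
      if (timed.read() != -1) throw new AssertionError("expected EOF");
      // Three read calls were made, including the one that hit EOF.
      if (timed.getReadCalls() != 3) throw new AssertionError("call count");
      if (timed.getReadNanos() < 0) throw new AssertionError("elapsed time");
    }
  }

  public static void main(String[] args) throws IOException {
    testTimesEveryRead();
    System.out.println("ok");
  }
}
```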

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -79,22 +84,51 @@ public static void setMetricsRegistry(Registry registry, Registry registryMeta)
     METRICS_REGISTRY_META = registryMeta;
   }
 
+  // Configs which controls IO buffering and buffer sizes
+  private static final String CONFIG_IO_BUFFER_PREFIX = "hoodie.fs.io.buffer";

Review comment:
       +1. It would be nice to avoid duplicated configs.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/HoodieWrapperFileSystem.java
##########
@@ -192,27 +261,74 @@ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwr
     return executeFuncWithTimeMetrics(MetricName.create.name(), f, () -> {
       final Path translatedPath = convertToDefaultPath(f);
       return wrapOutputStream(f,
-          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress));
+          fileSystem.create(translatedPath, permission, overwrite, bufferSize, replication, blockSize, progress), bufferSize);
     });
   }
 
-  private FSDataOutputStream wrapOutputStream(final Path path, FSDataOutputStream fsDataOutputStream)
+
+  /**
+   * The stream hierarchy after wrapping will be as follows.
+   *
+   *  FSDataOuputStream (returned)
+   *      BufferedOutputStream (required for output buffering)
+   *          TimedSizeAwareOutputStream  (required for tracking metrics, timings and the number of bytes written)

Review comment:
       Do we need to enable TimedSizeAwareOutputStream by default whenever buffering is enabled? Why not make this configurable too? I'm wondering whether there will be any overhead, since we might be measuring metrics for every read/write call to the FileSystem. Please correct me if my understanding is wrong. IMO, users may not be interested in these metrics unless they want to debug something. Let me know your thoughts.
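The sketch below builds the stream hierarchy from the Javadoc above with the metrics layer made opt-in and independent of buffering. It is minimal and uses only `java.io`. `MeteredOutputStream` is a hypothetical stand-in for `TimedSizeAwareOutputStream`. `OutputStreamChain.wrap` and its `bufferingEnabled`/`metricsEnabled` flags are also hypothetical, not existing Hudi configs. The outermost `FSDataOutputStream` layer is left out because Hadoop is not assumed on the classpath.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical stand-in for TimedSizeAwareOutputStream (counting only, no timing).
class MeteredOutputStream extends FilterOutputStream {
  long bytesWritten = 0;

  MeteredOutputStream(OutputStream out) {
    super(out);
  }

  @Override
  public void write(int b) throws IOException {
    out.write(b);
    bytesWritten++;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);  // avoid FilterOutputStream's byte-at-a-time default
    bytesWritten += len;
  }
}

final class OutputStreamChain {
  // Hypothetical flags: in a real change they would come from the FS/writer config.
  static OutputStream wrap(OutputStream raw, boolean bufferingEnabled, int bufferSize,
                           boolean metricsEnabled) {
    OutputStream stream = raw;
    if (metricsEnabled) {
      // Innermost layer: counts bytes as they are handed to the underlying file.
      stream = new MeteredOutputStream(stream);
    }
    if (bufferingEnabled) {
      // Outer layer: coalesces small writes before they reach the metrics wrapper.
      stream = new BufferedOutputStream(stream, bufferSize);
    }
    return stream;  // the real code would then wrap this in FSDataOutputStream
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream raw = new ByteArrayOutputStream();
    try (OutputStream out = wrap(raw, true, 4096, false)) {
      out.write(new byte[] {1, 2, 3});
    }
    System.out.println(raw.size());  // prints 3: close() flushes the buffer into raw
  }
}
```

When `metricsEnabled` is false, no wrapper is created, so there is no per-call bookkeeping at all. Whether the real timing wrapper's overhead is measurable would still need a benchmark.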

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -79,15 +79,12 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc
       this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) ((
               (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
-    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     } else {
-      // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-      // need to wrap in another BufferedFSInputStream the make bufferSize work?
       this.inputStream = fsDataInputStream;
     }
 
+    LOG.error("Opened inputstream of type " + this.inputStream.getClass().getName() + "  wrapping over "

Review comment:
       Why log this at ERROR level?
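This message is diagnostic detail about the stream chain, so a debug-level log seems a better fit. The minimal sketch below shows that, using `java.util.logging` purely as a stand-in, since Hudi uses a different logging API where the equivalent would be a debug call. `StreamOpenLogging` and `logOpenedStream` are hypothetical names. The `Supplier` overload defers the string concatenation until the level is actually enabled.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.logging.Logger;

// Stand-in using java.util.logging; Hudi's own logger would use its debug level.
final class StreamOpenLogging {
  private static final Logger LOG = Logger.getLogger(StreamOpenLogging.class.getName());

  static Logger logger() {
    return LOG;
  }

  // Diagnostic detail about the stream chain: logged at FINE (debug), not SEVERE.
  // The Supplier is only evaluated if FINE is enabled for this logger.
  static void logOpenedStream(InputStream opened, InputStream wrapped) {
    LOG.fine(() -> "Opened inputstream of type " + opened.getClass().getName()
        + " wrapping over " + wrapped.getClass().getName());
  }

  public static void main(String[] args) {
    InputStream in = new ByteArrayInputStream(new byte[0]);
    logOpenedStream(in, in);  // silent under the default INFO level
  }
}
```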




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

