alexeykudinkin commented on code in PR #6815:
URL: https://github.com/apache/hudi/pull/6815#discussion_r1083233582


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -488,25 +489,24 @@ public void remove() {
   private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                         HoodieLogFile logFile,
                                                         int bufferSize) throws 
IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), 
bufferSize);
+    FSDataInputStream inputStream = fs.open(logFile.getPath(), bufferSize);
+    FSDataInputStream targetInputStream;
 
     if (FSUtils.isGCSFileSystem(fs)) {
       // in GCS FS, we might need to interceptor seek offsets as we might get 
EOF exception
-      return new 
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, 
logFile, bufferSize), true);
-    }
-
-    if (FSUtils.isCHDFileSystem(fs)) {
-      return new BoundedFsDataInputStream(fs, logFile.getPath(), 
fsDataInputStream);
-    }
-
-    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      return new TimedFSDataInputStream(logFile.getPath(), new 
FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) 
fsDataInputStream.getWrappedStream(), bufferSize)));
+      targetInputStream = new 
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(inputStream, logFile, 
bufferSize), true);
+    } else if (FSUtils.isCHDFileSystem(fs)) {
+      targetInputStream = new BoundedFsDataInputStream(fs, logFile.getPath(), 
inputStream);
+    } else if (inputStream.getWrappedStream() instanceof FSInputStream) {
+      targetInputStream = new TimedFSDataInputStream(logFile.getPath(), new 
FSDataInputStream(
+          new BufferedFSInputStream((FSInputStream) 
inputStream.getWrappedStream(), bufferSize)));
+    } else {
+      // inputStream.getWrappedStream() maybe a BufferedFSInputStream
+      // need to wrap in another BufferedFSInputStream the make bufferSize 
work?
+      targetInputStream = inputStream;
     }
 
-    // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
-    // need to wrap in another BufferedFSInputStream the make bufferSize work?
-    return fsDataInputStream;
+    return new LeakTrackingFSDataInputStream(targetInputStream);

Review Comment:
   Correct. See my response below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to