alexeykudinkin commented on code in PR #6815:
URL: https://github.com/apache/hudi/pull/6815#discussion_r1083233582
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -488,25 +489,24 @@ public void remove() {
private static FSDataInputStream getFSDataInputStream(FileSystem fs,
HoodieLogFile logFile,
int bufferSize) throws
IOException {
- FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(),
bufferSize);
+ FSDataInputStream inputStream = fs.open(logFile.getPath(), bufferSize);
+ FSDataInputStream targetInputStream;
if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might get
EOF exception
- return new
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream,
logFile, bufferSize), true);
- }
-
- if (FSUtils.isCHDFileSystem(fs)) {
- return new BoundedFsDataInputStream(fs, logFile.getPath(),
fsDataInputStream);
- }
-
- if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
- return new TimedFSDataInputStream(logFile.getPath(), new
FSDataInputStream(
- new BufferedFSInputStream((FSInputStream)
fsDataInputStream.getWrappedStream(), bufferSize)));
+ targetInputStream = new
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(inputStream, logFile,
bufferSize), true);
+ } else if (FSUtils.isCHDFileSystem(fs)) {
+ targetInputStream = new BoundedFsDataInputStream(fs, logFile.getPath(),
inputStream);
+ } else if (inputStream.getWrappedStream() instanceof FSInputStream) {
+ targetInputStream = new TimedFSDataInputStream(logFile.getPath(), new
FSDataInputStream(
+ new BufferedFSInputStream((FSInputStream)
inputStream.getWrappedStream(), bufferSize)));
+ } else {
+ // inputStream.getWrappedStream() maybe a BufferedFSInputStream
+ // need to wrap in another BufferedFSInputStream the make bufferSize
work?
+ targetInputStream = inputStream;
}
- // fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
- // need to wrap in another BufferedFSInputStream the make bufferSize work?
- return fsDataInputStream;
+ return new LeakTrackingFSDataInputStream(targetInputStream);
Review Comment:
Correct. See my response below
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org