nsivabalan commented on a change in pull request #2500:
URL: https://github.com/apache/hudi/pull/2500#discussion_r581562884



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -74,20 +75,28 @@
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
+    FSDataInputStream inputStreamLocal;
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
-    if (FSUtils.isGCSInputStream(fsDataInputStream)) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
-          new BufferedFSInputStream((FSInputStream) ((
-              (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
-    } else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
-      this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
+    if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {

Review comment:
   Since I couldn't verify GCS locally, I relied on feedback from @vburenin.
   
   This is an excerpt from his previous comment:
   ```
   I had a failure here running a job with the previous GCS detector. It would determine the stream to be GCS based on FSDataInputStream, but when it calls getWrappedStream().getWrappedStream() it ends up with a GoogleHadoopFSDataInputStream that cannot be cast to FSInputStream. So, as a quick fix, I changed the IF to this, checking for FSInputStream first:
   
       if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
         this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
             new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
       } else if (FSUtils.isGCSInputStream(fsDataInputStream)) {
         this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
             new BufferedFSInputStream((FSInputStream) ((
                 (FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
       } else {
         // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
         // need to wrap in another BufferedFSInputStream to make bufferSize work?
         this.inputStream = fsDataInputStream;
       }
   I would suggest you rearrange it here too, as there is no need to do anything special if fsDataInputStream.getWrappedStream() is an instance of FSInputStream.
   ```
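
For readers who want to see the ordering argument in isolation, here is a minimal sketch that runs without Hadoop on the classpath. The classes `SeekableStream`, `WrapperStream`, and `OpaqueStream` are hypothetical stand-ins (roughly for `FSInputStream`, `FSDataInputStream`, and a stream that is neither), and `chooseWrapping` is an illustrative helper, not Hudi code. Unlike the excerpt, the sketch guards the nested unwrap with an extra `instanceof` check; that guard is an assumption of this example rather than something the PR is known to do.

```java
import java.io.IOException;
import java.io.InputStream;

class StreamUnwrapSketch {

  /** Hypothetical stand-in for Hadoop's FSInputStream. */
  static class SeekableStream extends InputStream {
    @Override
    public int read() {
      return -1; // always at end of stream; the contents do not matter here
    }
  }

  /** Hypothetical stand-in for FSDataInputStream: wraps another stream. */
  static class WrapperStream extends InputStream {
    private final InputStream wrapped;

    WrapperStream(InputStream wrapped) {
      this.wrapped = wrapped;
    }

    InputStream getWrappedStream() {
      return wrapped;
    }

    @Override
    public int read() throws IOException {
      return wrapped.read();
    }
  }

  /** Hypothetical stand-in for a stream that is neither of the above. */
  static class OpaqueStream extends InputStream {
    @Override
    public int read() {
      return -1;
    }
  }

  /** Decides how to wrap the stream, testing the simplest case first. */
  static String chooseWrapping(WrapperStream in) {
    InputStream inner = in.getWrappedStream();
    if (inner instanceof SeekableStream) {
      // Common case: the directly wrapped stream can be buffered as-is.
      return "buffer-inner";
    } else if (inner instanceof WrapperStream
        && ((WrapperStream) inner).getWrappedStream() instanceof SeekableStream) {
      // Doubly wrapped case: unwrap once more, but only after checking the type.
      return "buffer-nested";
    }
    // Nothing castable was found, so use the original stream unchanged.
    return "use-as-is";
  }

  public static void main(String[] args) {
    System.out.println(chooseWrapping(new WrapperStream(new SeekableStream())));
    System.out.println(chooseWrapping(new WrapperStream(new WrapperStream(new SeekableStream()))));
    System.out.println(chooseWrapping(new WrapperStream(new WrapperStream(new OpaqueStream()))));
  }
}
```

Because every cast is preceded by a matching `instanceof` test, no input to this sketch can trigger a `ClassCastException`; unknown stream types fall through to the last branch.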



