yihua commented on code in PR #6031:
URL: https://github.com/apache/hudi/pull/6031#discussion_r913281573


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws IOException {
       // release-3.1.0-RC1/BufferedFSInputStream.java#L73
       inputStream.seek(currentPos);
       return true;
+    } catch (IOException e) {
+      if (logFile.getFileSize() < 0) {
+        long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+        logFile.setFileLen(logFileSize);
+      }
+      if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+        LOG.info("Found corrupted block in file " + logFile + " with block size(" + blocksize + ") running past EOF");
+        // this is corrupt
+        // This seek is required because contract of seek() is different for naked DFSInputStream vs BufferedFSInputStream
+        // release-3.1.0-RC1/DFSInputStream.java#L1455
+        // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+        inputStream.seek(currentPos);
+        return true;
+      } else {
+        throw e;
+      }

Review Comment:
   Instead of changing the core reader logic here, could you add the 
scheme-specific logic to a new implementation of `FSDataInputStream` like 
`SchemeAwareFSDataInputStream` and integrate that through 
`getFSDataInputStream()`?
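
   To illustrate the shape of that suggestion, here is a minimal, self-contained sketch of the decorator idea. It deliberately avoids Hadoop types so it compiles on its own: `SeekableInput`, `StrictEofInput`, and `SchemeAwareInput` are hypothetical stand-ins, not classes from this PR, and the "retry one byte earlier" policy is only an assumed example of a scheme-specific workaround. The real class would extend `FSDataInputStream` and override `seek()`.

   ```java
   import java.io.EOFException;
   import java.io.IOException;

   // Hypothetical stand-in for a seekable stream; the real code would use
   // Hadoop's FSDataInputStream rather than this interface.
   interface SeekableInput {
     void seek(long pos) throws IOException;

     long getPos() throws IOException;
   }

   // Hypothetical stream that mimics a store which throws EOFException
   // when asked to seek to or past the end of the file.
   final class StrictEofInput implements SeekableInput {
     private final long length;
     private long pos;

     StrictEofInput(long length) {
       this.length = length;
     }

     @Override
     public void seek(long target) throws IOException {
       if (target < 0 || target >= length) {
         throw new EOFException("seek past EOF: " + target);
       }
       pos = target;
     }

     @Override
     public long getPos() {
       return pos;
     }
   }

   // Decorator that keeps the scheme-specific handling out of the reader:
   // callers simply call seek() and never see the workaround.
   final class SchemeAwareInput implements SeekableInput {
     private final SeekableInput delegate;
     private final boolean retryOnEof; // assumed flag, e.g. true for GCS

     SchemeAwareInput(SeekableInput delegate, boolean retryOnEof) {
       this.delegate = delegate;
       this.retryOnEof = retryOnEof;
     }

     @Override
     public void seek(long target) throws IOException {
       try {
         delegate.seek(target);
       } catch (EOFException e) {
         if (!retryOnEof || target <= 0) {
           throw e; // other schemes keep the original behavior
         }
         // Assumed workaround: fall back to the previous byte offset.
         delegate.seek(target - 1);
       }
     }

     @Override
     public long getPos() throws IOException {
       return delegate.getPos();
     }
   }
   ```

   With something along these lines, `isBlockCorrupted()` could stay as it was before this change, and only the stream returned for the affected scheme would behave differently.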



Review Comment:
   See here:
   ```
   private static FSDataInputStream getFSDataInputStream(FileSystem fs,
                                                         HoodieLogFile logFile,
                                                         int bufferSize) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);

     if (FSUtils.isGCSFileSystem(fs)) {
       // in GCS FS, we might need to intercept seek offsets as we might get EOF exception
       return new SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream, logFile, bufferSize), true);
     }

     if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
       return new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
           new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
     }

     // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream
     // need to wrap in another BufferedFSInputStream to make bufferSize work?
     return fsDataInputStream;
   }
   ```


