yihua commented on code in PR #6031:
URL: https://github.com/apache/hudi/pull/6031#discussion_r913281573
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws
IOException {
// release-3.1.0-RC1/BufferedFSInputStream.java#L73
inputStream.seek(currentPos);
return true;
+ } catch (IOException e) {
+ if (logFile.getFileSize() < 0) {
+ long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+ logFile.setFileLen(logFileSize);
+ }
+ if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+ LOG.info("Found corrupted block in file " + logFile + " with block
size(" + blocksize + ") running past EOF");
+ // this is corrupt
+ // This seek is required because contract of seek() is different for
naked DFSInputStream vs BufferedFSInputStream
+ // release-3.1.0-RC1/DFSInputStream.java#L1455
+ // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+ inputStream.seek(currentPos);
+ return true;
+ } else {
+ throw e;
+ }
Review Comment:
Instead of changing the core reader logic here, could you add the
scheme-specific logic to a new implementation of `FSDataInputStream` like
`SchemeAwareFSDataInputStream` and integrate that through
`getFSDataInputStream()`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -285,6 +288,22 @@ private boolean isBlockCorrupted(int blocksize) throws
IOException {
// release-3.1.0-RC1/BufferedFSInputStream.java#L73
inputStream.seek(currentPos);
return true;
+ } catch (IOException e) {
+ if (logFile.getFileSize() < 0) {
+ long logFileSize = FSUtils.getFileSize(fs, logFile.getPath());
+ logFile.setFileLen(logFileSize);
+ }
+ if (endOfBlockPos > logFile.getFileSize() || endOfBlockPos < 0) {
+ LOG.info("Found corrupted block in file " + logFile + " with block
size(" + blocksize + ") running past EOF");
+ // this is corrupt
+ // This seek is required because contract of seek() is different for
naked DFSInputStream vs BufferedFSInputStream
+ // release-3.1.0-RC1/DFSInputStream.java#L1455
+ // release-3.1.0-RC1/BufferedFSInputStream.java#L73
+ inputStream.seek(currentPos);
+ return true;
+ } else {
+ throw e;
+ }
Review Comment:
See here:
```
private static FSDataInputStream getFSDataInputStream(FileSystem fs,
HoodieLogFile
logFile,
int bufferSize)
throws IOException {
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(),
bufferSize);
if (FSUtils.isGCSFileSystem(fs)) {
// in GCS FS, we might need to interceptor seek offsets as we might
get EOF exception
return new
SchemeAwareFSDataInputStream(getFSDataInputStreamForGCS(fsDataInputStream,
logFile, bufferSize), true);
}
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
return new TimedFSDataInputStream(logFile.getPath(), new
FSDataInputStream(
new BufferedFSInputStream((FSInputStream)
fsDataInputStream.getWrappedStream(), bufferSize)));
}
// fsDataInputStream.getWrappedStream() maybe a BufferedFSInputStream
// need to wrap in another BufferedFSInputStream the make bufferSize
work?
return fsDataInputStream;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]