nsivabalan commented on pull request #2500:
URL: https://github.com/apache/hudi/pull/2500#issuecomment-792339635
If I am not wrong, there was a bug in the version of the code that you ran. I tested four variants of the code to arrive at the latest proposal. Let me walk through them :) Sorry about the lengthy response. Hopefully this gets us to closure.
1st variant: the current master branch.
```
FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
LOG.warn("HoodieLogFileReader :: canonical name :: "
    + fsDataInputStream.getClass().getCanonicalName() + ", name "
    + fsDataInputStream.getClass().getName());
if (FSUtils.isGCSInputStream(fsDataInputStream)) {
  LOG.warn("HoodieLogFileReader :: 111 start GCSFileSystem "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  // Bug: for GCS, getWrappedStream() is already the leaf FSInputStream,
  // so casting it to FSDataInputStream throws ClassCastException.
  this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) (
          ((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream()), bufferSize)));
  LOG.warn("HoodieLogFileReader :: 111 completed ");
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader :: 222 start "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  this.inputStream = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader :: 222 complete");
} else {
  LOG.warn("HoodieLogFileReader :: 333 ");
  // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
  // do we need to wrap it in another BufferedFSInputStream to make bufferSize work?
  this.inputStream = fsDataInputStream;
}
```
Output from my run:
```
"HoodieLogFileReader :: canonical name :: org.apache.hadoop.fs.FSDataInputStream, name org.apache.hadoop.fs.FSDataInputStream"
"HoodieLogFileReader :: 111 start GCSFileSystem com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"
Caused by: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream
    at org.apache.hudi.common.table.log.HoodieLogFileReader.<init>(HoodieLogFileReader.java:84)
    at org.apache.hudi.common.table.log.HoodieLogFormatReader.<init>(HoodieLogFormatReader.java:62)
    at org.apache.hudi.common.table.log.AbstractHoodieLogRecordScanner.scan(AbstractHoodieLogRecordScanner.java:131)
    ... 24 more
```
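The exception message suggests why variant 1 fails. For GCS, `fsDataInputStream.getWrappedStream()` already returns the leaf `GoogleHadoopFSInputStream`, so casting it to `FSDataInputStream` to unwrap a second layer cannot succeed. The sketch below reproduces that pattern with hypothetical stand-in classes (`FakeFSInputStream`, `FakeGcsInputStream`, `FakeFSDataInputStream`). These are not the real Hadoop types. They only model the wrapping relationship visible in the log.

```java
public class DoubleUnwrapSketch {
  // Hypothetical stand-ins for Hadoop stream types (NOT org.apache.hadoop.fs classes).
  static class FakeFSInputStream {}

  // Models GoogleHadoopFSInputStream, which the log shows is itself an FSInputStream.
  static class FakeGcsInputStream extends FakeFSInputStream {}

  // Models FSDataInputStream: a wrapper exposing the stream it wraps.
  static class FakeFSDataInputStream {
    private final Object wrapped;

    FakeFSDataInputStream(Object wrapped) {
      this.wrapped = wrapped;
    }

    Object getWrappedStream() {
      return wrapped;
    }
  }

  // Mirrors variant 1: assumes a second FSDataInputStream layer and unwraps twice.
  static FakeFSInputStream unwrapTwice(FakeFSDataInputStream in) {
    return (FakeFSInputStream) ((FakeFSDataInputStream) in.getWrappedStream()).getWrappedStream();
  }

  // Unwraps exactly one layer and checks the type before casting.
  static FakeFSInputStream unwrapOnce(FakeFSDataInputStream in) {
    Object inner = in.getWrappedStream();
    return inner instanceof FakeFSInputStream ? (FakeFSInputStream) inner : null;
  }

  public static void main(String[] args) {
    FakeFSDataInputStream gcs = new FakeFSDataInputStream(new FakeGcsInputStream());
    try {
      unwrapTwice(gcs);
      System.out.println("unwrapTwice succeeded");
    } catch (ClassCastException e) {
      System.out.println("unwrapTwice threw ClassCastException");
    }
    System.out.println("unwrapOnce found FSInputStream: " + (unwrapOnce(gcs) != null));
  }
}
```

Running `main` prints that `unwrapTwice` threw and that `unwrapOnce` found the stream. This matches the failure in the log above.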
2nd variant: this PR just before my last commit.
```
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 1111 " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
} else if (FSUtils.isGCSFileSystem(fs)) {
  LOG.warn("HoodieLogFileReader 2222 aaa " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  try {
    FSInputStream localFSInputStream =
        (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
    inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
    LOG.warn("HoodieLogFileReader 2222 aaa succeeded " + logFile.getFileName());
  } catch (ClassCastException e) {
    LOG.warn("HoodieLogFileReader 2222 bbb (aaa failed) " + logFile.getFileName() + " "
        + e.getCause() + ", msg " + e.getMessage());
    // If we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to
    // FSInputStream, fall back to using the stream as is.
    LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with "
        + "GCSFileSystem, falling back to original fsDataInputStream");
    inputStreamLocal = fsDataInputStream;
  }
} else {
  // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
  // do we need to wrap it in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```
Output from the run:
```
"HoodieLogFileReader 1111 .0d7ba334-2847-4b24-997e-1dbecfd12e3b-0_20210306132835.log.1_0-55-75 com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream"
```
What this essentially means is that, for GCSFileSystem, fsDataInputStream.getWrappedStream() is an instance of FSInputStream. So execution never even reaches the GCS-specific else-if block, even though that is the block we intended to hit.
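Here is a minimal sketch of why the GCS branch is unreachable in variant 2. It again uses hypothetical stand-in classes instead of the real Hadoop types. In an `if`/`else if` chain the first matching condition wins. Once the GCS stream satisfies the `instanceof` check, the `isGCSFileSystem` condition is never evaluated. The `chooseBranch` method is illustrative only and returns the name of the branch taken.

```java
public class BranchOrderSketch {
  // Hypothetical stand-ins (NOT the real Hadoop classes).
  static class FakeFSInputStream {}

  static class FakeGcsInputStream extends FakeFSInputStream {}

  static class FakeFSDataInputStream {
    private final Object wrapped;

    FakeFSDataInputStream(Object wrapped) {
      this.wrapped = wrapped;
    }

    Object getWrappedStream() {
      return wrapped;
    }
  }

  // Same branch order as variant 2; returns the name of the branch taken.
  static String chooseBranch(FakeFSDataInputStream in, boolean isGcsFileSystem) {
    if (in.getWrappedStream() instanceof FakeFSInputStream) {
      return "instanceof FSInputStream";
    } else if (isGcsFileSystem) {
      // Only reached when the wrapped stream is NOT an FSInputStream.
      return "GCS";
    } else {
      return "fallback";
    }
  }

  public static void main(String[] args) {
    FakeFSDataInputStream gcs = new FakeFSDataInputStream(new FakeGcsInputStream());
    System.out.println(chooseBranch(gcs, true)); // prints "instanceof FSInputStream"
  }
}
```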
3rd variant: just to check whether fsDataInputStream.getWrappedStream() is an instance of FSDataInputStream or of FSInputStream.
```
if (FSUtils.isGCSFileSystem(fs)) {
  LOG.warn("HoodieLogFileReader 111 aaa " + logFile.getFileName() + " can_name: "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName()
      + ". Is wrappedStream instance of fsDataInputStream "
      + (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream)
      + " , is wrappedSTream instance of fsInputStream "
      + (fsDataInputStream.getWrappedStream() instanceof FSInputStream));
  try {
    FSInputStream localFSInputStream =
        (FSInputStream) (((FSDataInputStream) fsDataInputStream.getWrappedStream()).getWrappedStream());
    inputStreamLocal = new SchemeAwareFSDataInputStream(new TimedFSDataInputStream(logFile.getPath(),
        new FSDataInputStream(new BufferedFSInputStream(localFSInputStream, bufferSize))), true);
    LOG.warn("HoodieLogFileReader 111 aaa succeeded " + logFile.getFileName());
  } catch (ClassCastException e) {
    LOG.warn("HoodieLogFileReader 111 bbb (aaa failed) " + logFile.getFileName() + " "
        + e.getCause() + ", msg " + e.getMessage());
    // If we cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to
    // FSInputStream, fall back to using the stream as is.
    LOG.warn("Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with "
        + "GCSFileSystem, falling back to original fsDataInputStream");
    inputStreamLocal = fsDataInputStream;
  }
} else if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 222 " + logFile.getFileName() + " "
      + fsDataInputStream.getWrappedStream().getClass().getCanonicalName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader 222 completed ");
} else {
  // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
  // do we need to wrap it in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```
Output from the run:
```
"HoodieLogFileReader 111 aaa .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 can_name: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream. Is wrappedStream instance of fsDataInputStream false , is wrappedSTream instance of fsInputStream true"
"HoodieLogFileReader 111 bbb (aaa failed) .978be663-e43b-427e-a102-f26066b15776-0_20210306140026.log.1_0-55-76 null, msg com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream cannot be cast to org.apache.hadoop.fs.FSDataInputStream"
"Cannot cast fsDataInputStream.getWrappedStream().getWrappedStream() to FSInputStream with GCSFileSystem, falling back to original fsDataInputStream"
...
```
After this, the seek ran into an issue:
```
Caused by: java.io.EOFException: Invalid seek offset: position value (1584) must be between 0 and 1584 for 'gs://dataproc-staging-us-
...
```
Because we hit a ClassCastException, we fall back to the original fsDataInputStream and never use the SchemeAwareFSDataInputStream class at all. That is why we ran into the seek issue.
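The sketch below models that consequence under two stated assumptions. First, a hypothetical `FakeGcsStream` rejects `seek(length)` the same way the `EOFException` above does: position 1584 is rejected against a bound of 1584. Second, a hypothetical `FakeSchemeAwareStream` stands in for `SchemeAwareFSDataInputStream`. Its retry-one-byte-earlier rule is an illustration, not Hudi's verified implementation. When the cast fails and the code falls back to the raw stream, the wrapper is never in the chain, so the seek fails.

```java
import java.io.EOFException;
import java.io.IOException;

public class SeekFallbackSketch {
  // Hypothetical minimal seekable-stream interface (NOT org.apache.hadoop.fs.Seekable).
  interface SeekableStream {
    void seek(long desired) throws IOException;

    long getPos();
  }

  // Models the GCS behavior from the log: seeking to exactly the file length is rejected.
  static class FakeGcsStream implements SeekableStream {
    private final long length;
    private long pos;

    FakeGcsStream(long length) {
      this.length = length;
    }

    @Override
    public void seek(long desired) throws IOException {
      if (desired < 0 || desired >= length) {
        throw new EOFException("Invalid seek offset: position value (" + desired
            + ") must be between 0 and " + length);
      }
      pos = desired;
    }

    @Override
    public long getPos() {
      return pos;
    }
  }

  // ASSUMED behavior for illustration only: on GCS, retry a failed seek one byte earlier.
  static class FakeSchemeAwareStream implements SeekableStream {
    private final SeekableStream delegate;
    private final boolean isGcsFileSystem;

    FakeSchemeAwareStream(SeekableStream delegate, boolean isGcsFileSystem) {
      this.delegate = delegate;
      this.isGcsFileSystem = isGcsFileSystem;
    }

    @Override
    public void seek(long desired) throws IOException {
      try {
        delegate.seek(desired);
      } catch (EOFException e) {
        if (!isGcsFileSystem) {
          throw e;
        }
        delegate.seek(desired - 1);
      }
    }

    @Override
    public long getPos() {
      return delegate.getPos();
    }
  }

  // Mirrors variant 3: when the unwrap cast fails, the raw stream is used without the wrapper.
  static SeekableStream open(long length, boolean castSucceeds) {
    SeekableStream raw = new FakeGcsStream(length);
    return castSucceeds ? new FakeSchemeAwareStream(raw, true) : raw;
  }

  static boolean seekSucceeds(SeekableStream s, long desired) {
    try {
      s.seek(desired);
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    long len = 1584;
    System.out.println("raw fallback: " + seekSucceeds(open(len, false), len)); // false
    System.out.println("wrapped:      " + seekSucceeds(open(len, true), len)); // true
  }
}
```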
4th variant: my latest commit with the proposed fix.
```
if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
  LOG.warn("HoodieLogFileReader 111 start " + logFile.getFileName());
  inputStreamLocal = new TimedFSDataInputStream(logFile.getPath(), new FSDataInputStream(
      new BufferedFSInputStream((FSInputStream) fsDataInputStream.getWrappedStream(), bufferSize)));
  LOG.warn("HoodieLogFileReader 111 completed ");
  if (FSUtils.isGCSFileSystem(fs)) {
    // GCS streams also land in this branch, so add the scheme-aware wrapper here.
    LOG.warn("HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream");
    inputStreamLocal = new SchemeAwareFSDataInputStream(inputStreamLocal, true);
  }
} else {
  // fsDataInputStream.getWrappedStream() may be a BufferedFSInputStream;
  // do we need to wrap it in another BufferedFSInputStream to make bufferSize work?
  LOG.warn("HoodieLogFileReader 3333 " + logFile.getFileName());
  inputStreamLocal = fsDataInputStream;
}
```
Output from the run:
```
"HoodieLogFileReader 111 start .7a1a0684-b710-4a44-97c4-4c98b75db8a2-0_20210306142209.log.1_2-55-76"
"HoodieLogFileReader 111 completed "
"HoodieLogFileReader 222 GCS. Wrapping with SchemeAwareFSDataInputStream"
```
No exceptions; all good.
Summary: at some point we came up with an if/else block where the first condition checks fsDataInputStream.getWrappedStream() instanceof FSInputStream and the second caters to GCSFileSystem. In reality, GCSFileSystem also satisfies the first condition, so the GCS branch was never taken. Hence the proposed fix: keep the single instanceof FSInputStream branch and, inside it, additionally wrap the stream with SchemeAwareFSDataInputStream when the file system is GCS.
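The control flow of the proposed fix can be sketched with the same kind of hypothetical stand-ins. Every stream whose wrapped stream is an `FSInputStream` gets the buffered/timed wrapping. The GCS check is nested inside that branch, so it adds the scheme-aware layer on top instead of competing with it. The `wrap` method returns a string describing the chosen wrapper chain only so that the result is easy to inspect. It is not a Hudi API.

```java
public class ProposedFixSketch {
  // Hypothetical stand-ins (NOT the real Hadoop/Hudi classes).
  static class FakeFSInputStream {}

  static class FakeGcsInputStream extends FakeFSInputStream {}

  // A wrapped stream that is not an FSInputStream, to exercise the else branch.
  static class FakeOtherStream {}

  static class FakeFSDataInputStream {
    private final Object wrapped;

    FakeFSDataInputStream(Object wrapped) {
      this.wrapped = wrapped;
    }

    Object getWrappedStream() {
      return wrapped;
    }
  }

  // Describes the wrapper chain chosen, mirroring variant 4's control flow.
  static String wrap(FakeFSDataInputStream in, boolean isGcsFileSystem) {
    String result;
    if (in.getWrappedStream() instanceof FakeFSInputStream) {
      result = "Timed(Buffered)";
      if (isGcsFileSystem) {
        // GCS-only extra layer, added on top of the common wrapping.
        result = "SchemeAware(" + result + ")";
      }
    } else {
      // Use the original stream as is.
      result = "original";
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(wrap(new FakeFSDataInputStream(new FakeGcsInputStream()), true));
    System.out.println(wrap(new FakeFSDataInputStream(new FakeFSInputStream()), false));
    System.out.println(wrap(new FakeFSDataInputStream(new FakeOtherStream()), false));
  }
}
```

The three calls in `main` print `SchemeAware(Timed(Buffered))`, `Timed(Buffered)` and `original`. GCS therefore keeps the buffering and also gains the scheme-aware layer, which matches the 4th-variant log.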