bharathv commented on a change in pull request #3377:
URL: https://github.com/apache/hbase/pull/3377#discussion_r650309749
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -381,8 +385,13 @@ private static void checkLength(int len, int max) throws IOException {
   private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
       int expectedLength) throws IOException {
     int compressedLen = StreamUtils.readRawVarint32(in);
-    int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
-      outOffset, expectedLength);
+    // A partial read of the compressed bytes, depending on which compression codec is used,
+    // can cause messy IO errors. This can happen when the reader is actively tailing a file
+    // being written, for replication.
+    byte[] buffer = new byte[compressedLen];
+    IOUtils.readFully(in, buffer, 0, compressedLen);
Review comment:
   Trying to understand how this actually works: are we relying on the
EOFException thrown by readFully here, so that the upper layers in
ProtobufLogReader#next() handle it?
   If so, I'm curious whether the actual fix should be somewhere around
ProtobufLogReader#extractHiddenEof()? I mean, this works, but if we extract the
right EOF exception, we can avoid this copy?
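   To illustrate the mechanism being discussed, here is a minimal, self-contained sketch (class name and the stand-in `readFully` are mine, mirroring the documented semantics of `org.apache.hadoop.io.IOUtils.readFully`, not the actual HBase code): by reading the whole compressed segment into a temporary buffer first, a truncated segment surfaces as a clean EOFException before any codec sees partial input.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadFullySketch {
  // Minimal stand-in for org.apache.hadoop.io.IOUtils.readFully:
  // loops until len bytes are read, or throws EOFException on a short read.
  static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
    while (len > 0) {
      int n = in.read(buf, off, len);
      if (n < 0) {
        throw new EOFException("Premature EOF: " + len + " bytes still expected");
      }
      off += n;
      len -= n;
    }
  }

  public static void main(String[] args) throws IOException {
    // Complete segment: readFully succeeds, decompression can proceed safely.
    byte[] full = new byte[] { 1, 2, 3, 4 };
    byte[] buffer = new byte[4];
    readFully(new ByteArrayInputStream(full), buffer, 0, 4);
    System.out.println("full read ok");

    // Truncated segment (e.g. tailing a file mid-write): a clean EOFException
    // surfaces before the decompressor ever sees partial input.
    byte[] partial = new byte[] { 1, 2 };
    try {
      readFully(new ByteArrayInputStream(partial), new byte[4], 0, 4);
    } catch (EOFException e) {
      System.out.println("EOF detected");
    }
  }
}
```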
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -381,8 +385,13 @@ private static void checkLength(int len, int max) throws IOException {
   private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
       int expectedLength) throws IOException {
     int compressedLen = StreamUtils.readRawVarint32(in);
-    int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
-      outOffset, expectedLength);
+    // A partial read of the compressed bytes, depending on which compression codec is used,
+    // can cause messy IO errors. This can happen when the reader is actively tailing a file
+    // being written, for replication.
+    byte[] buffer = new byte[compressedLen];
+    IOUtils.readFully(in, buffer, 0, compressedLen);
Review comment:
Thanks for the detailed comment.
> If we do not read in the complete segment of the compressed stream, the
decompressor, depending on type, will throw random exceptions, maybe IO
exceptions, maybe others. These are not EOFExceptions. They permanently confuse
the log reader.
   Ya, thought so. I think we have to live with that assumption, since anyone
can plug in any random codec implementation.
> we still need to rewind both the reader and the output stream.
   Where is this output stream that we need to rewind? Isn't it the job of
the compression codec to clean up its state if the read() fails (or did I
misunderstand something)?
   What I had in mind was an "intercepting" input stream that wraps this
compressed input stream and keeps track of the bytes read so far. It
essentially does what IOUtils.readFully() does, but without copying into a
buffer: it just throws EOFException when it hits end of stream while
readBytesSoFar < totalBytesToBeRead. In some sense we intercept the exception
the decompressor would run into before it happens, and with the above check we
can be sure it is due to this EOF and not something else. But then I didn't
get the part about rewinding the decompressor's internal output stream.
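   A minimal sketch of that intercepting idea (the class name and exact semantics are my assumption, not code from the patch): track the remaining expected bytes and turn a premature end-of-stream into an EOFException before the decompressor can choke on truncated input, with no side buffer.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class InterceptSketch {
  /**
   * Hypothetical intercepting stream: tracks bytes read so far and converts a
   * premature end-of-stream into an EOFException, without copying the data
   * into a temporary buffer the way readFully does.
   */
  static class EofInterceptingInputStream extends FilterInputStream {
    private long remaining;

    EofInterceptingInputStream(InputStream in, long totalBytesToBeRead) {
      super(in);
      this.remaining = totalBytesToBeRead;
    }

    @Override
    public int read() throws IOException {
      if (remaining <= 0) {
        return -1;
      }
      int b = in.read();
      if (b < 0) {
        throw new EOFException(remaining + " expected bytes still unread");
      }
      remaining--;
      return b;
    }

    @Override
    public int read(byte[] buf, int off, int len) throws IOException {
      if (remaining <= 0) {
        return -1;
      }
      int n = in.read(buf, off, (int) Math.min(len, remaining));
      if (n < 0) {
        throw new EOFException(remaining + " expected bytes still unread");
      }
      remaining -= n;
      return n;
    }
  }

  public static void main(String[] args) throws IOException {
    // Only 2 of the 4 expected bytes are on disk yet, as when tailing a
    // WAL that is still being written.
    InputStream in = new EofInterceptingInputStream(
      new ByteArrayInputStream(new byte[] { 10, 20 }), 4);
    byte[] out = new byte[4];
    int off = 0;
    try {
      int n;
      while ((n = in.read(out, off, out.length - off)) > 0) {
        off += n;
      }
    } catch (EOFException e) {
      // The decompressor never sees the truncated input.
      System.out.println("intercepted EOF");
    }
  }
}
```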
> Flushing the writer is insufficient, although I do that too in this patch
to attempt to minimize the time where a tailer might not have a complete
WALedit serialization at the current end of file.
   Ya, figured that; I was wondering if frequent flushing is detrimental to
performance. Should we instead harden this read path and leave flushing to
the upper layers?
##########
File path:
hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
##########
@@ -104,8 +104,7 @@ public int available() throws IOException {
if (pos >= limit) {
return 0;
}
- int available = in.available();
- return (int) Math.min(available, limit - pos);
+ return (int) (limit - pos);
Review comment:
   Ya, something like this in a wrapping input stream is what I had in my
mind, but doing it in **BoundedDelegatingInputStream** is even more elegant
and clean.
   nit: Looks like the javadoc needs updating.
   I think this behavior is very subtle and not obvious; it would be great to
back it up with a small comment on how tailing reads depend on it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]