bharathv commented on a change in pull request #3377:
URL: https://github.com/apache/hbase/pull/3377#discussion_r650309749



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -381,8 +385,13 @@ private static void checkLength(int len, int max) throws IOException {
     private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
         int expectedLength) throws IOException {
       int compressedLen = StreamUtils.readRawVarint32(in);
-      int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
-        outOffset, expectedLength);
+      // A partial read of the compressed bytes, depending on which compression codec is used,
+      // can cause messy IO errors. This can happen when the reader is actively tailing a file
+      // being written, for replication.
+      byte[] buffer = new byte[compressedLen];
+      IOUtils.readFully(in, buffer, 0, compressedLen);

Review comment:
       Trying to understand how this actually works: are we relying on the EOFException thrown by readFully here, so that upper layers in ProtobufLogReader#next() handle it?
   
   If so, curious whether the actual fix should be somewhere around ProtobufLogReader#extractHiddenEof()? I mean, this works, but if we extract the right EOF exception, we can avoid this copy?
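For context, the buffering pattern in the hunk above amounts to something like the following sketch. The varint reader and the final decompress-from-buffer step are stand-ins here (the real code uses StreamUtils.readRawVarint32 and the codec's value decompressor, and the hunk is truncated, so what happens after readFully is an assumption):

```java
import java.io.DataInputStream;
import java.io.IOException;

// Illustrative sketch only; class and method names are not from the HBase codebase.
public class ReadCompressedValueSketch {
  static byte[] readCompressedSegment(DataInputStream in) throws IOException {
    // Stand-in for StreamUtils.readRawVarint32(in).
    int compressedLen = in.readInt();
    byte[] buffer = new byte[compressedLen];
    // readFully either fills the buffer completely or throws EOFException,
    // so a partial segment (e.g. while tailing a file under active write)
    // surfaces as a clean EOF instead of a codec-specific error from a
    // decompressor handed a truncated stream.
    in.readFully(buffer, 0, compressedLen);
    // The complete segment would then be handed to the decompressor.
    return buffer;
  }
}
```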

##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
##########
@@ -381,8 +385,13 @@ private static void checkLength(int len, int max) throws IOException {
     private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
         int expectedLength) throws IOException {
       int compressedLen = StreamUtils.readRawVarint32(in);
-      int read = compression.getValueCompressor().decompress(in, compressedLen, outArray,
-        outOffset, expectedLength);
+      // A partial read of the compressed bytes, depending on which compression codec is used,
+      // can cause messy IO errors. This can happen when the reader is actively tailing a file
+      // being written, for replication.
+      byte[] buffer = new byte[compressedLen];
+      IOUtils.readFully(in, buffer, 0, compressedLen);

Review comment:
       Thanks for the detailed comment.
   
   > If we do not read in the complete segment of the compressed stream, the decompressor, depending on type, will throw random exceptions, maybe IO exceptions, maybe others. These are not EOFExceptions. They permanently confuse the log reader.
   
   Ya, thought so. I think we have to live with that assumption, since anyone can plug in any random codec implementation.
   
   > we still need to rewind both the reader and the output stream.
   
   Where is this output stream that we need to rewind? Isn't it the job of the compression codec to clean up its state if the read() fails (or did I misunderstand something)?
   
   What I had in mind was an "intercepting" input stream that wraps this compressed input stream and keeps track of the bytes read so far. It essentially does what IOUtils.readFully() does, but without copying into a buffer: it just throws EOFException when it hits end of stream and readBytesSoFar < totalBytesToBeRead. In some sense we intercept the exception the decompressor would run into before it happens, and with the above check we can be sure it is due to this EOF and not something else. But then I didn't get the part about rewinding the decompressor's internal output stream.
   
   > Flushing the writer is insufficient, although I do that too in this patch to attempt to minimize the time where a tailer might not have a complete WALedit serialization at the current end of file.
   
   Ya, figured that. I was wondering whether frequent flushing is detrimental to performance; instead, should we probably harden this read path and leave flushing to the upper layers?
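The "intercepting" input stream described above could be sketched roughly as follows. This is purely illustrative: class and field names are mine, not from the HBase codebase, and the real integration point with the codec's decompressor is an open question in this thread:

```java
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: wrap the compressed input and convert a premature
// end-of-stream into an EOFException before the decompressor can trip over it.
public class EofDetectingInputStream extends FilterInputStream {
  private final long totalBytesToBeRead; // full length of the compressed segment
  private long readBytesSoFar;

  public EofDetectingInputStream(InputStream in, long totalBytesToBeRead) {
    super(in);
    this.totalBytesToBeRead = totalBytesToBeRead;
  }

  @Override
  public int read() throws IOException {
    int b = in.read();
    if (b < 0) {
      failIfShort();
      return -1;
    }
    readBytesSoFar++;
    return b;
  }

  @Override
  public int read(byte[] buf, int off, int len) throws IOException {
    int n = in.read(buf, off, len);
    if (n < 0) {
      failIfShort();
      return -1;
    }
    readBytesSoFar += n;
    return n;
  }

  // Throw a clean EOFException if the stream ended before the whole compressed
  // segment was available, e.g. while tailing a file under active write.
  private void failIfShort() throws EOFException {
    if (readBytesSoFar < totalBytesToBeRead) {
      throw new EOFException("Premature end of stream: read " + readBytesSoFar
          + " of " + totalBytesToBeRead + " expected bytes");
    }
  }
}
```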

##########
File path: hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedDelegatingInputStream.java
##########
@@ -104,8 +104,7 @@ public int available() throws IOException {
     if (pos >= limit) {
       return 0;
     }
-    int available = in.available();
-    return (int) Math.min(available, limit - pos);
+    return (int) (limit - pos);

Review comment:
       Ya, something like this in a wrapping input stream is what I had in mind, but doing it in **BoundedDelegatingInputStream** is even more elegant and clean.
   
   nit: Looks like the javadoc needs updating.
   I think this behavior is very subtle and not obvious; it would be great to back it up with a small comment explaining how tailing reads depend on it.
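To illustrate the subtlety the hunk above relies on, here is a minimal stand-in (class and field names are illustrative, not the real BoundedDelegatingInputStream): `available()` reports the remainder of the bound rather than the delegate's `available()`, so a caller that sizes its reads by `available()` does not stop short just because more bytes have yet to arrive in a file that is still being written:

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch of the available() behavior under discussion.
public class BoundedStream extends FilterInputStream {
  private final long limit; // total bytes this stream may return
  private long pos;         // bytes returned so far

  public BoundedStream(InputStream in, long limit) {
    super(in);
    this.limit = limit;
  }

  @Override
  public int read() throws IOException {
    if (pos >= limit) {
      return -1;
    }
    int b = in.read();
    if (b >= 0) {
      pos++;
    }
    return b;
  }

  @Override
  public int available() throws IOException {
    if (pos >= limit) {
      return 0;
    }
    // Deliberately ignore in.available(): when tailing a file under active
    // write, more bytes may appear in the delegate later, so promise the
    // full remaining bound instead of the bytes currently buffered.
    return (int) (limit - pos);
  }
}
```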



