lidavidm commented on a change in pull request #9147:
URL: https://github.com/apache/arrow/pull/9147#discussion_r556542740



##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
       Ah, actually, now that I look at this, ArrowMessage#frame overall 
doesn't seem right: it shouldn't be checking stream.available() at all. 
Instead, it should loop, read the first byte, and hand it to 
readRawVarint32(int, InputStream) if it's >= 0.
   
   I think that would explain the actual issues you are seeing. We should not 
rely on available() at all and should use read() instead. So it should look like
   
   ```java
   while (true) {
       int firstByte = stream.read();
       if (firstByte < 0) {
           // EOF
           break;
       }
       int tag = readRawVarint32(firstByte, stream);
       // ...
   }
   ```
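
For context, here is a minimal, self-contained sketch of that loop shape. It is not the actual ArrowMessage#frame code: `ReadLoopSketch`, `readVarint32`, and `readAllTags` are hypothetical names, and the small varint decoder only stands in for readRawVarint32(int, InputStream) so that the example runs without extra dependencies.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

class ReadLoopSketch {
  /**
   * Decodes a base-128 varint whose first byte has already been read.
   * Hypothetical stand-in for readRawVarint32(int, InputStream).
   */
  static int readVarint32(int firstByte, InputStream in) throws IOException {
    int result = firstByte & 0x7f;
    int shift = 7;
    int b = firstByte;
    while ((b & 0x80) != 0) {
      if (shift >= 35) {
        throw new IOException("malformed varint");
      }
      b = in.read();
      if (b < 0) {
        throw new IOException("truncated varint");
      }
      result |= (b & 0x7f) << shift;
      shift += 7;
    }
    return result;
  }

  /** Reads varints until read() reports EOF; never consults available(). */
  static List<Integer> readAllTags(InputStream in) throws IOException {
    List<Integer> tags = new ArrayList<>();
    while (true) {
      int firstByte = in.read();
      if (firstByte < 0) {
        break; // EOF is detected by read(), not by available()
      }
      tags.add(readVarint32(firstByte, in));
    }
    return tags;
  }

  public static void main(String[] args) throws IOException {
    byte[] data = {0x08, (byte) 0x96, 0x01}; // varints 8 and 150
    System.out.println(readAllTags(new ByteArrayInputStream(data)));
  }
}
```

Because the loop exits only when read() returns a negative value, it behaves the same whatever a given stream reports from available().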

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
       As-is, -1 will get silently ignored and that happens to be correct here, 
fortunately. But we shouldn't be relying on available() at all.

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc 
compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes 
has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the 
actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws 
IOException {
+    int firstByte = is.read();

Review comment:
       Sorry, my read of this is that EOF implies available() == 0, but not the 
other way around. Otherwise, since the base InputStream always returns 0 for 
available(), that would imply it's always at EOF, which is rather useless, right? 
And the method definition is about the number of bytes that can be read 
_without blocking_, not the number of bytes that can be read overall.
   
   If, empirically, that doesn't match the behavior of InflaterInputStream, 
that's fine - but let's make it clear in that case so that it doesn't confuse 
people without that context.
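
To illustrate the distinction, here is a small sketch under stated assumptions: `AvailableVersusEof`, `PlainStream`, `countWithAvailable`, `countWithRead`, and `deflate` are hypothetical names made up for this example, not Flight code. `PlainStream` inherits the base InputStream.available(), so it reports 0 even while unread bytes remain. The InflaterInputStream Javadoc, as far as I can tell, describes its available() as returning 0 after EOF has been reached and 1 otherwise, so the example prints that value rather than asserting it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

class AvailableVersusEof {
  /** Inherits InputStream.available(), which always returns 0. */
  static final class PlainStream extends InputStream {
    private final byte[] data;
    private int pos;

    PlainStream(byte[] data) {
      this.data = data;
    }

    @Override
    public int read() {
      return pos < data.length ? (data[pos++] & 0xff) : -1;
    }
  }

  /** Counts bytes using available() as the loop condition. */
  static int countWithAvailable(InputStream in) throws IOException {
    int n = 0;
    while (in.available() > 0) {
      if (in.read() < 0) {
        break;
      }
      n++;
    }
    return n;
  }

  /** Counts bytes by reading until read() reports EOF. */
  static int countWithRead(InputStream in) throws IOException {
    int n = 0;
    while (in.read() >= 0) {
      n++;
    }
    return n;
  }

  /** Compresses raw bytes so they can be fed to an InflaterInputStream. */
  static byte[] deflate(byte[] raw) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (DeflaterOutputStream out = new DeflaterOutputStream(bos)) {
      out.write(raw);
    }
    return bos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] raw = "flight".getBytes(StandardCharsets.UTF_8);
    System.out.println("available() loop: " + countWithAvailable(new PlainStream(raw)));
    System.out.println("read() loop:      " + countWithRead(new PlainStream(raw)));

    InputStream inflated = new InflaterInputStream(new ByteArrayInputStream(deflate(raw)));
    for (int i = 0; i < raw.length; i++) {
      inflated.read(); // consume exactly the payload bytes
    }
    System.out.println("inflater available() after payload: " + inflated.available());
  }
}
```

The available()-based loop reads nothing from `PlainStream`, while the read()-based loop sees every byte from both stream types.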





