lidavidm commented on a change in pull request #9147:
URL: https://github.com/apache/arrow/pull/9147#discussion_r556542740
##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
   Ah, actually, now that I look at this, ArrowMessage#frame as a whole doesn't seem right. It shouldn't check stream.available() at all. Instead, it should loop, read the first byte with read(), and hand that byte to readRawVarint32(int, InputStream) if it's >= 0 (read() returns -1 at EOF). I think that would explain the actual issues you are seeing: we should not rely on available() at all, and should use read() instead. So it should look like:

   ```java
   while (true) {
     int firstByte = stream.read();
     if (firstByte < 0) {
       // EOF
       break;
     }
     int tag = readRawVarint32(firstByte, stream);
     // ...
   }
   ```

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
   As-is, a returned -1 will be silently ignored, which fortunately happens to be correct here. But we shouldn't be relying on available() at all.

##########
File path: java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator allocator, final InputStream s
     }
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws IOException {
+    int firstByte = is.read();

Review comment:
   Sorry, but my reading is that EOF implies available() == 0, not the other way around. Otherwise, since the base InputStream.available() always returns 0, any stream that doesn't override it would always appear to be at EOF, which would be rather useless, right? Also, available() is defined as an estimate of the number of bytes that can be read _without blocking_, not the number of bytes that remain to be read overall. If empirically that doesn't match the behavior of InflaterInputStream, that's fine, but in that case let's document it clearly so that it doesn't confuse people who lack that context.
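A minimal, self-contained sketch of the two loop shapes discussed in this review, using only the JDK. `ReadLoopSketch`, `ReadOnlyStream`, `decodeVarint32`, `tagsViaAvailable` and `tagsViaRead` are hypothetical names; `decodeVarint32` is a simplified stand-in for protobuf's `CodedInputStream.readRawVarint32(int, InputStream)`, not its actual implementation. The test stream overrides only `read()`, so `available()` keeps the `InputStream` default of 0 even though four bytes remain: the available()-driven loop decodes nothing, while the read()-driven loop decodes every tag and stops when `read()` returns -1.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class ReadLoopSketch {

  /** Hypothetical stream that implements only read(); available() keeps InputStream's default of 0. */
  static final class ReadOnlyStream extends InputStream {
    private final byte[] data;
    private int pos = 0;

    ReadOnlyStream(byte[] data) {
      this.data = data;
    }

    @Override
    public int read() {
      // Unsigned byte value, or -1 once every byte has been consumed.
      return pos < data.length ? (data[pos++] & 0xFF) : -1;
    }
  }

  /**
   * Simplified base-128 varint decoder for a varint whose first byte the caller has already read.
   * Hypothetical stand-in for CodedInputStream.readRawVarint32(int, InputStream); accepts at most 5 bytes.
   */
  static int decodeVarint32(int firstByte, InputStream in) throws IOException {
    int result = firstByte & 0x7F;
    int b = firstByte;
    // Keep reading while the previous byte has its continuation bit (0x80) set.
    for (int shift = 7; (b & 0x80) != 0; shift += 7) {
      if (shift > 28) {
        throw new IOException("varint longer than 5 bytes");
      }
      b = in.read();
      if (b < 0) {
        throw new IOException("stream ended in the middle of a varint");
      }
      result |= (b & 0x7F) << shift;
    }
    return result;
  }

  /** The pattern under review: trusts available() to signal that more data remains. */
  static List<Integer> tagsViaAvailable(InputStream in) throws IOException {
    List<Integer> tags = new ArrayList<>();
    while (in.available() > 0) {
      tags.add(decodeVarint32(in.read(), in));
    }
    return tags;
  }

  /** The suggested pattern: loop on read() and stop when it returns -1. */
  static List<Integer> tagsViaRead(InputStream in) throws IOException {
    List<Integer> tags = new ArrayList<>();
    while (true) {
      int firstByte = in.read();
      if (firstByte < 0) {
        break; // EOF
      }
      tags.add(decodeVarint32(firstByte, in));
    }
    return tags;
  }

  public static void main(String[] args) throws IOException {
    // Three varints back to back: 1, 300 (encoded as 0xAC 0x02) and 10.
    byte[] data = {0x01, (byte) 0xAC, 0x02, 0x0A};
    System.out.println(tagsViaAvailable(new ReadOnlyStream(data))); // prints []
    System.out.println(tagsViaRead(new ReadOnlyStream(data)));      // prints [1, 300, 10]
  }
}
```

The same reasoning covers the compressed case: per the JDK documentation, `InflaterInputStream.available()` returns 0 after EOF has been reached and 1 otherwise, so it cannot report "no more data" until a read has actually hit the end. A loop driven by `read()` returning -1 does not depend on that detail.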