lidavidm commented on a change in pull request #9147:
URL: https://github.com/apache/arrow/pull/9147#discussion_r556542740
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator
allocator, final InputStream s
ArrowBuf body = null;
ArrowBuf appMetadata = null;
while (stream.available() > 0) {
- int tag = readRawVarint32(stream);
+ int tag = readRawVarint32WithEOFCheck(stream);
+
switch (tag) {
Review comment:
Ah, actually, now that I look at this, ArrowMessage#frame overall
doesn't seem right: it shouldn't be checking stream.available() at all.
Instead, it should loop, read the first byte with stream.read(), and hand it
to readRawVarint32(int, InputStream) if it's >= 0.
I think that would explain the actual issues you are seeing. We should not
rely on available() at all and should use read() instead. So it should look like:
```java
while (true) {
  int firstByte = stream.read();
  if (firstByte < 0) {
    // EOF
    break;
  }
  int tag = readRawVarint32(firstByte, stream);
  // ...
}
```
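To make the suggested loop concrete, here is a minimal self-contained sketch. The class name `VarintLoopSketch`, the helper `readAllVarints`, and the local `readRawVarint32` are all hypothetical; the last is a simplified stand-in for protobuf's `CodedInputStream.readRawVarint32(int, InputStream)` so that the example compiles without the protobuf dependency. It is not the actual ArrowMessage code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

final class VarintLoopSketch {
  // Simplified stand-in (assumption) for protobuf's
  // CodedInputStream.readRawVarint32(int firstByte, InputStream input).
  static int readRawVarint32(int firstByte, InputStream in) throws IOException {
    if ((firstByte & 0x80) == 0) {
      return firstByte; // single-byte varint
    }
    int result = firstByte & 0x7f;
    for (int shift = 7; shift < 32; shift += 7) {
      int b = in.read();
      if (b < 0) {
        throw new IOException("Truncated varint");
      }
      result |= (b & 0x7f) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new IOException("Malformed varint");
  }

  // Reads varints until read() reports EOF; never consults available().
  static List<Integer> readAllVarints(InputStream in) throws IOException {
    List<Integer> values = new ArrayList<>();
    while (true) {
      int firstByte = in.read();
      if (firstByte < 0) {
        break; // EOF is signalled by read() returning -1
      }
      values.add(readRawVarint32(firstByte, in));
    }
    return values;
  }
}
```

The point of the design is that EOF is detected only through the return value of read(), so the loop behaves the same for any InputStream, compressed or not.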
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator
allocator, final InputStream s
ArrowBuf body = null;
ArrowBuf appMetadata = null;
while (stream.available() > 0) {
- int tag = readRawVarint32(stream);
+ int tag = readRawVarint32WithEOFCheck(stream);
+
switch (tag) {
Review comment:
As written, a tag of -1 gets silently ignored, which fortunately happens to
be correct here. But we shouldn't be relying on available() at all.
##########
File path:
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator
allocator, final InputStream s
}
+ /**
+ * Get first byte with EOF check, it is especially needed when using grpc compression.
+ * InflaterInputStream need another read to change reachEOF after all bytes has been read.
+ *
+ * @param is InputStream
+ * @return -1 if stream is not available, otherwise it will return the actual value.
+ * @throws IOException Read first byte failed.
+ */
+ private static int readRawVarint32WithEOFCheck(InputStream is) throws IOException {
+ int firstByte = is.read();
Review comment:
Sorry, my read of this is that EOF implies available() == 0, but not the
other way around. Otherwise, since the base InputStream always returns 0 from
available(), every such stream would always appear to be at EOF, which would
be rather useless, right? Also, the method is defined in terms of the number
of bytes that can be read _without blocking_, not the number of bytes that
can be read overall.
If, empirically, that doesn't match the behavior of InflaterInputStream,
that's fine, but in that case let's make that clear in the comment so that it
doesn't confuse people without that context.
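To illustrate why available() == 0 cannot be treated as EOF, here is a small sketch. `AvailableVsEof`, `NoAvailableStream`, `countWithAvailable`, and `countWithRead` are hypothetical names made up for this example. The stream only implements read(), so it inherits the base InputStream.available(), which returns 0.

```java
import java.io.IOException;
import java.io.InputStream;

final class AvailableVsEof {
  // Hypothetical stream that overrides only read(); available() keeps
  // the java.io.InputStream default, which always returns 0.
  static final class NoAvailableStream extends InputStream {
    private final byte[] data;
    private int pos = 0;

    NoAvailableStream(byte[] data) {
      this.data = data;
    }

    @Override
    public int read() {
      return pos < data.length ? (data[pos++] & 0xff) : -1;
    }
  }

  // Loop guarded by available(): stops as soon as available() reports 0.
  static int countWithAvailable(InputStream in) throws IOException {
    int n = 0;
    while (in.available() > 0) {
      in.read();
      n++;
    }
    return n;
  }

  // Loop guarded by read(): stops only when read() returns -1 (EOF).
  static int countWithRead(InputStream in) throws IOException {
    int n = 0;
    while (in.read() >= 0) {
      n++;
    }
    return n;
  }
}
```

With a three-byte NoAvailableStream, the available()-guarded loop reads nothing, while the read()-guarded loop consumes all three bytes. How InflaterInputStream itself reports available() is a separate question that this sketch does not exercise.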