stczwd commented on a change in pull request #9147:
URL: https://github.com/apache/arrow/pull/9147#discussion_r555572500



##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc 
compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes 
has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the 
actual value.
+   * @throws IOException Read first byte failed.
+   */

Review comment:
       Great, thanks.

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc 
compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes 
has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the 
actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws 
IOException {
+    int firstByte = is.read();

Review comment:
       what if `firstByte < 0` but `is.available >0`?

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -259,7 +259,8 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
       ArrowBuf body = null;
       ArrowBuf appMetadata = null;
       while (stream.available() > 0) {
-        int tag = readRawVarint32(stream);
+        int tag = readRawVarint32WithEOFCheck(stream);
+
         switch (tag) {

Review comment:
       readRawVarint32 will throw InvalidProtocolBufferException before the tag 
return.

##########
File path: 
java/flight/flight-core/src/main/java/org/apache/arrow/flight/ArrowMessage.java
##########
@@ -332,6 +333,23 @@ private static ArrowMessage frame(BufferAllocator 
allocator, final InputStream s
 
   }
 
+  /**
+   * Get first byte with EOF check, it is especially needed when using grpc 
compression.
+   * InflaterInputStream need another read to change reachEOF after all bytes 
has been read.
+   *
+   * @param is InputStream
+   * @return -1 if stream is not available, otherwise it will return the 
actual value.
+   * @throws IOException Read first byte failed.
+   */
+  private static int readRawVarint32WithEOFCheck(InputStream is) throws 
IOException {
+    int firstByte = is.read();
+    if (is.available() <= 0) {
+      return -1;
+    } else {
+      return CodedInputStream.readRawVarint32(firstByte, is);
+    }
+  }
+
   private static int readRawVarint32(InputStream is) throws IOException {
     int firstByte = is.read();

Review comment:
       Hm, I have thought about this. This function is also called for getting 
size, we should keep checking of `firstByte < -1`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to