This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e765cc04 GH-3040: DictionaryFilter.canDrop may return false positive 
result when dict size exceeds 8k (#3041)
2e765cc04 is described below

commit 2e765cc04c5ecdaaffcd4f52f8e7c56ee04936d7
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 7 04:34:30 2024 +0800

    GH-3040: DictionaryFilter.canDrop may return false positive result when 
dict size exceeds 8k (#3041)
    
    * GH-3040: DictionaryFilter.canDrop may return false positive result when 
dict size exceeds 8k
    
    * style
    
    * check bytesRead
    
    * import
---
 .../java/org/apache/parquet/bytes/BytesInput.java  | 12 +++++++-
 .../bytes/AvailableAgnosticInputStream.java        | 35 ++++++++++++++++++++++
 .../org/apache/parquet/bytes/TestBytesInput.java   | 14 +++++++++
 3 files changed, 60 insertions(+), 1 deletion(-)

diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java 
b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 88bb1da7c..25ec5dc86 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -20,12 +20,14 @@ package org.apache.parquet.bytes;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.util.Arrays;
 import java.util.List;
@@ -376,7 +378,15 @@ public abstract class BytesInput {
         ByteBuffer workBuf = buffer.duplicate();
         int pos = buffer.position();
         workBuf.limit(pos + byteCount);
-        Channels.newChannel(in).read(workBuf);
+        ReadableByteChannel channel = Channels.newChannel(in);
+        int remaining = byteCount;
+        while (remaining > 0) {
+          int bytesRead = channel.read(workBuf);
+          if (bytesRead < 0) {
+            throw new EOFException("Reached the end of stream with " + 
remaining + " bytes left to read");
+          }
+          remaining -= bytesRead;
+        }
         buffer.position(pos + byteCount);
       } catch (IOException e) {
         new RuntimeException("Exception occurred during reading input stream", 
e);
diff --git 
a/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java
 
b/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java
new file mode 100644
index 000000000..ca8cb38cf
--- /dev/null
+++ 
b/parquet-common/src/test/java/org/apache/parquet/bytes/AvailableAgnosticInputStream.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.parquet.bytes;
+
+import java.io.ByteArrayInputStream;
+
+public class AvailableAgnosticInputStream extends ByteArrayInputStream {
+
+  public AvailableAgnosticInputStream(byte[] buf) {
+    super(buf);
+  }
+
+  // In practice, some implementations always return 0 even if they 
have more data
+  @Override
+  public synchronized int available() {
+    return 0;
+  }
+}
diff --git 
a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java 
b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
index d2c9e8235..6ffe3c650 100644
--- a/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
+++ b/parquet-common/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
@@ -140,6 +140,20 @@ public class TestBytesInput {
     validate(data, factory);
   }
 
+  @Test
+  public void testFromLargeAvailableAgnosticInputStream() throws IOException {
+    // allocate a byte array larger than
+    // java.nio.channels.Channels.ReadableByteChannelImpl.TRANSFER_SIZE = 8192
+    byte[] data = new byte[9 * 1024];
+    RANDOM.nextBytes(data);
+    byte[] input = new byte[data.length + 10];
+    RANDOM.nextBytes(input);
+    System.arraycopy(data, 0, input, 0, data.length);
+    Supplier<BytesInput> factory = () -> BytesInput.from(new 
AvailableAgnosticInputStream(input), 9 * 1024);
+
+    validate(data, factory);
+  }
+
   @Test
   public void testFromByteArrayOutputStream() throws IOException {
     byte[] data = new byte[1000];

Reply via email to