This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new e87b80308 PARQUET-2060: Fix infinite loop while reading corrupt files 
(#1245)
e87b80308 is described below

commit e87b80308869b77f914fcfd04364686e11158950
Author: Rathin Bhargava <[email protected]>
AuthorDate: Tue Jan 30 06:09:58 2024 +0100

    PARQUET-2060: Fix infinite loop while reading corrupt files (#1245)
---
 .../hadoop/codec/NonBlockedDecompressorStream.java  |  9 +++++++++
 .../parquet/hadoop/codec/TestCompressionCodec.java  | 21 +++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
index 0ebc078d6..6f2c10611 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
@@ -52,6 +52,15 @@ public class NonBlockedDecompressorStream extends 
DecompressorStream {
     if (decompressor.finished()) {
       decompressor.reset();
     }
+
+    // Decompress is called after reading all the compressed input data. The 
decompressor also
+    // decompresses the data all at once. If the decompressor returns 0, it 
does not have any input
+    // data to decompress, and neither does it have any decompressed data to 
return. This is an
+    // invalid state and we error out here.
+    if (decompressedBytes == 0) {
+      throw new IOException("Corrupt file: Zero bytes read during 
decompression.");
+    }
+
     return decompressedBytes;
   }
 }
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
index e046162cc..f202c6350 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -20,7 +20,10 @@ package org.apache.parquet.hadoop.codec;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,6 +37,7 @@ import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestCompressionCodec {
 
@@ -172,4 +176,21 @@ public class TestCompressionCodec {
         return null;
     }
   }
+
+  @Test
+  public void TestDecompressorInvalidState() throws IOException {
+    // Create a mock Decompressor that returns 0 when decompress is called.
+    Decompressor mockDecompressor = Mockito.mock(Decompressor.class);
+    when(mockDecompressor.decompress(Mockito.any(byte[].class), 
Mockito.anyInt(), Mockito.anyInt()))
+        .thenReturn(0);
+
+    // Create a NonBlockedDecompressorStream with the mock Decompressor.
+    NonBlockedDecompressorStream decompressorStream =
+        new NonBlockedDecompressorStream(new ByteArrayInputStream(new 
byte[0]), mockDecompressor, 1024);
+
+    assertThrows(IOException.class, () -> {
+      // Attempt to read from the stream, which should trigger the IOException.
+      decompressorStream.read(new byte[1024], 0, 1024);
+    });
+  }
 }

Reply via email to