This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new e87b80308 PARQUET-2060: Fix infinite loop while reading corrupt files (#1245)
e87b80308 is described below
commit e87b80308869b77f914fcfd04364686e11158950
Author: Rathin Bhargava <[email protected]>
AuthorDate: Tue Jan 30 06:09:58 2024 +0100
PARQUET-2060: Fix infinite loop while reading corrupt files (#1245)
---
.../hadoop/codec/NonBlockedDecompressorStream.java | 9 +++++++++
.../parquet/hadoop/codec/TestCompressionCodec.java | 21 +++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
index 0ebc078d6..6f2c10611 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressorStream.java
@@ -52,6 +52,15 @@ public class NonBlockedDecompressorStream extends DecompressorStream {
if (decompressor.finished()) {
decompressor.reset();
}
+
+    // Decompress is called after reading all the compressed input data. The decompressor also
+    // decompresses the data all at once. If the decompressor returns 0, it does not have any input
+    // data to decompress, and neither does it have any decompressed data to return. This is an
+    // invalid state and we error out here.
+    if (decompressedBytes == 0) {
+      throw new IOException("Corrupt file: Zero bytes read during decompression.");
+    }
+
return decompressedBytes;
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
index e046162cc..f202c6350 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/codec/TestCompressionCodec.java
@@ -20,7 +20,10 @@ package org.apache.parquet.hadoop.codec;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -34,6 +37,7 @@ import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
public class TestCompressionCodec {
@@ -172,4 +176,21 @@ public class TestCompressionCodec {
return null;
}
}
+
+ @Test
+ public void TestDecompressorInvalidState() throws IOException {
+ // Create a mock Decompressor that returns 0 when decompress is called.
+ Decompressor mockDecompressor = Mockito.mock(Decompressor.class);
+    when(mockDecompressor.decompress(Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))
+ .thenReturn(0);
+
+ // Create a NonBlockedDecompressorStream with the mock Decompressor.
+ NonBlockedDecompressorStream decompressorStream =
+        new NonBlockedDecompressorStream(new ByteArrayInputStream(new byte[0]), mockDecompressor, 1024);
+
+ assertThrows(IOException.class, () -> {
+ // Attempt to read from the stream, which should trigger the IOException.
+ decompressorStream.read(new byte[1024], 0, 1024);
+ });
+ }
}