This is an automated email from the ASF dual-hosted git repository.
dzamo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 9c401c61f1 DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784)
9c401c61f1 is described below
commit 9c401c61f11d84514dab0f2e1397aa3959d82249
Author: James Turton <[email protected]>
AuthorDate: Tue Apr 4 14:15:21 2023 +0200
DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784)
---
.../parquet/columnreaders/AsyncPageReader.java | 5 ++++-
.../drill/exec/store/parquet/TestEmptyParquet.java | 5 ++---
.../store/parquet2/TestDrillParquetReader.java | 23 +++++++++++++++++++++
.../resources/parquet/empty_dict_pages.parquet | Bin 0 -> 2896 bytes
4 files changed, 29 insertions(+), 4 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index d882504989..a38c34dc5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -21,6 +21,7 @@ import static org.apache.parquet.column.Encoding.valueOf;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
 * invariant here is that there is space for at least one more page in the queue before the Future read task
 * is submitted to the pool). This sequence is important. Not doing so can lead to deadlocks - producer
 * threads may block on putting data into the queue which is full while the consumer threads might be
- * blocked trying to read from a queue that has no data.
+ * blocked trying to read from a queue that has no /data.
 * The first request to the page reader can be either to load a dictionary page or a data page; this leads
 * to the rather odd looking code in the constructor since the parent PageReader calls
* loadDictionaryIfExists in the constructor.
@@ -305,6 +306,7 @@ class AsyncPageReader extends PageReader {
pageHeader.compressed_page_size
);
skip(pageHeader.compressed_page_size);
+ Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
return;
}
@@ -325,6 +327,7 @@ class AsyncPageReader extends PageReader {
default:
logger.warn("skipping page of type {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
skip(pageHeader.compressed_page_size);
+ Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
index 780aa6e646..80602ab251 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
@@ -416,10 +416,9 @@ public class TestEmptyParquet extends ClusterTest {
}
/**
- * Test a Parquet file containing a zero-byte dictionary page, c.f.
- * DRILL-8023.
+ * Test a Parquet file containing a zero-byte dictionary page.
*/
- @Test
+ @Test // DRILL-8023
public void testEmptyDictPage() throws Exception {
try {
client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
index b5c192483b..a4c518a147 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
@@ -1358,4 +1358,27 @@ public class TestDrillParquetReader extends BaseTestQuery {
testRunAndPrint(UserBitShared.QueryType.SQL, "select * from cp.`parquet2/allTypes.parquet`");
}
+ @Test // DRILL-8416
+ public void testEmptyDictPages() throws Exception {
+ String query = "select " +
+ "`name`, `type`, `begin`, `end` " +
+ "from cp.`parquet/empty_dict_pages.parquet` t";
+ String[] columns = {"`name`", "`type`", "`begin`", "`end`"};
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns(columns)
+ .baselineValues("TP_001", "TP", null, null)
+ .baselineValues("TP_002", "TP", null, null)
+ .baselineValues("TP_003", "TP", null, null)
+ .baselineValues("TP_004", "TP", null, null)
+ .baselineValues("TP_005", "TP", null, null)
+ .baselineValues("TP_006", "TP", null, null)
+ .baselineValues("TP_007", "TP", null, null)
+ .baselineValues("TP_008", "TP", null, null)
+ .baselineValues("TP_009", "TP", null, null)
+ .baselineValues("TP_010", "TP", null, null)
+ .build()
+ .run();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet
new file mode 100644
index 0000000000..9a94f9aa08
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet differ