Ben-Zvi closed pull request #1433: DRILL-6685: Fixed exception when reading
Parquet data
URL: https://github.com/apache/drill/pull/1433
This PR was merged from a forked repository. Because GitHub hides the
original diff once such a pull request is merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
index fecf1ce3158..a708f52353f 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
@@ -97,4 +97,22 @@ protected final boolean load(boolean force) {
protected final int remainingPageData() {
return pageInfo.pageDataLen - pageInfo.pageDataOff;
}
+
+ /**
+ * Fixed length readers calculate upfront the maximum number of entries to
process as entry length
+ * are known.
+ * @param valuesToRead requested number of values to read
+ * @param entrySz sizeof(integer) + column's precision
+ * @return maximum entries to read within each call (based on the bulk
entry, entry size, and requested
+ * number of entries to read)
+ */
+ protected final int getFixedLengthMaxRecordsToRead(int valuesToRead, int
entrySz) {
+ // Let's start with bulk's entry and requested values-to-read constraints
+ int numEntriesToRead = Math.min(entry.getMaxEntries(), valuesToRead);
+
+ // Now include the size of the fixed entry (since they are fixed)
+ numEntriesToRead = Math.min(numEntriesToRead, buffer.limit() / entrySz);
+
+ return numEntriesToRead;
+ }
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index a6e7077241a..e66bd051163 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -43,7 +43,7 @@ final VarLenColumnBulkEntry getEntry(int valuesToRead) {
final int expectedDataLen = columnPrecInfo.precision;
final int entrySz = 4 + columnPrecInfo.precision;
- final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+ final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead,
entrySz);
Preconditions.checkState(readBatch > 0, "Read batch count [%d] should be
greater than zero", readBatch);
final int[] valueLengths = entry.getValuesLength();
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index 3869113249b..caf5c73472b 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -45,7 +45,7 @@ final VarLenColumnBulkEntry getEntry(int valuesToRead) {
final int expectedDataLen = columnPrecInfo.precision;
final int entrySz = 4 + columnPrecInfo.precision;
- final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+ final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead,
entrySz);
Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be
greater than zero", readBatch);
final int[] valueLengths = entry.getValuesLength();
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
new file mode 100644
index 00000000000..315ff93be4c
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests the Parquet bulk reader */
+public class TestParquetBulkReader extends ClusterTest {
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
+ }
+
+ private static final String DATAFILE =
"cp.`parquet/fourvarchar_asc_nulls.parquet`";
+
+ /** Load variable length data which has nulls */
+ @Test
+ public void testNullCount() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is
null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(71L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is
null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(44L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data which has non-nulls data */
+ @Test
+ public void testNotNullCount() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is not
null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(0L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is not
null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(27L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable columns with fixed length data with large precision and
null values */
+ @Test
+ public void testFixedLengthWithLargePrecisionAndNulls() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where index < 50 and
length(VarbinaryValue1) = 400", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(25L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data which was originally fixed length and then
became variable length */
+ @Test
+ public void testFixedLengthToVarlen() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where index < 60 and
length(VarbinaryValue1) <= 800", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(27L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data with values larger than chunk size (4k) */
+ @Test
+ public void testLargeVarlen() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where length(VarbinaryValue2)
= 4500", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(19L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ private void alterSession() {
+ client.alterSession(ExecConstants.PARQUET_FLAT_READER_BULK, true);
+
client.alterSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY,
false);
+ }
+
+ private void resetSession() {
+ client.resetSession(ExecConstants.PARQUET_FLAT_READER_BULK);
+
client.resetSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY);
+ }
+
+}
diff --git
a/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet
b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet
new file mode 100644
index 00000000000..abe4aafc595
Binary files /dev/null and
b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet differ
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services