This is an automated email from the ASF dual-hosted git repository.
boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9d19c DRILL-6685: Fixed exception when reading Parquet data
0f9d19c is described below
commit 0f9d19c4c90a148573cfed908dd03ad771306bc1
Author: Salim Achouche <[email protected]>
AuthorDate: Wed Aug 15 14:02:23 2018 -0700
DRILL-6685: Fixed exception when reading Parquet data
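The change clamps the fixed-length read batch by the page buffer's capacity, in addition to the bulk entry capacity and the requested value count. A minimal, self-contained sketch of that clamping logic (a stand-in, not Drill's API: maxEntries and bufferLimit here play the roles of entry.getMaxEntries() and buffer.limit()):

    public class ReadBatchSketch {
      // Illustrative stand-in for the getFixedLengthMaxRecordsToRead() helper.
      static int fixedLengthMaxRecordsToRead(int maxEntries, int valuesToRead,
                                             int bufferLimit, int entrySz) {
        // Constrain by the bulk entry capacity and the requested count.
        int numEntriesToRead = Math.min(maxEntries, valuesToRead);
        // Also constrain by how many whole fixed-size entries fit in the buffer.
        return Math.min(numEntriesToRead, bufferLimit / entrySz);
      }

      public static void main(String[] args) {
        // With precision 400, entrySz = 4 + 400 = 404; a 4096-byte buffer
        // holds 10 whole entries, so a batch of 64 is clamped to 10.
        System.out.println(fixedLengthMaxRecordsToRead(64, 100, 4096, 404));
      }
    }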
---
.../VarLenAbstractPageEntryReader.java | 18 +++
.../columnreaders/VarLenFixedEntryReader.java | 2 +-
.../VarLenNullableFixedEntryReader.java | 2 +-
.../exec/store/parquet/TestParquetBulkReader.java | 149 +++++++++++++++++++++
.../parquet/fourvarchar_asc_nulls.parquet | Bin 0 -> 110651 bytes
5 files changed, 169 insertions(+), 2 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
index fecf1ce..a708f52 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
@@ -97,4 +97,22 @@ abstract class VarLenAbstractPageEntryReader extends VarLenAbstractEntryReader {
protected final int remainingPageData() {
return pageInfo.pageDataLen - pageInfo.pageDataOff;
}
+
+ /**
+ * Fixed length readers calculate upfront the maximum number of entries to process, as entry
+ * lengths are known.
+ * @param valuesToRead requested number of values to read
+ * @param entrySz sizeof(integer) + column's precision
+ * @return maximum number of entries to read per call (based on the bulk entry capacity,
+ *         entry size, and requested number of entries)
+ */
+ protected final int getFixedLengthMaxRecordsToRead(int valuesToRead, int entrySz) {
+ // Start with the constraints from the bulk entry capacity and the requested values-to-read
+ int numEntriesToRead = Math.min(entry.getMaxEntries(), valuesToRead);
+
+ // Also constrain by how many whole fixed-size entries fit in the buffer
+ numEntriesToRead = Math.min(numEntriesToRead, buffer.limit() / entrySz);
+
+ return numEntriesToRead;
+ }
}
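For context, a hypothetical sketch (not Drill code) of the out-of-bounds access the new clamp appears to prevent: sizing the batch only by min(maxEntries, valuesToRead) can request more bytes than the page buffer holds.

    import java.nio.ByteBuffer;

    public class OverflowSketch {
      public static void main(String[] args) {
        final int entrySz = 4 + 400;              // length prefix + precision
        final ByteBuffer buffer = ByteBuffer.allocate(4096);
        final int unclamped = Math.min(64, 100);  // batch sized without the clamp
        // 64 * 404 = 25856 bytes, far beyond buffer.limit() of 4096, so a
        // reader stepping entry by entry would run off the end of the page
        // buffer around entry 11. Dividing the limit by the entry size keeps
        // every read in bounds.
        final int safe = Math.min(unclamped, buffer.limit() / entrySz);
        System.out.println(safe);                 // prints 10
      }
    }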
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index a6e7077..e66bd05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -43,7 +43,7 @@ final class VarLenFixedEntryReader extends VarLenAbstractPageEntryReader {
final int expectedDataLen = columnPrecInfo.precision;
final int entrySz = 4 + columnPrecInfo.precision;
- final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+ final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead, entrySz);
Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
final int[] valueLengths = entry.getValuesLength();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index 3869113..caf5c73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -45,7 +45,7 @@ final class VarLenNullableFixedEntryReader extends VarLenAbstractPageEntryReader
final int expectedDataLen = columnPrecInfo.precision;
final int entrySz = 4 + columnPrecInfo.precision;
- final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+ final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead, entrySz);
Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
final int[] valueLengths = entry.getValuesLength();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
new file mode 100644
index 0000000..315ff93
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests the Parquet bulk reader */
+public class TestParquetBulkReader extends ClusterTest {
+
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+ startCluster(builder);
+ }
+
+ private static final String DATAFILE = "cp.`parquet/fourvarchar_asc_nulls.parquet`";
+
+ /** Load variable length data which contains nulls */
+ @Test
+ public void testNullCount() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(71L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(44L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data which contains non-null values */
+ @Test
+ public void testNotNullCount() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is not null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(0L)
+ .go();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is not null", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(27L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length columns holding fixed length data with large precision and null values */
+ @Test
+ public void testFixedLengthWithLargePrecisionAndNulls() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where index < 50 and length(VarbinaryValue1) = 400", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(25L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data which was originally fixed length and then became variable length */
+ @Test
+ public void testFixedLengthToVarlen() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where index < 60 and length(VarbinaryValue1) <= 800", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(27L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ /** Load variable length data with values larger than chunk size (4k) */
+ @Test
+ public void testLargeVarlen() throws Exception {
+ try {
+ alterSession();
+
+ testBuilder()
+ .sqlQuery("select count(*) as c from %s where length(VarbinaryValue2) = 4500", DATAFILE)
+ .unOrdered()
+ .baselineColumns("c")
+ .baselineValues(19L)
+ .go();
+ } finally {
+ resetSession();
+ }
+ }
+
+ private void alterSession() {
+ client.alterSession(ExecConstants.PARQUET_FLAT_READER_BULK, true);
+ client.alterSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY, false);
+ }
+
+ private void resetSession() {
+ client.resetSession(ExecConstants.PARQUET_FLAT_READER_BULK);
+ client.resetSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY);
+ }
+
+}
diff --git a/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet
new file mode 100644
index 0000000..abe4aaf
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet differ