This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f9d19c  DRILL-6685: Fixed exception when reading Parquet data
0f9d19c is described below

commit 0f9d19c4c90a148573cfed908dd03ad771306bc1
Author: Salim Achouche <[email protected]>
AuthorDate: Wed Aug 15 14:02:23 2018 -0700

    DRILL-6685: Fixed exception when reading Parquet data
---
 .../VarLenAbstractPageEntryReader.java             |  18 +++
 .../columnreaders/VarLenFixedEntryReader.java      |   2 +-
 .../VarLenNullableFixedEntryReader.java            |   2 +-
 .../exec/store/parquet/TestParquetBulkReader.java  | 149 +++++++++++++++++++++
 .../parquet/fourvarchar_asc_nulls.parquet          | Bin 0 -> 110651 bytes
 5 files changed, 169 insertions(+), 2 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
index fecf1ce..a708f52 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenAbstractPageEntryReader.java
@@ -97,4 +97,22 @@ abstract class VarLenAbstractPageEntryReader extends 
VarLenAbstractEntryReader {
   protected final int remainingPageData() {
     return pageInfo.pageDataLen - pageInfo.pageDataOff;
   }
+
+  /**
+   * Fixed length readers calculate upfront the maximum number of entries to 
process, as entry lengths
+   * are known.
+   * @param valuesToRead requested number of values to read
+   * @param entrySz sizeof(integer) + column's precision
+   * @return maximum entries to read within each call (based on the bulk 
entry, entry size, and requested
+   *         number of entries to read)
+   */
+  protected final int getFixedLengthMaxRecordsToRead(int valuesToRead, int 
entrySz) {
+    // Let's start with bulk's entry and requested values-to-read constraints
+    int numEntriesToRead = Math.min(entry.getMaxEntries(), valuesToRead);
+
+    // Now include the size of the fixed entry (since they are fixed)
+    numEntriesToRead = Math.min(numEntriesToRead, buffer.limit() / entrySz);
+
+    return numEntriesToRead;
+  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
index a6e7077..e66bd05 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenFixedEntryReader.java
@@ -43,7 +43,7 @@ final class VarLenFixedEntryReader extends 
VarLenAbstractPageEntryReader {
 
     final int expectedDataLen = columnPrecInfo.precision;
     final int entrySz = 4 + columnPrecInfo.precision;
-    final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead, 
entrySz);
     Preconditions.checkState(readBatch > 0, "Read batch count [%d] should be 
greater than zero", readBatch);
 
     final int[] valueLengths = entry.getValuesLength();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
index 3869113..caf5c73 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenNullableFixedEntryReader.java
@@ -45,7 +45,7 @@ final class VarLenNullableFixedEntryReader extends 
VarLenAbstractPageEntryReader
 
     final int expectedDataLen = columnPrecInfo.precision;
     final int entrySz = 4 + columnPrecInfo.precision;
-    final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+    final int readBatch = getFixedLengthMaxRecordsToRead(valuesToRead, 
entrySz);
     Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be 
greater than zero", readBatch);
 
     final int[] valueLengths = entry.getValuesLength();
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
new file mode 100644
index 0000000..315ff93
--- /dev/null
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetBulkReader.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Tests the Parquet bulk reader */
+public class TestParquetBulkReader extends ClusterTest {
+
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+    startCluster(builder);
+  }
+
+  private static final String DATAFILE = 
"cp.`parquet/fourvarchar_asc_nulls.parquet`";
+
+  /** Load variable length data which has nulls */
+  @Test
+  public void testNullCount() throws Exception {
+    try {
+      alterSession();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is 
null", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(71L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is 
null", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(44L)
+        .go();
+    } finally {
+      resetSession();
+    }
+  }
+
+  /** Load variable length data which has non-null data */
+  @Test
+  public void testNotNullCount() throws Exception {
+    try {
+      alterSession();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where VarbinaryValue3 is not 
null", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(0L)
+        .go();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where VarbinaryValue1 is not 
null", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(27L)
+        .go();
+    } finally {
+      resetSession();
+    }
+  }
+
+  /** Load variable-length columns holding fixed-length data with large 
precision and null values */
+  @Test
+  public void testFixedLengthWithLargePrecisionAndNulls() throws Exception {
+    try {
+      alterSession();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where index < 50 and 
length(VarbinaryValue1) = 400", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(25L)
+        .go();
+    } finally {
+      resetSession();
+    }
+  }
+
+  /** Load variable length data which was originally fixed length and then 
became variable length */
+  @Test
+  public void testFixedLengthToVarlen() throws Exception {
+    try {
+      alterSession();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where index < 60 and 
length(VarbinaryValue1) <= 800", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(27L)
+        .go();
+    } finally {
+      resetSession();
+    }
+  }
+
+  /** Load variable length data with values larger than chunk size (4k) */
+  @Test
+  public void testLargeVarlen() throws Exception {
+    try {
+      alterSession();
+
+      testBuilder()
+        .sqlQuery("select count(*) as c from %s where length(VarbinaryValue2) 
= 4500", DATAFILE)
+        .unOrdered()
+        .baselineColumns("c")
+        .baselineValues(19L)
+        .go();
+    } finally {
+      resetSession();
+    }
+  }
+
+  private void alterSession() {
+    client.alterSession(ExecConstants.PARQUET_FLAT_READER_BULK, true);
+    
client.alterSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY,
 false);
+  }
+
+  private void resetSession() {
+    client.resetSession(ExecConstants.PARQUET_FLAT_READER_BULK);
+    
client.resetSession(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_KEY);
+  }
+
+}
diff --git 
a/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet 
b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet
new file mode 100644
index 0000000..abe4aaf
Binary files /dev/null and 
b/exec/java-exec/src/test/resources/parquet/fourvarchar_asc_nulls.parquet differ

Reply via email to