Repository: spark
Updated Branches:
  refs/heads/master 2a0a8f753 -> 8cc591c91


[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in 
parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list and 
path list once for each column: every iteration of its per-column loop calls 
requestedSchema.getColumns() and requestedSchema.getPaths(), and each of those 
calls constructs a fresh colCount-element list. The method therefore indirectly 
iterates roughly 2 * colCount * colCount times for each parquet file.

This inefficiency impacts jobs that read parquet-backed tables with many 
columns and many files. Jobs that read tables with few columns or few files are 
not impacted.

This PR changes initializeInternal so that it builds each list only once.
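
For illustration, here is a small standalone sketch (not the Spark code itself; 
the schema, column names, and sizes are made up) of the pattern the patch 
applies, using the same parquet-mr schema calls that initializeInternal relies 
on (getColumns, getPaths, getFieldCount):
<pre>
import java.util.List;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ColumnListDemo {
  public static void main(String[] args) {
    // Build a wide, flat schema roughly like the 6000-column benchmark table.
    int colCount = 6000;
    StringBuilder sb = new StringBuilder("message spark_schema {\n");
    for (int i = 0; i < colCount; i++) {
      sb.append("  optional int64 id").append(i).append(";\n");
    }
    MessageType requestedSchema = MessageTypeParser.parseMessageType(sb.append("}").toString());

    // Before: each iteration calls getPaths()/getColumns(), and each call
    // rebuilds a colCount-element list, so the loop does ~2*colCount^2 work.
    long start = System.nanoTime();
    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
      String[] colPath = requestedSchema.getPaths().get(i);
      ColumnDescriptor fd = requestedSchema.getColumns().get(i);
    }
    System.out.println("per-iteration lookups: " + (System.nanoTime() - start) / 1e6 + " ms");

    // After: build each list once and index into it, so the loop is ~colCount work.
    start = System.nanoTime();
    List<String[]> paths = requestedSchema.getPaths();
    List<ColumnDescriptor> columns = requestedSchema.getColumns();
    for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
      String[] colPath = paths.get(i);
      ColumnDescriptor fd = columns.get(i);
    }
    System.out.println("hoisted lookups: " + (System.nanoTime() - start) / 1e6 + " ms");
  }
}
</pre>
The first loop does quadratic work in the column count while the second is 
linear, which is the cost this patch removes from the per-file setup path.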

I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There is roughly one matching row for every 425 rows, and the matching rows 
are sprinkled pretty evenly throughout the table (that is, every page for 
column <code>id1</code> has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98M files:

master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%

600 columns, 10 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests

Closes #22188 from bersprockets/SPARK-25164.

Authored-by: Bruce Robbins <bersprock...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cc591c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cc591c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cc591c9

Branch: refs/heads/master
Commit: 8cc591c91a0c63effeed73801299985ba8a4a99e
Parents: 2a0a8f7
Author: Bruce Robbins <bersprock...@gmail.com>
Authored: Thu Aug 23 14:52:23 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 23 14:52:23 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/VectorizedParquetRecordReader.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cc591c9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 5934a23..f028613 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -270,21 +270,23 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new UnsupportedOperationException("Complex types not 
supported.");
       }
 
-      String[] colPath = requestedSchema.getPaths().get(i);
+      String[] colPath = paths.get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+        if (!fd.equals(columns.get(i))) {
           throw new UnsupportedOperationException("Schema evolution not 
supported.");
         }
         missingColumns[i] = false;
       } else {
-        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+        if (columns.get(i).getMaxDefinitionLevel() == 0) {
           // Column is missing in data but the required data is non-nullable. This file is invalid.
           throw new IOException("Required column is missing in data file. Col: " +
             Arrays.toString(colPath));

