Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f5983823e -> 8db935f97


[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in 
parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list and 
path list once for each column. Therefore, it indirectly iterates 
2\*colCount\*colCount times for each parquet file.
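To give a sense of scale, the estimate above can be evaluated for the column counts used in the benchmarks below. This is only a back-of-the-envelope sketch: `RebuildCost` and `visitsPerFile` are hypothetical names, and the formula is the 2\*colCount\*colCount estimate from this description, not a measurement.

```java
public class RebuildCost {
    // Estimated list-element visits per file when both the column list and
    // the path list are rebuilt once per column (2 * colCount * colCount).
    static long visitsPerFile(long colCount) {
        return 2L * colCount * colCount;
    }

    public static void main(String[] args) {
        System.out.println(visitsPerFile(6000)); // 72000000
        System.out.println(visitsPerFile(600));  // 720000
        System.out.println(visitsPerFile(60));   // 7200
    }
}
```

The quadratic growth is consistent with the benchmark results, where only the 6000-column tables show a measurable difference.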

This inefficiency impacts jobs that read parquet-backed tables with many 
columns and many files. Jobs that read tables with few columns or few files are 
not impacted.

This PR changes initializeInternal so that it builds each list only once.
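The pattern can be illustrated with a small standalone sketch. `FakeSchema` is a hypothetical stand-in, not the parquet schema class: it assumes only that the accessor rebuilds its list on every call, as described above, and counts how many elements get built.

```java
import java.util.ArrayList;
import java.util.List;

public class HoistSketch {
    // Hypothetical stand-in for a schema whose accessor rebuilds its list on every call.
    static class FakeSchema {
        final int fieldCount;
        long elementsBuilt = 0;

        FakeSchema(int fieldCount) {
            this.fieldCount = fieldCount;
        }

        List<String[]> getPaths() {
            List<String[]> paths = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                paths.add(new String[] {"col" + i});
                elementsBuilt++;
            }
            return paths;
        }
    }

    // Before: the list is rebuilt inside the loop, once per column.
    static long perColumnRebuild(int n) {
        FakeSchema schema = new FakeSchema(n);
        for (int i = 0; i < n; i++) {
            String[] colPath = schema.getPaths().get(i);
        }
        return schema.elementsBuilt;
    }

    // After: the list is built once and reused for every column.
    static long buildOnce(int n) {
        FakeSchema schema = new FakeSchema(n);
        List<String[]> paths = schema.getPaths();
        for (int i = 0; i < n; i++) {
            String[] colPath = paths.get(i);
        }
        return schema.elementsBuilt;
    }

    public static void main(String[] args) {
        System.out.println(perColumnRebuild(100)); // 10000
        System.out.println(buildOnce(100));        // 100
    }
}
```

Hoisting the call out of the loop turns the quadratic amount of list-building work into linear work, without changing which element each iteration reads.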

I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There is roughly one matching row for every 425 rows, and the matching rows 
are spread fairly evenly throughout the table (that is, every page for 
column <code>id1</code> has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98M files:

master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%

600 columns, 10 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests

Closes #22188 from bersprockets/SPARK-25164.

Authored-by: Bruce Robbins <bersprock...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8db935f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8db935f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8db935f9

Branch: refs/heads/branch-2.3
Commit: 8db935f9724d0820a010340651a4d61da1bfa916
Parents: f598382
Author: Bruce Robbins <bersprock...@gmail.com>
Authored: Thu Aug 23 14:52:23 2018 +0800
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Mon Aug 27 16:08:13 2018 -0700

----------------------------------------------------------------------
 .../datasources/parquet/VectorizedParquetRecordReader.java   | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8db935f9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index bb1b236..0fcb4a9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -272,21 +272,23 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new UnsupportedOperationException("Complex types not supported.");
       }
 
-      String[] colPath = requestedSchema.getPaths().get(i);
+      String[] colPath = paths.get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+        if (!fd.equals(columns.get(i))) {
           throw new UnsupportedOperationException("Schema evolution not supported.");
         }
         missingColumns[i] = false;
       } else {
-        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+        if (columns.get(i).getMaxDefinitionLevel() == 0) {
           // Column is missing in data but the required data is non-nullable. This file is invalid.
           throw new IOException("Required column is missing in data file. Col: " +
             Arrays.toString(colPath));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
