Repository: spark
Updated Branches:
  refs/heads/master 2a0a8f753 -> 8cc591c91
[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list and path list once for each column, because it calls requestedSchema.getColumns() and requestedSchema.getPaths() inside its per-column loop. Therefore, it indirectly iterates 2 * colCount * colCount times for each parquet file.

This inefficiency impacts jobs that read parquet-backed tables with many columns and many files. Jobs that read tables with few columns or few files are not impacted.

This PR changes initializeInternal so that it builds each list only once, before the loop.

I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There is roughly one matching row for every 425 rows, and the matching rows are sprinkled pretty evenly throughout the table (that is, every page for column <code>id1</code> has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98M files:

master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%

600 columns, 10 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests

Closes #22188 from bersprockets/SPARK-25164.
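As a quick sanity check on the benchmark tables, the sketch below recomputes the improvement column from the reported times. It assumes "improvement" means time saved relative to master, i.e. (master - branch) / master, so a negative value means the branch was slower. The class and method names are illustrative only and are not part of Spark.

```java
// Sketch: recompute the "improvement" column of the benchmark tables.
// Assumption: improvement = (master - branch) / master, expressed in percent.
public class ImprovementCheck {

    // Relative improvement in percent; negative means the branch was slower.
    static double improvementPercent(double masterMin, double branchMin) {
        return (masterMin - branchMin) / masterMin * 100.0;
    }

    public static void main(String[] args) {
        // {master, branch} wall-clock minutes, copied from the tables.
        double[][] runs = {
            {10.87, 6.09},  // 6000 columns, 67 32M files
            {7.39, 5.80},   // 6000 columns, 23 98M files
            {1.95, 1.96},   // 600 columns, 67 32M files
            {0.55, 0.55}    // 60 columns, 67 32M files
        };
        for (double[] r : runs) {
            System.out.printf("%.2f min -> %.2f min: %.1f%%%n",
                r[0], r[1], improvementPercent(r[0], r[1]));
        }
    }
}
```

Under that assumed definition the recomputed values agree with the reported percentages to within about a percentage point.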
Authored-by: Bruce Robbins <bersprock...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cc591c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cc591c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cc591c9

Branch: refs/heads/master
Commit: 8cc591c91a0c63effeed73801299985ba8a4a99e
Parents: 2a0a8f7
Author: Bruce Robbins <bersprock...@gmail.com>
Authored: Thu Aug 23 14:52:23 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 23 14:52:23 2018 +0800

----------------------------------------------------------------------
 .../datasources/parquet/VectorizedParquetRecordReader.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8cc591c9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 5934a23..f028613 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -270,21 +270,23 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new UnsupportedOperationException("Complex types not supported.");
       }
 
-      String[] colPath = requestedSchema.getPaths().get(i);
+      String[] colPath = paths.get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+        if (!fd.equals(columns.get(i))) {
           throw new UnsupportedOperationException("Schema evolution not supported.");
         }
         missingColumns[i] = false;
       } else {
-        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+        if (columns.get(i).getMaxDefinitionLevel() == 0) {
           // Column is missing in data but the required data is non-nullable. This file is invalid.
           throw new IOException("Required column is missing in data file. Col: " +
             Arrays.toString(colPath));
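To make the cost difference concrete, here is a minimal, self-contained Java sketch that models the initializeInternal loop before and after this change. FakeSchema is a hypothetical stand-in, not parquet-mr's MessageType; it assumes, as the commit message states, that every getColumns()/getPaths() call rebuilds its list by visiting all fields. Counting the list elements built shows 2 * colCount * colCount work before the patch versus 2 * colCount after it.

```java
import java.util.ArrayList;
import java.util.List;

public class ListRebuildCost {

    // Hypothetical stand-in for the requested schema, NOT parquet-mr's MessageType.
    // Assumption (per the commit message): each getter call walks every field and
    // returns a freshly built list.
    static final class FakeSchema {
        final int fieldCount;
        long elementsBuilt = 0; // list elements constructed across all getter calls

        FakeSchema(int fieldCount) { this.fieldCount = fieldCount; }

        List<String> getColumns() { return rebuild("column"); }

        List<String> getPaths() { return rebuild("path"); }

        private List<String> rebuild(String label) {
            List<String> out = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                out.add(label); // contents are irrelevant; only the work is counted
                elementsBuilt++;
            }
            return out;
        }
    }

    // Before the patch: both getters are called inside the per-column loop.
    static long costBefore(int colCount) {
        FakeSchema schema = new FakeSchema(colCount);
        for (int i = 0; i < colCount; i++) {
            schema.getPaths().get(i);
            schema.getColumns().get(i);
        }
        return schema.elementsBuilt; // 2 * colCount * colCount
    }

    // After the patch: each list is built once, before the loop, and reused.
    static long costAfter(int colCount) {
        FakeSchema schema = new FakeSchema(colCount);
        List<String> columns = schema.getColumns();
        List<String> paths = schema.getPaths();
        for (int i = 0; i < colCount; i++) {
            paths.get(i);
            columns.get(i);
        }
        return schema.elementsBuilt; // 2 * colCount
    }

    public static void main(String[] args) {
        for (int colCount : new int[] {60, 600, 6000}) {
            System.out.printf("%d columns: before=%d, after=%d%n",
                colCount, costBefore(colCount), costAfter(colCount));
        }
    }
}
```

For 60 columns this prints `60 columns: before=7200, after=120`. The model counts only list construction; the reader does plenty of other per-file work, which is presumably why the measured speedups are far smaller than this ratio and only show up at thousands of columns.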