Repository: spark
Updated Branches:
  refs/heads/branch-2.3 f5983823e -> 8db935f97
[SPARK-25164][SQL] Avoid rebuilding column and path list for each column in parquet reader

## What changes were proposed in this pull request?

VectorizedParquetRecordReader::initializeInternal rebuilds the column list and path list once for each column. Therefore, it indirectly iterates 2\*colCount\*colCount times for each parquet file.

This inefficiency impacts jobs that read parquet-backed tables with many columns and many files. Jobs that read tables with few columns or few files are not impacted.

This PR changes initializeInternal so that it builds each list only once.

I ran benchmarks on my laptop with 1 worker thread, running this query:
<pre>
sql("select * from parquet_backed_table where id1 = 1").collect
</pre>
There is roughly one matching row for every 425 rows, and the matching rows are sprinkled pretty evenly throughout the table (that is, every page for column <code>id1</code> has at least one matching row).

6000 columns, 1 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
10.87 min | 6.09 min | 44%

6000 columns, 1 million rows, 23 98M files:

master | branch | improvement
-------|---------|-----------
7.39 min | 5.80 min | 21%

600 columns, 10 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
1.95 min | 1.96 min | -0.5%

60 columns, 100 million rows, 67 32M files:

master | branch | improvement
-------|---------|-----------
0.55 min | 0.55 min | 0%

## How was this patch tested?

- sql unit tests
- pyspark-sql tests

Closes #22188 from bersprockets/SPARK-25164.

Authored-by: Bruce Robbins <bersprock...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8db935f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8db935f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8db935f9

Branch: refs/heads/branch-2.3
Commit: 8db935f9724d0820a010340651a4d61da1bfa916
Parents: f598382
Author: Bruce Robbins <bersprock...@gmail.com>
Authored: Thu Aug 23 14:52:23 2018 +0800
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Mon Aug 27 16:08:13 2018 -0700

----------------------------------------------------------------------
 .../datasources/parquet/VectorizedParquetRecordReader.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/8db935f9/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index bb1b236..0fcb4a9 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -272,21 +272,23 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private void initializeInternal() throws IOException, UnsupportedOperationException {
     // Check that the requested schema is supported.
     missingColumns = new boolean[requestedSchema.getFieldCount()];
+    List<ColumnDescriptor> columns = requestedSchema.getColumns();
+    List<String[]> paths = requestedSchema.getPaths();
     for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
       Type t = requestedSchema.getFields().get(i);
       if (!t.isPrimitive() || t.isRepetition(Type.Repetition.REPEATED)) {
         throw new UnsupportedOperationException("Complex types not supported.");
       }
 
-      String[] colPath = requestedSchema.getPaths().get(i);
+      String[] colPath = paths.get(i);
       if (fileSchema.containsPath(colPath)) {
         ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
-        if (!fd.equals(requestedSchema.getColumns().get(i))) {
+        if (!fd.equals(columns.get(i))) {
           throw new UnsupportedOperationException("Schema evolution not supported.");
         }
         missingColumns[i] = false;
       } else {
-        if (requestedSchema.getColumns().get(i).getMaxDefinitionLevel() == 0) {
+        if (columns.get(i).getMaxDefinitionLevel() == 0) {
           // Column is missing in data but the required data is non-nullable. This file is invalid.
           throw new IOException("Required column is missing in data file. Col: " +
             Arrays.toString(colPath));
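
For readers who want to see the 2\*colCount\*colCount figure from the description concretely, here is a minimal, self-contained sketch. It does not use Parquet or Spark classes: `CountingSchema`, `rebuildPerColumn`, and `buildOnce` are hypothetical names invented for this illustration, and the sketch simply assumes (as the description states for the requested schema) that each call to `getPaths()` or `getColumns()` rebuilds a full list.

```java
import java.util.ArrayList;
import java.util.List;

public class ColumnListHoisting {

    /** Hypothetical stand-in for a schema whose list getters rebuild the list on every call. */
    static final class CountingSchema {
        private final int fieldCount;
        long elementsBuilt = 0;  // total list elements materialized across all getter calls

        CountingSchema(int fieldCount) {
            this.fieldCount = fieldCount;
        }

        int getFieldCount() {
            return fieldCount;
        }

        /** Rebuilds the whole path list; costs fieldCount steps per call. */
        List<String[]> getPaths() {
            List<String[]> paths = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                paths.add(new String[] {"col" + i});
                elementsBuilt++;
            }
            return paths;
        }

        /** Rebuilds the whole column list; costs fieldCount steps per call. */
        List<String> getColumns() {
            List<String> columns = new ArrayList<>(fieldCount);
            for (int i = 0; i < fieldCount; i++) {
                columns.add("col" + i);
                elementsBuilt++;
            }
            return columns;
        }
    }

    /** Shape of the loop before the patch: both lists are rebuilt inside the loop. */
    public static long rebuildPerColumn(int colCount) {
        CountingSchema schema = new CountingSchema(colCount);
        for (int i = 0; i < schema.getFieldCount(); ++i) {
            String[] colPath = schema.getPaths().get(i);
            String column = schema.getColumns().get(i);
            if (!column.equals(colPath[0])) {
                throw new IllegalStateException("Mismatch at column " + i);
            }
        }
        return schema.elementsBuilt;  // 2 * colCount * colCount
    }

    /** Shape of the loop after the patch: each list is built once, before the loop. */
    public static long buildOnce(int colCount) {
        CountingSchema schema = new CountingSchema(colCount);
        List<String[]> paths = schema.getPaths();
        List<String> columns = schema.getColumns();
        for (int i = 0; i < schema.getFieldCount(); ++i) {
            String[] colPath = paths.get(i);
            if (!columns.get(i).equals(colPath[0])) {
                throw new IllegalStateException("Mismatch at column " + i);
            }
        }
        return schema.elementsBuilt;  // 2 * colCount
    }

    public static void main(String[] args) {
        int colCount = 600;
        System.out.println("rebuild per column: " + rebuildPerColumn(colCount));
        System.out.println("build once:         " + buildOnce(colCount));
    }
}
```

The counter grows quadratically in the first variant and linearly in the second. That is consistent with the benchmarks showing a gain at 6000 columns and none at 60, although the sketch counts list elements only and says nothing about wall-clock time.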