[ https://issues.apache.org/jira/browse/SPARK-25164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruce Robbins updated SPARK-25164:
----------------------------------
Description:
{{VectorizedParquetRecordReader.initializeInternal}} loops through each column,
and for each column it calls
{noformat}
requestedSchema.getColumns().get(i)
{noformat}
However, {{MessageType.getColumns}} builds the entire column list from
{{getPaths(0)}} on every call:
{noformat}
public List<ColumnDescriptor> getColumns() {
  List<String[]> paths = this.getPaths(0);
  List<ColumnDescriptor> columns = new ArrayList<ColumnDescriptor>(paths.size());
  for (String[] path : paths) {
    // TODO: optimize this
    PrimitiveType primitiveType = getType(path).asPrimitiveType();
    columns.add(new ColumnDescriptor(
        path,
        primitiveType,
        getMaxRepetitionLevel(path),
        getMaxDefinitionLevel(path)));
  }
  return columns;
}
{noformat}
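Because each {{getColumns()}} call does work proportional to the number of
columns, and the loop makes one such call per column, this routine indirectly
iterates colCount * colCount times for each Parquet file.
This is not particularly noticeable unless you have:
- many Parquet files
- many columns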
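A minimal Java sketch of the access pattern follows. {{ToySchema}} and {{ToyColumn}} are hypothetical stand-ins, not Parquet classes. Like {{MessageType.getColumns}}, {{ToySchema.getColumns()}} builds a fresh list on every call, and a counter records how many column objects get constructed, so indexing into a freshly built list once per column yields n * n constructions.

```java
import java.util.ArrayList;
import java.util.List;

public class QuadraticColumnsDemo {
    // Hypothetical stand-in for a column descriptor.
    static final class ToyColumn {
        final String name;
        ToyColumn(String name) { this.name = name; }
    }

    // Hypothetical schema whose getColumns() rebuilds the full list on every call.
    static final class ToySchema {
        private final int columnCount;
        long constructed = 0; // ToyColumn objects built so far

        ToySchema(int columnCount) { this.columnCount = columnCount; }

        int columnCount() { return columnCount; }

        List<ToyColumn> getColumns() {
            List<ToyColumn> columns = new ArrayList<>(columnCount);
            for (int i = 0; i < columnCount; i++) {
                columns.add(new ToyColumn("c" + i));
                constructed++;
            }
            return columns;
        }
    }

    // Mirrors the problematic loop: a full getColumns() rebuild for every column.
    static long uncachedLoop(int columnCount) {
        ToySchema schema = new ToySchema(columnCount);
        for (int i = 0; i < schema.columnCount(); i++) {
            ToyColumn column = schema.getColumns().get(i);
            // ... per-column setup would use `column` here ...
        }
        return schema.constructed;
    }

    public static void main(String[] args) {
        System.out.println(uncachedLoop(100)); // 10000 (100 calls * 100 columns)
    }
}
```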
To verify that this is an issue, I created a 1-million-record Parquet table
with 6000 columns of type double, stored in 67 files (so {{initializeInternal}}
is called 67 times). I ran the following query:
{noformat}
sql("select * from 6000_1m_double where id1 = 1").collect
{noformat}
I used Spark built from the master branch, with 8 executor threads. The filter
returns only a few thousand records. On average, the query ran for 6.4 minutes.
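For the test table above, here is a rough count of the descriptor objects built inside {{getColumns}} alone, assuming exactly one {{getColumns()}} call per column per file, compared with building the list once per file. This is only an estimate: it ignores the {{getPaths(0)}} traversal and the per-path type and level lookups, which add further work to every call.

```java
public class ScaleEstimate {
    // Descriptors built when the full list is rebuilt once per column, for every file.
    static long uncached(long columns, long files) {
        return columns * columns * files;
    }

    // Descriptors built when the list is built once per file and then reused.
    static long cached(long columns, long files) {
        return columns * files;
    }

    public static void main(String[] args) {
        long columns = 6000;
        long files = 67; // both figures taken from the test table described above
        System.out.println(uncached(columns, files)); // 2412000000
        System.out.println(cached(columns, files));   // 402000
    }
}
```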
Then I cached the column list at the top of {{initializeInternal}} as follows:
{noformat}
List<ColumnDescriptor> columnCache = requestedSchema.getColumns();
{noformat}
Then I changed {{initializeInternal}} to use {{columnCache}} rather than
{{requestedSchema.getColumns()}}.
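A hypothetical toy model shows why the cached version is cheaper. {{ToySchema}} and {{ToyColumn}} are illustrative stand-ins, not Parquet or Spark classes; {{ToySchema.getColumns()}} rebuilds its list on every call, as {{MessageType.getColumns}} does. Hoisting the call out of the loop, as {{columnCache}} does, means each column object is constructed exactly once per schema.

```java
import java.util.ArrayList;
import java.util.List;

public class CachedColumnsDemo {
    // Hypothetical stand-in for a column descriptor.
    static final class ToyColumn {
        final String name;
        ToyColumn(String name) { this.name = name; }
    }

    // Hypothetical schema whose getColumns() rebuilds the full list on every call.
    static final class ToySchema {
        private final int columnCount;
        long constructed = 0; // ToyColumn objects built so far

        ToySchema(int columnCount) { this.columnCount = columnCount; }

        int columnCount() { return columnCount; }

        List<ToyColumn> getColumns() {
            List<ToyColumn> columns = new ArrayList<>(columnCount);
            for (int i = 0; i < columnCount; i++) {
                columns.add(new ToyColumn("c" + i));
                constructed++;
            }
            return columns;
        }
    }

    // Cached variant: a single getColumns() call, then plain list indexing.
    static long cachedLoop(int columnCount) {
        ToySchema schema = new ToySchema(columnCount);
        List<ToyColumn> columnCache = schema.getColumns();
        for (int i = 0; i < schema.columnCount(); i++) {
            ToyColumn column = columnCache.get(i);
            // ... per-column setup would use `column` here ...
        }
        return schema.constructed;
    }

    public static void main(String[] args) {
        System.out.println(cachedLoop(100)); // 100
    }
}
```

The cached list stays valid only while the schema does not change; in the toy model the schema is fixed for the whole loop.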
With the column cache variable, the same query runs in 5 minutes. So, with my
simple query, you save 22% of the time by not rebuilding the column list for
each column.
Adding a paths cache variable saves additional time: 34% in total on the above
query.
> Parquet reader builds entire list of columns once for each column
> -----------------------------------------------------------------
>
> Key: SPARK-25164
> URL: https://issues.apache.org/jira/browse/SPARK-25164
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Bruce Robbins
> Priority: Minor
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)