[
https://issues.apache.org/jira/browse/SPARK-26450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Herman van Hovell resolved SPARK-26450.
---------------------------------------
Resolution: Fixed
Assignee: Bruce Robbins
Fix Version/s: 3.0.0
> Map of schema is built too frequently in some wide queries
> ----------------------------------------------------------
>
> Key: SPARK-26450
> URL: https://issues.apache.org/jira/browse/SPARK-26450
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Bruce Robbins
> Assignee: Bruce Robbins
> Priority: Minor
> Fix For: 3.0.0
>
>
> When executing queries with wide projections and wide schemas, Spark rebuilds
> an attribute map for the same schema many times.
> For example:
> {noformat}
> select * from orctbl where id1 = 1
> {noformat}
> Assume {{orctbl}} has 6000 columns and 34 files. In that case, the above
> query creates an {{AttributeSeq}} object 270,000 times[1]. Each {{AttributeSeq}}
> instantiation builds a map over the entire list of 6000 attributes (the map is
> not materialized until the lazy val {{exprIdToOrdinal}} is first referenced).
> Whenever OrcFileFormat reads a new file, it generates a new unsafe
> projection. That results in this
> [function|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L319]
> getting called:
> {code:java}
>   protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
>     in.map(BindReferences.bindReference(_, inputSchema))
> {code}
> For each column in the projection, this line calls {{bindReference}}, passing
> {{inputSchema}} (a {{Seq[Attribute]}}) to a parameter that expects an
> {{AttributeSeq}}. The compiler implicitly invokes the {{AttributeSeq}}
> constructor, which (lazily) builds a map over every attribute in the schema.
> As a result, this function builds a map of the entire schema once per column
> in the projection, and it does so again for each input file. For the example
> query above, this accounts for 204K of the {{AttributeSeq}} instantiations.
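> The cost pattern can be seen in isolation with a minimal, self-contained sketch
> (the names {{Attr}}, {{AttrSeq}}, and {{bindOne}} below are hypothetical
> stand-ins, not Spark's actual classes): an implicit conversion at the call
> site rebuilds the lookup map for every column, while hoisting the conversion
> out of the loop builds it once.

```scala
import scala.language.implicitConversions

// Hypothetical stand-ins for Attribute and AttributeSeq.
case class Attr(exprId: Int, name: String)

class AttrSeq(val attrs: Seq[Attr]) {
  // Analogous to AttributeSeq.exprIdToOrdinal: built lazily, once per instance.
  lazy val exprIdToOrdinal: Map[Int, Int] =
    attrs.map(_.exprId).zipWithIndex.toMap
}

object AttrSeq {
  // The implicit conversion the compiler inserts at each call site.
  implicit def fromSeq(attrs: Seq[Attr]): AttrSeq = new AttrSeq(attrs)
}

// Stand-in for BindReferences.bindReference: the parameter type triggers
// the implicit conversion whenever a plain Seq[Attr] is passed.
def bindOne(exprId: Int, schema: AttrSeq): Int =
  schema.exprIdToOrdinal(exprId)

val schema = (0 until 6000).map(i => Attr(i, s"c$i"))

// Slow path: a fresh AttrSeq (and a fresh 6000-entry map) per column.
val slow = (0 until 6000).map(i => bindOne(i, schema))

// Fast path: convert once, reuse the same map for every column.
val prebuilt = new AttrSeq(schema)
val fast = (0 until 6000).map(i => bindOne(i, prebuilt))
```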
> Readers for CSV and JSON tables do something similar.
> In addition, ProjectExec also creates an unsafe projection for each task. As
> a result, this
> [line|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L91]
> gets called, which has the same issue:
> {code:java}
>   def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
>     exprs.map(BindReferences.bindReference(_, inputSchema))
>   }
> {code}
> The above affects all wide queries that have a projection node, regardless of
> the file reader. For the example query, {{ProjectExec}} accounts for an
> additional 66K instantiations of {{AttributeSeq}}.
> Spark can save time by pre-building the {{AttributeSeq}} right before the map
> operations in {{bind}} and {{toBoundExprs}}. The time saved depends on the
> size of the schema, the size of the projection, the number of input files
> (for Orc), the number of file splits (for CSV and JSON tables), and the
> number of tasks.
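> One possible shape for that change (a sketch only, not the actual committed
> patch; it assumes {{AttributeSeq}} can be constructed directly from a
> {{Seq[Attribute]}}) is to perform the conversion once, outside the per-column
> map:

```scala
// Sketch: hoist the implicit Seq[Attribute] -> AttributeSeq conversion out of
// the loop, so the schema map is built a single time per bind() call instead
// of once per column.
protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
  val boundSchema = new AttributeSeq(inputSchema) // map built (lazily) once here
  in.map(BindReferences.bindReference(_, boundSchema))
}
```

> {{toBoundExprs}} admits the same one-line hoist.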
> For a 6000-column CSV table with 500K records and 34 input files, the time
> savings is only 6%[1], because Spark does not create as many unsafe
> projections for CSV tables as it does for Orc tables.
> By contrast, for a 6000-column Orc table with 500K records and 34 input
> files, the time savings is about 16%[1].
[1] Based on queries run in local mode with 8 executor threads on my laptop.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]