[
https://issues.apache.org/jira/browse/SPARK-11838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sean Owen updated SPARK-11838:
------------------------------
Component/s: SQL
> Spark SQL query fragment RDD reuse
> ----------------------------------
>
> Key: SPARK-11838
> URL: https://issues.apache.org/jira/browse/SPARK-11838
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Reporter: Mikhail Bautin
>
> In many analytical Spark SQL workloads against slowly changing tables,
> successive queries frequently share fragments that produce the same result.
> Instead of re-computing those fragments for every query, it makes sense to
> detect similar fragments and substitute RDDs previously created for matching
> SparkPlan fragments into every new SparkPlan at execution time whenever
> possible. Even if no RDDs are persist()-ed to memory/disk/off-heap memory,
> many stages can still be skipped due to map output files being present on
> executor nodes.
> The implementation involves the following steps:
> (1) Logical plan "canonicalization".
> Logical plans mapping to the same "canonical" logical plan should always
> produce the same results (except for possible output column reordering),
> although the converse does not hold: plans that produce the same results
> may still canonicalize differently.
> - Re-mapping expression ids to "canonical expression ids" (successively
> increasing numbers, always starting with 1); a minimal sketch of this
> remapping appears below.
> - Eliminating alias names that are unimportant after analysis completion.
> Only the names that are necessary to determine the Hive table columns to be
> scanned are retained.
> - Reordering columns in projections, grouping/aggregation expressions, etc.
> This can be done e.g. by using the string representation as a sort key. Union
> inputs always have to be reordered the same way.
> - Tree traversal has to happen starting from leaves and progressing towards
> the root, because we need to already have identified canonical expression ids
> for children of a node before we can come up with sort keys that allow us
> to reorder expressions in a node deterministically. This is a bit more
> complicated for Union nodes.
> - Special handling for MetastoreRelations. We replace MetastoreRelation
> with a special class CanonicalMetastoreRelation that uses attributes and
> partitionKeys as part of its equals() and hashCode() implementation, but the
> visible attributes and partitionKeys are restricted to expression ids that
> the rest of the query actually needs from that MetastoreRelation.
> An example of logical plans and corresponding canonical logical plans:
> https://gist.githubusercontent.com/mbautin/ef1317b341211d9606cf/raw
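> A minimal sketch of the expression id remapping, assuming Catalyst's
> transformUp/transformExpressions and AttributeReference.withExprId; a real
> implementation would first sort expressions so that traversal order is
> deterministic:
> {code}
> import scala.collection.mutable
>
> import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
>
> // Remaps every expression id in an analyzed plan to a canonical id
> // (1, 2, 3, ...) assigned in traversal order. transformUp visits leaves
> // before parents, matching the bottom-up requirement above.
> def canonicalizeExprIds(plan: LogicalPlan): LogicalPlan = {
>   val mapping = mutable.Map.empty[ExprId, ExprId]
>   def canonical(id: ExprId): ExprId =
>     mapping.getOrElseUpdate(id, ExprId(mapping.size + 1))
>   plan.transformUp {
>     case node => node.transformExpressions {
>       case a: AttributeReference => a.withExprId(canonical(a.exprId))
>     }
>   }
> }
> {code}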
> (2) Tracking LogicalPlan fragments corresponding to SparkPlan fragments. When
> generating a SparkPlan, we keep an optional reference to a LogicalPlan
> instance in every node. This allows us to populate the cache with mappings
> from canonical logical plans of query fragments to the corresponding RDDs
> generated as part of query execution. Note that no new work is necessary
> to generate the RDDs; we are merely retaining the RDDs that would have
> been produced as part of SparkPlan execution anyway.
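> A hedged sketch of this bookkeeping, assuming a hypothetical back-reference
> from SparkPlan nodes to their LogicalPlan fragments and the
> canonicalizeExprIds helper sketched above:
> {code}
> import java.util.concurrent.ConcurrentHashMap
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
>
> // Maps canonical logical plans to RDDs produced for matching SparkPlan
> // fragments. Lookups rely on the canonical plan's equals()/hashCode(),
> // which is why step (1) introduces CanonicalMetastoreRelation.
> object FragmentRddCache {
>   private val cache = new ConcurrentHashMap[LogicalPlan, RDD[InternalRow]]()
>
>   // Called once a SparkPlan node has produced its RDD; `fragment` is the
>   // optional LogicalPlan reference kept in that node.
>   def record(fragment: Option[LogicalPlan], rdd: RDD[InternalRow]): Unit =
>     fragment.foreach(lp => cache.putIfAbsent(canonicalizeExprIds(lp), rdd))
>
>   def lookup(fragment: LogicalPlan): Option[RDD[InternalRow]] =
>     Option(cache.get(canonicalizeExprIds(fragment)))
> }
> {code}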
> (3) SparkPlan fragment substitution. After generating a SparkPlan and before
> calling prepare() or execute() on it, we check if any of its nodes have an
> associated LogicalPlan that maps to a canonical logical plan matching a cache
> entry. If so, we substitute a PhysicalRDD (or a new class UnsafePhysicalRDD
> wrapping an RDD of UnsafeRow) scanning the previously created RDD instead of
> the current query fragment. If the expected column order differs from what
> the current SparkPlan fragment produces, we add a projection to reorder the
> columns. We also add safe/unsafe row conversions as necessary to match the
> row type that is expected by the parent of the current SparkPlan fragment.
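> A hedged sketch of the substitution pass (PhysicalRDD's exact constructor
> differs between Spark versions, and fragmentLogicalPlan stands in for the
> back-reference from step (2)):
> {code}
> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
> import org.apache.spark.sql.execution.{PhysicalRDD, SparkPlan}
>
> // Assumed hook from step (2): the LogicalPlan a SparkPlan node came from.
> def fragmentLogicalPlan(fragment: SparkPlan): Option[LogicalPlan] = ???
>
> // Runs before prepare()/execute(): any fragment whose canonical logical
> // plan has a cached RDD is replaced by a PhysicalRDD scanning that RDD.
> // A real pass would also insert a projection when the column order
> // differs, plus the safe/unsafe row conversions expected by the parent.
> def substituteCachedFragments(plan: SparkPlan): SparkPlan = plan.transformUp {
>   case fragment =>
>     fragmentLogicalPlan(fragment).flatMap(FragmentRddCache.lookup) match {
>       case Some(rdd) => PhysicalRDD(fragment.output, rdd, "ReusedFragment")
>       case None      => fragment
>     }
> }
> {code}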
> (4) The execute() method of SparkPlan also needs to perform the cache lookup
> and substitution described above before producing a new RDD for the current
> SparkPlan node. The "loading cache" pattern (e.g. as implemented in Guava)
> allows query fragments to be reused between simultaneously submitted
> queries: whichever query calls execute() first for a particular fragment's
> canonical logical plan starts producing the RDD; any other query with a
> fragment that has the same canonical logical plan waits for that RDD to be
> produced by the first query and substitutes it into its own SparkPlan.
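> A sketch of that pattern with Guava's LoadingCache; computeFragmentRdd is
> a hypothetical stand-in for actually executing the fragment. get() blocks
> concurrent callers on the same key until the first loader finishes, which
> is exactly the cross-query sharing described above:
> {code}
> import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
>
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
>
> // Hypothetical stand-in for running the SparkPlan fragment to its RDD.
> def computeFragmentRdd(canonicalPlan: LogicalPlan): RDD[InternalRow] = ???
>
> val fragmentRdds: LoadingCache[LogicalPlan, RDD[InternalRow]] =
>   CacheBuilder.newBuilder()
>     .maximumSize(1000) // assumed eviction policy
>     .build(new CacheLoader[LogicalPlan, RDD[InternalRow]] {
>       override def load(canonicalPlan: LogicalPlan): RDD[InternalRow] =
>         computeFragmentRdd(canonicalPlan)
>     })
>
> // The first query to get() a canonical plan runs the loader; a concurrent
> // query passing an equal plan blocks until the RDD exists, then reuses it:
> // val rdd = fragmentRdds.get(canonicalPlan)
> {code}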
> This kind of query fragment caching will mostly be useful for
> slowly-changing or static tables. Even with slowly-changing tables, the
> cache needs to be invalidated when the underlying data does change. One of
> the following approaches could be used (sketched after this list):
> - Application logic could explicitly invalidate the cache when it detects a
> change
> - We could add to CanonicalMetastoreRelation a key that encodes the set of
> HDFS files present at the moment of LogicalPlan creation
> - We could append to table names a version number that is incremented
> whenever the table is updated. The version number stays in the LogicalPlan
> but is removed before doing a Hive table lookup. It could also be used to
> filter the set of files to scan from the Hive table.
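> Hedged sketches of the first two options, reusing the fragmentRdds cache
> from the sketch above (relation fields simplified to strings; fileSetKey
> is a hypothetical snapshot of the table's HDFS file listing):
> {code}
> // Option 1: application logic invalidates explicitly on a detected change.
> // Coarse-grained: drops every cached fragment at once.
> def onTableChanged(): Unit = fragmentRdds.invalidateAll()
>
> // Option 2: fold a snapshot of the table's HDFS files into the relation's
> // cache key, so entries built against an older file set stop matching
> // (case-class equals()/hashCode() include fileSetKey automatically).
> case class CanonicalMetastoreRelation(
>     attributes: Seq[String],    // restricted to exprIds the query needs
>     partitionKeys: Seq[String],
>     fileSetKey: Seq[String])    // e.g. sorted "path:length:mtime" strings
> {code}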