[
https://issues.apache.org/jira/browse/SPARK-19609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020287#comment-16020287
]
Jayesh lalwani commented on SPARK-19609:
----------------------------------------
I'd just like to point out that this would immensely help applications using
the Structured Streaming capabilities of Spark, especially when implementing
what [~matei] refers to as "continuous applications".
One of the selling points of Structured Streaming is that you can join
streaming data with data from batch sources, such as databases. In these
cases, the streaming data frequently needs to be joined with database tables
that contain millions (or even billions) of rows. This matches the exact
profile of data that this ticket describes: a smaller set of data being joined
with a larger one.
The problem is that without this change, there is no good way to join a
streaming data frame with a large database table. You can either
a) use spark.read.jdbc to read the huge table into a data frame, and then join
it with the streaming data frame,
OR
b) call df.map/df.mapPartitions on your streaming data frame and run the
database query yourself.
The problem with a) is that the huge table is read on every execution of a
micro-batch. In fact, if your Spark application has multiple sinks, the read
will run once per sink, because Spark doesn't have a good mechanism for
marking data frames to be cached for the duration of a micro-batch. The
problem with b) is that it adds complexity to the application-level code,
and you are really leaving the DataFrame abstraction when you call the map
function. For example, what if your SQL query returns 1000x the records?
Suddenly you have millions of records being created in each partition, and you
have to tell Spark to repartition the output, which incurs additional cost.
Ideally, we would like to use spark.read.jdbc followed by a join with the
streaming dataframe, and have Spark push the join predicates down to the
database. This would result in less data being pulled into Spark on every
micro-batch.
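To make the desired behavior concrete, here is a minimal sketch in plain
Python of the SQL that Spark could generate for such a pushdown. The function
name, table name, and the max_in_list fallback threshold are all hypothetical;
this only illustrates the idea of restricting the JDBC read to the join keys
seen in the current micro-batch, not any actual Spark API:

```python
def pushdown_join_query(table, join_column, batch_keys, max_in_list=1000):
    """Build a JDBC query that pulls only the rows matching the join keys
    present in the current micro-batch, instead of scanning the whole table."""
    keys = sorted(set(batch_keys))
    if len(keys) > max_in_list:
        # Too many distinct keys to inline; fall back to a full scan.
        return f"SELECT * FROM {table}"
    in_list = ", ".join(repr(k) for k in keys)
    return f"SELECT * FROM {table} WHERE {join_column} IN ({in_list})"

# Only the three distinct customer ids seen in the micro-batch are fetched.
query = pushdown_join_query("customers", "customer_id", [42, 7, 42, 19])
# → "SELECT * FROM customers WHERE customer_id IN (7, 19, 42)"
```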
> Broadcast joins should pushdown join constraints as Filter to the larger
> relation
> ---------------------------------------------------------------------------------
>
> Key: SPARK-19609
> URL: https://issues.apache.org/jira/browse/SPARK-19609
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 2.1.0
> Reporter: Nick Dimiduk
>
> For broadcast inner-joins, where the smaller relation is known to be small
> enough to materialize on a worker, the set of values for all join columns is
> known and fits in memory. Spark should translate these values into a
> {{Filter}} pushed down to the datasource. The common join condition of
> equality, i.e. {{lhs.a == rhs.a}}, can be written as an {{a in ...}} clause.
> An example of pushing such filters is already present in the form of
> {{IsNotNull}} filters via [~sameerag]'s work on SPARK-12957 subtasks.
> This optimization could even work when the smaller relation does not fit
> entirely in memory. This could be done by partitioning the smaller relation
> into N pieces, applying this predicate pushdown for each piece, and unioning
> the results.
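The chunked variant described in the last paragraph of the issue could be
sketched as follows (plain Python with hypothetical names; each generated
filter would be pushed to the datasource as a separate read, and the per-chunk
results unioned):

```python
def chunked_in_filters(join_column, keys, n_pieces):
    """Split the smaller relation's join keys into n_pieces chunks and emit
    one IN-filter per chunk; the datasource is queried once per filter and
    the per-chunk results are unioned afterwards."""
    keys = sorted(set(keys))
    size = max(1, -(-len(keys) // n_pieces))  # ceiling division
    return [
        f"{join_column} IN ({', '.join(map(repr, keys[i:i + size]))})"
        for i in range(0, len(keys), size)
    ]

# Six keys split into three chunks of two keys each.
filters = chunked_in_filters("a", range(1, 7), n_pieces=3)
# → ["a IN (1, 2)", "a IN (3, 4)", "a IN (5, 6)"]
```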
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]