[ https://issues.apache.org/jira/browse/SPARK-22947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16449935#comment-16449935 ]

Li Jin commented on SPARK-22947:
--------------------------------

I came across this blog post today:

[https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html]

and realized the ad monetization problem it describes is essentially the as-of join case in streaming mode.

> SPIP: as-of join in Spark SQL
> -----------------------------
>
>                 Key: SPARK-22947
>                 URL: https://issues.apache.org/jira/browse/SPARK-22947
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.1
>            Reporter: Li Jin
>            Priority: Major
>         Attachments: SPIP_ as-of join in Spark SQL (1).pdf
>
>
> h2. Background and Motivation
> Time series analysis is one of the most common kinds of analysis performed on financial data, and as-of join is one of its most common operations. Supporting as-of join in Spark SQL would enable many time series analysis use cases.
> As-of join is a “join on time” with an inexact time-matching criterion. Several libraries implement as-of join or similar functionality:
> * Kdb: https://code.kx.com/wiki/Reference/aj
> * Pandas: http://pandas.pydata.org/pandas-docs/version/0.19.0/merging.html#merging-merge-asof
> * R (zoo), where similar functionality is called “Last Observation Carried Forward”: https://www.rdocumentation.org/packages/zoo/versions/1.8-0/topics/na.locf
> * JuliaDB: http://juliadb.org/latest/api/joins.html#IndexedTables.asofjoin
> * Flint: https://github.com/twosigma/flint#temporal-join-functions
> This proposal advocates introducing a new API in Spark SQL to support as-of join.
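> For intuition, the semantics can be emulated in today's Spark SQL with a range join followed by per-row deduplication. A minimal, illustrative Scala sketch (not the proposed API), assuming date-typed columns and the Use Case A data below:
> {code:java}
> // Emulating a one-day-lookback as-of join with a range join plus
> // deduplication. Purely illustrative; column names follow Use Case A.
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.expressions.Window
> import org.apache.spark.sql.functions._
> 
> val spark = SparkSession.builder.master("local[*]").getOrCreate()
> import spark.implicits._
> 
> val dfA = Seq(("2016-01-01", 100), ("2016-01-02", 50), ("2016-01-04", -50), ("2016-01-05", 100))
>   .toDF("time", "quantity").withColumn("time", to_date($"time"))
> val dfB = Seq(("2015-12-31", 100.0), ("2016-01-04", 105.0), ("2016-01-05", 102.0))
>   .toDF("rtime", "price").withColumn("rtime", to_date($"rtime"))
> 
> // Range join: keep every dfB row inside each dfA row's one-day lookback window.
> val candidates = dfA.join(dfB, $"rtime".between(date_sub($"time", 1), $"time"), "left_outer")
> 
> // Keep only the most recent candidate per left row (assumes left rows are
> // unique on (time, quantity); a surrogate row id would be safer in general).
> val w = Window.partitionBy($"time", $"quantity").orderBy($"rtime".desc)
> val result = candidates.withColumn("rn", row_number().over(w))
>   .where($"rn" === 1).drop("rn", "rtime")
> 
> result.orderBy("time").show()
> {code}
> This emulation degenerates into a non-equi join plus an extra shuffle for the window, which is exactly the performance gap a native as-of join would close.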
> h2. Target Personas
> Data scientists, data engineers
> h2. Goals
> * New API in Spark SQL that allows as-of join
> * As-of joins of multiple tables (>2) should be performant, because it is very common for users to need to join multiple data sources together for further analysis.
> * Define Distribution, Partitioning, and shuffle strategy for ordered time series data
> h2. Non-Goals
> These are out of scope for this SPIP and should be considered in future SPIPs as improvements to Spark’s time series analysis capability:
> * Utilize partition information from the data source, i.e., the begin/end of each partition, to reduce sorting/shuffling
> * Define an API for users to express the as-of join time spec against a business calendar (e.g., look back one business day; this is very common in financial data analysis because of market calendars)
> * Support broadcast join
> h2. Proposed API Changes
> h3. TimeContext
> TimeContext is an object that defines the time scope of the analysis; it has a begin time (inclusive) and an end time (exclusive). The user should be able to change the time scope of the analysis (e.g., from one month to five years) by changing only the TimeContext.
> To the Spark engine, a TimeContext is a hint that:
> * can be used to repartition data for the join
> * serves as a predicate that can be pushed down to the storage layer
> A time context is similar to filtering time by begin/end; the main difference is that a time context can be expanded based on the operation taken (see the example in as-of join below).
> Time context example:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> {code}
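> As a point of reference (illustrative only, assuming a DataFrame df with a yyyyMMdd-formatted string column "time"), the predicate-pushdown half of this hint corresponds to an ordinary range filter in today's API:
> {code:java}
> // Illustrative: the pushdown aspect of a TimeContext is equivalent to a
> // range filter, which Spark can already push down to many data sources.
> // Lexicographic comparison of yyyyMMdd strings matches chronological order.
> // Assumes an existing DataFrame `df` with a string column "time".
> import org.apache.spark.sql.functions.col
> val scoped = df.where(col("time") >= "20160101" && col("time") < "20170101")
> {code}
> The difference is that a TimeContext can also be expanded by an operation (e.g., a lookback), which a plain filter cannot express.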
> h3. asofJoin
> h4. Use Case A (join without key)
> Join two DataFrames on time, with a one-day lookback:
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, quantity
> 20160101, 100
> 20160102, 50
> 20160104, -50
> 20160105, 100
> dfB:
> time, price
> 20151231, 100.0
> 20160104, 105.0
> 20160105, 102.0
> output:
> time, quantity, price
> 20160101, 100, 100.0
> 20160102, 50, null
> 20160104, -50, 105.0
> 20160105, 100, 102.0
> {code}
> Note that row (20160101, 100) of dfA is joined with (20151231, 100.0) of dfB. This is an important illustration of the time context: it can be expanded to 20151231 on dfB because of the one-day lookback.
> h4. Use Case B (join with key)
> To join on time and another key (for instance, id), we use “by” to specify 
> the key.
> {code:java}
> TimeContext timeContext = TimeContext("20160101", "20170101")
> dfA = ...
> dfB = ...
> JoinSpec joinSpec = JoinSpec(timeContext).on("time").by("id").tolerance("-1day")
> result = dfA.asofJoin(dfB, joinSpec)
> {code}
> Example input/output:
> {code:java}
> dfA:
> time, id, quantity
> 20160101, 1, 100
> 20160101, 2, 50
> 20160102, 1, -50
> 20160102, 2, 50
> dfB:
> time, id, price
> 20151231, 1, 100.0
> 20160102, 1, 105.0
> 20160102, 2, 195.0
> Output:
> time, id, quantity, price
> 20160101, 1, 100, 100.0
> 20160101, 2, 50, null
> 20160102, 1, -50, 105.0
> 20160102, 2, 50, 195.0
> {code}
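> The two use cases suggest a small builder-style API. A purely illustrative Scala sketch of the shapes involved, where every name and field is a hypothetical reading of the examples above rather than an existing Spark API:
> {code:java}
> // Hypothetical shapes only: nothing here exists in Spark; it simply makes
> // the builder calls in the two use cases above concrete.
> case class TimeContext(begin: String, end: String)
> 
> case class JoinSpec(
>     timeContext: TimeContext,
>     timeColumn: Option[String] = None,      // set via on(...)
>     keyColumn: Option[String] = None,       // set via by(...), optional
>     toleranceSpec: Option[String] = None) { // set via tolerance(...), e.g. "-1day"
>   def on(col: String): JoinSpec = copy(timeColumn = Some(col))
>   def by(col: String): JoinSpec = copy(keyColumn = Some(col))
>   def tolerance(t: String): JoinSpec = copy(toleranceSpec = Some(t))
> }
> 
> // Mirrors Use Case B:
> val spec = JoinSpec(TimeContext("20160101", "20170101"))
>   .on("time").by("id").tolerance("-1day")
> {code}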
> h2. Optional Design Sketch
> h3. Implementation A
> (This is just an initial thought on how to implement this.)
> (1) Using the begin/end of the TimeContext, we first partition the left DataFrame into non-overlapping partitions. For the purpose of demonstration, assume we partition it into one-day partitions:
> {code:java}
> [20160101, 20160102) [20160102, 20160103) ... [20161231, 20170101)
> {code}
> (2) Then we partition the right DataFrame into overlapping partitions, taking the tolerance into account, e.g. a one-day lookback:
> {code:java}
> [20151231, 20160102) [20160101, 20160103) ... [20161230, 20170101)
> {code}
> (3) Pair up the left and right partitions.
> (4) For each pair of partitions, join the pair locally, because all the data needed for the join is within the pair.
> (5) Use the partitioning from (1) as the output distribution so it can be reused for sequential as-of joins. A sketch of steps (1)-(5) follows.
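> A hedged sketch of steps (1)-(4) at the RDD level, using integer day buckets, a one-day lookback, and tiny inline datasets; all names are illustrative and the daily bucketing granularity is a simplification:
> {code:java}
> // Sketch of the pair-and-join-locally strategy: left rows get exactly one
> // daily bucket; right rows are replicated into every bucket whose lookback
> // window can reach them; cogroup then makes each as-of match purely local.
> import java.time.LocalDate
> import java.time.temporal.ChronoUnit
> import org.apache.spark.sql.SparkSession
> 
> case class Trade(time: LocalDate, quantity: Int)
> case class Quote(time: LocalDate, price: Double)
> 
> val spark = SparkSession.builder.master("local[*]").getOrCreate()
> val sc = spark.sparkContext
> 
> val begin = LocalDate.of(2016, 1, 1) // TimeContext begin
> def bucket(d: LocalDate): Long = ChronoUnit.DAYS.between(begin, d)
> 
> val trades = sc.parallelize(Seq(
>   Trade(LocalDate.of(2016, 1, 1), 100), Trade(LocalDate.of(2016, 1, 2), 50)))
> val quotes = sc.parallelize(Seq(
>   Quote(LocalDate.of(2015, 12, 31), 100.0), Quote(LocalDate.of(2016, 1, 2), 105.0)))
> 
> // (1) each left row lands in exactly one non-overlapping daily bucket
> val left = trades.map(t => (bucket(t.time), t))
> 
> // (2) each right row is copied into both buckets it can serve under a
> //     one-day tolerance: its own day and the following day
> val right = quotes.flatMap(q => Seq((bucket(q.time), q), (bucket(q.time) + 1, q)))
> 
> // (3)+(4) cogroup pairs co-bucketed data, so each as-of match is local
> val joined = left.cogroup(right).flatMap { case (_, (ts, qs)) =>
>   ts.map { t =>
>     val eligible = qs.filter(q =>
>       !q.time.isAfter(t.time) && !q.time.isBefore(t.time.minusDays(1)))
>     val price = if (eligible.isEmpty) None else Some(eligible.maxBy(_.time.toEpochDay).price)
>     (t.time, t.quantity, price)
>   }
> }
> joined.collect().sortBy(_._1.toEpochDay).foreach(println)
> {code}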
> h2. Optional Rejected Sketch
> h3. Rejected Implementation A
> An alternative implementation is to sample the data to figure out its time range instead of using a time context. This approach is implemented in https://github.com/twosigma/flint.
> This approach suffers in performance when sampling the data is expensive. For instance, when the data to be sampled is the output of an expensive computation, sampling would cause that expensive computation to be done twice.


