[ 
https://issues.apache.org/jira/browse/SPARK-22053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-22053:
----------------------------------
    Description: 
Stream-stream inner join is traditionally implemented using a two-way symmetric 
hash join. At a high level, we want to do the following.
1. For each stream, we maintain the past rows as state in State Store. 
    - For each joining key, there can be multiple rows that have been received. 
    - So, we have to effectively maintain a key-to-list-of-values multimap as 
state for each stream.
2. In each batch, for each input row in each stream
    - Look up the other stream’s state to see if there are matching rows, 
and output them if they satisfy the joining condition.
    - Add the input row to the corresponding stream’s state.
    - If the data has a timestamp/window column with watermark, then we will 
use that to calculate the threshold for keys that are required to be buffered 
for future matches and drop the rest from the state.
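
The steps above can be sketched roughly as follows. This is only a toy, 
in-memory illustration and not the proposed Spark implementation: plain Python 
dictionaries stand in for the State Store, and the names SymmetricHashJoiner, 
process and evict are hypothetical.

```python
from collections import defaultdict


class SymmetricHashJoiner:
    """Toy in-memory stand-in for the per-stream state (hypothetical names)."""

    def __init__(self, condition=lambda left, right: True):
        # One key -> list-of-rows multimap per stream.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Extra join condition checked in addition to key equality.
        self.condition = condition

    def process(self, side, key, row):
        """Join one input row against the other side's state, then buffer it."""
        other = "right" if side == "left" else "left"
        matches = []
        # .get() avoids creating empty entries in the other side's multimap.
        for buffered in self.state[other].get(key, []):
            left, right = (row, buffered) if side == "left" else (buffered, row)
            if self.condition(left, right):
                matches.append((left, right))
        # Buffer the row so that later rows from the other stream can match it.
        self.state[side][key].append(row)
        return matches

    def evict(self, side, keep):
        """Drop buffered rows for which keep(row) is false."""
        for key in list(self.state[side]):
            kept = [r for r in self.state[side][key] if keep(r)]
            if kept:
                self.state[side][key] = kept
            else:
                del self.state[side][key]
```

Each input row probes the opposite side before being buffered on its own side, 
so a matching pair is emitted exactly once, by whichever row arrives second. 
The keep predicate passed to evict is where a watermark-derived threshold would 
plug in.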

Cleaning up old, unnecessary state rows depends entirely on whether a watermark 
has been defined and what the join conditions are. We definitely want to support 
state cleanup for two types of queries that are likely to be common. 
- Queries with time range conditions - E.g. {{SELECT * FROM leftTable JOIN 
rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES 
AND leftTime < rightTime + INTERVAL 1 HOUR}}
- Queries with windows as the matching key - E.g. {{SELECT * FROM leftTable JOIN 
rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = 
window(rightTime, "1 hour")}} (pseudo-SQL)
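
As a worked example for the time range query above, here is one way the state 
thresholds could be derived. It assumes that a watermark W on a stream means no 
future row of that stream has an event time earlier than W. The function names 
are hypothetical, and this is a sketch of the arithmetic only, not necessarily 
how the implementation will compute it.

```python
from datetime import datetime, timedelta

# Bounds taken from the example join condition:
#   leftTime > rightTime - 8 minutes  AND  leftTime < rightTime + 1 hour
LOWER = timedelta(minutes=8)
UPPER = timedelta(hours=1)


def left_state_threshold(right_watermark):
    """Left rows with leftTime <= this value can no longer match.

    A future right row has rightTime >= right_watermark, and a match needs
    rightTime < leftTime + 8 minutes, so leftTime must exceed
    right_watermark - 8 minutes.
    """
    return right_watermark - LOWER


def right_state_threshold(left_watermark):
    """Right rows with rightTime <= this value can no longer match.

    A future left row has leftTime >= left_watermark, and a match needs
    leftTime < rightTime + 1 hour, so rightTime must exceed
    left_watermark - 1 hour.
    """
    return left_watermark - UPPER


def can_drop_left(left_time, right_watermark):
    return left_time <= left_state_threshold(right_watermark)


def can_drop_right(right_time, left_watermark):
    return right_time <= right_state_threshold(left_watermark)
```

With both watermarks at 12:00, left rows at or before 11:52 and right rows at 
or before 11:00 could be dropped under this assumption. If no watermark is 
defined, no such threshold exists and the state cannot be cleaned up this way.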


  was:
Stream-stream inner join is traditionally implemented using a two-way symmetric 
hash join. At a high level, we want to do the following.
- For each stream, we maintain the past rows as state in State Store. 
    - For each joining key, there can be multiple rows that have been received. 
    - So, we have to effectively maintain a key-to-list-of-values multimap as 
state for each stream.
- In each batch, for each input row in each stream
    - Look up the other stream’s state to see if there are matching rows, 
and output them if they satisfy the joining condition.
    - Add the input row to the corresponding stream’s state.
    - If the data has a timestamp/window column with watermark, then we will 
use that to calculate the threshold for keys that are required to be buffered 
for future matches, and drop the rest from the state.






> Implement stream-stream inner join
> ----------------------------------
>
>                 Key: SPARK-22053
>                 URL: https://issues.apache.org/jira/browse/SPARK-22053
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das


