[ 
https://issues.apache.org/jira/browse/SPARK-22053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tathagata Das updated SPARK-22053:
----------------------------------
    Description: 
Stream-stream inner join is traditionally implemented using a two-way symmetric
hash join. At a high level, we want to do the following.
- For each stream, we maintain the past rows as state in the State Store.
  - For each joining key, multiple rows may have been received.
  - So, we effectively have to maintain a key-to-list-of-values multimap as
state for each stream.
- In each batch, for each input row in each stream
  - Look up the other stream’s state to see if there are matching rows, and
output them if they satisfy the joining condition.
  - Add the input row to the corresponding stream’s state.
  - If the data has a timestamp/window column with a watermark, then we will use
that to calculate the threshold for keys that are required to be buffered for
future matches, and drop the rest from the state.
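
The steps above can be sketched in plain Python. This is only a toy
illustration of a two-way symmetric hash join, not the Spark implementation:
the class name SymmetricHashJoin, the process/evict methods, and the "ts" field
are all hypothetical, in-memory dicts stand in for the State Store, and the
eviction threshold is assumed to be computed elsewhere from the watermark and
passed in by the caller. For simplicity it evicts individual rows by event
time rather than whole keys.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Toy two-way symmetric hash join over two streams, "left" and "right"."""

    def __init__(self, condition=lambda left_row, right_row: True):
        # One key -> list-of-rows multimap per stream (stand-in for the State Store).
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}
        # Extra join condition checked on top of key equality (hypothetical hook).
        self.condition = condition

    def process(self, side, key, row):
        """Join one input row against the other stream's state, then buffer it."""
        other = "right" if side == "left" else "left"
        matches = []
        # .get() avoids creating an empty entry in the other side's multimap.
        for buffered in self.state[other].get(key, []):
            left_row, right_row = (row, buffered) if side == "left" else (buffered, row)
            if self.condition(left_row, right_row):
                matches.append((key, left_row, right_row))
        # Buffer the input row so that later rows from the other stream can match it.
        self.state[side][key].append(row)
        return matches

    def evict(self, threshold, event_time=lambda row: row["ts"]):
        """Drop buffered rows whose event time is below the given threshold."""
        for multimap in self.state.values():
            for key in list(multimap):
                kept = [r for r in multimap[key] if event_time(r) >= threshold]
                if kept:
                    multimap[key] = kept
                else:
                    del multimap[key]
```

For example, after process("left", "a", {"ts": 1}) a later call to
process("right", "a", {"ts": 2}) returns the joined pair, while the same call
made after evict(3) returns nothing, because the buffered left row has been
dropped from the state.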





> Implement stream-stream inner join
> ----------------------------------
>
>                 Key: SPARK-22053
>                 URL: https://issues.apache.org/jira/browse/SPARK-22053
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Tathagata Das
>            Assignee: Tathagata Das


