[ https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roshan Naik updated STORM-2334:
-------------------------------
    Description: 
Create a general purpose windowed bolt that performs Joins on multiple data 
streams.

Depending on the topology configuration, the bolt could receive data either 
on 'default' streams or on named streams, so the join bolt should be able to 
differentiate the incoming data based on the names of upstream components as 
well as stream names.

*Example:*

The following SQL-style join involving 4 tables:

{code}
select  userId, key4, key2, key3
from stream1 
join       stream2  on stream2.userId =  stream1.key1
join       stream3  on stream3.key3   =  stream2.userId
left join  stream4  on stream4.key4   =  stream3.key3
{code}

could be expressed using the Join Bolt over 4 named streams as:

{code}
// 'STREAM' arg indicates that stream1/2/3/4 are names of streams.
// 'key1' is the key on which stream1 is joined.
new JoinBolt(STREAM, "stream1", "key1")
     .join     ("stream2", "userId",  "stream1") // join stream2 on stream2.userId = stream1.key1
     .join     ("stream3", "key3",    "stream2") // join stream3 on stream3.key3 = stream2.userId
     .leftjoin ("stream4", "key4",    "stream3") // left join stream4 on stream4.key4 = stream3.key3
     .select("userId, key4, key2, key3")         // choose output fields
     .withWindowLength(..)
     .withSlidingInterval(..);
{code}

Or, based on named source components:

{code}
// 'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc. are names of upstream components
new JoinBolt(SOURCE, "kafkaSpout1", "key1")
     .join     ("kafkaSpout2", "userId",    "kafkaSpout1")
     .join     ("hdfsSpout3",  "key3",      "kafkaSpout2")
     .leftjoin ("mqttSpout1",  "key4",      "hdfsSpout3")
     .select ("userId, key4, key2, key3")
     .withWindowLength(..)
     .withSlidingInterval(..);
{code}
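
To make the intended semantics concrete, the sketch below applies an inner join followed by a left join to in-memory rows standing in for the tuples of a single window. It is not the Join Bolt implementation: the class `WindowJoinSketch`, its `join` method, and the sample rows are hypothetical, and field names are assumed to be unique across streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration of join / left join over the rows of one window.
final class WindowJoinSketch {

    // Joins accumulated rows (left) with a new stream's rows (right).
    // keepUnmatched = false models "join"; true models "left join".
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String leftKey,
                                          List<Map<String, Object>> right, String rightKey,
                                          boolean keepUnmatched) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object leftValue = l.get(leftKey);
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (leftValue != null && Objects.equals(leftValue, r.get(rightKey))) {
                    Map<String, Object> merged = new HashMap<>(l);
                    merged.putAll(r); // combine fields from both sides
                    out.add(merged);
                    matched = true;
                }
            }
            if (!matched && keepUnmatched) {
                out.add(new HashMap<>(l)); // left join keeps the row without right-side fields
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> stream1 = List.of(Map.of("key1", "u1"), Map.of("key1", "u2"));
        List<Map<String, Object>> stream2 = List.of(Map.of("userId", "u1", "key2", "a"));
        List<Map<String, Object>> stream4 = List.of(); // nothing arrived in this window

        List<Map<String, Object>> inner = join(stream1, "key1", stream2, "userId", false);
        List<Map<String, Object>> result = join(inner, "userId", stream4, "key4", true);
        System.out.println(result);
    }
}
```

In this sample, the row with `key1 = "u2"` is dropped by the inner join because stream2 has no matching `userId`, while the surviving row passes through the left join even though stream4 is empty.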


For the tuples to be joined correctly, 'fields grouping' should be 
employed on the incoming streams. Each stream should be grouped on the same key 
by which it will be joined against other streams. This is a restriction 
compared to SQL, which allows joining a table with others on any key and any 
number of keys.
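
One way to see why the grouping key must match the join key: fields grouping sends tuples with equal key values to the same task, so matching tuples from different streams meet in the same bolt instance. The sketch below assumes a simple hash-modulo partitioner; `FieldsGroupingSketch` and `taskFor` are hypothetical names, and Storm's actual hashing may differ in detail.

```java
import java.util.Objects;

// Hypothetical illustration, not Storm code: models fields grouping as
// "hash of the grouping key value, modulo the number of bolt tasks".
final class FieldsGroupingSketch {

    // Picks the index of the task that receives a tuple with this key value.
    static int taskFor(Object keyValue, int numTasks) {
        return Math.floorMod(Objects.hashCode(keyValue), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        // stream1 is grouped on key1 and stream2 on userId; both tuples carry "u42".
        int fromStream1 = taskFor("u42", numTasks);
        int fromStream2 = taskFor("u42", numTasks);
        // Equal key values map to the same task, so the join can pair the tuples.
        System.out.println(fromStream1 == fromStream2);
    }
}
```

If stream1 were instead grouped on a field other than its join key, two tuples that should join could be routed to different tasks and never be compared.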

*For example:* If 'Stream1' is fields-grouped on 'key1', we cannot use a 
different 'key2' on 'Stream1' to join it with other streams. 'Stream1' can 
still be joined with multiple other streams as long as the same key is used. 
The second join in the following SQL uses a different key on stream1 and so 
cannot be expressed:

{code}
select ....
from stream1 
join  stream2  on stream2.userId =  stream1.key1
join  stream3  on stream3.key3   =  stream1.key2  // not supportable in Join Bolt
{code}

Consequently the join bolt's syntax is a bit simplified compared to SQL. The 
key name for any given stream appears only once, when the stream is first 
introduced in the join. Thereafter that key is implicitly used for joining. 
See the case of 'stream3' being joined with both 'stream2' and 'stream4' in 
the first example.
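
The "key appears once, then is implicit" rule can be sketched with a small builder that records each stream's key when the stream is introduced and looks it up for later joins. This is only a model of the syntax described above, not the proposed JoinBolt; `JoinSpecSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: models how each stream's join key is declared once
// and then reused implicitly by later joins against that stream.
final class JoinSpecSketch {
    private final Map<String, String> keyOf = new LinkedHashMap<>(); // stream -> its single join key
    private final List<String> conditions = new ArrayList<>();

    JoinSpecSketch(String firstStream, String key) {
        keyOf.put(firstStream, key);
    }

    JoinSpecSketch join(String newStream, String key, String priorStream) {
        return add("join", newStream, key, priorStream);
    }

    JoinSpecSketch leftjoin(String newStream, String key, String priorStream) {
        return add("left join", newStream, key, priorStream);
    }

    private JoinSpecSketch add(String kind, String newStream, String key, String priorStream) {
        String priorKey = keyOf.get(priorStream); // implicit: the key is never restated
        if (priorKey == null) {
            throw new IllegalArgumentException("Unknown stream: " + priorStream);
        }
        if (keyOf.containsKey(newStream)) {
            throw new IllegalArgumentException("Stream already has a key: " + newStream);
        }
        keyOf.put(newStream, key);
        conditions.add(kind + " " + newStream + " on " + newStream + "." + key
                + " = " + priorStream + "." + priorKey);
        return this;
    }

    List<String> conditions() {
        return conditions;
    }

    public static void main(String[] args) {
        JoinSpecSketch spec = new JoinSpecSketch("stream1", "key1")
                .join("stream2", "userId", "stream1")
                .join("stream3", "key3", "stream2")
                .leftjoin("stream4", "key4", "stream3");
        spec.conditions().forEach(System.out::println);
    }
}
```

Because a stream's key is fixed at its first mention, a join such as `stream3.key3 = stream1.key2` has no way to be written in this model, which mirrors the restriction described above.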


  was:
Create a general purpose windowed bolt that performs Joins on multiple data 
streams.

Depending on the topology configuration, the bolt could receive data either 
on 'default' streams or on named streams, so the join bolt should be able to 
differentiate the incoming data based on the names of upstream components as 
well as stream names.

*Example:*

The following SQL-style join involving 4 tables:

{code}
select  userId, key4, key2, key3
from stream1 
join       stream2  on stream2.userId =  stream1.key1
join       stream3  on stream3.key3   =  stream2.userId
left join  stream4  on stream4.key4   =  stream3.key3
{code}

could be expressed using the Join Bolt over 4 named streams as:

{code}
// 'STREAM' arg indicates that stream1/2/3/4 are names of streams.
// 'key1' is the key on which stream1 is joined.
new JoinBolt(STREAM, "stream1", "key1")
     .join     ("stream2", "userId",  "stream1") // join stream2 on stream2.userId = stream1.key1
     .join     ("stream3", "key3",    "stream2") // join stream3 on stream3.key3 = stream2.userId
     .leftjoin ("stream4", "key4",    "stream3") // left join stream4 on stream4.key4 = stream3.key3
     .select("userId, key4, key2, key3")
     .withWindowLength(..)
     .withSlidingInterval(..);
{code}

Or, based on named source components:

{code}
// 'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc. are names of upstream components
new JoinBolt(SOURCE, "kafkaSpout1", "key1")
     .join     ("kafkaSpout2", "userId",    "kafkaSpout1")
     .join     ("hdfsSpout3",  "key3",      "kafkaSpout2")
     .leftjoin ("mqttSpout1",  "key4",      "hdfsSpout3")
     .select ("userId,key4,key2,key3")
     .withWindowLength(..)
     .withSlidingInterval(..);
{code}


For the tuples to be joined correctly, 'fields grouping' should be 
employed on the incoming streams. Each stream should be grouped on the same key 
that will be used to join that stream against other streams. This is a 
restriction compared to SQL.

*For example:* If 'Stream1' is fields-grouped on 'key1', we cannot use a 
different 'key2' on 'Stream1' to join it with other streams. 'Stream1' can 
still be joined with multiple other streams as long as the same key is used. 
The second join in the following SQL uses a different key on stream1 and so 
cannot be expressed:

{code}
select ....
from stream1 
join  stream2  on stream2.userId =  stream1.key1
join  stream3  on stream3.key3   =  stream1.key2  // not supportable in Join Bolt
{code}

Consequently the join bolt's syntax is a bit simplified compared to SQL. The 
key name for any given stream appears only once, when the stream is first 
introduced in the join. Thereafter that key is implicitly used for joining. 
See the case of 'stream3' being joined with both 'stream2' and 'stream4' in 
the first example.



> Bolt for Joining streams
> ------------------------
>
>                 Key: STORM-2334
>                 URL: https://issues.apache.org/jira/browse/STORM-2334
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 2.0.0, 1.x
>            Reporter: Roshan Naik
>            Assignee: Roshan Naik
>          Time Spent: 10m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
