[
https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roshan Naik updated STORM-2334:
-------------------------------
Description:
Create a general purpose windowed bolt that performs Joins on multiple data
streams.
Since, depending on the topo config, the bolt could be receiving data either
on 'default' streams or on named streams .... join bolt should be able to
differentiate the incoming data based on names of upstream components as well
as stream names.
*Example:*
The following SQL style join involving 4 tables :
{code}
select userId, key4, key2, key3
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream2.userId
left join stream4 on stream4.key4 = stream3.key3
{code}
Could be expressed using the Join Bolt over 4 named streams as :
{code}
new JoinBolt(STREAM, "stream1", "key1") //'STREAM' arg indicates that
stream1/2/3/4 are names of streams. 'key1' is the key on which
.join ("stream2", "userId", "stream1") //join stream2 on
stream2.userId=stream1.key1
.join ("stream3", "key3", "stream2") //join stream3 on
stream3.key3=stream2.userId
.leftjoin ("stream4", "key4", "stream3") //left join stream4 on
stream4.key4=stream3.key3
.select("userId, key4, key2, key3") // chose output fields
.withWindowLength(..)
.withSlidingInterval(..);
{code}
Or based on named source components :
{code}
new JoinBolt(SOURCE, "kafkaSpout1", "key1") //'SOURCE' arg indicates that
kafkaSpout1, hdfsSpout3 etc are names of upstream components
.join ("kafkaSpout2", "userId", "kafkaSpout1" )
.join ("hdfsSpout3", "key3", "kafkaSpout2")
.leftjoin ("mqttSpout1", "key4", "hdfsSpout3")
.select ("userId, key4, key2, key3")
.withWindowLength(..)
.withSlidingInterval(..);
{code}
In order for the tuples to be joined correctly, 'fields grouping' should be
employed on the incoming streams. Each stream should be grouped on the same key
using which it will be joined against other streams. This is a restriction
compared to SQL which allows join a table with others on any key and any number
of keys.
*For example:* If a 'Stream1' is Fields Grouped on 'key1', we cannot use a
different 'key2' on 'Stream1' to join it with other streams. However, 'Stream1'
can be joined using the same key with multiple other streams as show in this
SQL.
{code}
select ....
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream1.key2 // not supportable in Join
Bolt
{code}
Consequently the join bolt's syntax is a bit simplified compared to SQL. The
key name for any given stream only appears once, as soon the stream is
introduced for the first time in the join. Thereafter that key is implicitly
used for joining. See the case of 'stream3' being joined with both 'stream2'
and 'stream4' in the first example.
was:
Create a general purpose windowed bolt that performs Joins on multiple data
streams.
Since, depending on the topo config, the bolt could be receiving data either
on 'default' streams or on named streams .... join bolt should be able to
differentiate the incoming data based on names of upstream components as well
as stream names.
*Example:*
The following SQL style join involving 4 tables :
{code}
select userId, key4, key2, key3
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream2.userId
left join stream4 on stream4.key4 = stream3.key3
{code}
Could be expressed using the Join Bolt over 4 named streams as :
{code}
new JoinBolt(STREAM, "stream1", "key1") //'STREAM' arg indicates that
stream1/2/3/4 are names of streams. 'key1' is the key on which
.join ("stream2", "userId", "stream1") //join stream2 on
stream2.userId=stream1.key1
.join ("stream3", "key3", "stream2") //join stream3 on
stream3.key3=stream2.userId
.leftjoin ("stream4", "key4", "stream3") //left join stream4 on
stream4.key4=stream3.key3
.select("userId, key4, key2, key3")
.withWindowLength(..)
.withSlidingInterval(..);
{code}
Or based on named source components :
{code}
new JoinBolt(SOURCE, "kafkaSpout1", "key1") //'SOURCE' arg indicates that
kafkaSpout1, hdfsSpout3 etc are names of upstream components
.join ("kafkaSpout2", "userId", "kafkaSpout1" )
.join ("hdfsSpout3", "key3", "kafkaSpout2")
.leftjoin ("mqttSpout1", "key4", "hdfsSpout3")
.select ("userId,key4,key2,key3")
.withWindowLength(..)
.withSlidingInterval(..);
{code}
In order for the tuples to be joined correctly, 'fields grouping' should be
employed on the incoming streams. Each stream should be grouped on the same key
that will be used to join that stream against other streams. This is a
restriction compared to SQL.
*For example:* If a 'Stream1' is Fields Grouped on 'key1', we cannot use a
different 'key2' on 'Stream1' to join it with other streams. However, 'Stream1'
can be joined using the same key with multiple other streams as show in this
SQL.
{code}
select ....
from stream1
join stream2 on stream2.userId = stream1.key1
join stream3 on stream3.key3 = stream1.key2 // not supportable in Join
Bolt
{code}
Consequently the join bolt's syntax is a bit simplified compared to SQL. The
key name for any given stream only appears once, as soon the stream is
introduced for the first time in the join. Thereafter that key is implicitly
used for joining. See the case of 'stream3' being joined with both 'stream2'
and 'stream4' in the first example.
> Bolt for Joining streams
> ------------------------
>
> Key: STORM-2334
> URL: https://issues.apache.org/jira/browse/STORM-2334
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.x
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Create a general purpose windowed bolt that performs Joins on multiple data
> streams.
> Since, depending on the topo config, the bolt could be receiving data either
> on 'default' streams or on named streams .... join bolt should be able to
> differentiate the incoming data based on names of upstream components as well
> as stream names.
> *Example:*
> The following SQL style join involving 4 tables :
> {code}
> select userId, key4, key2, key3
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream2.userId
> left join stream4 on stream4.key4 = stream3.key3
> {code}
> Could be expressed using the Join Bolt over 4 named streams as :
> {code}
> new JoinBolt(STREAM, "stream1", "key1") //'STREAM' arg indicates that
> stream1/2/3/4 are names of streams. 'key1' is the key on which
> .join ("stream2", "userId", "stream1") //join stream2 on
> stream2.userId=stream1.key1
> .join ("stream3", "key3", "stream2") //join stream3 on
> stream3.key3=stream2.userId
> .leftjoin ("stream4", "key4", "stream3") //left join stream4 on
> stream4.key4=stream3.key3
> .select("userId, key4, key2, key3") // chose output fields
> .withWindowLength(..)
> .withSlidingInterval(..);
> {code}
> Or based on named source components :
> {code}
> new JoinBolt(SOURCE, "kafkaSpout1", "key1") //'SOURCE' arg indicates that
> kafkaSpout1, hdfsSpout3 etc are names of upstream components
> .join ("kafkaSpout2", "userId", "kafkaSpout1" )
> .join ("hdfsSpout3", "key3", "kafkaSpout2")
> .leftjoin ("mqttSpout1", "key4", "hdfsSpout3")
> .select ("userId, key4, key2, key3")
> .withWindowLength(..)
> .withSlidingInterval(..);
> {code}
> In order for the tuples to be joined correctly, 'fields grouping' should be
> employed on the incoming streams. Each stream should be grouped on the same
> key using which it will be joined against other streams. This is a
> restriction compared to SQL which allows join a table with others on any key
> and any number of keys.
> *For example:* If a 'Stream1' is Fields Grouped on 'key1', we cannot use a
> different 'key2' on 'Stream1' to join it with other streams. However,
> 'Stream1' can be joined using the same key with multiple other streams as
> show in this SQL.
> {code}
> select ....
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream1.key2 // not supportable in Join
> Bolt
> {code}
> Consequently the join bolt's syntax is a bit simplified compared to SQL. The
> key name for any given stream only appears once, as soon the stream is
> introduced for the first time in the join. Thereafter that key is implicitly
> used for joining. See the case of 'stream3' being joined with both 'stream2'
> and 'stream4' in the first example.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)