[ https://issues.apache.org/jira/browse/STORM-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
P. Taylor Goetz updated STORM-2334:
-----------------------------------
Fix Version/s: 1.1.0 (was: 1.1.1)
> Bolt for Joining streams
> ------------------------
>
> Key: STORM-2334
> URL: https://issues.apache.org/jira/browse/STORM-2334
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 2.0.0, 1.x
> Reporter: Roshan Naik
> Assignee: Roshan Naik
> Fix For: 2.0.0, 1.1.0
>
> Time Spent: 6h 10m
> Remaining Estimate: 0h
>
> Create a general-purpose windowed bolt that performs joins on multiple data
> streams.
> Depending on the topology configuration, the bolt may receive data either on
> 'default' streams or on named streams, so the join bolt should be able to
> differentiate incoming data based on the names of upstream components as well
> as on stream names.
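To illustrate why both naming schemes are needed, here is a minimal sketch, assuming a join input is identified either by the upstream component id or by the stream id of each tuple (a Storm `Tuple` exposes both through `getSourceComponent()` and `getSourceStreamId()`). The `JoinInputResolver`, `Selector` and `TupleSource` names are hypothetical and not part of Storm's API. When two spouts both emit on their 'default' stream, only the component id tells them apart.

```java
import java.util.Objects;

// Hypothetical sketch: these types are illustrative stand-ins, not Storm classes.
final class JoinInputResolver {

    /** Whether join inputs are identified by upstream component id or by stream id. */
    enum Selector { SOURCE, STREAM }

    /** Minimal view of a tuple's origin (a Storm Tuple exposes both values). */
    record TupleSource(String componentId, String streamId) {}

    private final Selector selector;

    JoinInputResolver(Selector selector) {
        this.selector = Objects.requireNonNull(selector);
    }

    /** Returns the name under which this tuple is matched against the join spec. */
    String inputNameOf(TupleSource src) {
        return selector == Selector.SOURCE ? src.componentId() : src.streamId();
    }

    public static void main(String[] args) {
        TupleSource a = new TupleSource("kafkaSpout1", "default");
        TupleSource b = new TupleSource("kafkaSpout2", "default");
        JoinInputResolver byStream = new JoinInputResolver(Selector.STREAM);
        JoinInputResolver bySource = new JoinInputResolver(Selector.SOURCE);
        // Both spouts emit on 'default', so stream names alone cannot separate them.
        System.out.println(byStream.inputNameOf(a) + " / " + byStream.inputNameOf(b)); // default / default
        System.out.println(bySource.inputNameOf(a) + " / " + bySource.inputNameOf(b)); // kafkaSpout1 / kafkaSpout2
    }
}
```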
> *Example:*
> The following SQL-style join involving 4 tables:
> {code}
> select userId, key4, key2, key3
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream2.userId
> left join stream4 on stream4.key4 = stream3.key3
> {code}
> could be expressed using the Join Bolt over 4 named streams as:
> {code}
> new JoinBolt(STREAM, "stream1", "key1") // 'STREAM' arg indicates that stream1/2/3/4 are names of streams; 'key1' is the key on which stream1 is joined
>   .join("stream2", "userId", "stream1")    // join stream2 on stream2.userId = stream1.key1
>   .join("stream3", "key3", "stream2")      // join stream3 on stream3.key3 = stream2.userId
>   .leftjoin("stream4", "key4", "stream3")  // left join stream4 on stream4.key4 = stream3.key3
>   .select("userId, key4, key2, key3")      // choose output fields
>   .withWindowLength(..)
>   .withSlidingInterval(..);
> {code}
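For a single window, the first example boils down to a chain of equi-joins. The sketch below assumes each stream's window contents are available as a list of field-name-to-value maps and evaluates the chain with simple hash joins; `WindowJoinSketch`, `join`, `select` and `demo` are hypothetical names, and windowing, acking and emitting are left out. When two joined streams share a field name, the later stream's value overwrites the earlier one, which is a simplification of this sketch only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of one window's worth of join work; not Storm's JoinBolt code.
final class WindowJoinSketch {

    /** Joins `right` into `left` where right.newKey == left.priorKey; `outer` keeps unmatched left rows. */
    static List<Map<String, Object>> join(List<Map<String, Object>> left, String priorKey,
                                          List<Map<String, Object>> right, String newKey,
                                          boolean outer) {
        // Hash index over the new stream's window, keyed on its join field.
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> r : right) {
            Object k = r.get(newKey);
            if (k != null) { // null keys never match, as in SQL
                index.computeIfAbsent(k, x -> new ArrayList<>()).add(r);
            }
        }
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            List<Map<String, Object>> matches = index.getOrDefault(l.get(priorKey), List.of());
            if (matches.isEmpty() && outer) {
                out.add(new HashMap<>(l)); // left join: keep the unmatched left row
            }
            for (Map<String, Object> r : matches) {
                Map<String, Object> merged = new HashMap<>(l);
                merged.putAll(r); // later stream wins on field-name clashes
                out.add(merged);
            }
        }
        return out;
    }

    /** Projects each row onto the selected fields; absent fields become null. */
    static List<Map<String, Object>> select(List<Map<String, Object>> rows, String... fields) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> row : rows) {
            Map<String, Object> projected = new LinkedHashMap<>();
            for (String f : fields) {
                projected.put(f, row.get(f));
            }
            out.add(projected);
        }
        return out;
    }

    /** Evaluates the four-stream example on tiny hand-made sample windows. */
    static List<Map<String, Object>> demo() {
        List<Map<String, Object>> s1 = List.of(Map.of("key1", "u1", "key2", "a"),
                                               Map.of("key1", "u2", "key2", "b"));
        List<Map<String, Object>> s2 = List.of(Map.of("userId", "u1"), Map.of("userId", "u2"));
        List<Map<String, Object>> s3 = List.of(Map.of("key3", "u1"), Map.of("key3", "u2"));
        List<Map<String, Object>> s4 = List.of(Map.of("key4", "u1"));

        List<Map<String, Object>> rows = join(s1, "key1", s2, "userId", false); // stream2.userId = stream1.key1
        rows = join(rows, "userId", s3, "key3", false);                         // stream3.key3 = stream2.userId
        rows = join(rows, "key3", s4, "key4", true);                            // left join stream4.key4 = stream3.key3
        return select(rows, "userId", "key4", "key2", "key3");
    }

    public static void main(String[] args) {
        // [{userId=u1, key4=u1, key2=a, key3=u1}, {userId=u2, key4=null, key2=b, key3=u2}]
        System.out.println(demo());
    }
}
```

The row for "u2" survives only because the last step is a left join; with an inner join it would be dropped, since stream4's window has no "u2" key.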
> Or, based on the names of upstream (source) components:
> {code}
> new JoinBolt(SOURCE, "kafkaSpout1", "key1") // 'SOURCE' arg indicates that kafkaSpout1, hdfsSpout3 etc. are names of upstream components
>   .join("kafkaSpout2", "userId", "kafkaSpout1")
>   .join("hdfsSpout3", "key3", "kafkaSpout2")
>   .leftjoin("mqttSpout1", "key4", "hdfsSpout3")
>   .select("userId, key4, key2, key3")
>   .withWindowLength(..)
>   .withSlidingInterval(..);
> {code}
> In order for tuples to be joined correctly, 'fields grouping' should be
> employed on the incoming streams, so that all tuples with the same key value
> are routed to the same JoinBolt task. Each stream should be grouped on the
> same key on which it will be joined against other streams. This is a
> restriction compared to SQL, which allows joining a table with others on any
> key and on any number of keys.
> *For example:* If 'stream1' is fields-grouped on 'key1', we cannot use a
> different key, 'key2', on 'stream1' to join it with other streams, as the
> second join in the SQL below attempts. However, 'stream1' can be joined on
> that same key, 'key1', with multiple other streams.
> {code}
> select ....
> from stream1
> join stream2 on stream2.userId = stream1.key1
> join stream3 on stream3.key3 = stream1.key2  -- not supported by the Join Bolt
> {code}
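A rough illustration of why the grouping key must match the join key: fields grouping picks the target task from the values of the grouping fields, so equal key values arriving on different streams land on the same JoinBolt task. The hash-then-modulo scheme below is an assumption for illustration, not necessarily the exact task-selection code Storm uses; `FieldsGroupingSketch` and `chooseTask` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative sketch: hash the grouping-field values, then reduce modulo the task count.
// Assumed scheme, not necessarily the exact function Storm's fields grouping uses.
final class FieldsGroupingSketch {

    /** Picks a target task index in [0, numTasks) from the grouping-field values. */
    static int chooseTask(List<?> groupingValues, int numTasks) {
        return Math.floorMod(Arrays.deepHashCode(groupingValues.toArray()), numTasks);
    }

    public static void main(String[] args) {
        int joinBoltTasks = 4;
        // stream1 is grouped on key1 and stream2 on userId; the join condition is
        // stream2.userId = stream1.key1, so matching tuples carry equal values.
        int taskForStream1Tuple = chooseTask(List.of("user-42"), joinBoltTasks);
        int taskForStream2Tuple = chooseTask(List.of("user-42"), joinBoltTasks);
        System.out.println(taskForStream1Tuple == taskForStream2Tuple); // true
    }
}
```

If 'stream1' were grouped on 'key2' instead, its task would be chosen from the 'key2' value, and nothing would guarantee that it matches the task chosen for a 'stream2' tuple carrying the same userId, so some matches would never meet in the same window.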
> Consequently, the join bolt's syntax is somewhat simpler than SQL's. The key
> name for any given stream appears only once, when the stream is first
> introduced in the join; thereafter that key is used implicitly for joining.
> See the case of 'stream3' being joined with both 'stream2' and 'stream4' in
> the first example.
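The "key appears once" rule can be sketched as a builder that records each stream's key when the stream is first introduced and looks up the prior stream's key implicitly for every later join. `JoinPlan`, `JoinStep` and `leftJoin` are hypothetical names chosen for this sketch (the proposal's own method is spelled `leftjoin`); it only resolves join conditions and does not evaluate them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a join-spec builder; not Storm's JoinBolt implementation.
final class JoinPlan {
    /** One resolved join: newStream.newKey = priorStream.priorKey. */
    record JoinStep(String newStream, String newKey, String priorStream, String priorKey, boolean outer) {}

    private final Map<String, String> keyOf = new HashMap<>(); // stream -> its join key
    private final List<JoinStep> steps = new ArrayList<>();

    JoinPlan(String firstStream, String key) {
        keyOf.put(firstStream, key);
    }

    JoinPlan join(String newStream, String key, String priorStream) {
        return add(newStream, key, priorStream, false);
    }

    JoinPlan leftJoin(String newStream, String key, String priorStream) {
        return add(newStream, key, priorStream, true);
    }

    private JoinPlan add(String newStream, String key, String priorStream, boolean outer) {
        // The prior stream's key was fixed when it was introduced; reuse it implicitly.
        String priorKey = keyOf.get(priorStream);
        if (priorKey == null) {
            throw new IllegalArgumentException("stream not yet introduced: " + priorStream);
        }
        // A stream's key may be given only once, on first introduction.
        if (keyOf.containsKey(newStream)) {
            throw new IllegalArgumentException("stream already in the join: " + newStream);
        }
        keyOf.put(newStream, key);
        steps.add(new JoinStep(newStream, key, priorStream, priorKey, outer));
        return this;
    }

    List<JoinStep> steps() {
        return steps;
    }

    public static void main(String[] args) {
        JoinPlan plan = new JoinPlan("stream1", "key1")
            .join("stream2", "userId", "stream1")
            .join("stream3", "key3", "stream2")
            .leftJoin("stream4", "key4", "stream3");
        for (JoinStep s : plan.steps()) {
            System.out.println((s.outer() ? "left join " : "join ") + s.newStream() + " on "
                + s.newStream() + "." + s.newKey() + " = " + s.priorStream() + "." + s.priorKey());
        }
    }
}
```

Running `main` prints the three join conditions from the first SQL example; stream3's key 'key3' is reused for the join with stream4 without being repeated.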
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)