[ https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

P. Taylor Goetz updated STORM-2228:
-----------------------------------
    Comment: was deleted

(was: Added to 1.1.0 release epic.)

> KafkaSpout does not replay properly when a topic maps to multiple streams
> -------------------------------------------------------------------------
>
>                 Key: STORM-2228
>                 URL: https://issues.apache.org/jira/browse/STORM-2228
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
>            Reporter: Robert Joseph Evans
>            Assignee: Robert Joseph Evans
>            Priority: Blocker
>
> In the example {{KafkaSpoutTopologyMainNamedTopics.java}}, the code creates
> a TuplesBuilder and a KafkaSpoutStreams:
> {code}
> protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
>             new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
>             .build();
> }
>
> protected KafkaSpoutStreams getKafkaSpoutStreams() {
>     final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
>     final Fields outputFields1 = new Fields("topic", "partition", "offset");
>     return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1 sent to test_stream
>             .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
>             .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
>             .build();
> }
> {code}
> Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and
> {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset",
> "key", "value")}}, and output them on {{STREAMS\[0]}}. Then, just for
> {{TOPICS\[2]}}, it also wants to output {{Fields("topic", "partition",
> "offset")}} to {{STREAMS\[2]}}. (It is not clear what happened to
> {{STREAMS\[1]}}.)
> There are two issues here. The first is with how the TupleBuilder and the
> SpoutStreams are split up yet still coupled: {{STREAMS\[2]}} actually gets
> the full "topic", "partition", "offset", "key", "value" tuple, but this is
> minor. The real issue is that the code uses the same KafkaSpoutMessageId
> for all of the tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.
> https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304
> The code, however, is written to assume that it will only ever get one
> ack/fail for a given KafkaSpoutMessageId. This means that if one of the
> emitted tuple trees succeeds and the other then fails, the failure will not
> result in anything being replayed! This violates how Storm is intended to
> work.
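> To make the failure mode concrete, here is a minimal model of the
> bookkeeping described above. This is illustrative code only, not the actual
> storm-kafka-client implementation; the class and method names are made up:
> {code}
> import java.util.HashSet;
> import java.util.Set;
>
> import org.apache.storm.kafka.spout.KafkaSpoutMessageId;
>
> // Illustrative model only. One msgId is emitted on two streams, so Storm
> // delivers two ack/fail callbacks, but the tracker treats the first
> // callback as final for that offset.
> class NaiveOffsetTracker {
>     private final Set<KafkaSpoutMessageId> emitted = new HashSet<>();
>
>     void emit(KafkaSpoutMessageId msgId) {
>         emitted.add(msgId);      // added once, even if emitted on two streams
>     }
>
>     void ack(KafkaSpoutMessageId msgId) {
>         emitted.remove(msgId);   // offset now considered fully processed
>     }
>
>     void fail(KafkaSpoutMessageId msgId) {
>         if (emitted.contains(msgId)) {
>             scheduleRetry(msgId);
>         }
>         // If the tuple on the other stream was already acked, the id is no
>         // longer tracked and this failure is dropped: nothing is replayed.
>     }
>
>     private void scheduleRetry(KafkaSpoutMessageId msgId) { /* replay logic */ }
> }
> {code}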
> I discovered this as a part of STORM-2225, and I am fine with fixing it on 
> STORM-2225 (I would just remove support for that functionality because there 
> are other ways of doing this correctly).  But that would not maintain 
> backwards compatibility and I am not sure it would be appropriate for 1.x 
> releases.  I really would like to have feedback from others on this.
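> One reading of "other ways of doing this correctly" (an assumption; the
> issue does not spell it out) is to emit each record once from the spout and
> fan it out in a bolt, so that every Kafka offset has exactly one spout
> tuple and one KafkaSpoutMessageId. A rough sketch, with hypothetical class
> and stream names:
> {code}
> import java.util.Map;
>
> import org.apache.storm.task.OutputCollector;
> import org.apache.storm.task.TopologyContext;
> import org.apache.storm.topology.OutputFieldsDeclarer;
> import org.apache.storm.topology.base.BaseRichBolt;
> import org.apache.storm.tuple.Fields;
> import org.apache.storm.tuple.Tuple;
> import org.apache.storm.tuple.Values;
>
> // Hypothetical fan-out bolt: the spout emits the full tuple on a single
> // stream, and this bolt re-emits the full and trimmed views downstream,
> // anchored to the input so replay semantics stay intact.
> public class FanOutBolt extends BaseRichBolt {
>     private OutputCollector collector;
>
>     @Override
>     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
>         this.collector = collector;
>     }
>
>     @Override
>     public void execute(Tuple input) {
>         collector.emit("full_stream", input, input.getValues());
>         collector.emit("trimmed_stream", input,
>                 new Values(input.getValueByField("topic"),
>                            input.getValueByField("partition"),
>                            input.getValueByField("offset")));
>         collector.ack(input);
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declareStream("full_stream",
>                 new Fields("topic", "partition", "offset", "key", "value"));
>         declarer.declareStream("trimmed_stream",
>                 new Fields("topic", "partition", "offset"));
>     }
> }
> {code}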
> I can put something into 1.x where it will throw an exception if acking is
> enabled and this situation is present, but I don't want to spend the time
> trying to do reference counting on the number of tuples actually emitted.
> If someone else wants to do that I would be happy to turn this JIRA over to
> them.
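> A rough sketch of such a guard, purely hypothetical in its names and in
> where it would be wired in:
> {code}
> import java.util.List;
> import java.util.Map;
>
> // Hypothetical fail-fast check: refuse to run when acking is enabled and a
> // topic is mapped to more than one stream, rather than silently losing
> // replays (STORM-2228).
> final class MultiStreamGuard {
>
>     static void validate(Map<String, List<String>> topicToStreams, int numAckers) {
>         if (numAckers == 0) {
>             return;  // acking disabled, the broken replay path cannot be hit
>         }
>         for (Map.Entry<String, List<String>> e : topicToStreams.entrySet()) {
>             if (e.getValue().size() > 1) {
>                 throw new IllegalStateException("Topic " + e.getKey()
>                         + " is mapped to multiple streams " + e.getValue()
>                         + "; this is not supported when acking is enabled (STORM-2228)");
>             }
>         }
>     }
>
>     private MultiStreamGuard() {
>     }
> }
> {code}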



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
