[
https://issues.apache.org/jira/browse/STORM-2228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jungtaek Lim resolved STORM-2228.
---------------------------------
Resolution: Fixed
Fix Version/s: 1.1.0
2.0.0
Marking as 'Resolved' since STORM-2225 is resolved.
> KafkaSpout does not replay properly when a topic maps to multiple streams
> -------------------------------------------------------------------------
>
> Key: STORM-2228
> URL: https://issues.apache.org/jira/browse/STORM-2228
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 2.0.0, 1.0.1, 1.0.2, 1.1.0, 1.0.3
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Priority: Blocker
> Fix For: 2.0.0, 1.1.0
>
>
> In the example {{KafkaSpoutTopologyMainNamedTopics.java}}, the code
> creates a TuplesBuilder and a KafkaSpoutStreams:
> {code}
> protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
>     return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
>             new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
>             new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
>         .build();
> }
>
> protected KafkaSpoutStreams getKafkaSpoutStreams() {
>     final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
>     final Fields outputFields1 = new Fields("topic", "partition", "offset");
>     return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0],
>             new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test and test1 sent to test_stream
>         .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})   // contents of topic test2 sent to test_stream
>         .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
>         .build();
> }
> {code}
> Essentially the code is trying to take {{TOPICS\[0]}}, {{TOPICS\[1]}}, and
> {{TOPICS\[2]}}, translate them to {{Fields("topic", "partition", "offset",
> "key", "value")}}, and output them on {{STREAMS\[0]}}. Then just for
> {{TOPICS\[2]}} they want it to be output as {{Fields("topic", "partition",
> "offset")}} to {{STREAMS\[2]}}. (Don't know what happened to {{STREAMS\[1]}}.)
> There are two issues here. The first is with how the TupleBuilder and the
> SpoutStreams are split up but still coupled: {{STREAMS\[2]}} is actually getting
> the full "topic" "partition" "offset" "key" "value", but this is minor. The
> real issue is that the code uses the same KafkaSpoutMessageId for all the
> tuples emitted to both {{STREAMS\[0]}} and {{STREAMS\[2]}}.
> https://git.corp.yahoo.com/storm/storm/blob/5bcbb8d6d700d0d238d23f8f6d3976667aaedab9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L284-L304
> The code, however, is written to assume that it will only ever get one
> ack/fail for a given KafkaSpoutMessageId. This means that if one of the
> emitted tuple trees succeeds and then the other fails, the failure will not
> result in anything being replayed! This violates how Storm is intended to
> work.
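To make the failure mode concrete, here is a minimal, self-contained sketch (hypothetical class and method names, not the actual storm-kafka-client code) of the reference counting a fix would need: a message id emitted as N tuples must collect all N acks before its offset can be committed, and any single fail must schedule a replay exactly once.

```java
// Hypothetical sketch of per-message reference counting for a spout that
// emits the same message id on multiple streams. Not the real
// KafkaSpoutMessageId API; names are illustrative only.
class RefCountedMessageId {
    private int pendingAcks;   // tuples emitted but not yet acked/failed
    private boolean failed;    // set once any emitted tuple fails

    RefCountedMessageId(int emittedCount) {
        this.pendingAcks = emittedCount;
    }

    /** Returns true only when every emitted tuple succeeded, i.e. the offset may be committed. */
    boolean ack() {
        pendingAcks--;
        return pendingAcks == 0 && !failed;
    }

    /** Returns true the first time a failure is seen, i.e. when a replay should be scheduled. */
    boolean fail() {
        pendingAcks--;
        boolean firstFailure = !failed;
        failed = true;
        return firstFailure;
    }
}
```

With this bookkeeping, the scenario in the report (one tuple tree acks, the other fails) correctly triggers a replay instead of being silently dropped.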
> I discovered this as part of STORM-2225, and I am fine with fixing it in
> STORM-2225 (I would just remove support for that functionality because there
> are other ways of doing this correctly). But that would not maintain
> backwards compatibility, and I am not sure it would be appropriate for 1.x
> releases. I really would like to have feedback from others on this.
> I can put something into 1.x where it will throw an exception if acking is
> enabled and this situation is present, but I don't want to spend the time
> trying to do reference counting on the number of tuples actually emitted. If
> someone else wants to do that I would be happy to turn this JIRA over to them.
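The fail-fast guard proposed for 1.x could look roughly like the following sketch (hypothetical helper, not the actual storm-kafka-client code): if acking is enabled and any topic is routed to more than one stream, throw at construction time rather than silently losing replays.

```java
import java.util.*;

// Hypothetical validation helper illustrating the proposed 1.x behavior:
// reject configurations where a topic maps to multiple streams while
// acking is enabled, since replays are unreliable in that case.
class MultiStreamGuard {
    /**
     * @param ackingEnabled  whether the topology runs with acking on
     * @param topicToStreams topic name -> set of streams it is emitted to
     */
    static void validate(boolean ackingEnabled, Map<String, Set<String>> topicToStreams) {
        if (!ackingEnabled) {
            return; // without acking there are no replays to corrupt
        }
        for (Map.Entry<String, Set<String>> e : topicToStreams.entrySet()) {
            if (e.getValue().size() > 1) {
                throw new IllegalStateException(
                    "Topic " + e.getKey() + " maps to multiple streams " + e.getValue()
                    + "; replays do not work correctly with acking enabled (STORM-2228)");
            }
        }
    }
}
```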
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)