[ 
https://issues.apache.org/jira/browse/STORM-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742646#comment-15742646
 ] 

SHRAVANKUMAR DUBBUDU commented on STORM-2241:
---------------------------------------------

nextTuple() method have for loop which may be blocking in emitting tuples.

> KafkaSpout implementaion
> ------------------------
>
>                 Key: STORM-2241
>                 URL: https://issues.apache.org/jira/browse/STORM-2241
>             Project: Apache Storm
>          Issue Type: Question
>          Components: storm-kafka
>    Affects Versions: 0.10.0
>            Reporter: SHRAVANKUMAR DUBBUDU
>            Priority: Minor
>
> Storm ISpout documentaion say 'Storm executes ack, fail, and nextTuple all on 
> the same thread. This means that an implementor of an ISpout does not need to 
> worry about concurrency issues between those methods. However, it also means 
> that an implementor must ensure that nextTuple is non-blocking: otherwise the 
> method could block acks and fails that are pending to be processed.'
> Where as KafkaSpout has below nextTuple() implementation
> @Override
>     public void nextTuple() {
>         List<PartitionManager> managers = 
> _coordinator.getMyManagedPartitions();
>         for (int i = 0; i < managers.size(); i++) {
>             try {
>                 // in case the number of managers decreased
>                 _currPartitionIndex = _currPartitionIndex % managers.size();
>                 EmitState state = 
> managers.get(_currPartitionIndex).next(_collector);
>                 if (state != EmitState.EMITTED_MORE_LEFT) {
>                     _currPartitionIndex = (_currPartitionIndex + 1) % 
> managers.size();
>                 }
>                 if (state != EmitState.NO_EMITTED) {
>                     break;
>                 }
>             } catch (FailedFetchException e) {
>                 LOG.warn("Fetch failed", e);
>                 _coordinator.refresh();
>             }
>         }
>         long now = System.currentTimeMillis();
>         if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
>             commit();
>         }
>     }
> We are seeing events are getting replayed when there is slower bolt in the 
> topology chain causing duplicate messages.
> Is there any way this can be fixed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to