[ https://issues.apache.org/jira/browse/STORM-2241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15742646#comment-15742646 ]
SHRAVANKUMAR DUBBUDU commented on STORM-2241:
---------------------------------------------
The nextTuple() method has a for loop that may block while emitting tuples.
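
To make the loop's behaviour easier to reason about, here is a minimal sketch that models the round-robin logic of the quoted nextTuple() outside Storm. It is not the storm-kafka code: FakePartition is a hypothetical stand-in for PartitionManager, the emitted list stands in for the collector, and EMITTED_END is assumed as the third EmitState value, since the quoted snippet only names EMITTED_MORE_LEFT and NO_EMITTED. The exception handling and the periodic commit() are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinSketch {
    // EMITTED_END is an assumption; the quoted snippet only names the other two states.
    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    // Hypothetical stand-in for PartitionManager: holds a fixed number of buffered messages.
    static class FakePartition {
        private int remaining;

        FakePartition(int remaining) { this.remaining = remaining; }

        // Emits at most one message per call and reports whether more are buffered.
        EmitState next(List<String> out, int id) {
            if (remaining == 0) {
                return EmitState.NO_EMITTED;
            }
            remaining--;
            out.add("p" + id + "-msg");
            return remaining > 0 ? EmitState.EMITTED_MORE_LEFT : EmitState.EMITTED_END;
        }
    }

    private final List<FakePartition> managers;
    private int currPartitionIndex = 0;
    final List<String> emitted = new ArrayList<>();

    RoundRobinSketch(List<FakePartition> managers) { this.managers = managers; }

    // Same control flow as the quoted loop; returns how many partitions were polled.
    int nextTuple() {
        int polled = 0;
        for (int i = 0; i < managers.size(); i++) {
            // in case the number of managers decreased
            currPartitionIndex = currPartitionIndex % managers.size();
            EmitState state = managers.get(currPartitionIndex).next(emitted, currPartitionIndex);
            polled++;
            if (state != EmitState.EMITTED_MORE_LEFT) {
                // this partition is drained for now, so move on to the next one
                currPartitionIndex = (currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break; // something was emitted, so return to the caller
            }
        }
        return polled;
    }

    public static void main(String[] args) {
        RoundRobinSketch spout = new RoundRobinSketch(Arrays.asList(
                new FakePartition(0), new FakePartition(2), new FakePartition(0)));
        for (int call = 1; call <= 4; call++) {
            int polled = spout.nextTuple();
            System.out.println("call " + call + ": polled " + polled
                    + " partition(s), emitted so far: " + spout.emitted.size());
        }
    }
}
```

In this model a single call polls each partition at most once and returns as soon as one of them emits, so the loop itself is bounded by managers.size(). How long a call takes therefore depends on what each next() does internally, which the quoted snippet does not show.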
> KafkaSpout implementation
> -------------------------
>
> Key: STORM-2241
> URL: https://issues.apache.org/jira/browse/STORM-2241
> Project: Apache Storm
> Issue Type: Question
> Components: storm-kafka
> Affects Versions: 0.10.0
> Reporter: SHRAVANKUMAR DUBBUDU
> Priority: Minor
>
> The Storm ISpout documentation says 'Storm executes ack, fail, and nextTuple all on
> the same thread. This means that an implementor of an ISpout does not need to
> worry about concurrency issues between those methods. However, it also means
> that an implementor must ensure that nextTuple is non-blocking: otherwise the
> method could block acks and fails that are pending to be processed.'
> Whereas KafkaSpout has the following nextTuple() implementation:
> @Override
> public void nextTuple() {
>     List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
>     for (int i = 0; i < managers.size(); i++) {
>         try {
>             // in case the number of managers decreased
>             _currPartitionIndex = _currPartitionIndex % managers.size();
>             EmitState state = managers.get(_currPartitionIndex).next(_collector);
>             if (state != EmitState.EMITTED_MORE_LEFT) {
>                 _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
>             }
>             if (state != EmitState.NO_EMITTED) {
>                 break;
>             }
>         } catch (FailedFetchException e) {
>             LOG.warn("Fetch failed", e);
>             _coordinator.refresh();
>         }
>     }
>     long now = System.currentTimeMillis();
>     if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
>         commit();
>     }
> }
> We are seeing events replayed, resulting in duplicate messages, when there
> is a slower bolt in the topology chain.
> Is there any way to fix this?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)