[ https://issues.apache.org/jira/browse/STORM-303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13990740#comment-13990740 ]

ASF GitHub Bot commented on STORM-303:
--------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/94#discussion_r12329897
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
    @@ -147,80 +146,97 @@ public EmitState next(SpoutOutputCollector collector) {
     
         private void fill() {
             long start = System.nanoTime();
    -        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
    +        long offset;
    +        final boolean had_failed = !failed.isEmpty();
    +
    +        // Are there failed tuples? If so, fetch those first.
    +        if (had_failed) {
    +            offset = failed.first();
    +        } else {
    +            offset = _emittedToOffset + 1;
    +        }
    +
    +        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
             long end = System.nanoTime();
             long millis = (end - start) / 1000000;
             _fetchAPILatencyMax.update(millis);
             _fetchAPILatencyMean.update(millis);
             _fetchAPICallCount.incr();
    -        int numMessages = countMessages(msgs);
    -        _fetchAPIMessageCount.incrBy(numMessages);
    +        if (msgs != null) {
    +            int numMessages = 0;
     
    -        if (numMessages > 0) {
    -            LOG.info("Fetched " + numMessages + " messages from: " + 
_partition);
    -        }
    -        for (MessageAndOffset msg : msgs) {
    -            _pending.add(_emittedToOffset);
    -            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
    -            _emittedToOffset = msg.nextOffset();
    -        }
    -        if (numMessages > 0) {
    -            LOG.info("Added " + numMessages + " messages from: " + 
_partition + " to internal buffers");
    -        }
    -    }
    -
    -    private int countMessages(ByteBufferMessageSet messageSet) {
    -        int counter = 0;
    -        for (MessageAndOffset messageAndOffset : messageSet) {
    -            counter = counter + 1;
    +            for (MessageAndOffset msg : msgs) {
    +                final Long cur_offset = msg.offset();
    +                if (!had_failed || failed.contains(cur_offset)) {
    +                    numMessages += 1;
    +                    _pending.add(cur_offset);
    +                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
    +                    _emittedToOffset = Math.max(cur_offset, _emittedToOffset);
    +                    if (had_failed) {
    +                        failed.remove(cur_offset);
    +                    }
    +                }
    +            }
    +            _fetchAPIMessageCount.incrBy(numMessages);
             }
    -        return counter;
         }
     
         public void ack(Long offset) {
    -        _pending.remove(offset);
    +        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
    +            // Too many things pending!
    +            _pending.headSet(offset).clear();
    +        } else {
    +            _pending.remove(offset);
    +        }
    +        numberAcked++;
         }
     
         public void fail(Long offset) {
    -        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
    -        // things might get crazy with lots of timeouts
    -        if (_emittedToOffset > offset) {
    -            _emittedToOffset = offset;
    -            _pending.tailSet(offset).clear();
    +        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
    +            LOG.info(
    +                    "Skipping failed tuple at offset=" + offset +
    +                            " because it's more than maxOffsetBehind=" + 
_spoutConfig.maxOffsetBehind +
    +                            " behind _emittedToOffset=" + _emittedToOffset
    +            );
    +        } else {
    +            LOG.debug("failing at offset=" + offset + " with 
_pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + 
_emittedToOffset);
    +            failed.add(offset);
    +            numberFailed++;
    +            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
    +                throw new RuntimeException("Too many tuple failures");
    +            }
             }
         }
     
         public void commit() {
    -        long lastCompletedOffset = lastCompletedOffset();
    -        if (lastCompletedOffset != lastCommittedOffset()) {
    -            LOG.info("Writing last completed offset (" + 
lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + 
_topologyInstanceId);
    -            Map<Object, Object> data = ImmutableMap.builder()
    +        LOG.debug("Committing offset for " + _partition);
    +        long committedTo;
    +        if (_pending.isEmpty()) {
    +            committedTo = _emittedToOffset;
    +        } else {
    +            committedTo = _pending.first() - 1;
    +        }
    +        if (committedTo != _committedTo) {
    +            LOG.debug("Writing committed offset to ZK: " + committedTo);
    +
    +            Map<Object, Object> data = (Map<Object, Object>) 
ImmutableMap.builder()
                         .put("topology", ImmutableMap.of("id", 
_topologyInstanceId,
                                 "name", _stormConf.get(Config.TOPOLOGY_NAME)))
    -                    .put("offset", lastCompletedOffset)
    +                    .put("offset", committedTo)
                         .put("partition", _partition.partition)
                         .put("broker", ImmutableMap.of("host", 
_partition.host.host,
                                 "port", _partition.host.port))
                         .put("topic", _spoutConfig.topic).build();
                 _state.writeJSON(committedPath(), data);
    -            _committedTo = lastCompletedOffset;
    -            LOG.info("Wrote last completed offset (" + lastCompletedOffset 
+ ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
    -        } else {
    -            LOG.info("No new offset for " + _partition + " for topology: " 
+ _topologyInstanceId);
    +
    +            //LOG.info("Wrote committed offset to ZK: " + committedTo);
    --- End diff ---
    
    Either remove this or change it to debug.
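    
    A minimal sketch of the "change it to debug" option, assuming the commented-out line above is restored rather than deleted:
    
        // same message as the commented-out LOG.info, demoted to debug level
        LOG.debug("Wrote committed offset to ZK: " + committedTo);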


> Forward port of storm-kafka work
> --------------------------------
>
>                 Key: STORM-303
>                 URL: https://issues.apache.org/jira/browse/STORM-303
>             Project: Apache Storm (Incubating)
>          Issue Type: Bug
>    Affects Versions: 0.9.2-incubating
>         Environment: linux
>            Reporter: Brenden Matthews
>
> This is a placeholder issue for the patch at 
> https://github.com/apache/incubator-storm/pull/94.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
