Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/94#discussion_r12329897
--- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
@@ -147,80 +146,97 @@ public EmitState next(SpoutOutputCollector collector)
{
private void fill() {
long start = System.nanoTime();
- ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
_consumer, _partition, _emittedToOffset);
+ long offset;
+ final boolean had_failed = !failed.isEmpty();
+
+ // Are there failed tuples? If so, fetch those first.
+ if (had_failed) {
+ offset = failed.first();
+ } else {
+ offset = _emittedToOffset + 1;
+ }
+
+ ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
_consumer, _partition, offset);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
_fetchAPILatencyMean.update(millis);
_fetchAPICallCount.incr();
- int numMessages = countMessages(msgs);
- _fetchAPIMessageCount.incrBy(numMessages);
+ if (msgs != null) {
+ int numMessages = 0;
- if (numMessages > 0) {
- LOG.info("Fetched " + numMessages + " messages from: " +
_partition);
- }
- for (MessageAndOffset msg : msgs) {
- _pending.add(_emittedToOffset);
- _waitingToEmit.add(new MessageAndRealOffset(msg.message(),
_emittedToOffset));
- _emittedToOffset = msg.nextOffset();
- }
- if (numMessages > 0) {
- LOG.info("Added " + numMessages + " messages from: " +
_partition + " to internal buffers");
- }
- }
-
- private int countMessages(ByteBufferMessageSet messageSet) {
- int counter = 0;
- for (MessageAndOffset messageAndOffset : messageSet) {
- counter = counter + 1;
+ for (MessageAndOffset msg : msgs) {
+ final Long cur_offset = msg.offset();
+ if (!had_failed || failed.contains(cur_offset)) {
+ numMessages += 1;
+ _pending.add(cur_offset);
+ _waitingToEmit.add(new
MessageAndRealOffset(msg.message(), cur_offset));
+ _emittedToOffset = Math.max(cur_offset,
_emittedToOffset);
+ if (had_failed) {
+ failed.remove(cur_offset);
+ }
+ }
+ }
+ _fetchAPIMessageCount.incrBy(numMessages);
}
- return counter;
}
public void ack(Long offset) {
- _pending.remove(offset);
+ if (!_pending.isEmpty() && _pending.first() < offset -
_spoutConfig.maxOffsetBehind) {
+ // Too many things pending!
+ _pending.headSet(offset).clear();
+ } else {
+ _pending.remove(offset);
+ }
+ numberAcked++;
}
public void fail(Long offset) {
- //TODO: should it use in-memory ack set to skip anything that's
been acked but not committed???
- // things might get crazy with lots of timeouts
- if (_emittedToOffset > offset) {
- _emittedToOffset = offset;
- _pending.tailSet(offset).clear();
+ if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
+ LOG.info(
+ "Skipping failed tuple at offset=" + offset +
+ " because it's more than maxOffsetBehind=" +
_spoutConfig.maxOffsetBehind +
+ " behind _emittedToOffset=" + _emittedToOffset
+ );
+ } else {
+ LOG.debug("failing at offset=" + offset + " with
_pending.size()=" + _pending.size() + " pending and _emittedToOffset=" +
_emittedToOffset);
+ failed.add(offset);
+ numberFailed++;
+ if (numberAcked == 0 && numberFailed >
_spoutConfig.maxOffsetBehind) {
+ throw new RuntimeException("Too many tuple failures");
+ }
}
}
public void commit() {
- long lastCompletedOffset = lastCompletedOffset();
- if (lastCompletedOffset != lastCommittedOffset()) {
- LOG.info("Writing last completed offset (" +
lastCompletedOffset + ") to ZK for " + _partition + " for topology: " +
_topologyInstanceId);
- Map<Object, Object> data = ImmutableMap.builder()
+ LOG.debug("Committing offset for " + _partition);
+ long committedTo;
+ if (_pending.isEmpty()) {
+ committedTo = _emittedToOffset;
+ } else {
+ committedTo = _pending.first() - 1;
+ }
+ if (committedTo != _committedTo) {
+ LOG.debug("Writing committed offset to ZK: " + committedTo);
+
+ Map<Object, Object> data = (Map<Object, Object>)
ImmutableMap.builder()
.put("topology", ImmutableMap.of("id",
_topologyInstanceId,
"name", _stormConf.get(Config.TOPOLOGY_NAME)))
- .put("offset", lastCompletedOffset)
+ .put("offset", committedTo)
.put("partition", _partition.partition)
.put("broker", ImmutableMap.of("host",
_partition.host.host,
"port", _partition.host.port))
.put("topic", _spoutConfig.topic).build();
_state.writeJSON(committedPath(), data);
- _committedTo = lastCompletedOffset;
- LOG.info("Wrote last completed offset (" + lastCompletedOffset
+ ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
- } else {
- LOG.info("No new offset for " + _partition + " for topology: "
+ _topologyInstanceId);
+
+ //LOG.info("Wrote committed offset to ZK: " + committedTo);
--- End diff --
Either remove this commented-out log line or change it to LOG.debug.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---