[ https://issues.apache.org/jira/browse/STORM-303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13990740#comment-13990740 ]
ASF GitHub Bot commented on STORM-303:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/94#discussion_r12329897
--- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
@@ -147,80 +146,97 @@ public EmitState next(SpoutOutputCollector collector) {
     private void fill() {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
+        long offset;
+        final boolean had_failed = !failed.isEmpty();
+
+        // Are there failed tuples? If so, fetch those first.
+        if (had_failed) {
+            offset = failed.first();
+        } else {
+            offset = _emittedToOffset + 1;
+        }
+
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _fetchAPILatencyMax.update(millis);
         _fetchAPILatencyMean.update(millis);
         _fetchAPICallCount.incr();
-        int numMessages = countMessages(msgs);
-        _fetchAPIMessageCount.incrBy(numMessages);
+        if (msgs != null) {
+            int numMessages = 0;
-        if (numMessages > 0) {
-            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
-        }
-        for (MessageAndOffset msg : msgs) {
-            _pending.add(_emittedToOffset);
-            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
-            _emittedToOffset = msg.nextOffset();
-        }
-        if (numMessages > 0) {
-            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
-        }
-    }
-
-    private int countMessages(ByteBufferMessageSet messageSet) {
-        int counter = 0;
-        for (MessageAndOffset messageAndOffset : messageSet) {
-            counter = counter + 1;
+            for (MessageAndOffset msg : msgs) {
+                final Long cur_offset = msg.offset();
+                if (!had_failed || failed.contains(cur_offset)) {
+                    numMessages += 1;
+                    _pending.add(cur_offset);
+                    _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
+                    _emittedToOffset = Math.max(cur_offset, _emittedToOffset);
+                    if (had_failed) {
+                        failed.remove(cur_offset);
+                    }
+                }
+            }
+            _fetchAPIMessageCount.incrBy(numMessages);
         }
-        return counter;
     }
     public void ack(Long offset) {
-        _pending.remove(offset);
+        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+            // Too many things pending!
+            _pending.headSet(offset).clear();
+        } else {
+            _pending.remove(offset);
+        }
+        numberAcked++;
     }
     public void fail(Long offset) {
-        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
-        // things might get crazy with lots of timeouts
-        if (_emittedToOffset > offset) {
-            _emittedToOffset = offset;
-            _pending.tailSet(offset).clear();
+        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
+            LOG.info(
+                "Skipping failed tuple at offset=" + offset +
+                " because it's more than maxOffsetBehind=" + _spoutConfig.maxOffsetBehind +
+                " behind _emittedToOffset=" + _emittedToOffset
+            );
+        } else {
+            LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
+            failed.add(offset);
+            numberFailed++;
+            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
+                throw new RuntimeException("Too many tuple failures");
+            }
         }
     }
     public void commit() {
-        long lastCompletedOffset = lastCompletedOffset();
-        if (lastCompletedOffset != lastCommittedOffset()) {
-            LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
-            Map<Object, Object> data = ImmutableMap.builder()
+        LOG.debug("Committing offset for " + _partition);
+        long committedTo;
+        if (_pending.isEmpty()) {
+            committedTo = _emittedToOffset;
+        } else {
+            committedTo = _pending.first() - 1;
+        }
+        if (committedTo != _committedTo) {
+            LOG.debug("Writing committed offset to ZK: " + committedTo);
+
+            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
                     .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                             "name", _stormConf.get(Config.TOPOLOGY_NAME)))
-                    .put("offset", lastCompletedOffset)
+                    .put("offset", committedTo)
                     .put("partition", _partition.partition)
                     .put("broker", ImmutableMap.of("host", _partition.host.host,
                             "port", _partition.host.port))
                     .put("topic", _spoutConfig.topic).build();
             _state.writeJSON(committedPath(), data);
-            _committedTo = lastCompletedOffset;
-            LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
-        } else {
-            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
+
+            //LOG.info("Wrote committed offset to ZK: " + committedTo);
--- End diff ---
Either remove this or change it to debug.
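For reference, a minimal sketch of the second option (illustrative only, not part of the patch; it assumes the class's existing LOG field and the committedTo local from the surrounding commit() method):

            // instead of the commented-out LOG.info(...), log at debug level
            LOG.debug("Wrote committed offset to ZK: " + committedTo);

That keeps the write confirmation available when debug logging is enabled and matches the other LOG.debug calls introduced in commit().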
> Forward port of storm-kafka work
> --------------------------------
>
> Key: STORM-303
> URL: https://issues.apache.org/jira/browse/STORM-303
> Project: Apache Storm (Incubating)
> Issue Type: Bug
> Affects Versions: 0.9.2-incubating
> Environment: linux
> Reporter: Brenden Matthews
>
> This is a placeholder issue for the patch at
> https://github.com/apache/incubator-storm/pull/94.
--
This message was sent by Atlassian JIRA
(v6.2#6252)