Github user srdo commented on the issue:
https://github.com/apache/storm/pull/2492
This breaks the semantics of transactional spouts. Please see
http://storm.apache.org/releases/1.1.1/Trident-spouts.html and the sections
about transactional and opaque spouts here
http://storm.apache.org/releases/1.1.1/Trident-state.html. The latter link
describes exactly your case in the example of why transactional spouts should
not always be used:
> You might be wondering: why wouldn't you just always use a transactional
spout? They're simple and easy to understand. One reason you might not use one
is because they're not necessarily very fault-tolerant. For example, the way
TransactionalTridentKafkaSpout works is the batch for a txid will contain
tuples from all the Kafka partitions for a topic. Once a batch has been
emitted, any time that batch is re-emitted in the future the exact same set of
tuples must be emitted to meet the semantics of transactional spouts. Now
suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch
fails to process, and at the same time one of the Kafka nodes goes down.
You're now incapable of replaying the same batch as you did before (since the
node is down and some partitions for the topic are now unavailable), and
processing will halt.
Basically, non-opaque transactional spouts guarantee that a given
transaction id always corresponds to the same set of tuples. So for example if
txid = 1 contains tuples [A, B, C], then if txid = 1 is re-emitted, it must
still contain exactly [A, B, C]. This change breaks that guarantee, because
if e.g. A and B have been deleted from Kafka, we'll hit the new code and txid
= 1 might now contain [C, E, F] instead.
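To make the contract concrete, here's a toy model of it in plain Java. This is just an illustration, not the Storm API; `TransactionalContract` and its method are invented names:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the transactional spout contract: once a txid has been
// emitted with a given list of tuples, any re-emit of that txid must
// produce exactly the same list.
class TransactionalContract {
    private final Map<Long, List<String>> emitted = new HashMap<>();

    // Returns true if this emit is consistent with the contract.
    boolean emit(long txid, List<String> tuples) {
        List<String> previous = emitted.putIfAbsent(txid, tuples);
        return previous == null || previous.equals(tuples);
    }
}

class TransactionalContractDemo {
    public static void main(String[] args) {
        TransactionalContract c = new TransactionalContract();
        // First emit of txid = 1: [A, B, C]
        System.out.println(c.emit(1, List.of("A", "B", "C"))); // true
        // Replay with the exact same tuples: allowed.
        System.out.println(c.emit(1, List.of("A", "B", "C"))); // true
        // A and B were deleted from Kafka, so the replay yields different
        // tuples: the contract is violated.
        System.out.println(c.emit(1, List.of("C", "E", "F"))); // false
    }
}
```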
If you need to be able to handle offsets being deleted from Kafka (which is
what you're trying to do with config.usestarttimeifoffsetoutofrange), you
should instead use an opaque spout
(https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java).
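The reason opaque spouts tolerate a changed replay is the state format described in Trident-state.html: each value is stored together with the previous value and the txid that produced it, so a replayed txid is re-applied on top of prevValue even if the batch contents changed. A rough sketch of that update rule for a counter, with invented names rather than the actual Storm classes:

```java
// Toy model of the opaque state update rule from Trident-state.html
// (invented names, not the actual Storm classes). The state keeps the
// current value, the previous value, and the txid that produced the
// current value.
class OpaqueCount {
    long value = 0;
    long prevValue = 0;
    Long lastTxid = null;

    void update(long txid, long batchCount) {
        if (lastTxid != null && lastTxid == txid) {
            // Replay of a known txid: redo the update on top of prevValue,
            // which is safe even if the batch contents changed.
            value = prevValue + batchCount;
        } else {
            prevValue = value;
            value += batchCount;
            lastTxid = txid;
        }
    }
}

class OpaqueCountDemo {
    public static void main(String[] args) {
        OpaqueCount count = new OpaqueCount();
        count.update(1, 3); // txid 1 first emits 3 tuples -> value = 3
        count.update(1, 2); // replay of txid 1 with a smaller batch
                            // (some offsets deleted) -> value = 0 + 2 = 2
        count.update(2, 4); // txid 2 -> value = 2 + 4 = 6
        System.out.println(count.value); // 6
    }
}
```

Note that a non-opaque transactional state could not do this: it assumes the replayed batch is identical, so it would simply skip the update for a txid it has already seen.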
---