First of all, I am not an expert on Trident. I understand it at a high level, but I have not done much with its internals. And yes, we are all in agreement that the documentation for Trident is quite bad. It was something that Nathan finished just before he left Twitter, so a lot of the deep knowledge about it is still with him. In a distributed system there really is no way to get truly exactly-once processing; all processing needs to be idempotent with retry. Spark Streaming, Beam, Trident, and Flink all offer some variant of this; they just differ in how far you would have to roll back in your processing. The recently announced transactions in Spark Streaming can be thought of as the same thing, but with an interesting twist: because the input and the output both go to the same system, it can atomically commit the output and update the pointer for the input at the same time.
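As a toy illustration of that twist (all names here are made up for the sketch, not a real Spark or Storm API): when the batch output and the input offset live in the same store, one atomic step can update both, so a crash either applies a batch in full or not at all.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: input offset and output live in the same store, so
// both can be committed in one atomic step and never get out of sync.
// A single synchronized map stands in for a transactional store.
public class AtomicCommitSketch {
    private final Map<String, Long> store = new HashMap<>();

    // Commit the batch result and advance the input pointer together.
    public synchronized void commitBatch(long newOffset, long batchSum) {
        long total = store.getOrDefault("output:total", 0L);
        store.put("output:total", total + batchSum);
        store.put("input:offset", newOffset);
    }

    public synchronized long offset() { return store.getOrDefault("input:offset", 0L); }
    public synchronized long total()  { return store.getOrDefault("output:total", 0L); }
}
```

If the two lived in different systems, a failure between the two writes would leave either a duplicated or a lost batch, which is exactly why the single-system case is a special twist.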
The way Trident does it is by dividing the input into micro-batches. The guarantee is that the batches will be committed to some external system in order. They may be processed out of order, but they will be committed in order. If one batch fails, that batch will be replayed, and all other outstanding batches will not be committed until the previous batch has completed. This is why writing a result to an external system in Storm requires you to write to a State: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/State.java

All State defines is how to commit data to it. There is nothing about how that data is stored or what that data is. Typically the data will be stored in a key/value store, but it could be anything: https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/map/MapState.java

How you implement those guarantees in the State is up to you. It could be through transactions to a database, or it could be through adding the transaction id to the key. HBase ignores the commits and assumes all of the operations are already idempotent: https://github.com/apache/storm/blob/master/external/storm-hbase/src/main/java/org/apache/storm/hbase/trident/state/HBaseState.java

HDFS does use the commits to rotate files and can then recover from replayed data if need be: https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java

You might also want to look at OpaqueMap, https://github.com/apache/storm/blob/master/storm-client/src/jvm/org/apache/storm/trident/state/map/OpaqueMap.java, to see how it tries to add the transaction id to different backing maps so that it can make the operations idempotent.

- Bobby

On Sunday, May 14, 2017, 3:49:11 PM CDT, Tech Id <[email protected]> wrote:

Hi, We have been using Storm for more than a year now and are generally very happy with the product.
However, I want to bring up a concern about the documentation of Trident. A few colleagues and I went over the documentation at http://storm.apache.org/releases/current/Trident-state.html but it is still unclear how the micro-batching achieves its goal. Consider this statement for instance: "*Suppose your topology computes word count and you want to store the word counts in a key/value database ... what you can do is store the transaction id with the count in the database as an atomic value.*"

*Question:* Is the key/value database the target of the Trident topology, or something internal to Trident?
- If it is the target of the topology, how can we change the system from being a key-value store to a key-value-txid store? It seems that we are making the target itself idempotent, and not really providing exactly-once semantics in Trident itself in a generic way, because the target's idempotency is specific to the target and should not be counted as an exactly-once property of the streaming system.
- If the key/value store is something internal to Trident, then how are we achieving exactly-once for the actual target? It seems that the actual target and the key/value store are two different systems with no distributed transaction between them. Clearly, one system can fail while the other succeeds, thus losing the exactly-once promise.

*Another Question:* It seems that a micro-batch will be continuously replayed even if just one word in that batch failed to update its count in the key/value store. If that is true, the next batch will not even be sent to the target system, because that next batch might contain the same word, and since the previous batch was not successful due to that same failing word, this next batch would also fail (Rule 3: "*State updates are ordered among batches. That is, the state updates for batch 3 won't be applied until the state updates for batch 2 have succeeded.*"). This means that the whole topology will be stuck at the failing batch.
This is much worse than single-tuple at-least-once delivery, because a batch may contain thousands to millions of messages, and all of them are now stuck, with the topology coming to a standstill. Even if the problem is temporary, exactly-once delivery will suffer a giant hit on latency compared to its at-least-once sibling. If this is true, then this fact should be added to the documentation so that users know beforehand what they are signing up for.

*A side note:* This statement seems to be very old and no longer true: "*One side note – once Kafka supports replication, it will be possible to have transactional spouts that are fault-tolerant to node failure, but that feature does not exist yet.*" Kafka does support replication, and we should consider fixing the docs to reflect that.

Thanks
TI
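To make the store-the-txid-with-the-count scheme from the quoted question concrete, here is a minimal, hypothetical sketch. Only the beginCommit/commit method names mirror the State interface linked above; everything else (the in-memory "database", the class and field names) is illustrative, not Storm's actual code. Each key stores its count together with the txid of the batch that last updated it, so a replayed batch is detected per key and applied at most once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical word-count State: stores (count, txid) as one atomic value
// per key. Replaying a batch with the same txid is a no-op for any key it
// already updated, which is what makes the retries idempotent.
public class TxidWordCountState {
    static final class Entry {
        final long count;
        final long txid;
        Entry(long count, long txid) { this.count = count; this.txid = txid; }
    }

    private final Map<String, Entry> db = new HashMap<>(); // stand-in for a KV store
    private final List<String> pendingWords = new ArrayList<>();

    // Trident calls this before any of the batch's updates are applied.
    public void beginCommit(Long txid) {
        pendingWords.clear();
    }

    // Buffer one word occurrence from the current batch.
    public void add(String word) {
        pendingWords.add(word);
    }

    // Trident calls this when the batch is complete; writes become durable here.
    public void commit(Long txid) {
        Map<String, Long> deltas = new HashMap<>();
        for (String w : pendingWords) {
            deltas.merge(w, 1L, Long::sum);
        }
        for (Map.Entry<String, Long> d : deltas.entrySet()) {
            Entry e = db.get(d.getKey());
            if (e != null && e.txid == txid) {
                continue; // this txid already updated this key: replay is a no-op
            }
            long base = (e == null) ? 0 : e.count;
            db.put(d.getKey(), new Entry(base + d.getValue(), txid));
        }
        pendingWords.clear();
    }

    public long count(String word) {
        Entry e = db.get(word);
        return e == null ? 0 : e.count;
    }
}
```

Note that this trick only works for transactional spouts, where a replayed batch always contains exactly the same tuples; opaque spouts must also store the previous value per key, which is the scheme the OpaqueMap link above deals with.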
