Stig Rohde Døssing created STORM-3280:
-----------------------------------------

             Summary: Trident-based windowing does not appear to guarantee at-least-once
                 Key: STORM-3280
                 URL: https://issues.apache.org/jira/browse/STORM-3280
             Project: Apache Storm
          Issue Type: Bug
          Components: trident
    Affects Versions: 1.2.2, 2.0.0
            Reporter: Stig Rohde Døssing


[~shaikasifullah] mentioned that he was experiencing lost tuples when 
restarting a Trident topology that uses windowing alongside the opaque Kafka 
spout.

I think this is due to a bug in the Trident windowing implementation.

Trident doesn't use the regular acking mechanism to keep track of all tuples in 
a batch. Instead, the bolt executors in Trident send "coordinator" tuples 
downstream following each batch, indicating how many tuples were in the batch. 
These coordinator tuples are anchored to the initial "emit batch" tuple at the 
master batch coordinator (MBC). The next bolt executor in line checks if it 
received all the expected tuples, and fails the "emit batch" tree if not. 
Otherwise, the entire batch is considered acked when the coordinator tuple is 
acked, which happens as soon as it is received (purposefully ignoring the 
commit mechanism here).
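
To illustrate, the per-batch bookkeeping boils down to something like the sketch below. This is not the actual TridentBoltExecutor code; the class and member names (BatchTracker, TrackedBatch, receivedCount, expectedCount) are made up for illustration.

{code:java}
// Much-simplified sketch of the per-batch bookkeeping described above.
// All names here are illustrative; the real logic lives in TridentBoltExecutor.
import java.util.HashMap;
import java.util.Map;

class BatchTracker {
    // Per-transaction counts: how many tuples arrived vs. how many the
    // upstream coordinator tuple said to expect.
    static class TrackedBatch {
        int receivedCount = 0;
        int expectedCount = -1; // unknown until the coordinator tuple arrives
    }

    private final Map<Long, TrackedBatch> batches = new HashMap<>();

    void onRegularTuple(long txid) {
        batches.computeIfAbsent(txid, t -> new TrackedBatch()).receivedCount++;
    }

    /**
     * Called when the upstream "coordinator" tuple for txid arrives.
     * Returns true if the batch is complete, meaning finishBatch can run and
     * the coordinator tuple can be acked; false means the "emit batch" tree
     * should be failed so the batch is replayed.
     */
    boolean onCoordinatorTuple(long txid, int expectedCount) {
        TrackedBatch batch = batches.computeIfAbsent(txid, t -> new TrackedBatch());
        batch.expectedCount = expectedCount;
        return batch.receivedCount == batch.expectedCount;
    }
}
{code}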

The bolt executor notifies the wrapped bolt when a batch starts, and when it 
finishes. The expectation is that the bolt will emit any new tuples it wants 
anchored to the coordinator tuple before the bolt executor considers the batch 
finished. See 
https://github.com/apache/storm/blob/19fbfb9ac8f82719cf70fedf6a024acaeec4e804/storm-client/src/jvm/org/apache/storm/trident/topology/TridentBoltExecutor.java#L127.
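
The ordering that matters here looks roughly like this (again an illustrative sketch with made-up stand-in types, not the real executor code):

{code:java}
// Illustrative sketch of the executor-side sequence. BatchBolt and Acker are
// stand-ins for the real Storm types; the point is the ordering: finishBatch
// on the wrapped bolt first, then an immediate ack of the coordinator tuple.
class FinishBatchSketch {
    interface BatchBolt {
        // Expected to emit everything it wants anchored to the coordinator
        // tuple before returning from finishBatch.
        void finishBatch(long txid);
    }

    interface Acker {
        void ack(Object coordinatorTuple);
    }

    private final BatchBolt wrappedBolt;
    private final Acker acker;

    FinishBatchSketch(BatchBolt wrappedBolt, Acker acker) {
        this.wrappedBolt = wrappedBolt;
        this.acker = acker;
    }

    void onBatchComplete(long txid, Object coordinatorTuple) {
        // The wrapped bolt is told the batch is done; anything it emits here
        // is anchored to coordinatorTuple.
        wrappedBolt.finishBatch(txid);

        // Acked immediately afterwards, regardless of whether the wrapped
        // bolt actually emitted the batch's tuples downstream.
        acker.ack(coordinatorTuple);
    }
}
{code}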

The windowing mechanism in Trident is implemented via a processor 
https://github.com/apache/storm/blob/19fbfb9ac8f82719cf70fedf6a024acaeec4e804/storm-client/src/jvm/org/apache/storm/trident/windowing/WindowTridentProcessor.java#L147.
 The processor collects received tuples grouped by batch, and only passes them 
to the WindowManager when a batch is considered complete. At this point, it 
will also check if any triggers have fired (e.g. due to timeout), and will emit 
any resulting windows.
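
Simplified, the finishBatch path in the window processor looks something like the sketch below (names and structure are made up; the real logic is in WindowTridentProcessor and the WindowManager it delegates to):

{code:java}
// Rough sketch of the window processor's behaviour in finishBatch.
// All names are illustrative, not the actual WindowTridentProcessor code.
import java.util.ArrayList;
import java.util.List;

class WindowProcessorSketch<T> {
    private final List<T> currentBatch = new ArrayList<>();
    private final List<List<T>> pendingTriggeredWindows = new ArrayList<>();

    void onTuple(T tuple) {
        // Tuples are only buffered while the batch is open.
        currentBatch.add(tuple);
    }

    List<List<T>> finishBatch() {
        // 1. Hand the completed batch to the window manager. Triggers fire
        //    based on the window policy (e.g. a timeout), not on batches.
        addToWindowManager(new ArrayList<>(currentBatch));
        currentBatch.clear();

        // 2. Emit whatever windows happen to have triggered by now. These
        //    may contain tuples from earlier batches, or none of the tuples
        //    from the batch that is being finished.
        List<List<T>> toEmit = new ArrayList<>(pendingTriggeredWindows);
        pendingTriggeredWindows.clear();
        return toEmit;
    }

    private void addToWindowManager(List<T> batch) {
        // Placeholder for WindowManager.add(...) plus the trigger policy;
        // a fired trigger would append its window to pendingTriggeredWindows.
    }
}
{code}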

The issue here is that there is no correlation between the finished batch and 
which tuples the window processor chooses to emit during the finishBatch call. 
Unless it emits exactly the tuples from the received batch, there is a risk of 
losing the at-least-once property, since the bolt executor will ack the 
coordinator tuple immediately following finishBatch.

Just to give a concrete example:

1. The MBC starts txid 1 by emitting an "emit batch" tuple.
2. The spout executor receives the tuple, emits tuples 1-10, then emits a coordinator tuple containing an expected count of 10 tuples.
3. The bolt executor receives tuples 1-10.
4. The bolt executor receives the coordinator tuple from the upstream spout, containing an expected count of 10 tuples.
5. The bolt executor calls finishBatch.
6. The window processor is configured with a window of 10 seconds, and decides not to emit the 10 tuples. Since nothing is emitted, no new tuples are anchored to the coordinator tuple.
7. The bolt executor acks the coordinator tuple at the MBC.
8. The MBC sees that the "emit batch" tuple has been acked, and starts the commit process. At this point Trident is free to assume the 10 tuples have been correctly processed, and e.g. write to Zookeeper that the Kafka spout should pick up at offset 10 next time it starts.
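
The following toy simulation (plain Java, no Storm classes, all names made up) walks through that sequence and shows where the tuples end up when the trigger has not fired:

{code:java}
// Toy end-to-end simulation of the sequence above, purely illustrative.
import java.util.ArrayList;
import java.util.List;

public class LostWindowTuplesDemo {
    public static void main(String[] args) {
        List<Integer> windowBuffer = new ArrayList<>();

        // MBC starts txid 1; the spout emits tuples for offsets 0..9.
        List<Integer> batch = new ArrayList<>();
        for (int offset = 0; offset < 10; offset++) {
            batch.add(offset);
        }

        // Window processor: finishBatch stores the tuples, but the 10 second
        // trigger has not fired, so nothing is emitted downstream.
        windowBuffer.addAll(batch);
        List<Integer> emittedDownstream = new ArrayList<>(); // empty, no trigger yet

        // The bolt executor acks the coordinator tuple anyway, the MBC
        // commits, and the opaque Kafka spout records that it should resume
        // at offset 10.
        long committedOffset = 10;

        System.out.println("Emitted downstream:             " + emittedDownstream);
        System.out.println("Still only in the window state: " + windowBuffer);
        System.out.println("Committed offset:               " + committedOffset);
        // If the worker dies at this point and the window state was only kept
        // in memory, the buffered tuples are gone, but the spout will not
        // replay offsets 0..9 on restart, so at-least-once is violated.
    }
}
{code}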



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
