Steve Niemitz created BEAM-7614:
-----------------------------------

             Summary: Event-time timers seem to sometimes fire multiple times on dataflow + streaming engine
                 Key: BEAM-7614
                 URL: https://issues.apache.org/jira/browse/BEAM-7614
             Project: Beam
          Issue Type: Bug
          Components: runner-dataflow
            Reporter: Steve Niemitz


This is kind of hard to reproduce, but I've seen it happen a few times in the 
wild now.

We have a DoFn that sets an event-time timer at window.maxTimestamp; the timer 
callback does something like:
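{code:java}
@OnTimer(...)
def onWindowClose(
  @StateId(...) key: ValueState[K],
  @StateId(...) values: CombiningState[V],
  out: OutputReceiver[O],
  ...
): Unit = {
  // Read the key and the values accumulated for this window
  val k = key.read()
  val vs = values.read()

  out.output(KV.of(k, vs))

  // Clear both state cells once the window's result has been emitted
  key.clear()
  values.clear()
}{code}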
Essentially, the DoFn keeps track of the key, accumulates the values seen in a 
window, and emits them at the end of the window.

ProcessElement is pretty simple as well:
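{code:java}
@ProcessElement
def processElement(
  ctx: ProcessContext,
  @StateId(...) key: ValueState[K],
  @StateId(...) values: CombiningState[V],
  ...
): Unit = {
  key.write(ctx.element().getKey())
  values.add(ctx.element().getValue())
  // Event-time timer at the end of the element's window
  // (`timer` and `window` are among the parameters elided above)
  timer.set(window.maxTimestamp())
}{code}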
However, *ONLY* when running on Streaming Engine (this doesn't happen 
otherwise), I see cases where the onWindowClose timer fires with a null key 
and empty values.

This can only happen if the timer fired twice: it wouldn't have been set if no 
elements had arrived, and if late data had arrived, it would have set the key 
(and added to the combining state). The first firing clears both state cells, 
so a second firing reads back exactly a null key and empty values. Also, we 
never have late data in our pipeline.
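To make the double-firing argument concrete, here is a minimal plain-Java sketch (no Beam dependency) that models the two state cells for a single key and window. The class `WindowStateModel` and its methods are hypothetical stand-ins, not Beam APIs, and `valuesState` simplifies the `CombiningState` to a list of the raw values added. Running the timer callback a second time, after the first firing has cleared state, yields a null key and empty values.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Beam-free model of the DoFn above for ONE key in ONE window.
// Not Beam API: keyState stands in for ValueState[K]; valuesState stands in for
// CombiningState[V], simplified to a list of the raw values that were added.
final class WindowStateModel {

    // One emitted KV, as an OutputReceiver would receive it.
    static final class Output {
        final String key;
        final List<Integer> values;

        Output(String key, List<Integer> values) {
            this.key = key;
            this.values = values;
        }

        @Override
        public String toString() {
            return key + "=" + values;
        }
    }

    private String keyState;                                     // null until first write
    private final List<Integer> valuesState = new ArrayList<>(); // empty until first add
    final List<Output> emitted = new ArrayList<>();

    // Mirrors processElement: write the key and add the value.
    // (Timer setting is implicit here: the caller decides when the timer fires.)
    void processElement(String key, int value) {
        keyState = key;
        valuesState.add(value);
    }

    // Mirrors onWindowClose: read both state cells, emit, then clear them.
    void onWindowClose() {
        // Copy before clearing so the emitted snapshot is not emptied as well
        emitted.add(new Output(keyState, new ArrayList<>(valuesState)));
        keyState = null;
        valuesState.clear();
    }

    public static void main(String[] args) {
        WindowStateModel m = new WindowStateModel();
        m.processElement("k1", 3);
        m.processElement("k1", 4);
        m.onWindowClose(); // the expected firing at window.maxTimestamp
        m.onWindowClose(); // a second, "phantom" firing of the same timer
        System.out.println(m.emitted); // prints [k1=[3, 4], null=[]]
    }
}
{code}

In this model the `null=[]` entry cannot come from any sequence of processElement calls followed by a single firing; it only appears when the callback runs again on already-cleared state.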

Another interesting thing I noticed is that these "phantom firings" seem to 
happen ~10-15 minutes _AFTER_ the window closes.

Again, it's pretty rare: we have millions of keys in a window, and I only see 
the error every few hours (with hourly windows).

Let me know if I can clarify anything else!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
