Steve Niemitz created BEAM-6813:
-----------------------------------

             Summary: Issues with state + timers in Java Direct Runner
                 Key: BEAM-6813
                 URL: https://issues.apache.org/jira/browse/BEAM-6813
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
    Affects Versions: 2.11.0
            Reporter: Steve Niemitz


I was experimenting with a stateful DoFn with timers, and ran into a weird bug 
where a state cell I was writing to would come back as null when I read it 
inside a timer callback.

I've attached the code below [1] (please excuse the Scala ;) ).

After digging into this a bit, I found that the state's value was present in 
the `underlying` table of CopyOnAccessInMemoryStateTable [2], but not in the 
instance's own `stateTable` [3]. Based on my very rudimentary understanding of 
how this works in the direct runner, it looks like commit() is not being 
called on the state table before the timer fires?
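
To make the symptom easier to picture, here is a toy Java model of a two-layer state table. It is only a sketch under stated assumptions: the class `CopyOnAccessSketch`, its nested `Table`, and the methods `read`, `readLocalOnly`, `write`, and `commit` are hypothetical names invented for illustration, not Beam's API or the direct runner's actual implementation. It shows how a value can sit in an `underlying` map while a lookup that only consults the local table comes back null.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a two-layer, copy-on-access state table (hypothetical, not Beam code). */
public class CopyOnAccessSketch {

  static final class Table {
    // State handed over from an earlier bundle.
    private final Map<String, String> underlying;
    // Cells touched by the current bundle.
    private final Map<String, String> local = new HashMap<>();

    Table(Map<String, String> underlying) {
      this.underlying = underlying;
    }

    /** Copies the cell from underlying into local on first access, then reads it. */
    String read(String id) {
      if (!local.containsKey(id) && underlying.containsKey(id)) {
        local.put(id, underlying.get(id));
      }
      return local.get(id);
    }

    /** Reads the local table only, modelling a lookup that skips the copy step. */
    String readLocalOnly(String id) {
      return local.get(id);
    }

    void write(String id, String value) {
      local.put(id, value);
    }

    /** Merges both layers into one map that a later table can use as its underlying. */
    Map<String, String> commit() {
      Map<String, String> merged = new HashMap<>(underlying);
      merged.putAll(local);
      return merged;
    }
  }

  public static void main(String[] args) {
    // Bundle 1: element processing writes the key.
    Table processBundle = new Table(new HashMap<>());
    processBundle.write("key", "user-1");

    // Bundle 2: the timer fires against a table layered over bundle 1's state.
    Table timerBundle = new Table(processBundle.commit());
    System.out.println(timerBundle.readLocalOnly("key")); // prints "null"
    System.out.println(timerBundle.read("key"));          // prints "user-1"
  }
}
```

In this model the first lookup matches what I saw in the debugger (value in `underlying`, nothing in the local table), and the second shows what a read that goes through the copy step would return. Whether the direct runner's problem is a skipped copy or a missing commit() is exactly the open question.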
  
 [1]
{code:java}
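import java.lang.{Long => JLong}
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.state.{CombiningState, StateSpecs, TimeDomain, Timer, TimerSpecs, ValueState}
import org.apache.beam.sdk.transforms.{DoFn, Sum}
import org.apache.beam.sdk.transforms.Combine.CombineFn
import org.apache.beam.sdk.transforms.DoFn.{Element, OnTimer, OutputReceiver, ProcessElement, StateId, TimerId}
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.KV
import org.joda.time.Duration

// KeyId, AggregationId and FlushTimerId are string constants defined elsewhere (not shown).
private final class AggregatorDoFn[K, V, Acc, Out](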
  combiner: CombineFn[V, Acc, Out],
  keyCoder: Coder[K],
  accumulatorCoder: Coder[Acc]
) extends DoFn[KV[K, V], KV[K, Out]] {

  @StateId(KeyId)
  private final val keySpec = StateSpecs.value(keyCoder)

  @StateId(AggregationId)
  private final val stateSpec = StateSpecs.combining(accumulatorCoder, combiner)

  @StateId("numElements")
  private final val numElementsSpec = StateSpecs.combining(Sum.ofLongs())

  @TimerId(FlushTimerId)
  private final val flushTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)

  @ProcessElement
  def processElement(
    @StateId(KeyId) key: ValueState[K],
    @StateId(AggregationId) state: CombiningState[V, Acc, Out],
    @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
    @TimerId(FlushTimerId) flushTimer: Timer,
    @Element element: KV[K, V],
    window: BoundedWindow
  ): Unit = {
    key.write(element.getKey)
    state.add(element.getValue)
    numElements.add(1L)

    if (numElements.read() == 1) {
      flushTimer
        .offset(Duration.standardSeconds(10))
        .setRelative()
    }
  }

  @OnTimer(FlushTimerId)
  def onFlushTimer(
    @StateId(KeyId) key: ValueState[K],
    @StateId(AggregationId) state: CombiningState[V, _, Out],
    @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
    output: OutputReceiver[KV[K, Out]]
  ): Unit = {
    if (numElements.read() > 0) {
      val k = key.read()
      output.output(
        KV.of(k, state.read())
      )
    }
    numElements.clear()
  }
}{code}
[2]
 [https://imgur.com/a/xvPR5nd]

[3]
 [https://imgur.com/a/jznMdaQ]
  


