[
https://issues.apache.org/jira/browse/BEAM-6813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845499#comment-16845499
]
Kenneth Knowles commented on BEAM-6813:
---------------------------------------
[~kedin] looked at something similar, so pinging here. I wonder if we can catch
the issue.
> Issues with state + timers in java Direct Runner
> -------------------------------------------------
>
> Key: BEAM-6813
> URL: https://issues.apache.org/jira/browse/BEAM-6813
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Affects Versions: 2.11.0
> Reporter: Steve Niemitz
> Priority: Major
>
> I was experimenting with a stateful DoFn with timers and ran into a weird
> bug: a state cell I had written to came back as null when I read
> it inside a timer callback.
> I've attached the code below [1] (please excuse the Scala ;) ).
> After digging into this a bit, I found that the state's value was
> present in the `underlying` table in CopyOnAccessInMemoryStateTable [2], but
> not set in the `stateTable` itself on the instance [3]. Based on my very
> rudimentary understanding of how this works in the direct runner, it seems
> like commit() is not being called on the state table before the timer
> fires?
>
> [1]
> {code:java}
> private final class AggregatorDoFn[K, V, Acc, Out](
>   combiner: CombineFn[V, Acc, Out],
>   keyCoder: Coder[K],
>   accumulatorCoder: Coder[Acc]
> ) extends DoFn[KV[K, V], KV[K, Out]] {
>
>   @StateId(KeyId)
>   private final val keySpec = StateSpecs.value(keyCoder)
>
>   @StateId(AggregationId)
>   private final val stateSpec = StateSpecs.combining(accumulatorCoder, combiner)
>
>   @StateId("numElements")
>   private final val numElementsSpec = StateSpecs.combining(Sum.ofLongs())
>
>   @TimerId(FlushTimerId)
>   private final val flushTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME)
>
>   @ProcessElement
>   def processElement(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, Acc, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     @TimerId(FlushTimerId) flushTimer: Timer,
>     @Element element: KV[K, V],
>     window: BoundedWindow
>   ): Unit = {
>     key.write(element.getKey)
>     state.add(element.getValue)
>     numElements.add(1L)
>     if (numElements.read() == 1) {
>       flushTimer
>         .offset(Duration.standardSeconds(10))
>         .setRelative()
>     }
>   }
>
>   @OnTimer(FlushTimerId)
>   def onFlushTimer(
>     @StateId(KeyId) key: ValueState[K],
>     @StateId(AggregationId) state: CombiningState[V, _, Out],
>     @StateId("numElements") numElements: CombiningState[JLong, _, JLong],
>     output: OutputReceiver[KV[K, Out]]
>   ): Unit = {
>     if (numElements.read() > 0) {
>       val k = key.read()
>       output.output(
>         KV.of(k, state.read())
>       )
>     }
>     numElements.clear()
>   }
> }{code}
> [2]
> [https://imgur.com/a/xvPR5nd]
> [3]
> [https://imgur.com/a/jznMdaQ]
>
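
To make the reported symptom concrete, here is a deliberately simplified toy model of a copy-on-access state table. It is only a sketch: the class `ToyStateTable` and its methods `write`, `read`, and `readLocalOnly` are hypothetical names invented for illustration, not Beam's API, and the model says nothing about the actual root cause of BEAM-6813. It only shows how a value can be present in an `underlying` table while being absent from the local `stateTable` until something copies it over.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only. All names here are hypothetical, not Beam classes. */
final class ToyStateTable {
    // State committed by an earlier bundle; null when there is none.
    private final ToyStateTable underlying;
    // Cells that have been written or accessed through this table.
    private final Map<String, String> stateTable = new HashMap<>();

    ToyStateTable(ToyStateTable underlying) {
        this.underlying = underlying;
    }

    void write(String id, String value) {
        stateTable.put(id, value);
    }

    /** Copy-on-access read: a miss falls back to underlying and copies the cell locally. */
    String read(String id) {
        if (!stateTable.containsKey(id) && underlying != null) {
            String committed = underlying.read(id);
            if (committed != null) {
                stateTable.put(id, committed);
            }
        }
        return stateTable.get(id);
    }

    /** Looks only at the local table, which models the symptom in the report. */
    String readLocalOnly(String id) {
        return stateTable.get(id);
    }

    public static void main(String[] args) {
        ToyStateTable elementBundle = new ToyStateTable(null);
        elementBundle.write("key", "user-1");

        // The timer callback sees a fresh table layered over the earlier one.
        ToyStateTable timerBundle = new ToyStateTable(elementBundle);
        System.out.println(timerBundle.readLocalOnly("key")); // prints "null"
        System.out.println(timerBundle.read("key"));          // prints "user-1"
    }
}
```

In this model, any read path that bypasses the fallback to `underlying` observes null even though the value was written earlier, which matches what the debugger screenshots [2] and [3] appear to show.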