[
https://issues.apache.org/jira/browse/BEAM-11971?focusedWorklogId=754974&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754974
]
ASF GitHub Bot logged work on BEAM-11971:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Apr/22 23:08
Start Date: 09/Apr/22 23:08
Worklog Time Spent: 10m
Work Description: reuvenlax merged PR #16928:
URL: https://github.com/apache/beam/pull/16928
Issue Time Tracking
-------------------
Worklog Id: (was: 754974)
Time Spent: 8h 50m (was: 8h 40m)
> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
> Key: BEAM-11971
> URL: https://issues.apache.org/jira/browse/BEAM-11971
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Reza ardeshir rokni
> Assignee: Reuven Lax
> Priority: P3
> Time Spent: 8h 50m
> Remaining Estimate: 0h
>
> State is set to {{null}} while active timer is present, this issue does not
> show in other runners.
> The following example will reach the IllegalStateException within 10-20 times
> of it being run. {{LOOP_COUNT}} does not seem to be a factor as it reproduces
> with 100 or 100000 {{LOOP_COUNT}}. The number of keys is a factor as it did
> not reproduce with only one key, have not tried with more than 3 keys to see
> if it's easier to reproduce.
>
> {code}
> package test;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import java.util.Optional;
>
> public class Test {
> public static void main (String [] args) throws Exception{
> Test.testToFailure();
> }
> public static void testToFailure() throws Exception {
> int count = 0;
> while (true) {
> failingTest();
> System.out.println(
> String.format("Got to Count %s", String.valueOf(count++)));
> }
> }
> public static void failingTest() throws Exception {
> Pipeline p = Pipeline.create();
> Instant now = Instant.now();
> TestStream<Integer> stream =
> TestStream.create(BigEndianIntegerCoder.of())
> .addElements(1)
>
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(2)
>
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(3)
> .advanceWatermarkToInfinity();
> p.apply(stream)
> .apply(WithKeys.of(x -> x))
> .setCoder(KvCoder.of(BigEndianIntegerCoder.of(),
> BigEndianIntegerCoder.of()))
> .apply(new TestToFail());
> p.run();
> }
> public static class TestToFail
> extends PTransform<PCollection<KV<Integer, Integer>>,
> PCollection<Integer>> {
> @Override
> public PCollection<Integer> expand(PCollection<KV<Integer, Integer>>
> input) {
> return input.apply(ParDo.of(new LoopingRead()));
> }
> }
> public static class LoopingRead extends DoFn<KV<Integer, Integer>,
> Integer> {
> static int LOOP_COUNT = 100;
> @StateId("value")
> private final StateSpec<ValueState<Integer>> value =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @StateId("count")
> private final StateSpec<ValueState<Integer>> count =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @TimerId("actionTimers")
> private final TimerSpec timer =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @ProcessElement
> public void processElement(
> ProcessContext c,
> @StateId("value") ValueState<Integer> value,
> @TimerId("actionTimers") Timer timers) {
> value.write(c.element().getValue());
> timers.set(c.timestamp().plus(Duration.millis(1000)));
> }
> /** */
> @OnTimer("actionTimers")
> public void onTimer(
> OnTimerContext c,
> @StateId("value") ValueState<Integer> value,
> @StateId("count") ValueState<Integer> count,
> @TimerId("actionTimers") Timer timers) {
> if (value.read() == null) {
> throw new IllegalStateException("BINGO!");
> }
> Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
> count.write(counter);
> value.write(value.read() + counter);
> if (counter < LOOP_COUNT) {
> timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
> }
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)