Reza ardeshir rokni created BEAM-11971:
------------------------------------------

             Summary: Direct Runner State is null while active timers exist
                 Key: BEAM-11971
                 URL: https://issues.apache.org/jira/browse/BEAM-11971
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
            Reporter: Reza ardeshir rokni


State is set to Null while active timer is present, this issue does not show in 
other runners.

 

The following example will reach the IllegalStateException within 10-20 times 
of it being run . LOOP_COUNT does not seem to be a factor as it reproduces with 
100 or 100000 LOOP_COUNT. The number of keys is a factor as it did not 
reproduce with only one key, have not tried with more than 3 keys to see if its 
easier to reproduce. 

public void testToFailure() throws Exception {
int count = 0;

while(true){
failingTest();
System.out.println(String.format("Got to Count %s", String.valueOf(count++)));
}
}


public void failingTest() throws Exception {
Instant now = Instant.now();
TestStream<Integer> stream =
TestStream.create(BigEndianIntegerCoder.of())
.addElements(1)
.advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
.addElements(2)
.advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
.addElements(3)
.advanceWatermarkToInfinity();

p.apply(stream)
.apply(WithKeys.of(x -> x))
.setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
.apply(new TestToFail());
p.run();
}

public static class TestToFail
extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {

@Override
public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
return input.apply(ParDo.of(new LoopingRead()));
}
}

public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {

static int LOOP_COUNT = 100;

@StateId("value")
private final StateSpec<ValueState<Integer>> value =
StateSpecs.value(BigEndianIntegerCoder.of());

@StateId("count")
private final StateSpec<ValueState<Integer>> count =
StateSpecs.value(BigEndianIntegerCoder.of());

@TimerId("actionTimers")
private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

@ProcessElement
public void processElement(
ProcessContext c,
@StateId("value") ValueState<Integer> value,
@TimerId("actionTimers") Timer timers) {

value.write(c.element().getValue());
timers.set(c.timestamp().plus(Duration.millis(1000)));
}

/** */
@OnTimer("actionTimers")
public void onTimer(
OnTimerContext c,
@StateId("value") ValueState<Integer> value,
@StateId("count") ValueState<Integer> count,
@TimerId("actionTimers") Timer timers) {

if (value.read() == null) {
throw new IllegalStateException("BINGO!");
}
Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
count.write(counter);
value.write(value.read() + counter);

if (counter < LOOP_COUNT) {
timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
}
}
}
}

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to