damccorm opened a new issue, #20966:
URL: https://github.com/apache/beam/issues/20966

   State is set to `null` while active timer is present, this issue does not 
show in other runners.
   
   The following example will reach the IllegalStateException within 10-20 
times of it being run. `LOOP_COUNT` does not seem to be a factor as it 
reproduces with 100 or 100000 `LOOP_COUNT`. The number of keys is a factor as 
it did not reproduce with only one key, have not tried with more than 3 keys to 
see if it's easier to reproduce. 
    
   ```
   
   package test;
   
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
   import
   org.apache.beam.sdk.coders.KvCoder;
   import org.apache.beam.sdk.state.StateSpec;
   import org.apache.beam.sdk.state.StateSpecs;
   import
   org.apache.beam.sdk.state.TimeDomain;
   import org.apache.beam.sdk.state.Timer;
   import org.apache.beam.sdk.state.TimerSpec;
   import
   org.apache.beam.sdk.state.TimerSpecs;
   import org.apache.beam.sdk.state.ValueState;
   import org.apache.beam.sdk.testing.TestStream;
   import
   org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.transforms.PTransform;
   import org.apache.beam.sdk.transforms.ParDo;
   import
   org.apache.beam.sdk.transforms.WithKeys;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.PCollection;
   import
   org.joda.time.Duration;
   import org.joda.time.Instant;
   
   import java.util.Optional;
    
   
   public class
   Test {
   
      public static void main (String [] args) throws Exception{
          Test.testToFailure();
   
      }
   
      public
   static void testToFailure() throws Exception {
          int count = 0;
   
          while (true) {
              failingTest();
              System.out.println(
                      String.format("Got
   to Count %s", String.valueOf(count++)));
          }
      }
   
      public static void failingTest() throws
   Exception {
          Pipeline p = Pipeline.create();
   
          Instant now = Instant.now();
          TestStream<Integer>
   stream =
                  TestStream.create(BigEndianIntegerCoder.of())
                          .addElements(1)
                          
.advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
                          .addElements(2)
                          
.advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
                          .addElements(3)
                          .advanceWatermarkToInfinity();
   
          p.apply(stream)
                  .apply(WithKeys.of(x
   -> x))
                  .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), 
BigEndianIntegerCoder.of()))
                  .apply(new
   TestToFail());
          p.run();
      }
   
      public static class TestToFail
              extends PTransform<PCollection<KV<Integer,
   Integer>>, PCollection<Integer>> {
   
          @Override
          public PCollection<Integer> expand(PCollection<KV<Integer,
   Integer>> input) {
              return input.apply(ParDo.of(new LoopingRead()));
          }
      }
   
      public
   static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
   
          static int LOOP_COUNT
   = 100;
   
          @StateId("value")
          private final StateSpec<ValueState<Integer>> value =
                  StateSpecs.value(BigEndianIntegerCoder.of());
   
          @StateId("count")
          private
   final StateSpec<ValueState<Integer>> count =
                  StateSpecs.value(BigEndianIntegerCoder.of());
   
          @TimerId("actionTimers")
          private
   final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
   
          @ProcessElement
          public
   void processElement(
                  ProcessContext c,
                  @StateId("value") ValueState<Integer>
   value,
                  @TimerId("actionTimers") Timer timers) {
   
              value.write(c.element().getValue());
              timers.set(c.timestamp().plus(Duration.millis(1000)));
          }
   
          /**
   */
          @OnTimer("actionTimers")
          public void onTimer(
                  OnTimerContext c,
                  @StateId("value")
   ValueState<Integer> value,
                  @StateId("count") ValueState<Integer> count,
                  @TimerId("actionTimers")
   Timer timers) {
   
              if (value.read() == null) {
                  throw new IllegalStateException("BINGO!");
              }
              Integer
   counter = Optional.ofNullable(count.read()).orElse(0) + 1;
              count.write(counter);
              value.write(value.read()
   + counter);
   
              if (counter < LOOP_COUNT) {
                  timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
              }
          }
      }
   }
   
   ```
   
   
   Imported from Jira 
[BEAM-11971](https://issues.apache.org/jira/browse/BEAM-11971). Original Jira 
may contain additional context.
   Reported by: [email protected].


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to