[ 
https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Reuven Lax updated BEAM-11971:
------------------------------

The cause is that the GC timer occasionally fires before the looping timer is 
done. This means that the DirectRunner is violating Beam's contract on timer 
ordering. For a given key, all timers must fire in order (no guarantee across 
keys). The GC timer is set past the end of the window (in this example, the 
Global window), so it should not fire as long as an unfired user timer is 
active.
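
As a rough illustration of that contract, the sketch below fires the timers
for one key strictly in timestamp order using a priority queue. It is not
DirectRunner code: TimerOrderingSketch, TimerData, fireAll and GC_TIME are
hypothetical names, and timestamps are plain millisecond longs. Because the
looping timer always re-arms itself at a timestamp earlier than the GC timer,
the GC timer can only come out of the queue last.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch, not DirectRunner code: timers for one key fired in timestamp order.
final class TimerOrderingSketch {
  // Stand-in for a timestamp "past the end of the Global window" (assumption).
  static final long GC_TIME = Long.MAX_VALUE;

  // A timer for a single key: an id plus the event time (millis) at which it fires.
  static final class TimerData {
    final String id;
    final long timestampMillis;

    TimerData(String id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  // Fires every timer for one key and returns the ids in firing order.
  // The "loop" timer re-arms itself one second later until it has fired loopCount times.
  static List<String> fireAll(int loopCount) {
    PriorityQueue<TimerData> queue =
        new PriorityQueue<>(Comparator.comparingLong((TimerData t) -> t.timestampMillis));
    queue.add(new TimerData("gc", GC_TIME));
    queue.add(new TimerData("loop", 1000L));

    List<String> fired = new ArrayList<>();
    int loops = 0;
    while (!queue.isEmpty()) {
      // Always take the earliest pending timer, including ones set by earlier firings.
      TimerData next = queue.poll();
      fired.add(next.id);
      if (next.id.equals("loop") && ++loops < loopCount) {
        queue.add(new TimerData("loop", next.timestampMillis + 1000L));
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(fireAll(3));
  }
}
```

With this ordering, state cleared by the GC timer is never observed as null by
a later firing of the looping timer, which is the failure the reproduction
below runs into.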

Several issues are causing this bug:
 * The DirectRunner implements timer ordering by short-circuiting the bundle 
and "pushing back" the remaining timers to the next bundle. This technique is 
hard to validate and appears racy. A more direct approach with a priority queue 
(such as the one used by FnApiDoFnRunner) is preferred.
 * DirectTimerInternals maintains timers in three separate data structures. 
There appears to be at least one consistency bug caused by looping timers (a 
timer is removed from existingTimers even though it was reset).
 * Finally, the main cause of this bug: the direct runner uses various 
Concurrent* data structures. However, these data structures provide only weak 
consistency when iterating. One example of this: the runner tries not to fire 
timers for a transform if any bundles for that transform are in flight. This is 
important for consistency, and can cause misordered timers if violated. This is 
accomplished by passing the set of transforms with in-flight bundles in here:

[https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L186]

However, inflightBundles is a ConcurrentHashMap, so the keySet() iterator may 
provide a stale view. A transform recently added to the map might not appear in 
keySet(), breaking the consistency of QuiescenceDriver.

The simple fix is to replace these concurrent data structures with standard 
data structures guarded by synchronized blocks.
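
A minimal sketch of what that could look like for the in-flight bookkeeping,
assuming transforms are identified by a String name. InFlightTransforms and
its methods are hypothetical and are not the DirectRunner's actual classes.
All reads and writes take the same lock, and callers get a copy made under
that lock rather than a weakly consistent iterator over live data.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: tracks which transforms have bundles in flight, guarded by one lock.
final class InFlightTransforms {
  private final Object lock = new Object();
  // Number of in-flight bundles per transform; only touched while holding the lock.
  private final Map<String, Integer> inFlightCounts = new HashMap<>();

  void bundleStarted(String transform) {
    synchronized (lock) {
      inFlightCounts.merge(transform, 1, Integer::sum);
    }
  }

  void bundleFinished(String transform) {
    synchronized (lock) {
      // Drop the entry once the last in-flight bundle for the transform completes.
      inFlightCounts.computeIfPresent(transform, (k, n) -> n == 1 ? null : n - 1);
    }
  }

  // Returns a copy taken under the lock, so it reflects every bundleStarted call
  // that completed before this method was entered.
  Set<String> snapshot() {
    synchronized (lock) {
      return new HashSet<>(inFlightCounts.keySet());
    }
  }
}
```

A caller deciding which timers to fire would take one snapshot() and skip any
transform that appears in it. The snapshot can still be outdated by the time it
is used, so the check and the timer extraction would likely need to happen
under the same lock in a real fix.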

 

> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
>                 Key: BEAM-11971
>                 URL: https://issues.apache.org/jira/browse/BEAM-11971
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Reza ardeshir rokni
>            Priority: P3
>
> State is set to {{null}} while an active timer is present. This issue does 
> not show up in other runners.
> The following example reaches the IllegalStateException within 10-20 runs. 
> {{LOOP_COUNT}} does not seem to be a factor, as the issue reproduces with a 
> {{LOOP_COUNT}} of 100 or 100000. The number of keys is a factor: it did not 
> reproduce with only one key. I have not tried more than 3 keys to see if 
> that makes it easier to reproduce. 
>  
> {code}
> package test;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import java.util.Optional;
>  
> public class Test {
>    public static void main (String [] args) throws Exception{
>        Test.testToFailure();
>    }
>    public static void testToFailure() throws Exception {
>        int count = 0;
>        while (true) {
>            failingTest();
>            System.out.println(
>                    String.format("Got to Count %s", String.valueOf(count++)));
>        }
>    }
>    public static void failingTest() throws Exception {
>        Pipeline p = Pipeline.create();
>        Instant now = Instant.now();
>        TestStream<Integer> stream =
>                TestStream.create(BigEndianIntegerCoder.of())
>                        .addElements(1)
>                        .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>                        .addElements(2)
>                        .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
>                        .addElements(3)
>                        .advanceWatermarkToInfinity();
>        p.apply(stream)
>                .apply(WithKeys.of(x -> x))
>                .setCoder(KvCoder.of(BigEndianIntegerCoder.of(), BigEndianIntegerCoder.of()))
>                .apply(new TestToFail());
>        p.run();
>    }
>    public static class TestToFail
>            extends PTransform<PCollection<KV<Integer, Integer>>, PCollection<Integer>> {
>        @Override
>        public PCollection<Integer> expand(PCollection<KV<Integer, Integer>> input) {
>            return input.apply(ParDo.of(new LoopingRead()));
>        }
>    }
>    public static class LoopingRead extends DoFn<KV<Integer, Integer>, Integer> {
>        static int LOOP_COUNT = 100;
>        @StateId("value")
>        private final StateSpec<ValueState<Integer>> value =
>                StateSpecs.value(BigEndianIntegerCoder.of());
>        @StateId("count")
>        private final StateSpec<ValueState<Integer>> count =
>                StateSpecs.value(BigEndianIntegerCoder.of());
>        @TimerId("actionTimers")
>        private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>        @ProcessElement
>        public void processElement(
>                ProcessContext c,
>                @StateId("value") ValueState<Integer> value,
>                @TimerId("actionTimers") Timer timers) {
>            value.write(c.element().getValue());
>            timers.set(c.timestamp().plus(Duration.millis(1000)));
>        }
>        /** Re-arms the timer one second later until it has fired LOOP_COUNT times. */
>        @OnTimer("actionTimers")
>        public void onTimer(
>                OnTimerContext c,
>                @StateId("value") ValueState<Integer> value,
>                @StateId("count") ValueState<Integer> count,
>                @TimerId("actionTimers") Timer timers) {
>            if (value.read() == null) {
>                throw new IllegalStateException("BINGO!");
>            }
>            Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
>            count.write(counter);
>            value.write(value.read() + counter);
>            if (counter < LOOP_COUNT) {
>                timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
>            }
>        }
>    }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
