[
https://issues.apache.org/jira/browse/BEAM-11971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Reuven Lax updated BEAM-11971:
------------------------------
The cause is that the GC timer occasionally fires before the looping timer is
done. This means that the DirectRunner is violating Beam's contract on timer
ordering. For a given key, all timers must fire in order (no guarantee across
keys). The GC timer is set past the end of the window (in this example, the
Global window), so it should not fire as long as an unfired user timer is
active.
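To make the ordering contract concrete, here is a minimal sketch of one key's timers held in a priority queue ordered by timestamp. It is not DirectRunner or FnApiDoFnRunner code: the class TimerOrderingSketch, the TimerData type, and the fireInOrder method are hypothetical names used only for illustration. The looping timer re-sets itself one second later on each firing, and the GC timer sits at a timestamp past all of them, so it is polled last.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of per-key timer ordering; not actual Beam runner code. */
public class TimerOrderingSketch {

  /** A timer for a single key: an id plus a firing timestamp in milliseconds. */
  static final class TimerData {
    final String id;
    final long timestampMillis;

    TimerData(String id, long timestampMillis) {
      this.id = id;
      this.timestampMillis = timestampMillis;
    }
  }

  /**
   * Fires all timers of one key in timestamp order. The "user" timer re-sets itself
   * one second later until it has fired loopCount times; the "gc" timer is set once.
   */
  public static List<String> fireInOrder(
      long firstUserTimerMillis, int loopCount, long gcTimestampMillis) {
    PriorityQueue<TimerData> queue =
        new PriorityQueue<>(Comparator.comparingLong((TimerData t) -> t.timestampMillis));
    queue.add(new TimerData("gc", gcTimestampMillis));
    queue.add(new TimerData("user", firstUserTimerMillis));

    List<String> fired = new ArrayList<>();
    int userFirings = 0;
    while (!queue.isEmpty()) {
      // Always take the earliest timer, including timers set during this loop.
      TimerData next = queue.poll();
      fired.add(next.id + "@" + next.timestampMillis);
      if (next.id.equals("user") && ++userFirings < loopCount) {
        // The looping timer is reset; it re-enters the queue ahead of the GC timer.
        queue.add(new TimerData("user", next.timestampMillis + 1000));
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    // The GC timer stands in for "past the end of the Global window".
    System.out.println(fireInOrder(1000, 3, Long.MAX_VALUE));
  }
}
```

Because a reset timer goes back into the same queue, the GC timer cannot be polled while an earlier user timer is still pending for that key.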
Several issues are causing this bug:
* The DirectRunner implements timer ordering by short-circuiting the bundle
and "pushing back" the remaining timers to the next bundle. This technique is
hard to validate and appears racy. A more direct approach with a priority queue
(such as the one used by FnApiDoFnRunner) is preferred.
* DirectTimerInternals maintains timers in three separate data structures.
There appears to be at least one consistency bug caused by looping timers (a
timer is removed from existingTimers even though it was reset).
* Finally, the main cause of this bug: the direct runner uses various
Concurrent* data structures. However, these data structures provide only weak
consistency when iterating. One example of this: the runner tries not to fire
timers for a transform if any bundles for that transform are in flight. This is
important for consistency, and violating it can cause misordered timers. It is
accomplished by passing in the set of transforms with in-flight bundles here:
[https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java#L186]
However, inflightBundles is a ConcurrentHashMap, so the keySet() iterator may
provide a stale view. A transform recently added to the map might not appear in
keySet(), breaking the consistency of QuiescenceDriver.
The simple fix is to replace these concurrent data structures with standard
data structures guarded by synchronized blocks.
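As a rough illustration of that fix, the sketch below tracks in-flight bundles in a plain HashMap that is only touched inside synchronized blocks, and hands out a copy of the key set taken under the same lock. The class InflightBundlesSketch, its methods, and the use of String transform names with a per-transform count are assumptions made for this example; they do not mirror the DirectRunner's actual fields or types.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of lock-guarded in-flight tracking; not actual DirectRunner code. */
public class InflightBundlesSketch {
  private final Object lock = new Object();

  // Plain HashMap: every read and write below happens while holding the lock.
  // Maps a transform name to the number of its bundles currently in flight.
  private final Map<String, Integer> inflightBundles = new HashMap<>();

  /** Records that a bundle for the given transform has started. */
  public void bundleStarted(String transform) {
    synchronized (lock) {
      inflightBundles.merge(transform, 1, Integer::sum);
    }
  }

  /** Records that a bundle finished; the entry is removed when the count reaches zero. */
  public void bundleFinished(String transform) {
    synchronized (lock) {
      inflightBundles.computeIfPresent(transform, (name, n) -> n > 1 ? n - 1 : null);
    }
  }

  /**
   * Returns a copy of the transforms with in-flight bundles. The copy is taken under
   * the lock, so it reflects every bundleStarted call that completed before it.
   */
  public Set<String> transformsWithInflightBundles() {
    synchronized (lock) {
      return new HashSet<>(inflightBundles.keySet());
    }
  }
}
```

A snapshot like this is only consistent at the moment it is taken. If the decision to fire timers must not race with a newly started bundle, the caller would presumably need to hold the same lock across both the check and the decision.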
> Direct Runner State is null while active timers exist
> -----------------------------------------------------
>
> Key: BEAM-11971
> URL: https://issues.apache.org/jira/browse/BEAM-11971
> Project: Beam
> Issue Type: Bug
> Components: runner-direct
> Reporter: Reza ardeshir rokni
> Priority: P3
>
> State is set to {{null}} while an active timer is present; this issue does
> not show in other runners.
> The following example reaches the IllegalStateException within 10-20 runs.
> {{LOOP_COUNT}} does not seem to be a factor, as the issue reproduces with a
> {{LOOP_COUNT}} of 100 or 100000. The number of keys is a factor: it did not
> reproduce with only one key. More than 3 keys have not been tried to see
> whether that makes it easier to reproduce.
>
> {code}
> package test;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> import java.util.Optional;
>
> public class Test {
> public static void main (String [] args) throws Exception{
> Test.testToFailure();
> }
> public static void testToFailure() throws Exception {
> int count = 0;
> while (true) {
> failingTest();
> System.out.println(
> String.format("Got to Count %s", String.valueOf(count++)));
> }
> }
> public static void failingTest() throws Exception {
> Pipeline p = Pipeline.create();
> Instant now = Instant.now();
> TestStream<Integer> stream =
> TestStream.create(BigEndianIntegerCoder.of())
> .addElements(1)
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(2)
> .advanceWatermarkTo(now.plus(Duration.standardSeconds(1)))
> .addElements(3)
> .advanceWatermarkToInfinity();
> p.apply(stream)
> .apply(WithKeys.of(x -> x))
> .setCoder(KvCoder.of(BigEndianIntegerCoder.of(),
> BigEndianIntegerCoder.of()))
> .apply(new TestToFail());
> p.run();
> }
> public static class TestToFail
> extends PTransform<PCollection<KV<Integer, Integer>>,
> PCollection<Integer>> {
> @Override
> public PCollection<Integer> expand(PCollection<KV<Integer, Integer>>
> input) {
> return input.apply(ParDo.of(new LoopingRead()));
> }
> }
> public static class LoopingRead extends DoFn<KV<Integer, Integer>,
> Integer> {
> static int LOOP_COUNT = 100;
> @StateId("value")
> private final StateSpec<ValueState<Integer>> value =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @StateId("count")
> private final StateSpec<ValueState<Integer>> count =
> StateSpecs.value(BigEndianIntegerCoder.of());
> @TimerId("actionTimers")
> private final TimerSpec timer =
> TimerSpecs.timer(TimeDomain.EVENT_TIME);
> @ProcessElement
> public void processElement(
> ProcessContext c,
> @StateId("value") ValueState<Integer> value,
> @TimerId("actionTimers") Timer timers) {
> value.write(c.element().getValue());
> timers.set(c.timestamp().plus(Duration.millis(1000)));
> }
> /** */
> @OnTimer("actionTimers")
> public void onTimer(
> OnTimerContext c,
> @StateId("value") ValueState<Integer> value,
> @StateId("count") ValueState<Integer> count,
> @TimerId("actionTimers") Timer timers) {
> if (value.read() == null) {
> throw new IllegalStateException("BINGO!");
> }
> Integer counter = Optional.ofNullable(count.read()).orElse(0) + 1;
> count.write(counter);
> value.write(value.read() + counter);
> if (counter < LOOP_COUNT) {
> timers.set(c.timestamp().plus(Duration.standardSeconds(1)));
> }
> }
> }
> }
> {code}