[ 
https://issues.apache.org/jira/browse/BEAM-3806?focusedWorklogId=79327&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79327
 ]

ASF GitHub Bot logged work on BEAM-3806:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Mar/18 03:27
            Start Date: 12/Mar/18 03:27
    Worklog Time Spent: 10m 
      Work Description: aljoscha commented on issue #4829: [BEAM-3806] Fix 
direct-runner hang
URL: https://github.com/apache/beam/pull/4829#issuecomment-372184334
 
 
   I think this broke `ParDoTest` for runners that don't support `TestStream` 
because the new tests are missing a `UsesTestStream` annotation.
   
   Fix: https://github.com/apache/beam/pull/4849

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 79327)
    Time Spent: 40m  (was: 0.5h)

> DirectRunner hangs if multiple timers set in the same bundle
> ------------------------------------------------------------
>
>                 Key: BEAM-3806
>                 URL: https://issues.apache.org/jira/browse/BEAM-3806
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Ben Chambers
>            Assignee: Thomas Groh
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> See the repro below:
> {code:java}
> package com.simbly.beam.cassandra;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.junit.Rule;
> import org.junit.Test;
> public class DirectRunnerTest {
>   @Rule
>   public TestPipeline pipeline = TestPipeline.create();
>   @Test
>   public void badTimerBehavior() {
>     TestStream<KV<String, String>> stream = TestStream
>         .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
>         .addElements(KV.of("key1", "v1"))
>         .advanceWatermarkToInfinity();
>     PCollection<String> result = pipeline
>         .apply(stream)
>         .apply(ParDo.of(new TestDoFn()));
>     PAssert.that(result).containsInAnyOrder("It works");
>     pipeline.run().waitUntilFinish();
>   }
>   private static class TestDoFn extends DoFn<KV<String, String>, String> {
>     @TimerId("timer")
>     private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>     @ProcessElement
>     public void process(ProcessContext c,
>         @TimerId("timer") Timer timer) {
>       timer.offset(Duration.standardMinutes(10)).setRelative();
>       timer.offset(Duration.standardMinutes(30)).setRelative();
>     }
>     @OnTimer("timer")
>     public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
>       c.output("It works");
>     }
>   }
> }
> {code}
> From inspection, this seems to be caused by the logic in 
> [WatermarkManager|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L313],
>  which does the following if there are multiple timers for a key:
>  # Adds the first timer to the `pendingTimers`, `keyTimers`, and 
> `existingTimersForKey`.
>  # Removes the first timer from `keyTimers`
>  # Adds the second timer to `keyTimers` and `existingTimersForKey`.
> This leads to inconsistencies: pendingTimers holds only the first timer, 
> keyTimers only the second, and existingTimersForKey both. This becomes more 
> problematic because one of these collections is used for *firing* (and thus 
> releasing holds) while another is used for holds.
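
The three bookkeeping steps listed in the issue can be replayed with a small toy model. This is only a sketch, and it makes some assumptions: the class `TimerBookkeepingSketch`, its `replay` method, and the use of plain string sets are hypothetical stand-ins. The collection names are taken from the issue text, but the real `WatermarkManager` data structures and logic are more involved and are not reproduced here.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Toy model of the bookkeeping described in the issue.
 * NOT the actual WatermarkManager code: timers are plain strings and the
 * three collections are simple sets (both are assumptions for illustration).
 */
class TimerBookkeepingSketch {
  final Set<String> pendingTimers = new LinkedHashSet<>();
  final Set<String> keyTimers = new LinkedHashSet<>();
  final Set<String> existingTimersForKey = new LinkedHashSet<>();

  /** Replays the three steps from the issue for two timers set on one key. */
  void replay(String first, String second) {
    // Step 1: the first timer is added to all three collections.
    pendingTimers.add(first);
    keyTimers.add(first);
    existingTimersForKey.add(first);

    // Step 2: the first timer is removed from keyTimers only.
    keyTimers.remove(first);

    // Step 3: the second timer is added to keyTimers and existingTimersForKey,
    // but never to pendingTimers.
    keyTimers.add(second);
    existingTimersForKey.add(second);
  }

  /** True only if all three collections agree on which timers exist. */
  boolean isConsistent() {
    return pendingTimers.equals(keyTimers)
        && keyTimers.equals(existingTimersForKey);
  }

  public static void main(String[] args) {
    TimerBookkeepingSketch sketch = new TimerBookkeepingSketch();
    sketch.replay("timer@+10m", "timer@+30m");
    System.out.println("pendingTimers        = " + sketch.pendingTimers);
    System.out.println("keyTimers            = " + sketch.keyTimers);
    System.out.println("existingTimersForKey = " + sketch.existingTimersForKey);
    System.out.println("consistent           = " + sketch.isConsistent());
  }
}
```

In this model the three collections end up disagreeing after a single bundle, which matches the inconsistency the issue describes. Which collection drives firing and which drives holds is not modeled here.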



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
