[ 
https://issues.apache.org/jira/browse/BEAM-3806?focusedWorklogId=79153&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-79153
 ]

ASF GitHub Bot logged work on BEAM-3806:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Mar/18 01:07
            Start Date: 10/Mar/18 01:07
    Worklog Time Spent: 10m 
      Work Description: bjchambers closed pull request #4829: [BEAM-3806] Fix direct-runner hang
URL: https://github.com/apache/beam/pull/4829
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index b01f166b625..747a6671941 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -26,7 +26,6 @@
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.SortedMultiset;
@@ -39,6 +38,7 @@
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -143,7 +143,7 @@
    * timestamp which indicates we have received all of the data and there will 
be no more on-time or
    * late data. This value is represented by {@link 
WatermarkManager#THE_END_OF_TIME}.
    */
-  private interface Watermark {
+  @VisibleForTesting interface Watermark {
     /**
      * Returns the current value of this watermark.
      */
@@ -211,13 +211,13 @@ public static WatermarkUpdate fromTimestamps(Instant 
oldTime, Instant currentTim
    *
    * <p>See {@link #refresh()} for more information.
    */
-  private static class AppliedPTransformInputWatermark implements Watermark {
+  @VisibleForTesting static class AppliedPTransformInputWatermark implements 
Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
     private final SortedMultiset<CommittedBundle<?>> pendingElements;
 
     // This tracks only the quantity of timers at each timestamp, for quickly 
getting the cross-key
     // minimum
-    private final SortedMultiset<Instant> pendingTimers;
+    private final SortedMultiset<TimerData> pendingTimers;
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
@@ -290,15 +290,15 @@ private synchronized void 
removePending(CommittedBundle<?> completed) {
       pendingElements.remove(completed);
     }
 
-    private synchronized Instant getEarliestTimerTimestamp() {
+    @VisibleForTesting synchronized Instant getEarliestTimerTimestamp() {
       if (pendingTimers.isEmpty()) {
         return BoundedWindow.TIMESTAMP_MAX_VALUE;
       } else {
-        return pendingTimers.firstEntry().getElement();
+        return pendingTimers.firstEntry().getElement().getTimestamp();
       }
     }
 
-    private synchronized void updateTimers(TimerUpdate update) {
+    @VisibleForTesting synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers =
           objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
       Table<StateNamespace, String, TimerData> existingTimersForKey =
@@ -311,10 +311,12 @@ private synchronized void updateTimers(TimerUpdate 
update) {
               existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
 
           if (existingTimer == null) {
-            pendingTimers.add(timer.getTimestamp());
+            pendingTimers.add(timer);
             keyTimers.add(timer);
           } else if (!existingTimer.equals(timer)) {
+            pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
+            pendingTimers.add(timer);
             keyTimers.add(timer);
           } // else the timer is already set identically, so noop
 
@@ -329,7 +331,7 @@ private synchronized void updateTimers(TimerUpdate update) {
               existingTimersForKey.get(timer.getNamespace(), 
timer.getTimerId());
 
           if (existingTimer != null) {
-            pendingTimers.remove(existingTimer.getTimestamp());
+            pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
             existingTimersForKey.remove(existingTimer.getNamespace(), 
existingTimer.getTimerId());
           }
@@ -338,12 +340,14 @@ private synchronized void updateTimers(TimerUpdate 
update) {
 
       for (TimerData timer : update.getCompletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          pendingTimers.remove(timer.getTimestamp());
+          keyTimers.remove(timer);
+          pendingTimers.remove(timer);
         }
       }
     }
 
-    private synchronized Map<StructuralKey<?>, List<TimerData>> 
extractFiredEventTimeTimers() {
+    @VisibleForTesting
+    synchronized Map<StructuralKey<?>, List<TimerData>> 
extractFiredEventTimeTimers() {
       return extractFiredTimers(currentWatermark.get(), objectTimers);
     }
 
@@ -1363,9 +1367,9 @@ public static TimerUpdateBuilder builder(StructuralKey<?> 
key) {
 
       private TimerUpdateBuilder(StructuralKey<?> key) {
         this.key = key;
-        this.completedTimers = new HashSet<>();
-        this.setTimers = new HashSet<>();
-        this.deletedTimers = new HashSet<>();
+        this.completedTimers = new LinkedHashSet<>();
+        this.setTimers = new LinkedHashSet<>();
+        this.deletedTimers = new LinkedHashSet<>();
       }
 
       /**
@@ -1409,9 +1413,9 @@ public TimerUpdateBuilder deletedTimer(TimerData 
deletedTimer) {
       public TimerUpdate build() {
         return new TimerUpdate(
             key,
-            ImmutableSet.copyOf(completedTimers),
-            ImmutableSet.copyOf(setTimers),
-            ImmutableSet.copyOf(deletedTimers));
+            ImmutableList.copyOf(completedTimers),
+            ImmutableList.copyOf(setTimers),
+            ImmutableList.copyOf(deletedTimers));
       }
     }
 
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index c0ad157e3c9..4f6d4da174f 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -19,10 +19,14 @@
 
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -32,15 +36,18 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
+import 
org.apache.beam.runners.direct.WatermarkManager.AppliedPTransformInputWatermark;
 import org.apache.beam.runners.direct.WatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.WatermarkManager.Watermark;
 import org.apache.beam.runners.local.StructuralKey;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -76,6 +83,7 @@
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /**
  * Tests for {@link WatermarkManager}.
@@ -1475,6 +1483,52 @@ public void eventTimeTimersCanBeReset() {
     assertThat(firstFired.getTimers(), contains(overridingTimer));
   }
 
+  @Test
+  public void inputWatermarkDuplicates() {
+    Watermark mockWatermark = Mockito.mock(Watermark.class);
+
+    AppliedPTransformInputWatermark underTest =
+        new AppliedPTransformInputWatermark(ImmutableList.of(mockWatermark));
+
+    // Refresh
+    when(mockWatermark.get()).thenReturn(new Instant(0));
+    underTest.refresh();
+    assertEquals(new Instant(0), underTest.get());
+
+    // Apply a timer update
+    StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
+    TimerData timer1 = TimerData
+        .of("a", StateNamespaces.global(), new Instant(100), 
TimeDomain.EVENT_TIME);
+    TimerData timer2 = TimerData
+        .of("a", StateNamespaces.global(), new Instant(200), 
TimeDomain.EVENT_TIME);
+    
underTest.updateTimers(TimerUpdate.builder(key).setTimer(timer1).setTimer(timer2).build());
+
+    // Only the last timer update should be observable
+    assertEquals(timer2.getTimestamp(), underTest.getEarliestTimerTimestamp());
+
+    // Advance the input watermark
+    when(mockWatermark.get()).thenReturn(new Instant(1000));
+    underTest.refresh();
+    assertEquals(new Instant(1000), underTest.get()); // input watermark is 
not held by timers
+
+    // Examine the fired event time timers
+    Map<StructuralKey<?>, List<TimerData>> fired = 
underTest.extractFiredEventTimeTimers();
+    List<TimerData> timers = fired.get(key);
+    assertNotNull(timers);
+    assertThat(timers, contains(timer2));
+
+    // Update based on timer firings
+    underTest.updateTimers(TimerUpdate.builder(key)
+        .withCompletedTimers(timers).build());
+
+    // Now we should be able to advance
+    assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, 
underTest.getEarliestTimerTimestamp());
+
+    // Nothing left to fire
+    fired = underTest.extractFiredEventTimeTimers();
+    assertThat(fired.entrySet(), empty());
+  }
+
   @Test
   public void timerUpdateBuilderBuildAddsAllAddedTimers() {
     TimerData set = TimerData.of(StateNamespaces.global(), new Instant(10L), 
TimeDomain.EVENT_TIME);
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index f6f05ad03a8..b5026b05a50 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3353,4 +3353,37 @@ public void onTimer(OnTimerContext c, PipelineOptions 
options) {
 
     pipeline.run();
   }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void duplicateTimerSetting() {
+    TestStream<KV<String, String>> stream = TestStream
+        .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
+        .addElements(KV.of("key1", "v1"))
+        .advanceWatermarkToInfinity();
+
+    PCollection<String> result = pipeline
+        .apply(stream)
+        .apply(ParDo.of(new TwoTimerDoFn()));
+    PAssert.that(result).containsInAnyOrder("It works");
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class TwoTimerDoFn extends DoFn<KV<String, String>, String> {
+    @TimerId("timer")
+    private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    @ProcessElement
+    public void process(ProcessContext c,
+        @TimerId("timer") Timer timer) {
+      timer.offset(Duration.standardMinutes(10)).setRelative();
+      timer.offset(Duration.standardMinutes(30)).setRelative();
+    }
+
+    @OnTimer("timer")
+    public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
+      c.output("It works");
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 79153)
    Time Spent: 0.5h  (was: 20m)

> DirectRunner hangs if multiple timers are set in the same bundle
> ------------------------------------------------------------
>
>                 Key: BEAM-3806
>                 URL: https://issues.apache.org/jira/browse/BEAM-3806
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>            Reporter: Ben Chambers
>            Assignee: Thomas Groh
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> See the repro below:
> {code:java}
> package com.simbly.beam.cassandra;
> import org.apache.beam.sdk.coders.KvCoder;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.testing.PAssert;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.TestStream;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
> import org.junit.Rule;
> import org.junit.Test;
> public class DirectRunnerTest {
>   @Rule
>   public TestPipeline pipeline = TestPipeline.create();
>   @Test
>   public void badTimerBehavior() {
>     TestStream<KV<String, String>> stream = TestStream
>         .create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
>         .addElements(KV.of("key1", "v1"))
>         .advanceWatermarkToInfinity();
>     PCollection<String> result = pipeline
>         .apply(stream)
>         .apply(ParDo.of(new TestDoFn()));
>     PAssert.that(result).containsInAnyOrder("It works");
>     pipeline.run().waitUntilFinish();
>   }
>   private static class TestDoFn extends DoFn<KV<String, String>, String> {
>     @TimerId("timer")
>     private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>     @ProcessElement
>     public void process(ProcessContext c,
>         @TimerId("timer") Timer timer) {
>       timer.offset(Duration.standardMinutes(10)).setRelative();
>       timer.offset(Duration.standardMinutes(30)).setRelative();
>     }
>     @OnTimer("timer")
>     public void onTimer(OnTimerContext c, @TimerId("timer") Timer timer) {
>       c.output("It works");
>     }
>   }
> }
> {code}
> From inspection, this seems to be caused by the logic in
> [WatermarkManager|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java#L313],
>  which does the following when multiple timers are set for a key:
>  # Adds the first timer to `pendingTimers`, `keyTimers`, and `existingTimersForKey`.
>  # Removes the first timer from `keyTimers`.
>  # Adds the second timer to `keyTimers` and `existingTimersForKey`.
> This leads to inconsistencies: `pendingTimers` has only the first timer, `keyTimers` only the
> second, and `existingTimersForKey` has both. This is especially problematic because one of these
> collections is used for *firing* (and thus releasing holds) while the other is used for holds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to