This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch revert-15056-timer_clear_and_watermark
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 280914177712c1dbf3bb558b248954ab1197c662
Author: reuvenlax <[email protected]>
AuthorDate: Wed Jul 7 11:57:46 2021 -0700

    Revert "Merge pull request #15056: [BEAM-10887] Timer clear"
    
    This reverts commit 2c6f42c596b6356f25454dc3b86055d2f40448d5.
---
 .../beam/runners/core/InMemoryTimerInternals.java  |   3 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |   5 -
 .../apache/beam/runners/core/TimerInternals.java   |  20 +-
 .../beam/runners/direct/DirectTimerInternals.java  |  58 +-
 .../direct/StatefulParDoEvaluatorFactory.java      |  56 +-
 .../beam/runners/direct/WatermarkManager.java      |  89 +--
 .../wrappers/streaming/DoFnOperator.java           |   4 +-
 .../streaming/ExecutableStageDoFnOperator.java     |   3 +-
 runners/google-cloud-dataflow-java/build.gradle    |   1 +
 .../worker/StreamingModeExecutionContext.java      |  96 +--
 .../dataflow/worker/WindmillTimerInternals.java    |  39 +-
 .../beam/runners/samza/runtime/KeyedInternals.java |   5 +-
 .../samza/runtime/SamzaTimerInternalsFactory.java  |   3 +-
 .../spark/stateful/SparkTimerInternals.java        |   2 +-
 .../main/java/org/apache/beam/sdk/state/Timer.java |   3 -
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 807 ++++++---------------
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  11 -
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  16 -
 18 files changed, 308 insertions(+), 913 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 863a92b..d0b3bed 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -153,8 +153,7 @@ public class InMemoryTimerInternals implements 
TimerInternals {
   }
 
   @Override
-  public void deleteTimer(
-      StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
     throw new UnsupportedOperationException("Canceling a timer by ID is not 
yet supported.");
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 0676915..b21252e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -1145,11 +1145,6 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public void clear() {
-      timerInternals.deleteTimer(namespace, timerId, timerFamilyId, 
spec.getTimeDomain());
-    }
-
-    @Override
     public Timer offset(Duration offset) {
       this.offset = offset;
       return this;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index f93a45a..05d8827 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -76,8 +76,7 @@ public interface TimerInternals {
    * manage timers for different time domains in very different ways, thus the 
{@link TimeDomain} is
    * a required parameter.
    */
-  void deleteTimer(
-      StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain);
+  void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain);
 
   /** @deprecated use {@link #deleteTimer(StateNamespace, String, 
TimeDomain)}. */
   @Deprecated
@@ -186,8 +185,6 @@ public interface TimerInternals {
 
     public abstract TimeDomain getDomain();
 
-    public abstract boolean getDeleted();
-
     // When adding a new field, make sure to add it to the compareTo() method.
 
     /** Construct a {@link TimerData} for the given parameters. */
@@ -198,7 +195,7 @@ public interface TimerInternals {
         Instant outputTimestamp,
         TimeDomain domain) {
       return new AutoValue_TimerInternals_TimerData(
-          timerId, "", namespace, timestamp, outputTimestamp, domain, false);
+          timerId, "", namespace, timestamp, outputTimestamp, domain);
     }
 
     /**
@@ -213,7 +210,7 @@ public interface TimerInternals {
         Instant outputTimestamp,
         TimeDomain domain) {
       return new AutoValue_TimerInternals_TimerData(
-          timerId, timerFamilyId, namespace, timestamp, outputTimestamp, 
domain, false);
+          timerId, timerFamilyId, namespace, timestamp, outputTimestamp, 
domain);
     }
 
     /**
@@ -231,17 +228,6 @@ public interface TimerInternals {
       return of(timerId, namespace, timestamp, outputTimestamp, domain);
     }
 
-    public TimerData deleted() {
-      return new AutoValue_TimerInternals_TimerData(
-          getTimerId(),
-          getTimerFamilyId(),
-          getNamespace(),
-          getTimestamp(),
-          getOutputTimestamp(),
-          getDomain(),
-          true);
-    }
-
     /**
      * {@inheritDoc}.
      *
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5a477bb..d240e1b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.util.stream.StreamSupport;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -71,16 +71,8 @@ class DirectTimerInternals implements TimerInternals {
   }
 
   @Override
-  public void deleteTimer(
-      StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
-    deleteTimer(
-        TimerData.of(
-            timerId,
-            timerFamilyId,
-            namespace,
-            BoundedWindow.TIMESTAMP_MIN_VALUE,
-            BoundedWindow.TIMESTAMP_MAX_VALUE,
-            timeDomain));
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not 
yet supported.");
   }
 
   /** @deprecated use {@link #deleteTimer(StateNamespace, String, 
TimeDomain)}. */
@@ -101,19 +93,10 @@ class DirectTimerInternals implements TimerInternals {
     return timerUpdateBuilder.build();
   }
 
-  public boolean containsUpdateForTimeBefore(
-      Instant maxWatermarkTime, Instant maxProcessingTime, Instant 
maxSynchronizedProcessingTime) {
+  public boolean containsUpdateForTimeBefore(Instant time) {
     TimerUpdate update = timerUpdateBuilder.build();
-    return hasTimeBefore(
-            update.getSetTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime)
-        || hasTimeBefore(
-            update.getDeletedTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime);
+    return hasTimeBefore(update.getSetTimers(), time)
+        || hasTimeBefore(update.getDeletedTimers(), time);
   }
 
   @Override
@@ -136,31 +119,8 @@ class DirectTimerInternals implements TimerInternals {
     return watermarks.getOutputWatermark();
   }
 
-  private boolean hasTimeBefore(
-      Iterable<? extends TimerData> timers,
-      Instant maxWatermarkTime,
-      Instant maxProcessingTime,
-      Instant maxSynchronizedProcessingTime) {
-    for (TimerData timerData : timers) {
-      Instant currentTime;
-      switch (timerData.getDomain()) {
-        case EVENT_TIME:
-          currentTime = maxWatermarkTime;
-          break;
-        case PROCESSING_TIME:
-          currentTime = maxProcessingTime;
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          currentTime = maxSynchronizedProcessingTime;
-          break;
-        default:
-          throw new RuntimeException("Unexpected timeDomain " + 
timerData.getDomain());
-      }
-      if (timerData.getTimestamp().isBefore(currentTime)
-          || timerData.getTimestamp().isEqual(currentTime)) {
-        return true;
-      }
-    }
-    return false;
+  private boolean hasTimeBefore(Iterable<? extends TimerData> timers, Instant 
time) {
+    return StreamSupport.stream(timers.spliterator(), false)
+        .anyMatch(td -> td.getTimestamp().isBefore(time));
   }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index ebd305f..4044252 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -47,7 +47,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
 import org.joda.time.Instant;
 
 /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
@@ -174,31 +173,14 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
       for (WindowedValue<KV<K, InputT>> windowedValue : 
gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
+
+      final Instant inputWatermarkTime = 
timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
+      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxSynchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
-      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
-        toBeFiredTimers.add(timerData);
-        switch (timerData.getDomain()) {
-          case EVENT_TIME:
-            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, 
timerData.getTimestamp());
-            break;
-          case PROCESSING_TIME:
-            maxProcessingTime = Ordering.natural().max(maxProcessingTime, 
timerData.getTimestamp());
-            break;
-          case SYNCHRONIZED_PROCESSING_TIME:
-            maxSynchronizedProcessingTime =
-                Ordering.natural().max(maxSynchronizedProcessingTime, 
timerData.getTimestamp());
-        }
-      }
-
-      while (!timerInternals.containsUpdateForTimeBefore(
-              maxWatermarkTime, maxProcessingTime, 
maxSynchronizedProcessingTime)
+      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
           && !toBeFiredTimers.isEmpty()) {
-
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
@@ -210,23 +192,13 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
         BoundedWindow timerWindow = windowNamespace.getWindow();
 
         delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), 
timerWindow);
-        clearWatermarkHold(timer);
-      }
-      pushedBackTimers.addAll(toBeFiredTimers);
-    }
 
-    private void clearWatermarkHold(TimerData timer) {
-      StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer);
-      stepContext.stateInternals().state(timer.getNamespace(), 
timerWatermarkHoldTag).clear();
-      stepContext.stateInternals().commit();
-    }
+        StateTag<WatermarkHoldState> timerWatermarkHoldTag = 
setTimerTag(timer);
 
-    private void setWatermarkHold(TimerData timer) {
-      StateTag<WatermarkHoldState> timerWatermarkHoldTag = setTimerTag(timer);
-      stepContext
-          .stateInternals()
-          .state(timer.getNamespace(), timerWatermarkHoldTag)
-          .add(timer.getOutputTimestamp());
+        stepContext.stateInternals().state(timer.getNamespace(), 
timerWatermarkHoldTag).clear();
+        stepContext.stateInternals().commit();
+      }
+      pushedBackTimers.addAll(toBeFiredTimers);
     }
 
     @Override
@@ -235,12 +207,14 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
       TransformResult<KV<K, InputT>> delegateResult = 
delegateEvaluator.finishBundle();
       boolean isTimerDeclared = false;
       for (TimerData timerData : 
delegateResult.getTimerUpdate().getSetTimers()) {
-        setWatermarkHold(timerData);
+        StateTag<WatermarkHoldState> timerWatermarkHoldTag = 
setTimerTag(timerData);
+
+        stepContext
+            .stateInternals()
+            .state(timerData.getNamespace(), timerWatermarkHoldTag)
+            .add(timerData.getOutputTimestamp());
         isTimerDeclared = true;
       }
-      for (TimerData timerData : 
delegateResult.getTimerUpdate().getDeletedTimers()) {
-        clearWatermarkHold(timerData);
-      }
 
       CopyOnAccessInMemoryStateInternals state;
       Instant watermarkHold;
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index a043256..dbc0a96 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -45,6 +46,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.runners.core.StateNamespace;
@@ -64,10 +66,8 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset;
@@ -635,7 +635,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
       Table<StateNamespace, String, TimerData> existingTimersForKey =
           existingTimers.computeIfAbsent(update.key, k -> 
HashBasedTable.create());
 
-      for (TimerData addedTimer : update.setTimers.values()) {
+      for (TimerData addedTimer : update.setTimers) {
         NavigableSet<TimerData> timerQueue = 
timerMap.get(addedTimer.getDomain());
         if (timerQueue == null) {
           continue;
@@ -659,7 +659,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
             addedTimer);
       }
 
-      for (TimerData deletedTimer : update.deletedTimers.values()) {
+      for (TimerData deletedTimer : update.deletedTimers) {
         NavigableSet<TimerData> timerQueue = 
timerMap.get(deletedTimer.getDomain());
         if (timerQueue == null) {
           continue;
@@ -670,6 +670,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + 
deletedTimer.getTimerFamilyId());
+
         if (existingTimer != null) {
           pendingTimers.remove(deletedTimer);
           timerQueue.remove(deletedTimer);
@@ -1547,25 +1548,6 @@ class WatermarkManager<ExecutableT, CollectionT> {
     }
   }
 
-  @AutoValue
-  public abstract static class TimerKey {
-    abstract TimeDomain getDomain();
-
-    abstract String getId();
-
-    abstract String getFamily();
-
-    abstract Object getNamespace();
-
-    static TimerKey of(TimerData timerData) {
-      return new AutoValue_WatermarkManager_TimerKey(
-          timerData.getDomain(),
-          timerData.getTimerId(),
-          timerData.getTimerFamilyId(),
-          timerData.getNamespace().getCacheKey());
-    }
-  }
-
   /**
    * A collection of newly set, deleted, and completed timers.
    *
@@ -1577,8 +1559,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
   public static class TimerUpdate {
     private final StructuralKey<?> key;
     private final Iterable<? extends TimerData> completedTimers;
-    private final Map<TimerKey, ? extends TimerData> setTimers;
-    private final Map<TimerKey, ? extends TimerData> deletedTimers;
+    private final Iterable<? extends TimerData> setTimers;
+    private final Iterable<? extends TimerData> deletedTimers;
     private final Iterable<? extends TimerData> pushedBackTimers;
 
     /** Returns a TimerUpdate for a null key with no timers. */
@@ -1586,8 +1568,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
       return new TimerUpdate(
           null,
           Collections.emptyList(),
-          Collections.emptyMap(),
-          Collections.emptyMap(),
+          Collections.emptyList(),
+          Collections.emptyList(),
           Collections.emptyList());
     }
 
@@ -1605,14 +1587,14 @@ class WatermarkManager<ExecutableT, CollectionT> {
     public static final class TimerUpdateBuilder {
       private final StructuralKey<?> key;
       private final Collection<TimerData> completedTimers;
-      private final Map<TimerKey, TimerData> setTimers;
-      private final Map<TimerKey, TimerData> deletedTimers;
+      private final Collection<TimerData> setTimers;
+      private final Collection<TimerData> deletedTimers;
 
       private TimerUpdateBuilder(StructuralKey<?> key) {
         this.key = key;
-        this.completedTimers = Sets.newLinkedHashSet();
-        this.setTimers = Maps.newLinkedHashMap();
-        this.deletedTimers = Maps.newLinkedHashMap();
+        this.completedTimers = new LinkedHashSet<>();
+        this.setTimers = new LinkedHashSet<>();
+        this.deletedTimers = new LinkedHashSet<>();
       }
 
       /**
@@ -1634,8 +1616,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
             "Got a timer for after the end of time (%s), got %s",
             BoundedWindow.TIMESTAMP_MAX_VALUE,
             setTimer.getTimestamp());
-        deletedTimers.remove(TimerKey.of(setTimer));
-        setTimers.put(TimerKey.of(setTimer), setTimer);
+        deletedTimers.remove(setTimer);
+        setTimers.add(setTimer);
         return this;
       }
 
@@ -1644,9 +1626,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
        * it has previously been set. Returns this {@link TimerUpdateBuilder}.
        */
       public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
-        TimerKey key = TimerKey.of(deletedTimer);
-        deletedTimers.put(key, deletedTimer);
-        setTimers.remove(key);
+        deletedTimers.add(deletedTimer);
+        setTimers.remove(deletedTimer);
         return this;
       }
 
@@ -1658,12 +1639,19 @@ class WatermarkManager<ExecutableT, CollectionT> {
         return new TimerUpdate(
             key,
             ImmutableList.copyOf(completedTimers),
-            ImmutableMap.copyOf(setTimers),
-            ImmutableMap.copyOf(deletedTimers),
+            ImmutableList.copyOf(setTimers),
+            ImmutableList.copyOf(deletedTimers),
             Collections.emptyList());
       }
     }
 
+    private static Map<String, TimerData> indexTimerData(Iterable<? extends 
TimerData> timerData) {
+      return StreamSupport.stream(timerData.spliterator(), false)
+          .collect(
+              Collectors.toMap(
+                  TimerUpdate::getTimerIdAndTimerFamilyIdWithNamespace, e -> 
e, (a, b) -> b));
+    }
+
     private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData 
td) {
       return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId();
     }
@@ -1671,8 +1659,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
     private TimerUpdate(
         StructuralKey<?> key,
         Iterable<? extends TimerData> completedTimers,
-        Map<TimerKey, ? extends TimerData> setTimers,
-        Map<TimerKey, ? extends TimerData> deletedTimers,
+        Iterable<? extends TimerData> setTimers,
+        Iterable<? extends TimerData> deletedTimers,
         Iterable<? extends TimerData> pushedBackTimers) {
       this.key = key;
       this.completedTimers = completedTimers;
@@ -1693,12 +1681,12 @@ class WatermarkManager<ExecutableT, CollectionT> {
 
     @VisibleForTesting
     public Iterable<? extends TimerData> getSetTimers() {
-      return setTimers.values();
+      return setTimers;
     }
 
     @VisibleForTesting
     public Iterable<? extends TimerData> getDeletedTimers() {
-      return deletedTimers.values();
+      return deletedTimers;
     }
 
     Iterable<? extends TimerData> getPushedBackTimers() {
@@ -1707,8 +1695,8 @@ class WatermarkManager<ExecutableT, CollectionT> {
 
     boolean isEmpty() {
       return Iterables.isEmpty(completedTimers)
-          && setTimers.isEmpty()
-          && deletedTimers.isEmpty()
+          && Iterables.isEmpty(setTimers)
+          && Iterables.isEmpty(deletedTimers)
           && Iterables.isEmpty(pushedBackTimers);
     }
 
@@ -1720,18 +1708,17 @@ class WatermarkManager<ExecutableT, CollectionT> {
     public TimerUpdate withCompletedTimers(Iterable<TimerData> 
completedTimers) {
       List<TimerData> timersToComplete = new ArrayList<>();
       Set<TimerData> pushedBack = Sets.newHashSet(pushedBackTimers);
-      Map<TimerKey, TimerData> newSetTimers = Maps.newLinkedHashMap();
-      newSetTimers.putAll(setTimers);
+      Map<String, TimerData> newSetTimers = indexTimerData(setTimers);
       for (TimerData td : completedTimers) {
-        TimerKey timerKey = TimerKey.of(td);
+        String timerIdWithNs = getTimerIdAndTimerFamilyIdWithNamespace(td);
         if (!pushedBack.contains(td)) {
           timersToComplete.add(td);
-        } else if (!newSetTimers.containsKey(timerKey)) {
-          newSetTimers.put(timerKey, td);
+        } else if (!newSetTimers.containsKey(timerIdWithNs)) {
+          newSetTimers.put(timerIdWithNs, td);
         }
       }
       return new TimerUpdate(
-          key, timersToComplete, newSetTimers, deletedTimers, 
Collections.emptyList());
+          key, timersToComplete, newSetTimers.values(), deletedTimers, 
Collections.emptyList());
     }
 
     /**
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 60abddd..7f3dc0b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -1530,8 +1530,7 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     @Override
-    public void deleteTimer(
-        StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
+    public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
       try {
         cancelPendingTimerById(getContextTimerId(timerId, namespace));
       } catch (Exception e) {
@@ -1546,7 +1545,6 @@ public class DoFnOperator<InputT, OutputT>
       deleteTimer(
           timer.getNamespace(),
           constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()),
-          timer.getTimerFamilyId(),
           timer.getDomain());
     }
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 0ce771b..47b8aec 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -596,8 +596,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> 
extends DoFnOperator<I
     }
 
     @Override
-    public void deleteTimer(
-        StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
+    public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
       throw new UnsupportedOperationException(
           "It is not expected to use SdfFlinkTimerInternals to delete a 
timer");
     }
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index d03ff2b..e01e441 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -319,6 +319,7 @@ task validatesRunnerStreaming {
       'org.apache.beam.sdk.testing.UsesMapState',
       'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
       'org.apache.beam.sdk.testing.UsesSetState',
+      'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
     ],
   ))
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 2350b81..1aed4bb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -29,13 +29,11 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -63,9 +61,6 @@ import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table.Cell;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -545,7 +540,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
               synchronizedProcessingTime);
 
       this.cachedFiredTimers = null;
-      this.orderedUserTimers = null;
+      this.cachedFiredUserTimers = null;
     }
 
     public void flushState() {
@@ -586,81 +581,30 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     }
 
     // Lazily initialized
-    private NavigableSet<TimerData> orderedUserTimers = null;
-    private Set<String> deletedTimers = null;
+    private Iterator<TimerData> cachedFiredUserTimers = null;
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> 
windowCoder) {
-      if (orderedUserTimers == null) {
-        orderedUserTimers = Sets.newTreeSet();
-        deletedTimers = Sets.newHashSet();
-        
FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
-            .filter(
-                timer ->
-                    WindmillTimerInternals.isUserTimer(timer)
-                        && timer.getStateFamily().equals(stateFamily))
-            .transform(
-                timer ->
-                    WindmillTimerInternals.windmillTimerToTimerData(
-                        WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, 
windowCoder))
-            .iterator()
-            .forEachRemaining(
-                timer -> {
-                  orderedUserTimers.add(timer);
-                });
-      }
-
-      // Extract recently set or deleted timers. This operation is 
destructive, meaning that each
-      // call returns the modifications since the last call.
-      Table<String, StateNamespace, TimerData> justModifiedTimers =
-          userTimerInternals.extractJustModifiedTimers();
-      for (Cell<String, StateNamespace, TimerData> cell : 
justModifiedTimers.cellSet()) {
-        Instant currentMaxTime;
-        switch (cell.getValue().getDomain()) {
-          case EVENT_TIME:
-            currentMaxTime = userTimerInternals.currentInputWatermarkTime();
-            break;
-          case PROCESSING_TIME:
-            currentMaxTime = userTimerInternals.currentProcessingTime();
-            break;
-          case SYNCHRONIZED_PROCESSING_TIME:
-            currentMaxTime = 
userTimerInternals.currentSynchronizedProcessingTime();
-            break;
-          default:
-            throw new RuntimeException("Unexpected domain " + 
cell.getValue().getDomain());
-        }
-        // If the the modified timer falls within the range of timers eligible 
to fire, insert
-        // it into the priority queue. If it falls outside the range, then 
don't: If it's a
-        // brand-new timer, it won't
-        // affect order in the current bundle. If it's a reset or clear of an 
existing timer, we
-        // will detect this below
-        // before we fire the timer.
-        if (cell.getValue().getTimestamp().isBefore(currentMaxTime)
-            || cell.getValue().getTimestamp().isEqual(currentMaxTime)) {
-          if (!cell.getValue().getDeleted()) {
-            orderedUserTimers.add(cell.getValue());
-          }
-        }
+      if (cachedFiredUserTimers == null) {
+        cachedFiredUserTimers =
+            
FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
+                .filter(
+                    timer ->
+                        WindmillTimerInternals.isUserTimer(timer)
+                            && timer.getStateFamily().equals(stateFamily))
+                .transform(
+                    timer ->
+                        WindmillTimerInternals.windmillTimerToTimerData(
+                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
+                .iterator();
       }
 
-      while (!orderedUserTimers.isEmpty()) {
-        TimerData nextTimer = orderedUserTimers.pollFirst();
-        // If the timer for this key is in justModifiedTimers, ignore the old 
value. The new value
-        // for this timer will be elsewhere in the priority queue.
-        @Nullable
-        TimerData updatedTimer =
-            justModifiedTimers.get(
-                WindmillTimerInternals.getTimerDataKey(nextTimer), 
nextTimer.getNamespace());
-        if (updatedTimer == null || updatedTimer.equals(nextTimer)) {
-          // User timers must be explicitly deleted when delivered, to release 
the implied hold.
-          // This will also add the deletion to the next call to
-          // WindmillTimerInternals.extractJustModifiedTimers,
-          // which will prevent the timer from firing if an old value for the 
timer was in the input
-          // bundle.
-          userTimerInternals.deleteTimer(nextTimer);
-          return nextTimer;
-        }
+      if (!cachedFiredUserTimers.hasNext()) {
+        return null;
       }
-      return null;
+      TimerData nextTimer = cachedFiredUserTimers.next();
+      // User timers must be explicitly deleted when delivered, to release the 
implied hold
+      userTimerInternals.deleteTimer(nextTimer);
+      return nextTimer;
     }
 
     @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index 015fa6d..3308f9f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -59,8 +59,6 @@ class WindmillTimerInternals implements TimerInternals {
   // across namespaces.
   private Table<String, StateNamespace, TimerData> timers = 
HashBasedTable.create();
 
-  private Table<String, StateNamespace, TimerData> recentlyModifiedTimers = 
HashBasedTable.create();
-
   // Map from timer id to whether it is to be deleted or set
   private Table<String, StateNamespace, Boolean> timerStillPresent = 
HashBasedTable.create();
 
@@ -96,22 +94,12 @@ class WindmillTimerInternals implements TimerInternals {
         synchronizedProcessingTime);
   }
 
-  public Table<String, StateNamespace, TimerData> extractJustModifiedTimers() {
-    Table<String, StateNamespace, TimerData> justModified = 
recentlyModifiedTimers;
-    recentlyModifiedTimers = HashBasedTable.create();
-    return justModified;
-  }
-
   @Override
   public void setTimer(TimerData timerKey) {
     timers.put(
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         timerKey);
-    recentlyModifiedTimers.put(
-        getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
-        timerKey.getNamespace(),
-        timerKey);
     timerStillPresent.put(
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
@@ -130,18 +118,10 @@ class WindmillTimerInternals implements TimerInternals {
         getTimerDataKey(timerId, timerFamilyId),
         namespace,
         TimerData.of(timerId, timerFamilyId, namespace, timestamp, 
outputTimestamp, timeDomain));
-    recentlyModifiedTimers.put(
-        getTimerDataKey(timerId, timerFamilyId),
-        namespace,
-        TimerData.of(timerId, timerFamilyId, namespace, timestamp, 
outputTimestamp, timeDomain));
     timerStillPresent.put(getTimerDataKey(timerId, timerFamilyId), namespace, 
true);
   }
 
-  public static String getTimerDataKey(TimerData timerData) {
-    return getTimerDataKey(timerData.getTimerId(), 
timerData.getTimerFamilyId());
-  }
-
-  private static String getTimerDataKey(String timerId, String timerFamilyId) {
+  private String getTimerDataKey(String timerId, String timerFamilyId) {
     // Identifies timer uniquely with timerFamilyId
     return timerId + '+' + timerFamilyId;
   }
@@ -152,11 +132,6 @@ class WindmillTimerInternals implements TimerInternals {
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
         timerKey);
-    recentlyModifiedTimers.put(
-        getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
-        timerKey.getNamespace(),
-        timerKey.deleted());
-
     timerStillPresent.put(
         getTimerDataKey(timerKey.getTimerId(), timerKey.getTimerFamilyId()),
         timerKey.getNamespace(),
@@ -169,16 +144,8 @@ class WindmillTimerInternals implements TimerInternals {
   }
 
   @Override
-  public void deleteTimer(
-      StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
-    deleteTimer(
-        TimerData.of(
-            timerId,
-            timerFamilyId,
-            namespace,
-            BoundedWindow.TIMESTAMP_MIN_VALUE,
-            BoundedWindow.TIMESTAMP_MAX_VALUE,
-            timeDomain));
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
+    throw new UnsupportedOperationException("Deletion of timers by ID is not 
supported.");
   }
 
   @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
index 9a8d852..2ac97d3 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/KeyedInternals.java
@@ -138,9 +138,8 @@ class KeyedInternals<K> {
     }
 
     @Override
-    public void deleteTimer(
-        StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
-      getInternals().deleteTimer(namespace, timerId, timerFamilyId, 
timeDomain);
+    public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
+      getInternals().deleteTimer(namespace, timerId, timeDomain);
     }
 
     @Override
diff --git 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
index 4b34a25..d692851 100644
--- 
a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
+++ 
b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaTimerInternalsFactory.java
@@ -318,8 +318,7 @@ public class SamzaTimerInternalsFactory<K> implements 
TimerInternalsFactory<K> {
     }
 
     @Override
-    public void deleteTimer(
-        StateNamespace namespace, String timerId, String timerFamilyId, 
TimeDomain timeDomain) {
+    public void deleteTimer(StateNamespace namespace, String timerId, 
TimeDomain timeDomain) {
       Instant now = Instant.now();
       deleteTimer(TimerData.of(timerId, namespace, now, now, timeDomain));
     }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
index d22fe8e..b726c23 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java
@@ -120,7 +120,7 @@ public class SparkTimerInternals implements TimerInternals {
   }
 
   @Override
-  public void deleteTimer(StateNamespace namespace, String timerId, String 
timerFamilyId, TimeDomain timeDomain) {
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain 
timeDomain) {
     throw new UnsupportedOperationException("Deleting a timer by ID is not yet 
supported.");
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
index 5f1c047..437df4d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
@@ -81,9 +81,6 @@ public interface Timer {
    */
   void setRelative();
 
-  /** Clears a timer. Previous set timers will become unset. */
-  void clear();
-
   /** Offsets the target timestamp used by {@link #setRelative()} by the given 
duration. */
   Timer offset(Duration offset);
 
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e0ed7ad..e022b14 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -50,6 +50,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -58,7 +59,9 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.LongStream;
 import java.util.stream.StreamSupport;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -137,6 +140,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -3451,7 +3455,6 @@ public class ParDoTest implements Serializable {
   /** Tests to validate ParDo timers. */
   @RunWith(JUnit4.class)
   public static class TimerTests extends SharedTestBase implements 
Serializable {
-
     @Test
     public void testTimerNotKeyed() {
       final String timerId = "foo";
@@ -4277,217 +4280,206 @@ public class ParDoTest implements Serializable {
     }
 
     /** A test makes sure that an event time timers are correctly ordered. */
-    //    @Test
-    //    @Category({
-    //      ValidatesRunner.class,
-    //      UsesTimersInParDo.class,
-    //      UsesTestStream.class,
-    //      UsesStatefulParDo.class,
-    //      UsesStrictTimerOrdering.class
-    //    })
-    //    public void testEventTimeTimerOrdering() throws Exception {
-    //      final int numTestElements = 100;
-    //      final Instant now = new Instant(1500000000000L);
-    //      TestStream.Builder<KV<String, String>> builder =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
-    //              .advanceWatermarkTo(new Instant(0));
-    //
-    //      for (int i = 0; i < numTestElements; i++) {
-    //        builder =
-    //            builder.addElements(TimestampedValue.of(KV.of("dummy", "" + 
i), now.plus(i *
-    // 1000)));
-    //        if ((i + 1) % 10 == 0) {
-    //          builder = builder.advanceWatermarkTo(now.plus((i + 1) * 1000));
-    //        }
-    //      }
-    //
-    //      testEventTimeTimerOrderingWithInputPTransform(
-    //          now, numTestElements, builder.advanceWatermarkToInfinity());
-    //    }
-    //
-    //    /** A test makes sure that an event time timers are correctly 
ordered using Create
-    // transform. */
-    //    @Test
-    //    @Category({
-    //      ValidatesRunner.class,
-    //      UsesTimersInParDo.class,
-    //      UsesStatefulParDo.class,
-    //      UsesStrictTimerOrdering.class
-    //    })
-    //    public void testEventTimeTimerOrderingWithCreate() throws Exception {
-    //      final int numTestElements = 100;
-    //      final Instant now = new Instant(1500000000000L);
-    //
-    //      List<TimestampedValue<KV<String, String>>> elements = new 
ArrayList<>();
-    //      for (int i = 0; i < numTestElements; i++) {
-    //        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), 
now.plus(i * 1000)));
-    //      }
-    //
-    //      testEventTimeTimerOrderingWithInputPTransform(
-    //          now, numTestElements, Create.timestamped(elements));
-    //    }
-    //
-    //    private void testEventTimeTimerOrderingWithInputPTransform(
-    //        Instant now,
-    //        int numTestElements,
-    //        PTransform<PBegin, PCollection<KV<String, String>>> transform)
-    //        throws Exception {
-    //
-    //      final String timerIdBagAppend = "append";
-    //      final String timerIdGc = "gc";
-    //      final String bag = "bag";
-    //      final String minTimestamp = "minTs";
-    //      final Instant gcTimerStamp = now.plus((numTestElements + 1) * 
1000);
-    //
-    //      DoFn<KV<String, String>, String> fn =
-    //          new DoFn<KV<String, String>, String>() {
-    //
-    //            @TimerId(timerIdBagAppend)
-    //            private final TimerSpec appendSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @TimerId(timerIdGc)
-    //            private final TimerSpec gcSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @StateId(bag)
-    //            private final StateSpec<BagState<TimestampedValue<String>>> 
bagStateSpec =
-    //                StateSpecs.bag();
-    //
-    //            @StateId(minTimestamp)
-    //            private final StateSpec<ValueState<Instant>> 
minTimestampSpec =
-    // StateSpecs.value();
-    //
-    //            @ProcessElement
-    //            public void processElement(
-    //                ProcessContext context,
-    //                @TimerId(timerIdBagAppend) Timer bagTimer,
-    //                @TimerId(timerIdGc) Timer gcTimer,
-    //                @StateId(bag) BagState<TimestampedValue<String>> 
bagState,
-    //                @StateId(minTimestamp) ValueState<Instant> 
minStampState) {
-    //
-    //              Instant currentMinStamp =
-    //                  MoreObjects.firstNonNull(minStampState.read(),
-    // BoundedWindow.TIMESTAMP_MAX_VALUE);
-    //              if 
(currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
-    //                gcTimer.set(gcTimerStamp);
-    //              }
-    //              if (currentMinStamp.isAfter(context.timestamp())) {
-    //                minStampState.write(context.timestamp());
-    //                bagTimer.set(context.timestamp());
-    //              }
-    //              
bagState.add(TimestampedValue.of(context.element().getValue(),
-    // context.timestamp()));
-    //            }
-    //
-    //            @OnTimer(timerIdBagAppend)
-    //            public void onTimer(
-    //                OnTimerContext context,
-    //                @TimerId(timerIdBagAppend) Timer timer,
-    //                @StateId(bag) BagState<TimestampedValue<String>> 
bagState) {
-    //
-    //              List<TimestampedValue<String>> flush = new ArrayList<>();
-    //              Instant flushTime = context.timestamp();
-    //              for (TimestampedValue<String> val : bagState.read()) {
-    //                if (!val.getTimestamp().isAfter(flushTime)) {
-    //                  flush.add(val);
-    //                }
-    //              }
-    //              
flush.sort(Comparator.comparing(TimestampedValue::getTimestamp));
-    //              context.output(
-    //
-    // 
Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator()));
-    //              Instant newMinStamp = flushTime.plus(1000);
-    //              if (flush.size() < numTestElements) {
-    //                timer.set(newMinStamp);
-    //              }
-    //            }
-    //
-    //            @OnTimer(timerIdGc)
-    //            public void onTimer(
-    //                OnTimerContext context, @StateId(bag) 
BagState<TimestampedValue<String>>
-    // bagState) {
-    //
-    //              String output =
-    //                  Joiner.on(":")
-    //                          .join(
-    //                              
StreamSupport.stream(bagState.read().spliterator(), false)
-    //
-    // .sorted(Comparator.comparing(TimestampedValue::getTimestamp))
-    //                                  .map(TimestampedValue::getValue)
-    //                                  .iterator())
-    //                      + ":cleanup";
-    //              context.output(output);
-    //              bagState.clear();
-    //            }
-    //          };
-    //
-    //      PCollection<String> output = 
pipeline.apply(transform).apply(ParDo.of(fn));
-    //      List<String> expected =
-    //          IntStream.rangeClosed(0, numTestElements)
-    //              .mapToObj(expandFn(numTestElements))
-    //              .collect(Collectors.toList());
-    //      PAssert.that(output).containsInAnyOrder(expected);
-    //      pipeline.run();
-    //    }
-    //
-    //    private IntFunction<String> expandFn(int numTestElements) {
-    //      return i ->
-    //          Joiner.on(":")
-    //                  .join(
-    //                      IntStream.rangeClosed(0, Math.min(numTestElements 
- 1, i))
-    //                          .mapToObj(String::valueOf)
-    //                          .iterator())
-    //              + (i == numTestElements ? ":cleanup" : "");
-    //    }
-
-    //    @Test
-    //    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
-    //    public void testPipelineOptionsParameterOnTimer() {
-    //      final String timerId = "thisTimer";
-    //
-    //      PCollection<String> results =
-    //          pipeline
-    //              .apply(Create.of(KV.of(0L, 0L)))
-    //              .apply(
-    //                  ParDo.of(
-    //                      new DoFn<KV<Long, Long>, String>() {
-    //                        @TimerId(timerId)
-    //                        private final TimerSpec spec =
-    // TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //                        @ProcessElement
-    //                        public void process(
-    //                            ProcessContext c, BoundedWindow w, 
@TimerId(timerId) Timer timer)
-    // {
-    //                          timer.set(w.maxTimestamp());
-    //                        }
-    //
-    //                        @OnTimer(timerId)
-    //                        public void onTimer(OutputReceiver<String> r, 
PipelineOptions options)
-    // {
-    //                          
r.output(options.as(MyOptions.class).getFakeOption());
-    //                        }
-    //                      }));
-    //
-    //      String testOptionValue = "not fake anymore";
-    //      
pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
-    //      PAssert.that(results).containsInAnyOrder("not fake anymore");
-    //
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void duplicateTimerSetting() {
-    //      TestStream<KV<String, String>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
-    //              .addElements(KV.of("key1", "v1"))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<String> result = 
pipeline.apply(stream).apply(ParDo.of(new TwoTimerDoFn()));
-    //      PAssert.that(result).containsInAnyOrder("It works");
-    //
-    //      pipeline.run().waitUntilFinish();
-    //    }
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesTestStream.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrdering() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+      TestStream.Builder<KV<String, String>> builder =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder =
+            builder.addElements(TimestampedValue.of(KV.of("dummy", "" + i), 
now.plus(i * 1000)));
+        if ((i + 1) % 10 == 0) {
+          builder = builder.advanceWatermarkTo(now.plus((i + 1) * 1000));
+        }
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, builder.advanceWatermarkToInfinity());
+    }
+
+    /** A test makes sure that an event time timers are correctly ordered 
using Create transform. */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 
1000)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, Create.timestamped(elements));
+    }
+
+    private void testEventTimeTimerOrderingWithInputPTransform(
+        Instant now,
+        int numTestElements,
+        PTransform<PBegin, PCollection<KV<String, String>>> transform)
+        throws Exception {
+
+      final String timerIdBagAppend = "append";
+      final String timerIdGc = "gc";
+      final String bag = "bag";
+      final String minTimestamp = "minTs";
+      final Instant gcTimerStamp = now.plus((numTestElements + 1) * 1000);
+
+      DoFn<KV<String, String>, String> fn =
+          new DoFn<KV<String, String>, String>() {
+
+            @TimerId(timerIdBagAppend)
+            private final TimerSpec appendSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @TimerId(timerIdGc)
+            private final TimerSpec gcSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId(bag)
+            private final StateSpec<BagState<TimestampedValue<String>>> 
bagStateSpec =
+                StateSpecs.bag();
+
+            @StateId(minTimestamp)
+            private final StateSpec<ValueState<Instant>> minTimestampSpec = 
StateSpecs.value();
+
+            @ProcessElement
+            public void processElement(
+                ProcessContext context,
+                @TimerId(timerIdBagAppend) Timer bagTimer,
+                @TimerId(timerIdGc) Timer gcTimer,
+                @StateId(bag) BagState<TimestampedValue<String>> bagState,
+                @StateId(minTimestamp) ValueState<Instant> minStampState) {
+
+              Instant currentMinStamp =
+                  MoreObjects.firstNonNull(minStampState.read(), 
BoundedWindow.TIMESTAMP_MAX_VALUE);
+              if (currentMinStamp.equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+                gcTimer.set(gcTimerStamp);
+              }
+              if (currentMinStamp.isAfter(context.timestamp())) {
+                minStampState.write(context.timestamp());
+                bagTimer.set(context.timestamp());
+              }
+              bagState.add(TimestampedValue.of(context.element().getValue(), 
context.timestamp()));
+            }
+
+            @OnTimer(timerIdBagAppend)
+            public void onTimer(
+                OnTimerContext context,
+                @TimerId(timerIdBagAppend) Timer timer,
+                @StateId(bag) BagState<TimestampedValue<String>> bagState) {
+
+              List<TimestampedValue<String>> flush = new ArrayList<>();
+              Instant flushTime = context.timestamp();
+              for (TimestampedValue<String> val : bagState.read()) {
+                if (!val.getTimestamp().isAfter(flushTime)) {
+                  flush.add(val);
+                }
+              }
+              flush.sort(Comparator.comparing(TimestampedValue::getTimestamp));
+              context.output(
+                  
Joiner.on(":").join(flush.stream().map(TimestampedValue::getValue).iterator()));
+              Instant newMinStamp = flushTime.plus(1000);
+              if (flush.size() < numTestElements) {
+                timer.set(newMinStamp);
+              }
+            }
+
+            @OnTimer(timerIdGc)
+            public void onTimer(
+                OnTimerContext context, @StateId(bag) 
BagState<TimestampedValue<String>> bagState) {
+
+              String output =
+                  Joiner.on(":")
+                          .join(
+                              
StreamSupport.stream(bagState.read().spliterator(), false)
+                                  
.sorted(Comparator.comparing(TimestampedValue::getTimestamp))
+                                  .map(TimestampedValue::getValue)
+                                  .iterator())
+                      + ":cleanup";
+              context.output(output);
+              bagState.clear();
+            }
+          };
+
+      PCollection<String> output = 
pipeline.apply(transform).apply(ParDo.of(fn));
+      List<String> expected =
+          IntStream.rangeClosed(0, numTestElements)
+              .mapToObj(expandFn(numTestElements))
+              .collect(Collectors.toList());
+      PAssert.that(output).containsInAnyOrder(expected);
+      pipeline.run();
+    }
+
+    private IntFunction<String> expandFn(int numTestElements) {
+      return i ->
+          Joiner.on(":")
+                  .join(
+                      IntStream.rangeClosed(0, Math.min(numTestElements - 1, 
i))
+                          .mapToObj(String::valueOf)
+                          .iterator())
+              + (i == numTestElements ? ":cleanup" : "");
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+    public void testPipelineOptionsParameterOnTimer() {
+      final String timerId = "thisTimer";
+
+      PCollection<String> results =
+          pipeline
+              .apply(Create.of(KV.of(0L, 0L)))
+              .apply(
+                  ParDo.of(
+                      new DoFn<KV<Long, Long>, String>() {
+                        @TimerId(timerId)
+                        private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+                        @ProcessElement
+                        public void process(
+                            ProcessContext c, BoundedWindow w, 
@TimerId(timerId) Timer timer) {
+                          timer.set(w.maxTimestamp());
+                        }
+
+                        @OnTimer(timerId)
+                        public void onTimer(OutputReceiver<String> r, 
PipelineOptions options) {
+                          
r.output(options.as(MyOptions.class).getFakeOption());
+                        }
+                      }));
+
+      String testOptionValue = "not fake anymore";
+      pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
+      PAssert.that(results).containsInAnyOrder("not fake anymore");
+
+      pipeline.run();
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
+    public void duplicateTimerSetting() {
+      TestStream<KV<String, String>> stream =
+          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
StringUtf8Coder.of()))
+              .addElements(KV.of("key1", "v1"))
+              .advanceWatermarkToInfinity();
+
+      PCollection<String> result = pipeline.apply(stream).apply(ParDo.of(new 
TwoTimerDoFn()));
+      PAssert.that(result).containsInAnyOrder("It works");
+
+      pipeline.run().waitUntilFinish();
+    }
 
     @Test
     @Category({
@@ -4496,48 +4488,23 @@ public class ParDoTest implements Serializable {
       UsesTestStream.class,
       UsesStrictTimerOrdering.class
     })
-    public void testTwoTimersSettingEachOtherBounded() {
-      testTwoTimersSettingEachOther(IsBounded.BOUNDED);
-    }
-
-    //    @Test
-    //    @Category({
-    //            ValidatesRunner.class,
-    //            UsesTimersInParDo.class,
-    //            UsesTestStream.class,
-    //            UsesStrictTimerOrdering.class
-    //    })
-    //    public void testTwoTimersSettingEachOtherUnbounded() {
-    //      testTwoTimersSettingEachOther(IsBounded.UNBOUNDED);
-    //    }
-
-    private void testTwoTimersSettingEachOther(IsBounded isBounded) {
+    public void testTwoTimersSettingEachOther() {
       Instant now = new Instant(1500000000000L);
       Instant end = now.plus(100);
       TestStream<KV<Void, Void>> input =
           TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
               .addElements(KV.of(null, null))
               .advanceWatermarkToInfinity();
-      pipeline.apply(TwoTimerTest.of(now, end, input, isBounded));
+      pipeline.apply(TwoTimerTest.of(now, end, input));
       pipeline.run();
     }
 
-    //    @Test
-    //    @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesStrictTimerOrdering.class})
-    //    public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
-    //      testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded.BOUNDED);;
-    //    }
-
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class, 
UsesStrictTimerOrdering.class})
-    public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
-      testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded.UNBOUNDED);
-    }
-
-    private void testTwoTimersSettingEachOtherWithCreateAsInput(IsBounded 
isBounded) {
+    public void testTwoTimersSettingEachOtherWithCreateAsInput() {
       Instant now = new Instant(1500000000000L);
       Instant end = now.plus(100);
-      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), 
isBounded));
+      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
       pipeline.run();
     }
 
@@ -4787,27 +4754,19 @@ public class ParDoTest implements Serializable {
     private static class TwoTimerTest extends PTransform<PBegin, PDone> {
 
       private static PTransform<PBegin, PDone> of(
-          Instant start,
-          Instant end,
-          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
-          IsBounded isBounded) {
-        return new TwoTimerTest(start, end, input, isBounded);
+          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, 
Void>>> input) {
+        return new TwoTimerTest(start, end, input);
       }
 
       private final Instant start;
       private final Instant end;
       private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> 
inputPTransform;
-      private IsBounded isBounded;
 
       public TwoTimerTest(
-          Instant start,
-          Instant end,
-          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
-          IsBounded isBounded) {
+          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, 
Void>>> input) {
         this.start = start;
         this.end = end;
         this.inputPTransform = input;
-        this.isBounded = isBounded;
       }
 
       @Override
@@ -4819,7 +4778,6 @@ public class ParDoTest implements Serializable {
         PCollection<String> result =
             input
                 .apply(inputPTransform)
-                .setIsBoundedInternal(isBounded)
                 .apply(
                     ParDo.of(
                         new DoFn<KV<Void, Void>, String>() {
@@ -4839,6 +4797,7 @@ public class ParDoTest implements Serializable {
                               @TimerId(timerName1) Timer t1,
                               @TimerId(timerName2) Timer t2,
                               @StateId(countStateName) ValueState<Integer> 
state) {
+
                             state.write(0);
                             t1.set(start);
                             // set the t2 timer after end, so that we test that
@@ -4892,348 +4851,6 @@ public class ParDoTest implements Serializable {
         return PDone.in(input.getPipeline());
       }
     }
-
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testSetAndClearProcessingTimeTimer() {
-    //
-    //      final String timerId = "processing-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
-    // r) {
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              timer.clear();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceProcessingTime(
-    //                  Duration.millis(
-    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
-    //                      .plus(Duration.standardMinutes(2)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testSetAndClearEventTimeTimer() {
-    //      final String timerId = "event-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
-    // r) {
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              timer.clear();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .advanceWatermarkTo(new Instant(0))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testClearUnsetProcessingTimeTimer() {
-    //      final String timerId = "processing-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
-    // r) {
-    //              timer.clear();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceProcessingTime(
-    //                  Duration.millis(
-    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
-    //                      .plus(Duration.standardMinutes(4)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testClearUnsetEventTimeTimer() {
-    //      final String timerId = "event-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
-    // r) {
-    //              timer.clear();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .advanceWatermarkTo(new Instant(0))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testClearProcessingTimeTimer() {
-    //      final String timerId = "processing-timer";
-    //      final String clearTimerId = "clear-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-    //
-    //            @TimerId(clearTimerId)
-    //            private final TimerSpec clearTimerSpec =
-    // TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(
-    //                @TimerId(timerId) Timer timer,
-    //                @TimerId(clearTimerId) Timer clearTimer,
-    //                OutputReceiver<Integer> r) {
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
-    //
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(
-    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer 
clearTimer) {
-    //              System.err.println("onTimer");
-    //              r.output(42);
-    //              clearTimer.clear();
-    //            }
-    //
-    //            // This should never fire since we clear the timer in the 
earlier timer.
-    //            @OnTimer(clearTimerId)
-    //            public void clearTimer(OutputReceiver<Integer> r) {
-    //              System.err.println("clearTimer");
-    //              r.output(43);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceProcessingTime(
-    //                  Duration.millis(
-    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
-    //                      .plus(Duration.standardMinutes(2)))
-    //              .advanceProcessingTime(
-    //                  Duration.millis(
-    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
-    //                      .plus(Duration.standardMinutes(4)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3, 42);
-    //      pipeline.run();
-    //    }
-
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testClearEventTimeTimer() {
-    //      final String timerId = "event-timer";
-    //      final String clearTimerId = "clear-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @TimerId(clearTimerId)
-    //            private final TimerSpec clearSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(
-    //                @TimerId(timerId) Timer timer,
-    //                @TimerId(clearTimerId) Timer clearTimer,
-    //                OutputReceiver<Integer> r) {
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              
clearTimer.offset(Duration.standardSeconds(2)).setRelative();
-    //
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(
-    //                OutputReceiver<Integer> r, @TimerId(clearTimerId) Timer 
clearTimer) {
-    //              r.output(42);
-    //              clearTimer.clear();
-    //            }
-    //
-    //            // This should never fire since we clear the timer in the 
earlier timer.
-    //            @OnTimer(clearTimerId)
-    //            public void clearTimer(OutputReceiver<Integer> r) {
-    //              r.output(43);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .advanceWatermarkTo(new Instant(0))
-    //              .addElements(KV.of("hello", 37))
-    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3, 42);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testSetProcessingTimerAfterClear() {
-    //      final String timerId = "processing-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(
-    //                @Element KV<String, Integer> e,
-    //                @TimerId(timerId) Timer timer,
-    //                OutputReceiver<Integer> r) {
-    //              timer.clear();
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(TimeDomain timeDomain, 
OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .addElements(KV.of("hello", 37), KV.of("hello", 38))
-    //              .advanceProcessingTime(
-    //                  Duration.millis(
-    //                          DateTimeUtils.currentTimeMillis() / 1000 * 
1000) // round to seconds
-    //                      .plus(Duration.standardMinutes(2)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3, 3, 42);
-    //      pipeline.run();
-    //    }
-    //
-    //    @Test
-    //    @Category({NeedsRunner.class, UsesTimersInParDo.class, 
UsesTestStream.class})
-    //    public void testSetEventTimerAfterClear() {
-    //      final String timerId = "event-timer";
-    //
-    //      DoFn<KV<String, Integer>, Integer> fn =
-    //          new DoFn<KV<String, Integer>, Integer>() {
-    //
-    //            @TimerId(timerId)
-    //            private final TimerSpec spec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
-    //
-    //            @ProcessElement
-    //            public void processElement(@TimerId(timerId) Timer timer, 
OutputReceiver<Integer>
-    // r) {
-    //              timer.clear();
-    //              timer.offset(Duration.standardSeconds(1)).setRelative();
-    //              r.output(3);
-    //            }
-    //
-    //            @OnTimer(timerId)
-    //            public void onTimer(OutputReceiver<Integer> r) {
-    //              r.output(42);
-    //            }
-    //          };
-    //
-    //      TestStream<KV<String, Integer>> stream =
-    //          TestStream.create(KvCoder.of(StringUtf8Coder.of(), 
VarIntCoder.of()))
-    //              .advanceWatermarkTo(new Instant(0))
-    //              .addElements(KV.of("hello", 37), KV.of("hello", 38))
-    //              .advanceWatermarkTo(new 
Instant(0).plus(Duration.standardSeconds(1)))
-    //              .advanceWatermarkToInfinity();
-    //
-    //      PCollection<Integer> output = 
pipeline.apply(stream).apply(ParDo.of(fn));
-    //      PAssert.that(output).containsInAnyOrder(3, 3, 42);
-    //      pipeline.run();
-    //    }
   }
 
   /** Tests validating Timer coder inference behaviors. */
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 52485db..4711c4c 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1781,17 +1781,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
     }
 
     @Override
-    public void clear() {
-      TimerHandler<K> consumer = (TimerHandler) timerHandlers.get(timerId);
-      try {
-        consumer.accept(
-            Timer.cleared(userKey, dynamicTimerTag, 
Collections.singletonList(boundedWindow)));
-      } catch (Throwable t) {
-        throw UserCodeException.wrap(t);
-      }
-    }
-
-    @Override
     public org.apache.beam.sdk.state.Timer offset(Duration offset) {
       this.offset = offset;
       return this;
diff --git 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 0d58619..1f2d839 100644
--- 
a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ 
b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -1098,13 +1098,9 @@ public class FnApiDoFnRunnerTest implements Serializable 
{
           fakeTimerClient.getTimers(eventTimer),
           contains(
               timerInGlobalWindow("X", new Instant(1000L), new Instant(1001L)),
-              clearedTimerInGlobalWindow("X"),
               timerInGlobalWindow("Y", new Instant(1100L), new Instant(1101L)),
-              clearedTimerInGlobalWindow("Y"),
               timerInGlobalWindow("X", new Instant(1200L), new Instant(1201L)),
-              clearedTimerInGlobalWindow("X"),
               timerInGlobalWindow("Y", new Instant(1300L), new Instant(1301L)),
-              clearedTimerInGlobalWindow("Y"),
               timerInGlobalWindow("A", new Instant(1400L), new Instant(2411L)),
               timerInGlobalWindow("B", new Instant(1500L), new Instant(2511L)),
               timerInGlobalWindow("A", new Instant(1600L), new Instant(2611L)),
@@ -1117,13 +1113,9 @@ public class FnApiDoFnRunnerTest implements Serializable 
{
           fakeTimerClient.getTimers(processingTimer),
           contains(
               timerInGlobalWindow("X", new Instant(1000L), new 
Instant(10002L)),
-              clearedTimerInGlobalWindow("X"),
               timerInGlobalWindow("Y", new Instant(1100L), new 
Instant(10002L)),
-              clearedTimerInGlobalWindow("Y"),
               timerInGlobalWindow("X", new Instant(1200L), new 
Instant(10002L)),
-              clearedTimerInGlobalWindow("X"),
               timerInGlobalWindow("Y", new Instant(1300L), new 
Instant(10002L)),
-              clearedTimerInGlobalWindow("Y"),
               timerInGlobalWindow("A", new Instant(1400L), new 
Instant(10012L)),
               timerInGlobalWindow("B", new Instant(1500L), new 
Instant(10012L)),
               timerInGlobalWindow("A", new Instant(1600L), new 
Instant(10012L)),
@@ -1224,12 +1216,6 @@ public class FnApiDoFnRunnerTest implements Serializable 
{
       return dynamicTimerInGlobalWindow(userKey, "", holdTimestamp, 
fireTimestamp);
     }
 
-    private <K> org.apache.beam.runners.core.construction.Timer<K> 
clearedTimerInGlobalWindow(
-        K userKey) {
-      return org.apache.beam.runners.core.construction.Timer.cleared(
-          userKey, "", Collections.singletonList(GlobalWindow.INSTANCE));
-    }
-
     private <K> org.apache.beam.runners.core.construction.Timer<K> 
dynamicTimerInGlobalWindow(
         K userKey, String dynamicTimerTag, Instant holdTimestamp, Instant 
fireTimestamp) {
       return org.apache.beam.runners.core.construction.Timer.of(
@@ -1279,10 +1265,8 @@ public class FnApiDoFnRunnerTest implements Serializable 
{
         bagState.add(context.element().getValue());
 
         
eventTimeTimer.withOutputTimestamp(context.timestamp()).set(context.timestamp().plus(1L));
-        eventTimeTimer.clear();
         processingTimeTimer.offset(Duration.millis(2L));
         processingTimeTimer.setRelative();
-        processingTimeTimer.clear();
         eventTimerFamily
             .get("event-timer1")
             .withOutputTimestamp(context.timestamp())

Reply via email to