This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch revert-16748-revert-timer
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f68daa08e24cb58cfcb7b6967598ee073c017949
Author: reuvenlax <[email protected]>
AuthorDate: Wed Feb 23 10:27:22 2022 -0800

    Revert "[BEAM-11971] Revert "Fix timer consistency in direct runner" 
(#16748)"
    
    This reverts commit 98e5fc5d3a3229a3185005d7557f0f5c34ef7ec5.
---
 .../apache/beam/runners/core/TimerInternals.java   |  17 +-
 .../beam/runners/direct/DirectTimerInternals.java  |  72 +++---
 .../direct/ExecutorServiceParallelExecutor.java    |   2 +-
 .../beam/runners/direct/QuiescenceDriver.java      |  78 ++++---
 .../direct/StatefulParDoEvaluatorFactory.java      |  83 ++++---
 .../beam/runners/direct/WatermarkManager.java      | 250 +++++++++------------
 .../apache/beam/runners/local/ExecutionDriver.java |   2 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  10 +-
 8 files changed, 232 insertions(+), 282 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 965be82..143254a 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -262,11 +262,22 @@ public interface TimerInternals {
               .compare(this.getDomain(), that.getDomain())
               .compare(this.getTimerId(), that.getTimerId())
               .compare(this.getTimerFamilyId(), that.getTimerFamilyId());
-      if (chain.result() == 0 && 
!this.getNamespace().equals(that.getNamespace())) {
+      int compResult = chain.result();
+      if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) 
{
         // Obtaining the stringKey may be expensive; only do so if required
-        chain = chain.compare(getNamespace().stringKey(), 
that.getNamespace().stringKey());
+        compResult = 
this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey());
       }
-      return chain.result();
+      return compResult;
+    }
+
+    public String stringKey() {
+      return getNamespace().stringKey()
+          + "/"
+          + getDomain().toString()
+          + "/"
+          + getTimerFamilyId()
+          + ":"
+          + getTimerId();
     }
   }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5a477bb..23d011f 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.util.Map;
+import java.util.NavigableSet;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
@@ -24,6 +26,8 @@ import 
org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBu
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -35,6 +39,8 @@ class DirectTimerInternals implements TimerInternals {
   private final Clock processingTimeClock;
   private final TransformWatermarks watermarks;
   private final TimerUpdateBuilder timerUpdateBuilder;
+  private final Map<TimeDomain, NavigableSet<TimerData>> modifiedTimers;
+  private final Map<String, TimerData> modifiedTimerIds;
 
   public static DirectTimerInternals create(
       Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder 
timerUpdateBuilder) {
@@ -46,6 +52,11 @@ class DirectTimerInternals implements TimerInternals {
     this.processingTimeClock = clock;
     this.watermarks = watermarks;
     this.timerUpdateBuilder = timerUpdateBuilder;
+    this.modifiedTimers = Maps.newHashMap();
+    this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet());
+    this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet());
+    this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, 
Sets.newTreeSet());
+    this.modifiedTimerIds = Maps.newHashMap();
   }
 
   @Override
@@ -56,8 +67,7 @@ class DirectTimerInternals implements TimerInternals {
       Instant target,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timerUpdateBuilder.setTimer(
-        TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
+    setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, 
outputTimestamp, timeDomain));
   }
 
   /**
@@ -68,6 +78,8 @@ class DirectTimerInternals implements TimerInternals {
   @Override
   public void setTimer(TimerData timerData) {
     timerUpdateBuilder.setTimer(timerData);
+    getModifiedTimersOrdered(timerData.getDomain()).add(timerData);
+    modifiedTimerIds.put(timerData.stringKey(), timerData);
   }
 
   @Override
@@ -93,27 +105,25 @@ class DirectTimerInternals implements TimerInternals {
   /** @deprecated use {@link #deleteTimer(StateNamespace, String, 
TimeDomain)}. */
   @Deprecated
   @Override
-  public void deleteTimer(TimerData timerKey) {
-    timerUpdateBuilder.deletedTimer(timerKey);
+  public void deleteTimer(TimerData timerData) {
+    timerUpdateBuilder.deletedTimer(timerData);
+    modifiedTimerIds.put(timerData.stringKey(), timerData);
   }
 
   public TimerUpdate getTimerUpdate() {
     return timerUpdateBuilder.build();
   }
 
-  public boolean containsUpdateForTimeBefore(
-      Instant maxWatermarkTime, Instant maxProcessingTime, Instant 
maxSynchronizedProcessingTime) {
-    TimerUpdate update = timerUpdateBuilder.build();
-    return hasTimeBefore(
-            update.getSetTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime)
-        || hasTimeBefore(
-            update.getDeletedTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime);
+  public NavigableSet<TimerData> getModifiedTimersOrdered(TimeDomain 
timeDomain) {
+    NavigableSet<TimerData> modified = modifiedTimers.get(timeDomain);
+    if (modified == null) {
+      throw new IllegalStateException("Unexpected time domain " + timeDomain);
+    }
+    return modified;
+  }
+
+  public Map<String, TimerData> getModifiedTimerIds() {
+    return modifiedTimerIds;
   }
 
   @Override
@@ -135,32 +145,4 @@ class DirectTimerInternals implements TimerInternals {
   public @Nullable Instant currentOutputWatermarkTime() {
     return watermarks.getOutputWatermark();
   }
-
-  private boolean hasTimeBefore(
-      Iterable<? extends TimerData> timers,
-      Instant maxWatermarkTime,
-      Instant maxProcessingTime,
-      Instant maxSynchronizedProcessingTime) {
-    for (TimerData timerData : timers) {
-      Instant currentTime;
-      switch (timerData.getDomain()) {
-        case EVENT_TIME:
-          currentTime = maxWatermarkTime;
-          break;
-        case PROCESSING_TIME:
-          currentTime = maxProcessingTime;
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          currentTime = maxSynchronizedProcessingTime;
-          break;
-        default:
-          throw new RuntimeException("Unexpected timeDomain " + 
timerData.getDomain());
-      }
-      if (timerData.getTimestamp().isBefore(currentTime)
-          || timerData.getTimestamp().isEqual(currentTime)) {
-        return true;
-      }
-    }
-    return false;
-  }
 }
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 7ffd261..44f2974 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -173,7 +173,7 @@ final class ExecutorServiceParallelExecutor
           @Override
           public void run() {
             DriverState drive = executionDriver.drive();
-            if (drive.isTermainal()) {
+            if (drive.isTerminal()) {
               State newPipelineState = State.UNKNOWN;
               switch (drive) {
                 case FAILED:
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index 3499967..b5e5736 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -39,6 +38,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +80,7 @@ class QuiescenceDriver implements ExecutionDriver {
   // watermark of a PTransform before enqueuing the resulting bundle to 
pendingUpdates of downstream
   // PTransform, which can lead to watermark being updated past the emitted 
elements.
   private final Map<AppliedPTransform<?, ?, ?>, 
Collection<CommittedBundle<?>>> inflightBundles =
-      new ConcurrentHashMap<>();
+      Maps.newHashMap();
 
   private final AtomicReference<ExecutorState> state =
       new AtomicReference<>(ExecutorState.QUIESCENT);
@@ -164,15 +164,17 @@ class QuiescenceDriver implements ExecutionDriver {
 
   private void processBundle(
       CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer, 
CompletionCallback callback) {
-    inflightBundles.compute(
-        consumer,
-        (k, v) -> {
-          if (v == null) {
-            v = new ArrayList<>();
-          }
-          v.add(bundle);
-          return v;
-        });
+    synchronized (inflightBundles) {
+      inflightBundles.compute(
+          consumer,
+          (k, v) -> {
+            if (v == null) {
+              v = new ArrayList<>();
+            }
+            v.add(bundle);
+            return v;
+          });
+    }
     outstandingWork.incrementAndGet();
     bundleProcessor.process(bundle, consumer, callback);
   }
@@ -180,24 +182,28 @@ class QuiescenceDriver implements ExecutionDriver {
   /** Fires any available timers. */
   private void fireTimers() {
     try {
-      for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
-          evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
-        Collection<TimerData> delivery = transformTimers.getTimers();
-        KeyedWorkItem<?, Object> work =
-            KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), 
delivery);
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        CommittedBundle<?> bundle =
-            evaluationContext
-                .createKeyedBundle(
-                    transformTimers.getKey(),
-                    (PCollection)
-                        Iterables.getOnlyElement(
-                            
transformTimers.getExecutable().getMainInputs().values()))
-                .add(WindowedValue.valueInGlobalWindow(work))
-                .commit(evaluationContext.now());
-        processBundle(
-            bundle, transformTimers.getExecutable(), new 
TimerIterableCompletionCallback(delivery));
-        state.set(ExecutorState.ACTIVE);
+      synchronized (inflightBundles) {
+        for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
+            evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
+          Collection<TimerData> delivery = transformTimers.getTimers();
+          KeyedWorkItem<?, Object> work =
+              KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), 
delivery);
+          @SuppressWarnings({"unchecked", "rawtypes"})
+          CommittedBundle<?> bundle =
+              evaluationContext
+                  .createKeyedBundle(
+                      transformTimers.getKey(),
+                      (PCollection)
+                          Iterables.getOnlyElement(
+                              
transformTimers.getExecutable().getMainInputs().values()))
+                  .add(WindowedValue.valueInGlobalWindow(work))
+                  .commit(evaluationContext.now());
+          processBundle(
+              bundle,
+              transformTimers.getExecutable(),
+              new TimerIterableCompletionCallback(delivery));
+          state.set(ExecutorState.ACTIVE);
+        }
       }
     } catch (Exception e) {
       LOG.error("Internal Error while delivering timers", e);
@@ -313,12 +319,14 @@ class QuiescenceDriver implements ExecutionDriver {
         state.set(ExecutorState.ACTIVE);
       }
       outstandingWork.decrementAndGet();
-      inflightBundles.compute(
-          result.getTransform(),
-          (k, v) -> {
-            v.remove(inputBundle);
-            return v.isEmpty() ? null : v;
-          });
+      synchronized (inflightBundles) {
+        inflightBundles.compute(
+            result.getTransform(),
+            (k, v) -> {
+              v.remove(inputBundle);
+              return v.isEmpty() ? null : v;
+            });
+      }
       return committedResult;
     }
 
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index ebd305f..054b22d 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -17,14 +17,10 @@
  */
 package org.apache.beam.runners.direct;
 
-import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
 import com.google.auto.value.AutoValue;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.NavigableSet;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
@@ -47,7 +43,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
 import org.joda.time.Instant;
 
 /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
@@ -155,7 +150,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
       implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
 
     private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, 
InputT>> delegateEvaluator;
-    private final List<TimerData> pushedBackTimers = new ArrayList<>();
     private final DirectTimerInternals timerInternals;
 
     DirectStepContext stepContext;
@@ -174,45 +168,46 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
       for (WindowedValue<KV<K, InputT>> windowedValue : 
gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-      PriorityQueue<TimerData> toBeFiredTimers =
-          new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
 
-      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxSynchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
       for (TimerData timerData : gbkResult.getValue().timersIterable()) {
-        toBeFiredTimers.add(timerData);
-        switch (timerData.getDomain()) {
-          case EVENT_TIME:
-            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, 
timerData.getTimestamp());
-            break;
-          case PROCESSING_TIME:
-            maxProcessingTime = Ordering.natural().max(maxProcessingTime, 
timerData.getTimestamp());
-            break;
-          case SYNCHRONIZED_PROCESSING_TIME:
-            maxSynchronizedProcessingTime =
-                Ordering.natural().max(maxSynchronizedProcessingTime, 
timerData.getTimestamp());
+        // Get any new or modified timers that are earlier than the current 
one. In order to
+        // maintain timer ordering,
+        // we need to fire these timers first.
+        NavigableSet<TimerData> earlierTimers =
+            
timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData,
 true);
+        while (!earlierTimers.isEmpty()) {
+          TimerData insertedTimer = earlierTimers.pollFirst();
+          if (timerModified(insertedTimer)) {
+            continue;
+          }
+          // Make sure to register this timer as deleted. This could be a 
timer that was originally
+          // set for the future
+          // and not in the bundle but was reset to an earlier time in this 
bundle. If we don't
+          // explicity delete the
+          // future timer, then it will still fire.
+          timerInternals.deleteTimer(insertedTimer);
+          processTimer(insertedTimer, gbkResult.getValue().key());
         }
-      }
 
-      while (!timerInternals.containsUpdateForTimeBefore(
-              maxWatermarkTime, maxProcessingTime, 
maxSynchronizedProcessingTime)
-          && !toBeFiredTimers.isEmpty()) {
-
-        TimerData timer = toBeFiredTimers.poll();
-        checkState(
-            timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
-            timer,
-            WindowNamespace.class.getSimpleName(),
-            timer.getNamespace().getClass().getName());
-        WindowNamespace<?> windowNamespace = (WindowNamespace) 
timer.getNamespace();
-        BoundedWindow timerWindow = windowNamespace.getWindow();
-
-        delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), 
timerWindow);
-        clearWatermarkHold(timer);
+        // As long as the timer hasn't been modified or deleted earlier in the 
bundle, fire it.
+        if (!timerModified(timerData)) {
+          processTimer(timerData, gbkResult.getValue().key());
+        }
       }
-      pushedBackTimers.addAll(toBeFiredTimers);
+    }
+
+    // Check to see if a timer has been modified inside this bundle.
+    private boolean timerModified(TimerData timerData) {
+      @Nullable
+      TimerData modifiedTimer = 
timerInternals.getModifiedTimerIds().get(timerData.stringKey());
+      return modifiedTimer != null && !modifiedTimer.equals(timerData);
+    }
+
+    private void processTimer(TimerData timerData, K key) throws Exception {
+      WindowNamespace<?> windowNamespace = (WindowNamespace) 
timerData.getNamespace();
+      BoundedWindow timerWindow = windowNamespace.getWindow();
+      delegateEvaluator.onTimer(timerData, key, timerWindow);
+      clearWatermarkHold(timerData);
     }
 
     private void clearWatermarkHold(TimerData timer) {
@@ -256,9 +251,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, 
OutputT> implements Transfo
         watermarkHold = delegateResult.getWatermarkHold();
       }
 
-      TimerUpdate timerUpdate =
-          
delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
-      pushedBackTimers.clear();
+      TimerUpdate timerUpdate = delegateResult.getTimerUpdate();
       StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> 
regroupedResult =
           StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
                   delegateResult.getTransform(), watermarkHold)
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index e6626d0..a68021e 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -26,17 +26,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -45,7 +43,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.local.Bundle;
@@ -60,16 +57,14 @@ import org.apache.beam.sdk.values.PCollection;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeMultiset;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
@@ -239,7 +234,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, 
TimerData>> existingTimers;
+    private final Map<StructuralKey<?>, Map<String, TimerData>> existingTimers;
 
     // This per-key sorted set allows quick retrieval of timers that should 
fire for a key
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
@@ -343,15 +338,14 @@ class WatermarkManager<ExecutableT, CollectionT> {
     synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers =
           objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-          existingTimers.computeIfAbsent(update.key, k -> 
HashBasedTable.create());
+      Map<String, TimerData> existingTimersForKey =
+          existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap());
 
+      HashSet<String> newSetTimers = Sets.newHashSet();
       for (TimerData timer : update.getSetTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(
-                  timer.getNamespace(), timer.getTimerId() + '+' + 
timer.getTimerFamilyId());
+          newSetTimers.add(timer.stringKey());
+          @Nullable TimerData existingTimer = 
existingTimersForKey.get(timer.stringKey());
 
           if (existingTimer == null) {
             pendingTimers.add(timer);
@@ -366,32 +360,29 @@ class WatermarkManager<ExecutableT, CollectionT> {
             keyTimers.add(timer);
           }
 
-          existingTimersForKey.put(
-              timer.getNamespace(), timer.getTimerId() + '+' + 
timer.getTimerFamilyId(), timer);
+          existingTimersForKey.put(timer.stringKey(), timer);
         }
       }
 
       for (TimerData timer : update.getDeletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(
-                  timer.getNamespace(), timer.getTimerId() + '+' + 
timer.getTimerFamilyId());
+          @Nullable TimerData existingTimer = 
existingTimersForKey.get(timer.stringKey());
 
           if (existingTimer != null) {
             pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
-            existingTimersForKey.remove(
-                existingTimer.getNamespace(),
-                existingTimer.getTimerId() + '+' + 
existingTimer.getTimerFamilyId());
+            existingTimersForKey.remove(existingTimer.stringKey());
           }
         }
       }
 
       for (TimerData timer : update.getCompletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-          pendingTimers.remove(timer);
+          if (!newSetTimers.contains(timer.stringKey())) {
+            keyTimers.remove(timer);
+            pendingTimers.remove(timer);
+            existingTimersForKey.remove(timer.stringKey());
+          }
         }
       }
 
@@ -519,7 +510,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
     private final Collection<Bundle<?, ?>> pendingBundles;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
processingTimers;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> 
synchronizedProcessingTimers;
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, 
TimerData>> existingTimers;
+    private final Map<StructuralKey<?>, Map<String, TimerData>> existingTimers;
 
     private final NavigableSet<TimerData> pendingTimers;
 
@@ -629,21 +620,18 @@ class WatermarkManager<ExecutableT, CollectionT> {
     }
 
     private synchronized void updateTimers(TimerUpdate update) {
-      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-          existingTimers.computeIfAbsent(update.key, k -> 
HashBasedTable.create());
+      Map<String, TimerData> existingTimersForKey =
+          existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap());
 
+      HashSet<String> newSetTimers = Sets.newHashSet();
       for (TimerData addedTimer : update.setTimers.values()) {
-        NavigableSet<TimerData> timerQueue = 
timerMap.get(addedTimer.getDomain());
+        NavigableSet<TimerData> timerQueue =
+            processQueueForDomain(update.key, addedTimer.getDomain());
         if (timerQueue == null) {
           continue;
         }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(
-                addedTimer.getNamespace(),
-                addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId());
+        newSetTimers.add(addedTimer.stringKey());
+        @Nullable TimerData existingTimer = 
existingTimersForKey.get(addedTimer.stringKey());
         if (existingTimer == null) {
           timerQueue.add(addedTimer);
         } else if (!existingTimer.equals(addedTimer)) {
@@ -651,34 +639,30 @@ class WatermarkManager<ExecutableT, CollectionT> {
           timerQueue.add(addedTimer);
         } // else the timer is already set identically, so noop.
 
-        existingTimersForKey.put(
-            addedTimer.getNamespace(),
-            addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId(),
-            addedTimer);
+        existingTimersForKey.put(addedTimer.stringKey(), addedTimer);
       }
 
       for (TimerData deletedTimer : update.deletedTimers.values()) {
-        NavigableSet<TimerData> timerQueue = 
timerMap.get(deletedTimer.getDomain());
+        NavigableSet<TimerData> timerQueue =
+            processQueueForDomain(update.key, deletedTimer.getDomain());
         if (timerQueue == null) {
           continue;
         }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(
-                deletedTimer.getNamespace(),
-                deletedTimer.getTimerId() + '+' + 
deletedTimer.getTimerFamilyId());
+        String timerKey = deletedTimer.stringKey();
+        @Nullable TimerData existingTimer = existingTimersForKey.get(timerKey);
         if (existingTimer != null) {
           pendingTimers.remove(deletedTimer);
           timerQueue.remove(deletedTimer);
-          existingTimersForKey.remove(
-              existingTimer.getNamespace(),
-              existingTimer.getTimerId() + '+' + 
existingTimer.getTimerFamilyId());
+          existingTimersForKey.remove(timerKey);
         }
       }
 
       for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
+        String timerKey = completedTimer.stringKey();
+        if (!newSetTimers.contains(timerKey)) {
+          pendingTimers.remove(completedTimer);
+          existingTimersForKey.remove(timerKey);
+        }
       }
 
       // notify of TimerData update
@@ -713,15 +697,16 @@ class WatermarkManager<ExecutableT, CollectionT> {
       return firedTimers;
     }
 
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> 
key) {
-      NavigableSet<TimerData> processingQueue =
-          processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
-      NavigableSet<TimerData> synchronizedProcessingQueue =
-          synchronizedProcessingTimers.computeIfAbsent(key, k -> new 
TreeSet<>());
-      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new 
EnumMap<>(TimeDomain.class);
-      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
-      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, 
synchronizedProcessingQueue);
-      return result;
+    private @Nullable NavigableSet<TimerData> processQueueForDomain(
+        StructuralKey<?> key, TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case PROCESSING_TIME:
+          return processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return synchronizedProcessingTimers.computeIfAbsent(key, k -> new 
TreeSet<>());
+        default:
+          return null;
+      }
     }
 
     @Override
@@ -853,7 +838,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
    *
    * <p>The result collection retains ordering of timers (from earliest to 
latest).
    */
-  private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
+  private static synchronized Map<StructuralKey<?>, List<TimerData>> 
extractFiredTimers(
       Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> 
objectTimers) {
     Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
     Set<StructuralKey<?>> emptyKeys = new HashSet<>();
@@ -895,8 +880,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
   private final Map<ExecutableT, TransformWatermarks> transformToWatermarks;
 
   /** A queue of pending updates to the state of this {@link 
WatermarkManager}. */
-  private final ConcurrentLinkedQueue<PendingWatermarkUpdate<ExecutableT, 
CollectionT>>
-      pendingUpdates;
+  private final Queue<PendingWatermarkUpdate<ExecutableT, CollectionT>> 
pendingUpdates;
 
   /** A lock used to control concurrency for updating pending values. */
   private final Lock refreshLock;
@@ -914,7 +898,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
    * bundle processor at a time.
    */
   private final Map<ExecutableT, Set<String>> 
transformsWithAlreadyExtractedTimers =
-      new ConcurrentHashMap<>();
+      Maps.newHashMap();
 
   /**
    * Creates a new {@link WatermarkManager}. All watermarks within the newly 
created {@link
@@ -942,7 +926,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
     this.graph = graph;
     this.getName = getName;
 
-    this.pendingUpdates = new ConcurrentLinkedQueue<>();
+    this.pendingUpdates = Queues.newArrayDeque();
 
     this.refreshLock = new ReentrantLock();
     this.pendingRefreshes = new HashSet<>();
@@ -1000,18 +984,20 @@ class WatermarkManager<ExecutableT, CollectionT> {
       Map<ExecutableT, Set<String>> transformsWithAlreadyExtractedTimers, 
ExecutableT executable) {
 
     return update -> {
-      String timerIdWithNs = 
TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(update);
-      transformsWithAlreadyExtractedTimers.compute(
-          executable,
-          (k, v) -> {
-            if (v != null) {
-              v.remove(timerIdWithNs);
-              if (v.isEmpty()) {
-                v = null;
+      String timerIdWithNs = update.stringKey();
+      synchronized (transformsWithAlreadyExtractedTimers) {
+        transformsWithAlreadyExtractedTimers.compute(
+            executable,
+            (k, v) -> {
+              if (v != null) {
+                v.remove(timerIdWithNs);
+                if (v.isEmpty()) {
+                  v = null;
+                }
               }
-            }
-            return v;
-          });
+              return v;
+            });
+      }
     };
   }
 
@@ -1108,9 +1094,11 @@ class WatermarkManager<ExecutableT, CollectionT> {
       @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
       Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
       Instant earliestHold) {
-    pendingUpdates.offer(
-        PendingWatermarkUpdate.create(
-            executable, completed, timerUpdate, unprocessedInputs, outputs, 
earliestHold));
+    synchronized (pendingUpdates) {
+      pendingUpdates.offer(
+          PendingWatermarkUpdate.create(
+              executable, completed, timerUpdate, unprocessedInputs, outputs, 
earliestHold));
+    }
     tryApplyPendingUpdates();
   }
 
@@ -1140,10 +1128,12 @@ class WatermarkManager<ExecutableT, CollectionT> {
   /** Applies up to {@code numUpdates}, or all available updates if numUpdates 
is non-positive. */
   @GuardedBy("refreshLock")
   private void applyNUpdates(int numUpdates) {
-    for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates 
<= 0); i++) {
-      PendingWatermarkUpdate<ExecutableT, CollectionT> pending = 
pendingUpdates.poll();
-      applyPendingUpdate(pending);
-      pendingRefreshes.add(pending.getExecutable());
+    synchronized (pendingUpdates) {
+      for (int i = 0; !pendingUpdates.isEmpty() && ((i < numUpdates) || 
(numUpdates <= 0)); i++) {
+        PendingWatermarkUpdate<ExecutableT, CollectionT> pending = 
pendingUpdates.poll();
+        applyPendingUpdate(pending);
+        pendingRefreshes.add(pending.getExecutable());
+      }
     }
   }
 
@@ -1269,26 +1259,27 @@ class WatermarkManager<ExecutableT, CollectionT> {
         if (ignoredExecutables.contains(transform)) {
           continue;
         }
-        if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) {
-          TransformWatermarks watermarks = watermarksEntry.getValue();
-          Collection<FiredTimers<ExecutableT>> firedTimers = 
watermarks.extractFiredTimers();
-          if (!firedTimers.isEmpty()) {
-            List<TimerData> newTimers =
-                firedTimers.stream()
-                    .flatMap(f -> f.getTimers().stream())
-                    .collect(Collectors.toList());
-            transformsWithAlreadyExtractedTimers.compute(
-                transform,
-                (k, v) -> {
-                  if (v == null) {
-                    v = new HashSet<>();
-                  }
-                  final Set<String> toUpdate = v;
-                  newTimers.forEach(
-                      td -> 
toUpdate.add(TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(td)));
-                  return v;
-                });
-            allTimers.addAll(firedTimers);
+        synchronized (transformsWithAlreadyExtractedTimers) {
+          if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) {
+            TransformWatermarks watermarks = watermarksEntry.getValue();
+            Collection<FiredTimers<ExecutableT>> firedTimers = 
watermarks.extractFiredTimers();
+            if (!firedTimers.isEmpty()) {
+              List<TimerData> newTimers =
+                  firedTimers.stream()
+                      .flatMap(f -> f.getTimers().stream())
+                      .collect(Collectors.toList());
+              transformsWithAlreadyExtractedTimers.compute(
+                  transform,
+                  (k, v) -> {
+                    if (v == null) {
+                      v = new HashSet<>();
+                    }
+                    final Set<String> toUpdate = v;
+                    newTimers.forEach(td -> toUpdate.add(td.stringKey()));
+                    return v;
+                  });
+              allTimers.addAll(firedTimers);
+            }
           }
         }
       }
@@ -1563,24 +1554,18 @@ class WatermarkManager<ExecutableT, CollectionT> {
    *
    * <p>setTimers and deletedTimers are collections of {@link TimerData} that 
have been added to the
    * {@link TimerInternals} of an executed step. completedTimers are timers 
that were delivered as
-   * the input to the executed step. pushedBackTimers are timers that were in 
completedTimers at the
-   * input, but were pushed back due to processing constraints.
+   * the input to the executed step.
    */
   public static class TimerUpdate {
     private final StructuralKey<?> key;
     private final Iterable<? extends TimerData> completedTimers;
     private final Map<TimerKey, ? extends TimerData> setTimers;
     private final Map<TimerKey, ? extends TimerData> deletedTimers;
-    private final Iterable<? extends TimerData> pushedBackTimers;
 
     /** Returns a TimerUpdate for a null key with no timers. */
     public static TimerUpdate empty() {
       return new TimerUpdate(
-          null,
-          Collections.emptyList(),
-          Collections.emptyMap(),
-          Collections.emptyMap(),
-          Collections.emptyList());
+          null, Collections.emptyList(), Collections.emptyMap(), 
Collections.emptyMap());
     }
 
     /**
@@ -1651,26 +1636,19 @@ class WatermarkManager<ExecutableT, CollectionT> {
             key,
             ImmutableList.copyOf(completedTimers),
             ImmutableMap.copyOf(setTimers),
-            ImmutableMap.copyOf(deletedTimers),
-            Collections.emptyList());
+            ImmutableMap.copyOf(deletedTimers));
       }
     }
 
-    private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData 
td) {
-      return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId();
-    }
-
     private TimerUpdate(
         StructuralKey<?> key,
         Iterable<? extends TimerData> completedTimers,
         Map<TimerKey, ? extends TimerData> setTimers,
-        Map<TimerKey, ? extends TimerData> deletedTimers,
-        Iterable<? extends TimerData> pushedBackTimers) {
+        Map<TimerKey, ? extends TimerData> deletedTimers) {
       this.key = key;
       this.completedTimers = completedTimers;
       this.setTimers = setTimers;
       this.deletedTimers = deletedTimers;
-      this.pushedBackTimers = pushedBackTimers;
     }
 
     @VisibleForTesting
@@ -1693,46 +1671,21 @@ class WatermarkManager<ExecutableT, CollectionT> {
       return deletedTimers.values();
     }
 
-    Iterable<? extends TimerData> getPushedBackTimers() {
-      return pushedBackTimers;
-    }
-
     boolean isEmpty() {
-      return Iterables.isEmpty(completedTimers)
-          && setTimers.isEmpty()
-          && deletedTimers.isEmpty()
-          && Iterables.isEmpty(pushedBackTimers);
+      return Iterables.isEmpty(completedTimers) && setTimers.isEmpty() && 
deletedTimers.isEmpty();
     }
 
     /**
      * Returns a {@link TimerUpdate} that is like this one, but with the 
specified completed timers.
-     * Note that if any of the completed timers is in pushedBackTimers, then 
it is set instead. The
-     * pushedBackTimers are cleared afterwards.
      */
     public TimerUpdate withCompletedTimers(Iterable<TimerData> 
completedTimers) {
       List<TimerData> timersToComplete = new ArrayList<>();
-      Set<TimerData> pushedBack = Sets.newHashSet(pushedBackTimers);
       Map<TimerKey, TimerData> newSetTimers = Maps.newLinkedHashMap();
       newSetTimers.putAll(setTimers);
       for (TimerData td : completedTimers) {
-        TimerKey timerKey = TimerKey.of(td);
-        if (!pushedBack.contains(td)) {
-          timersToComplete.add(td);
-        } else if (!newSetTimers.containsKey(timerKey)) {
-          newSetTimers.put(timerKey, td);
-        }
+        timersToComplete.add(td);
       }
-      return new TimerUpdate(
-          key, timersToComplete, newSetTimers, deletedTimers, 
Collections.emptyList());
-    }
-
-    /**
-     * Returns a {@link TimerUpdate} that is like this one, but with the 
pushedBackTimersare removed
-     * set by provided pushedBackTimers.
-     */
-    public TimerUpdate withPushedBackTimers(Iterable<TimerData> 
pushedBackTimers) {
-      return new TimerUpdate(
-          key, completedTimers, setTimers, deletedTimers, 
Lists.newArrayList(pushedBackTimers));
+      return new TimerUpdate(key, timersToComplete, newSetTimers, 
deletedTimers);
     }
 
     @Override
@@ -1759,7 +1712,6 @@ class WatermarkManager<ExecutableT, CollectionT> {
           .add("setTimers", setTimers)
           .add("completedTimers", completedTimers)
           .add("deletedTimers", deletedTimers)
-          .add("pushedBackTimers", pushedBackTimers)
           .toString();
     }
   }
diff --git 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
index c17dc54..3638e4e 100644
--- 
a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
+++ 
b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
@@ -33,7 +33,7 @@ public interface ExecutionDriver {
       this.terminal = terminal;
     }
 
-    public boolean isTermainal() {
+    public boolean isTerminal() {
       return terminal;
     }
   }
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fc80127..6f603a2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4121,7 +4121,8 @@ public class ParDoTest implements Serializable {
       ValidatesRunner.class,
       UsesStatefulParDo.class,
       UsesTimersInParDo.class,
-      UsesLoopingTimer.class
+      UsesLoopingTimer.class,
+      UsesStrictTimerOrdering.class
     })
     public void testEventTimeTimerLoop() {
       final String stateId = "count";
@@ -4141,6 +4142,7 @@ public class ParDoTest implements Serializable {
             public void processElement(
                 @StateId(stateId) ValueState<Integer> countState,
                 @TimerId(timerId) Timer loopTimer) {
+              countState.write(0);
               loopTimer.offset(Duration.millis(1)).setRelative();
             }
 
@@ -4149,7 +4151,7 @@ public class ParDoTest implements Serializable {
                 @StateId(stateId) ValueState<Integer> countState,
                 @TimerId(timerId) Timer loopTimer,
                 OutputReceiver<Integer> r) {
-              int count = MoreObjects.firstNonNull(countState.read(), 0);
+              int count = Preconditions.checkNotNull(countState.read());
               if (count < loopCount) {
                 r.output(count);
                 countState.write(count + 1);
@@ -4159,7 +4161,9 @@ public class ParDoTest implements Serializable {
           };
 
       PCollection<Integer> output =
-          pipeline.apply(Create.of(KV.of("hello", 42))).apply(ParDo.of(fn));
+          pipeline
+              .apply(Create.of(KV.of("hello1", 42), KV.of("hello2", 42), 
KV.of("hello3", 42)))
+              .apply(ParDo.of(fn));
 
       PAssert.that(output).containsInAnyOrder(0, 1, 2, 3, 4);
       pipeline.run();

Reply via email to