lukecwik commented on a change in pull request #15056:
URL: https://github.com/apache/beam/pull/15056#discussion_r660780333



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */
   void setRelative();
 
+  /** Clears a timer. */

Review comment:
       ```suggestion
     /** Previously set timers will become unset. */
   ```

##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
##########
@@ -670,7 +670,8 @@ private synchronized void updateTimers(TimerUpdate update) {
             existingTimersForKey.get(
                 deletedTimer.getNamespace(),
                 deletedTimer.getTimerId() + '+' + 
deletedTimer.getTimerFamilyId());
-
+        System.err.println(

Review comment:
       drop debugging statement

##########
File path: 
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4851,6 +4851,342 @@ public void onTimer2(
         return PDone.in(input.getPipeline());
       }
     }
+

Review comment:
       Should we cover the case where the timer becomes eligible and still 
fires even though it is being cleared in the same bundle?
   
   e.g.
   set timer A for 1, set timer B for 2, advance time to 3, have timer A 
callback clear B, B still fires since it is part of the same bundle

##########
File path: 
runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
##########
@@ -173,48 +174,73 @@ public void processElement(WindowedValue<KeyedWorkItem<K, 
KV<K, InputT>>> gbkRes
       for (WindowedValue<KV<K, InputT>> windowedValue : 
gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-
-      final Instant inputWatermarkTime = 
timerInternals.currentInputWatermarkTime();
       PriorityQueue<TimerData> toBeFiredTimers =
           new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
-      gbkResult.getValue().timersIterable().forEach(toBeFiredTimers::add);
 
-      while (!timerInternals.containsUpdateForTimeBefore(inputWatermarkTime)
+      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+      Instant maxSynchronizedProcessingTime = 
BoundedWindow.TIMESTAMP_MIN_VALUE;
+      for (TimerData timerData : gbkResult.getValue().timersIterable()) {
+        toBeFiredTimers.add(timerData);
+        switch (timerData.getDomain()) {
+          case EVENT_TIME:
+            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, 
timerData.getTimestamp());
+            break;
+          case PROCESSING_TIME:
+            maxProcessingTime = Ordering.natural().max(maxProcessingTime, 
timerData.getTimestamp());
+            break;
+          case SYNCHRONIZED_PROCESSING_TIME:
+            maxSynchronizedProcessingTime =
+                Ordering.natural().max(maxSynchronizedProcessingTime, 
timerData.getTimestamp());
+        }
+      }
+
+      while (!timerInternals.containsUpdateForTimeBefore(
+              maxWatermarkTime, maxProcessingTime, 
maxSynchronizedProcessingTime)
           && !toBeFiredTimers.isEmpty()) {
+
         TimerData timer = toBeFiredTimers.poll();
         checkState(
             timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
+            "Expected Timer %s to be in a        %s, but got %s",

Review comment:
       ```suggestion
               "Expected Timer %s to be in a %s, but got %s",
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/state/Timer.java
##########
@@ -81,6 +81,9 @@
    */

Review comment:
       We should add documentation to Timer saying that set and/or clear calls 
may only become visible after this bundle completes and may not be applied 
immediately allowing for existing timers which have become eligible to still 
fire.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to