kennknowles commented on a change in pull request #11924:
URL: https://github.com/apache/beam/pull/11924#discussion_r436173779



##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -3938,10 +3939,11 @@ public void testEventTimeTimerOrdering() throws Exception {
       ValidatesRunner.class,
       UsesTimersInParDo.class,
       UsesStatefulParDo.class,
+      UsesUnboundedPCollections.class,
       UsesStrictTimerOrdering.class
     })
     public void testEventTimeTimerOrderingWithCreate() throws Exception {
-      final int numTestElements = 100;
+      final int numTestElements = 5;

Review comment:
       Why shrink this from 100 to 5? Does the test get really slow with 100 
elements? Is this going to be a perf problem overall?

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -577,12 +583,21 @@ public void flushState() {
                         WindmillTimerInternals.windmillTimerToTimerData(
                             WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
                 .iterator();
+
+        cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);
+      }
+
+      Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();
+      if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+        while (!toBeFiredTimersOrdered.isEmpty()) {
+          userTimerInternals.setTimer(toBeFiredTimersOrdered.poll());
+        }
       }

Review comment:
       Yeah, I don't actually understand what this block is for.
   
   FWIW, to do timer deletion/reset cheaply without building a bespoke data 
structure, just keep a map from timer id to either its current firing time or a 
tombstone. Whenever a timer comes up in the priority queue, look up the actual 
time for it in the map. If it is now set for a different time, don't fire it. 
If it is obsolete (tombstoned), don't fire it.
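
   The map-plus-tombstone idea above could look roughly like the sketch below. This is only an illustration: `LazyTimerQueue`, `Entry`, `set`, `delete`, and `pollNext` are hypothetical names, timers are reduced to a string id and a millisecond timestamp, and none of it is the actual Beam or Windmill timer code.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;

/** Hypothetical sketch of lazy timer deletion/reset; not Beam's actual classes. */
class LazyTimerQueue {
  /** A queue entry: the timer id and the time it was set for when enqueued. */
  static final class Entry {
    final String id;
    final long fireAtMillis;

    Entry(String id, long fireAtMillis) {
      this.id = id;
      this.fireAtMillis = fireAtMillis;
    }
  }

  // Entries are never removed eagerly; stale ones are skipped when polled.
  private final PriorityQueue<Entry> queue =
      new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.fireAtMillis));

  // Source of truth: id -> current firing time. A null value is a tombstone.
  private final Map<String, Long> current = new HashMap<>();

  /** Sets or resets a timer. Any older queue entry for the id becomes stale. */
  void set(String id, long fireAtMillis) {
    current.put(id, fireAtMillis);
    queue.add(new Entry(id, fireAtMillis));
  }

  /** Deletes a timer by leaving a tombstone; the queue is not touched. */
  void delete(String id) {
    current.put(id, null);
  }

  /** Returns the next timer id due at or before the watermark, skipping stale entries. */
  Optional<String> pollNext(long watermarkMillis) {
    while (!queue.isEmpty() && queue.peek().fireAtMillis <= watermarkMillis) {
      Entry e = queue.poll();
      Long actual = current.get(e.id);
      if (actual != null && actual == e.fireAtMillis) {
        current.remove(e.id); // fired, so forget it
        return Optional.of(e.id);
      }
      // Otherwise the timer was reset to another time or deleted: don't fire.
    }
    return Optional.empty();
  }
}
```

   Resetting or deleting a timer is then a single map write, and the cost of discarding stale entries is paid only when they reach the head of the queue.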

##########
File path: runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
##########
@@ -577,12 +583,21 @@ public void flushState() {
                         WindmillTimerInternals.windmillTimerToTimerData(
                             WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
                 .iterator();
+
+        cachedFiredUserTimers.forEachRemaining(toBeFiredTimersOrdered::add);

Review comment:
       Do we even need `cachedFiredUserTimers`? It seems obsolete if we 
populate the priority queue. The name is also wrong: even before this PR it 
wasn't a cache, it was a lazily initialized iterator. Instead, we should have a 
lazily initialized priority queue (like you do) and just a flag to say whether 
the incoming timers have been loaded yet.
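
   As a rough sketch of that shape (a priority queue plus a "loaded" flag, with no separate iterator field), under the assumption that the incoming timers can be produced on demand. `LazyFiredTimers`, `incoming`, and `ensureLoaded` are hypothetical names, not fields or methods of `StreamingModeExecutionContext`.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.Supplier;

/** Hypothetical holder for fired timers; illustrative only. */
class LazyFiredTimers<T> {
  private final Supplier<Iterable<T>> incoming; // produces the incoming timers on demand
  private final PriorityQueue<T> queue;
  private boolean loaded = false; // whether the incoming timers have been loaded yet

  LazyFiredTimers(Supplier<Iterable<T>> incoming, Comparator<T> order) {
    this.incoming = incoming;
    this.queue = new PriorityQueue<>(order);
  }

  /** Loads the incoming timers into the queue the first time they are needed. */
  private void ensureLoaded() {
    if (!loaded) {
      for (T timer : incoming.get()) {
        queue.add(timer);
      }
      loaded = true;
    }
  }

  /** Adds a timer set during processing so it is ordered with the incoming ones. */
  void add(T timer) {
    queue.add(timer);
  }

  /** Returns the earliest remaining timer, or null if there is none. */
  T poll() {
    ensureLoaded();
    return queue.poll();
  }
}
```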

##########
File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
##########
@@ -4040,7 +4043,8 @@ public void onTimer(
             }
           };
 
-      PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
+      PCollection<String> output =
+          pipeline.apply(transform).setIsBoundedInternal(IsBounded.UNBOUNDED).apply(ParDo.of(fn));

Review comment:
       We should not be calling `setIsBoundedInternal` here. Is this just to 
force streaming mode? If so, we should instead create a separate run of 
ValidatesRunner that forces streaming mode.





