scwhittle commented on code in PR #37304:
URL: https://github.com/apache/beam/pull/37304#discussion_r2741192718


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java:
##########
@@ -45,14 +49,11 @@
  * <p>See <a href="https://s.apache.org/beam-finalizing-bundles";>Apache Beam 
Portability API: How to
  * Finalize Bundles</a> for further details.
  */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
 public class FinalizeBundleHandler {
 
   /** A {@link BundleFinalizer.Callback} and expiry time pair. */
   @AutoValue
-  abstract static class CallbackRegistration {
+  public abstract static class CallbackRegistration {

Review Comment:
   Intellij had a warning and the type is part of the public method interface 
below.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/FinalizeBundleHandler.java:
##########
@@ -64,55 +65,53 @@ public static CallbackRegistration create(
   }
 
   private final ConcurrentMap<String, Collection<CallbackRegistration>> 
bundleFinalizationCallbacks;
+  private final ReentrantLock cleanupLock = new ReentrantLock();
+  private final Condition queueMinChanged = cleanupLock.newCondition();
+
+  @GuardedBy("cleanupLock")
   private final PriorityQueue<TimestampedValue<String>> cleanUpQueue;
 
   @SuppressWarnings("unused")
-  private final Future<Void> cleanUpResult;
+  private final Future<?> cleanUpResult;
 
+  @SuppressWarnings("methodref.receiver.bound")
   public FinalizeBundleHandler(ExecutorService executorService) {
     this.bundleFinalizationCallbacks = new ConcurrentHashMap<>();
     this.cleanUpQueue =
         new PriorityQueue<>(11, 
Comparator.comparing(TimestampedValue::getTimestamp));
-    // Wait until we have at least one element. We are notified on each element
-    // being added.
-    // Wait until the current time has past the expiry time for the head of the
-    // queue.
-    // We are notified on each element being added.
-    // Wait until we have at least one element. We are notified on each element
-    // being added.
-    // Wait until the current time has past the expiry time for the head of the
-    // queue.
-    // We are notified on each element being added.
-    cleanUpResult =
-        executorService.submit(
-            (Callable<Void>)
-                () -> {
-                  while (true) {
-                    synchronized (cleanUpQueue) {
-                      TimestampedValue<String> expiryTime = 
cleanUpQueue.peek();
-
-                      // Wait until we have at least one element. We are 
notified on each element
-                      // being added.
-                      while (expiryTime == null) {
-                        cleanUpQueue.wait();
-                        expiryTime = cleanUpQueue.peek();
-                      }
-
-                      // Wait until the current time has past the expiry time 
for the head of the
-                      // queue.
-                      // We are notified on each element being added.
-                      Instant now = Instant.now();
-                      while (expiryTime.getTimestamp().isAfter(now)) {
-                        Duration timeDifference = new Duration(now, 
expiryTime.getTimestamp());
-                        cleanUpQueue.wait(timeDifference.getMillis());
-                        expiryTime = cleanUpQueue.peek();
-                        now = Instant.now();
-                      }
-
-                      
bundleFinalizationCallbacks.remove(cleanUpQueue.poll().getValue());
-                    }
-                  }
-                });
+
+    cleanUpResult = executorService.submit(this::cleanupThreadBody);
+  }
+
+  private void cleanupThreadBody() {
+    cleanupLock.lock();
+    try {
+      while (true) {
+        final @Nullable TimestampedValue<String> minValue = 
cleanUpQueue.peek();
+        if (minValue == null) {
+          // Wait for an element to be added and loop to re-examine the min.
+          queueMinChanged.await();
+          continue;
+        }
+
+        Instant now = Instant.now();
+        Duration timeDifference = new Duration(now, minValue.getTimestamp());
+        if (timeDifference.getMillis() > 0
+            && queueMinChanged.await(timeDifference.getMillis(), 
TimeUnit.MILLISECONDS)) {
+          // If the time didn't elapse, loop to re-examine the min.
+          continue;
+        }
+
+        // The minimum element has an expiry time before now.
+        // It may or may not actually be present in the map if the 
finalization has already been
+        // completed.
+        bundleFinalizationCallbacks.remove(minValue.getValue());

Review Comment:
   Ouch, yeah that's bad. I'm guessing the tests for this class need to be 
improved then too.  I will update and get back to you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to