acrites commented on code in PR #37723:
URL: https://github.com/apache/beam/pull/37723#discussion_r2874147014


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -17,69 +17,166 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
 
-import java.time.Duration;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @ThreadSafe
 @Internal
 final class StreamingCommitFinalizer {
   private static final Logger LOG = LoggerFactory.getLogger(StreamingCommitFinalizer.class);
-  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = Duration.ofMinutes(5L);
-  private final Cache<Long, Runnable> commitFinalizerCache;
+
+  /** A {@link Runnable} and expiry time pair. */
+  @AutoValue
+  public abstract static class FinalizationInfo {
+    public abstract Long getId();
+
+    public abstract Instant getExpiryTime();
+
+    public abstract Runnable getCallback();
+
+    public static FinalizationInfo create(Long id, Instant expiryTime, Runnable callback) {
+      return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, expiryTime, callback);
+    }
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition queueMinChanged = lock.newCondition();
+
+  @GuardedBy("lock")
+  private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks = new HashMap<>();
+
+  @GuardedBy("lock")
+  private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+      new PriorityQueue<>(11, Comparator.comparing(FinalizationInfo::getExpiryTime));
+
   private final BoundedQueueExecutor finalizationExecutor;
 
-  private StreamingCommitFinalizer(
-      Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor finalizationExecutor) {
-    this.commitFinalizerCache = commitFinalizerCache;
-    this.finalizationExecutor = finalizationExecutor;
+  private StreamingCommitFinalizer(BoundedQueueExecutor finalizationCleanupExecutor) {
+    finalizationExecutor = finalizationCleanupExecutor;
+    finalizationCleanupExecutor.execute(this::cleanupThreadBody, 0);
+  }
+
+  private void cleanupThreadBody() {
+    lock.lock();
+    try {
+      while (true) {
+        final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+        if (minValue == null) {
+          // Wait for an element to be added and loop to re-examine the min.
+          queueMinChanged.await();
+          continue;
+        }
+
+        Instant now = Instant.now();
+        Duration timeDifference = new Duration(now, minValue.getExpiryTime());
+        if (timeDifference.getMillis() < 0
+            || (queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS)
+                && cleanUpQueue.peek() == minValue)) {
+          // The minimum element has an expiry time before now, either because it had elapsed when
+          // we pulled it or because we awaited it, and it is still the minimum.
+          checkState(minValue == cleanUpQueue.poll());
+          checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue);
+        }
+      }
+    } catch (InterruptedException e) {
+      // We're being shutdown.
+    } finally {
+      lock.unlock();
+    }
   }
 
   static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
-    return new StreamingCommitFinalizer(
-        CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
-        workExecutor);
+    return new StreamingCommitFinalizer(workExecutor);
   }
 
   /**
    * Stores a map of user worker generated finalization ids and callbacks to execute once a commit
    * has been successfully committed to the backing state store.
    */
-  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
-    commitFinalizerCache.putAll(commitCallbacks);
+  public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> callbacks) {
+    for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) {
+      Long finalizeId = entry.getKey();
+      final FinalizationInfo info =
+          FinalizationInfo.create(
+              finalizeId, entry.getValue().getLeft(), entry.getValue().getRight());
+
+      lock.lock();
+      try {
+        FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info);
+        if (existingInfo != null) {
+          throw new IllegalStateException(
+              "Expected to not have any past callbacks for bundle "
+                  + finalizeId
+                  + " but had "
+                  + existingInfo);
+        }
+        cleanUpQueue.add(info);
+        @SuppressWarnings("ReferenceEquality")
+        boolean newMin = cleanUpQueue.peek() == info;
+        if (newMin) {
+          queueMinChanged.signal();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
   }
 
   /**
    * When this method is called, the commits associated with the provided finalizeIds have been
    * successfully persisted in the backing state store. If the commitCallback for the finalizationId
    * is still cached it is invoked.
    */
-  void finalizeCommits(Iterable<Long> finalizeIds) {
-    for (long finalizeId : finalizeIds) {
-      @Nullable Runnable finalizeCommit = commitFinalizerCache.getIfPresent(finalizeId);
-      // NOTE: It is possible the same callback id may be removed twice if
-      // windmill restarts.
-      // TODO: It is also possible for an earlier finalized id to be lost.
-      // We should automatically discard all older callbacks for the same computation and key.
-      if (finalizeCommit != null) {
-        commitFinalizerCache.invalidate(finalizeId);
-        finalizationExecutor.forceExecute(
-            () -> {
-              try {
-                finalizeCommit.run();
-              } catch (Throwable t) {
-                LOG.error("Source checkpoint finalization failed:", t);
-              }
-            },
-            0);
+  public void finalizeCommits(Iterable<Long> finalizeIds) {
+    List<Runnable> callbacksToExecute = new ArrayList<>();
+    lock.lock();
+    try {
+      for (long finalizeId : finalizeIds) {
+        @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId);
+        if (info != null) {
+          checkState(cleanUpQueue.remove(info));
+          callbacksToExecute.add(info.getCallback());
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    for (Runnable callback : callbacksToExecute) {
+      try {
+        finalizationExecutor.execute(callback, 0);
+      } catch (Throwable t) {
+        LOG.error("Commit finalization failed:", t);
       }
     }
   }
+
+  // Only exposed for tests.

Review Comment:
   Looks like we can just drop `public`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to