acrites commented on code in PR #38073:
URL: https://github.com/apache/beam/pull/38073#discussion_r3048337816


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -118,36 +118,43 @@ static StreamingCommitFinalizer 
create(BoundedQueueExecutor workExecutor) {
    * has been successfully committed to the backing state store.
    */
   public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> 
callbacks) {
+    List<FinalizationInfo> finalizeInfos = new ArrayList<>();
     for (Map.Entry<Long, Pair<Instant, Runnable>> entry : 
callbacks.entrySet()) {
-      Long finalizeId = entry.getKey();
-      final FinalizationInfo info =
+      finalizeInfos.add(
           FinalizationInfo.create(
-              finalizeId, entry.getValue().getLeft(), 
entry.getValue().getRight());
-
+              entry.getKey(), entry.getValue().getLeft(), 
entry.getValue().getRight()));
+    }
+    if (!finalizeInfos.isEmpty()) {
+      boolean shouldStartCleanupThread = false;
       lock.lock();
       try {
-        FinalizationInfo existingInfo = 
commitFinalizationCallbacks.put(finalizeId, info);
-        if (existingInfo != null) {
-          throw new IllegalStateException(
-              "Expected to not have any past callbacks for bundle "
-                  + finalizeId
-                  + " but had "
-                  + existingInfo);
+        for (FinalizationInfo info : finalizeInfos) {
+          FinalizationInfo existingInfo = 
commitFinalizationCallbacks.put(info.getId(), info);
+          if (existingInfo != null) {
+            throw new IllegalStateException(
+                "Expected to not have any past callbacks for bundle "
+                    + info.getId()
+                    + " but had "
+                    + existingInfo);
+          }
+          cleanUpQueue.add(info);
+          @SuppressWarnings("ReferenceEquality")
+          boolean newMin = cleanUpQueue.peek() == info;
+          if (newMin) {
+            queueMinChanged.signal();
+          }
         }
         if (!cleanUpThreadStarted) {
           // Start the cleanup thread lazily for pipelines that don't use 
finalization callbacks
-          // and some tests.
+          // and some tests. Run the thread without the lock held.
           cleanUpThreadStarted = true;
-          finalizationExecutor.execute(this::cleanupThreadBody, 0);
-        }
-        cleanUpQueue.add(info);
-        @SuppressWarnings("ReferenceEquality")
-        boolean newMin = cleanUpQueue.peek() == info;
-        if (newMin) {
-          queueMinChanged.signal();
+          shouldStartCleanupThread = true;
         }
       } finally {
         lock.unlock();
+        if (shouldStartCleanupThread) {
+          finalizationExecutor.forceExecute(this::cleanupThreadBody, 0);

Review Comment:
   I think this is a great idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to