arunpandianp commented on code in PR #38073:
URL: https://github.com/apache/beam/pull/38073#discussion_r3042333590


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java:
##########
@@ -118,36 +118,43 @@ static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
    * has been successfully committed to the backing state store.
    */
   public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> callbacks) {
+    List<FinalizationInfo> finalizeInfos = new ArrayList<>();
     for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) {
-      Long finalizeId = entry.getKey();
-      final FinalizationInfo info =
+      finalizeInfos.add(
           FinalizationInfo.create(
-              finalizeId, entry.getValue().getLeft(), entry.getValue().getRight());
-
+              entry.getKey(), entry.getValue().getLeft(), entry.getValue().getRight()));
+    }
+    if (!finalizeInfos.isEmpty()) {
+      boolean shouldStartCleanupThread = false;
       lock.lock();
       try {
-        FinalizationInfo existingInfo = commitFinalizationCallbacks.put(finalizeId, info);
-        if (existingInfo != null) {
-          throw new IllegalStateException(
-              "Expected to not have any past callbacks for bundle "
-                  + finalizeId
-                  + " but had "
-                  + existingInfo);
+        for (FinalizationInfo info : finalizeInfos) {
+          FinalizationInfo existingInfo = commitFinalizationCallbacks.put(info.getId(), info);
+          if (existingInfo != null) {
+            throw new IllegalStateException(
+                "Expected to not have any past callbacks for bundle "
+                    + info.getId()
+                    + " but had "
+                    + existingInfo);
+          }
+          cleanUpQueue.add(info);
+          @SuppressWarnings("ReferenceEquality")
+          boolean newMin = cleanUpQueue.peek() == info;
+          if (newMin) {
+            queueMinChanged.signal();
+          }
         }
         if (!cleanUpThreadStarted) {
           // Start the cleanup thread lazily for pipelines that don't use finalization callbacks
-          // and some tests.
+          // and some tests. Run the thread without the lock held.
           cleanUpThreadStarted = true;
-          finalizationExecutor.execute(this::cleanupThreadBody, 0);
-        }
-        cleanUpQueue.add(info);
-        @SuppressWarnings("ReferenceEquality")
-        boolean newMin = cleanUpQueue.peek() == info;
-        if (newMin) {
-          queueMinChanged.signal();
+          shouldStartCleanupThread = true;
         }
       } finally {
         lock.unlock();
+        if (shouldStartCleanupThread) {
+          finalizationExecutor.forceExecute(this::cleanupThreadBody, 0);

Review Comment:
   cleanupThreadBody never terminates unless the worker is shut down, right?
If so, running cleanupThreadBody in finalizationExecutor permanently takes away a
harness thread, and the worker will deadlock when run with harnessThreads set
to 1. Can we create a separate cleanup thread or thread pool instead?
   
   We could also use a ScheduledThreadPoolExecutor and its schedule methods, and
remove the custom expiry logic.
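
   A rough sketch of what the ScheduledThreadPoolExecutor approach could look like. This is not Beam code: the class name `ScheduledFinalizerCache`, its methods, and the millisecond TTL parameter are all hypothetical, and the callback is run on the caller's thread for simplicity, whereas the real finalizer would presumably hand it to the work executor. The point is only that expiry runs on a dedicated daemon thread rather than on a harness thread, and that the priority queue plus condition signalling is replaced by one scheduled task per callback.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Beam code: all names here are illustrative.
final class ScheduledFinalizerCache {
  // One dedicated daemon thread handles expiry, so no harness thread is occupied.
  private final ScheduledThreadPoolExecutor expiryExecutor =
      new ScheduledThreadPoolExecutor(
          1,
          r -> {
            Thread t = new Thread(r, "commit-finalizer-expiry");
            t.setDaemon(true);
            return t;
          });

  private final Map<Long, Runnable> callbacks = new ConcurrentHashMap<>();

  /** Registers a callback and schedules its expiry after ttlMillis. */
  void cache(long id, Runnable callback, long ttlMillis) {
    // Register first so the expiry task can never run before the entry exists.
    if (callbacks.putIfAbsent(id, callback) != null) {
      throw new IllegalStateException(
          "Expected to not have any past callbacks for bundle " + id);
    }
    // remove(key, value) drops only this registration; it is a no-op if the
    // callback was already finalized.
    expiryExecutor.schedule(
        () -> {
          callbacks.remove(id, callback);
        },
        ttlMillis,
        TimeUnit.MILLISECONDS);
  }

  /** Runs and removes the callback for id; returns false if absent or expired. */
  boolean finalizeCommit(long id) {
    Runnable callback = callbacks.remove(id);
    if (callback == null) {
      return false;
    }
    callback.run();
    return true;
  }

  void shutdown() {
    expiryExecutor.shutdownNow();
  }
}
```

   One trade-off in this sketch: an expiry task stays queued until its deadline even if the callback was finalized earlier. Keeping the returned ScheduledFuture and cancelling it (with setRemoveOnCancelPolicy(true)) would avoid that, at the cost of a little more bookkeeping.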



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
