arunpandianp commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3332895199


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
     }
   }
 
-  private void executeMonitorHeld(Runnable work, long workBytes) {
+  class BoundedQueueExecutorWorkHandleImpl
+      implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+    private int elements;
+    private long bytes;
+    private boolean closed = false;
+
+    private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+      this.elements = elements;
+      this.bytes = bytes;
+    }
+
+    public synchronized void addBudget(int elements, long bytes) {
+      Preconditions.checkState(!closed, "Cannot add budget to a closed 
WorkBudgetHandle");
+      this.elements += elements;
+      this.bytes += bytes;
+    }
+
+    public synchronized void cancel() {
+      this.closed = true;
+    }
+
+    @Override
+    public synchronized void close() {
+      if (closed) return;
+      closed = true;
+      decrementCounters(this.elements, this.bytes);
+    }
+  }
+
+  private static class QueuedWork implements Runnable {
+
+    private final ExecutableWork work;
+    private final BoundedQueueExecutorWorkHandleImpl handle;
+    private final long workBytes;
+
+    public QueuedWork(
+        ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long 
workBytes) {
+      this.work = work;
+      this.handle = handle;
+      this.workBytes = workBytes;
+    }
+
+    public void cancelHandle() {
+      handle.cancel();
+    }
+
+    public ExecutableWork getWork() {
+      return work;
+    }
+
+    public long getWorkBytes() {
+      return workBytes;
+    }
+
+    @Override
+    public void run() {
+      Preconditions.checkArgument(!handle.closed);
+      try {
+        work.run(handle);
+      } finally {
+        handle.close();
+      }
+    }
+  }
+
+  private void executeMonitorHeld(ExecutableWork work, long workBytes) {
+    ++elementsOutstanding;
     bytesOutstanding += workBytes;
+    monitor.leave();
+    BoundedQueueExecutorWorkHandleImpl handle =
+        new BoundedQueueExecutorWorkHandleImpl(1, workBytes);
+    try {

Review Comment:
   Here we need to close only on the catch block and not on finally. So keeping 
the explicit `.close()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to