This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e8aea3fb38b [Dataflow Streaming] Reduce contention on work submission (#33687)
e8aea3fb38b is described below
commit e8aea3fb38bed383cc9f8ed862c99461e0d5a7c1
Author: Arun Pandian <[email protected]>
AuthorDate: Fri Jan 24 02:36:48 2025 -0800
[Dataflow Streaming] Reduce contention on work submission (#33687)
* [Dataflow Streaming] Reduce contention on work submission
---
.../dataflow/worker/util/BoundedQueueExecutor.java | 47 ++++++++++++++++++++--
1 file changed, 43 insertions(+), 4 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9905c0ae5b5..dc611174b7e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -17,10 +17,12 @@
*/
package org.apache.beam.runners.dataflow.worker.util;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
@@ -36,6 +38,9 @@ public class BoundedQueueExecutor {
// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor = new Monitor();
+ private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
+ private final Object decrementQueueDrainLock = new Object();
+ private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
private int elementsOutstanding = 0;
private long bytesOutstanding = 0;
@@ -236,10 +241,44 @@ public class BoundedQueueExecutor {
}
private void decrementCounters(long workBytes) {
- monitor.enter();
- --elementsOutstanding;
- bytesOutstanding -= workBytes;
- monitor.leave();
+ // All threads queue decrements and one thread grabs the monitor and updates
+ // counters. We do this to reduce contention on monitor which is locked by
+ // GetWork thread
+ decrementQueue.add(workBytes);
+ boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
+ if (submittedToExistingBatch) {
+ // There is already a thread about to drain the decrement queue
+ // Current thread does not need to drain.
+ return;
+ }
+ synchronized (decrementQueueDrainLock) {
+ // By setting false here, we may allow another decrement to claim submission of the next batch
+ // and start waiting on the decrementQueueDrainLock.
+ //
+ // However this prevents races that would leave decrements in the queue and unclaimed and we
+ // are ensured there is at most one additional thread blocked. This helps prevent the executor
+ // from creating threads over the limit if many were contending on the lock while their
+ // decrements were already applied.
+ isDecrementBatchPending.set(false);
+ long bytesToDecrement = 0;
+ int elementsToDecrement = 0;
+ while (true) {
+ Long pollResult = decrementQueue.poll();
+ if (pollResult == null) {
+ break;
+ }
+ bytesToDecrement += pollResult;
+ ++elementsToDecrement;
+ }
+ if (elementsToDecrement == 0) {
+ return;
+ }
+
+ monitor.enter();
+ elementsOutstanding -= elementsToDecrement;
+ bytesOutstanding -= bytesToDecrement;
+ monitor.leave();
+ }
}
private long bytesAvailable() {