scwhittle commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3316419463
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java:
##########
@@ -18,24 +18,29 @@
package org.apache.beam.runners.dataflow.worker.streaming;
import com.google.auto.value.AutoValue;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
/** {@link Work} instance and a processing function used to process the work.
*/
@AutoValue
Review Comment:
Should we remove the AutoValue? It is a pretty simple class and we run it a
lot; it could benefit from just being a final class without virtual function
overhead.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
Review Comment:
should we incrementCounters here to match decrementCounters in close?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
Review Comment:
add a comment
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java:
##########
@@ -18,24 +18,29 @@
package org.apache.beam.runners.dataflow.worker.streaming;
import com.google.auto.value.AutoValue;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
/** {@link Work} instance and a processing function used to process the work.
*/
@AutoValue
-public abstract class ExecutableWork implements Runnable {
+public abstract class ExecutableWork {
- public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn)
{
+ public static ExecutableWork create(
+ Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle>
executeWorkFn) {
Review Comment:
Add a comment on executeWorkFn and on what the handle is.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
Review Comment:
comment
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
Review Comment:
final?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
+ handle.cancel();
+ }
+
+ public ExecutableWork getWork() {
+ return work;
+ }
+
+ public long getWorkBytes() {
+ return workBytes;
+ }
+
+ @Override
+ public void run() {
+ Preconditions.checkArgument(!handle.closed);
+ try {
+ work.run(handle);
+ } finally {
+ handle.close();
+ }
+ }
+ }
+
+ private void executeMonitorHeld(ExecutableWork work, long workBytes) {
+ ++elementsOutstanding;
bytesOutstanding += workBytes;
+ monitor.leave();
+ BoundedQueueExecutorWorkHandleImpl handle =
+ new BoundedQueueExecutorWorkHandleImpl(1, workBytes);
+ try {
Review Comment:
use
try (BoundedQueueExecutorWorkHandleImpl handle = new
BoundedQueueExecutorWorkHandleImpl(1, workBytes)) {
}
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
Review Comment:
If this is the only use of cancel/addBudget, you could instead have a way for
a handle to consume/merge with another.
I think that more limited API makes it clearer that the budget is already
accounted for, which answers some of my questions above.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
Review Comment:
What are the standard callbacks? It seems like it could be undesirable to
pause execution of a work item to try to merge stuff and then end up blocking
on some other callback for potentially a long time.
Other ways to handle this could be:
- don't remove the runnable and just stop iterating here
- pull off runnables and put them back (if they are rare)
- have separate queues
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -351,6 +356,176 @@ public void
testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced(
executor.shutdown();
}
+ @Test
+ public void testRunnableExceptionPropagationDecrementsCounters() throws
Exception {
+ CountDownLatch processStart = new CountDownLatch(1);
+ CountDownLatch processStop = new CountDownLatch(1);
+
+ Runnable work =
+ () -> {
+ processStart.countDown();
+ try {
+ processStop.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ throw new RuntimeException("Simulated finalizer processing
exception");
+ };
+
+ executor.forceExecute(work);
+ processStart.await();
+
+ assertEquals(1, executor.elementsOutstanding());
+
+ processStop.countDown();
+
+ // Wait until outstanding elements are released
+ while (executor.elementsOutstanding() != 0) {
+ Thread.sleep(10);
+ }
+
+ assertEquals(0, executor.elementsOutstanding());
+ }
+
+ @Test
+ public void testPollWorkAndInlineBatchExecution() throws Exception {
+ BoundedQueueExecutor testExecutor =
+ new BoundedQueueExecutor(
+ 1,
+ DEFAULT_THREAD_EXPIRATION_SEC,
+ TimeUnit.SECONDS,
+ 10,
+ MAXIMUM_BYTES_OUTSTANDING,
+ new ThreadFactoryBuilder()
+ .setNameFormat("testPollWorkAndInlineBatchExecution-%d")
+ .setDaemon(true)
+ .build(),
+ useFairMonitor);
+
+ CountDownLatch blockerStart = new CountDownLatch(1);
+ CountDownLatch blockerStop = new CountDownLatch(1);
+ ExecutableWork blockerWork = createSleepProcessWork(blockerStart,
blockerStop);
+
+ CountDownLatch start1 = new CountDownLatch(1);
+ CountDownLatch stop1 = new CountDownLatch(1);
+ ExecutableWork m1 = createSleepProcessWork(start1, stop1);
+
+ CountDownLatch start2 = new CountDownLatch(1);
+ CountDownLatch stop2 = new CountDownLatch(1);
+ ExecutableWork m2 = createSleepProcessWork(start2, stop2);
+
+ // 1. Occupy the single worker thread with blocker work so subsequent
tasks remain queued.
+ testExecutor.execute(blockerWork, 0);
+ blockerStart.await();
+ assertEquals(1, testExecutor.elementsOutstanding());
+ assertEquals(0, testExecutor.bytesOutstanding());
+
+ // 2. Enqueue tasks to stay in the queue.
+ testExecutor.execute(m1, 1000);
+ testExecutor.execute(m2, 2000);
+
+ assertEquals(3, testExecutor.elementsOutstanding());
+ assertEquals(3000, testExecutor.bytesOutstanding());
+
+ // 3. Create the batch handle.
+ try (BoundedQueueExecutorWorkHandleImpl batchHandle =
testExecutor.createEmptyBudgetHandle()) {
+ // 4. Poll tasks inline.
+ Optional<ExecutableWork> polled1 = testExecutor.pollWork(batchHandle);
+ assertTrue(polled1.isPresent());
+ assertEquals(m1, polled1.get());
+
+ Optional<ExecutableWork> polled2 = testExecutor.pollWork(batchHandle);
+ assertTrue(polled2.isPresent());
+ assertEquals(m2, polled2.get());
+
+ // Queue should now be empty.
+ Optional<ExecutableWork> polled3 = testExecutor.pollWork(batchHandle);
+ assertFalse(polled3.isPresent());
+
+ // 5. Run polled tasks inline.
+ start1.countDown();
+ stop1.countDown();
+ polled1.get().run(batchHandle);
+
+ start2.countDown();
+ stop2.countDown();
+ polled2.get().run(batchHandle);
+
+ // Outstanding counts should NOT yet be decremented.
+ assertEquals(3, testExecutor.elementsOutstanding());
+ assertEquals(3000, testExecutor.bytesOutstanding());
+ }
+
+ // 6. Upon close, outstanding counts should immediately reflect the batch
decrement in one shot.
Review Comment:
should we just unblock the blocker and wait for zeros?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
+ runnable.run();
Review Comment:
if you do run it here we need to think about how to handle the possible
exception as well.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
Review Comment:
see below comment on merge function first
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
Review Comment:
Add @GuardedBy("this") annotations to these fields.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
Review Comment:
should we enforce elements and bytes being non-negative?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
+ handle.cancel();
+ }
+
+ public ExecutableWork getWork() {
+ return work;
+ }
+
+ public long getWorkBytes() {
+ return workBytes;
+ }
+
+ @Override
+ public void run() {
+ Preconditions.checkArgument(!handle.closed);
+ try {
Review Comment:
since it's autocloseable could you do
try (handle) {
work.run(handle);
}
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
Review Comment:
what does discard mean?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
Review Comment:
final?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
Review Comment:
Should we call close() instead? It seems like we are not going to decrement the
counters properly.
If this is intended, I think it could have a better method name and comments.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
Review Comment:
see below comment on merge function first
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
+ runnable.run();
}
}
- private void decrementCounters(long workBytes) {
- // All threads queue decrements and one thread grabs the monitor and
updates
Review Comment:
keep comment?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]