arunpandianp commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3332871554
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java:
##########
@@ -18,24 +18,29 @@
package org.apache.beam.runners.dataflow.worker.streaming;
import com.google.auto.value.AutoValue;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
/** {@link Work} instance and a processing function used to process the work.
*/
@AutoValue
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ExecutableWork.java:
##########
@@ -18,24 +18,29 @@
package org.apache.beam.runners.dataflow.worker.streaming;
import com.google.auto.value.AutoValue;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
+import org.apache.beam.runners.dataflow.worker.util.ExceptionUtils;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
/** {@link Work} instance and a processing function used to process the work.
*/
@AutoValue
-public abstract class ExecutableWork implements Runnable {
+public abstract class ExecutableWork {
- public static ExecutableWork create(Work work, Consumer<Work> executeWorkFn)
{
+ public static ExecutableWork create(
+ Work work, BiConsumer<Work, BoundedQueueExecutorWorkHandle>
executeWorkFn) {
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
Review Comment:
Added merge function.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
Review Comment:
Removed after adding merge.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
+ handle.cancel();
+ }
+
+ public ExecutableWork getWork() {
+ return work;
+ }
+
+ public long getWorkBytes() {
+ return workBytes;
+ }
+
+ @Override
+ public void run() {
+ Preconditions.checkArgument(!handle.closed);
+ try {
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
Review Comment:
I was thinking about failed work; I removed discard to keep it simple.
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -351,6 +356,176 @@ public void
testRecordTotalTimeMaxActiveThreadsUsedWhenMaximumPoolSizeIsReduced(
executor.shutdown();
}
+ @Test
+ public void testRunnableExceptionPropagationDecrementsCounters() throws
Exception {
+ CountDownLatch processStart = new CountDownLatch(1);
+ CountDownLatch processStop = new CountDownLatch(1);
+
+ Runnable work =
+ () -> {
+ processStart.countDown();
+ try {
+ processStop.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ throw new RuntimeException("Simulated finalizer processing
exception");
+ };
+
+ executor.forceExecute(work);
+ processStart.await();
+
+ assertEquals(1, executor.elementsOutstanding());
+
+ processStop.countDown();
+
+ // Wait until outstanding elements are released
+ while (executor.elementsOutstanding() != 0) {
+ Thread.sleep(10);
+ }
+
+ assertEquals(0, executor.elementsOutstanding());
+ }
+
+ @Test
+ public void testPollWorkAndInlineBatchExecution() throws Exception {
+ BoundedQueueExecutor testExecutor =
+ new BoundedQueueExecutor(
+ 1,
+ DEFAULT_THREAD_EXPIRATION_SEC,
+ TimeUnit.SECONDS,
+ 10,
+ MAXIMUM_BYTES_OUTSTANDING,
+ new ThreadFactoryBuilder()
+ .setNameFormat("testPollWorkAndInlineBatchExecution-%d")
+ .setDaemon(true)
+ .build(),
+ useFairMonitor);
+
+ CountDownLatch blockerStart = new CountDownLatch(1);
+ CountDownLatch blockerStop = new CountDownLatch(1);
+ ExecutableWork blockerWork = createSleepProcessWork(blockerStart,
blockerStop);
+
+ CountDownLatch start1 = new CountDownLatch(1);
+ CountDownLatch stop1 = new CountDownLatch(1);
+ ExecutableWork m1 = createSleepProcessWork(start1, stop1);
+
+ CountDownLatch start2 = new CountDownLatch(1);
+ CountDownLatch stop2 = new CountDownLatch(1);
+ ExecutableWork m2 = createSleepProcessWork(start2, stop2);
+
+ // 1. Occupy the single worker thread with blocker work so subsequent
tasks remain queued.
+ testExecutor.execute(blockerWork, 0);
+ blockerStart.await();
+ assertEquals(1, testExecutor.elementsOutstanding());
+ assertEquals(0, testExecutor.bytesOutstanding());
+
+ // 2. Enqueue tasks to stay in the queue.
+ testExecutor.execute(m1, 1000);
+ testExecutor.execute(m2, 2000);
+
+ assertEquals(3, testExecutor.elementsOutstanding());
+ assertEquals(3000, testExecutor.bytesOutstanding());
+
+ // 3. Create the batch handle.
+ try (BoundedQueueExecutorWorkHandleImpl batchHandle =
testExecutor.createEmptyBudgetHandle()) {
+ // 4. Poll tasks inline.
+ Optional<ExecutableWork> polled1 = testExecutor.pollWork(batchHandle);
+ assertTrue(polled1.isPresent());
+ assertEquals(m1, polled1.get());
+
+ Optional<ExecutableWork> polled2 = testExecutor.pollWork(batchHandle);
+ assertTrue(polled2.isPresent());
+ assertEquals(m2, polled2.get());
+
+ // Queue should now be empty.
+ Optional<ExecutableWork> polled3 = testExecutor.pollWork(batchHandle);
+ assertFalse(polled3.isPresent());
+
+ // 5. Run polled tasks inline.
+ start1.countDown();
+ stop1.countDown();
+ polled1.get().run(batchHandle);
+
+ start2.countDown();
+ stop2.countDown();
+ polled2.get().run(batchHandle);
+
+ // Outstanding counts should NOT yet be decremented.
+ assertEquals(3, testExecutor.elementsOutstanding());
+ assertEquals(3000, testExecutor.bytesOutstanding());
+ }
+
+ // 6. Upon close, outstanding counts should immediately reflect the batch
decrement in one shot.
Review Comment:
Removed the related code.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
+ runnable.run();
Review Comment:
Removed the related code.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
Review Comment:
Removed addBudget and added merge.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
+ runnable.run();
}
}
- private void decrementCounters(long workBytes) {
- // All threads queue decrements and one thread grabs the monitor and
updates
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
Review Comment:
I removed the pollWork method from this PR and will add it back in a future PR.
I am planning to change the queue so that it internally tracks queued work by
key group.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
}
}
- private void executeMonitorHeld(Runnable work, long workBytes) {
+ class BoundedQueueExecutorWorkHandleImpl
+ implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+ private int elements;
+ private long bytes;
+ private boolean closed = false;
+
+ private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+ this.elements = elements;
+ this.bytes = bytes;
+ }
+
+ public synchronized void addBudget(int elements, long bytes) {
+ Preconditions.checkState(!closed, "Cannot add budget to a closed
WorkBudgetHandle");
+ this.elements += elements;
+ this.bytes += bytes;
+ }
+
+ public synchronized void cancel() {
+ this.closed = true;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (closed) return;
+ closed = true;
+ decrementCounters(this.elements, this.bytes);
+ }
+ }
+
+ private static class QueuedWork implements Runnable {
+
+ private final ExecutableWork work;
+ private final BoundedQueueExecutorWorkHandleImpl handle;
+ private final long workBytes;
+
+ public QueuedWork(
+ ExecutableWork work, BoundedQueueExecutorWorkHandleImpl handle, long
workBytes) {
+ this.work = work;
+ this.handle = handle;
+ this.workBytes = workBytes;
+ }
+
+ public void cancelHandle() {
+ handle.cancel();
+ }
+
+ public ExecutableWork getWork() {
+ return work;
+ }
+
+ public long getWorkBytes() {
+ return workBytes;
+ }
+
+ @Override
+ public void run() {
+ Preconditions.checkArgument(!handle.closed);
+ try {
+ work.run(handle);
+ } finally {
+ handle.close();
+ }
+ }
+ }
+
+ private void executeMonitorHeld(ExecutableWork work, long workBytes) {
+ ++elementsOutstanding;
bytesOutstanding += workBytes;
+ monitor.leave();
+ BoundedQueueExecutorWorkHandleImpl handle =
+ new BoundedQueueExecutorWorkHandleImpl(1, workBytes);
+ try {
Review Comment:
Done.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
Review Comment:
Done. Added a merge method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]