gemini-code-assist[bot] commented on code in PR #38920:
URL: https://github.com/apache/beam/pull/38920#discussion_r3396344073
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java:
##########
@@ -553,4 +553,71 @@ public void testPollWorkWithLinkedBlockingQueue() throws
Exception {
blockerStop.countDown();
testExecutor.shutdown();
}
+
+ @Test
+ public void testPollWorkDropsFailedWork() throws Exception {
+ BoundedQueueExecutor testExecutor =
+ new BoundedQueueExecutor(
+ /* initialMaximumPoolSize= */ 1,
+ /* keepAliveTime= */ 60,
+ /* unit= */ TimeUnit.SECONDS,
+ /* maximumElementsOutstanding= */ 100,
+ /* maximumBytesOutstanding= */ 10000000,
+ new ThreadFactoryBuilder().setNameFormat("testStealing-%d").setDaemon(true).build(),
+ useFairMonitor,
+ /*useKeyGroupWorkQueue=*/ true);
+
+ // Create blocker task to occupy the worker thread
+ CountDownLatch blockerStart = new CountDownLatch(1);
+ CountDownLatch blockerStop = new CountDownLatch(1);
+ ExecutableWork blockerWork =
+ createWorkWithCompIdAndKeyGroup(
+ "blockerComp",
+ DEFAULT_KEY_GROUP,
+ ignored -> {
+ blockerStart.countDown();
+ try {
+ blockerStop.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ testExecutor.execute(blockerWork, 0);
+ blockerStart.await();
+
+ Work.KeyGroup keyGroup1 = Work.KeyGroup.create(1, 1);
+
+ // Create executable tasks
+ ExecutableWork work1 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+ ExecutableWork work2 = createWorkWithCompIdAndKeyGroup("compA", keyGroup1, ignored -> {});
+
+ // Mark work1 as failed
+ work1.work().setFailed();
+
+ // Enqueue tasks
+ testExecutor.execute(work1, 100);
+ testExecutor.execute(work2, 150);
+
+ // Total outstanding elements must be 3 (blocker + work1 + work2)
+ assertEquals(3, testExecutor.elementsOutstanding());
+
+ // Steal work from keyGroup1.
+ // The first work in queue is work1, which is failed.
+ // It should be dropped, its handle closed, and work2 should be returned.
+ try (BoundedQueueExecutorWorkHandleImpl stealHandle = testExecutor.createBudgetHandle(0, 0L)) {
+ ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1, stealHandle);
+ assertNotNull(stolen);
+ assertEquals(work2, stolen);
+ // blocker (1) + work2 (1) = 2. work1 (1) should have been released.
+ assertEquals(2, testExecutor.elementsOutstanding());
+ }
+ // work2 should also be released now because stealHandle is closed.
+ // blocker (1) = 1.
+ assertEquals(1, testExecutor.elementsOutstanding());
+
+ // Unblock the blocker and shut down
+ blockerStop.countDown();
+ testExecutor.shutdown();
Review Comment:

If any assertion fails during the test execution (for example,
`assertEquals(3, testExecutor.elementsOutstanding())`), the
`blockerStop.countDown()` and `testExecutor.shutdown()` calls will be skipped.
This will cause the blocker thread to hang indefinitely on
`blockerStop.await()`, leading to thread leaks and potential test suite hangs.
We should wrap the execution and assertions in a `try-finally` block to
ensure that resources are always cleaned up and the blocker thread is unblocked.
```java
try {
// Enqueue tasks
testExecutor.execute(work1, 100);
testExecutor.execute(work2, 150);
// Total outstanding elements must be 3 (blocker + work1 + work2)
assertEquals(3, testExecutor.elementsOutstanding());
// Steal work from keyGroup1.
// The first work in queue is work1, which is failed.
// It should be dropped, its handle closed, and work2 should be returned.
try (BoundedQueueExecutorWorkHandleImpl stealHandle =
testExecutor.createBudgetHandle(0, 0L)) {
ExecutableWork stolen = testExecutor.pollWork("compA", keyGroup1,
stealHandle);
assertNotNull(stolen);
assertEquals(work2, stolen);
// blocker (1) + work2 (1) = 2. work1 (1) should have been released.
assertEquals(2, testExecutor.elementsOutstanding());
}
// work2 should also be released now because stealHandle is closed.
// blocker (1) = 1.
assertEquals(1, testExecutor.elementsOutstanding());
} finally {
// Unblock the blocker and shut down
blockerStop.countDown();
testExecutor.shutdown();
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]