gemini-code-assist[bot] commented on code in PR #38970:
URL: https://github.com/apache/beam/pull/38970#discussion_r3415730999
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -791,14 +793,14 @@ public void testLoadCorrectness() {
timers.get(key));
}));
try {
- Thread.sleep(random.nextInt(200));
+ Thread.sleep(random.nextInt(2));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
Review Comment:

Reducing the sleep time to `50 + random.nextInt(20)` milliseconds in a load
test is highly likely to cause flakiness. Under heavy concurrent multi-threaded
load, background threads may not complete all their tasks within such a short
window (50-70ms). Keeping a slightly larger sleep duration (e.g., 1-2 seconds)
is much safer and still significantly faster than the original 3-5 seconds.
```java
Thread.sleep(1000 + random.nextInt(1000));
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -419,7 +421,7 @@ public void testMultiKey() {
// execution task has not finished processing yet.
@Test
public void testLongItem() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(10);
Review Comment:

Reducing the mock processing delay to 10ms introduces a severe race
condition. Immediately after submitting the task, the main thread asserts
`assertEquals(0, dofn.getProcessed())`. If the background thread completes
within 10ms (which is highly likely on fast machines or if the main thread is
briefly descheduled), this assertion will fail, making the test flaky.
Since `waitForEmpty` now polls every 5ms, the test will resume almost
instantly once the async task completes. Therefore, you can safely keep a
larger delay (e.g., `500` ms) to guarantee that the intermediate assertions do
not race with the background execution, without sacrificing test execution
speed.
```suggestion
BasicDofn dofn = new BasicDofn(500);
```
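An alternative to tuning the delay is to remove the timing dependence
altogether. The sketch below is a standalone illustration, not code from
`AsyncWrapperTest`: the class `GatedTaskSketch` and its latches are
hypothetical, and it assumes the mock work can be made to block on a
`CountDownLatch` that the test releases only after the intermediate
assertion has run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: gate the background work on a latch so the
// "nothing processed yet" check cannot race with the worker thread.
public class GatedTaskSketch {

  /** Returns {count observed before release, count observed after completion}. */
  public static int[] run() {
    AtomicInteger processed = new AtomicInteger();
    CountDownLatch release = new CountDownLatch(1); // opened by the test thread
    CountDownLatch done = new CountDownLatch(1);    // opened by the worker
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      executor.submit(() -> {
        try {
          release.await(); // worker blocks here until the test allows progress
          processed.incrementAndGet();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown();
        }
      });

      // Deterministic: the worker cannot have incremented the counter yet.
      int before = processed.get();

      release.countDown();
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("Worker did not finish in time");
      }
      int after = processed.get();
      return new int[] {before, after};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } finally {
      executor.shutdownNow();
    }
  }
}
```

With this shape the intermediate check holds regardless of machine speed,
and the test finishes as soon as the worker does. Whether `BasicDofn` can
be adapted this way depends on its implementation, which is not shown here.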
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -231,20 +231,22 @@ private void waitForEmpty(AsyncWrapper<?, ?, ?> asyncWrapper) {
private void waitForEmpty(AsyncWrapper<?, ?, ?> asyncWrapper, int timeoutSeconds) {
int count = 0;
+ // Poll every 5 milliseconds instead of 1000 milliseconds for instant response
+ int maxIterations = timeoutSeconds * 200;
while (!asyncWrapper.isEmpty()) {
try {
- Thread.sleep(1000);
+ Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
count += 1;
- if (count > timeoutSeconds) {
+ if (count > maxIterations) {
throw new RuntimeException("Timed out waiting for async dofn to be empty");
}
}
Review Comment:

Using a loop counter (`count`) combined with `Thread.sleep(5)` to enforce a
timeout makes the timeout imprecise. `Thread.sleep` does not guarantee exact
sleep durations, and OS scheduling or CPU throttling in CI environments can
cause each sleep to take significantly longer than 5ms. Because the loop
counts iterations rather than elapsed time, the effective timeout can then
stretch well beyond `timeoutSeconds`.
Comparing against a deadline computed from `System.currentTimeMillis()` is
more robust and is the standard approach.
```java
long limit = System.currentTimeMillis() + timeoutSeconds * 1000L;
while (!asyncWrapper.isEmpty()) {
  if (System.currentTimeMillis() > limit) {
    throw new RuntimeException("Timed out waiting for async dofn to be empty");
  }
  try {
    Thread.sleep(5);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}
```
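A variant worth considering measures the deadline with `System.nanoTime()`,
which is intended for elapsed-time measurement and is unaffected by
wall-clock adjustments. The following is a minimal standalone sketch, not
code from the PR: the class `DeadlinePoller`, the method `waitUntil`, and
its parameters are hypothetical, and a `BooleanSupplier` stands in for
`asyncWrapper.isEmpty()`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition against a monotonic deadline.
public class DeadlinePoller {

  /**
   * Polls {@code condition} every {@code pollMillis} until it is true or
   * {@code timeoutMillis} has elapsed. Returns true if the condition was met.
   */
  public static boolean waitUntil(
      BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!condition.getAsBoolean()) {
      // Compare by subtraction, as the nanoTime Javadoc recommends,
      // so the check stays correct even if the counter overflows.
      if (System.nanoTime() - deadlineNanos >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    return true;
  }
}
```

In the test, a call along the lines of
`waitUntil(asyncWrapper::isEmpty, timeoutSeconds * 1000L, 5)` followed by a
thrown exception on `false` would keep the existing behaviour while tying
the timeout to elapsed time rather than to the number of loop iterations.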
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.