gemini-code-assist[bot] commented on code in PR #38970:
URL: https://github.com/apache/beam/pull/38970#discussion_r3415730999


##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -791,14 +793,14 @@ public void testLoadCorrectness() {
                     timers.get(key));
               }));
       try {
-        Thread.sleep(random.nextInt(200));
+        Thread.sleep(random.nextInt(2));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
 
     try {

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Reducing the sleep time to `50 + random.nextInt(20)` milliseconds in a load 
test is highly likely to cause flakiness. Under heavy concurrent multi-threaded 
load, background threads may not complete all their tasks within such a short 
window (50-70ms). Keeping a slightly larger sleep duration (e.g., 1-2 seconds) 
is much safer and still significantly faster than the original 3-5 seconds.
   
   ```java
         Thread.sleep(1000 + random.nextInt(1000));
   ```



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -419,7 +421,7 @@ public void testMultiKey() {
   // execution task has not finished processing yet.
   @Test
   public void testLongItem() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(10);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Reducing the mock processing delay to 10ms introduces a severe race 
condition. Immediately after submitting the task, the main thread asserts 
`assertEquals(0, dofn.getProcessed())`. If the background thread completes 
within 10ms (which is highly likely on fast machines or if the main thread is 
briefly descheduled), this assertion will fail, making the test flaky.
   
   Since `waitForEmpty` now polls every 5ms, the test will resume almost 
instantly once the async task completes. Therefore, you can safely keep a 
larger delay (e.g., `500` ms) to guarantee that the intermediate assertions do 
not race with the background execution, without sacrificing test execution 
speed.
   
   ```suggestion
       BasicDofn dofn = new BasicDofn(500);
   ```



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java:
##########
@@ -231,20 +231,22 @@ private void waitForEmpty(AsyncWrapper<?, ?, ?> 
asyncWrapper) {
 
   private void waitForEmpty(AsyncWrapper<?, ?, ?> asyncWrapper, int 
timeoutSeconds) {
     int count = 0;
+    // Poll every 5 milliseconds instead of 1000 milliseconds for instant 
response
+    int maxIterations = timeoutSeconds * 200;
     while (!asyncWrapper.isEmpty()) {
       try {
-        Thread.sleep(1000);
+        Thread.sleep(5);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
       count += 1;
-      if (count > timeoutSeconds) {
+      if (count > maxIterations) {
         throw new RuntimeException("Timed out waiting for async dofn to be 
empty");
       }
     }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using a loop counter (`count`) combined with `Thread.sleep(5)` to enforce a 
timeout is prone to flakiness. `Thread.sleep` does not guarantee exact sleep 
durations, and OS scheduling or CPU throttling in CI environments can cause 
each sleep to take significantly longer than 5ms. This can lead to premature 
timeouts.
   
   Using a time-based check with `System.currentTimeMillis()` is much more 
robust and standard.
   
   ```java
       long limit = System.currentTimeMillis() + timeoutSeconds * 1000L;
       while (!asyncWrapper.isEmpty()) {
         if (System.currentTimeMillis() > limit) {
           throw new RuntimeException("Timed out waiting for async dofn to be 
empty");
         }
         try {
           Thread.sleep(5);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
           throw new RuntimeException(e);
         }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to