This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 468ae40dc8e Fix AsyncWrapperTest timeout and flakiness issues. (#38970)
468ae40dc8e is described below

commit 468ae40dc8e2424ba4acd957226e369c97d1e7b1
Author: tejasiyer-dev <[email protected]>
AuthorDate: Tue Jun 16 05:15:55 2026 -0700

    Fix AsyncWrapperTest timeout and flakiness issues. (#38970)
    
    * Fix AsyncWrapperTest timeout and flakiness issues.
    
    * Fix AsyncWrapperTest to prevent new race conditions and premature timeouts.
---
 .../beam/sdk/transforms/AsyncWrapperTest.java      | 43 +++++++++++-----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
index e95f586e884..183b1851459 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
@@ -230,21 +230,20 @@ public class AsyncWrapperTest implements Serializable {
   }
 
   private void waitForEmpty(AsyncWrapper<?, ?, ?> asyncWrapper, int 
timeoutSeconds) {
-    int count = 0;
+    long limit = System.currentTimeMillis() + timeoutSeconds * 1000L;
     while (!asyncWrapper.isEmpty()) {
+      if (System.currentTimeMillis() > limit) {
+        throw new RuntimeException("Timed out waiting for async dofn to be 
empty");
+      }
       try {
-        Thread.sleep(1000);
+        Thread.sleep(5);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new RuntimeException(e);
       }
-      count += 1;
-      if (count > timeoutSeconds) {
-        throw new RuntimeException("Timed out waiting for async dofn to be 
empty");
-      }
     }
     try {
-      Thread.sleep(1000);
+      Thread.sleep(5);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
@@ -419,7 +418,7 @@ public class AsyncWrapperTest implements Serializable {
   // execution task has not finished processing yet.
   @Test
   public void testLongItem() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(500);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -438,7 +437,7 @@ public class AsyncWrapperTest implements Serializable {
     assertEquals(0, dofn.getProcessed());
     assertEquals(1, fakeBagState.items.size());
 
-    waitForEmpty(asyncWrapper, 20);
+    waitForEmpty(asyncWrapper, 2);
 
     result =
         asyncWrapper.commitFinishedItemsDirect(
@@ -538,7 +537,7 @@ public class AsyncWrapperTest implements Serializable {
   // Identical elements should not spawn multiple concurrent background 
executions.
   @Test
   public void testDuplicates() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(10);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -568,7 +567,7 @@ public class AsyncWrapperTest implements Serializable {
   // has cleared are correctly tracked and processed.
   @Test
   public void testSlowDuplicates() {
-    BasicDofn dofn = new BasicDofn(5000);
+    BasicDofn dofn = new BasicDofn(20);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -581,7 +580,7 @@ public class AsyncWrapperTest implements Serializable {
     asyncWrapper.processDirect(msg, GlobalWindow.INSTANCE, Instant.now(), 
fakeBagState, fakeTimer);
 
     try {
-      Thread.sleep(10000);
+      Thread.sleep(100);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
@@ -610,7 +609,7 @@ public class AsyncWrapperTest implements Serializable {
   // and decrement immediately upon execution completion.
   @Test
   public void testBufferCount() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(10);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -637,7 +636,7 @@ public class AsyncWrapperTest implements Serializable {
   // the scheduler must block and delay submissions appropriately.
   @Test
   public void testBufferStopsAcceptingItems() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(500);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn,
@@ -670,7 +669,7 @@ public class AsyncWrapperTest implements Serializable {
     }
 
     try {
-      Thread.sleep(200);
+      Thread.sleep(100);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
@@ -707,7 +706,7 @@ public class AsyncWrapperTest implements Serializable {
   // Verifies actively cancelled elements are cleanly dropped from the buffer 
during throttling.
   @Test
   public void testBufferWithCancellation() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(10);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -746,7 +745,7 @@ public class AsyncWrapperTest implements Serializable {
   // across multiple keys correctly under heavy multi-threaded load.
   @Test
   public void testLoadCorrectness() {
-    BasicDofn dofn = new BasicDofn(1000);
+    BasicDofn dofn = new BasicDofn(10);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn,
@@ -791,14 +790,14 @@ public class AsyncWrapperTest implements Serializable {
                     timers.get(key));
               }));
       try {
-        Thread.sleep(random.nextInt(200));
+        Thread.sleep(random.nextInt(2));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
     }
 
     try {
-      Thread.sleep(3000 + random.nextInt(2000));
+      Thread.sleep(1000 + random.nextInt(1000));
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }
@@ -834,7 +833,7 @@ public class AsyncWrapperTest implements Serializable {
         }
       }
       try {
-        Thread.sleep(1000 + random.nextInt(2000));
+        Thread.sleep(10 + random.nextInt(20));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
@@ -854,7 +853,7 @@ public class AsyncWrapperTest implements Serializable {
   // must complete cleanly without thread or lock deadlocks.
   @Test
   public void testResetStateConcurrentTeardown() {
-    BasicDofn dofn = new BasicDofn(500);
+    BasicDofn dofn = new BasicDofn(10);
     AsyncWrapper<String, String, String> asyncWrapper =
         new AsyncWrapper<>(
             dofn, 1, Duration.standardSeconds(5), null, null, null, null, 
useThreadPool);
@@ -867,7 +866,7 @@ public class AsyncWrapperTest implements Serializable {
         KV.of("key1", "1"), GlobalWindow.INSTANCE, Instant.now(), 
fakeBagState, fakeTimer);
 
     try {
-      Thread.sleep(50);
+      Thread.sleep(2);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
     }

Reply via email to