scwhittle commented on code in PR #30572:
URL: https://github.com/apache/beam/pull/30572#discussion_r1521123032


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java:
##########
@@ -99,7 +103,7 @@ public void stop() {
   private void invalidateStuckCommits() {

Review Comment:
   this is now pretty abstract as both the computation supplier and the 
invalidation function are injected.  Could it be a single function 
invalidateStuckCommits(deadline)?



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java:
##########
@@ -17,89 +17,111 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
 
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
 
 @RunWith(JUnit4.class)
 public class GetWorkBudgetRefresherTest {
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final int WAIT_BUFFER = 10;
-  private final Runnable redistributeBudget = Mockito.mock(Runnable.class);
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
 
-  private GetWorkBudgetRefresher createBudgetRefresher() {
-    return createBudgetRefresher(false);
+  private GetWorkBudgetRefresher createBudgetRefresher(Runnable 
redistributeBudget) {
+    return createBudgetRefresher(false, redistributeBudget);
   }
 
-  private GetWorkBudgetRefresher createBudgetRefresher(Boolean 
isBudgetRefreshPaused) {
+  private GetWorkBudgetRefresher createBudgetRefresher(
+      boolean isBudgetRefreshPaused, Runnable redistributeBudget) {
     return new GetWorkBudgetRefresher(() -> isBudgetRefreshPaused, 
redistributeBudget);
   }
 
   @Test
   public void testStop_successfullyTerminates() throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
     budgetRefresher.stop();
     budgetRefresher.requestBudgetRefresh();
-    Thread.sleep(WAIT_BUFFER);
-    verifyNoInteractions(redistributeBudget);
+    boolean redistributeBudgetRan =
+        redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
+    // Make sure that redistributeBudgetLatch.countDown() is never called.
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
+    assertFalse(redistributeBudgetRan);
   }
 
   @Test
   public void testRequestBudgetRefresh_triggersBudgetRefresh() throws 
InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
     budgetRefresher.requestBudgetRefresh();
     // Wait a bit for redistribute budget to run.
-    Thread.sleep(WAIT_BUFFER);
-    verify(redistributeBudget, times(1)).run();
+    redistributeBudgetLatch.await();
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
   }
 
   @Test
   public void testScheduledBudgetRefresh() throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
-    Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER);
-    verify(redistributeBudget, times(1)).run();
+    // Wait a bit for scheduled redistribute budget to run.

Review Comment:
   nit: "a bit" is a little confusing since it's just waiting
   
   ditto below



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java:
##########
@@ -99,7 +103,7 @@ public void stop() {
   private void invalidateStuckCommits() {

Review Comment:
   Or alternativley since this is just to get a hook for testing, can you 
modify the test instead to use the executor or computation supplier instead to 
enforce it runs?  It seems like you then call stop() which would block if the 
invalidation was running, either could be sufficient since it indicates it has 
started.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to