This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 600bd612dc2 [Dataflow Streaming] Remove GetWorkBudgetRefresher which is unused and has a flaky test. (#37317)
600bd612dc2 is described below

commit 600bd612dc2ef495fef08675398361159f7a03df
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Jan 19 11:27:56 2026 +0100

    [Dataflow Streaming] Remove GetWorkBudgetRefresher which is unused and has a flaky test. (#37317)
---
 .../work/budget/GetWorkBudgetRefresher.java        | 133 ---------------------
 .../work/budget/GetWorkBudgetRefresherTest.java    | 127 --------------------
 2 files changed, 260 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
deleted file mode 100644
index d81c7d0593f..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles refreshing the budget either via triggered or scheduled execution 
using a {@link
- * java.util.concurrent.Phaser} to emulate publish/subscribe pattern.
- */
-@Internal
-@ThreadSafe
-public final class GetWorkBudgetRefresher {
-  @VisibleForTesting public static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 
100;
-  private static final int INITIAL_BUDGET_REFRESH_PHASE = 0;
-  private static final String BUDGET_REFRESH_THREAD = 
"GetWorkBudgetRefreshThread";
-  private static final Logger LOG = 
LoggerFactory.getLogger(GetWorkBudgetRefresher.class);
-
-  private final AdvancingPhaser budgetRefreshTrigger;
-  private final ExecutorService budgetRefreshExecutor;
-  private final Supplier<Boolean> isBudgetRefreshPaused;
-  private final Runnable redistributeBudget;
-
-  public GetWorkBudgetRefresher(
-      Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
-    this.budgetRefreshTrigger = new AdvancingPhaser(1);
-    this.budgetRefreshExecutor =
-        Executors.newSingleThreadExecutor(
-            new ThreadFactoryBuilder()
-                .setNameFormat(BUDGET_REFRESH_THREAD)
-                .setUncaughtExceptionHandler(
-                    (t, e) ->
-                        LOG.error(
-                            "{} failed due to uncaught exception during 
execution. ",
-                            t.getName(),
-                            e))
-                .build());
-    this.isBudgetRefreshPaused = isBudgetRefreshPaused;
-    this.redistributeBudget = redistributeBudget;
-  }
-
-  @SuppressWarnings("FutureReturnValueIgnored")
-  public void start() {
-    budgetRefreshExecutor.submit(this::subscribeToRefreshBudget);
-  }
-
-  /** Publishes an event to trigger a budget refresh. */
-  public void requestBudgetRefresh() {
-    budgetRefreshTrigger.arrive();
-  }
-
-  public void stop() {
-    budgetRefreshTrigger.arriveAndDeregister();
-    // Put the budgetRefreshTrigger in a terminated state, 
#waitForBudgetRefreshEventWithTimeout
-    // will subsequently return false, and #subscribeToRefreshBudget will 
return, completing the
-    // task.
-    budgetRefreshTrigger.forceTermination();
-    budgetRefreshExecutor.shutdownNow();
-  }
-
-  private void subscribeToRefreshBudget() {
-    int currentBudgetRefreshPhase = INITIAL_BUDGET_REFRESH_PHASE;
-    // Runs forever until #stop is called.
-    while (true) {
-      currentBudgetRefreshPhase = 
waitForBudgetRefreshEventWithTimeout(currentBudgetRefreshPhase);
-      // Phaser.awaitAdvanceInterruptibly(...) returns a negative value if the 
phaser is
-      // terminated, else returns when either a budget refresh has been 
manually triggered or
-      // SCHEDULED_BUDGET_REFRESH_MILLIS have passed.
-      if (currentBudgetRefreshPhase < 0) {
-        return;
-      }
-      // Budget refreshes are paused during endpoint updates.
-      if (!isBudgetRefreshPaused.get()) {
-        redistributeBudget.run();
-      }
-    }
-  }
-
-  /**
-   * Waits for a budget refresh trigger event with a timeout. Returns the 
current phase of the
-   * {@link #budgetRefreshTrigger}, to be used for following waits for the 
{@link
-   * #budgetRefreshTrigger} to advance.
-   *
-   * <p>Budget refresh event is triggered when {@link #budgetRefreshTrigger} 
moves on from the given
-   * currentBudgetRefreshPhase.
-   */
-  private int waitForBudgetRefreshEventWithTimeout(int 
currentBudgetRefreshPhase) {
-    try {
-      // Wait for budgetRefreshTrigger to advance FROM the current phase.
-      return budgetRefreshTrigger.awaitAdvanceInterruptibly(
-          currentBudgetRefreshPhase, SCHEDULED_BUDGET_REFRESH_MILLIS, 
TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new BudgetRefreshException("Error occurred waiting for budget 
refresh.", e);
-    } catch (TimeoutException ignored) {
-      // Intentionally do nothing since we trigger the budget refresh on the 
timeout.
-    }
-
-    return currentBudgetRefreshPhase;
-  }
-
-  private static class BudgetRefreshException extends RuntimeException {
-    private BudgetRefreshException(String msg, Throwable sourceException) {
-      super(msg, sourceException);
-    }
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
deleted file mode 100644
index d3c00606726..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.assertFalse;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class GetWorkBudgetRefresherTest {
-  private static final int WAIT_BUFFER = 10;
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
-
-  private GetWorkBudgetRefresher createBudgetRefresher(Runnable 
redistributeBudget) {
-    return createBudgetRefresher(false, redistributeBudget);
-  }
-
-  private GetWorkBudgetRefresher createBudgetRefresher(
-      boolean isBudgetRefreshPaused, Runnable redistributeBudget) {
-    return new GetWorkBudgetRefresher(() -> isBudgetRefreshPaused, 
redistributeBudget);
-  }
-
-  @Test
-  public void testStop_successfullyTerminates() throws InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
-    budgetRefresher.start();
-    budgetRefresher.stop();
-    budgetRefresher.requestBudgetRefresh();
-    boolean redistributeBudgetRan =
-        redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
-    // Make sure that redistributeBudgetLatch.countDown() is never called.
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
-    assertFalse(redistributeBudgetRan);
-  }
-
-  @Test
-  public void testRequestBudgetRefresh_triggersBudgetRefresh() throws 
InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
-    budgetRefresher.start();
-    budgetRefresher.requestBudgetRefresh();
-    // Wait for redistribute budget to run.
-    redistributeBudgetLatch.await();
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
-  }
-
-  @Test
-  public void testScheduledBudgetRefresh() throws InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
-    budgetRefresher.start();
-    // Wait for scheduled redistribute budget to run.
-    redistributeBudgetLatch.await();
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
-  }
-
-  @Test
-  public void testTriggeredAndScheduledBudgetRefresh_concurrent() throws 
InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(2);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
-    budgetRefresher.start();
-    Thread budgetRefreshTriggerThread = new 
Thread(budgetRefresher::requestBudgetRefresh);
-    budgetRefreshTriggerThread.start();
-    budgetRefreshTriggerThread.join();
-    // Wait for triggered and scheduled redistribute budget to run.
-    redistributeBudgetLatch.await();
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
-  }
-
-  @Test
-  public void testTriggeredBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
-      throws InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, 
redistributeBudget);
-    budgetRefresher.start();
-    budgetRefresher.requestBudgetRefresh();
-    boolean redistributeBudgetRan =
-        redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
-    // Make sure that redistributeBudgetLatch.countDown() is never called.
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
-    assertFalse(redistributeBudgetRan);
-  }
-
-  @Test
-  public void testScheduledBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
-      throws InterruptedException {
-    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
-    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, 
redistributeBudget);
-    budgetRefresher.start();
-    boolean redistributeBudgetRan =
-        redistributeBudgetLatch.await(
-            GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER,
-            TimeUnit.MILLISECONDS);
-    // Make sure that redistributeBudgetLatch.countDown() is never called.
-    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
-    assertFalse(redistributeBudgetRan);
-  }
-}

Reply via email to