scwhittle commented on code in PR #28835:
URL: https://github.com/apache/beam/pull/28835#discussion_r1368663156


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/DispatcherClient.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static org.apache.beam.runners.dataflow.worker.windmill.util.WindmillChannelFactory.LOCALHOST;
+import static org.apache.beam.runners.dataflow.worker.windmill.util.WindmillChannelFactory.localhostChannel;
+import static org.apache.beam.runners.dataflow.worker.windmill.util.WindmillChannelFactory.remoteChannel;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Manages endpoints and stubs for connecting to the Windmill Dispatcher. */
+@ThreadSafe
+public class DispatcherClient {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DispatcherClient.class);
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final int windmillServiceRpcChannelTimeoutSec;
+
+  @GuardedBy("this")
+  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
+
+  @GuardedBy("this")
+  private final Set<HostAndPort> dispatcherEndpoints;
+
+  @GuardedBy("this")
+  private final Random rand;
+
+  private DispatcherClient(
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      int windmillServiceRpcChannelTimeoutSec,
+      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      Set<HostAndPort> dispatcherEndpoints,
+      Random rand) {
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.windmillServiceRpcChannelTimeoutSec = windmillServiceRpcChannelTimeoutSec;
+    this.dispatcherStubs = dispatcherStubs;
+    this.dispatcherEndpoints = dispatcherEndpoints;
+    this.rand = rand;
+  }
+
+  public static DispatcherClient create(
+      WindmillGrpcStubFactory windmillGrpcStubFactory, int windmillServiceRpcChannelTimeoutSec) {
+    return new DispatcherClient(
+        windmillGrpcStubFactory,
+        windmillServiceRpcChannelTimeoutSec,
+        new ArrayList<>(),
+        new HashSet<>(),
+        new Random());
+  }
+
+  @VisibleForTesting
+  static DispatcherClient forTesting(
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      Set<HostAndPort> dispatcherEndpoints) {
+    return new DispatcherClient(

Review Comment:
   Should `forTesting` enforce that the endpoints and stubs are the same size?
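   A minimal sketch of the kind of check meant here, not the actual change. `DispatcherClientSketch` is a hypothetical stand-in, and `String` replaces the real stub and `HostAndPort` types so the snippet compiles on its own. In the real class the already-imported `Preconditions.checkArgument` would likely be the natural way to write it.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for DispatcherClient; String replaces the stub and endpoint types.
final class DispatcherClientSketch {
  private final List<String> dispatcherStubs;
  private final Set<String> dispatcherEndpoints;

  private DispatcherClientSketch(List<String> dispatcherStubs, Set<String> dispatcherEndpoints) {
    this.dispatcherStubs = dispatcherStubs;
    this.dispatcherEndpoints = dispatcherEndpoints;
  }

  static DispatcherClientSketch forTesting(List<String> stubs, Set<String> endpoints) {
    // Fail fast if the test wiring passes mismatched collections.
    if (stubs.size() != endpoints.size()) {
      throw new IllegalArgumentException(
          String.format(
              "Expected one stub per endpoint, got %d stubs and %d endpoints.",
              stubs.size(), endpoints.size()));
    }
    // Defensive copies so later mutation by the caller cannot break the invariant.
    return new DispatcherClientSketch(new ArrayList<>(stubs), new HashSet<>(endpoints));
  }

  int stubCount() {
    return dispatcherStubs.size();
  }

  int endpointCount() {
    return dispatcherEndpoints.size();
  }
}
```

   With this in place a test that passes two stubs and one endpoint fails at construction time rather than later when a stub is picked at random.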



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamEngineConnectionState.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import com.google.auto.value.AutoValue;
+import java.util.function.Supplier;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Represents the current state of connections to Streaming Engine. Updated when backend worker
+ * pools change due to backend worker crashes or when backend workers assigned to the key range
+ * being processed by this user worker change.
+ */
+@AutoValue
+abstract class StreamEngineConnectionState {
+  static final StreamEngineConnectionState EMPTY =
+      builder()
+          .setWindmillConnections(ImmutableMap.of())

Review Comment:
   Would it make sense for empty maps to be the default for all of these fields? You could modify `builder()` to set them, or perhaps a default annotation would work?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the
+ * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = "ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = "TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
+
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final CountDownLatch getWorkerMetadataReady;
+  private final ExecutorService consumeWorkerMetadataExecutor;
+
+  private StreamingEngineClient(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    this.started = new AtomicBoolean();
+    this.running = running;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.workItemReceiver = workItemReceiver;
+    this.connections = connections;
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.dispatcherClient = dispatcherClient;
+    this.isBudgetRefreshPaused = new AtomicBoolean(false);
+    this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true);
+    this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.scheduledBudgetRefreshExecutor =
+        createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD);
+    this.consumeWorkerMetadataExecutor =
+        createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD);
+    this.triggeredBudgetRefreshListener =
+        createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD);
+    this.getWorkerMetadataStream = new AtomicReference<>();
+    this.getWorkerMetadataReady = new CountDownLatch(1);
+  }
+
+  public static StreamingEngineClient create(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,

Review Comment:
   This seems to overlap with `totalGetWorkBudget`. Can one of them be removed? You could remove the budget, or change this parameter to a `JobHeader`.
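   To illustrate the `JobHeader` option, here is a sketch under the assumption that the request carries item and byte limits that duplicate the budget. All three types below are hypothetical simplifications and their field names are guesses, not the real `GetWorkRequest`, `JobHeader`, or `GetWorkBudget`. The idea is that the client takes the job identity and the budget separately, then derives each request from both, so the limits live in one place.

```java
// Hypothetical simplified types; field names are assumptions made for illustration only.
final class JobHeaderSketch {
  final String jobId;
  final String projectId;
  final String workerId;

  JobHeaderSketch(String jobId, String projectId, String workerId) {
    this.jobId = jobId;
    this.projectId = projectId;
    this.workerId = workerId;
  }
}

final class BudgetSketch {
  final long items;
  final long bytes;

  BudgetSketch(long items, long bytes) {
    this.items = items;
    this.bytes = bytes;
  }
}

final class RequestSketch {
  final String jobId;
  final String projectId;
  final String workerId;
  final long maxItems;
  final long maxBytes;

  private RequestSketch(JobHeaderSketch header, BudgetSketch budget) {
    this.jobId = header.jobId;
    this.projectId = header.projectId;
    this.workerId = header.workerId;
    this.maxItems = budget.items;
    this.maxBytes = budget.bytes;
  }

  // Identity comes from the header, limits come from the budget: no duplicated state.
  static RequestSketch from(JobHeaderSketch header, BudgetSketch budget) {
    return new RequestSketch(header, budget);
  }
}
```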



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the budget and starts the
+ * {@link org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = "ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = "TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = "ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
+
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final CountDownLatch getWorkerMetadataReady;
+  private final ExecutorService consumeWorkerMetadataExecutor;
+
+  private StreamingEngineClient(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    this.started = new AtomicBoolean();
+    this.running = running;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.workItemReceiver = workItemReceiver;
+    this.connections = connections;
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.dispatcherClient = dispatcherClient;
+    this.isBudgetRefreshPaused = new AtomicBoolean(false);
+    this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true);
+    this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.scheduledBudgetRefreshExecutor =
+        createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD);
+    this.consumeWorkerMetadataExecutor =
+        createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD);
+    this.triggeredBudgetRefreshListener =
+        createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD);
+    this.getWorkerMetadataStream = new AtomicReference<>();
+    this.getWorkerMetadataReady = new CountDownLatch(1);
+  }
+
+  public static StreamingEngineClient create(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    StreamingEngineClient streamingEngineClient =
+        new StreamingEngineClient(
+            running,
+            totalGetWorkBudget,
+            new AtomicReference<>(StreamEngineConnectionState.EMPTY),
+            streamingEngineStreamFactory,
+            workItemReceiver,
+            windmillGrpcStubFactory,
+            getWorkRequest,
+            getWorkBudgetDistributor,
+            dispatcherClient);
+    streamingEngineClient.startGetWorkerMetadataStream();
+    return streamingEngineClient;
+  }
+
+  @VisibleForTesting
+  static StreamingEngineClient forTesting(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    StreamingEngineClient streamingEngineClient =
+        new StreamingEngineClient(
+            running,
+            totalGetWorkBudget,
+            connections,
+            streamingEngineStreamFactory,
+            workItemReceiver,
+            windmillGrpcStubFactory,
+            getWorkRequest,
+            getWorkBudgetDistributor,
+            dispatcherClient);
+    streamingEngineClient.startGetWorkerMetadataStream();
+    return streamingEngineClient;
+  }
+
+  private static ScheduledExecutorService createSingleThreadedExecutor(String threadName) {
+    return Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat(threadName)
+            .setUncaughtExceptionHandler(
+                (t, e) ->
+                    LOG.error(
+                        "{} failed due to uncaught exception during execution. ", t.getName(), e))
+            .build());
+  }
+
+  /**
+   * Starts the streams with the {@link #connections} values. Does nothing if this has already been
+   * called.
+   *
+   * @throws IllegalStateException if trying to start before {@link #connections} are set with
+   *     {@link GetWorkerMetadataStream}.
+   */
+  public void startAndCacheStreams() {
+    // Do nothing if we have already initialized the initial streams.
+    if (!started.compareAndSet(false, true)) {
+      return;
+    }
+    waitForStreamingEngineEndpoints();
+    StreamEngineConnectionState currentConnectionsState = connections.get();
+    Preconditions.checkState(
+        !StreamEngineConnectionState.EMPTY.equals(currentConnectionsState),
+        "Cannot start streams without connections.");
+    LOG.info("Starting initial GetWorkStreams with connections={}", currentConnectionsState);
+    ImmutableCollection<WindmillStreamSender> windmillStreamSenders =
+        currentConnectionsState.windmillStreams().values();
+    getWorkBudgetDistributor.distributeBudget(
+        currentConnectionsState.windmillStreams().values(), totalGetWorkBudget);
+    windmillStreamSenders.forEach(WindmillStreamSender::startStreams);
+    startBudgetRefreshThreads();
+  }
+
+  private void waitForStreamingEngineEndpoints() {
+    try {
+      getWorkerMetadataReady.await();
+    } catch (InterruptedException e) {
+      throw new StreamingEngineClientException(
+          "Error occurred waiting for StreamingEngine backend endpoints.", e);
+    }
+  }
+
+  /**
+   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update {@link #connections} on
+   * new backend worker metadata.
+   */
+  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
+    isBudgetRefreshPaused.set(true);
+    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
+    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
+        createNewWindmillConnections(ImmutableSet.copyOf(newWindmillEndpoints.windmillEndpoints()));
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newWindmillStreams =
+        closeStaleStreamsAndCreateNewStreams(ImmutableSet.copyOf(newWindmillConnections.values()));
+    ImmutableMap<Endpoint, Supplier<GetDataStream>> newGlobalDataStreams =
+        createNewGlobalDataStreams(
+            ImmutableSet.copyOf(newWindmillEndpoints.globalDataEndpoints().values()));
+
+    StreamEngineConnectionState newConnectionsState =
+        StreamEngineConnectionState.builder()
+            .setWindmillConnections(newWindmillConnections)
+            .setWindmillStreams(newWindmillStreams)
+            .setGlobalDataEndpoints(newWindmillEndpoints.globalDataEndpoints())
+            .setGlobalDataStreams(newGlobalDataStreams)
+            .build();
+
+    LOG.info(
+        "Setting new connections: {}. Previous connections: {}.",
+        newConnectionsState,
+        connections.get());
+
+    isBudgetRefreshPaused.set(false);
+    connections.set(newConnectionsState);
+
+    // On first worker metadata. Trigger
+    if (getWorkerMetadataReady.getCount() > 0) {
+      getWorkerMetadataReady.countDown();
+    } else {
+      requestBudgetRefresh();
+    }
+  }
+
+  public ImmutableList<Long> getAndResetThrottleTimes() {
+    StreamEngineConnectionState currentConnections = connections.get();
+
+    ImmutableList<Long> keyedWorkStreamThrottleTimes =
+        currentConnections.windmillStreams().values().stream()
+            .map(WindmillStreamSender::getAndResetThrottleTime)
+            .collect(toImmutableList());
+
+    return ImmutableList.<Long>builder()
+        .add(getWorkerMetadataThrottleTimer.getAndResetThrottleTime())
+        .addAll(keyedWorkStreamThrottleTimes)
+        .build();
+  }
+
+  /** Starts {@link GetWorkerMetadataStream}. */
+  @SuppressWarnings({
+    "FutureReturnValueIgnored", // ignoring Future returned from Executor.submit()
+    "nullness" // Uninitialized value of getWorkerMetadataStream is null.
+  })
+  private synchronized void startGetWorkerMetadataStream() {
+    // We only want to set and start this value once.
+    getWorkerMetadataStream.compareAndSet(

Review Comment:
   This is only called once, from the create methods. I think you could remove the atomic and instead just assert that the field is null before setting it.
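   Roughly what I have in mind, as a sketch rather than a drop-in change. `MetadataStreamHolderSketch` is a hypothetical stand-in and `Runnable` replaces `GetWorkerMetadataStream`. In the real class `Preconditions.checkState` would probably express the null check.

```java
// Hypothetical stand-in; Runnable replaces GetWorkerMetadataStream.
final class MetadataStreamHolderSketch {
  // Plain field instead of AtomicReference: the synchronized methods below
  // are the only code that reads or writes it.
  private Runnable getWorkerMetadataStream;

  synchronized void startGetWorkerMetadataStream(Runnable stream) {
    // The create methods call this exactly once, so a second call is a bug.
    if (getWorkerMetadataStream != null) {
      throw new IllegalStateException("GetWorkerMetadataStream was already started.");
    }
    getWorkerMetadataStream = stream;
    stream.run();
  }

  synchronized boolean isStarted() {
    return getWorkerMetadataStream != null;
  }
}
```

   A second call then fails loudly instead of being silently ignored by `compareAndSet`.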



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the 
budget and starts the
+ * {@link 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = 
"ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = 
"TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = 
"ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, 
with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> 
getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget 
refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to 
be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;

Review Comment:
   I'm wondering if we can get rid of the schedule. It seems like budget 
distribution can be based purely on events:
   - getting a new set of endpoints
   - budget being returned
   
   The benefit of dropping the scheduled component is that we avoid the possible 
latency of waiting for the next scheduled run. If there are things that we do 
want to evaluate periodically (for example, reclaiming budget we 
think is misplaced), perhaps we could do that as a followup.
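   
   A minimal sketch of the event-driven approach, assuming a hypothetical 
`EventDrivenBudgetRefresher` class (not part of this PR) whose `requestRefresh()` 
would be called from the endpoint-update path and from wherever budget is 
returned. The `redistributeBudget` callback stands in for the real distribution 
logic; bursts of events are coalesced into a single queued run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: runs a redistribution task only in response to events. */
final class EventDrivenBudgetRefresher {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final AtomicBoolean refreshPending = new AtomicBoolean(false);
  // Stand-in for the real budget distribution logic (an assumption for this sketch).
  private final Runnable redistributeBudget;

  EventDrivenBudgetRefresher(Runnable redistributeBudget) {
    this.redistributeBudget = redistributeBudget;
  }

  /** Call when a new set of endpoints arrives or when budget is returned. */
  void requestRefresh() {
    // Coalesce bursts of events: a refresh that is already queued will see the latest state.
    if (refreshPending.compareAndSet(false, true)) {
      executor.execute(
          () -> {
            // Clear the flag before running so events arriving mid-run queue another refresh.
            refreshPending.set(false);
            redistributeBudget.run();
          });
    }
  }

  /** Stops accepting events and waits up to 5 seconds for any queued refresh to finish. */
  boolean shutdown() {
    executor.shutdown();
    try {
      return executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

   With something like this there is no timer thread at all; a periodic 
re-evaluation could later be layered on by having a scheduler call 
`requestRefresh()`.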



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamEngineConnectionState.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import com.google.auto.value.AutoValue;
+import java.util.function.Supplier;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * Represents the current state of connections to Streaming Engine. Updated 
when backend worker
+ * pools change due to backend worker crashes or when backend workers assigned 
to the key range

Review Comment:
   nit: key ranges
   
   Also, the service is free to change the connections for any reason, so it may be 
better to phrase these as examples of why they would change rather than an exhaustive list, e.g.:
   "The connections may change during pipeline execution, for example during 
autoscaling, load-balancing, or backend updates."



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import 
org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the 
budget and starts the
+ * {@link 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = 
"ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = 
"TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = 
"ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, 
with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> 
getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget 
refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to 
be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
+
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final CountDownLatch getWorkerMetadataReady;
+  private final ExecutorService consumeWorkerMetadataExecutor;
+
+  private StreamingEngineClient(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    this.started = new AtomicBoolean();
+    this.running = running;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.workItemReceiver = workItemReceiver;
+    this.connections = connections;
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.dispatcherClient = dispatcherClient;
+    this.isBudgetRefreshPaused = new AtomicBoolean(false);
+    this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true);
+    this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.scheduledBudgetRefreshExecutor =
+        createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD);
+    this.consumeWorkerMetadataExecutor =
+        createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD);
+    this.triggeredBudgetRefreshListener =
+        createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD);
+    this.getWorkerMetadataStream = new AtomicReference<>();
+    this.getWorkerMetadataReady = new CountDownLatch(1);
+  }
+
+  public static StreamingEngineClient create(
+      AtomicBoolean running,

Review Comment:
   This seems like an odd parameter. Can it just be an isRunning() method on 
this class?
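   
   Roughly what I have in mind, as a sketch only: the class owns the flag and 
exposes it read-only. `ClientLifecycle`, `start()` and `stop()` are hypothetical 
names for illustration, not existing code in this PR.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: the running flag is owned by the class instead of passed in. */
final class ClientLifecycle {
  private final AtomicBoolean running = new AtomicBoolean(false);

  /** Returns true only for the call that actually transitions to running. */
  boolean start() {
    return running.compareAndSet(false, true);
  }

  /** Returns true only for the call that actually transitions to stopped. */
  boolean stop() {
    return running.compareAndSet(true, false);
  }

  /** Callers observe the state through this accessor rather than a shared AtomicBoolean. */
  boolean isRunning() {
    return running.get();
  }
}
```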



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import 
org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the 
budget and starts the
+ * {@link 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = 
"ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = 
"TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = 
"ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, 
with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> 
getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget 
refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to 
be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
+
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final CountDownLatch getWorkerMetadataReady;
+  private final ExecutorService consumeWorkerMetadataExecutor;
+
+  private StreamingEngineClient(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    this.started = new AtomicBoolean();
+    this.running = running;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.workItemReceiver = workItemReceiver;
+    this.connections = connections;
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.dispatcherClient = dispatcherClient;
+    this.isBudgetRefreshPaused = new AtomicBoolean(false);
+    this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true);
+    this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.scheduledBudgetRefreshExecutor =
+        createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD);
+    this.consumeWorkerMetadataExecutor =
+        createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD);
+    this.triggeredBudgetRefreshListener =
+        createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD);
+    this.getWorkerMetadataStream = new AtomicReference<>();
+    this.getWorkerMetadataReady = new CountDownLatch(1);
+  }
+
+  public static StreamingEngineClient create(

Review Comment:
   I think some comments would be good here



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/StreamingEngineClient.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
+
+import com.google.errorprone.annotations.CheckReturnValue;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.GetWorkRequest;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetDataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkerMetadataStream;
+import 
org.apache.beam.runners.dataflow.worker.windmill.grpcclient.ThrottleTimer;
+import 
org.apache.beam.runners.dataflow.worker.windmill.util.WindmillGrpcStubFactory;
+import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
+import 
org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableCollection;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client for StreamingEngine. Given a {@link GetWorkBudget}, divides the 
budget and starts the
+ * {@link 
org.apache.beam.runners.dataflow.worker.windmill.WindmillStream.GetWorkStream}(s).
+ */
+@CheckReturnValue
+@ThreadSafe
+public class StreamingEngineClient {
+  @VisibleForTesting static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
+  private static final Logger LOG = 
LoggerFactory.getLogger(StreamingEngineClient.class);
+  private static final String SCHEDULED_BUDGET_REFRESH_THREAD = 
"ScheduledBudgetRefreshThread";
+  private static final String TRIGGERED_BUDGET_REFRESH_THREAD = 
"TriggeredBudgetRefreshThread";
+  private static final String CONSUMER_WORKER_METADATA_THREAD = 
"ConsumeWorkerMetadataThread";
+  private final AtomicBoolean started;
+  private final AtomicBoolean running;
+  private final GetWorkBudget totalGetWorkBudget;
+  private final StreamingEngineStreamFactory streamingEngineStreamFactory;
+  private final WorkItemReceiver workItemReceiver;
+  private final WindmillGrpcStubFactory windmillGrpcStubFactory;
+  private final GetWorkRequest getWorkRequest;
+  private final GetWorkBudgetDistributor getWorkBudgetDistributor;
+  private final DispatcherClient dispatcherClient;
+  /**
+   * Reference to {@link GetWorkerMetadataStream} that is lazily initialized, 
with its initial value
+   * being null.
+   */
+  private final AtomicReference<GetWorkerMetadataStream> 
getWorkerMetadataStream;
+
+  private final AtomicBoolean isBudgetRefreshPaused;
+  /** Writes are guarded by synchronization, reads are lock free. */
+  private final AtomicReference<StreamEngineConnectionState> connections;
+
+  private final ExecutorService triggeredBudgetRefreshListener;
+  /**
+   * Used to implement publish/subscribe behavior for triggering budget 
refreshes/redistribution.
+   * Subscriber {@link #triggeredBudgetRefreshListener} waits for messages to 
be written to the
+   * queue, blocking if necessary.
+   */
+  private final SynchronousQueue<Boolean> budgetRefreshTrigger;
+
+  /** Redistributes the budget on a timed cadence. */
+  private final ScheduledExecutorService scheduledBudgetRefreshExecutor;
+
+  private final ThrottleTimer getWorkerMetadataThrottleTimer;
+  private final CountDownLatch getWorkerMetadataReady;
+  private final ExecutorService consumeWorkerMetadataExecutor;
+
+  private StreamingEngineClient(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    this.started = new AtomicBoolean();
+    this.running = running;
+    this.totalGetWorkBudget = totalGetWorkBudget;
+    this.streamingEngineStreamFactory = streamingEngineStreamFactory;
+    this.workItemReceiver = workItemReceiver;
+    this.connections = connections;
+    this.windmillGrpcStubFactory = windmillGrpcStubFactory;
+    this.getWorkRequest = getWorkRequest;
+    this.getWorkBudgetDistributor = getWorkBudgetDistributor;
+    this.dispatcherClient = dispatcherClient;
+    this.isBudgetRefreshPaused = new AtomicBoolean(false);
+    this.budgetRefreshTrigger = new SynchronousQueue<>(/* fair= */ true);
+    this.getWorkerMetadataThrottleTimer = new ThrottleTimer();
+    this.scheduledBudgetRefreshExecutor =
+        createSingleThreadedExecutor(SCHEDULED_BUDGET_REFRESH_THREAD);
+    this.consumeWorkerMetadataExecutor =
+        createSingleThreadedExecutor(CONSUMER_WORKER_METADATA_THREAD);
+    this.triggeredBudgetRefreshListener =
+        createSingleThreadedExecutor(TRIGGERED_BUDGET_REFRESH_THREAD);
+    this.getWorkerMetadataStream = new AtomicReference<>();
+    this.getWorkerMetadataReady = new CountDownLatch(1);
+  }
+
+  public static StreamingEngineClient create(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    StreamingEngineClient streamingEngineClient =
+        new StreamingEngineClient(
+            running,
+            totalGetWorkBudget,
+            new AtomicReference<>(StreamEngineConnectionState.EMPTY),
+            streamingEngineStreamFactory,
+            workItemReceiver,
+            windmillGrpcStubFactory,
+            getWorkRequest,
+            getWorkBudgetDistributor,
+            dispatcherClient);
+    streamingEngineClient.startGetWorkerMetadataStream();
+    return streamingEngineClient;
+  }
+
+  @VisibleForTesting
+  static StreamingEngineClient forTesting(
+      AtomicBoolean running,
+      GetWorkBudget totalGetWorkBudget,
+      AtomicReference<StreamEngineConnectionState> connections,
+      StreamingEngineStreamFactory streamingEngineStreamFactory,
+      WorkItemReceiver workItemReceiver,
+      WindmillGrpcStubFactory windmillGrpcStubFactory,
+      GetWorkRequest getWorkRequest,
+      GetWorkBudgetDistributor getWorkBudgetDistributor,
+      DispatcherClient dispatcherClient) {
+    StreamingEngineClient streamingEngineClient =
+        new StreamingEngineClient(
+            running,
+            totalGetWorkBudget,
+            connections,
+            streamingEngineStreamFactory,
+            workItemReceiver,
+            windmillGrpcStubFactory,
+            getWorkRequest,
+            getWorkBudgetDistributor,
+            dispatcherClient);
+    streamingEngineClient.startGetWorkerMetadataStream();
+    return streamingEngineClient;
+  }
+
+  private static ScheduledExecutorService createSingleThreadedExecutor(String 
threadName) {
+    return Executors.newSingleThreadScheduledExecutor(
+        new ThreadFactoryBuilder()
+            .setNameFormat(threadName)
+            .setUncaughtExceptionHandler(
+                (t, e) ->
+                    LOG.error(
+                        "{} failed due to uncaught exception during execution. 
", t.getName(), e))
+            .build());
+  }
+
+  /**
+   * Starts the streams with the {@link #connections} values. Does nothing if 
this has already been
+   * called.
+   *
+   * @throws IllegalArgumentException if trying to start before {@link 
#connections} are set with
+   *     {@link GetWorkerMetadataStream}.
+   */
+  public void startAndCacheStreams() {
+    // Do nothing if we have already initialized the initial streams.
+    if (!started.compareAndSet(false, true)) {
+      return;
+    }
+    waitForStreamingEngineEndpoints();
+    StreamEngineConnectionState currentConnectionsState = connections.get();
+    Preconditions.checkState(
+        !StreamEngineConnectionState.EMPTY.equals(currentConnectionsState),
+        "Cannot start streams without connections.");
+    LOG.info("Starting initial GetWorkStreams with connections={}", 
currentConnectionsState);
+    ImmutableCollection<WindmillStreamSender> windmillStreamSenders =
+        currentConnectionsState.windmillStreams().values();
+    getWorkBudgetDistributor.distributeBudget(
+        currentConnectionsState.windmillStreams().values(), 
totalGetWorkBudget);
+    windmillStreamSenders.forEach(WindmillStreamSender::startStreams);
+    startBudgetRefreshThreads();
+  }
+
+  private void waitForStreamingEngineEndpoints() {
+    try {
+      getWorkerMetadataReady.await();
+    } catch (InterruptedException e) {
+      throw new StreamingEngineClientException(
+          "Error occurred waiting for StreamingEngine backend endpoints.", e);
+    }
+  }
+
+  /**
+   * {@link java.util.function.Consumer<WindmillEndpoints>} used to update 
{@link #connections} on
+   * new backend worker metadata.
+   */
+  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints 
newWindmillEndpoints) {
+    isBudgetRefreshPaused.set(true);
+    LOG.info("Consuming new windmill endpoints: {}", newWindmillEndpoints);
+    ImmutableMap<Endpoint, WindmillConnection> newWindmillConnections =
+        
createNewWindmillConnections(ImmutableSet.copyOf(newWindmillEndpoints.windmillEndpoints()));
+    ImmutableMap<WindmillConnection, WindmillStreamSender> newWindmillStreams =
+        closeStaleStreamsAndCreateNewStreams(ImmutableSet.copyOf(newWindmillConnections.values()));
+    ImmutableMap<Endpoint, Supplier<GetDataStream>> newGlobalDataStreams =
+        createNewGlobalDataStreams(
+            ImmutableSet.copyOf(newWindmillEndpoints.globalDataEndpoints().values()));
+
+    StreamEngineConnectionState newConnectionsState =
+        StreamEngineConnectionState.builder()
+            .setWindmillConnections(newWindmillConnections)
+            .setWindmillStreams(newWindmillStreams)
+            .setGlobalDataEndpoints(newWindmillEndpoints.globalDataEndpoints())
+            .setGlobalDataStreams(newGlobalDataStreams)
+            .build();
+
+    LOG.info(
+        "Setting new connections: {}. Previous connections: {}.",
+        newConnectionsState,
+        connections.get());
+
+    isBudgetRefreshPaused.set(false);
+    connections.set(newConnectionsState);
+
+    // On first worker metadata. Trigger
+    if (getWorkerMetadataReady.getCount() > 0) {
+      getWorkerMetadataReady.countDown();
+    } else {
+      requestBudgetRefresh();
+    }
+  }
+
+  public ImmutableList<Long> getAndResetThrottleTimes() {
+    StreamEngineConnectionState currentConnections = connections.get();
+
+    ImmutableList<Long> keyedWorkStreamThrottleTimes =
+        currentConnections.windmillStreams().values().stream()
+            .map(WindmillStreamSender::getAndResetThrottleTime)
+            .collect(toImmutableList());
+
+    return ImmutableList.<Long>builder()
+        .add(getWorkerMetadataThrottleTimer.getAndResetThrottleTime())
+        .addAll(keyedWorkStreamThrottleTimes)
+        .build();
+  }
+
+  /** Starts {@link GetWorkerMetadataStream}. */
+  @SuppressWarnings({
+    "FutureReturnValueIgnored", // ignoring Future returned from Executor.submit()
+    "nullness" // Uninitialized value of getWorkerMetadataStream is null.
+  })
+  private synchronized void startGetWorkerMetadataStream() {
+    // We only want to set and start this value once.
+    getWorkerMetadataStream.compareAndSet(
+        null,
+        streamingEngineStreamFactory.createGetWorkerMetadataStream(
+            dispatcherClient.getDispatcherStub(),
+            getWorkerMetadataThrottleTimer,
+            endpoints ->
+                consumeWorkerMetadataExecutor.submit(
+                    () -> consumeWindmillWorkerEndpoints(endpoints))));
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void startBudgetRefreshThreads() {
+    scheduledBudgetRefreshExecutor.scheduleAtFixedRate(
+        () -> {
+          if (running.get() && started.get()) {
+            refreshBudget();
+          }
+        },
+        SCHEDULED_BUDGET_REFRESH_MILLIS,
+        SCHEDULED_BUDGET_REFRESH_MILLIS,
+        TimeUnit.MILLISECONDS);
+    triggeredBudgetRefreshListener.submit(this::triggeredBudgetRefresh);
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void triggeredBudgetRefresh() {
+    while (running.get() && started.get()) {
+      try {
+        // Blocks here until a value is available.
+        budgetRefreshTrigger.take();
+        refreshBudget();
+      } catch (InterruptedException e) {
+        LOG.error("Error occurred during triggered budget refresh. ", e);
+        break;
+      }
+    }
+
+    // On error or if not ready retry.
+    triggeredBudgetRefreshListener.submit(this::triggeredBudgetRefresh);
+  }
+
+  private void refreshBudget() {
+    if (isBudgetRefreshPaused.get()) {
+      return;
+    }
+
+    synchronized (this) {
+      getWorkBudgetDistributor.distributeBudget(
+          connections.get().windmillStreams().values(), totalGetWorkBudget);
+    }
+  }
+
+  private void requestBudgetRefresh() {
+    try {
+      budgetRefreshTrigger.put(true);

Review Comment:
   This is going to cause this thread to block until the refresh thread takes it.
   
   Another option would be to use some sort of lock where you set a value to true, and the other thread waits until that condition is met, then clears the value, releases the lock, processes the budget, and goes back to waiting. That makes requestBudgetRefresh entirely non-blocking, which is likely a property we want if we're going to call it from grpc threads etc.
   
   com/google/common/util/concurrent/Monitor.java is perhaps one way to do that, though there may be better options.
   
   It might be nice to structure this so that refreshing is always done by a single background thread, which either waits for triggers or, if we need periodic evaluation, can time out and reevaluate as well. Having a single modifier will make it easier to think through the possible races.
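   
   A rough sketch of the single-background-thread idea, to make the suggestion concrete. It uses a plain `ReentrantLock` and `Condition` rather than Guava's `Monitor`, and every name here (`BudgetRefresher`, `periodMillis`, the `Runnable` standing in for `refreshBudget()`) is hypothetical and not part of this PR. The caller only holds the lock long enough to set a flag, so it never waits for the refresh itself:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical helper (not in the PR): one thread performs every budget refresh. */
final class BudgetRefresher {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition wakeUp = lock.newCondition();
  private final Runnable refreshBudget; // stand-in for the real refreshBudget()
  private final long periodMillis; // assumed periodic re-evaluation interval
  private final Thread worker;
  private boolean requested = false; // guarded by lock
  private boolean stopped = false; // guarded by lock

  BudgetRefresher(Runnable refreshBudget, long periodMillis) {
    this.refreshBudget = refreshBudget;
    this.periodMillis = periodMillis;
    this.worker = new Thread(this::runLoop, "budget-refresher");
    this.worker.setDaemon(true);
  }

  void start() {
    worker.start();
  }

  /** Sets the flag and signals the worker; never waits for the refresh to run. */
  void requestBudgetRefresh() {
    lock.lock();
    try {
      requested = true;
      wakeUp.signal();
    } finally {
      lock.unlock();
    }
  }

  void stop() {
    lock.lock();
    try {
      stopped = true;
      wakeUp.signal();
    } finally {
      lock.unlock();
    }
  }

  private void runLoop() {
    while (true) {
      lock.lock();
      try {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(periodMillis);
        // Wait for a trigger, a stop request, or the periodic timeout.
        while (!requested && !stopped && remainingNanos > 0) {
          remainingNanos = wakeUp.awaitNanos(remainingNanos);
        }
        if (stopped) {
          return;
        }
        requested = false; // clear the flag before releasing the lock
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      } finally {
        lock.unlock();
      }
      // The refresh runs outside the lock, so callers are not held up by it.
      refreshBudget.run();
    }
  }
}
```

   Requests that arrive while a refresh is in progress collapse into a single follow-up refresh, which seems fine here since a refresh always redistributes against the latest connections.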


