m-trieu commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1459690150


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +69,65 @@ public static WindmillEndpoints.Builder builder() {
     return new AutoValue_WindmillEndpoints.Builder();
   }
 
+  private static Optional<WindmillServiceAddress> parseDirectEndpoint(String 
directEndpoint) {
+    Optional<WindmillServiceAddress> directEndpointIpV6Address =
+        
tryParseDirectEndpointIntoIpV6Address(directEndpoint).map(WindmillServiceAddress::create);
+
+    return directEndpointIpV6Address.isPresent()
+        ? directEndpointIpV6Address
+        : 
tryParseEndpointIntoHostAndPort(directEndpoint).map(WindmillServiceAddress::create);

Review Comment:
   I am wondering if we should just drop the separate IPv6 parsing path,
   since HostAndPort already supports IPv6 addresses.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java:
##########
@@ -76,6 +76,10 @@ public abstract GetWorkStream getWorkStream(
   /** Returns the amount of time the server has been throttled and resets the 
time to 0. */
   public abstract long getAndResetThrottleTime();
 
+  public long clientId() {
+    return 0L;

Review Comment:
   Ah, OK. Out of curiosity, wouldn't the other components restart as well?
   It looks like we create a new worker in StreamingDataflowWorker.main
   and inject the WindmillServerStub (GrpcWindmillServer) via the options.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      AtomicReference<DispatcherStubs> dispatcherStubs,

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -90,31 +137,20 @@ public static WindmillEndpoints.Builder builder() {
    */
   @AutoValue
   public abstract static class Endpoint {
-    /**
-     * {@link WindmillServiceAddress} representation of {@link
-     * Windmill.WorkerMetadataResponse.Endpoint#getDirectEndpoint()}. The 
proto's direct_endpoint
-     * string can be converted to either {@link Inet6Address} or {@link 
HostAndPort}.
-     */
-    public abstract Optional<WindmillServiceAddress> directEndpoint();
-
-    /**
-     * Corresponds to {@link 
Windmill.WorkerMetadataResponse.Endpoint#getWorkerToken()} in the
-     * windmill.proto file.
-     */
-    public abstract Optional<String> workerToken();
-
     public static Endpoint.Builder builder() {
       return new AutoValue_WindmillEndpoints_Endpoint.Builder();
     }
 
-    public static Endpoint from(Windmill.WorkerMetadataResponse.Endpoint 
endpointProto) {
+    public static Endpoint from(
+        Windmill.WorkerMetadataResponse.Endpoint endpointProto, String 
authenticatingService) {
       Endpoint.Builder endpointBuilder = Endpoint.builder();
       if (endpointProto.hasDirectEndpoint() && 
!endpointProto.getDirectEndpoint().isEmpty()) {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +69,65 @@ public static WindmillEndpoints.Builder builder() {
     return new AutoValue_WindmillEndpoints.Builder();
   }
 
+  private static Optional<WindmillServiceAddress> parseDirectEndpoint(String 
directEndpoint) {
+    Optional<WindmillServiceAddress> directEndpointIpV6Address =
+        
tryParseDirectEndpointIntoIpV6Address(directEndpoint).map(WindmillServiceAddress::create);
+
+    return directEndpointIpV6Address.isPresent()
+        ? directEndpointIpV6Address
+        : 
tryParseEndpointIntoHostAndPort(directEndpoint).map(WindmillServiceAddress::create);

Review Comment:
   It's possible that tryParseEndpointIntoHostAndPort fails, which is why it 
returns Optional<HostAndPort>. orElseGet requires a supplier that returns an 
unwrapped value (e.g. a HostAndPort) rather than an Optional, so it can't be 
used for this fallback.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")

Review Comment:
   updated



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -853,14 +858,25 @@ message WorkerMetadataResponse {
   message Endpoint {
     // IPv6 address of a streaming engine windmill worker.
     optional string direct_endpoint = 1;
-    optional string worker_token = 2;
+    optional string backend_worker_token = 2;
+    optional int64 port = 3;
   }
+
   repeated Endpoint work_endpoints = 2;
 
   // Maps from GlobalData tag to the endpoint that should be used for GetData
   // calls to retrieve that global data.
   map<string, Endpoint> global_data_endpoints = 3;
 
+  // DNS name of the windmill worker service. This is used only in the case
+  // direct path communication is established using VIP. In order to enact
+  // Server Authorization checks, User Worker (client) set authority of the
+  // gRPC channel to this endpoint. Server Authorization checks ensure the
+  // servers clients connect to are being run by authorized borg roles. All

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -90,31 +137,20 @@ public static WindmillEndpoints.Builder builder() {
    */
   @AutoValue
   public abstract static class Endpoint {
-    /**
-     * {@link WindmillServiceAddress} representation of {@link
-     * Windmill.WorkerMetadataResponse.Endpoint#getDirectEndpoint()}. The 
proto's direct_endpoint
-     * string can be converted to either {@link Inet6Address} or {@link 
HostAndPort}.
-     */
-    public abstract Optional<WindmillServiceAddress> directEndpoint();
-
-    /**
-     * Corresponds to {@link 
Windmill.WorkerMetadataResponse.Endpoint#getWorkerToken()} in the
-     * windmill.proto file.
-     */
-    public abstract Optional<String> workerToken();
-
     public static Endpoint.Builder builder() {
       return new AutoValue_WindmillEndpoints_Endpoint.Builder();
     }
 
-    public static Endpoint from(Windmill.WorkerMetadataResponse.Endpoint 
endpointProto) {
+    public static Endpoint from(
+        Windmill.WorkerMetadataResponse.Endpoint endpointProto, String 
authenticatingService) {
       Endpoint.Builder endpointBuilder = Endpoint.builder();
       if (endpointProto.hasDirectEndpoint() && 
!endpointProto.getDirectEndpoint().isEmpty()) {
         parseDirectEndpoint(endpointProto.getDirectEndpoint())
             .ifPresent(endpointBuilder::setDirectEndpoint);
       }
-      if (endpointProto.hasWorkerToken() && 
!endpointProto.getWorkerToken().isEmpty()) {
-        endpointBuilder.setWorkerToken(endpointProto.getWorkerToken());
+      if (endpointProto.hasBackendWorkerToken()

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));
+  }
+
+  boolean isReady() {
+    return dispatcherStubs.get().isReady();
   }
 
   synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    ImmutableSet<HostAndPort> currentDispatcherEndpoints =
+        dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
 
     LOG.info("Creating a new windmill stub, endpoints: {}", 
dispatcherEndpoints);
-    if (!this.dispatcherEndpoints.isEmpty()) {
-      LOG.info("Previous windmill stub endpoints: {}", 
this.dispatcherEndpoints);
+    if (!currentDispatcherEndpoints.isEmpty()) {
+      LOG.info("Previous windmill stub endpoints: {}", 
currentDispatcherEndpoints);
     }
 
-    resetDispatcherEndpoints(dispatcherEndpoints);
+    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
   }
 
-  private synchronized void resetDispatcherEndpoints(
-      ImmutableSet<HostAndPort> newDispatcherEndpoints) {
-    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
newDispatcherEndpoints);
-    this.dispatcherStubs.clear();
-    this.dispatcherEndpoints.clear();
-    this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+  /**
+   * Endpoints and gRPC stubs used to communicate with the Windmill 
Dispatcher. {@link
+   * #dispatcherEndpoints()}, {@link #windmillServiceStubs()}, and {@link
+   * #windmillMetadataServiceStubs()} collections should all be of the same 
size.
+   */
+  @AutoValue
+  abstract static class DispatcherStubs {
+
+    private static DispatcherStubs empty() {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.of(), ImmutableList.of(), ImmutableList.of());
+    }
 
-    dispatcherEndpoints.stream()
-        .map(this::createDispatcherStubForWindmillService)
-        .forEach(dispatcherStubs::add);
-  }
+    private static DispatcherStubs create(
+        Set<HostAndPort> endpoints,
+        List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+        List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs) {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +70,75 @@ public static WindmillEndpoints.Builder builder() {
     return new AutoValue_WindmillEndpoints.Builder();
   }
 
+  private static Optional<WindmillServiceAddress> parseDirectEndpoint(
+      Windmill.WorkerMetadataResponse.Endpoint endpointProto, String 
authenticatingService) {
+    Optional<WindmillServiceAddress> directEndpointIpV6Address =
+        tryParseDirectEndpointIntoIpV6Address(endpointProto)
+            .map(address -> 
AuthenticatedGcpServiceAddress.create(authenticatingService, address))
+            .map(WindmillServiceAddress::create);
+
+    return directEndpointIpV6Address.isPresent()
+        ? directEndpointIpV6Address
+        : tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint())
+            .map(WindmillServiceAddress::create);
+  }
+
+  private static Optional<HostAndPort> tryParseEndpointIntoHostAndPort(String 
directEndpoint) {
+    try {
+      return Optional.of(HostAndPort.fromString(directEndpoint));
+    } catch (IllegalArgumentException e) {
+      LOG.warn("{} cannot be parsed into a gcpServiceAddress", directEndpoint);
+      return Optional.empty();
+    }
+  }
+
+  private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
+      Windmill.WorkerMetadataResponse.Endpoint endpointProto) {
+    if (!endpointProto.hasDirectEndpoint()) {
+      return Optional.empty();
+    }
+
+    InetAddress directEndpointAddress = null;
+    try {
+      directEndpointAddress = 
Inet6Address.getByName(endpointProto.getDirectEndpoint());
+    } catch (UnknownHostException e) {
+      LOG.warn(
+          "Error occurred trying to parse direct_endpoint={} into IPv6 
address. Exception={}",
+          endpointProto.getDirectEndpoint(),
+          e.toString());

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      AtomicReference<DispatcherStubs> dispatcherStubs,

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
+  }
+
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
+    Preconditions.checkState(
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(stubs.size()));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  boolean isReady() {

Review Comment:
   We use this to make sure that the endpoints are set in 
StreamingDataflowWorker before we fetch the pipeline config, as well as before 
we schedule the periodic config fetches: 
https://github.com/apache/beam/blob/f08058ca75cf41cc535bab2e0f5cd6ef38c801ee/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1615
   
   If we remove this, we would need another mechanism for that.
   
   We could use some kind of publish/subscribe mechanism to signal when the 
endpoint is ready (similar to a CountDownLatch), and then send another signal 
during an update (i.e., it was ready with endpoints but is no longer ready).
   
   How often does this value change? I would expect the service endpoint 
itself not to change during pipeline execution. @scwhittle 



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));
+  }
+
+  boolean isReady() {
+    return dispatcherStubs.get().isReady();
   }
 
   synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    ImmutableSet<HostAndPort> currentDispatcherEndpoints =
+        dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
 
     LOG.info("Creating a new windmill stub, endpoints: {}", 
dispatcherEndpoints);
-    if (!this.dispatcherEndpoints.isEmpty()) {
-      LOG.info("Previous windmill stub endpoints: {}", 
this.dispatcherEndpoints);
+    if (!currentDispatcherEndpoints.isEmpty()) {
+      LOG.info("Previous windmill stub endpoints: {}", 
currentDispatcherEndpoints);
     }
 
-    resetDispatcherEndpoints(dispatcherEndpoints);
+    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
   }
 
-  private synchronized void resetDispatcherEndpoints(
-      ImmutableSet<HostAndPort> newDispatcherEndpoints) {
-    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
newDispatcherEndpoints);
-    this.dispatcherStubs.clear();
-    this.dispatcherEndpoints.clear();
-    this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+  /**
+   * Endpoints and gRPC stubs used to communicate with the Windmill 
Dispatcher. {@link
+   * #dispatcherEndpoints()}, {@link #windmillServiceStubs()}, and {@link
+   * #windmillMetadataServiceStubs()} collections should all be of the same 
size.
+   */
+  @AutoValue
+  abstract static class DispatcherStubs {
+
+    private static DispatcherStubs empty() {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.of(), ImmutableList.of(), ImmutableList.of());
+    }
 
-    dispatcherEndpoints.stream()
-        .map(this::createDispatcherStubForWindmillService)
-        .forEach(dispatcherStubs::add);
-  }
+    private static DispatcherStubs create(
+        Set<HostAndPort> endpoints,
+        List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+        List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs) {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.copyOf(endpoints),
+          ImmutableList.copyOf(windmillServiceStubs),
+          ImmutableList.copyOf(windmillMetadataServiceStubs));
+    }
+
+    private static DispatcherStubs create(
+        ImmutableSet<HostAndPort> newDispatcherEndpoints, WindmillStubFactory 
windmillStubFactory) {
+      ImmutableList.Builder<CloudWindmillServiceV1Alpha1Stub> 
windmillServiceStubs =
+          ImmutableList.builder();
+      ImmutableList.Builder<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+          ImmutableList.builder();
+
+      for (HostAndPort endpoint : newDispatcherEndpoints) {
+        windmillServiceStubs.add(createWindmillServiceStub(endpoint, 
windmillStubFactory));
+        windmillMetadataServiceStubs.add(
+            createWindmillMetadataServiceStub(endpoint, windmillStubFactory));
+      }
+
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          newDispatcherEndpoints,
+          windmillServiceStubs.build(),
+          windmillMetadataServiceStubs.build());
+    }
+
+    private static CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub(
+        HostAndPort endpoint, WindmillStubFactory windmillStubFactory) {
+      if (LOCALHOST.equals(endpoint.getHost())) {
+        return 
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+      }
+
+      return 
windmillStubFactory.createWindmillServiceStub(WindmillServiceAddress.create(endpoint));
+    }
+
+    private static CloudWindmillMetadataServiceV1Alpha1Stub 
createWindmillMetadataServiceStub(
+        HostAndPort endpoint, WindmillStubFactory windmillStubFactory) {
+      if (LOCALHOST.equals(endpoint.getHost())) {
+        return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(
+            localhostChannel(endpoint.getPort()));
+      }
 
-  private CloudWindmillServiceV1Alpha1Stub 
createDispatcherStubForWindmillService(
-      HostAndPort endpoint) {
-    if (LOCALHOST.equals(endpoint.getHost())) {
-      return 
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+      return windmillStubFactory.createWindmillMetadataServiceStub(
+          WindmillServiceAddress.create(endpoint));
     }
 
-    // Use an in-process stub if testing.
-    return windmillStubFactory.getKind() == WindmillStubFactory.Kind.IN_PROCESS
-        ? windmillStubFactory.inProcess().get()
-        : 
windmillStubFactory.remote().apply(WindmillServiceAddress.create(endpoint));
+    private int size() {
+      return dispatcherEndpoints().size();
+    }
+
+    private boolean isReady() {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -101,36 +103,21 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   // newer ComputationHeartbeatRequests.
   private final boolean sendKeyedGetDataRequests;
   private Consumer<List<ComputationHeartbeatResponse>> 
processHeartbeatResponses;
+  private @Nullable GrpcWindmillStreamFactory windmillStreamFactory;

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -422,13 +422,20 @@ public void run() {
     commitThread.setName("CommitThread");
 
     this.publishCounters = publishCounters;
+    this.clientId = clientIdGenerator.nextLong();
     this.windmillServer = options.getWindmillServerStub();
     
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
+    windmillServer.start(

Review Comment:
   Changed: opted to allow StreamingDataflowWorker to inject a stream factory.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -422,13 +424,33 @@ public void run() {
     commitThread.setName("CommitThread");
 
     this.publishCounters = publishCounters;
+    this.clientId = clientIdGenerator.nextLong();
     this.windmillServer = options.getWindmillServerStub();
     
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
+    Duration maxBackoff =
+        !options.isEnableStreamingEngine() && 
options.getLocalWindmillHostport() != null
+            ? GrpcWindmillServer.LOCALHOST_BACKOFF
+            : GrpcWindmillServer.MAX_BACKOFF;
+    GrpcWindmillStreamFactory windmillStreamFactory =

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkHeartbeatProcessor.java:
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.streaming;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
+import 
org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatResponse;
+import org.apache.beam.sdk.annotations.Internal;
+
+@Internal
+public final class WorkHeartbeatProcessor implements 
Consumer<List<ComputationHeartbeatResponse>> {
+  private final Function<String, Optional<ComputationState>> 
computationStateFetcher;
+
+  public WorkHeartbeatProcessor(
+      Function<String, Optional<ComputationState>> computationStateFetcher) {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -84,14 +86,14 @@
 })
 @SuppressWarnings("nullness") // 
TODO(https://github.com/apache/beam/issues/20497
 public final class GrpcWindmillServer extends WindmillServerStub {
+  public static final Duration LOCALHOST_BACKOFF = Duration.millis(500);
+  public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);

Review Comment:
   moved



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
+  }
+
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
+    Preconditions.checkState(
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(stubs.size()));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  boolean isReady() {

Review Comment:
   I guess we can do it async.
   
   How often does this value change? @scwhittle



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java:
##########
@@ -109,10 +113,13 @@
 })
 public class GrpcWindmillServerTest {
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
+  @Rule public GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
+  @Rule public ErrorCollector errorCollector = new ErrorCollector();
+
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcWindmillServerTest.class);
   private static final int STREAM_CHUNK_SIZE = 2 << 20;
+  private final long clientId = new Random().nextLong();

Review Comment:
   done.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to 
be used for testing.
+ */
+final class InProcessWindmillStubFactory implements WindmillStubFactory {
+  private final String testName;
+  private final Function<String, ManagedChannel> channelFactory;
+
+  InProcessWindmillStubFactory(String testName, Function<String, 
ManagedChannel> channelFactory) {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactories.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.google.auth.Credentials;
+import java.util.function.Function;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+@Internal
+public final class WindmillStubFactories {

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));

Review Comment:
   changed to use stubs.size()



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to 
be used for testing.

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -504,14 +548,23 @@ public static void main(String[] args) throws Exception {
     worker.start();
   }
 
-  public static StreamingDataflowWorker 
fromOptions(StreamingDataflowWorkerOptions options)
-      throws IOException {
-
+  public static StreamingDataflowWorker 
fromOptions(StreamingDataflowWorkerOptions options) {
+    StreamingDataflowWorkerOptions streamingOptions =
+        options.as(StreamingDataflowWorkerOptions.class);

Review Comment:
   done



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java:
##########
@@ -142,55 +141,43 @@ synchronized ActivateWorkResult 
activateWorkForKey(ShardedKey shardedKey, Work w
     return ActivateWorkResult.QUEUED;
   }
 
-  @AutoValue
-  public abstract static class FailedTokens {
-    public static Builder newBuilder() {
-      return new AutoValue_ActiveWorkState_FailedTokens.Builder();
-    }
-
-    public abstract long workToken();
-
-    public abstract long cacheToken();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setWorkToken(long value);
-
-      public abstract Builder setCacheToken(long value);
-
-      public abstract FailedTokens build();
-    }
-  }
-
   /**
    * Fails any active work matching an element of the input Map.
    *
    * @param failedWork a map from sharding_key to tokens for the corresponding 
work.
    */
-  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+  synchronized void failWorkForKey(Map<Long, List<FailedWorkToken>> 
failedWork) {
     // Note we can't construct a ShardedKey and look it up in activeWork 
directly since
     // HeartbeatResponse doesn't include the user key.
     for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
-      List<FailedTokens> failedTokens = 
failedWork.get(entry.getKey().shardingKey());
-      if (failedTokens == null) continue;
-      for (FailedTokens failedToken : failedTokens) {
-        for (Work queuedWork : entry.getValue()) {
-          WorkItem workItem = queuedWork.getWorkItem();
-          if (workItem.getWorkToken() == failedToken.workToken()
-              && workItem.getCacheToken() == failedToken.cacheToken()) {
-            LOG.debug(
-                "Failing work "
-                    + computationStateCache.getComputation()
-                    + " "
-                    + entry.getKey().shardingKey()
-                    + " "
-                    + failedToken.workToken()
-                    + " "
-                    + failedToken.cacheToken()
-                    + ". The work will be retried and is not lost.");
-            queuedWork.setFailed();
-            break;
-          }
+      ShardedKey shardedKey = entry.getKey();
+      Optional.ofNullable(failedWork.get(shardedKey.shardingKey()))
+          .ifPresent(
+              failedWorkTokens ->

Review Comment:
   done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to