scwhittle commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1469550140


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));
+  }
+
+  boolean isReady() {
+    return dispatcherStubs.get().isReady();
   }
 
   synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    ImmutableSet<HostAndPort> currentDispatcherEndpoints =
+        dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
 
     LOG.info("Creating a new windmill stub, endpoints: {}", 
dispatcherEndpoints);
-    if (!this.dispatcherEndpoints.isEmpty()) {
-      LOG.info("Previous windmill stub endpoints: {}", 
this.dispatcherEndpoints);
+    if (!currentDispatcherEndpoints.isEmpty()) {
+      LOG.info("Previous windmill stub endpoints: {}", 
currentDispatcherEndpoints);
     }
 
-    resetDispatcherEndpoints(dispatcherEndpoints);
+    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
   }
 
-  private synchronized void resetDispatcherEndpoints(
-      ImmutableSet<HostAndPort> newDispatcherEndpoints) {
-    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
newDispatcherEndpoints);
-    this.dispatcherStubs.clear();
-    this.dispatcherEndpoints.clear();
-    this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+  /**
+   * Endpoints and gRPC stubs used to communicate with the Windmill 
Dispatcher. {@link
+   * #dispatcherEndpoints()}, {@link #windmillServiceStubs()}, and {@link
+   * #windmillMetadataServiceStubs()} collections should all be of the same 
size.
+   */
+  @AutoValue
+  abstract static class DispatcherStubs {
+
+    private static DispatcherStubs empty() {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.of(), ImmutableList.of(), ImmutableList.of());
+    }
 
-    dispatcherEndpoints.stream()
-        .map(this::createDispatcherStubForWindmillService)
-        .forEach(dispatcherStubs::add);
-  }
+    private static DispatcherStubs create(
+        Set<HostAndPort> endpoints,
+        List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+        List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs) {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(

Review Comment:
   Add a precondition check that the endpoints and both stub lists are all the same size?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));

Review Comment:
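   This mixes `stubs` and `dispatcherStubs`: the random index is bounded by `dispatcherStubs.get().size()` but is used to index into `stubs`.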



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,172 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
+    Preconditions.checkState(
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
+
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(dispatcherStubs.get().size()));
+  }
+
+  boolean isReady() {
+    return dispatcherStubs.get().isReady();
   }
 
   synchronized void consumeWindmillDispatcherEndpoints(
       ImmutableSet<HostAndPort> dispatcherEndpoints) {
+    ImmutableSet<HostAndPort> currentDispatcherEndpoints =
+        dispatcherStubs.get().dispatcherEndpoints();
     Preconditions.checkArgument(
         dispatcherEndpoints != null && !dispatcherEndpoints.isEmpty(),
         "Cannot set dispatcher endpoints to nothing.");
-    if (this.dispatcherEndpoints.equals(dispatcherEndpoints)) {
+    if (currentDispatcherEndpoints.equals(dispatcherEndpoints)) {
       // The endpoints are equal don't recreate the stubs.
       return;
     }
 
     LOG.info("Creating a new windmill stub, endpoints: {}", 
dispatcherEndpoints);
-    if (!this.dispatcherEndpoints.isEmpty()) {
-      LOG.info("Previous windmill stub endpoints: {}", 
this.dispatcherEndpoints);
+    if (!currentDispatcherEndpoints.isEmpty()) {
+      LOG.info("Previous windmill stub endpoints: {}", 
currentDispatcherEndpoints);
     }
 
-    resetDispatcherEndpoints(dispatcherEndpoints);
+    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
dispatcherEndpoints);
+    dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, 
windmillStubFactory));
   }
 
-  private synchronized void resetDispatcherEndpoints(
-      ImmutableSet<HostAndPort> newDispatcherEndpoints) {
-    LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", 
newDispatcherEndpoints);
-    this.dispatcherStubs.clear();
-    this.dispatcherEndpoints.clear();
-    this.dispatcherEndpoints.addAll(newDispatcherEndpoints);
+  /**
+   * Endpoints and gRPC stubs used to communicate with the Windmill 
Dispatcher. {@link
+   * #dispatcherEndpoints()}, {@link #windmillServiceStubs()}, and {@link
+   * #windmillMetadataServiceStubs()} collections should all be of the same 
size.
+   */
+  @AutoValue
+  abstract static class DispatcherStubs {
+
+    private static DispatcherStubs empty() {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.of(), ImmutableList.of(), ImmutableList.of());
+    }
 
-    dispatcherEndpoints.stream()
-        .map(this::createDispatcherStubForWindmillService)
-        .forEach(dispatcherStubs::add);
-  }
+    private static DispatcherStubs create(
+        Set<HostAndPort> endpoints,
+        List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+        List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs) {
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          ImmutableSet.copyOf(endpoints),
+          ImmutableList.copyOf(windmillServiceStubs),
+          ImmutableList.copyOf(windmillMetadataServiceStubs));
+    }
+
+    private static DispatcherStubs create(
+        ImmutableSet<HostAndPort> newDispatcherEndpoints, WindmillStubFactory 
windmillStubFactory) {
+      ImmutableList.Builder<CloudWindmillServiceV1Alpha1Stub> 
windmillServiceStubs =
+          ImmutableList.builder();
+      ImmutableList.Builder<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+          ImmutableList.builder();
+
+      for (HostAndPort endpoint : newDispatcherEndpoints) {
+        windmillServiceStubs.add(createWindmillServiceStub(endpoint, 
windmillStubFactory));
+        windmillMetadataServiceStubs.add(
+            createWindmillMetadataServiceStub(endpoint, windmillStubFactory));
+      }
+
+      return new AutoValue_GrpcDispatcherClient_DispatcherStubs(
+          newDispatcherEndpoints,
+          windmillServiceStubs.build(),
+          windmillMetadataServiceStubs.build());
+    }
+
+    private static CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub(
+        HostAndPort endpoint, WindmillStubFactory windmillStubFactory) {
+      if (LOCALHOST.equals(endpoint.getHost())) {
+        return 
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+      }
+
+      return 
windmillStubFactory.createWindmillServiceStub(WindmillServiceAddress.create(endpoint));
+    }
+
+    private static CloudWindmillMetadataServiceV1Alpha1Stub 
createWindmillMetadataServiceStub(
+        HostAndPort endpoint, WindmillStubFactory windmillStubFactory) {
+      if (LOCALHOST.equals(endpoint.getHost())) {
+        return CloudWindmillMetadataServiceV1Alpha1Grpc.newStub(
+            localhostChannel(endpoint.getPort()));
+      }
 
-  private CloudWindmillServiceV1Alpha1Stub 
createDispatcherStubForWindmillService(
-      HostAndPort endpoint) {
-    if (LOCALHOST.equals(endpoint.getHost())) {
-      return 
CloudWindmillServiceV1Alpha1Grpc.newStub(localhostChannel(endpoint.getPort()));
+      return windmillStubFactory.createWindmillMetadataServiceStub(
+          WindmillServiceAddress.create(endpoint));
     }
 
-    // Use an in-process stub if testing.
-    return windmillStubFactory.getKind() == WindmillStubFactory.Kind.IN_PROCESS
-        ? windmillStubFactory.inProcess().get()
-        : 
windmillStubFactory.remote().apply(WindmillServiceAddress.create(endpoint));
+    private int size() {
+      return dispatcherEndpoints().size();
+    }
+
+    private boolean isReady() {

Review Comment:
   Implement as `size() > 0`, or remove it?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -422,13 +424,33 @@ public void run() {
     commitThread.setName("CommitThread");
 
     this.publishCounters = publishCounters;
+    this.clientId = clientIdGenerator.nextLong();
     this.windmillServer = options.getWindmillServerStub();
     
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
+    Duration maxBackoff =
+        !options.isEnableStreamingEngine() && 
options.getLocalWindmillHostport() != null
+            ? GrpcWindmillServer.LOCALHOST_BACKOFF
+            : GrpcWindmillServer.MAX_BACKOFF;
+    GrpcWindmillStreamFactory windmillStreamFactory =

Review Comment:
   Seems odd to create this here if the server isn't a gRPC server.
   
   Can we just pass in something generic like JobHeader to windmillServer.start?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to