m-trieu commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1491499772


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
+  }
+
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
+    Preconditions.checkState(
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(stubs.size()));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  boolean isReady() {

Review Comment:
   ack done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to