m-trieu commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1491499772
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcDispatcherClient.class);
private final WindmillStubFactory windmillStubFactory;
- @GuardedBy("this")
- private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
- @GuardedBy("this")
- private final Set<HostAndPort> dispatcherEndpoints;
+ /**
+ * Current dispatcher endpoints and stubs used to communicate with Windmill Dispatcher.
+ *
+ * @implNote Reads are lock free, writes are synchronized.
+ */
+ private final AtomicReference<DispatcherStubs> dispatcherStubs;
@GuardedBy("this")
private final Random rand;
private GrpcDispatcherClient(
WindmillStubFactory windmillStubFactory,
- List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
- Set<HostAndPort> dispatcherEndpoints,
+ DispatcherStubs initialDispatcherStubs,
Random rand) {
this.windmillStubFactory = windmillStubFactory;
- this.dispatcherStubs = dispatcherStubs;
- this.dispatcherEndpoints = dispatcherEndpoints;
this.rand = rand;
+ this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
}
static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
- return new GrpcDispatcherClient(
- windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+ return new GrpcDispatcherClient(windmillStubFactory,
DispatcherStubs.empty(), new Random());
}
@VisibleForTesting
static GrpcDispatcherClient forTesting(
WindmillStubFactory windmillGrpcStubFactory,
- List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+ List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+ List<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs,
Set<HostAndPort> dispatcherEndpoints) {
- Preconditions.checkArgument(dispatcherEndpoints.size() ==
dispatcherStubs.size());
+ Preconditions.checkArgument(
+ dispatcherEndpoints.size() == windmillServiceStubs.size()
+ && windmillServiceStubs.size() ==
windmillMetadataServiceStubs.size());
return new GrpcDispatcherClient(
- windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new
Random());
+ windmillGrpcStubFactory,
+ DispatcherStubs.create(
+ dispatcherEndpoints, windmillServiceStubs,
windmillMetadataServiceStubs),
+ new Random());
+ }
+
+ CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+ ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+ dispatcherStubs.get().windmillServiceStubs();
+ Preconditions.checkState(
+ !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been
set");
+
+ return (windmillServiceStubs.size() == 1
+ ? windmillServiceStubs.get(0)
+ : randomlySelectNextStub(windmillServiceStubs));
}
- synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+ CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+ ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub>
windmillMetadataServiceStubs =
+ dispatcherStubs.get().windmillMetadataServiceStubs();
Preconditions.checkState(
- !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been
set");
+ !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has
not been set");
+
+ return (windmillMetadataServiceStubs.size() == 1
+ ? windmillMetadataServiceStubs.get(0)
+ : randomlySelectNextStub(windmillMetadataServiceStubs));
+ }
- return (dispatcherStubs.size() == 1
- ? dispatcherStubs.get(0)
- : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+ private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+ return stubs.get(rand.nextInt(stubs.size()));
}
- synchronized boolean isReady() {
- return !dispatcherStubs.isEmpty();
+ boolean isReady() {
Review Comment:
ack done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]