scwhittle commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1477937118


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -101,36 +103,21 @@ public final class GrpcWindmillServer extends 
WindmillServerStub {
   // newer ComputationHeartbeatRequests.
   private final boolean sendKeyedGetDataRequests;
   private Consumer<List<ComputationHeartbeatResponse>> 
processHeartbeatResponses;
+  private @Nullable GrpcWindmillStreamFactory windmillStreamFactory;

Review Comment:
   If we can change the constructor to require the stream factory, this @Nullable and the preconditions below can be removed; see my other comment.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -44,93 +47,173 @@ class GrpcDispatcherClient {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDispatcherClient.class);
   private final WindmillStubFactory windmillStubFactory;
 
-  @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  /**
+   * Current dispatcher endpoints and stubs used to communicate with Windmill 
Dispatcher.
+   *
+   * @implNote Reads are lock free, writes are synchronized.
+   */
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      DispatcherStubs initialDispatcherStubs,
       Random rand) {
     this.windmillStubFactory = windmillStubFactory;
-    this.dispatcherStubs = dispatcherStubs;
-    this.dispatcherEndpoints = dispatcherEndpoints;
     this.rand = rand;
+    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
   }
 
   static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) {
-    return new GrpcDispatcherClient(
-        windmillStubFactory, new ArrayList<>(), new HashSet<>(), new Random());
+    return new GrpcDispatcherClient(windmillStubFactory, 
DispatcherStubs.empty(), new Random());
   }
 
   @VisibleForTesting
   static GrpcDispatcherClient forTesting(
       WindmillStubFactory windmillGrpcStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
+      List<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs,
+      List<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs,
       Set<HostAndPort> dispatcherEndpoints) {
-    Preconditions.checkArgument(dispatcherEndpoints.size() == 
dispatcherStubs.size());
+    Preconditions.checkArgument(
+        dispatcherEndpoints.size() == windmillServiceStubs.size()
+            && windmillServiceStubs.size() == 
windmillMetadataServiceStubs.size());
     return new GrpcDispatcherClient(
-        windmillGrpcStubFactory, dispatcherStubs, dispatcherEndpoints, new 
Random());
+        windmillGrpcStubFactory,
+        DispatcherStubs.create(
+            dispatcherEndpoints, windmillServiceStubs, 
windmillMetadataServiceStubs),
+        new Random());
+  }
+
+  CloudWindmillServiceV1Alpha1Stub getWindmillServiceStub() {
+    ImmutableList<CloudWindmillServiceV1Alpha1Stub> windmillServiceStubs =
+        dispatcherStubs.get().windmillServiceStubs();
+    Preconditions.checkState(
+        !windmillServiceStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+
+    return (windmillServiceStubs.size() == 1
+        ? windmillServiceStubs.get(0)
+        : randomlySelectNextStub(windmillServiceStubs));
   }
 
-  synchronized CloudWindmillServiceV1Alpha1Stub getDispatcherStub() {
+  CloudWindmillMetadataServiceV1Alpha1Stub getWindmillMetadataServiceStub() {
+    ImmutableList<CloudWindmillMetadataServiceV1Alpha1Stub> 
windmillMetadataServiceStubs =
+        dispatcherStubs.get().windmillMetadataServiceStubs();
     Preconditions.checkState(
-        !dispatcherStubs.isEmpty(), "windmillServiceEndpoint has not been 
set");
+        !windmillMetadataServiceStubs.isEmpty(), "windmillServiceEndpoint has 
not been set");
+
+    return (windmillMetadataServiceStubs.size() == 1
+        ? windmillMetadataServiceStubs.get(0)
+        : randomlySelectNextStub(windmillMetadataServiceStubs));
+  }
 
-    return (dispatcherStubs.size() == 1
-        ? dispatcherStubs.get(0)
-        : dispatcherStubs.get(rand.nextInt(dispatcherStubs.size())));
+  private synchronized <T> T randomlySelectNextStub(List<T> stubs) {
+    return stubs.get(rand.nextInt(stubs.size()));
   }
 
-  synchronized boolean isReady() {
-    return !dispatcherStubs.isEmpty();
+  boolean isReady() {

Review Comment:
   This seems like it could lead to racy usage such as:
   if (client.isReady()) {
     client.xxx
   }
   Since dispatcherStubs can change in between, there may no longer be a stub by the time the next call is made.
   
   Can this be removed?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to 
be used for testing.
+ */
+final class InProcessWindmillStubFactory implements WindmillStubFactory {
+  private final String testName;
+  private final Function<String, ManagedChannel> channelFactory;
+
+  InProcessWindmillStubFactory(String testName, Function<String, 
ManagedChannel> channelFactory) {

Review Comment:
   nit: could just take a Supplier<ManagedChannel>.
   If tests have a Function<String, ManagedChannel>, they can always bind the test name themselves.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactories.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import com.google.auth.Credentials;
+import java.util.function.Function;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+
+@Internal
+public final class WindmillStubFactories {

Review Comment:
   Should we get rid of this one? If the fake is moved to the test folder, this class just has a single method. I think callers could use the RemoteWindmillStubFactory constructor directly.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -422,13 +422,20 @@ public void run() {
     commitThread.setName("CommitThread");
 
     this.publishCounters = publishCounters;
+    this.clientId = clientIdGenerator.nextLong();
     this.windmillServer = options.getWindmillServerStub();
     
this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
+    windmillServer.start(

Review Comment:
   Could we avoid the start method (which is more complex than a factory method that takes the necessary requirements and initializes) by moving the gRPC server initialization into this file?
   
   It's kind of odd that it is within the getWindmillServerStub default factory. It appears that is done so that the test can inject a fake server. One idea would be to remove the default factory from options, and create a server here if the option is null but use the injected server otherwise.
   
   Then you could just create the GrpcWindmillServer directly, with either a factory or a constructor taking the JobHeader, without modifying the WindmillServerStub interface.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/InProcessWindmillStubFactory.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs;
+
+import java.util.function.Function;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc;
+import 
org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
+import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ManagedChannel;
+
+/**
+ * Creates in process stubs to talk to Streaming Engine. Only recommended to 
be used for testing.

Review Comment:
   Should this be moved to the test folder and renamed FakeWindmillStubFactory? It ignores the serviceAddress.
   



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java:
##########
@@ -84,14 +86,14 @@
 })
 @SuppressWarnings("nullness") // 
TODO(https://github.com/apache/beam/issues/20497
 public final class GrpcWindmillServer extends WindmillServerStub {
+  public static final Duration LOCALHOST_BACKOFF = Duration.millis(500);
+  public static final Duration MAX_BACKOFF = Duration.standardSeconds(30);

Review Comment:
   nit: it seems better to keep the max/min backoff constants together.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +70,75 @@ public static WindmillEndpoints.Builder builder() {
     return new AutoValue_WindmillEndpoints.Builder();
   }
 
+  private static Optional<WindmillServiceAddress> parseDirectEndpoint(
+      Windmill.WorkerMetadataResponse.Endpoint endpointProto, String 
authenticatingService) {
+    Optional<WindmillServiceAddress> directEndpointIpV6Address =
+        tryParseDirectEndpointIntoIpV6Address(endpointProto)
+            .map(address -> 
AuthenticatedGcpServiceAddress.create(authenticatingService, address))
+            .map(WindmillServiceAddress::create);
+
+    return directEndpointIpV6Address.isPresent()
+        ? directEndpointIpV6Address
+        : tryParseEndpointIntoHostAndPort(endpointProto.getDirectEndpoint())
+            .map(WindmillServiceAddress::create);
+  }
+
+  private static Optional<HostAndPort> tryParseEndpointIntoHostAndPort(String 
directEndpoint) {
+    try {
+      return Optional.of(HostAndPort.fromString(directEndpoint));
+    } catch (IllegalArgumentException e) {
+      LOG.warn("{} cannot be parsed into a gcpServiceAddress", directEndpoint);
+      return Optional.empty();
+    }
+  }
+
+  private static Optional<HostAndPort> tryParseDirectEndpointIntoIpV6Address(
+      Windmill.WorkerMetadataResponse.Endpoint endpointProto) {
+    if (!endpointProto.hasDirectEndpoint()) {
+      return Optional.empty();
+    }
+
+    InetAddress directEndpointAddress = null;
+    try {
+      directEndpointAddress = 
Inet6Address.getByName(endpointProto.getDirectEndpoint());
+    } catch (UnknownHostException e) {
+      LOG.warn(
+          "Error occurred trying to parse direct_endpoint={} into IPv6 
address. Exception={}",
+          endpointProto.getDirectEndpoint(),
+          e.toString());

Review Comment:
   Return here; otherwise directEndpointAddress may be null below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to