scwhittle commented on code in PR #30046:
URL: https://github.com/apache/beam/pull/30046#discussion_r1458695977


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -90,31 +137,20 @@ public static WindmillEndpoints.Builder builder() {
    */
   @AutoValue
   public abstract static class Endpoint {
-    /**
-     * {@link WindmillServiceAddress} representation of {@link
-     * Windmill.WorkerMetadataResponse.Endpoint#getDirectEndpoint()}. The proto's direct_endpoint
-     * string can be converted to either {@link Inet6Address} or {@link HostAndPort}.
-     */
-    public abstract Optional<WindmillServiceAddress> directEndpoint();
-
-    /**
-     * Corresponds to {@link Windmill.WorkerMetadataResponse.Endpoint#getWorkerToken()} in the
-     * windmill.proto file.
-     */
-    public abstract Optional<String> workerToken();
-
     public static Endpoint.Builder builder() {
       return new AutoValue_WindmillEndpoints_Endpoint.Builder();
     }
 
-    public static Endpoint from(Windmill.WorkerMetadataResponse.Endpoint endpointProto) {
+    public static Endpoint from(
+        Windmill.WorkerMetadataResponse.Endpoint endpointProto, String authenticatingService) {
       Endpoint.Builder endpointBuilder = Endpoint.builder();
       if (endpointProto.hasDirectEndpoint() && !endpointProto.getDirectEndpoint().isEmpty()) {
         parseDirectEndpoint(endpointProto.getDirectEndpoint())
             .ifPresent(endpointBuilder::setDirectEndpoint);
       }
-      if (endpointProto.hasWorkerToken() && !endpointProto.getWorkerToken().isEmpty()) {
-        endpointBuilder.setWorkerToken(endpointProto.getWorkerToken());
+      if (endpointProto.hasBackendWorkerToken()

Review Comment:
   ditto: remove the has check here too and rely on the default value being empty.



##########
runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto:
##########
@@ -853,14 +858,25 @@ message WorkerMetadataResponse {
   message Endpoint {
     // IPv6 address of a streaming engine windmill worker.
     optional string direct_endpoint = 1;
-    optional string worker_token = 2;
+    optional string backend_worker_token = 2;
+    optional int64 port = 3;
   }
+
   repeated Endpoint work_endpoints = 2;
 
   // Maps from GlobalData tag to the endpoint that should be used for GetData
   // calls to retrieve that global data.
   map<string, Endpoint> global_data_endpoints = 3;
 
+  // DNS name of the windmill worker service. This is used only in the case
+  // direct path communication is established using VIP. In order to enact
+  // Server Authorization checks, User Worker (client) set authority of the
+  // gRPC channel to this endpoint. Server Authorization checks ensure the
+  // servers clients connect to are being run by authorized borg roles. All

Review Comment:
   Remove the comment, or shorten it to just say that this should be set as the gRPC authority, without the reasoning, since that is mostly internal detail.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -81,6 +69,65 @@ public static WindmillEndpoints.Builder builder() {
     return new AutoValue_WindmillEndpoints.Builder();
   }
 
+  private static Optional<WindmillServiceAddress> parseDirectEndpoint(String directEndpoint) {
+    Optional<WindmillServiceAddress> directEndpointIpV6Address =
+        tryParseDirectEndpointIntoIpV6Address(directEndpoint).map(WindmillServiceAddress::create);
+
+    return directEndpointIpV6Address.isPresent()
+        ? directEndpointIpV6Address
+        : tryParseEndpointIntoHostAndPort(directEndpoint).map(WindmillServiceAddress::create);

Review Comment:
   nit, the WindmillServiceAddress::create is common, so maybe cleaner to do

   return tryParseDirectEndpointIntoIpV6Address(directEndpoint).orElseGet(tryParseEndpointIntoHostAndPort(directEndpoint)).map(WindmillServiceAddress::create);
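
One detail on the chain above: `Optional.orElseGet` takes a `Supplier` and returns the unwrapped value, so `.map(...)` cannot follow it. Also, if the IPv6 parser yields an `Inet6Address` while the other yields a `HostAndPort` (the method names suggest this, but it is an assumption), the two results share no type that a single `WindmillServiceAddress::create` call can take. A minimal sketch of one shape that still reads as a single expression: map each branch to the common type first, then fall back lazily with `Optional.or` (Java 9+). `Ipv6Literal`, `HostPort`, `ServiceAddress` and `EndpointParsing` are hypothetical stand-ins, and the two parsers are crude heuristics, not Guava's `HostAndPort` or `InetAddresses`.

```java
import java.util.Optional;

/** Hypothetical stand-in for a parsed IPv6 literal. */
final class Ipv6Literal {
  final String text;

  Ipv6Literal(String text) {
    this.text = text;
  }
}

/** Hypothetical stand-in for a parsed host:port pair. */
final class HostPort {
  final String host;
  final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }
}

/** Hypothetical address type with one overloaded factory per parse result. */
final class ServiceAddress {
  final String description;

  private ServiceAddress(String description) {
    this.description = description;
  }

  static ServiceAddress create(Ipv6Literal address) {
    return new ServiceAddress("ipv6:" + address.text);
  }

  static ServiceAddress create(HostPort address) {
    return new ServiceAddress("hostport:" + address.host + ":" + address.port);
  }
}

final class EndpointParsing {
  // Crude heuristic: at least two colons and nothing but hex digits and colons.
  static Optional<Ipv6Literal> tryParseIpv6(String endpoint) {
    long colons = endpoint.chars().filter(c -> c == ':').count();
    boolean hexOrColon =
        endpoint.chars().allMatch(c -> c == ':' || Character.digit(c, 16) >= 0);
    return colons >= 2 && hexOrColon
        ? Optional.of(new Ipv6Literal(endpoint))
        : Optional.empty();
  }

  // Crude heuristic: split "host:port" on the last colon, port in [0, 65535].
  static Optional<HostPort> tryParseHostAndPort(String endpoint) {
    int idx = endpoint.lastIndexOf(':');
    if (idx <= 0) {
      return Optional.empty();
    }
    try {
      int port = Integer.parseInt(endpoint.substring(idx + 1));
      return port >= 0 && port <= 65535
          ? Optional.of(new HostPort(endpoint.substring(0, idx), port))
          : Optional.empty();
    } catch (NumberFormatException e) {
      return Optional.empty();
    }
  }

  // Each branch is mapped to ServiceAddress before combining; the fallback
  // supplier only runs when the IPv6 branch is empty.
  static Optional<ServiceAddress> parse(String endpoint) {
    return tryParseIpv6(endpoint)
        .map(ServiceAddress::create)
        .or(() -> tryParseHostAndPort(endpoint).map(ServiceAddress::create));
  }
}
```

If the code has to stay Java 8 compatible, `Optional.or` is unavailable and the existing `isPresent()` ternary is a reasonable equivalent.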



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillServerStub.java:
##########
@@ -76,6 +76,10 @@ public abstract GetWorkStream getWorkStream(
   /** Returns the amount of time the server has been throttled and resets the time to 0. */
   public abstract long getAndResetThrottleTime();
 
+  public long clientId() {
+    return 0L;

Review Comment:
   The client ID is supposed to change on process restart. Beyond that, it is not a value that has to be coordinated in advance with the service, so it can just be a random value, as it was before. I think it can stay in StreamingDataflowWorker, because it also doesn't need to be different for each endpoint.

   A new ID on each process restart helps the backend speed up retries, ensure caching is correct, etc.
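
A minimal sketch of what this comment describes, assuming the ID lives on the worker object (the comment suggests StreamingDataflowWorker) and is handed to each per-endpoint client. `WorkerSketch` and `EndpointClient` are hypothetical names. The point is only that the value is drawn once per worker process, is not agreed with the service in advance, and is shared across endpoints.

```java
import java.util.Random;

/** Hypothetical worker that owns a single, process-lifetime client ID. */
final class WorkerSketch {
  // Drawn once when the worker is constructed. A restarted process builds a new
  // worker and therefore gets a new random ID; nothing is coordinated with the service.
  private final long clientId = new Random().nextLong();

  long clientId() {
    return clientId;
  }

  /** Hypothetical per-endpoint client that simply carries the worker-wide ID. */
  static final class EndpointClient {
    final String endpoint;
    final long clientId;

    EndpointClient(String endpoint, long clientId) {
      this.endpoint = endpoint;
      this.clientId = clientId;
    }
  }

  // Every endpoint reuses the same ID instead of each stub defining its own.
  EndpointClient clientFor(String endpoint) {
    return new EndpointClient(endpoint, clientId);
  }
}
```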



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      AtomicReference<DispatcherStubs> dispatcherStubs,

Review Comment:
   Just take in DispatcherStubs and wrap it in an AtomicReference here?
   Otherwise this unnecessarily exposes how the stubs are used.
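
A minimal sketch of the suggested constructor shape: callers pass a plain value and the class wraps it in an `AtomicReference` internally, so the concurrency mechanism stays private. `StubsSnapshot` and `DispatcherClientSketch` are hypothetical stand-ins for `DispatcherStubs` and `GrpcDispatcherClient`. The parameter is named `initialDispatcherStubs` because the held value can be replaced after construction.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical immutable snapshot standing in for DispatcherStubs. */
final class StubsSnapshot {
  final List<String> endpoints;

  StubsSnapshot(List<String> endpoints) {
    // Defensive copy so the snapshot cannot change after it is published.
    this.endpoints = Collections.unmodifiableList(new ArrayList<>(endpoints));
  }
}

/** Hypothetical client; only the constructor shape is the point here. */
final class DispatcherClientSketch {
  // The AtomicReference is an internal detail and never escapes this class.
  private final AtomicReference<StubsSnapshot> dispatcherStubs;

  DispatcherClientSketch(StubsSnapshot initialDispatcherStubs) {
    this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs);
  }

  StubsSnapshot currentStubs() {
    return dispatcherStubs.get();
  }

  // Publishes a new snapshot; readers see either the old or the new one, never a mix.
  void replaceStubs(StubsSnapshot newStubs) {
    dispatcherStubs.set(newStubs);
  }
}
```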



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillEndpoints.java:
##########
@@ -90,31 +137,20 @@ public static WindmillEndpoints.Builder builder() {
    */
   @AutoValue
   public abstract static class Endpoint {
-    /**
-     * {@link WindmillServiceAddress} representation of {@link
-     * Windmill.WorkerMetadataResponse.Endpoint#getDirectEndpoint()}. The proto's direct_endpoint
-     * string can be converted to either {@link Inet6Address} or {@link HostAndPort}.
-     */
-    public abstract Optional<WindmillServiceAddress> directEndpoint();
-
-    /**
-     * Corresponds to {@link Windmill.WorkerMetadataResponse.Endpoint#getWorkerToken()} in the
-     * windmill.proto file.
-     */
-    public abstract Optional<String> workerToken();
-
     public static Endpoint.Builder builder() {
       return new AutoValue_WindmillEndpoints_Endpoint.Builder();
     }
 
-    public static Endpoint from(Windmill.WorkerMetadataResponse.Endpoint endpointProto) {
+    public static Endpoint from(
+        Windmill.WorkerMetadataResponse.Endpoint endpointProto, String authenticatingService) {
       Endpoint.Builder endpointBuilder = Endpoint.builder();
       if (endpointProto.hasDirectEndpoint() && !endpointProto.getDirectEndpoint().isEmpty()) {

Review Comment:
   Remove the has check; you can rely on the default value being empty.
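
A small sketch of why the has check is redundant here: for a string field, generated protobuf Java getters return the field's default when it is unset, which is the empty string since the quoted proto declares no custom default. So `!getX().isEmpty()` already implies the field was set. `FakeEndpointProto` and `HasCheckSketch` are hypothetical hand-written stand-ins that mimic that getter behavior; they are not generated code.

```java
/** Hypothetical stand-in mimicking a generated message with an optional string field. */
final class FakeEndpointProto {
  private final String directEndpoint; // null models "field not set"

  FakeEndpointProto(String directEndpoint) {
    this.directEndpoint = directEndpoint;
  }

  boolean hasDirectEndpoint() {
    return directEndpoint != null;
  }

  // Like generated getters, returns the default "" (never null) when unset.
  String getDirectEndpoint() {
    return directEndpoint == null ? "" : directEndpoint;
  }
}

final class HasCheckSketch {
  static boolean withHasCheck(FakeEndpointProto proto) {
    return proto.hasDirectEndpoint() && !proto.getDirectEndpoint().isEmpty();
  }

  // Equivalent: an unset field reads as "", which isEmpty() already rejects.
  static boolean withoutHasCheck(FakeEndpointProto proto) {
    return !proto.getDirectEndpoint().isEmpty();
  }
}
```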



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")
-  private final List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs;
-
-  @GuardedBy("this")
-  private final Set<HostAndPort> dispatcherEndpoints;
+  private final AtomicReference<DispatcherStubs> dispatcherStubs;
 
   @GuardedBy("this")
   private final Random rand;
 
   private GrpcDispatcherClient(
       WindmillStubFactory windmillStubFactory,
-      List<CloudWindmillServiceV1Alpha1Stub> dispatcherStubs,
-      Set<HostAndPort> dispatcherEndpoints,
+      AtomicReference<DispatcherStubs> dispatcherStubs,

Review Comment:
   Name this initialDispatcherStubs if it can change now?



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java:
##########
@@ -45,92 +48,156 @@ class GrpcDispatcherClient {
   private final WindmillStubFactory windmillStubFactory;
 
   @GuardedBy("this")

Review Comment:
   Seems like this wouldn't need @GuardedBy if it is atomic.
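
A minimal sketch of the point: a `final` field holding an `AtomicReference` can be read and replaced from any thread without holding `this`, and read-modify-write updates can go through `updateAndGet`, which retries a compare-and-set internally. `AtomicHolder` is a hypothetical name. One caveat: if some other field must change together with the stubs, a lock (and `@GuardedBy`) would still be needed to keep that invariant.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

/** Hypothetical holder showing lock-free access to a swappable value. */
final class AtomicHolder<T> {
  // No @GuardedBy needed: the field itself is final, and AtomicReference
  // provides visibility and atomic replacement without a monitor.
  private final AtomicReference<T> value;

  AtomicHolder(T initial) {
    this.value = new AtomicReference<>(initial);
  }

  T get() {
    return value.get();
  }

  // Atomic read-modify-write via an internal CAS retry loop; the function may
  // run more than once under contention, so it should be side-effect free.
  T update(UnaryOperator<T> fn) {
    return value.updateAndGet(fn);
  }
}
```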



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
