scwhittle commented on code in PR #34539:
URL: https://github.com/apache/beam/pull/34539#discussion_r2028435615


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java:
##########
@@ -48,17 +49,21 @@ public static Builder builder() {
 
   public abstract String backendWorkerToken();
 
-  public abstract Optional<WindmillServiceAddress> directEndpoint();
+  abstract Optional<WindmillServiceAddress> directEndpoint();
 
-  public abstract CloudWindmillServiceV1Alpha1Stub stub();
+  abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory();
+
+  public final CloudWindmillServiceV1Alpha1Stub newStub() {

Review Comment:
   If it's a supplier, I think currentStub() or something similar could be a 
better name, since "new" implies it is freshly allocated and not reused.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -621,18 +622,40 @@ private static void 
validateWorkerOptions(DataflowWorkerHarnessOptions options)
   }
 
   private static ChannelCachingStubFactory createFanOutStubFactory(
-      DataflowWorkerHarnessOptions workerOptions) {
-    return ChannelCachingRemoteStubFactory.create(
-        workerOptions.getGcpCredential(),
+      DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher 
configFetcher) {
+    ChannelCache channelCache =
         ChannelCache.create(
-            serviceAddress ->
-                // IsolationChannel will create and manage separate RPC 
channels to the same
-                // serviceAddress.
-                IsolationChannel.create(
-                    () ->
-                        remoteChannel(
-                            serviceAddress,
-                            
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
+            serviceAddress -> {
+              // Always fetch the current flow control settings when we go to 
create the channel.
+              UserWorkerGrpcFlowControlSettings currentFlowControlSettings =
+                  configFetcher
+                      .getGlobalConfigHandle()
+                      .getConfig()
+                      .userWorkerJobSettings()
+                      .getFlowControlSettings();
+              // IsolationChannel will create and manage separate RPC channels 
to the same
+              // serviceAddress.
+              return IsolationChannel.create(
+                  () ->
+                      remoteChannel(
+                          serviceAddress,
+                          
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(),
+                          currentFlowControlSettings),
+                  currentFlowControlSettings.getOnReadyThresholdBytes());
+            },
+            configFetcher

Review Comment:
   Ditto: it seems simpler if we remove this constructor param and have the 
registered config observer trigger and update the flow control settings.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -36,6 +37,8 @@
 public final class WindmillChannelFactory {
   public static final String LOCALHOST = "localhost";
   private static final int MAX_REMOTE_TRACE_EVENTS = 100;
+  // 1MiB.
+  private static final int MAX_INBOUND_METADATA_SIZE_BYTES = 1024 * 1024;
   // 10MiB.
   private static final int WINDMILL_MAX_FLOW_CONTROL_WINDOW =

Review Comment:
   Could add a comment that this is chosen to be greater than 2 * max message size.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -72,16 +78,22 @@ public static ManagedChannel remoteChannel(
 
   private static ManagedChannel remoteDirectChannel(
       AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress,
-      int windmillServiceRpcChannelTimeoutSec) {
-    return withDefaultChannelOptions(
+      int windmillServiceRpcChannelTimeoutSec,
+      UserWorkerGrpcFlowControlSettings flowControlSettings) {
+    NettyChannelBuilder channelBuilder =
+        withDefaultChannelOptions(
             NettyChannelBuilder.forAddress(
                     
authenticatedGcpServiceAddress.gcpServiceAddress().getHost(),
                     // Ports are required for direct channels.
                     
authenticatedGcpServiceAddress.gcpServiceAddress().getPort(),
                     new AltsChannelCredentials.Builder().build())
                 
.overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()),
-            windmillServiceRpcChannelTimeoutSec)
-        .build();
+            windmillServiceRpcChannelTimeoutSec);
+    int flowControlWindowSizeBytes =
+        Math.max(WINDMILL_MAX_FLOW_CONTROL_WINDOW, 
flowControlSettings.getFlowControlWindowBytes());

Review Comment:
   Seems like it should be WINDMILL_MIN_FLOW_CONTROL_WINDOW, since Math.max makes 
this constant a lower bound on the window size.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java:
##########
@@ -48,17 +49,21 @@ public static Builder builder() {
 
   public abstract String backendWorkerToken();
 
-  public abstract Optional<WindmillServiceAddress> directEndpoint();
+  abstract Optional<WindmillServiceAddress> directEndpoint();
 
-  public abstract CloudWindmillServiceV1Alpha1Stub stub();
+  abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory();
+
+  public final CloudWindmillServiceV1Alpha1Stub newStub() {
+    return stubFactory().get();
+  }
 
   @AutoValue.Builder
   public abstract static class Builder {
     abstract Builder setBackendWorkerToken(String backendWorkerToken);
 
-    public abstract Builder setDirectEndpoint(WindmillServiceAddress value);
+    abstract Builder setDirectEndpoint(WindmillServiceAddress value);
 
-    public abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub);
+    public abstract Builder 
setStubFactory(Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory);

Review Comment:
   Should we name this a Supplier? "Factory" implies it is making something new, 
whereas a supplier could just be returning a cached stub (which is what we want 
if it hasn't been modified).



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java:
##########
@@ -107,6 +122,23 @@ public ManagedChannel get(WindmillServiceAddress 
windmillServiceAddress) {
     return channelCache.getUnchecked(windmillServiceAddress);
   }
 
+  public synchronized void consumeFlowControlSettings(
+      UserWorkerGrpcFlowControlSettings flowControlSettings) {
+    if (!flowControlSettings.equals(currentFlowControlSettings)) {

Review Comment:
   Is there a way to check equivalence instead of equals()? If the field is 
explicitly 10MB versus the default of 10MB, we don't want to do anything either.



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java:
##########
@@ -122,8 +135,7 @@ private static NettyChannelBuilder 
withDefaultChannelOptions(
         .maxInboundMessageSize(Integer.MAX_VALUE)
         .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS)
         // 1MiB

Review Comment:
   rm comment



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java:
##########
@@ -621,18 +622,40 @@ private static void 
validateWorkerOptions(DataflowWorkerHarnessOptions options)
   }
 
   private static ChannelCachingStubFactory createFanOutStubFactory(
-      DataflowWorkerHarnessOptions workerOptions) {
-    return ChannelCachingRemoteStubFactory.create(
-        workerOptions.getGcpCredential(),
+      DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher 
configFetcher) {
+    ChannelCache channelCache =
         ChannelCache.create(
-            serviceAddress ->
-                // IsolationChannel will create and manage separate RPC 
channels to the same
-                // serviceAddress.
-                IsolationChannel.create(
-                    () ->
-                        remoteChannel(
-                            serviceAddress,
-                            
workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec()))));
+            serviceAddress -> {

Review Comment:
   could we pass in the cache's view of the settings instead?
   
   It seems like it could be simpler to just have the single watcher for 
settings instead of also fetching here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to