scwhittle commented on code in PR #34539: URL: https://github.com/apache/beam/pull/34539#discussion_r2028435615
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java: ########## @@ -48,17 +49,21 @@ public static Builder builder() { public abstract String backendWorkerToken(); - public abstract Optional<WindmillServiceAddress> directEndpoint(); + abstract Optional<WindmillServiceAddress> directEndpoint(); - public abstract CloudWindmillServiceV1Alpha1Stub stub(); + abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory(); + + public final CloudWindmillServiceV1Alpha1Stub newStub() { Review Comment: if it's a supplier, I think currentStub() or something could be better name since new implies it is freshly allocated and not reused ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -621,18 +622,40 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) } private static ChannelCachingStubFactory createFanOutStubFactory( - DataflowWorkerHarnessOptions workerOptions) { - return ChannelCachingRemoteStubFactory.create( - workerOptions.getGcpCredential(), + DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) { + ChannelCache channelCache = ChannelCache.create( - serviceAddress -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress. - IsolationChannel.create( - () -> - remoteChannel( - serviceAddress, - workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); + serviceAddress -> { + // Always fetch the current flow control settings when we go to create the channel. + UserWorkerGrpcFlowControlSettings currentFlowControlSettings = + configFetcher + .getGlobalConfigHandle() + .getConfig() + .userWorkerJobSettings() + .getFlowControlSettings(); + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress. 
+ return IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + currentFlowControlSettings), + currentFlowControlSettings.getOnReadyThresholdBytes()); + }, + configFetcher Review Comment: ditto seems simpler if we remove this constructor param, and have the registered config observer trigger and update the flow control settings. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java: ########## @@ -36,6 +37,8 @@ public final class WindmillChannelFactory { public static final String LOCALHOST = "localhost"; private static final int MAX_REMOTE_TRACE_EVENTS = 100; + // 1MiB. + private static final int MAX_INBOUND_METADATA_SIZE_BYTES = 1024 * 1024; // 10MiB. private static final int WINDMILL_MAX_FLOW_CONTROL_WINDOW = Review Comment: Could add comment that this is chosen to be greater than 2 * max message size ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java: ########## @@ -72,16 +78,22 @@ public static ManagedChannel remoteChannel( private static ManagedChannel remoteDirectChannel( AuthenticatedGcpServiceAddress authenticatedGcpServiceAddress, - int windmillServiceRpcChannelTimeoutSec) { - return withDefaultChannelOptions( + int windmillServiceRpcChannelTimeoutSec, + UserWorkerGrpcFlowControlSettings flowControlSettings) { + NettyChannelBuilder channelBuilder = + withDefaultChannelOptions( NettyChannelBuilder.forAddress( authenticatedGcpServiceAddress.gcpServiceAddress().getHost(), // Ports are required for direct channels. 
authenticatedGcpServiceAddress.gcpServiceAddress().getPort(), new AltsChannelCredentials.Builder().build()) .overrideAuthority(authenticatedGcpServiceAddress.authenticatingService()), - windmillServiceRpcChannelTimeoutSec) - .build(); + windmillServiceRpcChannelTimeoutSec); + int flowControlWindowSizeBytes = + Math.max(WINDMILL_MAX_FLOW_CONTROL_WINDOW, flowControlSettings.getFlowControlWindowBytes()); Review Comment: seems like it should be WINDMILL_MIN_FLOW_CONTROL_WINDOW ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/WindmillConnection.java: ########## @@ -48,17 +49,21 @@ public static Builder builder() { public abstract String backendWorkerToken(); - public abstract Optional<WindmillServiceAddress> directEndpoint(); + abstract Optional<WindmillServiceAddress> directEndpoint(); - public abstract CloudWindmillServiceV1Alpha1Stub stub(); + abstract Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory(); + + public final CloudWindmillServiceV1Alpha1Stub newStub() { + return stubFactory().get(); + } @AutoValue.Builder public abstract static class Builder { abstract Builder setBackendWorkerToken(String backendWorkerToken); - public abstract Builder setDirectEndpoint(WindmillServiceAddress value); + abstract Builder setDirectEndpoint(WindmillServiceAddress value); - public abstract Builder setStub(CloudWindmillServiceV1Alpha1Stub stub); + public abstract Builder setStubFactory(Supplier<CloudWindmillServiceV1Alpha1Stub> stubFactory); Review Comment: should we name Supplier? 
factory implies it is making something new, where supplier could just be returning a cached stub (which is what we want if it hasn't been modified) ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ChannelCache.java: ########## @@ -107,6 +122,23 @@ public ManagedChannel get(WindmillServiceAddress windmillServiceAddress) { return channelCache.getUnchecked(windmillServiceAddress); } + public synchronized void consumeFlowControlSettings( + UserWorkerGrpcFlowControlSettings flowControlSettings) { + if (!flowControlSettings.equals(currentFlowControlSettings)) { Review Comment: is there a way to check equivalence instead of equals()? if field is explicitly 10MB versus default of 10MB we don't want to do anything either. ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillChannelFactory.java: ########## @@ -122,8 +135,7 @@ private static NettyChannelBuilder withDefaultChannelOptions( .maxInboundMessageSize(Integer.MAX_VALUE) .maxTraceEvents(MAX_REMOTE_TRACE_EVENTS) // 1MiB Review Comment: rm comment ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java: ########## @@ -621,18 +622,40 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) } private static ChannelCachingStubFactory createFanOutStubFactory( - DataflowWorkerHarnessOptions workerOptions) { - return ChannelCachingRemoteStubFactory.create( - workerOptions.getGcpCredential(), + DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) { + ChannelCache channelCache = ChannelCache.create( - serviceAddress -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress. 
- IsolationChannel.create( - () -> - remoteChannel( - serviceAddress, - workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec())))); + serviceAddress -> { Review Comment: could we pass in the cache's view of the settings instead? It seems like it could be simpler to just have the single watcher for settings instead of also fetching here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org