Repository: beam
Updated Branches:
  refs/heads/master 4d1db2265 -> eb0850ef8
Increase the gRPC message size to max value Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b424aa04 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b424aa04 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b424aa04 Branch: refs/heads/master Commit: b424aa0409b507fe1c0c56a5f652d9be6458de66 Parents: 4d1db22 Author: Vikas Kedigehalli <vika...@google.com> Authored: Tue Jul 18 10:06:46 2017 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Wed Jul 19 13:17:37 2017 -0700 ---------------------------------------------------------------------- .../beam/fn/harness/channel/ManagedChannelFactory.java | 6 ++++++ sdks/python/apache_beam/runners/worker/data_plane.py | 8 +++++++- 2 files changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b424aa04/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java index d26f4a5..3138bab 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java @@ -61,6 +61,9 @@ public abstract class ManagedChannelFactory { ? EpollDomainSocketChannel.class : EpollSocketChannel.class) .eventLoopGroup(new EpollEventLoopGroup()) .usePlaintext(true) + // Set the message size to max value here. The actual size is governed by the + // buffer size in the layers above. 
+ .maxInboundMessageSize(Integer.MAX_VALUE) .build(); } } @@ -74,6 +77,9 @@ public abstract class ManagedChannelFactory { public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) { return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl()) .usePlaintext(true) + // Set the message size to max value here. The actual size is governed by the + // buffer size in the layers above. + .maxInboundMessageSize(Integer.MAX_VALUE) .build(); } } http://git-wip-us.apache.org/repos/asf/beam/blob/b424aa04/sdks/python/apache_beam/runners/worker/data_plane.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py index 26f65ee..e713041 100644 --- a/sdks/python/apache_beam/runners/worker/data_plane.py +++ b/sdks/python/apache_beam/runners/worker/data_plane.py @@ -269,7 +269,13 @@ class GrpcClientDataChannelFactory(DataChannelFactory): url = remote_grpc_port.api_service_descriptor.url if url not in self._data_channel_cache: logging.info('Creating channel for %s', url) - grpc_channel = grpc.insecure_channel(url) + grpc_channel = grpc.insecure_channel( + url, + # Options to have no limits (-1) on the size of the messages + # received or sent over the data plane. The actual buffer size is + # controlled in a layer above. + options=[("grpc.max_receive_message_length", -1), + ("grpc.max_send_message_length", -1)]) self._data_channel_cache[url] = GrpcClientDataChannel( beam_fn_api_pb2.BeamFnDataStub(grpc_channel)) return self._data_channel_cache[url]