Repository: beam
Updated Branches:
  refs/heads/master 4d1db2265 -> eb0850ef8


Increase the gRPC message size to max value


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b424aa04
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b424aa04
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b424aa04

Branch: refs/heads/master
Commit: b424aa0409b507fe1c0c56a5f652d9be6458de66
Parents: 4d1db22
Author: Vikas Kedigehalli <vika...@google.com>
Authored: Tue Jul 18 10:06:46 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 19 13:17:37 2017 -0700

----------------------------------------------------------------------
 .../beam/fn/harness/channel/ManagedChannelFactory.java       | 6 ++++++
 sdks/python/apache_beam/runners/worker/data_plane.py         | 8 +++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b424aa04/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
index d26f4a5..3138bab 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/channel/ManagedChannelFactory.java
@@ -61,6 +61,9 @@ public abstract class ManagedChannelFactory {
               ? EpollDomainSocketChannel.class : EpollSocketChannel.class)
           .eventLoopGroup(new EpollEventLoopGroup())
           .usePlaintext(true)
+          // Set the message size to max value here. The actual size is governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
           .build();
     }
   }
@@ -74,6 +77,9 @@ public abstract class ManagedChannelFactory {
     public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
       return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
           .usePlaintext(true)
+          // Set the message size to max value here. The actual size is governed by the
+          // buffer size in the layers above.
+          .maxInboundMessageSize(Integer.MAX_VALUE)
           .build();
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/b424aa04/sdks/python/apache_beam/runners/worker/data_plane.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index 26f65ee..e713041 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -269,7 +269,13 @@ class GrpcClientDataChannelFactory(DataChannelFactory):
     url = remote_grpc_port.api_service_descriptor.url
     if url not in self._data_channel_cache:
       logging.info('Creating channel for %s', url)
-      grpc_channel = grpc.insecure_channel(url)
+      grpc_channel = grpc.insecure_channel(
+          url,
+          # Options to have no limits (-1) on the size of the messages
+          # received or sent over the data plane. The actual buffer size is
+          # controlled in a layer above.
+          options=[("grpc.max_receive_message_length", -1),
+                   ("grpc.max_send_message_length", -1)])
       self._data_channel_cache[url] = GrpcClientDataChannel(
           beam_fn_api_pb2.BeamFnDataStub(grpc_channel))
     return self._data_channel_cache[url]

Reply via email to