Repository: incubator-beam Updated Branches: refs/heads/python-sdk cce4331dc -> c1440f7aa
Improve GcsIO throughput by 10x This change increases the read buffer size from 1 MB to 16 MB. Previously, the speed of reading an incompressible file was: (50 MB: 3.17 MB/s, 100 MB: 3.79 MB/s, 200 MB: 4.13 MB/s, 400 MB: 4.24 MB/s). The speed is now improved to: (50 MB: 24.21 MB/s, 100 MB: 42.70 MB/s, 200 MB: 42.89 MB/s, 400 MB: 46.92 MB/s). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4a332d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4a332d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4a332d9 Branch: refs/heads/python-sdk Commit: e4a332d9de5eca941e08f23242cd63bb83148312 Parents: cce4331 Author: Charles Chen <c...@google.com> Authored: Thu Nov 17 11:46:44 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Fri Nov 18 21:53:26 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcsio.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4a332d9/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 1b08994..4f310be 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -47,7 +47,23 @@ except ImportError: 'Google Cloud Storage I/O not supported for this execution environment ' '(could not import storage API client).') -DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 +# This is the size of each partial-file read operation from GCS. 
This +# parameter was chosen to give good throughput while keeping memory usage at +# a reasonable level; the following table shows throughput reached when +# reading files of a given size with a chosen buffer size and informed the +# choice of the value, as of 11/2016: +# +# +---------------+------------+-------------+-------------+-------------+ +# | | 50 MB file | 100 MB file | 200 MB file | 400 MB file | +# +---------------+------------+-------------+-------------+-------------+ +# | 8 MB buffer | 17.12 MB/s | 22.67 MB/s | 23.81 MB/s | 26.05 MB/s | +# | 16 MB buffer | 24.21 MB/s | 42.70 MB/s | 42.89 MB/s | 46.92 MB/s | +# | 32 MB buffer | 28.53 MB/s | 48.08 MB/s | 54.30 MB/s | 54.65 MB/s | +# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s | 79.13 MB/s | 85.39 MB/s | +# +---------------+------------+-------------+-------------+-------------+ +DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 + +# This is the size of chunks used when writing to GCS. WRITE_CHUNK_SIZE = 8 * 1024 * 1024 @@ -373,7 +389,7 @@ class GcsBufferedReader(object): # Initialize read buffer state. self.download_stream = StringIO.StringIO() self.downloader = transfer.Download( - self.download_stream, auto_transfer=False) + self.download_stream, auto_transfer=False, chunksize=buffer_size) self.client.objects.Get(get_request, download=self.downloader) self.position = 0 self.buffer = ''