Repository: beam
Updated Branches:
  refs/heads/master 0bd47c076 -> b8ac32641
For GCS operations use an http client with a default timeout value.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/68f1fb64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/68f1fb64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/68f1fb64

Branch: refs/heads/master
Commit: 68f1fb64fd2565e287e322d715ca778d01e7137b
Parents: 0bd47c0
Author: Ahmet Altay <[email protected]>
Authored: Fri Jun 30 17:37:33 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Tue Jul 4 11:07:25 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/gcsio.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/68f1fb64/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index d43c8ba..643fbc7 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -31,6 +31,7 @@ import re
 import threading
 import time
 import traceback
+import httplib2
 
 from apache_beam.utils import retry
 
@@ -68,6 +69,10 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for GCS operations to
+# complete.
+DEFAULT_HTTP_TIMEOUT_SECONDS = 60
+
 # This is the number of seconds the library will wait for a partial-file read
 # operation from GCS to complete before retrying.
 DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
@@ -99,6 +104,7 @@ class GcsIO(object):
 
   def __new__(cls, storage_client=None):
     if storage_client:
+      # This path is only used for testing.
       return super(GcsIO, cls).__new__(cls, storage_client)
     else:
       # Create a single storage client for each thread. We would like to avoid
@@ -108,7 +114,9 @@ class GcsIO(object):
       local_state = threading.local()
       if getattr(local_state, 'gcsio_instance', None) is None:
         credentials = auth.get_service_credentials()
-        storage_client = storage.StorageV1(credentials=credentials)
+        storage_client = storage.StorageV1(
+            credentials=credentials,
+            http=httplib2.Http(timeout=DEFAULT_HTTP_TIMEOUT_SECONDS))
         local_state.gcsio_instance = (
             super(GcsIO, cls).__new__(cls, storage_client))
         local_state.gcsio_instance.client = storage_client
