This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 7c5ce8b0c82f6b937bded81ceb96114f8ff6ea10 Author: Joshua Carp <[email protected]> AuthorDate: Thu Dec 24 08:12:06 2020 -0500 Add timeout option to gcs hook methods. (#13156) (cherry picked from commit 323084e97ddacbc5512709bf0cad8f53082d16b0) --- airflow/providers/google/cloud/hooks/gcs.py | 30 ++++++++++++++++++++------ setup.py | 2 +- tests/providers/google/cloud/hooks/test_gcs.py | 14 ++++++------ 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py index 0ca3961..72a23ea 100644 --- a/airflow/providers/google/cloud/hooks/gcs.py +++ b/airflow/providers/google/cloud/hooks/gcs.py @@ -40,6 +40,9 @@ from airflow.version import version RT = TypeVar('RT') # pylint: disable=invalid-name T = TypeVar("T", bound=Callable) # pylint: disable=invalid-name +# Use default timeout from google-cloud-storage +DEFAULT_TIMEOUT = 60 + def _fallback_object_url_to_object_name_and_bucket_name( object_url_keyword_arg_name='object_url', @@ -257,7 +260,12 @@ class GCSHook(GoogleBaseHook): ) def download( - self, object_name: str, bucket_name: Optional[str], filename: Optional[str] = None + self, + object_name: str, + bucket_name: Optional[str], + filename: Optional[str] = None, + chunk_size: Optional[int] = None, + timeout: Optional[int] = DEFAULT_TIMEOUT, ) -> Union[str, bytes]: """ Downloads a file from Google Cloud Storage. @@ -273,16 +281,20 @@ class GCSHook(GoogleBaseHook): :type object_name: str :param filename: If set, a local file path where the file should be written to. :type filename: str + :param chunk_size: Blob chunk size. + :type chunk_size: int + :param timeout: Request timeout in seconds. 
+ :type timeout: int """ # TODO: future improvement check file size before downloading, # to check for local space availability client = self.get_conn() bucket = client.bucket(bucket_name) - blob = bucket.blob(blob_name=object_name) + blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size) if filename: - blob.download_to_filename(filename) + blob.download_to_filename(filename, timeout=timeout) self.log.info('File downloaded to %s', filename) return filename else: @@ -359,6 +371,8 @@ class GCSHook(GoogleBaseHook): mime_type: Optional[str] = None, gzip: bool = False, encoding: str = 'utf-8', + chunk_size: Optional[int] = None, + timeout: Optional[int] = DEFAULT_TIMEOUT, ) -> None: """ Uploads a local file or file data as string or bytes to Google Cloud Storage. @@ -377,10 +391,14 @@ class GCSHook(GoogleBaseHook): :type gzip: bool :param encoding: bytes encoding for file data if provided as string :type encoding: str + :param chunk_size: Blob chunk size. + :type chunk_size: int + :param timeout: Request timeout in seconds. + :type timeout: int """ client = self.get_conn() bucket = client.bucket(bucket_name) - blob = bucket.blob(blob_name=object_name) + blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size) if filename and data: raise ValueError( "'filename' and 'data' parameter provided. 
Please " @@ -398,7 +416,7 @@ class GCSHook(GoogleBaseHook): shutil.copyfileobj(f_in, f_out) filename = filename_gz - blob.upload_from_filename(filename=filename, content_type=mime_type) + blob.upload_from_filename(filename=filename, content_type=mime_type, timeout=timeout) if gzip: os.remove(filename) self.log.info('File %s uploaded to %s in %s bucket', filename, object_name, bucket_name) @@ -412,7 +430,7 @@ class GCSHook(GoogleBaseHook): with gz.GzipFile(fileobj=out, mode="w") as f: f.write(data) data = out.getvalue() - blob.upload_from_string(data, content_type=mime_type) + blob.upload_from_string(data, content_type=mime_type, timeout=timeout) self.log.info('Data stream uploaded to %s in %s bucket', object_name, bucket_name) else: raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.") diff --git a/setup.py b/setup.py index ae18e57..3df9e47 100644 --- a/setup.py +++ b/setup.py @@ -301,7 +301,7 @@ google = [ 'google-cloud-secret-manager>=0.2.0,<2.0.0', 'google-cloud-spanner>=1.10.0,<2.0.0', 'google-cloud-speech>=0.36.3,<2.0.0', - 'google-cloud-storage>=1.16,<2.0.0', + 'google-cloud-storage>=1.30,<2.0.0', 'google-cloud-tasks>=1.2.1,<2.0.0', 'google-cloud-texttospeech>=0.4.0,<2.0.0', 'google-cloud-translate>=1.5.0,<2.0.0', diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py index dffe5ad..1ce44bb 100644 --- a/tests/providers/google/cloud/hooks/test_gcs.py +++ b/tests/providers/google/cloud/hooks/test_gcs.py @@ -672,7 +672,7 @@ class TestGCSHook(unittest.TestCase): ) self.assertEqual(response, test_file) - download_filename_method.assert_called_once_with(test_file) + download_filename_method.assert_called_once_with(test_file, timeout=60) @mock.patch(GCS_STRING.format('NamedTemporaryFile')) @mock.patch(GCS_STRING.format('GCSHook.get_conn')) @@ -697,7 +697,7 @@ class TestGCSHook(unittest.TestCase): with self.gcs_hook.provide_file(bucket_name=test_bucket, 
object_name=test_object) as response: self.assertEqual(test_file, response.name) - download_filename_method.assert_called_once_with(test_file) + download_filename_method.assert_called_once_with(test_file, timeout=60) mock_temp_file.assert_has_calls( [ mock.call(suffix='test_object'), @@ -762,7 +762,7 @@ class TestGCSHookUpload(unittest.TestCase): self.gcs_hook.upload(test_bucket, test_object, filename=self.testfile.name) upload_method.assert_called_once_with( - filename=self.testfile.name, content_type='application/octet-stream' + filename=self.testfile.name, content_type='application/octet-stream', timeout=60 ) @mock.patch(GCS_STRING.format('GCSHook.get_conn')) @@ -782,7 +782,7 @@ class TestGCSHookUpload(unittest.TestCase): self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_str) - upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain') + upload_method.assert_called_once_with(self.testdata_str, content_type='text/plain', timeout=60) @mock.patch(GCS_STRING.format('GCSHook.get_conn')) def test_upload_data_bytes(self, mock_service): @@ -793,7 +793,7 @@ class TestGCSHookUpload(unittest.TestCase): self.gcs_hook.upload(test_bucket, test_object, data=self.testdata_bytes) - upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain') + upload_method.assert_called_once_with(self.testdata_bytes, content_type='text/plain', timeout=60) @mock.patch(GCS_STRING.format('BytesIO')) @mock.patch(GCS_STRING.format('gz.GzipFile')) @@ -812,7 +812,7 @@ class TestGCSHookUpload(unittest.TestCase): byte_str = bytes(self.testdata_str, encoding) mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w") gzip_ctx.write.assert_called_once_with(byte_str) - upload_method.assert_called_once_with(data, content_type='text/plain') + upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60) @mock.patch(GCS_STRING.format('BytesIO')) @mock.patch(GCS_STRING.format('gz.GzipFile')) @@ -829,7 
+829,7 @@ class TestGCSHookUpload(unittest.TestCase): mock_gzip.assert_called_once_with(fileobj=mock_bytes_io.return_value, mode="w") gzip_ctx.write.assert_called_once_with(self.testdata_bytes) - upload_method.assert_called_once_with(data, content_type='text/plain') + upload_method.assert_called_once_with(data, content_type='text/plain', timeout=60) @mock.patch(GCS_STRING.format('GCSHook.get_conn')) def test_upload_exceptions(self, mock_service):
