This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new d6e254db68 Deprecate `delimiter` param and source object's wildcards in GCS, introduce `match_glob` param. (#31261)
d6e254db68 is described below

commit d6e254db689db070f2f181006e7d6bc593482300
Author: Shahar Epstein <60007259+shah...@users.noreply.github.com>
AuthorDate: Fri Jun 30 11:52:14 2023 +0300

    Deprecate `delimiter` param and source object's wildcards in GCS, introduce `match_glob` param. (#31261)

    * Deprecate `delimiter` param and source object's wildcards in GCS, introduce `match_glob` param.

    ---------

    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 .../providers/amazon/aws/transfers/gcs_to_s3.py    |  18 +-
 airflow/providers/google/cloud/hooks/gcs.py        | 126 +++++++++++--
 airflow/providers/google/cloud/operators/gcs.py    |  48 +++--
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  53 +++++-
 .../google/cloud/transfers/gcs_to_sftp.py          |   5 +-
 .../google/suite/transfers/gcs_to_gdrive.py        |   4 +
 .../operators/transfer/gcs_to_gcs.rst              |  15 +-
 .../amazon/aws/transfers/test_gcs_to_s3.py         | 195 ++++++++++++---------
 tests/providers/google/cloud/hooks/test_gcs.py     |  60 ++++++-
 tests/providers/google/cloud/operators/test_gcs.py |  16 +-
 .../google/cloud/transfers/test_gcs_to_gcs.py      |  26 +--
 .../google/cloud/transfers/test_gcs_to_sftp.py     |   2 +
 .../google/suite/transfers/test_gcs_to_gdrive.py   |   6 +
 .../google/cloud/gcs/example_gcs_to_gcs.py         |  12 ++
 14 files changed, 448 insertions(+), 138 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 4004eea8c2..2213de2b60 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -19,8 +19,10 @@
 from __future__ import annotations
 
 import os
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -40,7 +42,7 @@ class GCSToS3Operator(BaseOperator):
     :param bucket: The Google Cloud Storage bucket to find the objects. (templated)
     :param prefix: Prefix string which filters objects whose name begin with
         this prefix. (templated)
-    :param delimiter: The delimiter by which you want to filter the objects. (templated)
+    :param delimiter: (Deprecated) The delimiter by which you want to filter the objects. (templated)
         For example, to list the CSV files in a directory in GCS you would use
         delimiter='.csv'.
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
@@ -76,6 +78,8 @@ class GCSToS3Operator(BaseOperator):
         object to be uploaded in S3
     :param keep_directory_structure: (Optional) When set to False the path of the file
         on the bucket is recreated within path passed in dest_s3_key.
+ :param match_glob: (Optional) filters objects based on the glob pattern given by the string + (e.g, ``'**/*/.json'``) """ template_fields: Sequence[str] = ( @@ -102,12 +106,19 @@ class GCSToS3Operator(BaseOperator): dest_s3_extra_args: dict | None = None, s3_acl_policy: str | None = None, keep_directory_structure: bool = True, + match_glob: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) self.bucket = bucket self.prefix = prefix + if delimiter: + warnings.warn( + "Usage of 'delimiter' is deprecated, please use 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) self.delimiter = delimiter self.gcp_conn_id = gcp_conn_id self.dest_aws_conn_id = dest_aws_conn_id @@ -118,6 +129,7 @@ class GCSToS3Operator(BaseOperator): self.dest_s3_extra_args = dest_s3_extra_args or {} self.s3_acl_policy = s3_acl_policy self.keep_directory_structure = keep_directory_structure + self.match_glob = match_glob def execute(self, context: Context) -> list[str]: # list all files in an Google Cloud Storage bucket @@ -133,7 +145,9 @@ class GCSToS3Operator(BaseOperator): self.prefix, ) - files = hook.list(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) + files = hook.list( + bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter, match_glob=self.match_glob + ) s3_hook = S3Hook( aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, extra_args=self.dest_s3_extra_args diff --git a/airflow/providers/google/cloud/hooks/gcs.py b/airflow/providers/google/cloud/hooks/gcs.py index b17af1d9b9..42a27e80f8 100644 --- a/airflow/providers/google/cloud/hooks/gcs.py +++ b/airflow/providers/google/cloud/hooks/gcs.py @@ -24,6 +24,7 @@ import json import os import shutil import time +import warnings from contextlib import contextmanager from datetime import datetime from functools import partial @@ -44,7 +45,7 @@ from google.cloud.exceptions import GoogleCloudError from google.cloud.storage.retry import DEFAULT_RETRY from requests import Session -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.cloud.utils.helpers import normalize_directory_path from airflow.providers.google.common.consts import CLIENT_INFO from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook @@ -709,6 +710,7 @@ class GCSHook(GoogleBaseHook): max_results: int | None = None, prefix: str | List[str] | None = None, delimiter: str | None = None, + match_glob: str | None = None, ): """ List all objects from the bucket with the given a single prefix or multiple prefixes. @@ -717,9 +719,19 @@ class GCSHook(GoogleBaseHook): :param versions: if true, list all versions of the objects :param max_results: max count of items to return in a single page of responses :param prefix: string or list of strings which filter objects whose name begin with it/them - :param delimiter: filters objects based on the delimiter (for e.g '.csv') + :param delimiter: (Deprecated) filters objects based on the delimiter (for e.g '.csv') + :param match_glob: (Optional) filters objects based on the glob pattern given by the string + (e.g, ``'**/*/.json'``). 
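For illustration, a minimal sketch of calling the updated hook directly, assuming a placeholder bucket name and a configured google_cloud_default connection (neither comes from this commit):

from airflow.providers.google.cloud.hooks.gcs import GCSHook

hook = GCSHook(gcp_conn_id="google_cloud_default")

# Deprecated style: delimiter=".csv" now emits AirflowProviderDeprecationWarning.
# Equivalent glob style: match any object ending in ".csv" under the prefix.
csv_objects = hook.list(
    bucket_name="my-bucket",        # placeholder bucket
    prefix="sales/sales-2017/",
    match_glob="**/*.csv",
)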
:return: a stream of object names matching the filtering criteria """ + if delimiter and delimiter != "/": + warnings.warn( + "Usage of 'delimiter' param is deprecated, please use 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + if match_glob and delimiter and delimiter != "/": + raise AirflowException("'match_glob' param cannot be used with 'delimiter' that differs than '/'") objects = [] if isinstance(prefix, list): for prefix_item in prefix: @@ -730,6 +742,7 @@ class GCSHook(GoogleBaseHook): max_results=max_results, prefix=prefix_item, delimiter=delimiter, + match_glob=match_glob, ) ) else: @@ -740,6 +753,7 @@ class GCSHook(GoogleBaseHook): max_results=max_results, prefix=prefix, delimiter=delimiter, + match_glob=match_glob, ) ) return objects @@ -751,6 +765,7 @@ class GCSHook(GoogleBaseHook): max_results: int | None = None, prefix: str | None = None, delimiter: str | None = None, + match_glob: str | None = None, ) -> List: """ List all objects from the bucket with the give string prefix in name. @@ -759,7 +774,9 @@ class GCSHook(GoogleBaseHook): :param versions: if true, list all versions of the objects :param max_results: max count of items to return in a single page of responses :param prefix: string which filters objects whose name begin with it - :param delimiter: filters objects based on the delimiter (for e.g '.csv') + :param delimiter: (Deprecated) filters objects based on the delimiter (for e.g '.csv') + :param match_glob: (Optional) filters objects based on the glob pattern given by the string + (e.g, ``'**/*/.json'``). :return: a stream of object names matching the filtering criteria """ client = self.get_conn() @@ -768,13 +785,25 @@ class GCSHook(GoogleBaseHook): ids = [] page_token = None while True: - blobs = bucket.list_blobs( - max_results=max_results, - page_token=page_token, - prefix=prefix, - delimiter=delimiter, - versions=versions, - ) + if match_glob: + blobs = self._list_blobs_with_match_glob( + bucket=bucket, + client=client, + match_glob=match_glob, + max_results=max_results, + page_token=page_token, + path=bucket.path + "/o", + prefix=prefix, + versions=versions, + ) + else: + blobs = bucket.list_blobs( + max_results=max_results, + page_token=page_token, + prefix=prefix, + delimiter=delimiter, + versions=versions, + ) blob_names = [] for blob in blobs: @@ -792,6 +821,52 @@ class GCSHook(GoogleBaseHook): break return ids + @staticmethod + def _list_blobs_with_match_glob( + bucket, + client, + path: str, + max_results: int | None = None, + page_token: str | None = None, + match_glob: str | None = None, + prefix: str | None = None, + versions: bool | None = None, + ) -> Any: + """ + List blobs when match_glob param is given. + This method is a patched version of google.cloud.storage Client.list_blobs(). + It is used as a temporary workaround to support "match_glob" param, + as it isn't officially supported by GCS Python client. + (follow `issue #1035<https://github.com/googleapis/python-storage/issues/1035>`__). 
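The patched listing above boils down to passing matchGlob straight through to the JSON API objects.list endpoint via a page iterator, because older google-cloud-storage releases do not expose it on Bucket.list_blobs(). A standalone sketch of that idea, assuming application-default credentials and a placeholder bucket; like the hook's helper, it reaches into the client's private _connection attribute:

import functools

from google.api_core import page_iterator
from google.cloud import storage
from google.cloud.storage.bucket import _blobs_page_start, _item_to_blob

client = storage.Client()              # assumes application-default credentials
bucket = client.bucket("my-bucket")    # placeholder bucket

# The hook wraps this call with functools.partial to pin a default timeout and
# retry policy; a plain reference keeps the sketch minimal.
api_request = client._connection.api_request

blobs = page_iterator.HTTPIterator(
    client=client,
    api_request=api_request,
    path=bucket.path + "/o",
    item_to_value=_item_to_blob,
    extra_params={"prefix": "sales/", "matchGlob": "**/*.json"},
    page_start=_blobs_page_start,
)
# _item_to_blob and _blobs_page_start expect these attributes on the iterator.
blobs.prefixes = set()
blobs.bucket = bucket
for blob in blobs:
    print(blob.name)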
+ """ + from google.api_core import page_iterator + from google.cloud.storage.bucket import _blobs_page_start, _item_to_blob + + extra_params: Any = {} + if prefix is not None: + extra_params["prefix"] = prefix + if match_glob is not None: + extra_params["matchGlob"] = match_glob + if versions is not None: + extra_params["versions"] = versions + api_request = functools.partial( + client._connection.api_request, timeout=DEFAULT_TIMEOUT, retry=DEFAULT_RETRY + ) + + blobs: Any = page_iterator.HTTPIterator( + client=client, + api_request=api_request, + path=path, + item_to_value=_item_to_blob, + page_token=page_token, + max_results=max_results, + extra_params=extra_params, + page_start=_blobs_page_start, + ) + blobs.prefixes = set() + blobs.bucket = bucket + return blobs + def list_by_timespan( self, bucket_name: str, @@ -801,6 +876,7 @@ class GCSHook(GoogleBaseHook): max_results: int | None = None, prefix: str | None = None, delimiter: str | None = None, + match_glob: str | None = None, ) -> List[str]: """ List all objects from the bucket with the give string prefix in name that were @@ -813,7 +889,9 @@ class GCSHook(GoogleBaseHook): :param max_results: max count of items to return in a single page of responses :param prefix: prefix string which filters objects whose name begin with this prefix - :param delimiter: filters objects based on the delimiter (for e.g '.csv') + :param delimiter: (Deprecated) filters objects based on the delimiter (for e.g '.csv') + :param match_glob: (Optional) filters objects based on the glob pattern given by the string + (e.g, ``'**/*/.json'``). :return: a stream of object names matching the filtering criteria """ client = self.get_conn() @@ -823,13 +901,25 @@ class GCSHook(GoogleBaseHook): page_token = None while True: - blobs = bucket.list_blobs( - max_results=max_results, - page_token=page_token, - prefix=prefix, - delimiter=delimiter, - versions=versions, - ) + if match_glob: + blobs = self._list_blobs_with_match_glob( + bucket=bucket, + client=client, + match_glob=match_glob, + max_results=max_results, + page_token=page_token, + path=bucket.path + "/o", + prefix=prefix, + versions=versions, + ) + else: + blobs = bucket.list_blobs( + max_results=max_results, + page_token=page_token, + prefix=prefix, + delimiter=delimiter, + versions=versions, + ) blob_names = [] for blob in blobs: diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index e2ac68c90d..6e50cbad2d 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -21,6 +21,7 @@ from __future__ import annotations import datetime import subprocess import sys +import warnings from pathlib import Path from tempfile import NamedTemporaryFile, TemporaryDirectory from typing import TYPE_CHECKING, Sequence @@ -33,7 +34,7 @@ if TYPE_CHECKING: from google.api_core.exceptions import Conflict from google.cloud.exceptions import GoogleCloudError -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.google.cloud.hooks.gcs import GCSHook from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator from airflow.providers.google.common.links.storage import FileDetailsLink, StorageLink @@ -157,16 +158,17 @@ class GCSCreateBucketOperator(GoogleCloudBaseOperator): class GCSListObjectsOperator(GoogleCloudBaseOperator): """ - List all objects from the bucket with the given string 
prefix and delimiter in name. + List all objects from the bucket filtered by given string prefix and delimiter in name, + or match_glob. This operator returns a python list with the name of objects which can be used by XCom in the downstream task. :param bucket: The Google Cloud Storage bucket to find the objects. (templated) - :param prefix: String or list of strings, which filter objects whose name begin with + :param prefix: String or list of strings, which filter objects whose name begins with it/them. (templated) - :param delimiter: The delimiter by which you want to filter the objects. (templated) - For example, to lists the CSV files from in a directory in GCS you would use + :param delimiter: (Deprecated) The delimiter by which you want to filter the objects. (templated) + For example, to list the CSV files from in a directory in GCS you would use delimiter='.csv'. :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud. :param impersonation_chain: Optional service account to impersonate using short-term @@ -177,6 +179,8 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator): If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). + :param match_glob: (Optional) filters objects based on the glob pattern given by the string + (e.g, ``'**/*/.json'``) **Example**: The following Operator would list all the Avro files from ``sales/sales-2017`` @@ -186,7 +190,7 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator): task_id='GCS_Files', bucket='data', prefix='sales/sales-2017/', - delimiter='.avro', + match_glob='**/*/.avro', gcp_conn_id=google_cloud_conn_id ) """ @@ -210,14 +214,22 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator): delimiter: str | None = None, gcp_conn_id: str = "google_cloud_default", impersonation_chain: str | Sequence[str] | None = None, + match_glob: str | None = None, **kwargs, ) -> None: super().__init__(**kwargs) self.bucket = bucket self.prefix = prefix + if delimiter: + warnings.warn( + "Usage of 'delimiter' is deprecated, please use 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) self.delimiter = delimiter self.gcp_conn_id = gcp_conn_id self.impersonation_chain = impersonation_chain + self.match_glob = match_glob def execute(self, context: Context) -> list: hook = GCSHook( @@ -225,12 +237,20 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator): impersonation_chain=self.impersonation_chain, ) - self.log.info( - "Getting list of the files. Bucket: %s; Delimiter: %s; Prefix(es): %s", - self.bucket, - self.delimiter, - self.prefix, - ) + if self.match_glob: + self.log.info( + "Getting list of the files. Bucket: %s; MatchGlob: %s; Prefix(es): %s", + self.bucket, + self.match_glob, + self.prefix, + ) + else: + self.log.info( + "Getting list of the files. 
Bucket: %s; Delimiter: %s; Prefix(es): %s", + self.bucket, + self.delimiter, + self.prefix, + ) StorageLink.persist( context=context, @@ -238,7 +258,9 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator): uri=self.bucket, project_id=hook.project_id, ) - return hook.list(bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter) + return hook.list( + bucket_name=self.bucket, prefix=self.prefix, delimiter=self.delimiter, match_glob=self.match_glob + ) class GCSDeleteObjectsOperator(GoogleCloudBaseOperator): diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py index c8f811c6e7..2b39df1c6a 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py @@ -18,9 +18,10 @@ """This module contains a Google Cloud Storage operator.""" from __future__ import annotations +import warnings from typing import TYPE_CHECKING, Sequence -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.models import BaseOperator from airflow.providers.google.cloud.hooks.gcs import GCSHook @@ -66,8 +67,8 @@ class GCSToGCSOperator(BaseOperator): of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. :param replace: Whether you want to replace existing destination files or not. - :param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'. - If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the + :param delimiter: (Deprecated) This is used to restrict the result to only the 'files' in a given + 'folder'. If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object. :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud. :param last_modified_time: When specified, the objects will be copied or moved, @@ -90,6 +91,8 @@ class GCSToGCSOperator(BaseOperator): doesn't exist. It doesn't have any effect when the source objects are folders or patterns. :param exact_match: When specified, only exact match of the source object (filename) will be copied. 
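For comparison, a hedged sketch of the list operator shown above in its non-deprecated form, reusing the docstring's bucket and prefix values; note that the docstring's example glob ``'**/*/.avro'`` would match objects literally named ``.avro``, so a suffix match is written here as ``'**/*.avro'``:

from airflow.providers.google.cloud.operators.gcs import GCSListObjectsOperator

GCS_Files = GCSListObjectsOperator(
    task_id="GCS_Files",
    bucket="data",
    prefix="sales/sales-2017/",
    match_glob="**/*.avro",   # replaces the deprecated delimiter=".avro"
    gcp_conn_id="google_cloud_default",
)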
+ :param match_glob: (Optional) filters objects based on the glob pattern given by the string ( + e.g, ``'**/*/.json'``) :Example: @@ -116,7 +119,7 @@ class GCSToGCSOperator(BaseOperator): source_objects=['sales/sales-2017'], destination_bucket='data_backup', destination_object='copied_sales/2017/', - delimiter='.avro' + match_glob='**/*.avro' gcp_conn_id=google_cloud_conn_id ) @@ -190,15 +193,34 @@ class GCSToGCSOperator(BaseOperator): impersonation_chain: str | Sequence[str] | None = None, source_object_required=False, exact_match=False, + match_glob: str | None = None, **kwargs, ): super().__init__(**kwargs) self.source_bucket = source_bucket + if source_object and WILDCARD in source_object: + warnings.warn( + "Usage of wildcard (*) in 'source_object' is deprecated, utilize 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) self.source_object = source_object + if source_objects and any([WILDCARD in obj for obj in source_objects]): + warnings.warn( + "Usage of wildcard (*) in 'source_objects' is deprecated, utilize 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) self.source_objects = source_objects self.destination_bucket = destination_bucket self.destination_object = destination_object + if delimiter: + warnings.warn( + "Usage of 'delimiter' is deprecated, please use 'match_glob' instead", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) self.delimiter = delimiter self.move_object = move_object self.replace = replace @@ -209,6 +231,7 @@ class GCSToGCSOperator(BaseOperator): self.impersonation_chain = impersonation_chain self.source_object_required = source_object_required self.exact_match = exact_match + self.match_glob = match_glob def execute(self, context: Context): @@ -251,6 +274,7 @@ class GCSToGCSOperator(BaseOperator): for prefix in self.source_objects: # Check if prefix contains wildcard if WILDCARD in prefix: + self._copy_source_with_wildcard(hook=hook, prefix=prefix) # Now search with prefix using provided delimiter if any else: @@ -261,15 +285,19 @@ class GCSToGCSOperator(BaseOperator): # and only keep those files which are present in # Source GCS bucket and not in Destination GCS bucket delimiter = kwargs.get("delimiter") + match_glob = kwargs.get("match_glob") objects = kwargs.get("objects") if self.destination_object is None: - existing_objects = hook.list(self.destination_bucket, prefix=prefix, delimiter=delimiter) + existing_objects = hook.list( + self.destination_bucket, prefix=prefix, delimiter=delimiter, match_glob=match_glob + ) else: self.log.info("Replaced destination_object with source_object prefix.") destination_objects = hook.list( self.destination_bucket, prefix=self.destination_object, delimiter=delimiter, + match_glob=match_glob, ) existing_objects = [ dest_object.replace(self.destination_object, prefix, 1) for dest_object in destination_objects @@ -338,11 +366,15 @@ class GCSToGCSOperator(BaseOperator): gcp_conn_id=google_cloud_conn_id ) """ - objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter) + objects = hook.list( + self.source_bucket, prefix=prefix, delimiter=self.delimiter, match_glob=self.match_glob + ) if not self.replace: # If we are not replacing, ignore files already existing in source buckets - objects = self._ignore_existing_files(hook, prefix, objects=objects, delimiter=self.delimiter) + objects = self._ignore_existing_files( + hook, prefix, objects=objects, delimiter=self.delimiter, match_glob=self.match_glob + ) # If objects is empty, and we have 
prefix, let's check if prefix is a blob # and copy directly @@ -397,11 +429,18 @@ class GCSToGCSOperator(BaseOperator): self.log.info("Delimiter ignored because wildcard is in prefix") prefix_, delimiter = prefix.split(WILDCARD, 1) objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter) + # TODO: After deprecating delimiter and wildcards in source objects, + # remove previous line and uncomment the following: + # match_glob = f"**/*{delimiter}" if delimiter else None + # objects = hook.list(self.source_bucket, prefix=prefix_, match_glob=match_glob) if not self.replace: # If we are not replacing, list all files in the Destination GCS bucket # and only keep those files which are present in # Source GCS bucket and not in Destination GCS bucket objects = self._ignore_existing_files(hook, prefix_, delimiter=delimiter, objects=objects) + # TODO: After deprecating delimiter and wildcards in source objects, + # remove previous line and uncomment the following: + # objects = self._ignore_existing_files(hook, prefix_, match_glob=match_glob, objects=objects) for source_object in objects: if self.destination_object is None: diff --git a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py index f4942311da..150c801861 100644 --- a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py +++ b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py @@ -145,8 +145,11 @@ class GCSToSFTPOperator(BaseOperator): prefix, delimiter = self.source_object.split(WILDCARD, 1) prefix_dirname = os.path.dirname(prefix) - objects = gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter) + # TODO: After deprecating delimiter and wildcards in source objects, + # remove the previous line and uncomment the following: + # match_glob = f"**/*{delimiter}" if delimiter else None + # objects = gcs_hook.list(self.source_bucket, prefix=prefix, match_glob=match_glob) for source_object in objects: destination_path = self._resolve_destination_path(source_object, prefix=prefix_dirname) diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py index 7f8568688c..c1e796258f 100644 --- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py +++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py @@ -132,6 +132,10 @@ class GCSToGoogleDriveOperator(BaseOperator): prefix, delimiter = self.source_object.split(WILDCARD, 1) objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, delimiter=delimiter) + # TODO: After deprecating delimiter and wildcards in source objects, + # remove the previous line and uncomment the following: + # match_glob = f"**/*{delimiter}" if delimiter else None + # objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, match_glob=match_glob) for source_object in objects: if self.destination_object is None: diff --git a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst index b3fa2ac191..ef805355c4 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst @@ -83,6 +83,10 @@ When you use this operator, you can specify whether objects should be deleted fr they are transferred to the sink. Source objects can be specified using a single wildcard, as well as based on the file modification date. 
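To make the migration concrete, a minimal sketch with placeholder bucket names (not taken from this commit), replacing a deprecated wildcard source object with a prefix plus match_glob:

from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

# Deprecated: source_object="sales/sales-2017/*.avro" (the wildcard now warns).
copy_avro_files = GCSToGCSOperator(
    task_id="copy_avro_files",
    source_bucket="data-src",               # placeholder buckets
    source_object="sales/sales-2017/",
    destination_bucket="data-backup",
    destination_object="copied_sales/2017/",
    match_glob="**/*.avro",
)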
+Filtering objects according to their path can be done with the `match_glob field <https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob>`__.
+You should avoid using the ``delimiter`` field or a wildcard in the path of the source object(s), as both practices are deprecated.
+Additionally, filtering can be based on the file's creation date (``is_older_than``) or modification date (``last_modified_time`` and ``maximum_modified_time``).
+
 The way this operator works by default can be compared to the ``cp`` command.
 When the file move option is active, this operator functions like the ``mv`` command.
@@ -124,6 +128,15 @@ folder in ``BUCKET_1_DST``, with file names unchanged.
 
 For source_objects with no wildcard, all files in source_objects would be listed, using provided delimiter if
 any. Then copy files from source_objects to destination_object and rename each source file.
 
+As previously stated, the ``delimiter`` field, as well as a wildcard (``*``) in the source object(s),
+are both deprecated. Use ``match_glob`` instead, as follows:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcs_to_gcs_match_glob]
+    :end-before: [END howto_operator_gcs_to_gcs_match_glob]
+
 The following example would copy all the files in ``subdir/`` folder (i.e subdir/a.csv, subdir/b.csv,
 subdir/c.csv) from the ``BUCKET_1_SRC`` GCS bucket to the ``backup/`` folder in ``BUCKET_1_DST`` bucket.
 (i.e backup/a.csv, backup/b.csv, backup/c.csv)
 
@@ -133,7 +146,7 @@ the ``BUCKET_1_SRC`` GCS bucket to the ``backup/`` folder in ``BUCKET_1_DST`` bu
     :start-after: [START howto_operator_gcs_to_gcs_without_wildcard]
     :end-before: [END howto_operator_gcs_to_gcs_without_wildcard]
 
-The delimiter filed may be specified to select any source files starting with ``source_object`` and ending with the
+The delimiter field may be specified to select any source files starting with ``source_object`` and ending with the
 value supplied to ``delimiter``. This example uses the ``delimiter`` value to implement
 the same functionality as the prior example.
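The TODO comments in the transfer operators (gcs_to_gcs.py, gcs_to_sftp.py, gcs_to_gdrive.py) suggest the mechanical conversion between the two styles. A self-contained sketch of that rule:

def delimiter_to_match_glob(delimiter: str | None) -> str | None:
    # Mirrors the commented-out conversion in the TODOs above: a suffix-style
    # delimiter such as ".csv" becomes the glob "**/*.csv"; no delimiter, no glob.
    return f"**/*{delimiter}" if delimiter else None

assert delimiter_to_match_glob(".csv") == "**/*.csv"
assert delimiter_to_match_glob(None) is None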
diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py index dce115551b..a7a0b2e430 100644 --- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py +++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py @@ -20,6 +20,7 @@ from __future__ import annotations from tempfile import NamedTemporaryFile from unittest import mock +import pytest from moto import mock_s3 from airflow.providers.amazon.aws.hooks.s3 import S3Hook @@ -47,9 +48,9 @@ def _create_test_bucket(): @mock_s3 class TestGCSToS3Operator: - # Test1: incremental behaviour (just some files missing) + # Test0: match_glob @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") - def test_execute_incremental(self, mock_hook): + def test_execute__match_glob(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES with NamedTemporaryFile() as f: gcs_provide_file = mock_hook.return_value.provide_file @@ -59,14 +60,40 @@ class TestGCSToS3Operator: task_id=TASK_ID, bucket=GCS_BUCKET, prefix=PREFIX, - delimiter=DELIMITER, dest_aws_conn_id="aws_default", dest_s3_key=S3_BUCKET, replace=False, + match_glob=f"**/*{DELIMITER}", ) hook, bucket = _create_test_bucket() bucket.put_object(Key=MOCK_FILES[0], Body=b"testing") + operator.execute(None) + mock_hook.return_value.list.assert_called_once_with( + bucket_name=GCS_BUCKET, delimiter=None, match_glob=f"**/*{DELIMITER}", prefix=PREFIX + ) + + # Test1: incremental behaviour (just some files missing) + @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook") + def test_execute_incremental(self, mock_hook): + mock_hook.return_value.list.return_value = MOCK_FILES + with NamedTemporaryFile() as f: + gcs_provide_file = mock_hook.return_value.provide_file + gcs_provide_file.return_value.__enter__.return_value.name = f.name + + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + ) + hook, bucket = _create_test_bucket() + bucket.put_object(Key=MOCK_FILES[0], Body=b"testing") + # we expect all except first file in MOCK_FILES to be uploaded # and all the MOCK_FILES to be present at the S3 bucket uploaded_files = operator.execute(None) @@ -81,15 +108,16 @@ class TestGCSToS3Operator: gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=False, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + ) hook, bucket = _create_test_bucket() for mock_file in MOCK_FILES: bucket.put_object(Key=mock_file, Body=b"testing") @@ -108,15 +136,16 @@ class TestGCSToS3Operator: gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=False, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + 
dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + ) hook, _ = _create_test_bucket() # we expect all MOCK_FILES to be uploaded @@ -133,15 +162,16 @@ class TestGCSToS3Operator: gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=True, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=True, + ) hook, bucket = _create_test_bucket() for mock_file in MOCK_FILES: bucket.put_object(Key=mock_file, Body=b"testing") @@ -160,15 +190,16 @@ class TestGCSToS3Operator: gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=True, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=True, + ) hook, bucket = _create_test_bucket() for mock_file in MOCK_FILES[:2]: bucket.put_object(Key=mock_file, Body=b"testing") @@ -187,15 +218,16 @@ class TestGCSToS3Operator: s3_mock_hook.return_value = mock.Mock() s3_mock_hook.parse_s3_url.return_value = mock.Mock() - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=True, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=True, + ) operator.execute(None) s3_mock_hook.assert_called_once_with(aws_conn_id="aws_default", extra_args={}, verify=None) @@ -209,18 +241,19 @@ class TestGCSToS3Operator: s3_mock_hook.return_value = mock.Mock() s3_mock_hook.parse_s3_url.return_value = mock.Mock() - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=True, - dest_s3_extra_args={ - "ContentLanguage": "value", - }, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=True, + dest_s3_extra_args={ + "ContentLanguage": "value", + }, + ) operator.execute(None) s3_mock_hook.assert_called_once_with( aws_conn_id="aws_default", extra_args={"ContentLanguage": "value"}, verify=None @@ -235,16 +268,17 @@ class TestGCSToS3Operator: gcs_provide_file = mock_gcs_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=False, - s3_acl_policy=S3_ACL_POLICY, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + 
delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + s3_acl_policy=S3_ACL_POLICY, + ) _create_test_bucket() operator.execute(None) @@ -259,16 +293,17 @@ class TestGCSToS3Operator: gcs_provide_file = mock_hook.return_value.provide_file gcs_provide_file.return_value.__enter__.return_value.name = f.name - operator = GCSToS3Operator( - task_id=TASK_ID, - bucket=GCS_BUCKET, - prefix=PREFIX, - delimiter=DELIMITER, - dest_aws_conn_id="aws_default", - dest_s3_key=S3_BUCKET, - replace=False, - keep_directory_structure=False, - ) + with pytest.deprecated_call(): + operator = GCSToS3Operator( + task_id=TASK_ID, + bucket=GCS_BUCKET, + prefix=PREFIX, + delimiter=DELIMITER, + dest_aws_conn_id="aws_default", + dest_s3_key=S3_BUCKET, + replace=False, + keep_directory_structure=False, + ) hook, _ = _create_test_bucket() # we expect all except first file in MOCK_FILES to be uploaded diff --git a/tests/providers/google/cloud/hooks/test_gcs.py b/tests/providers/google/cloud/hooks/test_gcs.py index 926ad36431..9e7ff971d2 100644 --- a/tests/providers/google/cloud/hooks/test_gcs.py +++ b/tests/providers/google/cloud/hooks/test_gcs.py @@ -817,14 +817,66 @@ class TestGCSHook: ), ) @mock.patch(GCS_STRING.format("GCSHook.get_conn")) - def test_list(self, mock_service, prefix, result): + def test_list__delimiter(self, mock_service, prefix, result): mock_service.return_value.bucket.return_value.list_blobs.return_value.next_page_token = None + with pytest.deprecated_call(): + self.gcs_hook.list( + bucket_name="test_bucket", + prefix=prefix, + delimiter=",", + ) + assert mock_service.return_value.bucket.return_value.list_blobs.call_args_list == result + + @mock.patch(GCS_STRING.format("GCSHook.get_conn")) + @mock.patch("airflow.providers.google.cloud.hooks.gcs.functools") + @mock.patch("google.cloud.storage.bucket._item_to_blob") + @mock.patch("google.cloud.storage.bucket._blobs_page_start") + @mock.patch("google.api_core.page_iterator.HTTPIterator") + def test_list__match_glob( + self, http_iterator, _blobs_page_start, _item_to_blob, mocked_functools, mock_service + ): + http_iterator.return_value.next_page_token = None self.gcs_hook.list( bucket_name="test_bucket", - prefix=prefix, - delimiter=",", + prefix="prefix", + match_glob="**/*.json", + ) + http_iterator.assert_has_calls( + [ + mock.call( + api_request=mocked_functools.partial.return_value, + client=mock_service.return_value, + extra_params={"prefix": "prefix", "matchGlob": "**/*.json"}, + item_to_value=_item_to_blob, + max_results=None, + page_start=_blobs_page_start, + page_token=None, + path=mock_service.return_value.bucket.return_value.path.__add__.return_value, + ) + ] + ) + + @mock.patch(GCS_STRING.format("GCSHook.get_conn")) + def test_list__error_match_glob_and_invalid_delimiter(self, _): + with pytest.raises(AirflowException): + self.gcs_hook.list( + bucket_name="test_bucket", + prefix="prefix", + delimiter=",", + match_glob="**/*.json", + ) + + @pytest.mark.parametrize("delimiter", [None, "", "/"]) + @mock.patch("google.api_core.page_iterator.HTTPIterator") + @mock.patch(GCS_STRING.format("GCSHook.get_conn")) + def test_list__error_match_glob_and_valid_delimiter(self, mock_service, http_iterator, delimiter): + http_iterator.return_value.next_page_token = None + self.gcs_hook.list( + bucket_name="test_bucket", + prefix="prefix", + delimiter="/", + match_glob="**/*.json", ) - assert mock_service.return_value.bucket.return_value.list_blobs.call_args_list == result 
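The test changes above follow one pattern: constructing an operator with delimiter must now be wrapped in pytest.deprecated_call(), since the warning is raised in __init__. A minimal standalone sketch of that pattern, with placeholder task, bucket, and key names:

import pytest

from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


def test_delimiter_warns():
    # The constructor itself emits AirflowProviderDeprecationWarning when
    # delimiter is passed, so the context manager wraps instantiation only.
    with pytest.deprecated_call():
        GCSToS3Operator(
            task_id="gcs_to_s3",
            bucket="gcs-bucket",          # placeholder names
            prefix="data/",
            delimiter=".csv",
            dest_s3_key="s3://s3-bucket/",
        )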
@mock.patch(GCS_STRING.format("GCSHook.get_conn")) def test_list_by_timespans(self, mock_service): diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py index bf9a4f5d7e..b048aa0c8e 100644 --- a/tests/providers/google/cloud/operators/test_gcs.py +++ b/tests/providers/google/cloud/operators/test_gcs.py @@ -159,14 +159,26 @@ class TestGCSDeleteObjectsOperator: class TestGoogleCloudStorageListOperator: @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook") - def test_execute(self, mock_hook): + def test_execute__delimiter(self, mock_hook): mock_hook.return_value.list.return_value = MOCK_FILES operator = GCSListObjectsOperator( task_id=TASK_ID, bucket=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER ) files = operator.execute(context=mock.MagicMock()) mock_hook.return_value.list.assert_called_once_with( - bucket_name=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER + bucket_name=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER, match_glob=None + ) + assert sorted(files) == sorted(MOCK_FILES) + + @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook") + def test_execute__match_glob(self, mock_hook): + mock_hook.return_value.list.return_value = MOCK_FILES + operator = GCSListObjectsOperator( + task_id=TASK_ID, bucket=TEST_BUCKET, prefix=PREFIX, match_glob=f"**/*{DELIMITER}", delimiter=None + ) + files = operator.execute(context=mock.MagicMock()) + mock_hook.return_value.list.assert_called_once_with( + bucket_name=TEST_BUCKET, prefix=PREFIX, match_glob=f"**/*{DELIMITER}", delimiter=None ) assert sorted(files) == sorted(MOCK_FILES) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py index d8fa94c0c6..1cf7be1166 100644 --- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py @@ -54,6 +54,8 @@ MOD_TIME_1 = datetime(2016, 1, 1) MOD_TIME_2 = datetime(2019, 1, 1) +# TODO: After deprecating delimiter and wildcards in source objects, +# implement reverted changes from the first commit of PR #31261 class TestGoogleCloudStorageToCloudStorageOperator: """ Tests the three use-cases for the wildcard operator. 
These are @@ -100,7 +102,7 @@ class TestGoogleCloudStorageToCloudStorageOperator: operator.execute(None) mock_calls = [ mock.call(TEST_BUCKET, prefix="test_object", delimiter=""), - mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""), + mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter="", match_glob=None), ] mock_hook.return_value.list.assert_has_calls(mock_calls) @@ -117,8 +119,8 @@ class TestGoogleCloudStorageToCloudStorageOperator: operator.execute(None) mock_calls = [ - mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None), - mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None), + mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None, match_glob=None), + mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, delimiter=None, match_glob=None), ] mock_hook.return_value.list.assert_has_calls(mock_calls) @@ -140,7 +142,7 @@ class TestGoogleCloudStorageToCloudStorageOperator: operator.execute(None) mock_calls = [ - mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None), + mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, match_glob=None), ] mock_hook.return_value.list.assert_has_calls(mock_calls) @@ -450,7 +452,9 @@ class TestGoogleCloudStorageToCloudStorageOperator: ) operator.execute(None) - mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET, prefix="", delimiter=None) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="", delimiter=None, match_glob=None + ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") def test_raises_exception_with_two_empty_list_inside_source_objects(self, mock_hook): @@ -469,7 +473,7 @@ class TestGoogleCloudStorageToCloudStorageOperator: ) operator.execute(None) mock_hook.return_value.list.assert_called_once_with( - TEST_BUCKET, prefix=SOURCE_OBJECTS_SINGLE_FILE[0], delimiter=None + TEST_BUCKET, prefix=SOURCE_OBJECTS_SINGLE_FILE[0], delimiter=None, match_glob=None ) @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -480,8 +484,8 @@ class TestGoogleCloudStorageToCloudStorageOperator: operator.execute(None) mock_hook.return_value.list.assert_has_calls( [ - mock.call(TEST_BUCKET, prefix="test_object/file1.txt", delimiter=None), - mock.call(TEST_BUCKET, prefix="test_object/file2.txt", delimiter=None), + mock.call(TEST_BUCKET, prefix="test_object/file1.txt", delimiter=None, match_glob=None), + mock.call(TEST_BUCKET, prefix="test_object/file2.txt", delimiter=None, match_glob=None), ], any_order=True, ) @@ -495,7 +499,9 @@ class TestGoogleCloudStorageToCloudStorageOperator: delimiter=DELIMITER, ) operator.execute(None) - mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET, prefix="", delimiter=DELIMITER) + mock_hook.return_value.list.assert_called_once_with( + TEST_BUCKET, prefix="", delimiter=DELIMITER, match_glob=None + ) # COPY @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook") @@ -593,7 +599,7 @@ class TestGoogleCloudStorageToCloudStorageOperator: operator.execute(None) mock_calls = [ mock.call(TEST_BUCKET, prefix="test_object", delimiter=""), - mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""), + mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter="", match_glob=None), ] mock_hook.return_value.list.assert_has_calls(mock_calls) diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py b/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py index 9c6884766b..f2ede74a8a 100644 --- 
a/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py +++ b/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py @@ -34,6 +34,8 @@ TEST_BUCKET = "test-bucket" DESTINATION_SFTP = "destination_path" +# TODO: After deprecating delimiter and wildcards in source objects, +# implement reverted changes from the first commit of PR #31261 class TestGoogleCloudStorageToSFTPOperator: @pytest.mark.parametrize( "source_object, target_object, keep_directory_structure", diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py index 5730a4b66e..525f20398a 100644 --- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py +++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py @@ -95,6 +95,9 @@ class TestGcsToGDriveOperator: impersonation_chain=IMPERSONATION_CHAIN, ), mock.call().list("data", delimiter=".avro", prefix="sales/sales-2017/"), + # TODO: After deprecating delimiter and wildcards in source objects, + # remove previous line and uncomment the following: + # mock.call().list("data", match_glob="**/*.avro", prefix="sales/sales-2017/"), mock.call().download(bucket_name="data", filename="TMP1", object_name="sales/A.avro"), mock.call().download(bucket_name="data", filename="TMP2", object_name="sales/B.avro"), mock.call().download(bucket_name="data", filename="TMP3", object_name="sales/C.avro"), @@ -137,6 +140,9 @@ class TestGcsToGDriveOperator: impersonation_chain=IMPERSONATION_CHAIN, ), mock.call().list("data", delimiter=".avro", prefix="sales/sales-2017/"), + # TODO: After deprecating delimiter and wildcards in source objects, + # remove previous line and uncomment the following: + # mock.call().list("data", match_glob="**/*.avro", prefix="sales/sales-2017/"), mock.call().download(bucket_name="data", filename="TMP1", object_name="sales/A.avro"), mock.call().delete("data", "sales/A.avro"), mock.call().download(bucket_name="data", filename="TMP2", object_name="sales/B.avro"), diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py index 7931295d23..e13f76ebc4 100644 --- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py +++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py @@ -173,6 +173,17 @@ with models.DAG( ) # [END howto_operator_gcs_to_gcs_delimiter] + # [START howto_operator_gcs_to_gcs_match_glob] + copy_files_with_match_glob = GCSToGCSOperator( + task_id="copy_files_with_match_glob", + source_bucket=BUCKET_NAME_SRC, + source_object="data/", + destination_bucket=BUCKET_NAME_DST, + destination_object="backup/", + match_glob="**/*.txt", + ) + # [END howto_operator_gcs_to_gcs_match_glob] + # [START howto_operator_gcs_to_gcs_list] copy_files_with_list = GCSToGCSOperator( task_id="copy_files_with_list", @@ -226,6 +237,7 @@ with models.DAG( copy_files_with_wildcard, copy_files_without_wildcard, copy_files_with_delimiter, + copy_files_with_match_glob, copy_files_with_list, move_single_file, move_files_with_list,
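Finally, a hedged end-to-end sketch of the new system-test task wired into a minimal DAG; the DAG id, schedule, and bucket names are placeholders standing in for the parts of example_gcs_to_gcs.py not shown in this diff:

from __future__ import annotations

import datetime

from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator

with models.DAG(
    dag_id="example_gcs_to_gcs_match_glob",   # placeholder DAG id
    start_date=datetime.datetime(2023, 1, 1),
    schedule="@once",
    catchup=False,
) as dag:
    # [START howto_operator_gcs_to_gcs_match_glob]
    copy_files_with_match_glob = GCSToGCSOperator(
        task_id="copy_files_with_match_glob",
        source_bucket="bucket-src",            # placeholder buckets
        source_object="data/",
        destination_bucket="bucket-dst",
        destination_object="backup/",
        match_glob="**/*.txt",
    )
    # [END howto_operator_gcs_to_gcs_match_glob]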