This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d6e254db68 Deprecate `delimiter` param and source object's wildcards 
in GCS, introduce `match_glob` param. (#31261)
d6e254db68 is described below

commit d6e254db689db070f2f181006e7d6bc593482300
Author: Shahar Epstein <60007259+shah...@users.noreply.github.com>
AuthorDate: Fri Jun 30 11:52:14 2023 +0300

    Deprecate `delimiter` param and source object's wildcards in GCS, introduce 
`match_glob` param. (#31261)
    
    * Deprecate `delimiter` param and source object's wildcards in GCS, 
introduce `match_glob` param.
    
    ---------
    
    Co-authored-by: eladkal <45845474+elad...@users.noreply.github.com>
---
 .../providers/amazon/aws/transfers/gcs_to_s3.py    |  18 +-
 airflow/providers/google/cloud/hooks/gcs.py        | 126 +++++++++++--
 airflow/providers/google/cloud/operators/gcs.py    |  48 +++--
 .../providers/google/cloud/transfers/gcs_to_gcs.py |  53 +++++-
 .../google/cloud/transfers/gcs_to_sftp.py          |   5 +-
 .../google/suite/transfers/gcs_to_gdrive.py        |   4 +
 .../operators/transfer/gcs_to_gcs.rst              |  15 +-
 .../amazon/aws/transfers/test_gcs_to_s3.py         | 195 ++++++++++++---------
 tests/providers/google/cloud/hooks/test_gcs.py     |  60 ++++++-
 tests/providers/google/cloud/operators/test_gcs.py |  16 +-
 .../google/cloud/transfers/test_gcs_to_gcs.py      |  26 +--
 .../google/cloud/transfers/test_gcs_to_sftp.py     |   2 +
 .../google/suite/transfers/test_gcs_to_gdrive.py   |   6 +
 .../google/cloud/gcs/example_gcs_to_gcs.py         |  12 ++
 14 files changed, 448 insertions(+), 138 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py 
b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 4004eea8c2..2213de2b60 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -19,8 +19,10 @@
 from __future__ import annotations
 
 import os
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -40,7 +42,7 @@ class GCSToS3Operator(BaseOperator):
     :param bucket: The Google Cloud Storage bucket to find the objects. 
(templated)
     :param prefix: Prefix string which filters objects whose name begin with
         this prefix. (templated)
-    :param delimiter: The delimiter by which you want to filter the objects. 
(templated)
+    :param delimiter: (Deprecated) The delimiter by which you want to filter 
the objects. (templated)
         For e.g to lists the CSV files from in a directory in GCS you would use
         delimiter='.csv'.
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
@@ -76,6 +78,8 @@ class GCSToS3Operator(BaseOperator):
         object to be uploaded in S3
     :param keep_directory_structure: (Optional) When set to False the path of 
the file
          on the bucket is recreated within path passed in dest_s3_key.
+    :param match_glob: (Optional) filters objects based on the glob pattern 
given by the string
+        (e.g, ``'**/*/.json'``)
     """
 
     template_fields: Sequence[str] = (
@@ -102,12 +106,19 @@ class GCSToS3Operator(BaseOperator):
         dest_s3_extra_args: dict | None = None,
         s3_acl_policy: str | None = None,
         keep_directory_structure: bool = True,
+        match_glob: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
 
         self.bucket = bucket
         self.prefix = prefix
+        if delimiter:
+            warnings.warn(
+                "Usage of 'delimiter' is deprecated, please use 'match_glob' 
instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.delimiter = delimiter
         self.gcp_conn_id = gcp_conn_id
         self.dest_aws_conn_id = dest_aws_conn_id
@@ -118,6 +129,7 @@ class GCSToS3Operator(BaseOperator):
         self.dest_s3_extra_args = dest_s3_extra_args or {}
         self.s3_acl_policy = s3_acl_policy
         self.keep_directory_structure = keep_directory_structure
+        self.match_glob = match_glob
 
     def execute(self, context: Context) -> list[str]:
         # list all files in an Google Cloud Storage bucket
@@ -133,7 +145,9 @@ class GCSToS3Operator(BaseOperator):
             self.prefix,
         )
 
-        files = hook.list(bucket_name=self.bucket, prefix=self.prefix, 
delimiter=self.delimiter)
+        files = hook.list(
+            bucket_name=self.bucket, prefix=self.prefix, 
delimiter=self.delimiter, match_glob=self.match_glob
+        )
 
         s3_hook = S3Hook(
             aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, 
extra_args=self.dest_s3_extra_args
diff --git a/airflow/providers/google/cloud/hooks/gcs.py 
b/airflow/providers/google/cloud/hooks/gcs.py
index b17af1d9b9..42a27e80f8 100644
--- a/airflow/providers/google/cloud/hooks/gcs.py
+++ b/airflow/providers/google/cloud/hooks/gcs.py
@@ -24,6 +24,7 @@ import json
 import os
 import shutil
 import time
+import warnings
 from contextlib import contextmanager
 from datetime import datetime
 from functools import partial
@@ -44,7 +45,7 @@ from google.cloud.exceptions import GoogleCloudError
 from google.cloud.storage.retry import DEFAULT_RETRY
 from requests import Session
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.utils.helpers import 
normalize_directory_path
 from airflow.providers.google.common.consts import CLIENT_INFO
 from airflow.providers.google.common.hooks.base_google import 
GoogleBaseAsyncHook, GoogleBaseHook
@@ -709,6 +710,7 @@ class GCSHook(GoogleBaseHook):
         max_results: int | None = None,
         prefix: str | List[str] | None = None,
         delimiter: str | None = None,
+        match_glob: str | None = None,
     ):
         """
         List all objects from the bucket with the given a single prefix or 
multiple prefixes.
@@ -717,9 +719,19 @@ class GCSHook(GoogleBaseHook):
         :param versions: if true, list all versions of the objects
         :param max_results: max count of items to return in a single page of 
responses
         :param prefix: string or list of strings which filter objects whose 
name begin with it/them
-        :param delimiter: filters objects based on the delimiter (for e.g 
'.csv')
+        :param delimiter: (Deprecated) filters objects based on the delimiter 
(for e.g '.csv')
+        :param match_glob: (Optional) filters objects based on the glob 
pattern given by the string
+            (e.g, ``'**/*/.json'``).
         :return: a stream of object names matching the filtering criteria
         """
+        if delimiter and delimiter != "/":
+            warnings.warn(
+                "Usage of 'delimiter' param is deprecated, please use 
'match_glob' instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
+        if match_glob and delimiter and delimiter != "/":
+            raise AirflowException("'match_glob' param cannot be used with 
'delimiter' that differs than '/'")
         objects = []
         if isinstance(prefix, list):
             for prefix_item in prefix:
@@ -730,6 +742,7 @@ class GCSHook(GoogleBaseHook):
                         max_results=max_results,
                         prefix=prefix_item,
                         delimiter=delimiter,
+                        match_glob=match_glob,
                     )
                 )
         else:
@@ -740,6 +753,7 @@ class GCSHook(GoogleBaseHook):
                     max_results=max_results,
                     prefix=prefix,
                     delimiter=delimiter,
+                    match_glob=match_glob,
                 )
             )
         return objects
@@ -751,6 +765,7 @@ class GCSHook(GoogleBaseHook):
         max_results: int | None = None,
         prefix: str | None = None,
         delimiter: str | None = None,
+        match_glob: str | None = None,
     ) -> List:
         """
         List all objects from the bucket with the give string prefix in name.
@@ -759,7 +774,9 @@ class GCSHook(GoogleBaseHook):
         :param versions: if true, list all versions of the objects
         :param max_results: max count of items to return in a single page of 
responses
         :param prefix: string which filters objects whose name begin with it
-        :param delimiter: filters objects based on the delimiter (for e.g 
'.csv')
+        :param delimiter: (Deprecated) filters objects based on the delimiter 
(for e.g '.csv')
+        :param match_glob: (Optional) filters objects based on the glob 
pattern given by the string
+            (e.g, ``'**/*/.json'``).
         :return: a stream of object names matching the filtering criteria
         """
         client = self.get_conn()
@@ -768,13 +785,25 @@ class GCSHook(GoogleBaseHook):
         ids = []
         page_token = None
         while True:
-            blobs = bucket.list_blobs(
-                max_results=max_results,
-                page_token=page_token,
-                prefix=prefix,
-                delimiter=delimiter,
-                versions=versions,
-            )
+            if match_glob:
+                blobs = self._list_blobs_with_match_glob(
+                    bucket=bucket,
+                    client=client,
+                    match_glob=match_glob,
+                    max_results=max_results,
+                    page_token=page_token,
+                    path=bucket.path + "/o",
+                    prefix=prefix,
+                    versions=versions,
+                )
+            else:
+                blobs = bucket.list_blobs(
+                    max_results=max_results,
+                    page_token=page_token,
+                    prefix=prefix,
+                    delimiter=delimiter,
+                    versions=versions,
+                )
 
             blob_names = []
             for blob in blobs:
@@ -792,6 +821,52 @@ class GCSHook(GoogleBaseHook):
                 break
         return ids
 
+    @staticmethod
+    def _list_blobs_with_match_glob(
+        bucket,
+        client,
+        path: str,
+        max_results: int | None = None,
+        page_token: str | None = None,
+        match_glob: str | None = None,
+        prefix: str | None = None,
+        versions: bool | None = None,
+    ) -> Any:
+        """
+        List blobs when match_glob param is given.
+        This method is a patched version of google.cloud.storage 
Client.list_blobs().
+        It is used as a temporary workaround to support "match_glob" param,
+        as it isn't officially supported by GCS Python client.
+        (follow `issue 
#1035<https://github.com/googleapis/python-storage/issues/1035>`__).
+        """
+        from google.api_core import page_iterator
+        from google.cloud.storage.bucket import _blobs_page_start, 
_item_to_blob
+
+        extra_params: Any = {}
+        if prefix is not None:
+            extra_params["prefix"] = prefix
+        if match_glob is not None:
+            extra_params["matchGlob"] = match_glob
+        if versions is not None:
+            extra_params["versions"] = versions
+        api_request = functools.partial(
+            client._connection.api_request, timeout=DEFAULT_TIMEOUT, 
retry=DEFAULT_RETRY
+        )
+
+        blobs: Any = page_iterator.HTTPIterator(
+            client=client,
+            api_request=api_request,
+            path=path,
+            item_to_value=_item_to_blob,
+            page_token=page_token,
+            max_results=max_results,
+            extra_params=extra_params,
+            page_start=_blobs_page_start,
+        )
+        blobs.prefixes = set()
+        blobs.bucket = bucket
+        return blobs
+
     def list_by_timespan(
         self,
         bucket_name: str,
@@ -801,6 +876,7 @@ class GCSHook(GoogleBaseHook):
         max_results: int | None = None,
         prefix: str | None = None,
         delimiter: str | None = None,
+        match_glob: str | None = None,
     ) -> List[str]:
         """
         List all objects from the bucket with the give string prefix in name 
that were
@@ -813,7 +889,9 @@ class GCSHook(GoogleBaseHook):
         :param max_results: max count of items to return in a single page of 
responses
         :param prefix: prefix string which filters objects whose name begin 
with
             this prefix
-        :param delimiter: filters objects based on the delimiter (for e.g 
'.csv')
+        :param delimiter: (Deprecated) filters objects based on the delimiter 
(for e.g '.csv')
+        :param match_glob: (Optional) filters objects based on the glob 
pattern given by the string
+            (e.g, ``'**/*/.json'``).
         :return: a stream of object names matching the filtering criteria
         """
         client = self.get_conn()
@@ -823,13 +901,25 @@ class GCSHook(GoogleBaseHook):
         page_token = None
 
         while True:
-            blobs = bucket.list_blobs(
-                max_results=max_results,
-                page_token=page_token,
-                prefix=prefix,
-                delimiter=delimiter,
-                versions=versions,
-            )
+            if match_glob:
+                blobs = self._list_blobs_with_match_glob(
+                    bucket=bucket,
+                    client=client,
+                    match_glob=match_glob,
+                    max_results=max_results,
+                    page_token=page_token,
+                    path=bucket.path + "/o",
+                    prefix=prefix,
+                    versions=versions,
+                )
+            else:
+                blobs = bucket.list_blobs(
+                    max_results=max_results,
+                    page_token=page_token,
+                    prefix=prefix,
+                    delimiter=delimiter,
+                    versions=versions,
+                )
 
             blob_names = []
             for blob in blobs:
diff --git a/airflow/providers/google/cloud/operators/gcs.py 
b/airflow/providers/google/cloud/operators/gcs.py
index e2ac68c90d..6e50cbad2d 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -21,6 +21,7 @@ from __future__ import annotations
 import datetime
 import subprocess
 import sys
+import warnings
 from pathlib import Path
 from tempfile import NamedTemporaryFile, TemporaryDirectory
 from typing import TYPE_CHECKING, Sequence
@@ -33,7 +34,7 @@ if TYPE_CHECKING:
 from google.api_core.exceptions import Conflict
 from google.cloud.exceptions import GoogleCloudError
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
 from airflow.providers.google.common.links.storage import FileDetailsLink, 
StorageLink
@@ -157,16 +158,17 @@ class GCSCreateBucketOperator(GoogleCloudBaseOperator):
 
 class GCSListObjectsOperator(GoogleCloudBaseOperator):
     """
-    List all objects from the bucket with the given string prefix and 
delimiter in name.
+    List all objects from the bucket filtered by given string prefix and 
delimiter in name,
+    or match_glob.
 
     This operator returns a python list with the name of objects which can be 
used by
     XCom in the downstream task.
 
     :param bucket: The Google Cloud Storage bucket to find the objects. 
(templated)
-    :param prefix: String or list of strings, which filter objects whose name 
begin with
+    :param prefix: String or list of strings, which filter objects whose name 
begins with
            it/them. (templated)
-    :param delimiter: The delimiter by which you want to filter the objects. 
(templated)
-        For example, to lists the CSV files from in a directory in GCS you 
would use
+    :param delimiter: (Deprecated) The delimiter by which you want to filter 
the objects. (templated)
+        For example, to list the CSV files in a directory in GCS you 
would use
         delimiter='.csv'.
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
     :param impersonation_chain: Optional service account to impersonate using 
short-term
@@ -177,6 +179,8 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
         If set as a sequence, the identities from the list must grant
         Service Account Token Creator IAM role to the directly preceding 
identity, with first
         account from the list granting this role to the originating account 
(templated).
+    :param match_glob: (Optional) filters objects based on the glob pattern 
given by the string
+        (e.g, ``'**/*/.json'``)
 
     **Example**:
         The following Operator would list all the Avro files from 
``sales/sales-2017``
@@ -186,7 +190,7 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
                 task_id='GCS_Files',
                 bucket='data',
                 prefix='sales/sales-2017/',
-                delimiter='.avro',
+                match_glob='**/*/.avro',
                 gcp_conn_id=google_cloud_conn_id
             )
     """
@@ -210,14 +214,22 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
         delimiter: str | None = None,
         gcp_conn_id: str = "google_cloud_default",
         impersonation_chain: str | Sequence[str] | None = None,
+        match_glob: str | None = None,
         **kwargs,
     ) -> None:
         super().__init__(**kwargs)
         self.bucket = bucket
         self.prefix = prefix
+        if delimiter:
+            warnings.warn(
+                "Usage of 'delimiter' is deprecated, please use 'match_glob' 
instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.delimiter = delimiter
         self.gcp_conn_id = gcp_conn_id
         self.impersonation_chain = impersonation_chain
+        self.match_glob = match_glob
 
     def execute(self, context: Context) -> list:
         hook = GCSHook(
@@ -225,12 +237,20 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
             impersonation_chain=self.impersonation_chain,
         )
 
-        self.log.info(
-            "Getting list of the files. Bucket: %s; Delimiter: %s; Prefix(es): 
%s",
-            self.bucket,
-            self.delimiter,
-            self.prefix,
-        )
+        if self.match_glob:
+            self.log.info(
+                "Getting list of the files. Bucket: %s; MatchGlob: %s; 
Prefix(es): %s",
+                self.bucket,
+                self.match_glob,
+                self.prefix,
+            )
+        else:
+            self.log.info(
+                "Getting list of the files. Bucket: %s; Delimiter: %s; 
Prefix(es): %s",
+                self.bucket,
+                self.delimiter,
+                self.prefix,
+            )
 
         StorageLink.persist(
             context=context,
@@ -238,7 +258,9 @@ class GCSListObjectsOperator(GoogleCloudBaseOperator):
             uri=self.bucket,
             project_id=hook.project_id,
         )
-        return hook.list(bucket_name=self.bucket, prefix=self.prefix, 
delimiter=self.delimiter)
+        return hook.list(
+            bucket_name=self.bucket, prefix=self.prefix, 
delimiter=self.delimiter, match_glob=self.match_glob
+        )
 
 
 class GCSDeleteObjectsOperator(GoogleCloudBaseOperator):
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py 
b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
index c8f811c6e7..2b39df1c6a 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_gcs.py
@@ -18,9 +18,10 @@
 """This module contains a Google Cloud Storage operator."""
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.exceptions import AirflowException
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
 
@@ -66,8 +67,8 @@ class GCSToGCSOperator(BaseOperator):
         of copied to the new location. This is the equivalent of a mv command
         as opposed to a cp command.
     :param replace: Whether you want to replace existing destination files or 
not.
-    :param delimiter: This is used to restrict the result to only the 'files' 
in a given 'folder'.
-        If source_objects = ['foo/bah/'] and delimiter = '.avro', then only 
the 'files' in the
+    :param delimiter: (Deprecated) This is used to restrict the result to only 
the 'files' in a given
+        'folder'. If source_objects = ['foo/bah/'] and delimiter = '.avro', 
then only the 'files' in the
         folder 'foo/bah/' with '.avro' delimiter will be copied to the 
destination object.
     :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
     :param last_modified_time: When specified, the objects will be copied or 
moved,
@@ -90,6 +91,8 @@ class GCSToGCSOperator(BaseOperator):
         doesn't exist. It doesn't have any effect when the source objects are 
folders or patterns.
     :param exact_match: When specified, only exact match of the source object 
(filename) will be
         copied.
+    :param match_glob: (Optional) filters objects based on the glob pattern 
given by the string (
+        e.g, ``'**/*/.json'``)
 
     :Example:
 
@@ -116,7 +119,7 @@ class GCSToGCSOperator(BaseOperator):
             source_objects=['sales/sales-2017'],
             destination_bucket='data_backup',
             destination_object='copied_sales/2017/',
-            delimiter='.avro'
+            match_glob='**/*.avro'
             gcp_conn_id=google_cloud_conn_id
         )
 
@@ -190,15 +193,34 @@ class GCSToGCSOperator(BaseOperator):
         impersonation_chain: str | Sequence[str] | None = None,
         source_object_required=False,
         exact_match=False,
+        match_glob: str | None = None,
         **kwargs,
     ):
         super().__init__(**kwargs)
 
         self.source_bucket = source_bucket
+        if source_object and WILDCARD in source_object:
+            warnings.warn(
+                "Usage of wildcard (*) in 'source_object' is deprecated, 
utilize 'match_glob' instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.source_object = source_object
+        if source_objects and any([WILDCARD in obj for obj in source_objects]):
+            warnings.warn(
+                "Usage of wildcard (*) in 'source_objects' is deprecated, 
utilize 'match_glob' instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.source_objects = source_objects
         self.destination_bucket = destination_bucket
         self.destination_object = destination_object
+        if delimiter:
+            warnings.warn(
+                "Usage of 'delimiter' is deprecated, please use 'match_glob' 
instead",
+                AirflowProviderDeprecationWarning,
+                stacklevel=2,
+            )
         self.delimiter = delimiter
         self.move_object = move_object
         self.replace = replace
@@ -209,6 +231,7 @@ class GCSToGCSOperator(BaseOperator):
         self.impersonation_chain = impersonation_chain
         self.source_object_required = source_object_required
         self.exact_match = exact_match
+        self.match_glob = match_glob
 
     def execute(self, context: Context):
 
@@ -251,6 +274,7 @@ class GCSToGCSOperator(BaseOperator):
         for prefix in self.source_objects:
             # Check if prefix contains wildcard
             if WILDCARD in prefix:
+
                 self._copy_source_with_wildcard(hook=hook, prefix=prefix)
             # Now search with prefix using provided delimiter if any
             else:
@@ -261,15 +285,19 @@ class GCSToGCSOperator(BaseOperator):
         # and only keep those files which are present in
         # Source GCS bucket and not in Destination GCS bucket
         delimiter = kwargs.get("delimiter")
+        match_glob = kwargs.get("match_glob")
         objects = kwargs.get("objects")
         if self.destination_object is None:
-            existing_objects = hook.list(self.destination_bucket, 
prefix=prefix, delimiter=delimiter)
+            existing_objects = hook.list(
+                self.destination_bucket, prefix=prefix, delimiter=delimiter, 
match_glob=match_glob
+            )
         else:
             self.log.info("Replaced destination_object with source_object 
prefix.")
             destination_objects = hook.list(
                 self.destination_bucket,
                 prefix=self.destination_object,
                 delimiter=delimiter,
+                match_glob=match_glob,
             )
             existing_objects = [
                 dest_object.replace(self.destination_object, prefix, 1) for 
dest_object in destination_objects
@@ -338,11 +366,15 @@ class GCSToGCSOperator(BaseOperator):
                 gcp_conn_id=google_cloud_conn_id
             )
         """
-        objects = hook.list(self.source_bucket, prefix=prefix, 
delimiter=self.delimiter)
+        objects = hook.list(
+            self.source_bucket, prefix=prefix, delimiter=self.delimiter, 
match_glob=self.match_glob
+        )
 
         if not self.replace:
             # If we are not replacing, ignore files already existing in source 
buckets
-            objects = self._ignore_existing_files(hook, prefix, 
objects=objects, delimiter=self.delimiter)
+            objects = self._ignore_existing_files(
+                hook, prefix, objects=objects, delimiter=self.delimiter, 
match_glob=self.match_glob
+            )
 
         # If objects is empty, and we have prefix, let's check if prefix is a 
blob
         # and copy directly
@@ -397,11 +429,18 @@ class GCSToGCSOperator(BaseOperator):
         self.log.info("Delimiter ignored because wildcard is in prefix")
         prefix_, delimiter = prefix.split(WILDCARD, 1)
         objects = hook.list(self.source_bucket, prefix=prefix_, 
delimiter=delimiter)
+        # TODO: After deprecating delimiter and wildcards in source objects,
+        #       remove previous line and uncomment the following:
+        # match_glob = f"**/*{delimiter}" if delimiter else None
+        # objects = hook.list(self.source_bucket, prefix=prefix_, 
match_glob=match_glob)
         if not self.replace:
             # If we are not replacing, list all files in the Destination GCS 
bucket
             # and only keep those files which are present in
             # Source GCS bucket and not in Destination GCS bucket
             objects = self._ignore_existing_files(hook, prefix_, 
delimiter=delimiter, objects=objects)
+            # TODO: After deprecating delimiter and wildcards in source 
objects,
+            #       remove previous line and uncomment the following:
+            # objects = self._ignore_existing_files(hook, prefix_, 
match_glob=match_glob, objects=objects)
 
         for source_object in objects:
             if self.destination_object is None:
diff --git a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py 
b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py
index f4942311da..150c801861 100644
--- a/airflow/providers/google/cloud/transfers/gcs_to_sftp.py
+++ b/airflow/providers/google/cloud/transfers/gcs_to_sftp.py
@@ -145,8 +145,11 @@ class GCSToSFTPOperator(BaseOperator):
 
             prefix, delimiter = self.source_object.split(WILDCARD, 1)
             prefix_dirname = os.path.dirname(prefix)
-
             objects = gcs_hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
+            # TODO: After deprecating delimiter and wildcards in source 
objects,
+            #       remove the previous line and uncomment the following:
+            # match_glob = f"**/*{delimiter}" if delimiter else None
+            # objects = gcs_hook.list(self.source_bucket, prefix=prefix, 
match_glob=match_glob)
 
             for source_object in objects:
                 destination_path = 
self._resolve_destination_path(source_object, prefix=prefix_dirname)
diff --git a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py 
b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
index 7f8568688c..c1e796258f 100644
--- a/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
+++ b/airflow/providers/google/suite/transfers/gcs_to_gdrive.py
@@ -132,6 +132,10 @@ class GCSToGoogleDriveOperator(BaseOperator):
 
             prefix, delimiter = self.source_object.split(WILDCARD, 1)
             objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
+            # TODO: After deprecating delimiter and wildcards in source 
objects,
+            #       remove the previous line and uncomment the following:
+            # match_glob = f"**/*{delimiter}" if delimiter else None
+            # objects = self.gcs_hook.list(self.source_bucket, prefix=prefix, 
match_glob=match_glob)
 
             for source_object in objects:
                 if self.destination_object is None:
diff --git 
a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst 
b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
index b3fa2ac191..ef805355c4 100644
--- a/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
+++ b/docs/apache-airflow-providers-google/operators/transfer/gcs_to_gcs.rst
@@ -83,6 +83,10 @@ When you use this operator, you can specify whether objects 
should be deleted fr
 they are transferred to the sink. Source objects can be specified using a 
single wildcard, as
 well as based on the file modification date.
 
+Filtering objects according to their path could be done by using the 
`match_glob field 
<https://cloud.google.com/storage/docs/json_api/v1/objects/list#list-object-glob>`__.
+You should avoid using the ``delimiter`` field or a wildcard in the path of 
the source object(s), as both practices are deprecated.
+Additionally, filtering could be achieved based on the file's creation date 
(``is_older_than``) or modification date (``last_modified_time`` and 
``maximum_modified_time``).
+
 The way this operator works by default can be compared to the ``cp`` command. 
When the file move option is active, this
 operator functions like the ``mv`` command.
 
@@ -124,6 +128,15 @@ folder in ``BUCKET_1_DST``, with file names unchanged.
 For source_objects with no wildcard, all files in source_objects would be 
listed, using provided delimiter if any.
 Then copy files from source_objects to destination_object and rename each 
source file.
 
+As previously stated, both the ``delimiter`` field and the use of a wildcard 
(``*``) in the source object(s)
+are deprecated. Thus, it is recommended to use ``match_glob`` instead, as 
follows:
+
+.. exampleinclude:: 
/../../tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcs_to_gcs_match_glob]
+    :end-before: [END howto_operator_gcs_to_gcs_match_glob]
+
 The following example would copy all the files in ``subdir/`` folder (i.e 
subdir/a.csv, subdir/b.csv, subdir/c.csv) from
 the ``BUCKET_1_SRC`` GCS bucket to the ``backup/`` folder in ``BUCKET_1_DST`` 
bucket. (i.e backup/a.csv, backup/b.csv, backup/c.csv)
 
@@ -133,7 +146,7 @@ the ``BUCKET_1_SRC`` GCS bucket to the ``backup/`` folder 
in ``BUCKET_1_DST`` bu
     :start-after: [START howto_operator_gcs_to_gcs_without_wildcard]
     :end-before: [END howto_operator_gcs_to_gcs_without_wildcard]
 
-The delimiter filed may be specified to select any source files starting with 
``source_object`` and ending with the
+The delimiter field may be specified to select any source files starting with 
``source_object`` and ending with the
 value supplied to ``delimiter``. This example uses the ``delimiter`` value to 
implement the same functionality as the
 prior example.
 
diff --git a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py 
b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
index dce115551b..a7a0b2e430 100644
--- a/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_gcs_to_s3.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 from tempfile import NamedTemporaryFile
 from unittest import mock
 
+import pytest
 from moto import mock_s3
 
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
@@ -47,9 +48,9 @@ def _create_test_bucket():
 @mock_s3
 class TestGCSToS3Operator:
 
-    # Test1: incremental behaviour (just some files missing)
+    # Test0: match_glob
     @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
-    def test_execute_incremental(self, mock_hook):
+    def test_execute__match_glob(self, mock_hook):
         mock_hook.return_value.list.return_value = MOCK_FILES
         with NamedTemporaryFile() as f:
             gcs_provide_file = mock_hook.return_value.provide_file
@@ -59,14 +60,40 @@ class TestGCSToS3Operator:
                 task_id=TASK_ID,
                 bucket=GCS_BUCKET,
                 prefix=PREFIX,
-                delimiter=DELIMITER,
                 dest_aws_conn_id="aws_default",
                 dest_s3_key=S3_BUCKET,
                 replace=False,
+                match_glob=f"**/*{DELIMITER}",
             )
             hook, bucket = _create_test_bucket()
             bucket.put_object(Key=MOCK_FILES[0], Body=b"testing")
 
+            operator.execute(None)
+            mock_hook.return_value.list.assert_called_once_with(
+                bucket_name=GCS_BUCKET, delimiter=None, 
match_glob=f"**/*{DELIMITER}", prefix=PREFIX
+            )
+
+    # Test1: incremental behaviour (just some files missing)
+    @mock.patch("airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSHook")
+    def test_execute_incremental(self, mock_hook):
+        mock_hook.return_value.list.return_value = MOCK_FILES
+        with NamedTemporaryFile() as f:
+            gcs_provide_file = mock_hook.return_value.provide_file
+            gcs_provide_file.return_value.__enter__.return_value.name = f.name
+
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=False,
+                )
+            hook, bucket = _create_test_bucket()
+            bucket.put_object(Key=MOCK_FILES[0], Body=b"testing")
+
             # we expect all except first file in MOCK_FILES to be uploaded
             # and all the MOCK_FILES to be present at the S3 bucket
             uploaded_files = operator.execute(None)
@@ -81,15 +108,16 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=False,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=False,
+                )
             hook, bucket = _create_test_bucket()
             for mock_file in MOCK_FILES:
                 bucket.put_object(Key=mock_file, Body=b"testing")
@@ -108,15 +136,16 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=False,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=False,
+                )
             hook, _ = _create_test_bucket()
 
             # we expect all MOCK_FILES to be uploaded
@@ -133,15 +162,16 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=True,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=True,
+                )
             hook, bucket = _create_test_bucket()
             for mock_file in MOCK_FILES:
                 bucket.put_object(Key=mock_file, Body=b"testing")
@@ -160,15 +190,16 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=True,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=True,
+                )
             hook, bucket = _create_test_bucket()
             for mock_file in MOCK_FILES[:2]:
                 bucket.put_object(Key=mock_file, Body=b"testing")
@@ -187,15 +218,16 @@ class TestGCSToS3Operator:
         s3_mock_hook.return_value = mock.Mock()
         s3_mock_hook.parse_s3_url.return_value = mock.Mock()
 
-        operator = GCSToS3Operator(
-            task_id=TASK_ID,
-            bucket=GCS_BUCKET,
-            prefix=PREFIX,
-            delimiter=DELIMITER,
-            dest_aws_conn_id="aws_default",
-            dest_s3_key=S3_BUCKET,
-            replace=True,
-        )
+        with pytest.deprecated_call():
+            operator = GCSToS3Operator(
+                task_id=TASK_ID,
+                bucket=GCS_BUCKET,
+                prefix=PREFIX,
+                delimiter=DELIMITER,
+                dest_aws_conn_id="aws_default",
+                dest_s3_key=S3_BUCKET,
+                replace=True,
+            )
         operator.execute(None)
         s3_mock_hook.assert_called_once_with(aws_conn_id="aws_default", 
extra_args={}, verify=None)
 
@@ -209,18 +241,19 @@ class TestGCSToS3Operator:
             s3_mock_hook.return_value = mock.Mock()
             s3_mock_hook.parse_s3_url.return_value = mock.Mock()
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=True,
-                dest_s3_extra_args={
-                    "ContentLanguage": "value",
-                },
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=True,
+                    dest_s3_extra_args={
+                        "ContentLanguage": "value",
+                    },
+                )
             operator.execute(None)
             s3_mock_hook.assert_called_once_with(
                 aws_conn_id="aws_default", extra_args={"ContentLanguage": 
"value"}, verify=None
@@ -235,16 +268,17 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_gcs_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=False,
-                s3_acl_policy=S3_ACL_POLICY,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=False,
+                    s3_acl_policy=S3_ACL_POLICY,
+                )
             _create_test_bucket()
             operator.execute(None)
 
@@ -259,16 +293,17 @@ class TestGCSToS3Operator:
             gcs_provide_file = mock_hook.return_value.provide_file
             gcs_provide_file.return_value.__enter__.return_value.name = f.name
 
-            operator = GCSToS3Operator(
-                task_id=TASK_ID,
-                bucket=GCS_BUCKET,
-                prefix=PREFIX,
-                delimiter=DELIMITER,
-                dest_aws_conn_id="aws_default",
-                dest_s3_key=S3_BUCKET,
-                replace=False,
-                keep_directory_structure=False,
-            )
+            with pytest.deprecated_call():
+                operator = GCSToS3Operator(
+                    task_id=TASK_ID,
+                    bucket=GCS_BUCKET,
+                    prefix=PREFIX,
+                    delimiter=DELIMITER,
+                    dest_aws_conn_id="aws_default",
+                    dest_s3_key=S3_BUCKET,
+                    replace=False,
+                    keep_directory_structure=False,
+                )
             hook, _ = _create_test_bucket()
 
             # we expect all except first file in MOCK_FILES to be uploaded
diff --git a/tests/providers/google/cloud/hooks/test_gcs.py 
b/tests/providers/google/cloud/hooks/test_gcs.py
index 926ad36431..9e7ff971d2 100644
--- a/tests/providers/google/cloud/hooks/test_gcs.py
+++ b/tests/providers/google/cloud/hooks/test_gcs.py
@@ -817,14 +817,66 @@ class TestGCSHook:
         ),
     )
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
-    def test_list(self, mock_service, prefix, result):
+    def test_list__delimiter(self, mock_service, prefix, result):
         
mock_service.return_value.bucket.return_value.list_blobs.return_value.next_page_token
 = None
+        with pytest.deprecated_call():
+            self.gcs_hook.list(
+                bucket_name="test_bucket",
+                prefix=prefix,
+                delimiter=",",
+            )
+        assert 
mock_service.return_value.bucket.return_value.list_blobs.call_args_list == 
result
+
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    @mock.patch("airflow.providers.google.cloud.hooks.gcs.functools")
+    @mock.patch("google.cloud.storage.bucket._item_to_blob")
+    @mock.patch("google.cloud.storage.bucket._blobs_page_start")
+    @mock.patch("google.api_core.page_iterator.HTTPIterator")
+    def test_list__match_glob(
+        self, http_iterator, _blobs_page_start, _item_to_blob, 
mocked_functools, mock_service
+    ):
+        http_iterator.return_value.next_page_token = None
         self.gcs_hook.list(
             bucket_name="test_bucket",
-            prefix=prefix,
-            delimiter=",",
+            prefix="prefix",
+            match_glob="**/*.json",
+        )
+        http_iterator.assert_has_calls(
+            [
+                mock.call(
+                    api_request=mocked_functools.partial.return_value,
+                    client=mock_service.return_value,
+                    extra_params={"prefix": "prefix", "matchGlob": 
"**/*.json"},
+                    item_to_value=_item_to_blob,
+                    max_results=None,
+                    page_start=_blobs_page_start,
+                    page_token=None,
+                    
path=mock_service.return_value.bucket.return_value.path.__add__.return_value,
+                )
+            ]
+        )
+
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    def test_list__error_match_glob_and_invalid_delimiter(self, _):
+        with pytest.raises(AirflowException):
+            self.gcs_hook.list(
+                bucket_name="test_bucket",
+                prefix="prefix",
+                delimiter=",",
+                match_glob="**/*.json",
+            )
+
+    @pytest.mark.parametrize("delimiter", [None, "", "/"])
+    @mock.patch("google.api_core.page_iterator.HTTPIterator")
+    @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
+    def test_list__error_match_glob_and_valid_delimiter(self, mock_service, 
http_iterator, delimiter):
+        http_iterator.return_value.next_page_token = None
+        self.gcs_hook.list(
+            bucket_name="test_bucket",
+            prefix="prefix",
+            delimiter=delimiter,
+            match_glob="**/*.json",
         )
-        assert 
mock_service.return_value.bucket.return_value.list_blobs.call_args_list == 
result
 
     @mock.patch(GCS_STRING.format("GCSHook.get_conn"))
     def test_list_by_timespans(self, mock_service):
diff --git a/tests/providers/google/cloud/operators/test_gcs.py 
b/tests/providers/google/cloud/operators/test_gcs.py
index bf9a4f5d7e..b048aa0c8e 100644
--- a/tests/providers/google/cloud/operators/test_gcs.py
+++ b/tests/providers/google/cloud/operators/test_gcs.py
@@ -159,14 +159,26 @@ class TestGCSDeleteObjectsOperator:
 
 class TestGoogleCloudStorageListOperator:
     @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
-    def test_execute(self, mock_hook):
+    def test_execute__delimiter(self, mock_hook):
         mock_hook.return_value.list.return_value = MOCK_FILES
         operator = GCSListObjectsOperator(
             task_id=TASK_ID, bucket=TEST_BUCKET, prefix=PREFIX, 
delimiter=DELIMITER
         )
         files = operator.execute(context=mock.MagicMock())
         mock_hook.return_value.list.assert_called_once_with(
-            bucket_name=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER
+            bucket_name=TEST_BUCKET, prefix=PREFIX, delimiter=DELIMITER, 
match_glob=None
+        )
+        assert sorted(files) == sorted(MOCK_FILES)
+
+    @mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
+    def test_execute__match_glob(self, mock_hook):
+        mock_hook.return_value.list.return_value = MOCK_FILES
+        operator = GCSListObjectsOperator(
+            task_id=TASK_ID, bucket=TEST_BUCKET, prefix=PREFIX, 
match_glob=f"**/*{DELIMITER}", delimiter=None
+        )
+        files = operator.execute(context=mock.MagicMock())
+        mock_hook.return_value.list.assert_called_once_with(
+            bucket_name=TEST_BUCKET, prefix=PREFIX, 
match_glob=f"**/*{DELIMITER}", delimiter=None
         )
         assert sorted(files) == sorted(MOCK_FILES)
 
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
index d8fa94c0c6..1cf7be1166 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_gcs.py
@@ -54,6 +54,8 @@ MOD_TIME_1 = datetime(2016, 1, 1)
 MOD_TIME_2 = datetime(2019, 1, 1)
 
 
+# TODO: After deprecating delimiter and wildcards in source objects,
+#       implement reverted changes from the first commit of PR #31261
 class TestGoogleCloudStorageToCloudStorageOperator:
     """
     Tests the three use-cases for the wildcard operator. These are
@@ -100,7 +102,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         operator.execute(None)
         mock_calls = [
             mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
-            mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter=""),
+            mock.call(DESTINATION_BUCKET, prefix="test_object", delimiter="", 
match_glob=None),
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
@@ -117,8 +119,8 @@ class TestGoogleCloudStorageToCloudStorageOperator:
 
         operator.execute(None)
         mock_calls = [
-            mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, 
delimiter=None),
-            mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, 
delimiter=None),
+            mock.call(TEST_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, 
delimiter=None, match_glob=None),
+            mock.call(DESTINATION_BUCKET, prefix=SOURCE_OBJECT_NO_WILDCARD, 
delimiter=None, match_glob=None),
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
@@ -140,7 +142,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
 
         operator.execute(None)
         mock_calls = [
-            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None),
+            mock.call(TEST_BUCKET, prefix="test_object.txt", delimiter=None, 
match_glob=None),
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
@@ -450,7 +452,9 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         )
 
         operator.execute(None)
-        mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET, 
prefix="", delimiter=None)
+        mock_hook.return_value.list.assert_called_once_with(
+            TEST_BUCKET, prefix="", delimiter=None, match_glob=None
+        )
 
     @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
     def test_raises_exception_with_two_empty_list_inside_source_objects(self, 
mock_hook):
@@ -469,7 +473,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         )
         operator.execute(None)
         mock_hook.return_value.list.assert_called_once_with(
-            TEST_BUCKET, prefix=SOURCE_OBJECTS_SINGLE_FILE[0], delimiter=None
+            TEST_BUCKET, prefix=SOURCE_OBJECTS_SINGLE_FILE[0], delimiter=None, 
match_glob=None
         )
 
     @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
@@ -480,8 +484,8 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         operator.execute(None)
         mock_hook.return_value.list.assert_has_calls(
             [
-                mock.call(TEST_BUCKET, prefix="test_object/file1.txt", 
delimiter=None),
-                mock.call(TEST_BUCKET, prefix="test_object/file2.txt", 
delimiter=None),
+                mock.call(TEST_BUCKET, prefix="test_object/file1.txt", 
delimiter=None, match_glob=None),
+                mock.call(TEST_BUCKET, prefix="test_object/file2.txt", 
delimiter=None, match_glob=None),
             ],
             any_order=True,
         )
@@ -495,7 +499,9 @@ class TestGoogleCloudStorageToCloudStorageOperator:
             delimiter=DELIMITER,
         )
         operator.execute(None)
-        mock_hook.return_value.list.assert_called_once_with(TEST_BUCKET, 
prefix="", delimiter=DELIMITER)
+        mock_hook.return_value.list.assert_called_once_with(
+            TEST_BUCKET, prefix="", delimiter=DELIMITER, match_glob=None
+        )
 
     # COPY
     @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_gcs.GCSHook")
@@ -593,7 +599,7 @@ class TestGoogleCloudStorageToCloudStorageOperator:
         operator.execute(None)
         mock_calls = [
             mock.call(TEST_BUCKET, prefix="test_object", delimiter=""),
-            mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter=""),
+            mock.call(DESTINATION_BUCKET, prefix="foo/bar", delimiter="", 
match_glob=None),
         ]
         mock_hook.return_value.list.assert_has_calls(mock_calls)
 
diff --git a/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py 
b/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py
index 9c6884766b..f2ede74a8a 100644
--- a/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py
+++ b/tests/providers/google/cloud/transfers/test_gcs_to_sftp.py
@@ -34,6 +34,8 @@ TEST_BUCKET = "test-bucket"
 DESTINATION_SFTP = "destination_path"
 
 
+# TODO: After deprecating delimiter and wildcards in source objects,
+#       implement reverted changes from the first commit of PR #31261
 class TestGoogleCloudStorageToSFTPOperator:
     @pytest.mark.parametrize(
         "source_object, target_object, keep_directory_structure",
diff --git a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py 
b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
index 5730a4b66e..525f20398a 100644
--- a/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
+++ b/tests/providers/google/suite/transfers/test_gcs_to_gdrive.py
@@ -95,6 +95,9 @@ class TestGcsToGDriveOperator:
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().list("data", delimiter=".avro", 
prefix="sales/sales-2017/"),
+                # TODO: After deprecating delimiter and wildcards in source 
objects,
+                #       remove previous line and uncomment the following:
+                # mock.call().list("data", match_glob="**/*.avro", 
prefix="sales/sales-2017/"),
                 mock.call().download(bucket_name="data", filename="TMP1", 
object_name="sales/A.avro"),
                 mock.call().download(bucket_name="data", filename="TMP2", 
object_name="sales/B.avro"),
                 mock.call().download(bucket_name="data", filename="TMP3", 
object_name="sales/C.avro"),
@@ -137,6 +140,9 @@ class TestGcsToGDriveOperator:
                     impersonation_chain=IMPERSONATION_CHAIN,
                 ),
                 mock.call().list("data", delimiter=".avro", 
prefix="sales/sales-2017/"),
+                # TODO: After deprecating delimiter and wildcards in source 
objects,
+                #       remove previous line and uncomment the following:
+                # mock.call().list("data", match_glob="**/*.avro", 
prefix="sales/sales-2017/"),
                 mock.call().download(bucket_name="data", filename="TMP1", 
object_name="sales/A.avro"),
                 mock.call().delete("data", "sales/A.avro"),
                 mock.call().download(bucket_name="data", filename="TMP2", 
object_name="sales/B.avro"),
diff --git a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py 
b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
index 7931295d23..e13f76ebc4 100644
--- a/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
+++ b/tests/system/providers/google/cloud/gcs/example_gcs_to_gcs.py
@@ -173,6 +173,17 @@ with models.DAG(
     )
     # [END howto_operator_gcs_to_gcs_delimiter]
 
+    # [START howto_operator_gcs_to_gcs_match_glob]
+    copy_files_with_match_glob = GCSToGCSOperator(
+        task_id="copy_files_with_match_glob",
+        source_bucket=BUCKET_NAME_SRC,
+        source_object="data/",
+        destination_bucket=BUCKET_NAME_DST,
+        destination_object="backup/",
+        match_glob="**/*.txt",
+    )
+    # [END howto_operator_gcs_to_gcs_match_glob]
+
     # [START howto_operator_gcs_to_gcs_list]
     copy_files_with_list = GCSToGCSOperator(
         task_id="copy_files_with_list",
@@ -226,6 +237,7 @@ with models.DAG(
         copy_files_with_wildcard,
         copy_files_without_wildcard,
         copy_files_with_delimiter,
+        copy_files_with_match_glob,
         copy_files_with_list,
         move_single_file,
         move_files_with_list,

Reply via email to