This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 519d99baee Check google provider version in GCSToS3Operator before 
provide match_glob param (#32925)
519d99baee is described below

commit 519d99baee058dfa56f293f94222309c493ba3c4
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Aug 4 19:40:57 2023 +0200

    Check google provider version in GCSToS3Operator before provide match_glob 
param (#32925)
    
    * Add google extra to AWS provider to help users installing compatible 
versions
    
    * Revert "Add google extra to AWS provider to help users installing 
compatible versions"
    
    This reverts commit 2058200397f30167c4b96be160de231a7246d153.
    
    * Add a check for google provider version in GCSToS3Operator before 
providing match_glob param
    
    * use airflow.providers.google.__version__ to handle providers installed 
from source
---
 .../providers/amazon/aws/transfers/gcs_to_s3.py    | 49 +++++++++++++++-------
 1 file changed, 34 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py 
b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 2cdd0761a1..f47eb285ef 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -22,7 +22,9 @@ import os
 import warnings
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from packaging.version import Version
+
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -117,13 +119,6 @@ class GCSToS3Operator(BaseOperator):
 
         self.bucket = bucket
         self.prefix = prefix
-        if delimiter:
-            warnings.warn(
-                "Usage of 'delimiter' is deprecated, please use 'match_glob' 
instead",
-                AirflowProviderDeprecationWarning,
-                stacklevel=2,
-            )
-        self.delimiter = delimiter
         self.gcp_conn_id = gcp_conn_id
         self.dest_aws_conn_id = dest_aws_conn_id
         self.dest_s3_key = dest_s3_key
@@ -133,6 +128,27 @@ class GCSToS3Operator(BaseOperator):
         self.dest_s3_extra_args = dest_s3_extra_args or {}
         self.s3_acl_policy = s3_acl_policy
         self.keep_directory_structure = keep_directory_structure
+        try:
+            from airflow.providers.google import __version__
+
+            if Version(__version__) >= Version("10.3.0"):
+                self.__is_match_glob_supported = True
+            else:
+                self.__is_match_glob_supported = False
+        except ImportError:  # __version__ was added in 10.1.0, so this means 
it's < 10.3.0
+            self.__is_match_glob_supported = False
+        if self.__is_match_glob_supported:
+            if delimiter:
+                warnings.warn(
+                    "Usage of 'delimiter' is deprecated, please use 
'match_glob' instead",
+                    AirflowProviderDeprecationWarning,
+                    stacklevel=2,
+                )
+        elif match_glob:
+            raise AirflowException(
+                "The 'match_glob' parameter requires 
'apache-airflow-providers-google>=10.3.0'."
+            )
+        self.delimiter = delimiter
         self.match_glob = match_glob
         self.gcp_user_project = gcp_user_project
 
@@ -150,13 +166,16 @@ class GCSToS3Operator(BaseOperator):
             self.prefix,
         )
 
-        gcs_files = gcs_hook.list(
-            bucket_name=self.bucket,
-            prefix=self.prefix,
-            delimiter=self.delimiter,
-            match_glob=self.match_glob,
-            user_project=self.gcp_user_project,
-        )
+        list_kwargs = {
+            "bucket_name": self.bucket,
+            "prefix": self.prefix,
+            "delimiter": self.delimiter,
+            "user_project": self.gcp_user_project,
+        }
+        if self.__is_match_glob_supported:
+            list_kwargs["match_glob"] = self.match_glob
+
+        gcs_files = gcs_hook.list(**list_kwargs)  # type: ignore
 
         s3_hook = S3Hook(
             aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify, 
extra_args=self.dest_s3_extra_args

Reply via email to