This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 519d99baee Check google provider version in GCSToS3Operator before
provide match_glob param (#32925)
519d99baee is described below
commit 519d99baee058dfa56f293f94222309c493ba3c4
Author: Hussein Awala <[email protected]>
AuthorDate: Fri Aug 4 19:40:57 2023 +0200
Check google provider version in GCSToS3Operator before provide match_glob
param (#32925)
* Add google extra to AWS provider to help users installing compatible
versions
* Revert "Add google extra to AWS provider to help users installing
compatible versions"
This reverts commit 2058200397f30167c4b96be160de231a7246d153.
* Add a check for google provider version in GCSToS3Operator before
providing match_glob param
* use airflow.providers.google.__version__ to handle providers installed
from source
---
.../providers/amazon/aws/transfers/gcs_to_s3.py | 49 +++++++++++++++-------
1 file changed, 34 insertions(+), 15 deletions(-)
diff --git a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
index 2cdd0761a1..f47eb285ef 100644
--- a/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/gcs_to_s3.py
@@ -22,7 +22,9 @@ import os
import warnings
from typing import TYPE_CHECKING, Sequence
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from packaging.version import Version
+
+from airflow.exceptions import AirflowException,
AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook
@@ -117,13 +119,6 @@ class GCSToS3Operator(BaseOperator):
self.bucket = bucket
self.prefix = prefix
- if delimiter:
- warnings.warn(
- "Usage of 'delimiter' is deprecated, please use 'match_glob'
instead",
- AirflowProviderDeprecationWarning,
- stacklevel=2,
- )
- self.delimiter = delimiter
self.gcp_conn_id = gcp_conn_id
self.dest_aws_conn_id = dest_aws_conn_id
self.dest_s3_key = dest_s3_key
@@ -133,6 +128,27 @@ class GCSToS3Operator(BaseOperator):
self.dest_s3_extra_args = dest_s3_extra_args or {}
self.s3_acl_policy = s3_acl_policy
self.keep_directory_structure = keep_directory_structure
+ try:
+ from airflow.providers.google import __version__
+
+ if Version(__version__) >= Version("10.3.0"):
+ self.__is_match_glob_supported = True
+ else:
+ self.__is_match_glob_supported = False
+ except ImportError: # __version__ was added in 10.1.0, so this means
it's < 10.3.0
+ self.__is_match_glob_supported = False
+ if self.__is_match_glob_supported:
+ if delimiter:
+ warnings.warn(
+ "Usage of 'delimiter' is deprecated, please use
'match_glob' instead",
+ AirflowProviderDeprecationWarning,
+ stacklevel=2,
+ )
+ elif match_glob:
+ raise AirflowException(
+ "The 'match_glob' parameter requires
'apache-airflow-providers-google>=10.3.0'."
+ )
+ self.delimiter = delimiter
self.match_glob = match_glob
self.gcp_user_project = gcp_user_project
@@ -150,13 +166,16 @@ class GCSToS3Operator(BaseOperator):
self.prefix,
)
- gcs_files = gcs_hook.list(
- bucket_name=self.bucket,
- prefix=self.prefix,
- delimiter=self.delimiter,
- match_glob=self.match_glob,
- user_project=self.gcp_user_project,
- )
+ list_kwargs = {
+ "bucket_name": self.bucket,
+ "prefix": self.prefix,
+ "delimiter": self.delimiter,
+ "user_project": self.gcp_user_project,
+ }
+ if self.__is_match_glob_supported:
+ list_kwargs["match_glob"] = self.match_glob
+
+ gcs_files = gcs_hook.list(**list_kwargs) # type: ignore
s3_hook = S3Hook(
aws_conn_id=self.dest_aws_conn_id, verify=self.dest_verify,
extra_args=self.dest_s3_extra_args