This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 23e2c9527c Feature: Add Google Cloud Storage to Samba operator (#34369)
23e2c9527c is described below
commit 23e2c9527c1167b2262968cdad4d984708f4b2aa
Author: Yevhen Lebid <[email protected]>
AuthorDate: Fri Sep 29 18:16:37 2023 +0300
Feature: Add Google Cloud Storage to Samba operator (#34369)
* Feature: Add Google Cloud Storage to Samba operator
* Moved GCS to Samba transfer operator to the higher interest provider
(Samba) folder
* add docs
* fix docs generation
* fix tests
* fix tests gcs_to_samba transfer operator
* fix static check
---------
Co-authored-by: Yevhen Lebid <[email protected]>
Co-authored-by: Yevhen Lebid <[email protected]>
---
airflow/providers/samba/provider.yaml | 5 +
.../samba/{provider.yaml => transfers/__init__.py} | 42 ---
airflow/providers/samba/transfers/gcs_to_samba.py | 200 +++++++++++++
dev/breeze/tests/test_selective_checks.py | 10 +-
docs/apache-airflow-providers-samba/index.rst | 14 +
.../transfer/gcs_to_samba.rst | 86 ++++++
generated/provider_dependencies.json | 4 +-
.../providers/samba/transfers/__init__.py | 43 +--
.../providers/samba/transfers/test_gcs_to_samba.py | 314 +++++++++++++++++++++
.../system/providers/samba/__init__.py | 42 ---
.../system/providers/samba/example_gcs_to_samba.py | 148 ++++++++++
11 files changed, 777 insertions(+), 131 deletions(-)
diff --git a/airflow/providers/samba/provider.yaml
b/airflow/providers/samba/provider.yaml
index ff311bce35..2dfdec782a 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/airflow/providers/samba/provider.yaml
@@ -52,6 +52,11 @@ hooks:
python-modules:
- airflow.providers.samba.hooks.samba
+transfers:
+ - source-integration-name: Google Cloud Storage (GCS)
+ target-integration-name: Samba
+ how-to-guide:
/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
+ python-module: airflow.providers.samba.transfers.gcs_to_samba
connection-types:
- hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
diff --git a/airflow/providers/samba/provider.yaml
b/airflow/providers/samba/transfers/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to airflow/providers/samba/transfers/__init__.py
index ff311bce35..13a83393a9 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/airflow/providers/samba/transfers/__init__.py
@@ -14,45 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
- `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
- - 4.2.2
- - 4.2.1
- - 4.2.0
- - 4.1.0
- - 4.0.0
- - 3.0.4
- - 3.0.3
- - 3.0.2
- - 3.0.1
- - 3.0.0
- - 2.0.0
- - 1.0.1
- - 1.0.0
-
-dependencies:
- - apache-airflow>=2.4.0
- - smbprotocol>=1.5.0
-
-integrations:
- - integration-name: Samba
- external-doc-url: https://www.samba.org/
- logo: /integration-logos/samba/Samba.png
- tags: [protocol]
-
-hooks:
- - integration-name: Samba
- python-modules:
- - airflow.providers.samba.hooks.samba
-
-
-connection-types:
- - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
- connection-type: samba
diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py
b/airflow/providers/samba/transfers/gcs_to_samba.py
new file mode 100644
index 0000000000..a645c25352
--- /dev/null
+++ b/airflow/providers/samba/transfers/gcs_to_samba.py
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Samba operator."""
+from __future__ import annotations
+
+import os
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.samba.hooks.samba import SambaHook
+
+WILDCARD = "*"
+
+if TYPE_CHECKING:
+ from airflow.utils.context import Context
+
+
+class GCSToSambaOperator(BaseOperator):
+ """
+ Transfer files from a Google Cloud Storage bucket to SMB server.
+
+ .. code-block:: python
+
+ with models.DAG(
+ "example_gcs_to_smb",
+ start_date=datetime(2020, 6, 19),
+ schedule=None,
+ ) as dag:
+ # downloads file to media/folder/subfolder/file.txt
+ copy_file_from_gcs_to_smb = GCSToSambaOperator(
+ task_id="file-copy-gcs-to-smb",
+ source_bucket="test-gcs-sftp-bucket-name",
+ source_object="folder/subfolder/file.txt",
+ destination_path="media",
+ )
+
+ # moves file to media/data.txt
+ move_file_from_gcs_to_smb = GCSToSambaOperator(
+ task_id="file-move-gcs-to-smb",
+ source_bucket="test-gcs-sftp-bucket-name",
+ source_object="folder/subfolder/data.txt",
+ destination_path="media",
+ move_object=True,
+ keep_directory_structure=False,
+ )
+
+ .. seealso::
+ For more information on how to use this operator, take a look at the
guide:
+ :ref:`howto/operator:GCSToSambaOperator`
+
+ :param source_bucket: The source Google Cloud Storage bucket where the
+ object is. (templated)
+ :param source_object: The source name of the object to copy in the Google
cloud
+ storage bucket. (templated)
+ You can use only one wildcard for objects (filenames) within your
+ bucket. The wildcard can appear inside the object name or at the
+ end of the object name. Appending a wildcard to the bucket name is
+ unsupported.
+ :param destination_path: The SMB remote path. This is the specified
directory path in
+ the SMB share name for uploading files to the SMB server.
+ :param keep_directory_structure: (Optional) When set to False the path of
the file
+ on the bucket is recreated within path passed in destination_path.
+ :param move_object: When move object is True, the object is moved instead
+ of copied to the new location. This is the equivalent of a mv command
+ as opposed to a cp command.
+ :param gcp_conn_id: (Optional) The connection ID used to connect to Google
Cloud.
+ :param samba_conn_id: The SMB connection id. The name or identifier for
+ establishing a connection to the SMB server.
+ :param impersonation_chain: Optional service account to impersonate using
short-term
+ credentials, or chained list of accounts required to get the
access_token
+ of the last account in the list, which will be impersonated in the
request.
+ If set as a string, the account must grant the originating account
+ the Service Account Token Creator IAM role.
+ If set as a sequence, the identities from the list must grant
+ Service Account Token Creator IAM role to the directly preceding
identity, with first
+ account from the list granting this role to the originating account
(templated).
+ """
+
+ template_fields: Sequence[str] = (
+ "source_bucket",
+ "source_object",
+ "destination_path",
+ "impersonation_chain",
+ )
+ ui_color = "#f0eee4"
+
+ def __init__(
+ self,
+ *,
+ source_bucket: str,
+ source_object: str,
+ destination_path: str,
+ keep_directory_structure: bool = True,
+ move_object: bool = False,
+ gcp_conn_id: str = "google_cloud_default",
+ samba_conn_id: str = "samba_default",
+ impersonation_chain: str | Sequence[str] | None = None,
+ **kwargs,
+ ) -> None:
+ super().__init__(**kwargs)
+
+ self.source_bucket = source_bucket
+ self.source_object = source_object
+ self.destination_path = destination_path
+ self.keep_directory_structure = keep_directory_structure
+ self.move_object = move_object
+ self.gcp_conn_id = gcp_conn_id
+ self.samba_conn_id = samba_conn_id
+ self.impersonation_chain = impersonation_chain
+ self.sftp_dirs = None
+
+ def execute(self, context: Context):
+ gcs_hook = GCSHook(
+ gcp_conn_id=self.gcp_conn_id,
+ impersonation_chain=self.impersonation_chain,
+ )
+
+ samba_hook = SambaHook(samba_conn_id=self.samba_conn_id)
+
+ if WILDCARD in self.source_object:
+ total_wildcards = self.source_object.count(WILDCARD)
+ if total_wildcards > 1:
+ raise AirflowException(
+ "Only one wildcard '*' is allowed in source_object
parameter. "
+ f"Found {total_wildcards} in {self.source_object}."
+ )
+
+ prefix, delimiter = self.source_object.split(WILDCARD, 1)
+ prefix_dirname = os.path.dirname(prefix)
+ objects = gcs_hook.list(self.source_bucket, prefix=prefix,
delimiter=delimiter)
+ # TODO: After deprecating delimiter and wildcards in source
objects,
+ # remove the previous line and uncomment the following:
+ # match_glob = f"**/*{delimiter}" if delimiter else None
+ # objects = gcs_hook.list(self.source_bucket, prefix=prefix,
match_glob=match_glob)
+
+ for source_object in objects:
+ destination_path =
self._resolve_destination_path(source_object, prefix=prefix_dirname)
+ self._copy_single_object(gcs_hook, samba_hook, source_object,
destination_path)
+
+ self.log.info("Done. Uploaded '%d' files to %s", len(objects),
self.destination_path)
+ else:
+ destination_path =
self._resolve_destination_path(self.source_object)
+ self._copy_single_object(gcs_hook, samba_hook, self.source_object,
destination_path)
+ self.log.info("Done. Uploaded '%s' file to %s",
self.source_object, destination_path)
+
+ def _resolve_destination_path(self, source_object: str, prefix: str | None
= None) -> str:
+ if not self.keep_directory_structure:
+ if prefix:
+ source_object = os.path.relpath(source_object, start=prefix)
+ else:
+ source_object = os.path.basename(source_object)
+ return os.path.join(self.destination_path, source_object)
+
+ def _copy_single_object(
+ self,
+ gcs_hook: GCSHook,
+ samba_hook: SambaHook,
+ source_object: str,
+ destination_path: str,
+ ) -> None:
+ """Helper function to copy single object."""
+ self.log.info(
+ "Executing copy of gs://%s/%s to %s",
+ self.source_bucket,
+ source_object,
+ destination_path,
+ )
+
+ dir_path = os.path.dirname(destination_path)
+ samba_hook.makedirs(dir_path, exist_ok=True)
+
+ with NamedTemporaryFile("w") as tmp:
+ gcs_hook.download(
+ bucket_name=self.source_bucket,
+ object_name=source_object,
+ filename=tmp.name,
+ )
+ samba_hook.push_from_local(destination_path, tmp.name)
+
+ if self.move_object:
+ self.log.info("Executing delete of gs://%s/%s",
self.source_bucket, source_object)
+ gcs_hook.delete(self.source_bucket, source_object)
diff --git a/dev/breeze/tests/test_selective_checks.py
b/dev/breeze/tests/test_selective_checks.py
index 523cb20a31..2868c203cb 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -540,7 +540,7 @@ def test_expected_output_full_tests_needed(
{
"affected-providers-list-as-string": "amazon apache.beam
apache.cassandra cncf.kubernetes "
"common.sql facebook google hashicorp microsoft.azure
microsoft.mssql "
- "mysql openlineage oracle postgres presto salesforce sftp ssh
trino",
+ "mysql openlineage oracle postgres presto salesforce samba
sftp ssh trino",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"needs-helm-tests": "false",
@@ -566,7 +566,7 @@ def test_expected_output_full_tests_needed(
"affected-providers-list-as-string": "amazon apache.beam
apache.cassandra "
"cncf.kubernetes common.sql facebook google "
"hashicorp microsoft.azure microsoft.mssql mysql openlineage
oracle postgres "
- "presto salesforce sftp ssh trino",
+ "presto salesforce samba sftp ssh trino",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"image-build": "true",
@@ -667,7 +667,7 @@ def test_expected_output_pull_request_v2_3(
"affected-providers-list-as-string": "amazon apache.beam
apache.cassandra "
"cncf.kubernetes common.sql "
"facebook google hashicorp microsoft.azure microsoft.mssql
mysql "
- "openlineage oracle postgres presto salesforce sftp ssh trino",
+ "openlineage oracle postgres presto salesforce samba sftp ssh
trino",
"all-python-versions": "['3.8']",
"all-python-versions-list-as-string": "3.8",
"image-build": "true",
@@ -691,6 +691,7 @@ def test_expected_output_pull_request_v2_3(
"--package-filter apache-airflow-providers-postgres "
"--package-filter apache-airflow-providers-presto "
"--package-filter apache-airflow-providers-salesforce "
+ "--package-filter apache-airflow-providers-samba "
"--package-filter apache-airflow-providers-sftp "
"--package-filter apache-airflow-providers-ssh "
"--package-filter apache-airflow-providers-trino",
@@ -700,7 +701,7 @@ def test_expected_output_pull_request_v2_3(
"parallel-test-types-list-as-string": "Providers[amazon]
Always CLI "
"Providers[apache.beam,apache.cassandra,cncf.kubernetes,common.sql,facebook,"
"hashicorp,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,postgres,presto,"
- "salesforce,sftp,ssh,trino] Providers[google]",
+ "salesforce,samba,sftp,ssh,trino] Providers[google]",
},
id="CLI tests and Google-related provider tests should run if
cli/chart files changed",
),
@@ -987,6 +988,7 @@ def test_upgrade_to_newer_dependencies(
"--package-filter apache-airflow-providers-postgres "
"--package-filter apache-airflow-providers-presto "
"--package-filter apache-airflow-providers-salesforce "
+ "--package-filter apache-airflow-providers-samba "
"--package-filter apache-airflow-providers-sftp "
"--package-filter apache-airflow-providers-ssh "
"--package-filter apache-airflow-providers-trino",
diff --git a/docs/apache-airflow-providers-samba/index.rst
b/docs/apache-airflow-providers-samba/index.rst
index bbdf25794d..2db3106a0b 100644
--- a/docs/apache-airflow-providers-samba/index.rst
+++ b/docs/apache-airflow-providers-samba/index.rst
@@ -29,6 +29,13 @@
Changelog <changelog>
Security <security>
+.. toctree::
+ :hidden:
+ :maxdepth: 1
+ :caption: Guides
+
+ GCSToSambaOperator types <transfer/gcs_to_samba>
+
.. toctree::
:hidden:
:maxdepth: 1
@@ -44,6 +51,13 @@
PyPI Repository <https://pypi.org/project/apache-airflow-providers-samba/>
Installing from sources <installing-providers-from-sources>
+.. toctree::
+ :hidden:
+ :maxdepth: 1
+ :caption: System tests
+
+ System Tests <_api/tests/system/providers/samba/index>
+
.. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE
OVERWRITTEN AT RELEASE TIME!
diff --git a/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
b/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
new file mode 100644
index 0000000000..c63f468bd6
--- /dev/null
+++ b/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
@@ -0,0 +1,86 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+Google Cloud Storage Transfer Operator to Samba
+===============================================
+
+Google has a service `Google Cloud Storage
<https://cloud.google.com/storage/>`__.
+This service is used to store large data from various applications.
+Samba is the standard Windows interoperability suite of programs for Linux and
Unix.
+Samba has provided secure, stable and fast file and print services for clients
using the SMB/CIFS protocol
+
+.. _howto/operator:GCSToSambaOperator:
+
+Operator
+^^^^^^^^
+
+Transfer files between Google Storage and Samba is performed with the
+:class:`~airflow.providers.samba.transfers.gcs_to_samba.GCSToSambaOperator`
operator.
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.samba.transfers.gcs_to_samba.GCSToSambaOperator`
+to define values dynamically.
+
+
+Copying a single file
+---------------------
+
+The following Operator copies a single file.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcs_to_samba_copy_single_file]
+ :end-before: [END howto_operator_gcs_to_samba_copy_single_file]
+
+Moving a single file
+--------------------
+
+To move the file use the ``move_object`` parameter. Once the file is copied to
SMB,
+the original file from the Google Storage is deleted. The ``destination_path``
parameter defines the
+full path of the file on the Samba server.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+ :language: python
+ :dedent: 4
+ :start-after: [START
howto_operator_gcs_to_samba_move_single_file_destination]
+ :end-before: [END howto_operator_gcs_to_samba_move_single_file_destination]
+
+
+Copying a directory
+-------------------
+
+Use the ``wildcard`` in ``source_path`` parameter to copy a directory.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcs_to_samba_copy_directory]
+ :end-before: [END howto_operator_gcs_to_samba_copy_directory]
+
+Moving specific files
+---------------------
+
+Use the ``wildcard`` in ``source_path`` parameter to move the specific files.
+The ``destination_path`` defines the path that is prefixed to all copied files.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_gcs_to_samba_move_specific_files]
+ :end-before: [END howto_operator_gcs_to_samba_move_specific_files]
diff --git a/generated/provider_dependencies.json
b/generated/provider_dependencies.json
index 1eecaa04b9..d72638a0db 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -759,7 +759,9 @@
"apache-airflow>=2.4.0",
"smbprotocol>=1.5.0"
],
- "cross-providers-deps": [],
+ "cross-providers-deps": [
+ "google"
+ ],
"excluded-python-versions": []
},
"segment": {
diff --git a/airflow/providers/samba/provider.yaml
b/tests/providers/samba/transfers/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to tests/providers/samba/transfers/__init__.py
index ff311bce35..217e5db960 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/tests/providers/samba/transfers/__init__.py
@@ -1,3 +1,4 @@
+#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -14,45 +15,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
- `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
- - 4.2.2
- - 4.2.1
- - 4.2.0
- - 4.1.0
- - 4.0.0
- - 3.0.4
- - 3.0.3
- - 3.0.2
- - 3.0.1
- - 3.0.0
- - 2.0.0
- - 1.0.1
- - 1.0.0
-
-dependencies:
- - apache-airflow>=2.4.0
- - smbprotocol>=1.5.0
-
-integrations:
- - integration-name: Samba
- external-doc-url: https://www.samba.org/
- logo: /integration-logos/samba/Samba.png
- tags: [protocol]
-
-hooks:
- - integration-name: Samba
- python-modules:
- - airflow.providers.samba.hooks.samba
-
-
-connection-types:
- - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
- connection-type: samba
diff --git a/tests/providers/samba/transfers/test_gcs_to_samba.py
b/tests/providers/samba/transfers/test_gcs_to_samba.py
new file mode 100644
index 0000000000..100fde5f7d
--- /dev/null
+++ b/tests/providers/samba/transfers/test_gcs_to_samba.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.samba.transfers.gcs_to_samba import GCSToSambaOperator
+
+TASK_ID = "test-gcs-to-samba-operator"
+GCP_CONN_ID = "GCP_CONN_ID"
+SAMBA_CONN_ID = "SAMBA_CONN_ID"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_BUCKET = "test-bucket"
+DESTINATION_SMB = "destination_path"
+
+
+class TestGoogleCloudStorageToSambaOperator:
+ @pytest.mark.parametrize(
+ "source_object, target_object, keep_directory_structure",
+ [
+ ("folder/test_object.txt", "folder/test_object.txt", True),
+ ("folder/subfolder/test_object.txt",
"folder/subfolder/test_object.txt", True),
+ ("folder/test_object.txt", "test_object.txt", False),
+ ("folder/subfolder/test_object.txt", "test_object.txt", False),
+ ],
+ )
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+ def test_execute_copy_single_file(
+ self, samba_hook_mock, gcs_hook_mock, source_object, target_object,
keep_directory_structure
+ ):
+ operator = GCSToSambaOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=source_object,
+ destination_path=DESTINATION_SMB,
+ keep_directory_structure=keep_directory_structure,
+ move_object=False,
+ gcp_conn_id=GCP_CONN_ID,
+ samba_conn_id=SAMBA_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ operator.execute({})
+ gcs_hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ samba_hook_mock.assert_called_once_with(samba_conn_id=SAMBA_CONN_ID)
+ gcs_hook_mock.return_value.download.assert_called_with(
+ bucket_name=TEST_BUCKET, object_name=source_object,
filename=mock.ANY
+ )
+ samba_hook_mock.return_value.push_from_local.assert_called_with(
+ os.path.join(DESTINATION_SMB, target_object), mock.ANY
+ )
+ gcs_hook_mock.return_value.delete.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "source_object, target_object, keep_directory_structure",
+ [
+ ("folder/test_object.txt", "folder/test_object.txt", True),
+ ("folder/subfolder/test_object.txt",
"folder/subfolder/test_object.txt", True),
+ ("folder/test_object.txt", "test_object.txt", False),
+ ("folder/subfolder/test_object.txt", "test_object.txt", False),
+ ],
+ )
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+ def test_execute_move_single_file(
+ self,
+ samba_hook_mock,
+ gcs_hook_mock,
+ source_object,
+ target_object,
+ keep_directory_structure,
+ ):
+ operator = GCSToSambaOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=source_object,
+ destination_path=DESTINATION_SMB,
+ keep_directory_structure=keep_directory_structure,
+ move_object=True,
+ gcp_conn_id=GCP_CONN_ID,
+ samba_conn_id=SAMBA_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ operator.execute(None)
+ gcs_hook_mock.assert_called_once_with(
+ gcp_conn_id=GCP_CONN_ID,
+ impersonation_chain=IMPERSONATION_CHAIN,
+ )
+ samba_hook_mock.assert_called_once_with(samba_conn_id=SAMBA_CONN_ID)
+ gcs_hook_mock.return_value.download.assert_called_with(
+ bucket_name=TEST_BUCKET, object_name=source_object,
filename=mock.ANY
+ )
+ samba_hook_mock.return_value.push_from_local.assert_called_with(
+ os.path.join(DESTINATION_SMB, target_object), mock.ANY
+ )
+ gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET,
source_object)
+
+ @pytest.mark.parametrize(
+ "source_object, prefix, delimiter, gcs_files_list, target_objects,
keep_directory_structure",
+ [
+ (
+ "folder/test_object*.txt",
+ "folder/test_object",
+ ".txt",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["test_object/file1.txt", "test_object/file2.txt"],
+ False,
+ ),
+ (
+ "folder/test_object/*",
+ "folder/test_object/",
+ "",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["file1.txt", "file2.txt"],
+ False,
+ ),
+ (
+ "folder/test_object*.txt",
+ "folder/test_object",
+ ".txt",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["folder/test_object/file1.txt",
"folder/test_object/file2.txt"],
+ True,
+ ),
+ (
+ "folder/test_object/*",
+ "folder/test_object/",
+ "",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["folder/test_object/file1.txt",
"folder/test_object/file2.txt"],
+ True,
+ ),
+ ],
+ )
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+ def test_execute_copy_with_wildcard(
+ self,
+ samba_hook_mock,
+ gcs_hook_mock,
+ source_object,
+ prefix,
+ delimiter,
+ gcs_files_list,
+ target_objects,
+ keep_directory_structure,
+ ):
+ gcs_hook_mock.return_value.list.return_value = gcs_files_list
+ operator = GCSToSambaOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=source_object,
+ destination_path=DESTINATION_SMB,
+ keep_directory_structure=keep_directory_structure,
+ move_object=False,
+ gcp_conn_id=GCP_CONN_ID,
+ samba_conn_id=SAMBA_CONN_ID,
+ )
+ operator.execute(None)
+ gcs_hook_mock.return_value.list.assert_called_with(TEST_BUCKET,
delimiter=delimiter, prefix=prefix)
+ gcs_hook_mock.return_value.download.assert_has_calls(
+ [
+ mock.call(bucket_name=TEST_BUCKET, object_name=gcs_file,
filename=mock.ANY)
+ for gcs_file in gcs_files_list
+ ]
+ )
+ samba_hook_mock.return_value.push_from_local.assert_has_calls(
+ [
+ mock.call(os.path.join(DESTINATION_SMB, target_object),
mock.ANY)
+ for target_object in target_objects
+ ]
+ )
+ gcs_hook_mock.return_value.delete.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "source_object, prefix, delimiter, gcs_files_list, target_objects,
keep_directory_structure",
+ [
+ (
+ "folder/test_object*.txt",
+ "folder/test_object",
+ ".txt",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["test_object/file1.txt", "test_object/file2.txt"],
+ False,
+ ),
+ (
+ "folder/test_object/*",
+ "folder/test_object/",
+ "",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["file1.txt", "file2.txt"],
+ False,
+ ),
+ (
+ "folder/test_object*.txt",
+ "folder/test_object",
+ ".txt",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["folder/test_object/file1.txt",
"folder/test_object/file2.txt"],
+ True,
+ ),
+ (
+ "folder/test_object/*",
+ "folder/test_object/",
+ "",
+ [
+ "folder/test_object/file1.txt",
+ "folder/test_object/file2.txt",
+ ],
+ ["folder/test_object/file1.txt",
"folder/test_object/file2.txt"],
+ True,
+ ),
+ ],
+ )
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+ def test_execute_move_with_wildcard(
+ self,
+ samba_hook_mock,
+ gcs_hook_mock,
+ source_object,
+ prefix,
+ delimiter,
+ gcs_files_list,
+ target_objects,
+ keep_directory_structure,
+ ):
+ gcs_hook_mock.return_value.list.return_value = gcs_files_list
+ operator = GCSToSambaOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object=source_object,
+ destination_path=DESTINATION_SMB,
+ keep_directory_structure=keep_directory_structure,
+ move_object=True,
+ gcp_conn_id=GCP_CONN_ID,
+ samba_conn_id=SAMBA_CONN_ID,
+ )
+ operator.execute(None)
+ gcs_hook_mock.return_value.list.assert_called_with(TEST_BUCKET,
delimiter=delimiter, prefix=prefix)
+ gcs_hook_mock.return_value.download.assert_has_calls(
+ [
+ mock.call(bucket_name=TEST_BUCKET, object_name=gcs_file,
filename=mock.ANY)
+ for gcs_file in gcs_files_list
+ ]
+ )
+ samba_hook_mock.return_value.push_from_local.assert_has_calls(
+ [
+ mock.call(os.path.join(DESTINATION_SMB, target_object),
mock.ANY)
+ for target_object in target_objects
+ ]
+ )
+ gcs_hook_mock.return_value.delete.assert_has_calls(
+ [mock.call(TEST_BUCKET, gcs_file) for gcs_file in gcs_files_list]
+ )
+
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+ @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+ def test_execute_more_than_one_wildcard_exception(self, samba_hook_mock,
gcs_hook_mock):
+ operator = GCSToSambaOperator(
+ task_id=TASK_ID,
+ source_bucket=TEST_BUCKET,
+ source_object="csv/*/test_*.csv",
+ destination_path=DESTINATION_SMB,
+ move_object=False,
+ gcp_conn_id=GCP_CONN_ID,
+ samba_conn_id=SAMBA_CONN_ID,
+ )
+ with pytest.raises(AirflowException):
+ operator.execute(None)
diff --git a/airflow/providers/samba/provider.yaml
b/tests/system/providers/samba/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to tests/system/providers/samba/__init__.py
index ff311bce35..13a83393a9 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/tests/system/providers/samba/__init__.py
@@ -14,45 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
- `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
- - 4.2.2
- - 4.2.1
- - 4.2.0
- - 4.1.0
- - 4.0.0
- - 3.0.4
- - 3.0.3
- - 3.0.2
- - 3.0.1
- - 3.0.0
- - 2.0.0
- - 1.0.1
- - 1.0.0
-
-dependencies:
- - apache-airflow>=2.4.0
- - smbprotocol>=1.5.0
-
-integrations:
- - integration-name: Samba
- external-doc-url: https://www.samba.org/
- logo: /integration-logos/samba/Samba.png
- tags: [protocol]
-
-hooks:
- - integration-name: Samba
- python-modules:
- - airflow.providers.samba.hooks.samba
-
-
-connection-types:
- - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
- connection-type: samba
diff --git a/tests/system/providers/samba/example_gcs_to_samba.py
b/tests/system/providers/samba/example_gcs_to_samba.py
new file mode 100644
index 0000000000..23df501f4c
--- /dev/null
+++ b/tests/system/providers/samba/example_gcs_to_samba.py
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Storage to Samba transfer operators.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.samba.transfers.gcs_to_samba import GCSToSambaOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "gcs_to_smb"
+
+SMB_CONN = "samba_default"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DESTINATION_PATH_1 = "tmp/single-file/"
+DESTINATION_PATH_2 = "tmp/dest-dir-1/"
+DESTINATION_PATH_3 = "tmp/dest-dir-2/"
+FILE_NAME = GCS_SRC_FILE = "empty.txt"
+UPLOAD_SRC = str(Path(__file__).parent / "resources" / FILE_NAME)
+GCS_SRC_FILE_IN_DIR = f"dir-1/{FILE_NAME}"
+GCS_SRC_DIR = "dir-2/*.txt"
+UPLOAD_IN_DIR_DST = f"dir-2/{FILE_NAME}"
+
+
+with models.DAG(
+ DAG_ID,
+ schedule="@once",
+ start_date=datetime(2021, 1, 1),
+ catchup=False,
+ tags=["example", "gcs", "smb"],
+) as dag:
+ create_bucket = GCSCreateBucketOperator(
+ task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+ )
+
+ upload_file_1 = LocalFilesystemToGCSOperator(
+ task_id="upload_file_1",
+ src=UPLOAD_SRC,
+ dst=FILE_NAME,
+ bucket=BUCKET_NAME,
+ )
+ upload_file_2 = LocalFilesystemToGCSOperator(
+ task_id="upload_file_2",
+ src=UPLOAD_SRC,
+ dst=GCS_SRC_FILE_IN_DIR,
+ bucket=BUCKET_NAME,
+ )
+ upload_file_3 = LocalFilesystemToGCSOperator(
+ task_id="upload_file_3",
+ src=UPLOAD_SRC,
+ dst=UPLOAD_IN_DIR_DST,
+ bucket=BUCKET_NAME,
+ )
+
+ # [START howto_operator_gcs_to_samba_copy_single_file]
+ copy_file_from_gcs_to_samba = GCSToSambaOperator(
+ task_id="file-copy-gcs-to-samba",
+ samba_conn_id=SMB_CONN,
+ source_bucket=BUCKET_NAME,
+ source_object=GCS_SRC_FILE,
+ destination_path=DESTINATION_PATH_1,
+ )
+ # [END howto_operator_gcs_to_samba_copy_single_file]
+
+ # [START howto_operator_gcs_to_samba_move_single_file_destination]
+ move_file_from_gcs_to_samba = GCSToSambaOperator(
+ task_id="file-move-gcs-to-samba",
+ samba_conn_id=SMB_CONN,
+ source_bucket=BUCKET_NAME,
+ source_object=GCS_SRC_FILE_IN_DIR,
+ destination_path=DESTINATION_PATH_1,
+ move_object=True,
+ )
+ # [END howto_operator_gcs_to_samba_move_single_file_destination]
+
+ # [START howto_operator_gcs_to_samba_copy_directory]
+ copy_dir_from_gcs_to_samba = GCSToSambaOperator(
+ task_id="dir-copy-gcs-to-samba",
+ samba_conn_id=SMB_CONN,
+ source_bucket=BUCKET_NAME,
+ source_object=GCS_SRC_DIR,
+ destination_path=DESTINATION_PATH_2,
+ )
+ # [END howto_operator_gcs_to_samba_copy_directory]
+
+ # [START howto_operator_gcs_to_samba_move_specific_files]
+ move_dir_from_gcs_to_samba = GCSToSambaOperator(
+ task_id="dir-move-gcs-to-samba",
+ samba_conn_id=SMB_CONN,
+ source_bucket=BUCKET_NAME,
+ source_object=GCS_SRC_DIR,
+ destination_path=DESTINATION_PATH_3,
+ keep_directory_structure=False,
+ )
+ # [END howto_operator_gcs_to_samba_move_specific_files]
+
+ delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+ )
+
+ (
+ # TEST SETUP
+ create_bucket
+ >> (upload_file_1, upload_file_2, upload_file_3)
+ # TEST BODY
+ >> copy_file_from_gcs_to_samba
+ >> move_file_from_gcs_to_samba
+ >> copy_dir_from_gcs_to_samba
+ >> move_dir_from_gcs_to_samba
+ # TEST TEARDOWN
+ >> delete_bucket
+ )
+
+ from tests.system.utils.watcher import watcher
+
+ # This test needs watcher in order to properly mark success/failure
+ # when "tearDown" task with trigger rule is part of the DAG
+ list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)