This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 23e2c9527c Feature: Add Google Cloud Storage to Samba operator (#34369)
23e2c9527c is described below

commit 23e2c9527c1167b2262968cdad4d984708f4b2aa
Author: Yevhen Lebid <[email protected]>
AuthorDate: Fri Sep 29 18:16:37 2023 +0300

    Feature: Add Google Cloud Storage to Samba operator (#34369)
    
    * Feature: Add Google Cloud Storage to Samba operator
    
    * Moved GCS to Samba transfer operator to the higher interest provider 
(Samba) folder
    
    * add docs
    
    * fix docs generation
    
    * fix tests
    
    * fix tests gcs_to_samba transfer operator
    
    * fix static check
    
    ---------
    
    Co-authored-by: Yevhen Lebid <[email protected]>
    Co-authored-by: Yevhen Lebid <[email protected]>
---
 airflow/providers/samba/provider.yaml              |   5 +
 .../samba/{provider.yaml => transfers/__init__.py} |  42 ---
 airflow/providers/samba/transfers/gcs_to_samba.py  | 200 +++++++++++++
 dev/breeze/tests/test_selective_checks.py          |  10 +-
 docs/apache-airflow-providers-samba/index.rst      |  14 +
 .../transfer/gcs_to_samba.rst                      |  86 ++++++
 generated/provider_dependencies.json               |   4 +-
 .../providers/samba/transfers/__init__.py          |  43 +--
 .../providers/samba/transfers/test_gcs_to_samba.py | 314 +++++++++++++++++++++
 .../system/providers/samba/__init__.py             |  42 ---
 .../system/providers/samba/example_gcs_to_samba.py | 148 ++++++++++
 11 files changed, 777 insertions(+), 131 deletions(-)

diff --git a/airflow/providers/samba/provider.yaml 
b/airflow/providers/samba/provider.yaml
index ff311bce35..2dfdec782a 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/airflow/providers/samba/provider.yaml
@@ -52,6 +52,11 @@ hooks:
     python-modules:
       - airflow.providers.samba.hooks.samba
 
+transfers:
+  - source-integration-name: Google Cloud Storage (GCS)
+    target-integration-name: Samba
+    how-to-guide: 
/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
+    python-module: airflow.providers.samba.transfers.gcs_to_samba
 
 connection-types:
   - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
diff --git a/airflow/providers/samba/provider.yaml 
b/airflow/providers/samba/transfers/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to airflow/providers/samba/transfers/__init__.py
index ff311bce35..13a83393a9 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/airflow/providers/samba/transfers/__init__.py
@@ -14,45 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
-    `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
-  - 4.2.2
-  - 4.2.1
-  - 4.2.0
-  - 4.1.0
-  - 4.0.0
-  - 3.0.4
-  - 3.0.3
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.0.0
-  - 1.0.1
-  - 1.0.0
-
-dependencies:
-  - apache-airflow>=2.4.0
-  - smbprotocol>=1.5.0
-
-integrations:
-  - integration-name: Samba
-    external-doc-url: https://www.samba.org/
-    logo: /integration-logos/samba/Samba.png
-    tags: [protocol]
-
-hooks:
-  - integration-name: Samba
-    python-modules:
-      - airflow.providers.samba.hooks.samba
-
-
-connection-types:
-  - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
-    connection-type: samba
diff --git a/airflow/providers/samba/transfers/gcs_to_samba.py 
b/airflow/providers/samba/transfers/gcs_to_samba.py
new file mode 100644
index 0000000000..a645c25352
--- /dev/null
+++ b/airflow/providers/samba/transfers/gcs_to_samba.py
@@ -0,0 +1,200 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google Cloud Storage to Samba operator."""
+from __future__ import annotations
+
+import os
+from tempfile import NamedTemporaryFile
+from typing import TYPE_CHECKING, Sequence
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.samba.hooks.samba import SambaHook
+
+WILDCARD = "*"
+
+if TYPE_CHECKING:
+    from airflow.utils.context import Context
+
+
+class GCSToSambaOperator(BaseOperator):
+    """
+    Transfer files from a Google Cloud Storage bucket to SMB server.
+
+    .. code-block:: python
+
+        with models.DAG(
+            "example_gcs_to_smb",
+            start_date=datetime(2020, 6, 19),
+            schedule=None,
+        ) as dag:
+            # downloads file to media/folder/subfolder/file.txt
+            copy_file_from_gcs_to_smb = GCSToSambaOperator(
+                task_id="file-copy-gcs-to-smb",
+                source_bucket="test-gcs-sftp-bucket-name",
+                source_object="folder/subfolder/file.txt",
+                destination_path="media",
+            )
+
+            # moves file to media/data.txt
+            move_file_from_gcs_to_smb = GCSToSambaOperator(
+                task_id="file-move-gcs-to-smb",
+                source_bucket="test-gcs-sftp-bucket-name",
+                source_object="folder/subfolder/data.txt",
+                destination_path="media",
+                move_object=True,
+                keep_directory_structure=False,
+            )
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the 
guide:
+        :ref:`howto/operator:GCSToSambaOperator`
+
+    :param source_bucket: The source Google Cloud Storage bucket where the
+         object is. (templated)
+    :param source_object: The source name of the object to copy in the Google 
cloud
+        storage bucket. (templated)
+        You can use only one wildcard for objects (filenames) within your
+        bucket. The wildcard can appear inside the object name or at the
+        end of the object name. Appending a wildcard to the bucket name is
+        unsupported.
+    :param destination_path: The SMB remote path. This is the specified 
directory path in
+        the SMB share name for uploading files to the SMB server.
+    :param keep_directory_structure: (Optional) When set to False the path of 
the file
+         on the bucket is recreated within path passed in destination_path.
+    :param move_object: When move object is True, the object is moved instead
+        of copied to the new location. This is the equivalent of a mv command
+        as opposed to a cp command.
+    :param gcp_conn_id: (Optional) The connection ID used to connect to Google 
Cloud.
+    :param samba_conn_id: The SMB connection id. The name or identifier for
+        establishing a connection to the SMB server.
+    :param impersonation_chain: Optional service account to impersonate using 
short-term
+        credentials, or chained list of accounts required to get the 
access_token
+        of the last account in the list, which will be impersonated in the 
request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding 
identity, with first
+        account from the list granting this role to the originating account 
(templated).
+    """
+
+    template_fields: Sequence[str] = (
+        "source_bucket",
+        "source_object",
+        "destination_path",
+        "impersonation_chain",
+    )
+    ui_color = "#f0eee4"
+
+    def __init__(
+        self,
+        *,
+        source_bucket: str,
+        source_object: str,
+        destination_path: str,
+        keep_directory_structure: bool = True,
+        move_object: bool = False,
+        gcp_conn_id: str = "google_cloud_default",
+        samba_conn_id: str = "samba_default",
+        impersonation_chain: str | Sequence[str] | None = None,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+
+        self.source_bucket = source_bucket
+        self.source_object = source_object
+        self.destination_path = destination_path
+        self.keep_directory_structure = keep_directory_structure
+        self.move_object = move_object
+        self.gcp_conn_id = gcp_conn_id
+        self.samba_conn_id = samba_conn_id
+        self.impersonation_chain = impersonation_chain
+        self.sftp_dirs = None
+
+    def execute(self, context: Context):
+        gcs_hook = GCSHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+
+        samba_hook = SambaHook(samba_conn_id=self.samba_conn_id)
+
+        if WILDCARD in self.source_object:
+            total_wildcards = self.source_object.count(WILDCARD)
+            if total_wildcards > 1:
+                raise AirflowException(
+                    "Only one wildcard '*' is allowed in source_object 
parameter. "
+                    f"Found {total_wildcards} in {self.source_object}."
+                )
+
+            prefix, delimiter = self.source_object.split(WILDCARD, 1)
+            prefix_dirname = os.path.dirname(prefix)
+            objects = gcs_hook.list(self.source_bucket, prefix=prefix, 
delimiter=delimiter)
+            # TODO: After deprecating delimiter and wildcards in source 
objects,
+            #       remove the previous line and uncomment the following:
+            # match_glob = f"**/*{delimiter}" if delimiter else None
+            # objects = gcs_hook.list(self.source_bucket, prefix=prefix, 
match_glob=match_glob)
+
+            for source_object in objects:
+                destination_path = 
self._resolve_destination_path(source_object, prefix=prefix_dirname)
+                self._copy_single_object(gcs_hook, samba_hook, source_object, 
destination_path)
+
+            self.log.info("Done. Uploaded '%d' files to %s", len(objects), 
self.destination_path)
+        else:
+            destination_path = 
self._resolve_destination_path(self.source_object)
+            self._copy_single_object(gcs_hook, samba_hook, self.source_object, 
destination_path)
+            self.log.info("Done. Uploaded '%s' file to %s", 
self.source_object, destination_path)
+
+    def _resolve_destination_path(self, source_object: str, prefix: str | None 
= None) -> str:
+        if not self.keep_directory_structure:
+            if prefix:
+                source_object = os.path.relpath(source_object, start=prefix)
+            else:
+                source_object = os.path.basename(source_object)
+        return os.path.join(self.destination_path, source_object)
+
+    def _copy_single_object(
+        self,
+        gcs_hook: GCSHook,
+        samba_hook: SambaHook,
+        source_object: str,
+        destination_path: str,
+    ) -> None:
+        """Helper function to copy single object."""
+        self.log.info(
+            "Executing copy of gs://%s/%s to %s",
+            self.source_bucket,
+            source_object,
+            destination_path,
+        )
+
+        dir_path = os.path.dirname(destination_path)
+        samba_hook.makedirs(dir_path, exist_ok=True)
+
+        with NamedTemporaryFile("w") as tmp:
+            gcs_hook.download(
+                bucket_name=self.source_bucket,
+                object_name=source_object,
+                filename=tmp.name,
+            )
+            samba_hook.push_from_local(destination_path, tmp.name)
+
+        if self.move_object:
+            self.log.info("Executing delete of gs://%s/%s", 
self.source_bucket, source_object)
+            gcs_hook.delete(self.source_bucket, source_object)
diff --git a/dev/breeze/tests/test_selective_checks.py 
b/dev/breeze/tests/test_selective_checks.py
index 523cb20a31..2868c203cb 100644
--- a/dev/breeze/tests/test_selective_checks.py
+++ b/dev/breeze/tests/test_selective_checks.py
@@ -540,7 +540,7 @@ def test_expected_output_full_tests_needed(
             {
                 "affected-providers-list-as-string": "amazon apache.beam 
apache.cassandra cncf.kubernetes "
                 "common.sql facebook google hashicorp microsoft.azure 
microsoft.mssql "
-                "mysql openlineage oracle postgres presto salesforce sftp ssh 
trino",
+                "mysql openlineage oracle postgres presto salesforce samba 
sftp ssh trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "needs-helm-tests": "false",
@@ -566,7 +566,7 @@ def test_expected_output_full_tests_needed(
                 "affected-providers-list-as-string": "amazon apache.beam 
apache.cassandra "
                 "cncf.kubernetes common.sql facebook google "
                 "hashicorp microsoft.azure microsoft.mssql mysql openlineage 
oracle postgres "
-                "presto salesforce sftp ssh trino",
+                "presto salesforce samba sftp ssh trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "image-build": "true",
@@ -667,7 +667,7 @@ def test_expected_output_pull_request_v2_3(
                 "affected-providers-list-as-string": "amazon apache.beam 
apache.cassandra "
                 "cncf.kubernetes common.sql "
                 "facebook google hashicorp microsoft.azure microsoft.mssql 
mysql "
-                "openlineage oracle postgres presto salesforce sftp ssh trino",
+                "openlineage oracle postgres presto salesforce samba sftp ssh 
trino",
                 "all-python-versions": "['3.8']",
                 "all-python-versions-list-as-string": "3.8",
                 "image-build": "true",
@@ -691,6 +691,7 @@ def test_expected_output_pull_request_v2_3(
                 "--package-filter apache-airflow-providers-postgres "
                 "--package-filter apache-airflow-providers-presto "
                 "--package-filter apache-airflow-providers-salesforce "
+                "--package-filter apache-airflow-providers-samba "
                 "--package-filter apache-airflow-providers-sftp "
                 "--package-filter apache-airflow-providers-ssh "
                 "--package-filter apache-airflow-providers-trino",
@@ -700,7 +701,7 @@ def test_expected_output_pull_request_v2_3(
                 "parallel-test-types-list-as-string": "Providers[amazon] 
Always CLI "
                 
"Providers[apache.beam,apache.cassandra,cncf.kubernetes,common.sql,facebook,"
                 
"hashicorp,microsoft.azure,microsoft.mssql,mysql,openlineage,oracle,postgres,presto,"
-                "salesforce,sftp,ssh,trino] Providers[google]",
+                "salesforce,samba,sftp,ssh,trino] Providers[google]",
             },
             id="CLI tests and Google-related provider tests should run if 
cli/chart files changed",
         ),
@@ -987,6 +988,7 @@ def test_upgrade_to_newer_dependencies(
                 "--package-filter apache-airflow-providers-postgres "
                 "--package-filter apache-airflow-providers-presto "
                 "--package-filter apache-airflow-providers-salesforce "
+                "--package-filter apache-airflow-providers-samba "
                 "--package-filter apache-airflow-providers-sftp "
                 "--package-filter apache-airflow-providers-ssh "
                 "--package-filter apache-airflow-providers-trino",
diff --git a/docs/apache-airflow-providers-samba/index.rst 
b/docs/apache-airflow-providers-samba/index.rst
index bbdf25794d..2db3106a0b 100644
--- a/docs/apache-airflow-providers-samba/index.rst
+++ b/docs/apache-airflow-providers-samba/index.rst
@@ -29,6 +29,13 @@
     Changelog <changelog>
     Security <security>
 
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: Guides
+
+    GCSToSambaOperator types <transfer/gcs_to_samba>
+
 .. toctree::
     :hidden:
     :maxdepth: 1
@@ -44,6 +51,13 @@
     PyPI Repository <https://pypi.org/project/apache-airflow-providers-samba/>
     Installing from sources <installing-providers-from-sources>
 
+.. toctree::
+    :hidden:
+    :maxdepth: 1
+    :caption: System tests
+
+    System Tests <_api/tests/system/providers/samba/index>
+
 .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE 
OVERWRITTEN AT RELEASE TIME!
 
 
diff --git a/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst 
b/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
new file mode 100644
index 0000000000..c63f468bd6
--- /dev/null
+++ b/docs/apache-airflow-providers-samba/transfer/gcs_to_samba.rst
@@ -0,0 +1,86 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Google Cloud Storage Transfer Operator to Samba
+===============================================
+
+Google has a service `Google Cloud Storage 
<https://cloud.google.com/storage/>`__.
+This service is used to store large data from various applications.
+Samba is the standard Windows interoperability suite of programs for Linux and 
Unix.
+Samba has provided secure, stable and fast file and print services for clients 
using the SMB/CIFS protocol
+
+.. _howto/operator:GCSToSambaOperator:
+
+Operator
+^^^^^^^^
+
+Transfer files between Google Storage and Samba is performed with the
+:class:`~airflow.providers.samba.transfers.gcs_to_samba.GCSToSambaOperator` 
operator.
+
+Use :ref:`Jinja templating <concepts:jinja-templating>` with
+:template-fields:`airflow.providers.samba.transfers.gcs_to_samba.GCSToSambaOperator`
+to define values dynamically.
+
+
+Copying a single file
+---------------------
+
+The following Operator copies a single file.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcs_to_samba_copy_single_file]
+    :end-before: [END howto_operator_gcs_to_samba_copy_single_file]
+
+Moving a single file
+--------------------
+
+To move the file use the ``move_object`` parameter. Once the file is copied to 
SMB,
+the original file from the Google Storage is deleted. The ``destination_path`` 
parameter defines the
+full path of the file on the Samba server.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+    :language: python
+    :dedent: 4
+    :start-after: [START 
howto_operator_gcs_to_samba_move_single_file_destination]
+    :end-before: [END howto_operator_gcs_to_samba_move_single_file_destination]
+
+
+Copying a directory
+-------------------
+
+Use the ``wildcard`` in ``source_path`` parameter to copy a directory.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcs_to_samba_copy_directory]
+    :end-before: [END howto_operator_gcs_to_samba_copy_directory]
+
+Moving specific files
+---------------------
+
+Use the ``wildcard`` in ``source_path`` parameter to move the specific files.
+The ``destination_path`` defines the path that is prefixed to all copied files.
+
+.. exampleinclude:: /../../tests/system/providers/samba/example_gcs_to_samba.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_gcs_to_samba_move_specific_files]
+    :end-before: [END howto_operator_gcs_to_samba_move_specific_files]
diff --git a/generated/provider_dependencies.json 
b/generated/provider_dependencies.json
index 1eecaa04b9..d72638a0db 100644
--- a/generated/provider_dependencies.json
+++ b/generated/provider_dependencies.json
@@ -759,7 +759,9 @@
       "apache-airflow>=2.4.0",
       "smbprotocol>=1.5.0"
     ],
-    "cross-providers-deps": [],
+    "cross-providers-deps": [
+      "google"
+    ],
     "excluded-python-versions": []
   },
   "segment": {
diff --git a/airflow/providers/samba/provider.yaml 
b/tests/providers/samba/transfers/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to tests/providers/samba/transfers/__init__.py
index ff311bce35..217e5db960 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/tests/providers/samba/transfers/__init__.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,45 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
-    `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
-  - 4.2.2
-  - 4.2.1
-  - 4.2.0
-  - 4.1.0
-  - 4.0.0
-  - 3.0.4
-  - 3.0.3
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.0.0
-  - 1.0.1
-  - 1.0.0
-
-dependencies:
-  - apache-airflow>=2.4.0
-  - smbprotocol>=1.5.0
-
-integrations:
-  - integration-name: Samba
-    external-doc-url: https://www.samba.org/
-    logo: /integration-logos/samba/Samba.png
-    tags: [protocol]
-
-hooks:
-  - integration-name: Samba
-    python-modules:
-      - airflow.providers.samba.hooks.samba
-
-
-connection-types:
-  - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
-    connection-type: samba
diff --git a/tests/providers/samba/transfers/test_gcs_to_samba.py 
b/tests/providers/samba/transfers/test_gcs_to_samba.py
new file mode 100644
index 0000000000..100fde5f7d
--- /dev/null
+++ b/tests/providers/samba/transfers/test_gcs_to_samba.py
@@ -0,0 +1,314 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import os
+from unittest import mock
+
+import pytest
+
+from airflow.exceptions import AirflowException
+from airflow.providers.samba.transfers.gcs_to_samba import GCSToSambaOperator
+
+TASK_ID = "test-gcs-to-samba-operator"
+GCP_CONN_ID = "GCP_CONN_ID"
+SAMBA_CONN_ID = "SAMBA_CONN_ID"
+IMPERSONATION_CHAIN = ["ACCOUNT_1", "ACCOUNT_2", "ACCOUNT_3"]
+TEST_BUCKET = "test-bucket"
+DESTINATION_SMB = "destination_path"
+
+
+class TestGoogleCloudStorageToSambaOperator:
+    @pytest.mark.parametrize(
+        "source_object, target_object, keep_directory_structure",
+        [
+            ("folder/test_object.txt", "folder/test_object.txt", True),
+            ("folder/subfolder/test_object.txt", 
"folder/subfolder/test_object.txt", True),
+            ("folder/test_object.txt", "test_object.txt", False),
+            ("folder/subfolder/test_object.txt", "test_object.txt", False),
+        ],
+    )
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+    def test_execute_copy_single_file(
+        self, samba_hook_mock, gcs_hook_mock, source_object, target_object, 
keep_directory_structure
+    ):
+        operator = GCSToSambaOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=source_object,
+            destination_path=DESTINATION_SMB,
+            keep_directory_structure=keep_directory_structure,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            samba_conn_id=SAMBA_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        operator.execute({})
+        gcs_hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        samba_hook_mock.assert_called_once_with(samba_conn_id=SAMBA_CONN_ID)
+        gcs_hook_mock.return_value.download.assert_called_with(
+            bucket_name=TEST_BUCKET, object_name=source_object, 
filename=mock.ANY
+        )
+        samba_hook_mock.return_value.push_from_local.assert_called_with(
+            os.path.join(DESTINATION_SMB, target_object), mock.ANY
+        )
+        gcs_hook_mock.return_value.delete.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "source_object, target_object, keep_directory_structure",
+        [
+            ("folder/test_object.txt", "folder/test_object.txt", True),
+            ("folder/subfolder/test_object.txt", 
"folder/subfolder/test_object.txt", True),
+            ("folder/test_object.txt", "test_object.txt", False),
+            ("folder/subfolder/test_object.txt", "test_object.txt", False),
+        ],
+    )
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+    def test_execute_move_single_file(
+        self,
+        samba_hook_mock,
+        gcs_hook_mock,
+        source_object,
+        target_object,
+        keep_directory_structure,
+    ):
+        operator = GCSToSambaOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=source_object,
+            destination_path=DESTINATION_SMB,
+            keep_directory_structure=keep_directory_structure,
+            move_object=True,
+            gcp_conn_id=GCP_CONN_ID,
+            samba_conn_id=SAMBA_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        operator.execute(None)
+        gcs_hook_mock.assert_called_once_with(
+            gcp_conn_id=GCP_CONN_ID,
+            impersonation_chain=IMPERSONATION_CHAIN,
+        )
+        samba_hook_mock.assert_called_once_with(samba_conn_id=SAMBA_CONN_ID)
+        gcs_hook_mock.return_value.download.assert_called_with(
+            bucket_name=TEST_BUCKET, object_name=source_object, 
filename=mock.ANY
+        )
+        samba_hook_mock.return_value.push_from_local.assert_called_with(
+            os.path.join(DESTINATION_SMB, target_object), mock.ANY
+        )
+        gcs_hook_mock.return_value.delete.assert_called_once_with(TEST_BUCKET, 
source_object)
+
+    @pytest.mark.parametrize(
+        "source_object, prefix, delimiter, gcs_files_list, target_objects, 
keep_directory_structure",
+        [
+            (
+                "folder/test_object*.txt",
+                "folder/test_object",
+                ".txt",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["test_object/file1.txt", "test_object/file2.txt"],
+                False,
+            ),
+            (
+                "folder/test_object/*",
+                "folder/test_object/",
+                "",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["file1.txt", "file2.txt"],
+                False,
+            ),
+            (
+                "folder/test_object*.txt",
+                "folder/test_object",
+                ".txt",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["folder/test_object/file1.txt", 
"folder/test_object/file2.txt"],
+                True,
+            ),
+            (
+                "folder/test_object/*",
+                "folder/test_object/",
+                "",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["folder/test_object/file1.txt", 
"folder/test_object/file2.txt"],
+                True,
+            ),
+        ],
+    )
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+    def test_execute_copy_with_wildcard(
+        self,
+        samba_hook_mock,
+        gcs_hook_mock,
+        source_object,
+        prefix,
+        delimiter,
+        gcs_files_list,
+        target_objects,
+        keep_directory_structure,
+    ):
+        gcs_hook_mock.return_value.list.return_value = gcs_files_list
+        operator = GCSToSambaOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=source_object,
+            destination_path=DESTINATION_SMB,
+            keep_directory_structure=keep_directory_structure,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            samba_conn_id=SAMBA_CONN_ID,
+        )
+        operator.execute(None)
+        gcs_hook_mock.return_value.list.assert_called_with(TEST_BUCKET, 
delimiter=delimiter, prefix=prefix)
+        gcs_hook_mock.return_value.download.assert_has_calls(
+            [
+                mock.call(bucket_name=TEST_BUCKET, object_name=gcs_file, 
filename=mock.ANY)
+                for gcs_file in gcs_files_list
+            ]
+        )
+        samba_hook_mock.return_value.push_from_local.assert_has_calls(
+            [
+                mock.call(os.path.join(DESTINATION_SMB, target_object), 
mock.ANY)
+                for target_object in target_objects
+            ]
+        )
+        gcs_hook_mock.return_value.delete.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "source_object, prefix, delimiter, gcs_files_list, target_objects, 
keep_directory_structure",
+        [
+            (
+                "folder/test_object*.txt",
+                "folder/test_object",
+                ".txt",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["test_object/file1.txt", "test_object/file2.txt"],
+                False,
+            ),
+            (
+                "folder/test_object/*",
+                "folder/test_object/",
+                "",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["file1.txt", "file2.txt"],
+                False,
+            ),
+            (
+                "folder/test_object*.txt",
+                "folder/test_object",
+                ".txt",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["folder/test_object/file1.txt", 
"folder/test_object/file2.txt"],
+                True,
+            ),
+            (
+                "folder/test_object/*",
+                "folder/test_object/",
+                "",
+                [
+                    "folder/test_object/file1.txt",
+                    "folder/test_object/file2.txt",
+                ],
+                ["folder/test_object/file1.txt", 
"folder/test_object/file2.txt"],
+                True,
+            ),
+        ],
+    )
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+    def test_execute_move_with_wildcard(
+        self,
+        samba_hook_mock,
+        gcs_hook_mock,
+        source_object,
+        prefix,
+        delimiter,
+        gcs_files_list,
+        target_objects,
+        keep_directory_structure,
+    ):
+        gcs_hook_mock.return_value.list.return_value = gcs_files_list
+        operator = GCSToSambaOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object=source_object,
+            destination_path=DESTINATION_SMB,
+            keep_directory_structure=keep_directory_structure,
+            move_object=True,
+            gcp_conn_id=GCP_CONN_ID,
+            samba_conn_id=SAMBA_CONN_ID,
+        )
+        operator.execute(None)
+        gcs_hook_mock.return_value.list.assert_called_with(TEST_BUCKET, 
delimiter=delimiter, prefix=prefix)
+        gcs_hook_mock.return_value.download.assert_has_calls(
+            [
+                mock.call(bucket_name=TEST_BUCKET, object_name=gcs_file, 
filename=mock.ANY)
+                for gcs_file in gcs_files_list
+            ]
+        )
+        samba_hook_mock.return_value.push_from_local.assert_has_calls(
+            [
+                mock.call(os.path.join(DESTINATION_SMB, target_object), 
mock.ANY)
+                for target_object in target_objects
+            ]
+        )
+        gcs_hook_mock.return_value.delete.assert_has_calls(
+            [mock.call(TEST_BUCKET, gcs_file) for gcs_file in gcs_files_list]
+        )
+
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.GCSHook")
+    @mock.patch("airflow.providers.samba.transfers.gcs_to_samba.SambaHook")
+    def test_execute_more_than_one_wildcard_exception(self, samba_hook_mock, 
gcs_hook_mock):
+        operator = GCSToSambaOperator(
+            task_id=TASK_ID,
+            source_bucket=TEST_BUCKET,
+            source_object="csv/*/test_*.csv",
+            destination_path=DESTINATION_SMB,
+            move_object=False,
+            gcp_conn_id=GCP_CONN_ID,
+            samba_conn_id=SAMBA_CONN_ID,
+        )
+        with pytest.raises(AirflowException):
+            operator.execute(None)
diff --git a/airflow/providers/samba/provider.yaml 
b/tests/system/providers/samba/__init__.py
similarity index 52%
copy from airflow/providers/samba/provider.yaml
copy to tests/system/providers/samba/__init__.py
index ff311bce35..13a83393a9 100644
--- a/airflow/providers/samba/provider.yaml
+++ b/tests/system/providers/samba/__init__.py
@@ -14,45 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
----
-package-name: apache-airflow-providers-samba
-name: Samba
-description: |
-    `Samba <https://www.samba.org/>`__
-
-suspended: false
-versions:
-  - 4.2.2
-  - 4.2.1
-  - 4.2.0
-  - 4.1.0
-  - 4.0.0
-  - 3.0.4
-  - 3.0.3
-  - 3.0.2
-  - 3.0.1
-  - 3.0.0
-  - 2.0.0
-  - 1.0.1
-  - 1.0.0
-
-dependencies:
-  - apache-airflow>=2.4.0
-  - smbprotocol>=1.5.0
-
-integrations:
-  - integration-name: Samba
-    external-doc-url: https://www.samba.org/
-    logo: /integration-logos/samba/Samba.png
-    tags: [protocol]
-
-hooks:
-  - integration-name: Samba
-    python-modules:
-      - airflow.providers.samba.hooks.samba
-
-
-connection-types:
-  - hook-class-name: airflow.providers.samba.hooks.samba.SambaHook
-    connection-type: samba
diff --git a/tests/system/providers/samba/example_gcs_to_samba.py b/tests/system/providers/samba/example_gcs_to_samba.py
new file mode 100644
index 0000000000..23df501f4c
--- /dev/null
+++ b/tests/system/providers/samba/example_gcs_to_samba.py
@@ -0,0 +1,148 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for Google Cloud Storage to Samba transfer operators.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+from pathlib import Path
+
+from airflow import models
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.samba.transfers.gcs_to_samba import GCSToSambaOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+DAG_ID = "gcs_to_smb"
+
+SMB_CONN = "samba_default"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DESTINATION_PATH_1 = "tmp/single-file/"
+DESTINATION_PATH_2 = "tmp/dest-dir-1/"
+DESTINATION_PATH_3 = "tmp/dest-dir-2/"
+FILE_NAME = GCS_SRC_FILE = "empty.txt"
+UPLOAD_SRC = str(Path(__file__).parent / "resources" / FILE_NAME)
+GCS_SRC_FILE_IN_DIR = f"dir-1/{FILE_NAME}"
+GCS_SRC_DIR = "dir-2/*.txt"
+UPLOAD_IN_DIR_DST = f"dir-2/{FILE_NAME}"
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "gcs", "smb"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(
+        task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
+    )
+
+    upload_file_1 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_1",
+        src=UPLOAD_SRC,
+        dst=FILE_NAME,
+        bucket=BUCKET_NAME,
+    )
+    upload_file_2 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_2",
+        src=UPLOAD_SRC,
+        dst=GCS_SRC_FILE_IN_DIR,
+        bucket=BUCKET_NAME,
+    )
+    upload_file_3 = LocalFilesystemToGCSOperator(
+        task_id="upload_file_3",
+        src=UPLOAD_SRC,
+        dst=UPLOAD_IN_DIR_DST,
+        bucket=BUCKET_NAME,
+    )
+
+    # [START howto_operator_gcs_to_samba_copy_single_file]
+    copy_file_from_gcs_to_samba = GCSToSambaOperator(
+        task_id="file-copy-gcs-to-samba",
+        samba_conn_id=SMB_CONN,
+        source_bucket=BUCKET_NAME,
+        source_object=GCS_SRC_FILE,
+        destination_path=DESTINATION_PATH_1,
+    )
+    # [END howto_operator_gcs_to_samba_copy_single_file]
+
+    # [START howto_operator_gcs_to_samba_move_single_file_destination]
+    move_file_from_gcs_to_samba = GCSToSambaOperator(
+        task_id="file-move-gcs-to-samba",
+        samba_conn_id=SMB_CONN,
+        source_bucket=BUCKET_NAME,
+        source_object=GCS_SRC_FILE_IN_DIR,
+        destination_path=DESTINATION_PATH_1,
+        move_object=True,
+    )
+    # [END howto_operator_gcs_to_samba_move_single_file_destination]
+
+    # [START howto_operator_gcs_to_samba_copy_directory]
+    copy_dir_from_gcs_to_samba = GCSToSambaOperator(
+        task_id="dir-copy-gcs-to-samba",
+        samba_conn_id=SMB_CONN,
+        source_bucket=BUCKET_NAME,
+        source_object=GCS_SRC_DIR,
+        destination_path=DESTINATION_PATH_2,
+    )
+    # [END howto_operator_gcs_to_samba_copy_directory]
+
+    # [START howto_operator_gcs_to_samba_move_specific_files]
+    move_dir_from_gcs_to_samba = GCSToSambaOperator(
+        task_id="dir-move-gcs-to-samba",
+        samba_conn_id=SMB_CONN,
+        source_bucket=BUCKET_NAME,
+        source_object=GCS_SRC_DIR,
+        destination_path=DESTINATION_PATH_3,
+        keep_directory_structure=False,
+    )
+    # [END howto_operator_gcs_to_samba_move_specific_files]
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> (upload_file_1, upload_file_2, upload_file_3)
+        # TEST BODY
+        >> copy_file_from_gcs_to_samba
+        >> move_file_from_gcs_to_samba
+        >> copy_dir_from_gcs_to_samba
+        >> move_dir_from_gcs_to_samba
+        # TEST TEARDOWN
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)


Reply via email to