This is an automated email from the ASF dual-hosted git repository.

shahar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 79674385da2 Create tests for Google Cloud Storage Transfer (#58476)
79674385da2 is described below

commit 79674385da2e29bcfaa39227c4c2cb90c92b7bd1
Author: ZZKangKZZ <[email protected]>
AuthorDate: Wed Jan 21 15:49:30 2026 +0800

    Create tests for Google Cloud Storage Transfer (#58476)
---
 .../tests/unit/always/test_project_structure.py    |   1 -
 .../cloud/links/test_cloud_storage_transfer.py     | 464 +++++++++++++++++++++
 2 files changed, 464 insertions(+), 1 deletion(-)

diff --git a/airflow-core/tests/unit/always/test_project_structure.py 
b/airflow-core/tests/unit/always/test_project_structure.py
index 3126224a420..1fab1d8e498 100644
--- a/airflow-core/tests/unit/always/test_project_structure.py
+++ b/airflow-core/tests/unit/always/test_project_structure.py
@@ -143,7 +143,6 @@ class TestProjectStructure:
             
"providers/google/tests/unit/google/cloud/links/test_cloud_functions.py",
             
"providers/google/tests/unit/google/cloud/links/test_cloud_memorystore.py",
             "providers/google/tests/unit/google/cloud/links/test_cloud_sql.py",
-            
"providers/google/tests/unit/google/cloud/links/test_cloud_storage_transfer.py",
             
"providers/google/tests/unit/google/cloud/links/test_cloud_tasks.py",
             "providers/google/tests/unit/google/cloud/links/test_compute.py",
             
"providers/google/tests/unit/google/cloud/links/test_data_loss_prevention.py",
diff --git 
a/providers/google/tests/unit/google/cloud/links/test_cloud_storage_transfer.py 
b/providers/google/tests/unit/google/cloud/links/test_cloud_storage_transfer.py
new file mode 100644
index 00000000000..da7925b9303
--- /dev/null
+++ 
b/providers/google/tests/unit/google/cloud/links/test_cloud_storage_transfer.py
@@ -0,0 +1,464 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Real tests for Google Cloud Storage Transfer Service links."""
+
+from __future__ import annotations
+
+import pytest
+
+from airflow.providers.google.cloud.links.cloud_storage_transfer import (
+    CloudStorageTransferDetailsLink,
+    CloudStorageTransferJobLink,
+    CloudStorageTransferLinkHelper,
+    CloudStorageTransferListLink,
+)
+
+REAL_PROJECT_ID = "my-gcp-project-123456"
+REAL_TRANSFER_JOB = "transferJobs-1234567890123456789"
+REAL_TRANSFER_OPERATION = "transferOperations/9876543210987654321"
+REAL_OPERATION_NAME = f"{REAL_TRANSFER_OPERATION}-{REAL_TRANSFER_JOB}"
+
+EXPECTED_LIST_URL = 
f"https://console.cloud.google.com/transfer/jobs?project={REAL_PROJECT_ID}";
+EXPECTED_JOB_URL = 
f"https://console.cloud.google.com/transfer/jobs/transferJobs%2F{REAL_TRANSFER_JOB}/runs?project={REAL_PROJECT_ID}";
+EXPECTED_DETAILS_URL = (
+    
f"https://console.cloud.google.com/transfer/jobs/transferJobs%2FtransferJobs";
+    
f"/runs/transferOperations%2F9876543210987654321-transferJobs-1234567890123456789?project={REAL_PROJECT_ID}"
+)
+
+
+class TestCloudStorageTransferLinkHelper:
+    """Test the CloudStorageTransferLinkHelper with real operation names."""
+
+    def test_extract_parts_with_real_operation_name(self):
+        """Test extract_parts with a real Google Cloud Storage Transfer 
operation name."""
+        transfer_operation, transfer_job = 
CloudStorageTransferLinkHelper.extract_parts(REAL_OPERATION_NAME)
+
+        assert transfer_operation == 
"9876543210987654321-transferJobs-1234567890123456789"
+        assert transfer_job == "transferJobs"
+
+    def test_extract_parts_with_various_real_formats(self):
+        """Test extract_parts with various real operation name formats."""
+        test_cases = [
+            ("transferOperations/12345-transferJobs-67890", 
"12345-transferJobs-67890", "transferJobs"),
+            ("transferOperations/op123-transferJobs-job456", 
"op123-transferJobs-job456", "transferJobs"),
+            (
+                
"transferOperations/99999999999999999999-transferJobs-11111111111111111111",
+                "99999999999999999999-transferJobs-11111111111111111111",
+                "transferJobs",
+            ),
+        ]
+
+        for operation_name, expected_operation, expected_job in test_cases:
+            transfer_operation, transfer_job = 
CloudStorageTransferLinkHelper.extract_parts(operation_name)
+            assert transfer_operation == expected_operation
+            assert transfer_job == expected_job
+
+    def test_extract_parts_with_none_operation_name(self):
+        """Test extract_parts with None operation name."""
+        transfer_operation, transfer_job = 
CloudStorageTransferLinkHelper.extract_parts(None)
+
+        assert transfer_operation == ""
+        assert transfer_job == ""
+
+    def test_extract_parts_with_empty_operation_name(self):
+        """Test extract_parts with empty operation name."""
+        transfer_operation, transfer_job = 
CloudStorageTransferLinkHelper.extract_parts("")
+
+        assert transfer_operation == ""
+        assert transfer_job == ""
+
+    def test_extract_parts_with_malformed_operation_names(self):
+        """Test extract_parts with malformed operation names that might occur 
in real scenarios."""
+        test_cases = [
+            ("invalid-format", IndexError),
+            ("transferOperations/", IndexError),
+            ("transferOperations/123", IndexError),
+            ("transferOperations/123-", ("123-", "")),
+            ("-transferJobs-job456", IndexError),
+            ("transferOperations/123-transferJobs", ("123-transferJobs", 
"transferJobs")),
+        ]
+
+        for operation_name, expected in test_cases:
+            if expected is IndexError:
+                with pytest.raises(IndexError):
+                    
CloudStorageTransferLinkHelper.extract_parts(operation_name)
+            else:
+                transfer_operation, transfer_job = 
CloudStorageTransferLinkHelper.extract_parts(
+                    operation_name
+                )
+                assert transfer_operation == expected[0]
+                assert transfer_job == expected[1]
+
+
+class TestCloudStorageTransferListLink:
+    """Test the CloudStorageTransferListLink with real scenarios."""
+
+    def test_link_properties(self):
+        """Test that link properties are set correctly for real usage."""
+        link = CloudStorageTransferListLink()
+
+        assert link.name == "Cloud Storage Transfer"
+        assert link.key == "cloud_storage_transfer"
+        assert "{project_id}" in link.format_str
+
+    def test_format_str_with_real_project_id(self):
+        """Test format_str generates correct URL for real project ID."""
+        link = CloudStorageTransferListLink()
+        formatted_url = link.format_str.format(project_id=REAL_PROJECT_ID)
+
+        assert formatted_url == EXPECTED_LIST_URL
+        assert 
formatted_url.startswith("https://console.cloud.google.com/transfer/jobs?project=";)
+        assert REAL_PROJECT_ID in formatted_url
+
+    def test_format_str_with_various_project_ids(self):
+        """Test format_str with various real project ID formats."""
+        project_ids = [
+            "my-project",
+            "project-123456",
+            "my-gcp-project-123",
+            "a" * 30,
+        ]
+
+        link = CloudStorageTransferListLink()
+        for project_id in project_ids:
+            formatted_url = link.format_str.format(project_id=project_id)
+            assert project_id in formatted_url
+            assert 
formatted_url.startswith("https://console.cloud.google.com/transfer/jobs?";)
+
+
+class TestCloudStorageTransferJobLink:
+    """Test the CloudStorageTransferJobLink with real scenarios."""
+
+    def test_link_properties(self):
+        """Test that link properties are set correctly for real usage."""
+        link = CloudStorageTransferJobLink()
+
+        assert link.name == "Cloud Storage Transfer Job"
+        assert link.key == "cloud_storage_transfer_job"
+        assert "{project_id}" in link.format_str
+        assert "{transfer_job}" in link.format_str
+
+    def test_format_str_with_real_parameters(self):
+        """Test format_str generates correct URL for real transfer job."""
+        link = CloudStorageTransferJobLink()
+        formatted_url = link.format_str.format(project_id=REAL_PROJECT_ID, 
transfer_job=REAL_TRANSFER_JOB)
+
+        assert formatted_url == EXPECTED_JOB_URL
+        assert 
formatted_url.startswith("https://console.cloud.google.com/transfer/jobs/transferJobs%2F";)
+        assert REAL_PROJECT_ID in formatted_url
+        assert REAL_TRANSFER_JOB in formatted_url
+
+    def test_format_str_url_encoding(self):
+        """Test that transfer job ID is properly URL encoded."""
+        link = CloudStorageTransferJobLink()
+        formatted_url = link.format_str.format(project_id=REAL_PROJECT_ID, 
transfer_job=REAL_TRANSFER_JOB)
+
+        assert "transferJobs%2F" in formatted_url
+        assert "transferJobs/" not in formatted_url
+
+
+class TestCloudStorageTransferDetailsLink:
+    """Test the CloudStorageTransferDetailsLink with real scenarios."""
+
+    def test_link_properties(self):
+        """Test that link properties are set correctly for real usage."""
+        link = CloudStorageTransferDetailsLink()
+
+        assert link.name == "Cloud Storage Transfer Details"
+        assert link.key == "cloud_storage_transfer_details"
+        assert "{project_id}" in link.format_str
+        assert "{transfer_job}" in link.format_str
+        assert "{transfer_operation}" in link.format_str
+
+    def test_format_str_with_real_parameters(self):
+        """Test format_str generates correct URL for real transfer 
operation."""
+        link = CloudStorageTransferDetailsLink()
+
+        formatted_url = link.format_str.format(
+            project_id=REAL_PROJECT_ID,
+            transfer_job="transferJobs",
+            
transfer_operation="9876543210987654321-transferJobs-1234567890123456789",
+        )
+
+        assert formatted_url == EXPECTED_DETAILS_URL
+        assert 
formatted_url.startswith("https://console.cloud.google.com/transfer/jobs/transferJobs%2F";)
+        assert REAL_PROJECT_ID in formatted_url
+        assert "transferJobs" in formatted_url
+        assert "9876543210987654321-transferJobs-1234567890123456789" in 
formatted_url
+
+    def test_extract_parts_with_real_operation_name(self):
+        """Test extract_parts with real operation name."""
+        transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts(REAL_OPERATION_NAME)
+
+        assert transfer_operation == 
"9876543210987654321-transferJobs-1234567890123456789"
+        assert transfer_job == "transferJobs"
+
+    def test_extract_parts_edge_cases(self):
+        """Test extract_parts with edge cases that might occur in 
production."""
+        edge_cases = [
+            ("transferOperations/1-transferJobs-2", "1-transferJobs-2", 
"transferJobs"),
+            ("transferOperations/abc-transferJobs-def", 
"abc-transferJobs-def", "transferJobs"),
+            (
+                "transferOperations/123_456-transferJobs-789_012",
+                "123_456-transferJobs-789_012",
+                "transferJobs",
+            ),
+        ]
+
+        for operation_name, expected_operation, expected_job in edge_cases:
+            transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts(operation_name)
+            assert transfer_operation == expected_operation
+            assert transfer_job == expected_job
+
+    def test_persist_method_with_real_operation_name(self):
+        """Test persist method behavior with real operation name (unit 
test)."""
+        link = CloudStorageTransferDetailsLink()
+
+        transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts(REAL_OPERATION_NAME)
+
+        assert transfer_operation == 
"9876543210987654321-transferJobs-1234567890123456789"
+        assert transfer_job == "transferJobs"
+
+        formatted_url = link.format_str.format(
+            project_id=REAL_PROJECT_ID,
+            transfer_job=transfer_job,
+            transfer_operation=transfer_operation,
+        )
+
+        assert formatted_url == EXPECTED_DETAILS_URL
+
+    @pytest.mark.db_test
+    def test_persist_with_real_operation_name(
+        self, create_task_instance_of_operator, session, mock_supervisor_comms
+    ):
+        """Test persist method with real operation name."""
+        from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+
+        class TestOperator(GoogleCloudBaseOperator):
+            operator_extra_links = (CloudStorageTransferDetailsLink(),)
+
+            def __init__(self, project_id: str, **kwargs):
+                super().__init__(**kwargs)
+                self.project_id = project_id
+
+            @property
+            def extra_links_params(self):
+                return {
+                    "project_id": self.project_id,
+                }
+
+            def execute(self, context):
+                pass
+
+        ti = create_task_instance_of_operator(
+            TestOperator,
+            dag_id="test_transfer_dag",
+            task_id="test_transfer_task",
+            project_id=REAL_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+
+        from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+        if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+            from airflow.sdk.execution_time.comms import XComResult
+
+            transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts(
+                REAL_OPERATION_NAME
+            )
+            mock_supervisor_comms.send.return_value = XComResult(
+                key="cloud_storage_transfer_details",
+                value={
+                    "project_id": REAL_PROJECT_ID,
+                    "transfer_job": transfer_job,
+                    "transfer_operation": transfer_operation,
+                },
+            )
+
+        CloudStorageTransferDetailsLink.persist(
+            context={"ti": ti, "task": ti.task},
+            project_id=REAL_PROJECT_ID,
+            operation_name=REAL_OPERATION_NAME,
+        )
+
+        actual_url = CloudStorageTransferDetailsLink.get_link(
+            CloudStorageTransferDetailsLink(), operator=ti.task, ti_key=ti.key
+        )
+        assert actual_url == EXPECTED_DETAILS_URL
+
+    @pytest.mark.db_test
+    def test_persist_with_none_operation_name(
+        self, create_task_instance_of_operator, session, mock_supervisor_comms
+    ):
+        """Test persist method with None operation name."""
+        from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+
+        class TestOperator(GoogleCloudBaseOperator):
+            operator_extra_links = (CloudStorageTransferDetailsLink(),)
+
+            def __init__(self, project_id: str, **kwargs):
+                super().__init__(**kwargs)
+                self.project_id = project_id
+
+            @property
+            def extra_links_params(self):
+                return {
+                    "project_id": self.project_id,
+                }
+
+            def execute(self, context):
+                pass
+
+        ti = create_task_instance_of_operator(
+            TestOperator,
+            dag_id="test_transfer_dag",
+            task_id="test_transfer_task",
+            project_id=REAL_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+
+        from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+        if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+            from airflow.sdk.execution_time.comms import XComResult
+
+            transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts(None)
+            mock_supervisor_comms.send.return_value = XComResult(
+                key="cloud_storage_transfer_details",
+                value={
+                    "project_id": REAL_PROJECT_ID,
+                    "transfer_job": transfer_job,
+                    "transfer_operation": transfer_operation,
+                },
+            )
+
+        CloudStorageTransferDetailsLink.persist(
+            context={"ti": ti, "task": ti.task},
+            project_id=REAL_PROJECT_ID,
+            operation_name=None,
+        )
+
+        actual_url = CloudStorageTransferDetailsLink.get_link(
+            CloudStorageTransferDetailsLink(), operator=ti.task, ti_key=ti.key
+        )
+        expected_url = 
f"https://console.cloud.google.com/transfer/jobs/transferJobs%2F/runs/transferOperations%2F?project={REAL_PROJECT_ID}";
+        assert actual_url == expected_url
+
+    @pytest.mark.db_test
+    def test_persist_with_empty_operation_name(
+        self, create_task_instance_of_operator, session, mock_supervisor_comms
+    ):
+        """Test persist method with empty operation name."""
+        from airflow.providers.google.cloud.operators.cloud_base import 
GoogleCloudBaseOperator
+
+        class TestOperator(GoogleCloudBaseOperator):
+            operator_extra_links = (CloudStorageTransferDetailsLink(),)
+
+            def __init__(self, project_id: str, **kwargs):
+                super().__init__(**kwargs)
+                self.project_id = project_id
+
+            @property
+            def extra_links_params(self):
+                return {
+                    "project_id": self.project_id,
+                }
+
+            def execute(self, context):
+                pass
+
+        ti = create_task_instance_of_operator(
+            TestOperator,
+            dag_id="test_transfer_dag",
+            task_id="test_transfer_task",
+            project_id=REAL_PROJECT_ID,
+        )
+        session.add(ti)
+        session.commit()
+
+        from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+        if AIRFLOW_V_3_0_PLUS and mock_supervisor_comms:
+            from airflow.sdk.execution_time.comms import XComResult
+
+            transfer_operation, transfer_job = 
CloudStorageTransferDetailsLink.extract_parts("")
+            mock_supervisor_comms.send.return_value = XComResult(
+                key="cloud_storage_transfer_details",
+                value={
+                    "project_id": REAL_PROJECT_ID,
+                    "transfer_job": transfer_job,
+                    "transfer_operation": transfer_operation,
+                },
+            )
+
+        CloudStorageTransferDetailsLink.persist(
+            context={"ti": ti, "task": ti.task},
+            project_id=REAL_PROJECT_ID,
+            operation_name="",
+        )
+
+        actual_url = CloudStorageTransferDetailsLink.get_link(
+            CloudStorageTransferDetailsLink(), operator=ti.task, ti_key=ti.key
+        )
+        expected_url = 
f"https://console.cloud.google.com/transfer/jobs/transferJobs%2F/runs/transferOperations%2F?project={REAL_PROJECT_ID}";
+        assert actual_url == expected_url
+
+
+class TestIntegrationScenarios:
+    """Integration tests with real-world scenarios."""
+
+    def test_complete_transfer_workflow_links(self):
+        """Test all link types work together in a complete transfer 
workflow."""
+        list_link = CloudStorageTransferListLink()
+        list_url = list_link.format_str.format(project_id=REAL_PROJECT_ID)
+
+        job_link = CloudStorageTransferJobLink()
+        job_url = job_link.format_str.format(project_id=REAL_PROJECT_ID, 
transfer_job=REAL_TRANSFER_JOB)
+
+        details_link = CloudStorageTransferDetailsLink()
+        details_url = details_link.format_str.format(
+            project_id=REAL_PROJECT_ID,
+            transfer_job="transferJobs",
+            
transfer_operation="9876543210987654321-transferJobs-1234567890123456789",
+        )
+
+        assert all(
+            url.startswith("https://console.cloud.google.com/transfer";)
+            for url in [list_url, job_url, details_url]
+        )
+        assert REAL_PROJECT_ID in list_url
+        assert REAL_PROJECT_ID in job_url
+        assert REAL_PROJECT_ID in details_url
+        assert REAL_TRANSFER_JOB in job_url
+        assert "transferJobs" in details_url
+        assert "9876543210987654321-transferJobs-1234567890123456789" in 
details_url
+
+    def test_url_consistency_across_link_types(self):
+        """Test that URLs are consistent across different link types."""
+        base_url = "https://console.cloud.google.com/transfer";
+
+        list_link = CloudStorageTransferListLink()
+        job_link = CloudStorageTransferJobLink()
+        details_link = CloudStorageTransferDetailsLink()
+
+        assert list_link.format_str.startswith(base_url)
+        assert job_link.format_str.startswith(base_url)
+        assert details_link.format_str.startswith(base_url)

Reply via email to