This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a4f2b33e0b7 Fix Google Cloud Data Fusion hook to handle pipeline start errors properly (#58698)
a4f2b33e0b7 is described below

commit a4f2b33e0b7bd926769242e514a78d06ad29af39
Author: Ashir Alam <[email protected]>
AuthorDate: Sat Dec 20 23:31:04 2025 +0530

    Fix Google Cloud Data Fusion hook to handle pipeline start errors properly (#58698)
    
    * Fix Google Cloud Data Fusion hook to handle pipeline start errors properly
    
    The start_pipeline method was using the multi-program start endpoint, which
    returns HTTP 200 even when individual programs fail to start. This caused
    a KeyError when the hook tried to access the runId from error responses.
    
    Changes:
    - Updated start_pipeline to use single-program start endpoint
    - Added validation to check if runId exists in response before accessing it
    - Improved error messages to provide context about failures
    - Updated tests to reflect new endpoint and added test for missing runId scenario
    
    Fixes #50387
    
    * Add spec to MagicMock for better static type checking
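
For illustration, a minimal standalone sketch of the failure mode described above; the
response payloads are assumed shapes chosen to mirror the handling in the diff below,
not captured CDAP output:

    import json

    # Old multi-program endpoint: HTTP 200 with a list whose element may lack "runId".
    old_data = '[{"statusCode": 400, "error": "Invalid runtime arguments"}]'
    try:
        run_id = json.loads(old_data)[0]["runId"]  # previous code indexed blindly
    except KeyError as err:
        print(f"old behaviour: KeyError on {err}")

    # New single-program endpoint: a dict; validate before reading "runId".
    new_data = '{"error": "Invalid runtime arguments"}'
    response_json = json.loads(new_data)
    if "runId" not in response_json:
        # The hook raises AirflowException with this message; printed here for the sketch.
        error_message = response_json.get("error", "Unknown error")
        print(f"Failed to start pipeline. The response does not contain a runId. Error: {error_message}")
    else:
        print(str(response_json["runId"]))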
---
 .../providers/google/cloud/hooks/datafusion.py     | 36 ++++-----
 .../unit/google/cloud/hooks/test_datafusion.py     | 88 ++++++++++++----------
 2 files changed, 66 insertions(+), 58 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
index 73183ca2236..063f7bbc2e3 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
@@ -469,31 +469,33 @@ class DataFusionHook(GoogleBaseHook):
             is always default. If your pipeline belongs to an Enterprise edition instance, you
             can create a namespace.
         """
-        # TODO: This API endpoint starts multiple pipelines. There will eventually be a fix
-        #  return the run Id as part of the API request to run a single pipeline.
-        #  https://github.com/apache/airflow/pull/8954#discussion_r438223116
+        # Use the single-program start endpoint for better error handling
+        # https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program
+        program_type = self.cdap_program_type(pipeline_type=pipeline_type)
+        program_id = self.cdap_program_id(pipeline_type=pipeline_type)
         url = os.path.join(
-            instance_url,
-            "v3",
-            "namespaces",
-            quote(namespace),
+            self._base_url(instance_url, namespace),
+            quote(pipeline_name),
+            f"{program_type}s",
+            program_id,
             "start",
         )
         runtime_args = runtime_args or {}
-        body = [
-            {
-                "appId": pipeline_name,
-                "runtimeargs": runtime_args,
-                "programType": 
self.cdap_program_type(pipeline_type=pipeline_type),
-                "programId": self.cdap_program_id(pipeline_type=pipeline_type),
-            }
-        ]
-        response = self._cdap_request(url=url, method="POST", body=body)
+        response = self._cdap_request(url=url, method="POST", body=runtime_args)
         self._check_response_status_and_data(
             response, f"Starting a pipeline failed with code {response.status}"
         )
         response_json = json.loads(response.data)
-        return response_json[0]["runId"]
+
+        # Extract and validate runId from response
+        if "runId" not in response_json:
+            error_message = response_json.get("error", "Unknown error")
+            raise AirflowException(
+                f"Failed to start pipeline '{pipeline_name}'. "
+                f"The response does not contain a runId. Error: 
{error_message}"
+            )
+
+        return str(response_json["runId"])
 
     def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: str = "default") -> None:
         """
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
index d98a64fc17d..49479446cde 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
@@ -21,6 +21,7 @@ import logging
 from unittest import mock
 
 import aiohttp
+import google.auth.transport
 import pytest
 from aiohttp.helpers import TimerNoop
 from yarl import URL
@@ -340,42 +341,38 @@ class TestDataFusionHook:
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_start_pipeline(self, mock_request, hook):
         run_id = 1234
-        mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
+        mock_request.return_value = mock.MagicMock(
+            spec=google.auth.transport.Response, status=200, data=f'{{"runId":{run_id}}}'
+        )
 
-        hook.start_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS)
-        body = [
-            {
-                "appId": PIPELINE_NAME,
-                "programType": "workflow",
-                "programId": "DataPipelineWorkflow",
-                "runtimeargs": RUNTIME_ARGS,
-            }
-        ]
+        result = hook.start_pipeline(
+            pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
+        )
+        assert result == str(run_id)
         mock_request.assert_called_once_with(
-            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
+            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
+            method="POST",
+            body=RUNTIME_ARGS,
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_start_pipeline_stream(self, mock_request, hook):
-        run_id = 1234
-        mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
+        run_id = "test-run-123"
+        mock_request.return_value = mock.MagicMock(
+            spec=google.auth.transport.Response, status=200, data=f'{{"runId":"{run_id}"}}'
+        )
 
-        hook.start_pipeline(
+        result = hook.start_pipeline(
             pipeline_name=PIPELINE_NAME,
             instance_url=INSTANCE_URL,
             runtime_args=RUNTIME_ARGS,
             pipeline_type=DataFusionPipelineType.STREAM,
         )
-        body = [
-            {
-                "appId": PIPELINE_NAME,
-                "programType": "spark",
-                "programId": "DataStreamsSparkStreaming",
-                "runtimeargs": RUNTIME_ARGS,
-            }
-        ]
+        assert result == run_id
         mock_request.assert_called_once_with(
-            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
+            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/sparks/DataStreamsSparkStreaming/start",
+            method="POST",
+            body=RUNTIME_ARGS,
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -390,16 +387,10 @@ class TestDataFusionHook:
             hook.start_pipeline(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
             )
-        body = [
-            {
-                "appId": PIPELINE_NAME,
-                "programType": "workflow",
-                "programId": "DataPipelineWorkflow",
-                "runtimeargs": RUNTIME_ARGS,
-            }
-        ]
         mock_request.assert_called_once_with(
-            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
+            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
+            method="POST",
+            body=RUNTIME_ARGS,
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -409,16 +400,31 @@ class TestDataFusionHook:
             hook.start_pipeline(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
             )
-        body = [
-            {
-                "appId": PIPELINE_NAME,
-                "programType": "workflow",
-                "programId": "DataPipelineWorkflow",
-                "runtimeargs": RUNTIME_ARGS,
-            }
-        ]
         mock_request.assert_called_once_with(
-            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
+            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
+            method="POST",
+            body=RUNTIME_ARGS,
+        )
+
+    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
+    def test_start_pipeline_should_fail_if_no_run_id(self, mock_request, hook):
+        """Test that start_pipeline fails gracefully when response doesn't 
contain runId."""
+        error_response = '{"error": "Invalid runtime arguments"}'
+        mock_request.return_value = mock.MagicMock(
+            spec=google.auth.transport.Response, status=200, data=error_response
+        )
+        with pytest.raises(
+            AirflowException,
+            match=r"Failed to start pipeline 'shrubberyPipeline'. "
+            r"The response does not contain a runId. Error: Invalid runtime 
arguments",
+        ):
+            hook.start_pipeline(
+                pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
+            )
+        mock_request.assert_called_once_with(
+            url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
+            method="POST",
+            body=RUNTIME_ARGS,
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))

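For completeness, a hedged usage sketch of the fixed hook; the connection id, instance URL,
pipeline name, and runtime arguments below are placeholders, not values from this commit:

    from airflow.providers.google.cloud.hooks.datafusion import DataFusionHook

    hook = DataFusionHook(gcp_conn_id="google_cloud_default")
    # With this change the request targets the single-program endpoint, roughly
    #   {instance_url}/v3/namespaces/default/apps/{pipeline_name}/workflows/DataPipelineWorkflow/start
    # and a response without a runId raises AirflowException instead of KeyError.
    run_id = hook.start_pipeline(
        pipeline_name="example_pipeline",
        instance_url="https://example-instance.datafusion.googleusercontent.com/api",
        runtime_args={"input.path": "gs://example-bucket/input"},
    )
    print(run_id)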