This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f466c77f476 Revert "Fix Google Cloud Data Fusion hook to handle 
pipeline start errors properly (#58698)" (#60701)
f466c77f476 is described below

commit f466c77f476a465c1be5a08f3e81bdb447bd2da8
Author: Shahar Epstein <[email protected]>
AuthorDate: Sat Jan 17 17:58:56 2026 +0200

    Revert "Fix Google Cloud Data Fusion hook to handle pipeline start errors 
properly (#58698)" (#60701)
    
    This reverts commit a4f2b33e0b7bd926769242e514a78d06ad29af39.
---
 .../providers/google/cloud/hooks/datafusion.py     | 36 +++++----
 .../unit/google/cloud/hooks/test_datafusion.py     | 88 ++++++++++------------
 2 files changed, 58 insertions(+), 66 deletions(-)

diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
index 063f7bbc2e3..73183ca2236 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
@@ -469,33 +469,31 @@ class DataFusionHook(GoogleBaseHook):
             is always default. If your pipeline belongs to an Enterprise 
edition instance, you
             can create a namespace.
         """
-        # Use the single-program start endpoint for better error handling
-        # 
https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program
-        program_type = self.cdap_program_type(pipeline_type=pipeline_type)
-        program_id = self.cdap_program_id(pipeline_type=pipeline_type)
+        # TODO: This API endpoint starts multiple pipelines. There will 
eventually be a fix
+        #  return the run Id as part of the API request to run a single 
pipeline.
+        #  https://github.com/apache/airflow/pull/8954#discussion_r438223116
         url = os.path.join(
-            self._base_url(instance_url, namespace),
-            quote(pipeline_name),
-            f"{program_type}s",
-            program_id,
+            instance_url,
+            "v3",
+            "namespaces",
+            quote(namespace),
             "start",
         )
         runtime_args = runtime_args or {}
-        response = self._cdap_request(url=url, method="POST", 
body=runtime_args)
+        body = [
+            {
+                "appId": pipeline_name,
+                "runtimeargs": runtime_args,
+                "programType": 
self.cdap_program_type(pipeline_type=pipeline_type),
+                "programId": self.cdap_program_id(pipeline_type=pipeline_type),
+            }
+        ]
+        response = self._cdap_request(url=url, method="POST", body=body)
         self._check_response_status_and_data(
             response, f"Starting a pipeline failed with code {response.status}"
         )
         response_json = json.loads(response.data)
-
-        # Extract and validate runId from response
-        if "runId" not in response_json:
-            error_message = response_json.get("error", "Unknown error")
-            raise AirflowException(
-                f"Failed to start pipeline '{pipeline_name}'. "
-                f"The response does not contain a runId. Error: 
{error_message}"
-            )
-
-        return str(response_json["runId"])
+        return response_json[0]["runId"]
 
     def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: 
str = "default") -> None:
         """
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py 
b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
index 49479446cde..d98a64fc17d 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
@@ -21,7 +21,6 @@ import logging
 from unittest import mock
 
 import aiohttp
-import google.auth.transport
 import pytest
 from aiohttp.helpers import TimerNoop
 from yarl import URL
@@ -341,38 +340,42 @@ class TestDataFusionHook:
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_start_pipeline(self, mock_request, hook):
         run_id = 1234
-        mock_request.return_value = mock.MagicMock(
-            spec=google.auth.transport.Response, status=200, 
data=f'{{"runId":{run_id}}}'
-        )
+        mock_request.return_value = mock.MagicMock(status=200, 
data=f'[{{"runId":{run_id}}}]')
 
-        result = hook.start_pipeline(
-            pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
runtime_args=RUNTIME_ARGS
-        )
-        assert result == str(run_id)
+        hook.start_pipeline(pipeline_name=PIPELINE_NAME, 
instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS)
+        body = [
+            {
+                "appId": PIPELINE_NAME,
+                "programType": "workflow",
+                "programId": "DataPipelineWorkflow",
+                "runtimeargs": RUNTIME_ARGS,
+            }
+        ]
         mock_request.assert_called_once_with(
-            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
-            method="POST",
-            body=RUNTIME_ARGS,
+            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
     def test_start_pipeline_stream(self, mock_request, hook):
-        run_id = "test-run-123"
-        mock_request.return_value = mock.MagicMock(
-            spec=google.auth.transport.Response, status=200, 
data=f'{{"runId":"{run_id}"}}'
-        )
+        run_id = 1234
+        mock_request.return_value = mock.MagicMock(status=200, 
data=f'[{{"runId":{run_id}}}]')
 
-        result = hook.start_pipeline(
+        hook.start_pipeline(
             pipeline_name=PIPELINE_NAME,
             instance_url=INSTANCE_URL,
             runtime_args=RUNTIME_ARGS,
             pipeline_type=DataFusionPipelineType.STREAM,
         )
-        assert result == run_id
+        body = [
+            {
+                "appId": PIPELINE_NAME,
+                "programType": "spark",
+                "programId": "DataStreamsSparkStreaming",
+                "runtimeargs": RUNTIME_ARGS,
+            }
+        ]
         mock_request.assert_called_once_with(
-            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/sparks/DataStreamsSparkStreaming/start",
-            method="POST",
-            body=RUNTIME_ARGS,
+            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -387,10 +390,16 @@ class TestDataFusionHook:
             hook.start_pipeline(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
runtime_args=RUNTIME_ARGS
             )
+        body = [
+            {
+                "appId": PIPELINE_NAME,
+                "programType": "workflow",
+                "programId": "DataPipelineWorkflow",
+                "runtimeargs": RUNTIME_ARGS,
+            }
+        ]
         mock_request.assert_called_once_with(
-            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
-            method="POST",
-            body=RUNTIME_ARGS,
+            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -400,31 +409,16 @@ class TestDataFusionHook:
             hook.start_pipeline(
                 pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
runtime_args=RUNTIME_ARGS
             )
+        body = [
+            {
+                "appId": PIPELINE_NAME,
+                "programType": "workflow",
+                "programId": "DataPipelineWorkflow",
+                "runtimeargs": RUNTIME_ARGS,
+            }
+        ]
         mock_request.assert_called_once_with(
-            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
-            method="POST",
-            body=RUNTIME_ARGS,
-        )
-
-    @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
-    def test_start_pipeline_should_fail_if_no_run_id(self, mock_request, hook):
-        """Test that start_pipeline fails gracefully when response doesn't 
contain runId."""
-        error_response = '{"error": "Invalid runtime arguments"}'
-        mock_request.return_value = mock.MagicMock(
-            spec=google.auth.transport.Response, status=200, 
data=error_response
-        )
-        with pytest.raises(
-            AirflowException,
-            match=r"Failed to start pipeline 'shrubberyPipeline'. "
-            r"The response does not contain a runId. Error: Invalid runtime 
arguments",
-        ):
-            hook.start_pipeline(
-                pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, 
runtime_args=RUNTIME_ARGS
-            )
-        mock_request.assert_called_once_with(
-            
url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
-            method="POST",
-            body=RUNTIME_ARGS,
+            url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST", 
body=body
         )
 
     @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))

Reply via email to