This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f466c77f476 Revert "Fix Google Cloud Data Fusion hook to handle pipeline start errors properly (#58698)" (#60701)
f466c77f476 is described below
commit f466c77f476a465c1be5a08f3e81bdb447bd2da8
Author: Shahar Epstein <[email protected]>
AuthorDate: Sat Jan 17 17:58:56 2026 +0200
Revert "Fix Google Cloud Data Fusion hook to handle pipeline start errors
properly (#58698)" (#60701)
This reverts commit a4f2b33e0b7bd926769242e514a78d06ad29af39.
---
.../providers/google/cloud/hooks/datafusion.py | 36 +++++----
.../unit/google/cloud/hooks/test_datafusion.py | 88 ++++++++++------------
2 files changed, 58 insertions(+), 66 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
index 063f7bbc2e3..73183ca2236 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py
@@ -469,33 +469,31 @@ class DataFusionHook(GoogleBaseHook):
is always default. If your pipeline belongs to an Enterprise edition instance, you
can create a namespace.
"""
- # Use the single-program start endpoint for better error handling
- # https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program
- program_type = self.cdap_program_type(pipeline_type=pipeline_type)
- program_id = self.cdap_program_id(pipeline_type=pipeline_type)
+ # TODO: This API endpoint starts multiple pipelines. There will eventually be a fix
+ # return the run Id as part of the API request to run a single pipeline.
+ # https://github.com/apache/airflow/pull/8954#discussion_r438223116
url = os.path.join(
- self._base_url(instance_url, namespace),
- quote(pipeline_name),
- f"{program_type}s",
- program_id,
+ instance_url,
+ "v3",
+ "namespaces",
+ quote(namespace),
"start",
)
runtime_args = runtime_args or {}
- response = self._cdap_request(url=url, method="POST",
body=runtime_args)
+ body = [
+ {
+ "appId": pipeline_name,
+ "runtimeargs": runtime_args,
+ "programType":
self.cdap_program_type(pipeline_type=pipeline_type),
+ "programId": self.cdap_program_id(pipeline_type=pipeline_type),
+ }
+ ]
+ response = self._cdap_request(url=url, method="POST", body=body)
self._check_response_status_and_data(
response, f"Starting a pipeline failed with code {response.status}"
)
response_json = json.loads(response.data)
-
- # Extract and validate runId from response
- if "runId" not in response_json:
- error_message = response_json.get("error", "Unknown error")
- raise AirflowException(
- f"Failed to start pipeline '{pipeline_name}'. "
- f"The response does not contain a runId. Error:
{error_message}"
- )
-
- return str(response_json["runId"])
+ return response_json[0]["runId"]
def stop_pipeline(self, pipeline_name: str, instance_url: str, namespace: str = "default") -> None:
"""
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
index 49479446cde..d98a64fc17d 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_datafusion.py
@@ -21,7 +21,6 @@ import logging
from unittest import mock
import aiohttp
-import google.auth.transport
import pytest
from aiohttp.helpers import TimerNoop
from yarl import URL
@@ -341,38 +340,42 @@ class TestDataFusionHook:
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline(self, mock_request, hook):
run_id = 1234
- mock_request.return_value = mock.MagicMock(
- spec=google.auth.transport.Response, status=200, data=f'{{"runId":{run_id}}}'
- )
+ mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
- result = hook.start_pipeline(
- pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
- )
- assert result == str(run_id)
+ hook.start_pipeline(pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS)
+ body = [
+ {
+ "appId": PIPELINE_NAME,
+ "programType": "workflow",
+ "programId": "DataPipelineWorkflow",
+ "runtimeargs": RUNTIME_ARGS,
+ }
+ ]
mock_request.assert_called_once_with(
- url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
- method="POST",
- body=RUNTIME_ARGS,
+ url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST",
body=body
)
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
def test_start_pipeline_stream(self, mock_request, hook):
- run_id = "test-run-123"
- mock_request.return_value = mock.MagicMock(
- spec=google.auth.transport.Response, status=200, data=f'{{"runId":"{run_id}"}}'
- )
+ run_id = 1234
+ mock_request.return_value = mock.MagicMock(status=200, data=f'[{{"runId":{run_id}}}]')
- result = hook.start_pipeline(
+ hook.start_pipeline(
pipeline_name=PIPELINE_NAME,
instance_url=INSTANCE_URL,
runtime_args=RUNTIME_ARGS,
pipeline_type=DataFusionPipelineType.STREAM,
)
- assert result == run_id
+ body = [
+ {
+ "appId": PIPELINE_NAME,
+ "programType": "spark",
+ "programId": "DataStreamsSparkStreaming",
+ "runtimeargs": RUNTIME_ARGS,
+ }
+ ]
mock_request.assert_called_once_with(
- url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/sparks/DataStreamsSparkStreaming/start",
- method="POST",
- body=RUNTIME_ARGS,
+ url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST",
body=body
)
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -387,10 +390,16 @@ class TestDataFusionHook:
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
+ body = [
+ {
+ "appId": PIPELINE_NAME,
+ "programType": "workflow",
+ "programId": "DataPipelineWorkflow",
+ "runtimeargs": RUNTIME_ARGS,
+ }
+ ]
mock_request.assert_called_once_with(
- url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
- method="POST",
- body=RUNTIME_ARGS,
+ url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST",
body=body
)
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
@@ -400,31 +409,16 @@ class TestDataFusionHook:
hook.start_pipeline(
pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
)
+ body = [
+ {
+ "appId": PIPELINE_NAME,
+ "programType": "workflow",
+ "programId": "DataPipelineWorkflow",
+ "runtimeargs": RUNTIME_ARGS,
+ }
+ ]
mock_request.assert_called_once_with(
- url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
- method="POST",
- body=RUNTIME_ARGS,
- )
-
- @mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))
- def test_start_pipeline_should_fail_if_no_run_id(self, mock_request, hook):
- """Test that start_pipeline fails gracefully when response doesn't
contain runId."""
- error_response = '{"error": "Invalid runtime arguments"}'
- mock_request.return_value = mock.MagicMock(
- spec=google.auth.transport.Response, status=200, data=error_response
- )
- with pytest.raises(
- AirflowException,
- match=r"Failed to start pipeline 'shrubberyPipeline'. "
- r"The response does not contain a runId. Error: Invalid runtime
arguments",
- ):
- hook.start_pipeline(
- pipeline_name=PIPELINE_NAME, instance_url=INSTANCE_URL, runtime_args=RUNTIME_ARGS
- )
- mock_request.assert_called_once_with(
- url=f"{INSTANCE_URL}/v3/namespaces/default/apps/{PIPELINE_NAME}/workflows/DataPipelineWorkflow/start",
- method="POST",
- body=RUNTIME_ARGS,
+ url=f"{INSTANCE_URL}/v3/namespaces/default/start", method="POST",
body=body
)
@mock.patch(HOOK_STR.format("DataFusionHook._cdap_request"))