bhardwaj-priyanshu opened a new issue, #50387: URL: https://github.com/apache/airflow/issues/50387
### Apache Airflow version

3.0.0

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

There is an issue in `airflow/providers/google/cloud/hooks/datafusion.py` concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.

- **Problem:** The Airflow hook currently uses the `POST /v3/namespaces/<namespace-id>/start` endpoint (documented here: [https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs)) to start a single program. This endpoint returns an overall HTTP `200 OK` status code even if some individual programs fail to start, but the Airflow code (specifically around [https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492)) does not appear to inspect the `statusCode` field within the *response body*.
- **Impact:** When there is a problem with, for example, the runtime arguments, the response body contains an error status code and no `runId`. This causes a failure on the next line, where the code tries to extract the run ID.
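To illustrate the failure mode described above, here is a minimal sketch assuming a batch-start response shaped like the entries described in the linked CDAP documentation (`appId`, `programType`, `programId`, `statusCode`, plus `error` on failure and `runId` on success). The payload is hypothetical and not captured from a real instance; the pipeline name and error text are illustrative. It only shows why reading `runId` directly raises `KeyError` even though the HTTP status was `200 OK`.

```python
import json

# Hypothetical body of a 200 OK response from POST /v3/namespaces/<namespace-id>/start
# when the one requested program fails to start. Field names are assumed from the
# CDAP docs; the values (including the error text) are illustrative only.
raw_body = json.dumps(
    [
        {
            "appId": "my_pipeline",
            "programType": "workflow",
            "programId": "DataPipelineWorkflow",
            "statusCode": 400,
            "error": "illustrative error: invalid runtime arguments",
        }
    ]
)

response_json = json.loads(raw_body)

failure = None
try:
    # Same pattern as in the traceback: runId is read without checking statusCode.
    run_id = response_json[0]["runId"]
except KeyError as exc:
    failure = f"KeyError: {exc}"

print(failure)  # → KeyError: 'runId'
```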
Full error:

```
[2025-05-05T12:19:53.950+0000] {taskinstance.py:2907} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/datafusion.py", line 825, in execute
    pipeline_id = hook.start_pipeline(
                  ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/datafusion.py", line 500, in start_pipeline
    return response_json[0]["runId"]
           ~~~~~~~~~~~~~~~~^^^^^^^^^
KeyError: 'runId'
```

### What you think should happen instead?

There are two issues in `airflow/providers/google/cloud/hooks/datafusion.py` concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.

**1. Insufficient Status Code Checking**

* **Problem:** The Airflow hook currently uses the `POST /v3/namespaces/<namespace-id>/start` endpoint (documented here: [https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs)) to start a single program. This endpoint returns an overall HTTP `200 OK` status code even if some individual programs fail to start, but the Airflow code (specifically around [https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492)) does not appear to inspect the `statusCode` field within the *response body*.
* **Proposed Fix:** The hook should parse the JSON response body and check the `statusCode` of each program entry to ensure that all intended programs started successfully.

**2. [Preferred Approach] Suboptimal API Usage for Single Program Starts**

* **Problem:** When starting a *single* program, the Airflow hook (around [https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L453](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L453)) appears to use the `POST /v3/namespaces/<namespace-id>/start` endpoint, which is designed for starting multiple programs.
* **Recommendation:** Use the dedicated `POST /v3/namespaces/<namespace-id>/apps/<app-id>/<program-type>/<program-id>/start` endpoint for starting a single program (documented here: [https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program)).
* **Benefit:** The single-program endpoint can provide clearer, more direct error handling for an individual program start, potentially avoiding the need to parse a multi-program response for a single operation.
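The two proposed fixes can be sketched roughly as follows. This is a hypothetical sketch, not the provider's actual code: `PipelineStartError`, `extract_run_id`, and `single_program_start_url` are illustrative names, and the response field names (`statusCode`, `error`, `runId`) are assumed from the linked CDAP Lifecycle Microservices documentation. The first helper checks every entry's `statusCode` before reading `runId` (fix 1); the second only builds the path of the dedicated single-program endpoint (fix 2). What that endpoint returns in its body may depend on the CDAP version, so response handling for it is not shown.

```python
from urllib.parse import quote


class PipelineStartError(Exception):
    """Hypothetical error type for a failed program start reported inside a 200 OK body."""


def extract_run_id(response_json: list) -> str:
    """Fix 1 (sketch): check each entry's statusCode before reading runId."""
    if not response_json:
        raise PipelineStartError("Start endpoint returned an empty response body")
    for entry in response_json:
        status = entry.get("statusCode")
        if status != 200:
            # CDAP reports per-program failures in the body, not in the HTTP status.
            raise PipelineStartError(
                f"Failed to start {entry.get('appId')}/{entry.get('programId')}: "
                f"statusCode={status}, error={entry.get('error')!r}"
            )
        if "runId" not in entry:
            raise PipelineStartError(f"No runId returned for {entry.get('appId')}")
    # Only one program was requested, so the first entry holds its run ID.
    return response_json[0]["runId"]


def single_program_start_url(
    base_url: str, namespace: str, app_id: str, program_type: str, program_id: str
) -> str:
    """Fix 2 (sketch): build the dedicated single-program start URL.

    Produces <base_url>/v3/namespaces/<namespace>/apps/<app_id>/<program_type>/<program_id>/start
    """
    segments = ["v3", "namespaces", namespace, "apps", app_id, program_type, program_id]
    # Percent-encode each segment so names containing "/" or spaces cannot break the path.
    encoded = "/".join(quote(segment, safe="") for segment in segments)
    return f"{base_url.rstrip('/')}/{encoded}/start"


# Usage with illustrative values (instance URL and pipeline name are hypothetical).
ok_body = [{"appId": "my_pipeline", "programId": "DataPipelineWorkflow", "statusCode": 200, "runId": "run-123"}]
print(extract_run_id(ok_body))  # → run-123
print(single_program_start_url("https://example-instance/api", "default", "my_pipeline", "workflows", "DataPipelineWorkflow"))
# → https://example-instance/api/v3/namespaces/default/apps/my_pipeline/workflows/DataPipelineWorkflow/start
```

With fix 1, a failed start surfaces as an explicit error carrying CDAP's `error` text instead of a bare `KeyError: 'runId'`. For fix 2, note that in CDAP URL paths the program type is typically written in plural form (for example `workflows`), unlike the singular `programType` in the batch request body; verify this against the documentation for the CDAP version in use.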
### How to reproduce

Trigger the Cloud Data Fusion (CDF) start-pipeline task with incorrect runtime arguments or an incorrect program.

### Operating System

NA (I am not sure how the operating system is related to this issue.)

### Versions of Apache Airflow Providers

_No response_

### Deployment

Official Apache Airflow Helm Chart

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
