bhardwaj-priyanshu opened a new issue, #50387:
URL: https://github.com/apache/airflow/issues/50387

   ### Apache Airflow version
   
   3.0.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   
   
   There is an issue in 
airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts 
with CDAP's Lifecycle Microservices when starting programs.
   
   - **Problem:** The Airflow hook currently uses the `POST 
/v3/namespaces/<namespace-id>/start` endpoint (documented here: 
[https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs))
 to start a single program. While this endpoint returns an overall HTTP `200 
OK` status code even if some individual programs fail, the Airflow code 
(specifically around 
[https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492))
 does not appear to inspect the `statusCode` field within the *response body*.
   - **Impact:** When a program fails to start (for example, because of invalid 
runtime args), the response body contains the error code and no `runId`. The 
hook then fails on the next line, where it tries to extract the run ID, with 
the `KeyError` shown in the traceback below.
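
The failure mode can be reproduced without a Data Fusion instance. The sketch below is illustrative only: the response body is a hypothetical example of what the batch start endpoint might return for a failed entry (every field other than `statusCode` and `runId` is an assumption), and `extract_run_id` is a stand-in for the lookup seen in the traceback, not the hook's actual code.

```python
# Hypothetical batch-start response for one program that failed to start.
# Only `statusCode` and `runId` are named in this issue; the other fields
# and all values are assumptions made for illustration.
failed_response = [
    {
        "appId": "my_pipeline",
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "statusCode": 400,
        "error": "Invalid runtime arguments",
    }
]


def extract_run_id(response_json):
    """Mimic the lookup from the traceback: response_json[0]["runId"]."""
    return response_json[0]["runId"]


try:
    extract_run_id(failed_response)
except KeyError as exc:
    # The per-program status and error message are never surfaced;
    # the caller only learns that a key is missing.
    print(f"KeyError: {exc}")
```

The `KeyError` hides the per-program status and error message, which is what makes the task failure hard to diagnose.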
   
   Full error:
   ```
   [2025-05-05T12:19:53.950+0000] {taskinstance.py:2907} ERROR - Task failed 
with exception
   Traceback (most recent call last):
     File 
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", 
line 465, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", 
line 432, in _execute_callable
       return execute_callable(context=context, **execute_callable_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", 
line 401, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/datafusion.py",
 line 825, in execute
       pipeline_id = hook.start_pipeline(
                     ^^^^^^^^^^^^^^^^^^^^
     File 
"/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/datafusion.py",
 line 500, in start_pipeline
       return response_json[0]["runId"]
              ~~~~~~~~~~~~~~~~^^^^^^^^^
   KeyError: 'runId'
   ```
   
   
![Image](https://github.com/user-attachments/assets/c9df551b-abd8-4cda-a263-c7d7238a4a15)
   
   
   
   ### What you think should happen instead?
   
   There are two issues within the 
airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts 
with CDAP's Lifecycle Microservices for starting programs.
   
   **1. Insufficient Status Code Checking**
   
   * **Problem:** The Airflow hook currently uses the `POST 
/v3/namespaces/<namespace-id>/start` endpoint (documented here: 
[https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs))
 to start a single program. While this endpoint returns an overall HTTP `200 
OK` status code even if some individual programs fail, the Airflow code 
(specifically around 
[https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L492))
 does not appear to inspect the `statusCode` field within the *response body*.
   * **Proposed Fix:** The hook should be updated to parse the JSON response 
body and check the `statusCode` for each program entry to ensure all intended 
programs started successfully.
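
A minimal sketch of such a check, assuming the decoded response is a JSON array with one entry per program and an integer `statusCode` per entry. `ProgramStartError` and `extract_run_id_checked` are hypothetical names, and the `error` field is an assumption; in the hook itself an `AirflowException` would likely be the natural choice.

```python
class ProgramStartError(Exception):
    """Hypothetical exception type used only in this sketch."""


def extract_run_id_checked(response_json):
    """Return the run ID from a batch start response, or raise a clear error.

    Assumes `response_json` is the decoded JSON array returned by the
    batch start endpoint, with a per-entry integer `statusCode`.
    """
    if not isinstance(response_json, list) or not response_json:
        raise ProgramStartError(f"Unexpected start response: {response_json!r}")

    # Collect every entry whose per-program status is not 200.
    failures = [entry for entry in response_json if entry.get("statusCode") != 200]
    if failures:
        details = "; ".join(
            f"statusCode={entry.get('statusCode')} error={entry.get('error', 'n/a')}"
            for entry in failures
        )
        raise ProgramStartError(f"Program start failed: {details}")

    # Even on success, guard against a missing run ID instead of a bare KeyError.
    run_id = response_json[0].get("runId")
    if not run_id:
        raise ProgramStartError("Start reported success but returned no runId")
    return run_id
```

With a check like this, a bad runtime argument would surface as an error carrying the per-program status and message rather than as `KeyError: 'runId'`.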
   
   **2. [Preferred Approach] Suboptimal API Usage for Single Program Starts**
   
   * **Problem:** When starting a *single* program, the Airflow hook (around 
[https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L453](https://github.com/apache/airflow/blob/3284b8d476408fbb68eafd64c725e5ab23352d36/providers/google/src/airflow/providers/google/cloud/hooks/datafusion.py#L453))
 appears to be using the `POST /v3/namespaces/<namespace-id>/start` endpoint, 
which is designed for starting multiple programs.
   * **Recommendation:** It is highly recommended to use the dedicated `POST 
/v3/namespaces/<namespace-id>/apps/<app-id>/<program-type>/<program-id>/start` 
endpoint for starting a single program (documented here: 
[https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program](https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program)).
   * **Benefit:** Using the specific single-program endpoint can provide 
clearer and more direct error handling for individual program starts, 
potentially reducing complexity in parsing multi-program responses for a single 
operation.
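
As a rough illustration of the recommended endpoint, the sketch below only builds the request URL from the path template quoted above. `single_program_start_url`, the `api_root` parameter, and the example values (including the `workflows` program type) are assumptions for illustration. What the response body contains, and whether it includes a run ID, is not established in this issue and is left out.

```python
from urllib.parse import quote


def single_program_start_url(api_root, namespace, app_id, program_type, program_id):
    """Build the URL for the single-program start endpoint.

    Follows the template from the CDAP docs cited in this issue:
    /v3/namespaces/<namespace-id>/apps/<app-id>/<program-type>/<program-id>/start
    `api_root` is a hypothetical parameter: the instance API root without `/v3`.
    """
    segments = [
        "v3", "namespaces", namespace,
        "apps", app_id, program_type, program_id, "start",
    ]
    # Percent-encode each segment so unusual names cannot alter the path.
    path = "/".join(quote(segment, safe="") for segment in segments)
    return f"{api_root.rstrip('/')}/{path}"


# Example values are placeholders, not taken from a real instance.
url = single_program_start_url(
    "https://example.invalid/api", "default", "my_pipeline",
    "workflows", "DataPipelineWorkflow",
)
print(url)
```

Because this request targets exactly one program, a failed start would presumably be reported for that request directly instead of inside a per-entry `statusCode`, which is the simplification this recommendation is after.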
   
   
   ### How to reproduce
   
   Start a Cloud Data Fusion (CDF) pipeline through the hook's `start_pipeline` 
with invalid runtime args or an incorrect program.
   
   ### Operating System
   
   I am not sure how that is related to the issue. N/A
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

