themitigater opened a new issue, #36567:
URL: https://github.com/apache/airflow/issues/36567

   ### Apache Airflow Provider(s)
   
   google
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   2.6.3
   
   ### Operating System
   
   Linux
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   Google Cloud Composer
   composer-2.5.3-airflow-2.6.3
   
   ### What happened
   
   ```text
   ERROR 2024-01-03T19:38:49.509065278Z Task failed with exception
   Traceback (most recent call last):
     File "/home/airflow/gcs/plugins/operators/dataflow_staging.py", line 885, in execute
       self.job = self.hook.start_flex_template(
     File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 475, in inner_wrapper
       return func(self, *args, **kwargs)
     File "/home/airflow/gcs/plugins/hooks/dataflow_staging.py", line 871, in start_flex_template
       jobs_controller.wait_for_done()
     File "/home/airflow/gcs/plugins/hooks/dataflow_staging.py", line 484, in wait_for_done
       while self._jobs and not all(
     File "/home/airflow/gcs/plugins/hooks/dataflow_staging.py", line 485, in <genexpr>
       self._check_dataflow_job_state(job) for job in self._jobs
     File "/home/airflow/gcs/plugins/hooks/dataflow_staging.py", line 475, in _check_dataflow_job_state
       raise Exception(
   Exception: Google Cloud Dataflow job dataflow-operator-some-pipeline-status-20240103-150500-3-c0b5a70c is in an unexpected terminal state: JOB_STATE_CANCELLED, expected terminal state: JOB_STATE_DONE
   ERROR 2024-01-03T19:38:50.049752244Z Failed to execute job 903791 for task dataflow_operator-staging-av_agent_active_status (Google Cloud Dataflow job dataflow-operator-some-pipeline-status-20240103-150500-3-c0b5a70c is in an unexpected terminal state: JOB_STATE_CANCELLED, expected terminal state: JOB_STATE_DONE; 409959)
   ERROR 2024-01-03T19:38:50.172097569Z Marking run <DagRun dataflow_dag-staging-av_agent_active_status @ 2024-01-03 15:05:00+00:00: scheduled__2024-01-03T15:05:00+00:00, state:running, queued_at: 2024-01-03 19:34:54.329527+00:00. externally triggered: False> failed
   ```
   
   This is the problematic behaviour:
   
   ```text
   ERROR 2024-01-03T19:38:56.091734160Z [2024-01-03 19:38:56,079] {dag.py:1354} ERROR - failed to invoke dag state update callback
   ```
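
For context, the `Exception` in the traceback comes from a terminal-state check on the Dataflow job: a cancelled job counts as a failure because the expected terminal state is `JOB_STATE_DONE`. The following is a minimal, hypothetical model of that check, assuming a simple enum of job states; `JobState`, `TERMINAL_STATES` and `check_dataflow_job_state` are illustrative names, not the Google provider's actual code. It shows why the task failing on cancellation is expected; the later `failed to invoke dag state update callback` error is the part reported as a bug.

```python
# Hypothetical sketch of the terminal-state check seen in the traceback.
# JobState / check_dataflow_job_state are illustrative names (assumptions),
# not the Google provider's real implementation.
from enum import Enum


class JobState(str, Enum):
    DONE = "JOB_STATE_DONE"
    FAILED = "JOB_STATE_FAILED"
    CANCELLED = "JOB_STATE_CANCELLED"
    RUNNING = "JOB_STATE_RUNNING"


# States after which the job will never change again.
TERMINAL_STATES = {JobState.DONE, JobState.FAILED, JobState.CANCELLED}


def check_dataflow_job_state(job_name: str, state: JobState,
                             expected: JobState = JobState.DONE) -> bool:
    """Return True if the job reached the expected terminal state,
    False while it is still running, and raise if it ended in any other
    terminal state (for example after a manual cancel)."""
    if state == expected:
        return True
    if state in TERMINAL_STATES:
        raise Exception(
            f"Google Cloud Dataflow job {job_name} is in an unexpected terminal "
            f"state: {state.value}, expected terminal state: {expected.value}"
        )
    return False
```

With this model, cancelling the job makes `check_dataflow_job_state("my-job", JobState.CANCELLED)` raise, which matches the first part of the log.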
   
   ### What you think should happen instead
   
   This should log an error and run the failure callbacks defined in the DAG file, instead of failing with `failed to invoke dag state update callback`.
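
A minimal sketch of the expected flow, assuming a toy runner rather than Airflow's real scheduler: when the task raises, the error is logged and each registered failure callback is called with a context dict. `run_with_failure_callbacks` and the shape of `context` are hypothetical; in a real DAG the callbacks would be supplied through `on_failure_callback`.

```python
# Hypothetical sketch of the expected behaviour: log the task error, then
# invoke every failure callback. NOT Airflow's scheduler code; the function
# name and context dict are illustrative assumptions.
import logging
from typing import Callable, Dict, List

log = logging.getLogger("sketch")

Callback = Callable[[Dict[str, object]], None]


def run_with_failure_callbacks(task: Callable[[], None],
                               on_failure_callbacks: List[Callback]) -> str:
    try:
        task()
        return "success"
    except Exception as exc:
        # Log the task failure first, as the issue expects.
        log.error("Task failed: %s", exc)
        context: Dict[str, object] = {"exception": exc}
        # Then run each failure callback with the failure context.
        for callback in on_failure_callbacks:
            callback(context)
        return "failed"
```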
   
   ### How to reproduce
   
   1. Create any DAG that uses `DataflowStartFlexTemplateOperator`.
   2. Cancel the Dataflow job.
   3. Check the task status in Airflow: the task is marked as failed and the errors above appear in the logs.
   
   ### Anything else
   
   I have tried the code for `hooks/dataflow.py` and `operators/dataflow.py` from both the main branch and the 2.8.0 branch, and the behaviour is the same. This was not the case a month ago (I am unsure whether Google Cloud has updated the providers internally).
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

