dstandish commented on issue #31720:
URL: https://github.com/apache/airflow/issues/31720#issuecomment-1579175299

   OK so here's the message shared in that ticket:
   ```
   [2023-05-08, 02:19:30 UTC] {credentials.py:617} INFO - Found credentials in 
shared credentials file: ~/.aws/credentials
   [2023-05-08, 02:19:38 UTC] {redshift_cluster.py:203} INFO - Status of 
cluster is pausing
   [2023-05-08, 02:19:57 UTC] {redshift_cluster.py:203} INFO - Status of 
cluster is pausing
   [2023-05-08, 02:20:15 UTC] {redshift_cluster.py:203} INFO - Status of 
cluster is pausing
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 
[queued]>
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1144} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
example_redshift.pause_cluster manual__2023-05-08T02:11:38.809301+00:00 
[queued]>
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1343} INFO - Resuming after 
deferral
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1364} INFO - Executing 
<Task(RedshiftPauseClusterOperator): pause_cluster> on 2023-05-08 
02:11:38.809301+00:00
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:57} INFO - Started 
process 328 to run task
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:84} INFO - Running: 
['***', 'tasks', 'run', 'example_redshift', 'pause_cluster', 
'manual__2023-05-08T02:11:38.809301+00:00', '--job-id', '17', '--raw', 
'--subdir', 'DAGS_FOLDER/example_redshift.py', '--cfg-path', '/tmp/tmp582aoom9']
   [2023-05-08, 02:20:28 UTC] {standard_task_runner.py:85} INFO - Job 17: 
Subtask pause_cluster
   [2023-05-08, 02:20:28 UTC] {task_command.py:410} INFO - Running 
<TaskInstance: example_redshift.pause_cluster 
manual__2023-05-08T02:11:38.809301+00:00 [running]> on host 9ed7c7091244
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1863} ERROR - Task failed with 
exception
   airflow.exceptions.TaskDeferralError: Trigger/execution timeout
   [2023-05-08, 02:20:28 UTC] {taskinstance.py:1382} INFO - Marking task as 
FAILED. dag_id=example_redshift, task_id=pause_cluster, 
execution_date=20230508T021138, start_date=20230508T021920, 
end_date=20230508T022028
   ```
   
   Notice that after the last `Status of cluster is pausing` message, there's nothing indicating that anything went wrong with the trigger (until the very end of the log).
   
   So the author of that PR pointed out that it sort of makes it look like the trigger completed successfully and the task resumed.
   
   The goal of this PR is to improve that somewhat.
   
   Initially I was thinking we would be able to add a log message right there 
such as 
   
   ```
   [2023-05-08, 02:20:15 UTC] {redshift_cluster.py:203} INFO - Trigger 
cancelled due to timeout
   ```
   
   or 
   
   ```
   [2023-05-08, 02:20:15 UTC] {redshift_cluster.py:203} INFO - Trigger 
cancelled: running elsewhere
   ```
   
   Looking again, that might actually be more complicated and tougher to 
accomplish.  At a minimum though, I think it would be good to add a log message 
[here](https://github.com/apache/airflow/blob/main/airflow/jobs/triggerer_job_runner.py#L611).
   
   Something such as 
   
   ```python
   except CancelledError:
       self.log.info("Trigger cancelled")
   ```
   
   This alone would be an improvement.
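   To make the pattern concrete, here's a minimal standalone sketch (not Airflow code — the names and structure are simplified for illustration) of a coroutine that logs on cancellation before re-raising, which is all the suggested change amounts to:

   ```python
   import asyncio
   import logging

   logging.basicConfig(level=logging.INFO)
   log = logging.getLogger("triggerer")

   async def run_trigger(events: list):
       try:
           # Stand-in for the trigger's real work.
           await asyncio.sleep(3600)
       except asyncio.CancelledError:
           # The proposed one-line improvement: record the cancellation.
           log.info("Trigger cancelled")
           events.append("cancel logged")
           raise  # re-raise so cancellation still propagates normally

   async def main():
       events = []
       task = asyncio.create_task(run_trigger(events))
       await asyncio.sleep(0.1)  # let the trigger start running
       task.cancel()
       try:
           await task
       except asyncio.CancelledError:
           pass
       return events

   events = asyncio.run(main())
   ```

   The `raise` is important: swallowing `CancelledError` would prevent the event loop from completing the cancellation.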
   
   ---
   
   I think in order to get more information than that, we obviously have to somehow get the cancellation reason forwarded to the location above.
   
   One way we can do that is through a `msg` passed to `task.cancel` (the `msg` argument was added in Python 3.9). Here's some sample code demonstrating that:
   
   ```python
   import asyncio
   
   async def cancel_me():
       print('cancel_me(): before sleep')
   
       try:
           # Wait for 1 hour
           await asyncio.sleep(3600)
       except asyncio.CancelledError as e:
           print(f"cancel_me was cancelled: {e.args[0]!r}")
           raise
       finally:
           print('cancel_me(): after sleep')
   
   async def main():
       # Create a "cancel_me" Task
       task = asyncio.create_task(cancel_me())
   
       # Wait for 1 second
       await asyncio.sleep(1)
   
       task.cancel("this is why")
       try:
           await task
       except asyncio.CancelledError:
           print("main(): cancel_me is cancelled now")
   
   asyncio.run(main())
   ```
   
   So that would take us to 
[here](https://github.com/apache/airflow/blob/main/airflow/jobs/triggerer_job_runner.py#LL515C37-L515C37), where we actually initiate cancellation.  But at that location, we don't currently know why we're cancelling, so we'd have to figure out some way to get the needed information there.
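   Putting the two halves together, here's a hedged sketch of what that could look like (the `cancel_with_reason` helper is hypothetical, not Airflow's actual API): the code initiating cancellation passes a reason string to `Task.cancel`, and the trigger-side handler reads it off the exception's `args`. This relies on the `msg` argument, available since Python 3.9:

   ```python
   import asyncio

   async def trigger_runner(results: list):
       try:
           await asyncio.sleep(3600)  # stand-in for the trigger's work
       except asyncio.CancelledError as e:
           # e.args[0] carries whatever msg the canceller passed, if any.
           reason = e.args[0] if e.args else "unknown reason"
           results.append(f"Trigger cancelled: {reason}")
           raise

   async def cancel_with_reason(task: asyncio.Task, reason: str):
       # Hypothetical stand-in for the spot in the triggerer where
       # cancellation is initiated; this is where we'd need to know *why*.
       task.cancel(reason)  # msg argument requires Python 3.9+

   async def main():
       results = []
       task = asyncio.create_task(trigger_runner(results))
       await asyncio.sleep(0.1)
       await cancel_with_reason(task, "running elsewhere")
       try:
           await task
       except asyncio.CancelledError:
           pass
       return results

   results = asyncio.run(main())
   ```

   The open question from above remains: at the real cancellation site, something would still have to supply that reason string.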
   
   --- 
   
   So, it's up to you: maybe you want to just get the quick win with the first option, or maybe you want to go for the more detailed solution.
   
   

