hterik commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-984574489


   I've now tried out `task_instance_mutation_hook` from Cluster Policies. It sounds a bit strange to use something called "policies" for this use case, but if it worked the way it's documented it would probably be good enough.
   
   Unfortunately it does not behave the way the documentation describes, e.g. _"Called right before task execution."_
   What happens instead is that the hook is called on all task instances at once, whenever a DAG run is scheduled. Additionally, it is not called at all when a DAG is triggered manually, only when it is triggered by the schedule.
   
   
   The following example demonstrates this:
   ```py
   import time
   from datetime import datetime, timedelta

   from airflow.decorators import dag, task

   # DAG wrapper reconstructed from the logs below (dag_id "pvc", a new run
   # every 10 seconds); the two tasks are unchanged from the original example.
   @dag(schedule_interval=timedelta(seconds=10), start_date=datetime(2021, 12, 1), catchup=False)
   def pvc():
       @task
       def pvc_provider():
           time.sleep(5)
           return "pvc://blablabla"

       @task
       def pvc_consumer(inparam):
           time.sleep(5)
           return "build result"

       pvc_consumer(pvc_provider())

   dag = pvc()
   ```
   
   The only thing my `task_instance_mutation_hook` does is log the task instances passed into it, along the lines of the sketch below.
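   A minimal sketch of that hook in `airflow_local_settings.py` (logging only; the exact formatting of my real log line differs slightly):

   ```py
   # airflow_local_settings.py -- logging-only cluster policy hook (sketch)
   import logging

   log = logging.getLogger(__name__)


   def task_instance_mutation_hook(task_instance):
       # No mutation at all; just record when the hook fires and for which ti.
       log.info(
           "In the mutation hook for ti=%r, run_id=%r",
           task_instance.task_id,
           task_instance.run_id,
       )
   ```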
   Observe the timestamps: the mutation _hook is called at the very same moment for both tasks_, when they should be 5 seconds apart.
   
   ```
   [2021-12-02 12:50:03,024] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_provider', run_id='scheduled__2021-12-02T11:49:52.937626+00:00'
   [2021-12-02 12:50:03,025] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_consumer', run_id='scheduled__2021-12-02T11:49:52.937626+00:00'
   ```
   
   The provider task starts shortly thereafter:
   
   ```
   [2021-12-02 12:50:03,060] {dag.py:2928} INFO - Setting next_dagrun for pvc to 2021-12-02T11:50:02.937626+00:00
   [2021-12-02 12:50:03,097] {scheduler_job.py:288} INFO - 1 tasks up for execution:
        <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:03,098] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
        <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:03,099] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_provider', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 2 and queue default
   [2021-12-02 12:50:03,099] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   [2021-12-02 12:50:03,105] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   Running <TaskInstance: pvc.pvc_provider scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host
   [2021-12-02 12:50:09,307] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_provider run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
   [2021-12-02 12:50:09,311] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_provider, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:04.024368+00:00, run_end_date=2021-12-02 11:50:09.107072+00:00, run_duration=5.082704, state=success, executor_state=success, try_number=1, max_tries=0, job_id=288, pool=default_pool, queue=default, priority_weight=2, operator=_PythonDecoratedOperator
   ```
   
   Five seconds later the provider finishes and the consumer is scheduled. This is where the mutation is desired to happen:
   
   ```
   [2021-12-02 12:50:09,458] {scheduler_job.py:288} INFO - 1 tasks up for execution:
        <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:09,459] {scheduler_job.py:317} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
   [2021-12-02 12:50:09,459] {scheduler_job.py:345} INFO - DAG pvc has 0/16 running and queued tasks
   [2021-12-02 12:50:09,459] {scheduler_job.py:410} INFO - Setting the following tasks to queued state:
        <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
   [2021-12-02 12:50:09,460] {scheduler_job.py:450} INFO - Sending TaskInstanceKey(dag_id='pvc', task_id='pvc_consumer', run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor with priority 1 and queue default
   [2021-12-02 12:50:09,460] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   [2021-12-02 12:50:09,465] {sequential_executor.py:59} INFO - Executing command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer', 'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir', 'DAGS_FOLDER/dag_pvc.py']
   Running <TaskInstance: pvc.pvc_consumer scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host
   ```
   
   Five seconds later the consumer finishes:
   
   ```
   [2021-12-02 12:50:15,675] {scheduler_job.py:504} INFO - Executor reports execution of pvc.pvc_consumer run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success for try_number 1
   [2021-12-02 12:50:15,679] {scheduler_job.py:547} INFO - TaskInstance Finished: dag_id=pvc, task_id=pvc_consumer, run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02 11:50:10.399690+00:00, run_end_date=2021-12-02 11:50:15.498334+00:00, run_duration=5.098644, state=success, executor_state=success, try_number=1, max_tries=0, job_id=289, pool=default_pool, queue=default, priority_weight=1, operator=_PythonDecoratedOperator
   [2021-12-02 12:50:15,700] {manager.py:1051} INFO - Finding 'running' jobs without a recent heartbeat
   [2021-12-02 12:50:15,700] {manager.py:1055} INFO - Failing jobs without heartbeat after 2021-12-02 11:45:15.700869+00:00
   [2021-12-02 12:50:15,815] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_provider', run_id='scheduled__2021-12-02T11:50:02.937626+00:00'
   [2021-12-02 12:50:15,816] {airflow_local_settings.py:16} INFO - In the mutation hook for ti='pvc_consumer', run_id='scheduled__2021-12-02T11:50:02.937626+00:00'
   ```
   
   This was tried out using the SequentialExecutor and SQLite; maybe other executors behave differently? Though I doubt it: skimming through `DagRun.verify_integrity`, where the hook is called, it appears to always apply to all task instances at the same time, as the abridged sketch below suggests.
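   Roughly (a much-abridged paraphrase, not the exact Airflow source; state checks and session handling elided):

   ```py
   # Abridged paraphrase of DagRun.verify_integrity (airflow/models/dagrun.py).
   # When the scheduler verifies a DAG run, the missing task instances are
   # created and the mutation hook is applied to each of them in one pass --
   # hence the hook fires for every task at scheduling time, not per execution.
   def verify_integrity(self, session):
       for task in self.dag.tasks:
           ti = TaskInstance(task, run_id=self.run_id)
           task_instance_mutation_hook(ti)  # called here, for all tasks at once
           session.add(ti)
       session.flush()
   ```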

