hterik commented on issue #19673:
URL: https://github.com/apache/airflow/issues/19673#issuecomment-984574489
I've now tried out `task_instance_mutation_hook` from Cluster Policies. It feels a bit odd to use something called a policy for this use case, but if it worked the way it's described it would probably be good enough.
Unfortunately, it does not behave the way the documentation describes it, e.g. _"Called right before task execution."_
What happens instead is that the hook is called on all task instances whenever a DAG run is scheduled. Additionally, it is not called at all when a DAG is triggered manually, only when it is triggered by the schedule.
The following example demonstrates this:
```py
# dag_pvc.py (abridged; the surrounding DAG definition is omitted)
import time

from airflow.decorators import task


@task
def pvc_provider():
    time.sleep(5)
    return "pvc://blablabla"


@task
def pvc_consumer(inparam):
    time.sleep(5)
    return "build result"


pvc_consumer(pvc_provider())
```
The only thing my `task_instance_mutation_hook` does is log the task instances passed into it.
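For reference, such a logging-only hook can be sketched roughly as follows. This is a reconstruction, not the exact hook used here: the signature (a single task-instance argument) is Airflow's documented one, but the log-message format is inferred from the log lines below.

```python
# airflow_local_settings.py -- minimal sketch of a logging-only
# task_instance_mutation_hook (message format reconstructed from the logs).
import logging

log = logging.getLogger(__name__)


def task_instance_mutation_hook(task_instance):
    # Airflow invokes this cluster policy with each TaskInstance object;
    # here we only log which task and run it was called for.
    log.info(
        "In the mutation hook for ti=%r, run_id=%r",
        task_instance.task_id,
        task_instance.run_id,
    )
```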
Note the timestamps: the log shows the mutation _hook being called at the very same moment for both tasks_, when they should be 5 seconds apart.
```
[2021-12-02 12:50:03,024] {airflow_local_settings.py:16} INFO - In the
mutation hook for ti='\x1b[01mpvc_provider\x1b[22m',
run_id='\x1b[01mscheduled__2021-12-02T11:49:52.937626+00:00\x1b[22m'
[2021-12-02 12:50:03,025] {airflow_local_settings.py:16} INFO - In the
mutation hook for ti='\x1b[01mpvc_consumer\x1b[22m',
run_id='\x1b[01mscheduled__2021-12-02T11:49:52.937626+00:00\x1b[22m'
```
The producer task starts shortly thereafter:
```
[2021-12-02 12:50:03,060] {dag.py:2928} INFO - Setting next_dagrun for pvc
to 2021-12-02T11:50:02.937626+00:00
[2021-12-02 12:50:03,097] {scheduler_job.py:288} INFO - 1 tasks up for
execution:
<TaskInstance: pvc.pvc_provider
scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:03,098] {scheduler_job.py:410} INFO - Setting the
following tasks to queued state:
<TaskInstance: pvc.pvc_provider
scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:03,099] {scheduler_job.py:450} INFO - Sending
TaskInstanceKey(dag_id='pvc', task_id='pvc_provider',
run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor
with priority 2 and queue default
[2021-12-02 12:50:03,099] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'pvc', 'pvc_provider',
'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir',
'DAGS_FOLDER/dag_pvc.py']
[2021-12-02 12:50:03,105] {sequential_executor.py:59} INFO - Executing
command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_provider',
'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir',
'DAGS_FOLDER/dag_pvc.py']
Running <TaskInstance: pvc.pvc_provider
scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host
[2021-12-02 12:50:09,307] {scheduler_job.py:504} INFO - Executor reports
execution of pvc.pvc_provider
run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success
for try_number 1
[2021-12-02 12:50:09,311] {scheduler_job.py:547} INFO - TaskInstance
Finished: dag_id=pvc, task_id=pvc_provider,
run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02
11:50:04.024368+00:00, run_end_date=2021-12-02 11:50:09.107072+00:00,
run_duration=5.082704, state=success, executor_state=success, try_number=1,
max_tries=0, job_id=288, pool=default_pool, queue=default, priority_weight=2,
operator=_PythonDecoratedOperator
```
Five seconds later, the producer finishes and the consumer is scheduled. This is where the mutation should happen:
```
[2021-12-02 12:50:09,458] {scheduler_job.py:288} INFO - 1 tasks up for
execution:
<TaskInstance: pvc.pvc_consumer
scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:09,459] {scheduler_job.py:317} INFO - Figuring out tasks
to run in Pool(name=default_pool) with 128 open slots and 1 task instances
ready to be queued
[2021-12-02 12:50:09,459] {scheduler_job.py:345} INFO - DAG pvc has 0/16
running and queued tasks
[2021-12-02 12:50:09,459] {scheduler_job.py:410} INFO - Setting the
following tasks to queued state:
<TaskInstance: pvc.pvc_consumer
scheduled__2021-12-02T11:49:52.937626+00:00 [scheduled]>
[2021-12-02 12:50:09,460] {scheduler_job.py:450} INFO - Sending
TaskInstanceKey(dag_id='pvc', task_id='pvc_consumer',
run_id='scheduled__2021-12-02T11:49:52.937626+00:00', try_number=1) to executor
with priority 1 and queue default
[2021-12-02 12:50:09,460] {base_executor.py:82} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer',
'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir',
'DAGS_FOLDER/dag_pvc.py']
[2021-12-02 12:50:09,465] {sequential_executor.py:59} INFO - Executing
command: ['airflow', 'tasks', 'run', 'pvc', 'pvc_consumer',
'scheduled__2021-12-02T11:49:52.937626+00:00', '--local', '--subdir',
'DAGS_FOLDER/dag_pvc.py']
Running <TaskInstance: pvc.pvc_consumer
scheduled__2021-12-02T11:49:52.937626+00:00 [queued]> on host
```
Five seconds later, the consumer finishes:
```
[2021-12-02 12:50:15,675] {scheduler_job.py:504} INFO - Executor reports
execution of pvc.pvc_consumer
run_id=scheduled__2021-12-02T11:49:52.937626+00:00 exited with status success
for try_number 1
[2021-12-02 12:50:15,679] {scheduler_job.py:547} INFO - TaskInstance
Finished: dag_id=pvc, task_id=pvc_consumer,
run_id=scheduled__2021-12-02T11:49:52.937626+00:00, run_start_date=2021-12-02
11:50:10.399690+00:00, run_end_date=2021-12-02 11:50:15.498334+00:00,
run_duration=5.098644, state=success, executor_state=success, try_number=1,
max_tries=0, job_id=289, pool=default_pool, queue=default, priority_weight=1,
operator=_PythonDecoratedOperator
[2021-12-02 12:50:15,700] {manager.py:1051} INFO - Finding 'running' jobs
without a recent heartbeat
[2021-12-02 12:50:15,700] {manager.py:1055} INFO - Failing jobs without
heartbeat after 2021-12-02 11:45:15.700869+00:00
[2021-12-02 12:50:15,815] {airflow_local_settings.py:16} INFO - In the
mutation hook for ti='\x1b[01mpvc_provider\x1b[22m',
run_id='\x1b[01mscheduled__2021-12-02T11:50:02.937626+00:00\x1b[22m'
[2021-12-02 12:50:15,816] {airflow_local_settings.py:16} INFO - In the
mutation hook for ti='\x1b[01mpvc_consumer\x1b[22m',
run_id='\x1b[01mscheduled__2021-12-02T11:50:02.937626+00:00\x1b[22m'
```
This was tried out using the SequentialExecutor and SQLite; maybe other executors behave differently? Though I doubt it: skimming through `DagRun.verify_integrity`, where the hook is called, it appears to always be applied to all task instances at the same time.
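To make that concrete, the pattern appears to be roughly the following. This is a simplified sketch of the observed behaviour, not Airflow's actual code: when a run's task instances are created, the hook is applied to every one of them in a single pass, which matches the back-to-back timestamps in the logs above.

```python
# Simplified sketch (not Airflow's implementation) of the behaviour seen
# in DagRun.verify_integrity: the mutation hook runs once per TaskInstance
# when the run's TIs are created, not right before each task executes.
def create_task_instances(task_ids, mutation_hook):
    task_instances = []
    for task_id in task_ids:
        ti = {"task_id": task_id, "state": "scheduled"}
        mutation_hook(ti)  # called here for every TI, all at DagRun creation
        task_instances.append(ti)
    return task_instances


seen = []
create_task_instances(["pvc_provider", "pvc_consumer"], seen.append)
# both TIs pass through the hook back-to-back, before either task runs
```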