Ok and here's an example you could try:
The callable:
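from airflow.exceptions import AirflowSkipException

def skip_if_specified(context):
    # Skip this task if its task_id is listed under 'skip_list' in the dag run conf.
    dr = context.get('dag_run')
    if not dr:
        return
    conf = dr.conf
    if not conf:
        return
    skip_list = conf.get('skip_list')
    if not skip_list:
        return
    # The task instance lives in the context, not in the conf.
    ti = context.get('task_instance')
    if not ti:
        return
    if ti.task_id in skip_list:
        raise AirflowSkipException()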
How to use in a task:
op = BashOperator(
    task_id='hello',
    pre_execute=skip_if_specified,
    bash_command='echo hello',
)
Or in all tasks in a dag:
dag = DAG(
    dag_id="my_dag",
    default_args=dict(pre_execute=skip_if_specified),
    ...
)
Then, I'm pretty sure that if you supply this conf when triggering the dag run,
the 'hello' task will be skipped:
{"skip_list": ["hello"]}
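If you want to sanity-check the skip logic before wiring it into a dag, here is a rough sketch that runs without Airflow. It assumes the callable only reads `dag_run.conf` and `task_instance.task_id` from the context, so plain `SimpleNamespace` objects can stand in for the real ones. The local `AirflowSkipException` class and the `would_skip` helper are hypothetical stand-ins for illustration, not part of Airflow; in a real dag you would import the exception from `airflow.exceptions`.

```python
from types import SimpleNamespace


class AirflowSkipException(Exception):
    """Stand-in for airflow.exceptions.AirflowSkipException (assumption)."""


def skip_if_specified(context):
    """Raise the skip exception when this task_id is in conf['skip_list']."""
    dr = context.get("dag_run")
    conf = dr.conf if dr else None
    skip_list = conf.get("skip_list") if conf else None
    ti = context.get("task_instance")
    if skip_list and ti and ti.task_id in skip_list:
        raise AirflowSkipException()


def would_skip(task_id, conf):
    """Hypothetical helper: build a fake context and report whether the hook skips."""
    context = {
        "dag_run": SimpleNamespace(conf=conf),
        "task_instance": SimpleNamespace(task_id=task_id),
    }
    try:
        skip_if_specified(context)
    except AirflowSkipException:
        return True
    return False


print(would_skip("hello", {"skip_list": ["hello"]}))  # True
print(would_skip("world", {"skip_list": ["hello"]}))  # False
print(would_skip("hello", None))                      # False
```

This only exercises the callable's own branching. Whether the scheduler actually marks the task as skipped still depends on `pre_execute` being called by your Airflow version, so it is worth confirming with a real triggered run.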