GitHub user fr33w1f1 edited a comment on the discussion: Rerun a task with
different params
I'm late, but here's a workaround for those who still need one: a BranchPythonOperator returns the task_ids you want to run, and the scheduler skips the rest. This is an example with `airflow==2.10.2`:
```python
from datetime import datetime, timedelta
import time

from airflow import DAG
from airflow.models.param import Param
from airflow.operators.python import BranchPythonOperator, PythonOperator

FACT_TABLES = ['table1', 'table2', 'table3']


def process_table(**context):
    # Recover the table name from the task_id ("process_<table>")
    task_id = context['task_instance'].task_id
    table_name = task_id.replace('process_', '')
    params = context['params']
    time.sleep(5)
    print(f"Processing {table_name} with date: {params['process_date']}")


def select_tables(**context):
    # Return the task_ids to run; all other downstream tasks are skipped
    dag_conf = context['dag_run'].conf
    if not dag_conf or not dag_conf.get('tables'):
        return [f'process_{table}' for table in FACT_TABLES]
    return [f'process_{table}' for table in dag_conf['tables']]


default_args = {
    'start_date': datetime(2024, 1, 1),
}

with DAG(
    'selective_table_processing',
    default_args=default_args,
    schedule=None,  # manual trigger only; `schedule_interval` is deprecated in 2.x
    params={
        'process_date': Param(
            default=(datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d'),
            type='string',
        )
    },
) as dag:
    # `provide_context=True` is a no-op in Airflow 2 and has been dropped
    select_tables_task = BranchPythonOperator(
        task_id='select_tables',
        python_callable=select_tables,
    )

    tasks = [
        PythonOperator(
            task_id=f'process_{table}',
            python_callable=process_table,
        )
        for table in FACT_TABLES
    ]

    select_tables_task >> tasks
```
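The branching logic is plain Python, so you can sanity-check it outside Airflow. A minimal sketch of the same `select_tables` fallback behavior with the operator context stripped out:

```python
FACT_TABLES = ['table1', 'table2', 'table3']

def select_branches(dag_conf):
    """Fall back to all tables when no conf (or no 'tables' key) is given."""
    if not dag_conf or not dag_conf.get('tables'):
        return [f'process_{table}' for table in FACT_TABLES]
    return [f'process_{table}' for table in dag_conf['tables']]

# No conf -> every table runs
print(select_branches(None))
# → ['process_table1', 'process_table2', 'process_table3']

# Conf with a subset -> only those branches run, the rest are skipped
print(select_branches({'tables': ['table1', 'table3']}))
# → ['process_table1', 'process_table3']
```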
In the UI, click Trigger DAG and put this into the Configuration JSON:
```json
{
"process_date": "2025-02-18",
"tables": ["table1", "table3"]
}
```
Click Trigger and you'll see task `process_table2` skipped (shown pink) while the rest run with the given `process_date`.
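You can also trigger the same run from the CLI instead of the UI; this sketch assumes the DAG id used above:

```shell
airflow dags trigger selective_table_processing \
    --conf '{"process_date": "2025-02-18", "tables": ["table1", "table3"]}'
```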
GitHub link:
https://github.com/apache/airflow/discussions/31546#discussioncomment-12248337