Hi all

What is/are the suggested way(s) of passing "input parameters" to a DAG run (adding quotes since, as far as we can tell, that concept doesn't exist natively in Airflow, probably by design)?

This would be information that is used by one or more operators in a DAG run, that should stay the same for every task instance in that DAG run, but that may differ for another DAG run executing concurrently. An example would be a Git pull request number.

Our first attempt was to use a Variable for this, but that doesn't seem to work, because the value can change while the DAG run is executing. At least, that seems to be the case with the way we're using it:

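from airflow.models import DAG, Variable
input_params = Variable.get(<variable_for_dag>, deserialize_json=True)  # assumes the Variable holds a JSON object, since params must be a dict
dag = DAG(..., params=input_params)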

We had hoped that this would "fix" the values of the parameters when the DAG run was created, but that does not seem to be the case: if the variable is updated (in preparation for a new DAG run) while a DAG run is active, tasks that haven't executed yet see the new value. That is, we end up seeing this:

set Variable my_param to "foo"
dag_run_1 starts, gets the variable and passes my_param to the DAG object
dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
set Variable my_param to "bar"
dag_run_2 starts, gets the variable and passes my_param to the DAG object
dag_run_1.op_2 evaluates {{ params.my_param }} and gets "bar"  # want this to still be "foo"!
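Our working guess (not confirmed) is that the DAG file, including the module-level Variable.get() call, is evaluated again whenever a task instance runs, so each task picks up whatever the variable holds at that moment. The pure-Python sketch below simulates that assumption to reproduce the sequence above; a plain dict stands in for the Variable store, and variable_store, parse_dag_file and run_task are hypothetical names, not Airflow APIs.

```python
# Hypothetical simulation, not Airflow code: a plain dict stands in for the
# Variable store, and "parsing the DAG file" is modelled as re-running the
# module-level read each time a task instance executes (our assumption).

variable_store = {"my_param": "foo"}


def parse_dag_file():
    """Mimic the module-level code: read the variable and build the params."""
    return {"my_param": variable_store["my_param"]}


def run_task(run_id, task_id):
    # Assumption: every task instance re-parses the DAG file before running,
    # so it sees the variable's current value, not its value at run creation.
    params = parse_dag_file()
    return (run_id, task_id, params["my_param"])


log = []
log.append(run_task("dag_run_1", "op_1"))  # variable is still "foo"
variable_store["my_param"] = "bar"          # prepare the next run
log.append(run_task("dag_run_2", "op_1"))
log.append(run_task("dag_run_1", "op_2"))  # run 1 now sees "bar"

for entry in log:
    print(entry)
```

If the assumption holds, this is expected behaviour rather than a bug: params built from a Variable at module level are only as stable as the Variable itself.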

At this point we're not sure whether this is a bug or, if it isn't, whether there's a different way to retrieve a variable's value that lets us "fix" it for the duration of the DAG run.
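To make concrete what we mean by "fixing" the value, here is a minimal pure-Python sketch of the semantics we're after: each run snapshots the variable the first time it reads it, and later reads in the same run return that snapshot. variable_store, run_snapshots and get_param_for_run are hypothetical names for illustration only; none of them is an Airflow API.

```python
# Hypothetical sketch of the desired semantics, not Airflow code.
# variable_store stands in for the Variable store; run_snapshots is an
# illustrative per-run cache keyed by run id.

variable_store = {"my_param": "foo"}
run_snapshots = {}


def get_param_for_run(run_id, key):
    """Return the value of `key` as it was when `run_id` first read it."""
    if run_id not in run_snapshots:
        # First read in this run: copy the whole store so that later
        # updates to the variable do not leak into this run.
        run_snapshots[run_id] = dict(variable_store)
    return run_snapshots[run_id][key]


first = get_param_for_run("dag_run_1", "my_param")       # snapshot taken: "foo"
variable_store["my_param"] = "bar"                        # prepare the next run
second_run = get_param_for_run("dag_run_2", "my_param")  # new run sees "bar"
later = get_param_for_run("dag_run_1", "my_param")       # run 1 still sees "foo"
print(first, second_run, later)
```

Two caveats we're aware of: the snapshot is taken at the run's first read rather than at run creation, and in a real deployment it would have to live somewhere every worker process can reach, not in an in-process dict.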

Or, taking a step back, is there some other approach that we could use to store and retrieve input data to DAGs?

Regards

ap

