There are a lot of ways to define the input source. Let's suppose you have
these inputs in a relational database, or in a flat file on S3.
The first task in your DAG would query for those inputs or grab the file.
The trick is getting the inputs to later tasks. The XCom feature is a way to
share data between your tasks, so later tasks just pull the XCom pushed by
the task that originally queried the inputs.
Suppose you had an "input operator":

    from airflow.models import BaseOperator

    class InputOperator(BaseOperator):
        def execute(self, context):
            # Whatever you return here is pushed to XCom and is
            # retrievable in later tasks.
            ....
            return {"input_key": "input_value"}
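Then in your DAG:

    input_operator_task = ... your InputOperator, with task_id='input_operator_task' ....
    downstream_task = SomeExistingOperator(
        task_id='downstream_task',
        # task_ids refers to the upstream task's task_id, not the Python variable name
        keyword_arg_using_your_inputs="{{ ti.xcom_pull(task_ids='input_operator_task') }}",
        dag=dag
    )
    # Make sure the inputs exist before the downstream task runs
    downstream_task.set_upstream(input_operator_task)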
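The XCom pull is evaluated when the Jinja template is rendered, so the
argument has to be one of the operator's templated fields. As far as I know,
xcom_pull by default only looks at the current DAG run's execution date, so
concurrent DAG runs should not see each other's inputs.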
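
If the argument you need isn't templated, or you want the actual dict rather
than its rendered string, you can also pull the XCom in plain Python. Here is a
rough sketch, not a definitive implementation: use_inputs is a hypothetical
callable you would hand to a PythonOperator, and FakeTaskInstance is a made-up
stand-in for Airflow's task instance so the snippet runs without a scheduler.
Only ti.xcom_pull(task_ids=...) is the real Airflow call.

```python
class FakeTaskInstance(object):
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, returned_values):
        # Maps task_id -> the value that task's execute() returned.
        self._returned_values = returned_values

    def xcom_pull(self, task_ids):
        # Mimics pulling the return value of a single upstream task.
        return self._returned_values.get(task_ids)


def use_inputs(**context):
    """Callable for a PythonOperator; reads the upstream task's return value."""
    inputs = context["ti"].xcom_pull(task_ids="input_operator_task")
    # Here inputs is the original dict, not a string rendered by Jinja.
    return "got {}".format(inputs["input_key"])


if __name__ == "__main__":
    fake_ti = FakeTaskInstance(
        {"input_operator_task": {"input_key": "input_value"}}
    )
    print(use_inputs(ti=fake_ti))
```

In a real DAG you would pass python_callable=use_inputs to a PythonOperator
(with provide_context=True on the Airflow 1.x versions I have used), and
Airflow supplies the real ti in the context.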
Let me know if that helps, or if I completely misunderstood :)
Joe Nap
On Wed, Aug 3, 2016 at 5:29 PM, Andrew Phillips <[email protected]> wrote:
> Hi all
>
> What is/are the suggested way(s) of passing "input parameters" to a DAG
> run (adding quotes since, as far as we can tell, that concept doesn't exist
> natively in Airflow, probably by design)?
>
> This would be information that is used by one or multiple operators in a
> DAG run and that should not change for all task instances in that DAG run,
> but may be different for another DAG run executing concurrently. An example
> would be a Git pull request number.
>
> What we tried first was to use a Variable for this, but it doesn't look
> like that will work because the value can change during the execution of
> the DAG run. At least, that seems to be the case in the way we're using it:
>
> input_params = Variable.get(<variable_for_dag>)
> dag = DAG(..., params=input_params)
>
> We had hoped that this would "fix" the values of the parameters when the
> DAG run was created, but that does not seem to be the case: if the variable
> is updated (in preparation for a new DAG run) while a DAG run is active,
> tasks that haven't executed yet see the new value. I.e. we end up seeing
> this:
>
> set Variable my_param to "foo"
> dag_run_1 starts, gets the variable and passes my_param to the Dag object
> dag_run_1.op_1 evaluates {{ params.my_param }} and gets "foo"
> set Variable my_param to "bar"
> dag_run_2 starts and passes var to the Dag object
> dag_run_1.op_2 evaluates {{ params.my_param }} and sees "bar" # want this
> to still be foo!
>
> Not sure at this point whether this is a bug or, if not, whether there's a
> different way to retrieve the value of a variable that allows us to "fix"
> it for the duration of the DAG run.
>
> Or, taking a step back, is there some other approach that we could use to
> store and retrieve input data to DAGs?
>
> Regards
>
> ap
>
>
>
--
*Joe Napolitano *| Sr. Data Engineer
www.blueapron.com | 5 Crosby Street, New York, NY 10013