[ 
https://issues.apache.org/jira/browse/AIRFLOW-637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Taylor Murphy updated AIRFLOW-637:
----------------------------------
    Description: 
Currently in models.get_template_context we have:
{code:python}
run_id = ''
        dag_run = None
        if hasattr(task, 'dag'):
            if task.dag.params:
                params.update(task.dag.params)
            dag_run = (
                session.query(DagRun)
                .filter_by(
                    dag_id=task.dag.dag_id,
                    execution_date=self.execution_date)
                .first()
            )
            run_id = dag_run.run_id if dag_run else None
            session.expunge_all()
session.commit()
{code}

Since subdags do not have associated DagRuns for their dag_id, the context will 
not contain the dagrun or run_id associated with the subdag, even though it 
exists associated with the parent dag.

If we change this code to up the DagRun based on the dag_id of the parent dag, 
we can set these values. 

  was:Currently in models.get_template_context a dag's associated DagRun and 
run_id (populated from the dag_run) are both found by querying the data base 
for the dag_id. Since subdags don't have a dag run in the db, the dagrun and 
run_id will be set to None. By looking up the run in the db by the name of the 
parent dag (which will always be the first part of the dag_id when split by 
'.', we can return the current dag run object for the parent dag, and populate 
the run_id accordingly. 


> Operators in Subdags don't have their associated DagRun in the context
> ----------------------------------------------------------------------
>
>                 Key: AIRFLOW-637
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-637
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: DagRun, operators, subdag
>            Reporter: Taylor Murphy
>            Priority: Minor
>
> Currently in models.get_template_context we have:
> {code:python}
> run_id = ''
>         dag_run = None
>         if hasattr(task, 'dag'):
>             if task.dag.params:
>                 params.update(task.dag.params)
>             dag_run = (
>                 session.query(DagRun)
>                 .filter_by(
>                     dag_id=task.dag.dag_id,
>                     execution_date=self.execution_date)
>                 .first()
>             )
>             run_id = dag_run.run_id if dag_run else None
>             session.expunge_all()
> session.commit()
> {code}
> Since subdags do not have associated DagRuns for their dag_id, the context 
> will not contain the dagrun or run_id associated with the subdag, even though 
> it exists associated with the parent dag.
> If we change this code to up the DagRun based on the dag_id of the parent 
> dag, we can set these values. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to