[
https://issues.apache.org/jira/browse/AIRFLOW-637?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Taylor Murphy updated AIRFLOW-637:
----------------------------------
Description:
Currently in models.get_template_context we have:
{code}
run_id = ''
dag_run = None
if hasattr(task, 'dag'):
if task.dag.params:
params.update(task.dag.params)
dag_run = (
session.query(DagRun)
.filter_by(
dag_id=task.dag.dag_id,
execution_date=self.execution_date)
.first()
)
run_id = dag_run.run_id if dag_run else None
session.expunge_all()
session.commit()
{code}
Since subdags do not have associated DagRuns for their dag_id, the context will
not contain the dagrun or run_id associated with the subdag, even though it
exists associated with the parent dag.
If we change this code to up the DagRun based on the dag_id of the parent dag,
we can set these values.
was:
Currently in models.get_template_context we have:
{code:python}
run_id = ''
dag_run = None
if hasattr(task, 'dag'):
if task.dag.params:
params.update(task.dag.params)
dag_run = (
session.query(DagRun)
.filter_by(
dag_id=task.dag.dag_id,
execution_date=self.execution_date)
.first()
)
run_id = dag_run.run_id if dag_run else None
session.expunge_all()
session.commit()
{code}
Since subdags do not have associated DagRuns for their dag_id, the context will
not contain the dagrun or run_id associated with the subdag, even though it
exists associated with the parent dag.
If we change this code to up the DagRun based on the dag_id of the parent dag,
we can set these values.
> Operators in Subdags don't have their associated DagRun in the context
> ----------------------------------------------------------------------
>
> Key: AIRFLOW-637
> URL: https://issues.apache.org/jira/browse/AIRFLOW-637
> Project: Apache Airflow
> Issue Type: Bug
> Components: DagRun, operators, subdag
> Reporter: Taylor Murphy
> Priority: Minor
>
> Currently in models.get_template_context we have:
> {code}
> run_id = ''
> dag_run = None
> if hasattr(task, 'dag'):
> if task.dag.params:
> params.update(task.dag.params)
> dag_run = (
> session.query(DagRun)
> .filter_by(
> dag_id=task.dag.dag_id,
> execution_date=self.execution_date)
> .first()
> )
> run_id = dag_run.run_id if dag_run else None
> session.expunge_all()
> session.commit()
> {code}
> Since subdags do not have associated DagRuns for their dag_id, the context
> will not contain the dagrun or run_id associated with the subdag, even though
> it exists associated with the parent dag.
> If we change this code to up the DagRun based on the dag_id of the parent
> dag, we can set these values.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)