[
https://issues.apache.org/jira/browse/AIRFLOW-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17062180#comment-17062180
]
Tony Brookes commented on AIRFLOW-3097:
---------------------------------------
Just to be clear: I'm seeing these delays with the various max delay flags in
airflow.cfg set to 0. Nothing is set to 60 seconds, for example. With debug
logging turned on, the code seems to go down a rabbit hole of checking whether
it should do something, and roughly 5 times out of 6 it decides not to do
anything. I can't figure it out just yet!
> Capability for nested SubDags
> -----------------------------
>
> Key: AIRFLOW-3097
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3097
> Project: Apache Airflow
> Issue Type: New Feature
> Components: operators
> Affects Versions: 1.8.0
> Reporter: John Longo
> Priority: Critical
> Labels: subdag
>
> Unless I'm doing something incorrectly, it appears that you cannot nest
> SubDags, which would be a very helpful feature. I've created a simple
> pipeline, shown below, to demonstrate the failure case. It produces the
> following error in Airflow: Broken DAG: [/home/airflow/airflow/dags/test_dag.py]
> 'NoneType' object has no attribute 'dag_id'
> test_dag.py
> {code:python}
> from airflow import DAG
> from airflow.operators.subdag_operator import SubDagOperator
> from airflow.operators.dummy_operator import DummyOperator
> import datetime
> from datetime import timedelta
> from test_subdag1 import TestSubDag1
> startDate = '2018-09-20'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'email': ['[email protected]'],
>     'start_date': datetime.datetime(2018, 3, 20, 9, 0),
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 5,
>     'retry_delay': timedelta(seconds=30),
>     'run_as_user': 'airflow'
> }
> Test_DAG = DAG('Test_DAG', default_args=default_args,
>                start_date=datetime.datetime(2018, 3, 20, 9, 0),
>                schedule_interval=None, catchup=False)
> test_subdag1 = SubDagOperator(
>     subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate),
>     task_id='test_subdag1',
>     dag=Test_DAG)
> TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate',
>                                        dag=Test_DAG)
> test_subdag1 >> TestDagConsolidateTask
> {code}
> test_subdag1.py
> {code:python}
> from airflow import DAG
> from airflow.operators.subdag_operator import SubDagOperator
> from airflow.operators.dummy_operator import DummyOperator
> from test_subdag2 import TestSubDag2
> import datetime
> from datetime import timedelta
> def TestSubDag1(parent_dag_name, child_dag_name, startDate):
>     subdag = DAG(
>         '%s.%s' % (parent_dag_name, child_dag_name),
>         schedule_interval=None,
>         start_date=startDate)
>     test_subdag2 = SubDagOperator(
>         subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name),
>                            'test_subdag2', startDate),
>         task_id='test_subdag2',
>         dag=subdag)
>     Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate',
>                                            dag=subdag)
>     test_subdag2 >> Subdag1ConsolidateTask
> {code}
>
> test_subdag2.py
> {code:python}
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> import datetime
> from datetime import timedelta
> def TestSubDag2(parent_dag_name, child_dag_name, startDate):
>     subdag = DAG(
>         '%s.%s' % (parent_dag_name, child_dag_name),
>         schedule_interval=None,
>         start_date=startDate)
>     TestTask = DummyOperator(task_id='TestTask', dag=subdag)
>     Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate',
>                                            dag=subdag)
>     TestTask >> Subdag2ConsolidateTask
> {code}
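One possible reading of the quoted error, offered as a hedged observation rather than a confirmed diagnosis: as written, neither TestSubDag1 nor TestSubDag2 ends with "return subdag", so each call evaluates to None and SubDagOperator receives subdag=None. If, as assumed here, SubDagOperator reads subdag.dag_id when it is constructed (Airflow 1.x compares it against "<parent dag_id>.<task_id>"), that alone would raise "'NoneType' object has no attribute 'dag_id'", independent of nesting. The plain-Python sketch below needs no Airflow install. StubDag, StubSubDagOperator and the two factory functions are hypothetical stand-ins that mimic only that dag_id check; they are not Airflow classes.

```python
# Hypothetical stand-ins for airflow.DAG and SubDagOperator. They mimic only the
# dag_id handling discussed above and are NOT Airflow's actual implementation.


class StubDag:
    def __init__(self, dag_id):
        self.dag_id = dag_id


class StubSubDagOperator:
    def __init__(self, subdag, task_id, dag):
        # Assumed check: a subdag's id must be "<parent dag_id>.<task_id>".
        expected = dag.dag_id + "." + task_id
        # If the factory returned None, this attribute access raises
        # AttributeError: 'NoneType' object has no attribute 'dag_id'.
        if subdag.dag_id != expected:
            raise ValueError("expected dag_id %r, got %r" % (expected, subdag.dag_id))
        self.subdag = subdag
        self.task_id = task_id


def factory_without_return(parent_dag_name, child_dag_name):
    # Mirrors TestSubDag1/TestSubDag2 as quoted: the DAG is built but never
    # returned, so every call evaluates to None.
    subdag = StubDag("%s.%s" % (parent_dag_name, child_dag_name))


def factory_with_return(parent_dag_name, child_dag_name):
    subdag = StubDag("%s.%s" % (parent_dag_name, child_dag_name))
    return subdag


parent = StubDag("Test_DAG")

# 1. Reproduce the reported message with the return-less factory.
try:
    StubSubDagOperator(subdag=factory_without_return("Test_DAG", "test_subdag1"),
                       task_id="test_subdag1", dag=parent)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)
print(error_message)  # 'NoneType' object has no attribute 'dag_id'

# 2. Nest two levels with factories that return their DAG; each level passes
#    its own dag_id down as the next parent name.
level1 = factory_with_return(parent.dag_id, "test_subdag1")
op1 = StubSubDagOperator(subdag=level1, task_id="test_subdag1", dag=parent)
level2 = factory_with_return(level1.dag_id, "test_subdag2")
op2 = StubSubDagOperator(subdag=level2, task_id="test_subdag2", dag=level1)
print(level2.dag_id)  # Test_DAG.test_subdag1.test_subdag2
```

If the stand-ins reflect the real check, a nested factory only has to return its DAG and keep the dotted dag_id chain consistent; whether the scheduler then runs nested SubDags promptly is a separate question this sketch does not address.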
--
This message was sent by Atlassian Jira
(v8.3.4#803005)