[
https://issues.apache.org/jira/browse/AIRFLOW-3097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17062176#comment-17062176
]
Tony Brookes commented on AIRFLOW-3097:
---------------------------------------
Hi [~jdrodriguez],
We have run into SubDAG performance issues lately. It's not that the tasks themselves run any
slower inside a SubDAG; it's that the time between tasks is
disproportionately long. I see 60-second gaps (very, very evenly distributed)
between one task ending and the next one starting. If the tasks themselves
take only a short time to run, this becomes a big problem.
I believe (I am still gathering data) that the gap is some multiple of how deep into
the SubDAG stack you are, and (again, currently a guess) I think it's 20 seconds
per level of nesting. I am still working my way through the code to confirm that,
though.
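Under that guess, a 60-second gap would correspond to three levels of nesting (3 × 20 s). A minimal stdlib-only sketch of how the measurements could be checked against it follows. The helper names and the (task_id, start, end) input shape are assumptions for illustration (for example, values copied from the start_date and end_date columns of Airflow's task_instance table), and the timestamps in the demo are made up.

```python
from datetime import datetime, timedelta
from typing import List, Tuple

# Assumed input shape (hypothetical): (task_id, start, end) for each task in one
# DAG run, e.g. copied from the start_date/end_date columns of task_instance.
TaskRun = Tuple[str, datetime, datetime]


def inter_task_gaps(runs: List[TaskRun]) -> List[Tuple[str, str, float]]:
    """Return (previous_task, next_task, idle_seconds) for consecutive tasks.

    Tasks are ordered by start time; the gap is the time from one task ending
    to the next one starting (negative if the two overlapped).
    """
    ordered = sorted(runs, key=lambda run: run[1])
    gaps = []
    for (prev_id, _, prev_end), (next_id, next_start, _) in zip(ordered, ordered[1:]):
        gaps.append((prev_id, next_id, (next_start - prev_end).total_seconds()))
    return gaps


def hypothesised_gap(nesting_level: int, seconds_per_level: float = 20.0) -> float:
    """The unconfirmed guess: gap = 20 s per level of SubDAG nesting."""
    return seconds_per_level * nesting_level


if __name__ == "__main__":
    # Made-up timestamps for three short tasks, purely to exercise the helpers.
    t0 = datetime(2020, 3, 18, 9, 0, 0)
    runs = [
        ("task_a", t0, t0 + timedelta(seconds=5)),
        ("task_b", t0 + timedelta(seconds=65), t0 + timedelta(seconds=70)),
        ("task_c", t0 + timedelta(seconds=130), t0 + timedelta(seconds=131)),
    ]
    for prev_id, next_id, seconds in inter_task_gaps(runs):
        print("%s -> %s: %.0f s idle" % (prev_id, next_id, seconds))
    print("guess for 3 levels: %.0f s" % hypothesised_gap(3))
```

If the measured gaps track `hypothesised_gap(depth)` across runs at different nesting depths, that would support the guess; a constant gap regardless of depth would point elsewhere.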
We have designed our code base so that each of our various sets of tasks can be
configured to render either inside a SubDagOperator or directly into the top-level
DAG. Because the generating code is given the prior task and the next task, what it
creates and hooks up between them is up to it: it looks at the config and decides
whether to wrap all the tasks in a SubDagOperator or just wire them
up directly.
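A minimal sketch of that pattern, written without Airflow so it runs anywhere. `Task`, `SubDagTask`, `render_section` and the `use_subdag` flag are hypothetical names, not Airflow classes and not our actual code; the stand-ins only model the `>>` wiring. In a real DAG the `use_subdag` branch would build a `SubDagOperator` around a DAG holding the chain.

```python
from dataclasses import dataclass, field
from typing import List


# Hypothetical stand-ins for Airflow operators: just enough to model `>>` wiring.
@dataclass(eq=False)
class Task:
    task_id: str
    downstream: List["Task"] = field(default_factory=list)

    def __rshift__(self, other: "Task") -> "Task":
        # Like Airflow's `a >> b`: record the edge and return `b` so chains work.
        self.downstream.append(other)
        return other


@dataclass(eq=False)
class SubDagTask(Task):
    """Stand-in for a SubDagOperator: one parent-level node owning inner tasks."""
    inner: List[Task] = field(default_factory=list)


def render_section(name: str, task_ids: List[str], prior: Task, nxt: Task,
                   use_subdag: bool) -> List[Task]:
    """Wire a linear chain of tasks between `prior` and `nxt`.

    With use_subdag=True the chain lives inside one SubDagTask, so the parent
    sees (and can skip) a single node; otherwise every task is wired straight
    into the parent. Returns the nodes that were added at the parent level.
    """
    if not task_ids:
        raise ValueError("a section needs at least one task")
    chain = [Task(task_id) for task_id in task_ids]
    for upstream, downstream in zip(chain, chain[1:]):
        upstream >> downstream
    if use_subdag:
        wrapper = SubDagTask(name, inner=chain)
        prior >> wrapper >> nxt
        return [wrapper]
    prior >> chain[0]
    chain[-1] >> nxt
    return chain
```

Both branches present the same boundary to the rest of the DAG (`prior` upstream, `nxt` downstream); the only difference is whether the parent schedules, or skips, one node or the whole chain.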
This currently gives us two problems, and it's impossible to solve both:
# If I render all tasks into the top-level DAG, I waste a lot of time
"skipping" tasks I don't need on a particular run, whereas if they were inside
a SubDAG, the top-level DAG would only have to skip the single SubDagOperator.
# If I render the tasks inside SubDAGs, I get these big delays whenever
the tasks inside a SubDAG actually need to run (i.e. are not skipped by the
top-level DAG).
I am currently trying to find a way through this. My current approach
involves investigating how Airflow schedules SubDAGs. It seems to
run a SubDAG as a backfill job, and I think (a hunch at the moment) this
has something to do with why the scheduling delays between tasks are so big.
Either that or it's just the nesting of SubDAGs; not sure which yet.
This is my focus for the next day or two, so hopefully I will figure something
out!
> Capability for nested SubDags
> -----------------------------
>
> Key: AIRFLOW-3097
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3097
> Project: Apache Airflow
> Issue Type: New Feature
> Components: operators
> Affects Versions: 1.8.0
> Reporter: John Longo
> Priority: Critical
> Labels: subdag
>
> Unless I'm doing something incorrectly, it appears that you cannot nest
> SubDags, which would be a very helpful feature. I've created a simple
> pipeline to demonstrate the failure case below. It produces the following in
> Airflow: Broken DAG: [/home/airflow/airflow/dags/test_dag.py] 'NoneType'
> object has no attribute 'dag_id'
> test_dag.py
> {code:python}
> import datetime
> from datetime import timedelta
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator  # was missing
> from airflow.operators.subdag_operator import SubDagOperator
>
> from test_subdag1 import TestSubDag1
>
> startDate = datetime.datetime(2018, 9, 20)  # a datetime, not the string '2018-09-20'
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'email': ['[email protected]'],
>     'start_date': datetime.datetime(2018, 3, 20, 9, 0),
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 5,
>     'retry_delay': timedelta(seconds=30),
>     'run_as_user': 'airflow'
> }
> Test_DAG = DAG('Test_DAG', default_args=default_args,
>                start_date=datetime.datetime(2018, 3, 20, 9, 0),
>                schedule_interval=None, catchup=False)
> test_subdag1 = SubDagOperator(subdag=TestSubDag1('Test_DAG', 'test_subdag1', startDate),
>                               task_id='test_subdag1',
>                               dag=Test_DAG)
> TestDagConsolidateTask = DummyOperator(task_id='TestDag_Consolidate', dag=Test_DAG)
> test_subdag1 >> TestDagConsolidateTask
> {code}
> test_subdag1.py
> {code:python}
> import datetime
> from datetime import timedelta
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> from test_subdag2 import TestSubDag2
>
>
> def TestSubDag1(parent_dag_name, child_dag_name, startDate):
>     subdag = DAG(
>         '%s.%s' % (parent_dag_name, child_dag_name),
>         schedule_interval=None,
>         start_date=startDate)
>     test_subdag2 = SubDagOperator(
>         subdag=TestSubDag2('%s.%s' % (parent_dag_name, child_dag_name),
>                            'test_subdag2', startDate),
>         task_id='test_subdag2',
>         dag=subdag)
>     Subdag1ConsolidateTask = DummyOperator(task_id='Subdag1_Consolidate', dag=subdag)
>     test_subdag2 >> Subdag1ConsolidateTask
>     # Without this return the factory yields None, and SubDagOperator fails with
>     # "'NoneType' object has no attribute 'dag_id'".
>     return subdag
> {code}
>
> test_subdag2.py
> {code:python}
> import datetime
> from datetime import timedelta
>
> from airflow import DAG
> from airflow.operators.dummy_operator import DummyOperator
>
>
> def TestSubDag2(parent_dag_name, child_dag_name, startDate):
>     subdag = DAG(
>         '%s.%s' % (parent_dag_name, child_dag_name),
>         schedule_interval=None,
>         start_date=startDate)
>     TestTask = DummyOperator(task_id='TestTask', dag=subdag)
>     Subdag2ConsolidateTask = DummyOperator(task_id='Subdag2_Consolidate', dag=subdag)
>     TestTask >> Subdag2ConsolidateTask
>     return subdag  # the SubDagOperator in test_subdag1.py needs the DAG, not None
> {code}
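The quoted factories build each SubDAG's dag_id as the parent's dag_id, a dot, and the SubDagOperator's task_id, and in the 1.x releases SubDagOperator compares `subdag.dag_id` against that form when it is constructed. That attribute read is where a factory returning None (in the report as filed, neither factory ended with `return subdag`) turns into "'NoneType' object has no attribute 'dag_id'". The stdlib-only sketch below uses hypothetical helper names (not Airflow API) to show how the ids compose across the two nesting levels, and to fail early with a clearer message when a factory hands back None.

```python
from typing import Optional


def subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Compose a SubDAG's dag_id as the quoted factories do: '<parent>.<task_id>'."""
    return "%s.%s" % (parent_dag_id, task_id)


def check_subdag_id(parent_dag_id: str, task_id: str,
                    subdag_dag_id: Optional[str]) -> None:
    """Mirror the naming rule SubDagOperator applies, failing early on None.

    Hypothetical helper: a factory that forgets `return subdag` hands the
    operator None, which is what surfaces as the 'NoneType' error.
    """
    if subdag_dag_id is None:
        raise ValueError("subdag factory returned None; is `return subdag` missing?")
    expected = subdag_id(parent_dag_id, task_id)
    if subdag_dag_id != expected:
        raise ValueError("subdag dag_id %r should be %r" % (subdag_dag_id, expected))


# The two nesting levels from the report:
level1 = subdag_id("Test_DAG", "test_subdag1")  # 'Test_DAG.test_subdag1'
level2 = subdag_id(level1, "test_subdag2")      # 'Test_DAG.test_subdag1.test_subdag2'
check_subdag_id("Test_DAG", "test_subdag1", level1)
check_subdag_id(level1, "test_subdag2", level2)
```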
--
This message was sent by Atlassian Jira
(v8.3.4#803005)