[
https://issues.apache.org/jira/browse/AIRFLOW-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822945#comment-16822945
]
Xiaodong DENG commented on AIRFLOW-4297:
----------------------------------------
For our notes, here is an analysis of the code chunk that discovers and updates SLA miss records.
Comments starting with "!!!Xiaodong" are my analysis lines.
{code:python}
# !!!Xiaodong: This part finds the LATEST SUCCESS/SKIPPED task instance of each task.
# !!!Xiaodong: Due to this design, if there are no SUCCESS/SKIPPED task instances for the DAG,
# !!!Xiaodong: no SLA miss record can be discovered or created.
TI = models.TaskInstance
sq = (
    session
    .query(
        TI.task_id,
        func.max(TI.execution_date).label('max_ti'))
    .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
    .filter(TI.dag_id == dag.dag_id)
    .filter(or_(
        TI.state == State.SUCCESS,
        TI.state == State.SKIPPED))
    .filter(TI.task_id.in_(dag.task_ids))
    .group_by(TI.task_id).subquery('sq')
)
max_tis = session.query(TI).filter(
    TI.dag_id == dag.dag_id,
    TI.task_id == sq.c.task_id,
    TI.execution_date == sq.c.max_ti,
).all()
ts = timezone.utcnow()
for ti in max_tis:
    task = dag.get_task(ti.task_id)
    dttm = ti.execution_date
    if isinstance(task.sla, timedelta):
        # !!!Xiaodong: For each latest SUCCESS/SKIPPED task instance we first get its execution_date,
        # !!!Xiaodong: but the check starts from the time that run actually started (the following
        # !!!Xiaodong: schedule point), which is why we have `dttm = dag.following_schedule(dttm)` here.
        # !!!Xiaodong: A BIG issue here: dag.following_schedule() returns None
        # !!!Xiaodong: if the schedule_interval of this DAG is None or "@once".
        # !!!Xiaodong: This is why we get the bug reported in this ticket
        # !!!Xiaodong: (None can't be compared with a datetime).
        dttm = dag.following_schedule(dttm)
        while dttm < timezone.utcnow():
            following_schedule = dag.following_schedule(dttm)
            if following_schedule + task.sla < timezone.utcnow():
                session.merge(SlaMiss(
                    task_id=ti.task_id,
                    dag_id=ti.dag_id,
                    execution_date=dttm,
                    timestamp=ts))
            dttm = dag.following_schedule(dttm)
session.commit()
{code}
From the chunk above, we can see that SchedulerJob.manage_slas() is written purely for DAGs whose schedule_interval is neither None nor "@once". Even for those DAGs it has a few essential flaws, such as the one in my first comment in the code above.
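To make the loop's behaviour concrete without Airflow or a database, here is a minimal plain-Python model of it. It is a sketch under stated assumptions: `following_schedule()` and `find_sla_misses()` are hypothetical stand-ins, only timedelta schedule intervals are modelled (cron strings are not), and the `dttm is not None` guard is an assumption of this sketch, not the fix that was merged for this ticket.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union

# Only timedelta intervals are modelled; None and "@once" stand for
# "no recurring schedule", as in Airflow 1.10. Cron strings are NOT modelled
# and would also yield None here, unlike in Airflow.
ScheduleInterval = Union[timedelta, str, None]


def following_schedule(dttm: datetime, interval: ScheduleInterval) -> Optional[datetime]:
    """Hypothetical stand-in for DAG.following_schedule()."""
    if isinstance(interval, timedelta):
        return dttm + interval
    return None  # None or "@once": there is no following schedule


def find_sla_misses(
    latest_success: Optional[datetime],
    interval: ScheduleInterval,
    sla: timedelta,
    now: datetime,
) -> List[datetime]:
    """Return the execution dates the manage_slas() loop would record as SLA misses."""
    if latest_success is None:
        # No SUCCESS/SKIPPED task instance: the query yields nothing to check.
        return []
    misses: List[datetime] = []
    # Start from the schedule point after the latest successful run.
    dttm = following_schedule(latest_success, interval)
    # Assumed guard: the original loop has no `is not None` check and raises TypeError.
    while dttm is not None and dttm < now:
        following = following_schedule(dttm, interval)
        if following is not None and following + sla < now:
            misses.append(dttm)
        dttm = following
    return misses


now = datetime(2019, 4, 4, 12, 0, tzinfo=timezone.utc)
last_success = datetime(2019, 4, 1, tzinfo=timezone.utc)
misses = find_sla_misses(last_success, timedelta(days=1), timedelta(hours=1), now)
print([d.date().isoformat() for d in misses])  # ['2019-04-02', '2019-04-03']
print(find_sla_misses(last_success, None, timedelta(hours=1), now))  # []
```

With a daily interval and a 1-hour SLA, the runs for 2019-04-02 and 2019-04-03 are flagged because their following schedule plus the SLA is already in the past; the 2019-04-04 run is not, since 2019-04-05 01:00 is still in the future. With schedule_interval=None the guard returns an empty list where the original loop would raise.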
> Manually triggered DAG with no schedule_interval breaks scheduler
> ----------------------------------------------------------------
>
> Key: AIRFLOW-4297
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4297
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.10.3
> Reporter: Ash Berlin-Taylor
> Assignee: Xiaodong DENG
> Priority: Blocker
> Fix For: 1.10.4
>
>
> {code:title=example_dag.py}
> import datetime
>
> import pendulum
> from airflow import DAG
>
> timezone = "UTC"
> local_tz = pendulum.timezone(timezone)
> start_date = datetime.datetime.strptime('2019-03-28 07:57:00', "%Y-%m-%d %H:%M:%S")
> start_date = start_date.replace(tzinfo=local_tz)
> default_args = {
>     'owner': 'DaniRC'
> }
> dag = DAG('testsla',
>           default_args=default_args,
>           start_date=start_date,
>           concurrency=1,
>           max_active_runs=1,
>           default_view='tree',
>           orientation='TB',
>           catchup=False,
>           schedule_interval=None
>           )
> {code}
> If this DAG is triggered, then this error occurs:
> {noformat}
> Process DagFileProcessor5303-Process:
> Traceback (most recent call last):
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 402, in helper
>     pickle_dags)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1760, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1452, in _process_dags
>     self.manage_slas(dag)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 662, in manage_slas
>     while dttm < timezone.utcnow():
> TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
> {noformat}
> After this happens, the scheduler also won't schedule any further tasks for
> _ANY_ DAG.
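The failure in the traceback needs nothing Airflow-specific: once following_schedule() has returned None, the loop condition compares None with a datetime. The snippet below is a minimal sketch that reproduces that TypeError in plain Python, using the standard library's `datetime.now(timezone.utc)` in place of `airflow.utils.timezone.utcnow()`. `reproduce_comparison_error()` and `sla_check_applicable()` are hypothetical helpers, and skipping SLA checks for None/"@once" schedules is only one possible guard, assumed here rather than taken from the eventual fix.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Union


def reproduce_comparison_error(dttm: Optional[datetime] = None) -> Optional[str]:
    """Evaluate the manage_slas() loop condition; return the TypeError message, if any."""
    try:
        # Same comparison as `while dttm < timezone.utcnow():` in airflow/jobs.py.
        _ = dttm < datetime.now(timezone.utc)
    except TypeError as exc:
        return str(exc)
    return None


def sla_check_applicable(schedule_interval: Union[timedelta, str, None]) -> bool:
    # Assumed guard: only recurring schedules have a "following schedule".
    return schedule_interval not in (None, "@once")


print(reproduce_comparison_error())
# '<' not supported between instances of 'NoneType' and 'datetime.datetime'
print(sla_check_applicable(None), sla_check_applicable("@once"), sla_check_applicable(timedelta(days=1)))
# False False True
```

The message matches the last line of the traceback, which confirms that a None returned by following_schedule() is enough to trigger it.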