Hi Bolke,

Upon testing with the same DAG I included in the JIRA ticket
<https://issues.apache.org/jira/browse/AIRFLOW-747>, it seems that the
retry_delay still isn't honored. I'm using the LocalExecutor. Let me know
if I can provide any other information, thanks for looking into this!


Harvey Xia | Software Engineer
harvey...@spotify.com
+1 (339) 225 1875

On Fri, Jan 13, 2017 at 6:34 AM, Bolke de Bruin <bdbr...@gmail.com> wrote:

> Hi Harvey,
>
> PR is out: https://github.com/apache/incubator-airflow/pull/1988
>
> Could you try it out and report on the results?
>
> Thanks!
> Bolke
>
> > On 12 Jan 2017, at 21:41, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >
> > OK, I figured it out. There are actually two bugs observed:
> >
> > 1. retry_delay is not honoured
> > 2. UP_FOR_RETRY tasks belonging to a dag_run that is marked failed
> > are nevertheless executed.
> >
> > The problem is a combination of 3 issues.
> >
> > 1. The dag_run is marked failed although a task is still within its
> > retry_delay period
> > 2. A task does not check its own “retry_delay”, because the scheduler
> > sets it to “queued” rather than “up_for_retry”
> > 3. _execute_task_instances is called with “UP_FOR_RETRY” and
> > “SCHEDULED”. This function pulls tasks directly from the database and
> > does not check dependencies.
> >
> > I think (for now) the fix is to:
> >
> > 1. Set tasks that are up_for_retry and out of their retry period to
> “SCHEDULED”
> > 2. Update the call to _execute_task_instances to only allow
> > “SCHEDULED” tasks (a rough sketch follows below)
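> >
> > A rough sketch of what (1) and (2) could look like (my own hypothetical
> > helper, not the actual patch; it assumes TaskInstance.ready_for_retry()
> > reports whether the retry delay has elapsed):
> >
> > # Hypothetical sketch, not the actual PR: move task instances whose
> > # retry delay has elapsed back to SCHEDULED, so the scheduler only
> > # ever hands SCHEDULED tasks to _execute_task_instances.
> > from airflow.utils.state import State
> >
> > def promote_expired_retries(task_instances, session):
> >     for ti in task_instances:
> >         if ti.state == State.UP_FOR_RETRY and ti.ready_for_retry():
> >             ti.state = State.SCHEDULED
> >             session.merge(ti)
> >     session.commit()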
> >
> > - Bolke
> >
> >> On 12 Jan 2017, at 20:53, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >>
> >> Even when the dag_run is not marked failed and the task should not
> >> retry yet, the task is nevertheless still scheduled.
> >>
> >> Bolke
> >>
> >>
> >>> On 12 Jan 2017, at 20:16, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> I further analysed this and it's an error in the dependency engine.
> >>> In the current state of master / 1.8, the “UP_FOR_RETRY” state is not
> >>> considered when updating the state of a DAG run. This leads to the
> >>> dag run being marked as deadlocked.
> >>>
> >>> Applying a patch from
> >>> https://github.com/apache/incubator-airflow/pull/1934, particularly
> >>> the “ignore_in_retry_period” part, the logs show that the retry period
> >>> is being honoured, but eventually the dag run still gets marked
> >>> deadlocked.
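> >>>
> >>> For context, the deadlock check in DagRun.update_state amounts to
> >>> roughly the following (paraphrased from memory, not the exact 1.8
> >>> source):
> >>>
> >>> def is_deadlocked(unfinished_tasks, session):
> >>>     # If there are unfinished tasks but none of them can have its
> >>>     # dependencies met, the run is declared deadlocked. A task
> >>>     # waiting out its retry_delay fails every dependency check,
> >>>     # so the whole run gets marked failed.
> >>>     return bool(unfinished_tasks) and all(
> >>>         not ti.are_dependencies_met(session=session)
> >>>         for ti in unfinished_tasks
> >>>     )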
> >>>
> >>> These are the logs:
> >>>
> >>> [2017-01-12 20:12:00,840] {jobs.py:801} DagFileProcessor262 INFO -
> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05
> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
> >>> [2017-01-12 20:12:00,846] {models.py:3865} DagFileProcessor262 INFO -
> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05
> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
> considering 2 task(s)
> >>> [2017-01-12 20:12:00,849] {not_in_retry_period_dep.py:30}
> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify',
> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state:
> up_for_retry
> >>> [2017-01-12 20:12:00,852] {jobs.py:844} DagFileProcessor262 INFO -
> Checking dependencies
> >>> [2017-01-12 20:12:00,862] {jobs.py:850} DagFileProcessor262 INFO -
> Done checking and queuing
> >>> [2017-01-12 20:12:00,862] {jobs.py:844} DagFileProcessor262 INFO -
> Checking dependencies
> >>> [2017-01-12 20:12:00,862] {not_in_retry_period_dep.py:30}
> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify',
> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state:
> up_for_retry
> >>> [2017-01-12 20:12:00,862] {models.py:996} DagFileProcessor262 INFO -
> Prem: State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now:
> 2017-01-12 20:12:00.862789
> >>> [2017-01-12 20:12:00,862] {models.py:1109} DagFileProcessor262 INFO -
> State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now:
> 2017-01-12 20:12:00.862928
> >>> [2017-01-12 20:12:00,863] {jobs.py:850} DagFileProcessor262 INFO -
> Done checking and queuing
> >>> /Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/SQLAlchemy-1.1.4-py2.7-macosx-10.12-x86_64.egg/sqlalchemy/sql/default_comparator.py:161:
> >>> SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an
> >>> empty sequence. This results in a contradiction, which nonetheless can
> >>> be expensive to evaluate. Consider alternative strategies for improved
> >>> performance.
> >>> [2017-01-12 20:12:00,879] {models.py:321} DagFileProcessor262 INFO -
> Finding 'running' jobs without a recent heartbeat
> >>> [2017-01-12 20:12:00,879] {models.py:327} DagFileProcessor262 INFO -
> Failing jobs without heartbeat after 2017-01-12 20:07:00.879746
> >>> [2017-01-12 20:12:00,886] {jobs.py:331} DagFileProcessor262 INFO -
> Processing /Users/bolke/airflow/dags/retry_delay.py took 0.095 seconds
> >>>
> >>>
> >>> [2017-01-12 20:12:05,962] {jobs.py:323} DagFileProcessor272 INFO -
> >>> Started process (PID=25181) to work on
> >>> /Users/bolke/airflow/dags/retry_delay.py
> >>> [2017-01-12 20:12:05,975] {jobs.py:1483} DagFileProcessor272 INFO -
> Processing file /Users/bolke/airflow/dags/retry_delay.py for tasks to
> queue
> >>> [2017-01-12 20:12:05,975] {models.py:168} DagFileProcessor272 INFO -
> Filling up the DagBag from /Users/bolke/airflow/dags/retry_delay.py
> >>> [2017-01-12 20:12:05,982] {jobs.py:1497} DagFileProcessor272 INFO -
> DAG(s) ['test_retry_handling_job_spotify'] retrieved from
> /Users/bolke/airflow/dags/retry_delay.py
> >>> [2017-01-12 20:12:06,001] {jobs.py:1109} DagFileProcessor272 INFO -
> Processing test_retry_handling_job_spotify
> >>> [2017-01-12 20:12:06,008] {jobs.py:801} DagFileProcessor272 INFO -
> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05
> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
> >>> [2017-01-12 20:12:06,014] {models.py:3865} DagFileProcessor272 INFO -
> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05
> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
> considering 2 task(s)
> >>> [2017-01-12 20:12:06,021] {models.py:3914} DagFileProcessor272 INFO -
> Deadlock; marking run <DagRun test_retry_handling_job_spotify @
> 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally
> triggered: False> failed
> >>>
> >>> I consider this a blocker for 1.8 and I could use some help in solving
> the issue.
> >>>
> >>> Bolke.
> >>>
> >>>> On 11 Jan 2017, at 21:45, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >>>>
> >>>> First analysis:
> >>>>
> >>>> The dependency checker (at least the retry_delay checker) is not
> >>>> called in the scheduler for some reason. As the task leaves the
> >>>> scheduler its state is set to “queued”, which won’t match the
> >>>> “is_premature” requirement of “UP_FOR_RETRY”.
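> >>>>
> >>>> For reference, the premature check is roughly the following
> >>>> (paraphrasing the TaskInstance.is_premature property from memory):
> >>>>
> >>>> # A task counts as premature only while it is UP_FOR_RETRY and its
> >>>> # retry delay has not yet elapsed. Once the scheduler flips the
> >>>> # state to QUEUED, this condition can never be True, so the
> >>>> # retry_delay check is effectively skipped.
> >>>> is_premature = (
> >>>>     ti.state == State.UP_FOR_RETRY and not ti.ready_for_retry()
> >>>> )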
> >>>>
> >>>> @Dan: are you able to help out here?
> >>>>
> >>>> Bolke
> >>>>
> >>>>> On 11 Jan 2017, at 18:31, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
> >>>>>
> >>>>> Hi Bolke,
> >>>>>
> >>>>> Here is the JIRA issue:
> >>>>> https://issues.apache.org/jira/browse/AIRFLOW-747
> >>>>>
> >>>>>
> >>>>> Harvey Xia | Software Engineer
> >>>>> harvey...@spotify.com
> >>>>> +1 (339) 225 1875
> >>>>>
> >>>>> On Wed, Jan 11, 2017 at 11:32 AM, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi Harvey,
> >>>>>>
> >>>>>> Thanks for reporting! Can you create a JIRA for this? I’ll have a
> >>>>>> look to see if I can reproduce it.
> >>>>>>
> >>>>>> - Bolke
> >>>>>>
> >>>>>>> On 11 Jan 2017, at 16:06, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
> >>>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> In Airflow 1.8 alpha 2, using the LocalExecutor, DAGs do not seem
> >>>>>>> to honor the retry_delay parameter, i.e. the retries happen
> >>>>>>> immediately one after the other without waiting for the specified
> >>>>>>> retry_delay. However, the *number* of retries is honored. I am
> >>>>>>> testing with the following code:
> >>>>>>>
> >>>>>>> from airflow import DAG
> >>>>>>> from airflow.operators.bash_operator import BashOperator
> >>>>>>> from datetime import datetime, timedelta
> >>>>>>>
> >>>>>>> default_args = {
> >>>>>>>     'owner': 'airflow',
> >>>>>>>     'depends_on_past': False,
> >>>>>>>     'start_date': datetime(2016, 10, 5, 19),
> >>>>>>>     'end_date': datetime(2016, 10, 6, 19),
> >>>>>>>     'email': ['airf...@airflow.com'],
> >>>>>>>     'email_on_failure': False,
> >>>>>>>     'email_on_retry': False,
> >>>>>>>     'retries': 10,
> >>>>>>>     'retry_delay': timedelta(seconds=500)
> >>>>>>> }
> >>>>>>>
> >>>>>>> dag = DAG('test_retry_handling_job', default_args=default_args,
> >>>>>>>           schedule_interval='@once')
> >>>>>>>
> >>>>>>> task1 = BashOperator(
> >>>>>>>     task_id='test_retry_handling_op1',
> >>>>>>>     bash_command='exit 1',
> >>>>>>>     dag=dag)
> >>>>>>>
> >>>>>>> task2 = BashOperator(
> >>>>>>>     task_id='test_retry_handling_op2',
> >>>>>>>     bash_command='exit 1',
> >>>>>>>     dag=dag)
> >>>>>>>
> >>>>>>> task2.set_upstream(task1)
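> >>>>>>>
> >>>>>>> (With retry_delay = timedelta(seconds=500), consecutive attempts
> >>>>>>> of the same task should show up roughly 8 minutes apart in the
> >>>>>>> task instance logs; instead they start back to back.)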
> >>>>>>>
> >>>>>>> Let me know if anyone has any ideas about this issue, thanks!
> >>>>>>>
> >>>>>>> Harvey Xia | Software Engineer
> >>>>>>> harvey...@spotify.com
> >>>>>>> +1 (339) 225 1875
> >>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> >
>
>
