Hey Harvey,

Can you verify that you indeed applied the patch, and supply the logs in the 
Jira? This is my log from a clean install of Airflow with a retry delay of 
100s:

[2017-01-13 18:28:46,599] {bash_operator.py:97} INFO - Command exited with 
return code 1
[2017-01-13 18:28:46,600] {models.py:1348} ERROR - Bash command failed
Traceback (most recent call last):
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/models.py",
 line 1305, in run
    result = task_copy.execute(context=context)
  File 
"/Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/airflow-1.7.2.dev0-py2.7.egg/airflow/operators/bash_operator.py",
 line 100, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2017-01-13 18:28:46,601] {models.py:1364} INFO - Marking task as UP_FOR_RETRY
[2017-01-13 18:28:46,609] {models.py:1393} ERROR - Bash command failed



[2017-01-13 18:30:28,880] {models.py:168} INFO - Filling up the DagBag from 
/Users/bolke/airflow/dags/retry_delay.py
[2017-01-13 18:30:28,929] {jobs.py:2019} INFO - Subprocess PID is 48594
[2017-01-13 18:30:29,781] {models.py:168} INFO - Filling up the DagBag from 
/Users/bolke/airflow/dags/retry_delay.py
[2017-01-13 18:30:29,817] {models.py:1062} INFO - Dependencies all met for 
<TaskInstance: test_retry_handling_job_spotify.test_retry_handling_op1 
2016-10-05 19:00:00 [queued]>
[2017-01-13 18:30:29,820] {models.py:1062} INFO - Dependencies all met for 
<TaskInstance: test_retry_handling_job_spotify.test_retry_handling_op1 
2016-10-05 19:00:00 [queued]>
[2017-01-13 18:30:29,820] {models.py:1250} INFO -
--------------------------------------------------------------------------------
Starting attempt 2 of 11
--------------------------------------------------------------------------------

This seems fine to me.
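
For reference, the test DAG is essentially Harvey’s DAG from the Jira with the
delay set to 100 seconds; a minimal sketch (reconstructed from the logs above,
not the literal /Users/bolke/airflow/dags/retry_delay.py):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2016, 10, 5, 19),
    'retries': 10,
    'retry_delay': timedelta(seconds=100),  # the 100s delay under test
}

dag = DAG('test_retry_handling_job_spotify', default_args=default_args,
          schedule_interval='@once')

task1 = BashOperator(task_id='test_retry_handling_op1',
                     bash_command='exit 1',  # always fails, forcing retries
                     dag=dag)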

Bolke


> On 13 Jan 2017, at 16:19, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
> 
> Hi Bolke,
> 
> Upon testing with the same DAG I included in the JIRA ticket
> <https://issues.apache.org/jira/browse/AIRFLOW-747>, it seems that the
> retry_delay still isn't honored. I'm using the LocalExecutor. Let me know
> if I can provide any other information, thanks for looking into this!
> 
> 
> Harvey Xia | Software Engineer
> harvey...@spotify.com
> +1 (339) 225 1875
> 
> On Fri, Jan 13, 2017 at 6:34 AM, Bolke de Bruin <bdbr...@gmail.com> wrote:
> 
>> Hi Harvey,
>> 
>> PR is out: https://github.com/apache/incubator-airflow/pull/1988
>> 
>> Could you try it out and report on the results?
>> 
>> Thanks!
>> Bolke
>> 
>>> On 12 Jan 2017, at 21:41, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>> 
>>> Ok, I figured it out. There are actually two bugs being observed:
>>> 
>>> 1. The retry delay is not honoured
>>> 2. UP_FOR_RETRY tasks whose dag_run is marked failed are nevertheless
>>>    executed.
>>> 
>>> The problem is a combination of three issues:
>>> 
>>> 1. The dag_run is marked failed although a task is still within its
>>>    retry_delay period
>>> 2. A task does not check its own “retry_delay”, as the task is set to
>>>    “queued” by the scheduler and not to “up_for_retry”
>>> 3. _execute_task_instance is called with “UP_FOR_RETRY” and “SCHEDULED”.
>>>    This function pulls tasks directly from the database and does not
>>>    check dependencies.
>>> 
>>> I think (for now) the fix is to (rough sketch after the list):
>>> 
>>> 1. Set tasks that are up_for_retry and out of their retry period to
>>>    “SCHEDULED”
>>> 2. Update the call to _execute_task_instance to only allow “SCHEDULED”
>>>    tasks
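>>> 
>>> A rough sketch of the idea (illustrative only: the query feeding
>>> tis_up_for_retry is hypothetical, while ready_for_retry() and
>>> State.SCHEDULED are existing Airflow names):
>>> 
>>> # 1. Flip tasks that are UP_FOR_RETRY and past their retry window
>>> #    back to SCHEDULED before anything is queued.
>>> for ti in tis_up_for_retry:      # hypothetical query result
>>>     if ti.ready_for_retry():     # retry delay has elapsed
>>>         ti.state = State.SCHEDULED
>>>         session.merge(ti)
>>> session.commit()
>>> 
>>> # 2. Only allow SCHEDULED task instances into the execution call.
>>> self._execute_task_instance(simple_dag_bag, (State.SCHEDULED,))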
>>> 
>>> - Bolke
>>> 
>>>> On 12 Jan 2017, at 20:53, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>> 
>>>> While the dag_run is not marked as failed, and the task should not
>>>> retry yet, it is nevertheless still scheduled.
>>>> 
>>>> Bolke
>>>> 
>>>> 
>>>>> On 12 Jan 2017, at 20:16, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>> 
>>>>> Hi All,
>>>>> 
>>>>> I further analysed this and it’s an error in the dependency engine.
>>>>> In the current state of master / 1.8, when updating the state of a DAG
>>>>> run, the “UP_FOR_RETRY” state is not considered. This leads to the dag
>>>>> run being marked deadlocked.
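>>>>> 
>>>>> Roughly, the deadlock check in DagRun.update_state amounts to this (a
>>>>> paraphrased sketch from memory, not verbatim models.py):
>>>>> 
>>>>> unfinished = [t for t in tis if t.state in State.unfinished()]
>>>>> # An UP_FOR_RETRY task inside its retry window fails its deps check,
>>>>> # so no unfinished task is runnable and the run looks deadlocked.
>>>>> if unfinished and not any(t.are_dependencies_met() for t in unfinished):
>>>>>     self.state = State.FAILED  # marked “deadlocked”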
>>>>> 
>>>>> Applying the patch from
>>>>> https://github.com/apache/incubator-airflow/pull/1934, particularly
>>>>> the “ignore_in_retry_period” part, the logs show that the retry delay
>>>>> is being honoured, but eventually the dag run still gets marked
>>>>> deadlocked.
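>>>>> 
>>>>> To illustrate what that flag changes (hedged: DepContext and
>>>>> QUEUE_DEPS are the 1.8-era ti_deps names as I recall them, and the
>>>>> exact call site in jobs.py may differ):
>>>>> 
>>>>> from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS
>>>>> 
>>>>> # With ignore_in_retry_period=False the NotInRetryPeriodDep is
>>>>> # enforced, so a task still inside its retry window is not queued.
>>>>> dep_context = DepContext(deps=QUEUE_DEPS, ignore_in_retry_period=False)
>>>>> if ti.are_dependencies_met(dep_context=dep_context, session=session):
>>>>>     queued_tis.append(ti)  # hypothetical queueing step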
>>>>> 
>>>>> These are the logs:
>>>>> 
>>>>> [2017-01-12 20:12:00,840] {jobs.py:801} DagFileProcessor262 INFO -
>> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05
>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>>>>> [2017-01-12 20:12:00,846] {models.py:3865} DagFileProcessor262 INFO -
>> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05
>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>> considering 2 task(s)
>>>>> [2017-01-12 20:12:00,849] {not_in_retry_period_dep.py:30}
>> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify',
>> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state:
>> up_for_retry
>>>>> [2017-01-12 20:12:00,852] {jobs.py:844} DagFileProcessor262 INFO -
>> Checking dependencies
>>>>> [2017-01-12 20:12:00,862] {jobs.py:850} DagFileProcessor262 INFO -
>> Done checking and queuing
>>>>> [2017-01-12 20:12:00,862] {jobs.py:844} DagFileProcessor262 INFO -
>> Checking dependencies
>>>>> [2017-01-12 20:12:00,862] {not_in_retry_period_dep.py:30}
>> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify',
>> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state:
>> up_for_retry
>>>>> [2017-01-12 20:12:00,862] {models.py:996} DagFileProcessor262 INFO -
>> Prem: State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now:
>> 2017-01-12 20:12:00.862789
>>>>> [2017-01-12 20:12:00,862] {models.py:1109} DagFileProcessor262 INFO -
>> State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now:
>> 2017-01-12 20:12:00.862928
>>>>> [2017-01-12 20:12:00,863] {jobs.py:850} DagFileProcessor262 INFO -
>> Done checking and queuing
>>>>> /Users/bolke/Documents/dev/airflow_env/lib/python2.7/
>> site-packages/SQLAlchemy-1.1.4-py2.7-macosx-10.12-x86_64.
>> egg/sqlalchemy/sql/default_comparator.py:161: SAWarning: The IN-predicate
>> on "dag_run.dag_id" was invoked with an empty sequence. This results in a
>> contradiction, which nonetheless can be expensive to evaluate.  Consider
>> alternative strategies for improved performance.
>>>>> [2017-01-12 20:12:00,879] {models.py:321} DagFileProcessor262 INFO -
>> Finding 'running' jobs without a recent heartbeat
>>>>> [2017-01-12 20:12:00,879] {models.py:327} DagFileProcessor262 INFO -
>> Failing jobs without heartbeat after 2017-01-12 20:07:00.879746
>>>>> [2017-01-12 20:12:00,886] {jobs.py:331} DagFileProcessor262 INFO -
>> Processing /Users/bolke/airflow/dags/retry_delay.py took 0.095 seconds
>>>>> 
>>>>> 
>>>>> [2017-01-12 20:12:05,962] {jobs.py:323} DagFileProcessor272 INFO -
>> Started process (PID=25181) to work on /Users/bolke/airflow/dags/
>> retry_delay.py
>>>>> [2017-01-12 20:12:05,975] {jobs.py:1483} DagFileProcessor272 INFO -
>> Processing file /Users/bolke/airflow/dags/retry_delay.py for tasks to
>> queue
>>>>> [2017-01-12 20:12:05,975] {models.py:168} DagFileProcessor272 INFO -
>> Filling up the DagBag from /Users/bolke/airflow/dags/retry_delay.py
>>>>> [2017-01-12 20:12:05,982] {jobs.py:1497} DagFileProcessor272 INFO -
>> DAG(s) ['test_retry_handling_job_spotify'] retrieved from
>> /Users/bolke/airflow/dags/retry_delay.py
>>>>> [2017-01-12 20:12:06,001] {jobs.py:1109} DagFileProcessor272 INFO -
>> Processing test_retry_handling_job_spotify
>>>>> [2017-01-12 20:12:06,008] {jobs.py:801} DagFileProcessor272 INFO -
>> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05
>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>>>>> [2017-01-12 20:12:06,014] {models.py:3865} DagFileProcessor272 INFO -
>> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05
>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>> considering 2 task(s)
>>>>> [2017-01-12 20:12:06,021] {models.py:3914} DagFileProcessor272 INFO -
>> Deadlock; marking run <DagRun test_retry_handling_job_spotify @
>> 2016-10-05 19:00:00: scheduled__2016-10-05T19:00:00, externally
>> triggered: False> failed
>>>>> 
>>>>> I consider this a blocker for 1.8 and I could use some help in solving
>> the issue.
>>>>> 
>>>>> Bolke.
>>>>> 
>>>>>> On 11 Jan 2017, at 21:45, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>> 
>>>>>> First analysis:
>>>>>> 
>>>>>> The dependency checker (at least the retry_delay checker) is not
>>>>>> called in the scheduler for some reason. As the task leaves the
>>>>>> scheduler its state is set to “queued”, which won’t match the
>>>>>> “is_premature” requirement of “UP_FOR_RETRY”.
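>>>>>> 
>>>>>> For context, is_premature boils down to this (paraphrased from
>>>>>> models.py from memory, so treat it as a sketch):
>>>>>> 
>>>>>> @property
>>>>>> def is_premature(self):
>>>>>>     # Only guards tasks actually in UP_FOR_RETRY; a task already
>>>>>>     # flipped to "queued" sails straight past this check.
>>>>>>     return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
>>>>>> 
>>>>>> def ready_for_retry(self):
>>>>>>     return (self.state == State.UP_FOR_RETRY and
>>>>>>             self.next_retry_datetime() < datetime.now())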
>>>>>> 
>>>>>> @Dan: are you able to help out here?
>>>>>> 
>>>>>> Bolke
>>>>>> 
>>>>>>> On 11 Jan 2017, at 18:31, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi Bolke,
>>>>>>> 
>>>>>>> Here is the JIRA issue:
>>>>>>> https://issues.apache.org/jira/browse/AIRFLOW-747
>>>>>>> 
>>>>>>> 
>>>>>>> Harvey Xia | Software Engineer
>>>>>>> harvey...@spotify.com
>>>>>>> +1 (339) 225 1875
>>>>>>> 
>>>>>>> On Wed, Jan 11, 2017 at 11:32 AM, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hi Harvey,
>>>>>>>> 
>>>>>>>> Thanks for reporting! Can you create a Jira for this? I’ll have a
>>>>>>>> look to see if I can reproduce it.
>>>>>>>> 
>>>>>>>> - Bolke
>>>>>>>> 
>>>>>>>>> On 11 Jan 2017, at 16:06, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
>>>>>>>>> 
>>>>>>>>> Hi all,
>>>>>>>>> 
>>>>>>>>> In Airflow 1.8 alpha 2, using LocalExecutor, DAGs do not seem to
>>>>>>>>> honor the retry_delay parameter, i.e. the retries happen
>>>>>>>>> immediately one after the other without waiting the specified
>>>>>>>>> retry_delay time. However, the *number* of retries is honored. I am
>>>>>>>>> testing with the following code:
>>>>>>>>> 
>>>>>>>>> from airflow import DAG
>>>>>>>>> from airflow.operators.bash_operator import BashOperator
>>>>>>>>> from datetime import datetime, timedelta
>>>>>>>>> 
>>>>>>>>> default_args = {
>>>>>>>>>     'owner': 'airflow',
>>>>>>>>>     'depends_on_past': False,
>>>>>>>>>     'start_date': datetime(2016, 10, 5, 19),
>>>>>>>>>     'end_date': datetime(2016, 10, 6, 19),
>>>>>>>>>     'email': ['airf...@airflow.com'],
>>>>>>>>>     'email_on_failure': False,
>>>>>>>>>     'email_on_retry': False,
>>>>>>>>>     'retries': 10,
>>>>>>>>>     'retry_delay': timedelta(0, 500)
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> dag = DAG('test_retry_handling_job', default_args=default_args,
>>>>>>>>>           schedule_interval='@once')
>>>>>>>>> 
>>>>>>>>> task1 = BashOperator(
>>>>>>>>>     task_id='test_retry_handling_op1',
>>>>>>>>>     bash_command='exit 1',
>>>>>>>>>     dag=dag)
>>>>>>>>> 
>>>>>>>>> task2 = BashOperator(
>>>>>>>>>     task_id='test_retry_handling_op2',
>>>>>>>>>     bash_command='exit 1',
>>>>>>>>>     dag=dag)
>>>>>>>>> 
>>>>>>>>> task2.set_upstream(task1)
>>>>>>>>> 
>>>>>>>>> Let me know if anyone has any ideas about this issue, thanks!
>>>>>>>>> 
>>>>>>>>> Harvey Xia | Software Engineer
>>>>>>>>> harvey...@spotify.com
>>>>>>>>> +1 (339) 225 1875
