Hi Harvey,

PR is out: https://github.com/apache/incubator-airflow/pull/1988

Could you try it out and report on the results?

Thanks!
Bolke

> On 12 Jan 2017, at 21:41, Bolke de Bruin <bdbr...@gmail.com> wrote:
> 
> Ok I figured it out. Actually two bugs are observed:
> 
> 1. Retry_period not honoured
> 2. UP_FOR_RETRY tasks whose dag_run is marked failed are nevertheless 
> executed.
> 
> The problem is a combination of 3 issues.
> 
> 1. The dag_run is marked failed although a task is still in its 
> retry_delay_period
> 2. A task does not check its own “retry_delay” as the task is set to “queued” 
> by the scheduler and not “up_for_retry”
> 3. _execute_task_instance is called with “UP_FOR_RETRY” and “SCHEDULED”. This 
> function pulls tasks directly from the database and does not check 
> dependencies.
> 
> I think (for now) the fix is to:
> 
> 1. Set tasks that are up_for_retry and out of their retry period to 
> “SCHEDULED”
> 2. Update the call to _execute_task_instance to only allow “SCHEDULED” tasks
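> 
> To make the intent concrete, here is a rough sketch of what that could look 
> like inside the scheduler loop. This is only an illustration of the idea, not 
> the actual change in the PR; tis_up_for_retry is an assumed name for the 
> up_for_retry task instances the scheduler has already queried:
> 
> from airflow.utils.state import State
> 
> # 1. Tasks that are up_for_retry and past their retry_delay go back to
> #    SCHEDULED, so they flow through the normal dependency checks again.
> for ti in tis_up_for_retry:
>     if ti.ready_for_retry():
>         ti.state = State.SCHEDULED
> 
> # 2. Only SCHEDULED tasks are handed to the executor; UP_FOR_RETRY is no
> #    longer pulled straight from the database.
> self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))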
> 
> - Bolke
> 
>> On 12 Jan 2017, at 20:53, Bolke de Bruin <bdbr...@gmail.com> wrote:
>> 
>> Although the dag_run is not marked as failed and the task should not retry 
>> yet, it is nevertheless still scheduled.
>> 
>> Bolke
>> 
>> 
>>> On 12 Jan 2017, at 20:16, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>> 
>>> Hi All,
>>> 
>>> I analysed this further and it's an error in the dependency engine. In the 
>>> current state of master / 1.8, when updating the state of a DAG run, the 
>>> “UP_FOR_RETRY” state is not considered. This leads to the dag run being 
>>> marked deadlocked.
>>> 
>>> Applying a patch from https://github.com/apache/incubator-airflow/pull/1934, 
>>> particularly the “ignore_in_retry_period” part, the logs show that the 
>>> retry_period is being honoured, but eventually the dag run still gets 
>>> marked deadlocked.
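>>> 
>>> For context, the deadlock marking comes from DagRun.update_state: if there 
>>> are unfinished tasks but none of them currently pass their dependency 
>>> check, the run is declared deadlocked and failed. Roughly paraphrased (not 
>>> the exact source), and this is where an up_for_retry task inside its 
>>> retry_delay counts as "cannot run":
>>> 
>>> # paraphrase of the deadlock branch in DagRun.update_state
>>> unfinished = [ti for ti in tis if ti.state in State.unfinished()]
>>> no_runnable = not any(
>>>     ti.are_dependencies_met(session=session) for ti in unfinished)
>>> if unfinished and no_runnable:
>>>     # the whole run gets marked failed even though the task only
>>>     # needs to wait out its retry_delay
>>>     self.state = State.FAILED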
>>> 
>>> These are the logs:
>>> 
>>> [2017-01-12 20:12:00,840] {jobs.py:801} DagFileProcessor262 INFO - 
>>> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05 
>>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>>> [2017-01-12 20:12:00,846] {models.py:3865} DagFileProcessor262 INFO - 
>>> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05 
>>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> 
>>> considering 2 task(s)
>>> [2017-01-12 20:12:00,849] {not_in_retry_period_dep.py:30} 
>>> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify', 
>>> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state: 
>>> up_for_retry
>>> [2017-01-12 20:12:00,852] {jobs.py:844} DagFileProcessor262 INFO - Checking 
>>> dependencies
>>> [2017-01-12 20:12:00,862] {jobs.py:850} DagFileProcessor262 INFO - Done 
>>> checking and queuing
>>> [2017-01-12 20:12:00,862] {jobs.py:844} DagFileProcessor262 INFO - Checking 
>>> dependencies
>>> [2017-01-12 20:12:00,862] {not_in_retry_period_dep.py:30} 
>>> DagFileProcessor262 INFO - Checking (u'test_retry_handling_job_spotify', 
>>> u'test_retry_handling_op1', datetime.datetime(2016, 10, 5, 19, 0)) state: 
>>> up_for_retry
>>> [2017-01-12 20:12:00,862] {models.py:996} DagFileProcessor262 INFO - Prem: 
>>> State up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now: 
>>> 2017-01-12 20:12:00.862789
>>> [2017-01-12 20:12:00,862] {models.py:1109} DagFileProcessor262 INFO - State 
>>> up_for_retry, next_retry: 2017-01-12 20:20:13.122766 , now: 2017-01-12 
>>> 20:12:00.862928
>>> [2017-01-12 20:12:00,863] {jobs.py:850} DagFileProcessor262 INFO - Done 
>>> checking and queuing
>>> /Users/bolke/Documents/dev/airflow_env/lib/python2.7/site-packages/SQLAlchemy-1.1.4-py2.7-macosx-10.12-x86_64.egg/sqlalchemy/sql/default_comparator.py:161:
>>>  SAWarning: The IN-predicate on "dag_run.dag_id" was invoked with an empty 
>>> sequence. This results in a contradiction, which nonetheless can be 
>>> expensive to evaluate.  Consider alternative strategies for improved 
>>> performance.
>>> [2017-01-12 20:12:00,879] {models.py:321} DagFileProcessor262 INFO - 
>>> Finding 'running' jobs without a recent heartbeat
>>> [2017-01-12 20:12:00,879] {models.py:327} DagFileProcessor262 INFO - 
>>> Failing jobs without heartbeat after 2017-01-12 20:07:00.879746
>>> [2017-01-12 20:12:00,886] {jobs.py:331} DagFileProcessor262 INFO - 
>>> Processing /Users/bolke/airflow/dags/retry_delay.py took 0.095 seconds
>>> 
>>> 
>>> [2017-01-12 20:12:05,962] {jobs.py:323} DagFileProcessor272 INFO - Started 
>>> process (PID=25181) to work on /Users/bolke/airflow/dags/retry_delay.py
>>> [2017-01-12 20:12:05,975] {jobs.py:1483} DagFileProcessor272 INFO - 
>>> Processing file /Users/bolke/airflow/dags/retry_delay.py for tasks to queue
>>> [2017-01-12 20:12:05,975] {models.py:168} DagFileProcessor272 INFO - 
>>> Filling up the DagBag from /Users/bolke/airflow/dags/retry_delay.py
>>> [2017-01-12 20:12:05,982] {jobs.py:1497} DagFileProcessor272 INFO - DAG(s) 
>>> ['test_retry_handling_job_spotify'] retrieved from 
>>> /Users/bolke/airflow/dags/retry_delay.py
>>> [2017-01-12 20:12:06,001] {jobs.py:1109} DagFileProcessor272 INFO - 
>>> Processing test_retry_handling_job_spotify
>>> [2017-01-12 20:12:06,008] {jobs.py:801} DagFileProcessor272 INFO - 
>>> Examining DAG run <DagRun test_retry_handling_job_spotify @ 2016-10-05 
>>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False>
>>> [2017-01-12 20:12:06,014] {models.py:3865} DagFileProcessor272 INFO - 
>>> Updating state for <DagRun test_retry_handling_job_spotify @ 2016-10-05 
>>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> 
>>> considering 2 task(s)
>>> [2017-01-12 20:12:06,021] {models.py:3914} DagFileProcessor272 INFO - 
>>> Deadlock; marking run <DagRun test_retry_handling_job_spotify @ 2016-10-05 
>>> 19:00:00: scheduled__2016-10-05T19:00:00, externally triggered: False> 
>>> failed
>>> 
>>> I consider this a blocker for 1.8 and I could use some help in solving the 
>>> issue.
>>> 
>>> Bolke.
>>> 
>>>> On 11 Jan 2017, at 21:45, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>> 
>>>> First analysis:
>>>> 
>>>> The dependency checker (at least the retry_delay checker) is not called in 
>>>> the scheduler for some reason. As the task leaves the scheduler, its state 
>>>> will be set to “queued”, which won’t match the “is_premature” requirement 
>>>> of “UP_FOR_RETRY”.
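>>>> 
>>>> For reference, the premature check only applies to UP_FOR_RETRY, so once 
>>>> the state has been flipped to “queued” the retry_delay is never consulted 
>>>> again (paraphrased from the TaskInstance property in models.py, not the 
>>>> exact source):
>>>> 
>>>> @property
>>>> def is_premature(self):
>>>>     # only an up_for_retry task can be "too early" to run; a queued
>>>>     # task skips the retry_delay check entirely
>>>>     return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()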
>>>> 
>>>> @Dan: are you able to help out here? 
>>>> 
>>>> Bolke
>>>> 
>>>>> On 11 Jan 2017, at 18:31, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
>>>>> 
>>>>> Hi Bolke,
>>>>> 
>>>>> Here is the JIRA issue, https://issues.apache.org/jira/browse/AIRFLOW-747
>>>>> 
>>>>> 
>>>>> Harvey Xia | Software Engineer
>>>>> harvey...@spotify.com
>>>>> +1 (339) 225 1875
>>>>> 
>>>>> On Wed, Jan 11, 2017 at 11:32 AM, Bolke de Bruin <bdbr...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Harvey,
>>>>>> 
>>>>>> Thanks for reporting! Can you create a JIRA for this? I’ll have a look if
>>>>>> I can reproduce it.
>>>>>> 
>>>>>> - Bolke
>>>>>> 
>>>>>>> On 11 Jan 2017, at 16:06, Harvey Xia <harvey...@spotify.com.INVALID> wrote:
>>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> In Airflow 1.8 alpha 2, using LocalExecutor, DAGs do not seem to honor the
>>>>>>> retry_delay parameter, i.e. the retries happen immediately one after the
>>>>>>> other without waiting for the specified retry_delay time. However, the
>>>>>>> *number* of retries is honored. I am testing with the following code:
>>>>>>> 
>>>>>>> from airflow import DAG
>>>>>>> from airflow.operators.bash_operator import BashOperator
>>>>>>> from datetime import datetime, timedelta
>>>>>>> 
>>>>>>> default_args = {
>>>>>>>     'owner': 'airflow',
>>>>>>>     'depends_on_past': False,
>>>>>>>     'start_date': datetime(2016, 10, 5, 19),
>>>>>>>     'end_date': datetime(2016, 10, 6, 19),
>>>>>>>     'email': ['airf...@airflow.com'],
>>>>>>>     'email_on_failure': False,
>>>>>>>     'email_on_retry': False,
>>>>>>>     'retries': 10,
>>>>>>>     'retry_delay': timedelta(0, 500)
>>>>>>> }
>>>>>>> 
>>>>>>> dag = DAG('test_retry_handling_job', default_args=default_args,
>>>>>>>           schedule_interval='@once')
>>>>>>> 
>>>>>>> task1 = BashOperator(
>>>>>>>     task_id='test_retry_handling_op1',
>>>>>>>     bash_command='exit 1',
>>>>>>>     dag=dag)
>>>>>>> 
>>>>>>> task2 = BashOperator(
>>>>>>>     task_id='test_retry_handling_op2',
>>>>>>>     bash_command='exit 1',
>>>>>>>     dag=dag)
>>>>>>> 
>>>>>>> task2.set_upstream(task1)
>>>>>>> 
>>>>>>> Let me know if anyone has any ideas about this issue, thanks!
>>>>>>> 
>>>>>>> Harvey Xia | Software Engineer
>>>>>>> harvey...@spotify.com
>>>>>>> +1 (339) 225 1875
>>>>>> 
>>>>>> 
>>>> 
>>> 
>> 
> 
