[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefan Seelmann updated AIRFLOW-2747:
-------------------------------------
    Description:
By default, sensors block a worker and just sleep between pokes. This is very inefficient, especially when there are many long-running sensors.

There is a hacky workaround: setting a small timeout value and a high retry number. But that has drawbacks:
* Errors raised by sensors are hidden and the sensor retries too often
* The sensor is retried in a fixed time interval (with optional exponential backoff)
* There are many attempts and many log files are generated

I'd like to propose an explicit reschedule mechanism:
* A new "reschedule" flag for sensors. If set to True, the sensor raises an AirflowRescheduleException that causes a reschedule.
* AirflowRescheduleException contains the (earliest) re-schedule date.
* Reschedule requests are recorded in a new `task_reschedule` table and visualized in the Gantt view.
* A new TI dependency that checks if a sensor task is ready to be re-scheduled.

Advantages:
* This change is backward compatible. Existing sensors behave as before, but it's possible to set the "reschedule" flag.
* The poke_interval, timeout, and soft_fail parameters are still respected and used to calculate the next schedule time.
* Custom sensor implementations can even define the next sensible schedule date by raising AirflowRescheduleException themselves.
* Existing TimeSensor and TimeDeltaSensor can also be changed to be rescheduled when the time is reached.
* This mechanism can also be used by non-sensor operators (but then the new ReadyToRescheduleDep has to be added to deps or BaseOperator).

Design decisions and caveats:
* When handling AirflowRescheduleException, the `try_number` is decremented. That means that subsequent runs use the same try number and write to the same log file.
* The sensor TI dependency check now depends on the `task_reschedule` table. However, only the BaseSensorOperator includes the new ReadyToRescheduleDep for now.

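To make the proposed flow easier to follow, here is a minimal, self-contained sketch in plain Python. It does not use Airflow code: `RescheduleRequest`, `SensorTimeout`, `run_sensor_once`, `ready_to_reschedule` and `simulate` are hypothetical names chosen for illustration, the list `task_reschedule` only stands in for rows of the proposed table, and soft_fail handling is left out. It assumes the next schedule date is simply "now + poke_interval" and that the dependency is met once the latest recorded date has passed.

```python
from datetime import datetime, timedelta


class RescheduleRequest(Exception):
    """Hypothetical stand-in for the proposed AirflowRescheduleException."""

    def __init__(self, reschedule_date):
        super().__init__("reschedule at %s" % reschedule_date)
        # Earliest date at which the task should run again.
        self.reschedule_date = reschedule_date


class SensorTimeout(Exception):
    """Hypothetical stand-in for the error raised when the timeout is exceeded."""


def run_sensor_once(poke, first_start, now, poke_interval, timeout):
    """Poke once instead of sleeping in a loop, so the worker slot is freed."""
    if poke():
        return True
    # The timeout is measured from the very first start, across reschedules.
    if now - first_start > timedelta(seconds=timeout):
        raise SensorTimeout("sensor timed out")
    # Assumption: next run is one poke_interval from now.
    raise RescheduleRequest(now + timedelta(seconds=poke_interval))


def ready_to_reschedule(reschedule_dates, now):
    """Sketch of the dependency check: met if nothing was requested yet,
    or if the latest requested reschedule date has been reached."""
    if not reschedule_dates:
        return True
    return now >= max(reschedule_dates)


def simulate(poke, start, poke_interval, timeout, tick=timedelta(seconds=10)):
    """Toy scheduler loop with a simulated clock advancing by `tick`."""
    task_reschedule = []  # stand-in for the proposed task_reschedule table
    now = start
    pokes = 0
    while True:
        if ready_to_reschedule(task_reschedule, now):
            pokes += 1
            try:
                run_sensor_once(poke, start, now, poke_interval, timeout)
                return now, pokes
            except RescheduleRequest as request:
                # Record the request; the task is not occupying a worker now.
                task_reschedule.append(request.reschedule_date)
        now += tick
```

In this sketch, a sensor whose poke succeeds on the third attempt with poke_interval=60 is executed three times, one minute apart, and is idle in between rather than sleeping on a worker.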
Open questions and TODOs:
* Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting the state back to `NONE`? This would require more changes in scheduler code and especially in the UI, but the state of a task would be more explicit and more transparent to the user.
* Add example/test for a non-sensor operator
* Document the new feature


  was:
By default, sensors block a worker and just sleep between pokes. This is very inefficient, especially when there are many long-running sensors.

There is a hacky workaround: setting a small timeout value and a high retry number. But that has drawbacks:
* Errors throws by sensors are hidden and the sensor retries too often
* The sensor is retried in a fixed time interval (with optional exponential backoff)
* There are many attempts and many log files are generated

I'd like to propose an explicit reschedule mechanism:
* A new "reschedule" flag for sensors. If set to True, the sensor raises an AirflowRescheduleException that causes a reschedule.
* AirflowRescheduleException contains the (earliest) re-schedule date.
* Reschedule requests are recorded in a new `task_reschedule` table and visualized in the Gantt view.
* A new TI dependency that checks if a sensor task is ready to be re-scheduled.

Advantages:
* This change is backward compatible. Existing sensors behave as before, but it's possible to set the "reschedule" flag.
* The poke_interval, timeout, and soft_fail parameters are still respected and used to calculate the next schedule time.
* Custom sensor implementations can even define the next sensible schedule date by raising AirflowRescheduleException themselves.
* Existing TimeSensor and TimeDeltaSensor can also be changed to be rescheduled when the time is reached.
* This mechanism can also be used by non-sensor operators (but then the new ReadyToRescheduleDep has to be added to deps or BaseOperator).

Design decisions and caveats:
* When handling AirflowRescheduleException, the `try_number` is decremented.
That means that subsequent runs use the same try number and write to the same log file.
* The sensor TI dependency check now depends on the `task_reschedule` table. However, only the BaseSensorOperator includes the new ReadyToRescheduleDep for now.

Open questions and TODOs:
* Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting the state back to `NONE`? This would require more changes in scheduler code and especially in the UI, but the state of a task would be more explicit and more transparent to the user.
* Add example/test for a non-sensor operator
* Document the new feature


> Explicit re-schedule of sensors
> -------------------------------
>
>                 Key: AIRFLOW-2747
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2747
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: core, operators
>    Affects Versions: 1.9.0
>            Reporter: Stefan Seelmann
>            Assignee: Stefan Seelmann
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: Screenshot_2018-07-12_14-10-24.png
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)