We've gotten around this by implementing the external async job API call in the __init__ of the sensor and then polling as normal. If the polling fails, the next sensor instance launches a new external async job. Note that this will also launch a new job if you hit the sensor timeout.
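A minimal sketch of that pattern is below. It assumes Airflow 1.x import paths, and `my_job_api` with submit_job()/get_status() is a hypothetical stand-in for whatever client talks to the real external API:

from airflow.exceptions import AirflowException
from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

import my_job_api  # hypothetical client for the external async job API


class AsyncJobSensor(BaseSensorOperator):
    """Submits the external job on construction and only polls in poke()."""

    @apply_defaults
    def __init__(self, job_payload, *args, **kwargs):
        super(AsyncJobSensor, self).__init__(*args, **kwargs)
        # Kick off the external async job when the sensor is built; a retried
        # (or newly scheduled) sensor builds a fresh instance, so it submits a
        # brand-new job -- including after a sensor timeout.
        self.job_id = my_job_api.submit_job(job_payload)

    def poke(self, context):
        status = my_job_api.get_status(self.job_id)
        if status == 'FAILED':
            # Failing here lets Airflow's normal retry machinery take over.
            raise AirflowException('Job %s failed' % self.job_id)
        return status == 'FINISHED'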
Here's the gist with our dag and sensor: https://gist.github.com/lauralorenz/bf47280b90067c71fe691bdf70b4145a

On Thu, Feb 2, 2017 at 8:36 AM, Bolke de Bruin <[email protected]> wrote:

> Hi Steve,
>
> At the moment we don't have the possibility in Airflow to combine multiple
> tasks into one unit of analysis, e.g. when a task fails, return to the
> beginning of the set. We also don't expose the functionality of resetting a
> task's state via the API at the moment. You could mimic this behaviour
> (warning: this really is a hack) by clearing the state of the earlier task
> in the database when you reach a failed state. I never tried it, it
> certainly isn't very clean, and it won't be supported in any way.
>
> What you could do is split your dag in two: one that runs your
> SingularityOperator and one that monitors it. If it fails, the monitor can
> trigger a new dag_run for your first dag.
>
> - Bolke
>
> On 1 Feb 2017, at 08:40, Steve Annessa <[email protected]> wrote:
> >
> > I need help designing a DAG.
> >
> > High-level problem:
> > I need a way to launch tasks through an API and manage their state; when
> > they fail, I need the ability to automatically retry.
> >
> > What's involved:
> > We use Singularity (https://github.com/HubSpot/Singularity) to launch
> > tasks on Mesos, which can be standalone containers or Spark jobs that run
> > for hours.
> >
> > What I've done so far:
> > I've written an Operator for interacting with the Singularity API, and I
> > can launch a task that Singularity manages. I then need to wait and poll
> > the API for changes to the task state. The task can be in a few states,
> > but the most important are FINISHED and FAILED. So I wrote a Sensor that
> > polls the API and watches for the task UID (passed through XCom from the
> > SingularityOperator); on each poll it checks the various states. If
> > everything passes, everything is great and the DAG moves along.
> > The problem happens when the Singularity task fails: the SingularitySensor
> > will fail, which is fine, but I don't know of a way to tell the previous
> > SingularityOperator task to re-execute, so the DAG is stuck.
> >
> > Options I'm considering to resolve this problem:
> > 1. Remove the Sensor and put the polling logic in the execute function of
> > the SingularityOperator. That would mean the Operator task lasts for the
> > duration of the Singularity-managed task, which can be 4+ hours, with the
> > majority of the time spent polling the API. I'd also have to write my own
> > poll logic, which isn't terrible, but I wouldn't get to reuse the work
> > already done in the BaseSensorOperator.
> > 2. Find a way to call back to the previous task in the event of Sensor
> > failure; I'd like the flow to go "execute_singularity_task ->
> > check_singularity_task"; if "check_singularity_task" is in the FAILED
> > state, clear both "execute_singularity_task" and "check_singularity_task"
> > and rerun from "execute_singularity_task" on.
> > 3. Ask you guys for a better design.
> >
> > The end goal is to have the following:
> > 0. The ability to launch and manage tasks through the Singularity API
> > 1. The ability to retry on failure at any point in the DAG without human
> > intervention
> > 2. As simple a DAG as possible
> >
> > Here's a gist for the DAG:
> > https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5
> >
> > Here's a gist for the Operator:
> > https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3
> >
> > Here's a gist for the Sensor:
> > https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
> > (This is a crude implementation and doesn't handle all of the states. I
> > figured before I invested more time in making it more robust and elegant,
> > I'd spend time figuring out whether this was the correct tool for the
> > job.)
> >
> > Thanks!
> >
> > -- Steve
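For reference, a rough sketch of the two-DAG approach Bolke suggests above: a monitor DAG that re-triggers a dag_run of the first DAG when the Singularity task failed. The DAG ids, the schedule, and the singularity_client helper are hypothetical stand-ins, and the TriggerDagRunOperator usage assumes the Airflow 1.x API, where python_callable returns the dag_run_obj to trigger (or None to skip):

from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

import singularity_client  # hypothetical wrapper around the Singularity API


def retrigger_if_failed(context, dag_run_obj):
    # Called by TriggerDagRunOperator: return dag_run_obj to trigger the
    # target DAG, or None to do nothing on this run.
    if singularity_client.get_task_state() == 'FAILED':
        return dag_run_obj
    return None


monitor_dag = DAG(
    'singularity_monitor',             # hypothetical monitor DAG id
    start_date=datetime(2017, 2, 1),
    schedule_interval='*/10 * * * *',
)

retrigger = TriggerDagRunOperator(
    task_id='retrigger_singularity_dag',
    trigger_dag_id='singularity_runner',  # hypothetical id of the first DAG
    python_callable=retrigger_if_failed,
    dag=monitor_dag,
)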
