Thanks for the feedback, Laura and Bolke. I think I'll try Laura's approach and make the call to launch the task in the sensor's __init__ function.
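Laura's pattern keeps the launch and the polling inside one sensor. A FAILED state (or a timeout) therefore fails the whole task, and an ordinary retry relaunches the Singularity job. The sketch below models that control flow in plain Python rather than against Airflow itself. `FakeSingularityClient`, `LaunchAndPollSensor` and `run_with_retries` are hypothetical names: the client is a scripted stand-in for the Singularity API, and `run_with_retries` only imitates task retries.

The sketch deliberately deviates from the thread in one place. It launches at the start of `execute()` instead of in `__init__`. Airflow instantiates operators whenever it parses the DAG file, so a launch placed in the constructor may fire more often than once per attempt.

```python
import itertools
import time


class FakeSingularityClient:
    """Hypothetical stand-in for the Singularity API; not a real client library."""

    def __init__(self, scripted_runs):
        # One list of states per launch; each poll returns the next state in order.
        self._scripted_runs = iter(scripted_runs)
        self._states = {}
        self._ids = itertools.count(1)
        self.launches = 0

    def launch(self):
        self.launches += 1
        task_uid = f"task-{next(self._ids)}"
        self._states[task_uid] = iter(next(self._scripted_runs))
        return task_uid

    def state(self, task_uid):
        return next(self._states[task_uid])


class LaunchAndPollSensor:
    """Sensor-like object that owns its external job: launch once, then poll it."""

    def __init__(self, client, poke_interval=0.0, timeout=10.0):
        self.client = client
        self.poke_interval = poke_interval
        self.timeout = timeout
        self.task_uid = None

    def poke(self):
        state = self.client.state(self.task_uid)
        if state == "FAILED":
            # Fail the whole task so that a retry relaunches the job.
            raise RuntimeError(f"Singularity task {self.task_uid} failed")
        return state == "FINISHED"

    def execute(self):
        # Launch here rather than in __init__, so each attempt launches exactly once.
        self.task_uid = self.client.launch()
        deadline = time.monotonic() + self.timeout
        while not self.poke():
            if time.monotonic() > deadline:
                raise TimeoutError(f"gave up waiting for {self.task_uid}")
            time.sleep(self.poke_interval)
        return self.task_uid


def run_with_retries(make_sensor, retries):
    """Mimic task retries: every attempt builds a fresh sensor and so relaunches.
    Timeouts are retried too, matching Laura's note that they trigger new jobs."""
    for attempt in range(retries + 1):
        try:
            return make_sensor().execute()
        except (RuntimeError, TimeoutError):
            if attempt == retries:
                raise


client = FakeSingularityClient([["RUNNING", "FAILED"], ["RUNNING", "FINISHED"]])
print(run_with_retries(lambda: LaunchAndPollSensor(client), retries=2))  # task-2
print(client.launches)  # 2
```

The first scripted run fails, so the retry builds a new sensor, which launches `task-2`, and that run finishes. In a real DAG, the retry loop would come from the task's `retries` setting rather than from a helper like `run_with_retries`.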
-- Steve

On Thu, Feb 2, 2017 at 9:55 AM, Laura Lorenz wrote:

> We've gotten around this by implementing the external async job API call in
> the __init__ of the sensor and then polling as normal. If the polling fails,
> the next sensor instantiates a new external async job. Note this will also
> trigger new jobs if you hit the timeout.
>
> Here's the gist with our dag and sensor:
>
> https://gist.github.com/lauralorenz/bf47280b90067c71fe691bdf70b4145a
>
> On Thu, Feb 2, 2017 at 8:36 AM, Bolke de Bruin wrote:
>
> > Hi Steve,
> >
> > At the moment we don't have the possibility in Airflow to combine multiple
> > tasks in one unit of analysis, e.g. when a task fails, return to the
> > beginning of the set. We also don't expose the functionality of resetting a
> > task state by API at the moment. You could mimic this behaviour (warning:
> > this really is a hack): if you get to a failed state, you clear the state
> > of the earlier task in the database. I never tried it, and it certainly
> > isn't very clean, nor will it be supported in any way.
> >
> > What you could do is split your dag in two: one that runs your
> > SingularityOperator and one that monitors it. If it fails, the monitor can
> > trigger a new dag_run for your first dag.
> >
> > - Bolke
> >
> > > On 1 Feb 2017, at 08:40, Steve Annessa wrote:
> > >
> > > I need help designing a DAG.
> > >
> > > High level problem:
> > > I need a way to launch tasks through an API and manage their state;
> > > when they fail, I need the ability to retry automatically.
> > >
> > > What's involved:
> > > We use Singularity (https://github.com/HubSpot/Singularity) to launch
> > > tasks on Mesos, which can be standalone containers or Spark jobs that
> > > run for hours.
> > >
> > > What I've done so far:
> > > I've written an Operator for interacting with the Singularity API, and I
> > > can launch a task that Singularity manages.
> > > I then need to wait and poll the API for changes to the task state. The
> > > task can be in a few states, but the most important are FINISHED and
> > > FAILED. So I wrote a Sensor that polls the API and watches for the task
> > > UID that was passed through XCom from the SingularityOperator; on each
> > > poll it checks the various states. If everything passes, everything is
> > > great and the DAG moves along.
> > > The problem happens when the Singularity task fails: the
> > > SingularitySensor will fail, which is fine, but I don't know of a way to
> > > tell the previous SingularityOperator task to re-execute, so the DAG is
> > > stuck.
> > >
> > > Options I'm considering to resolve this problem:
> > > 1. Remove the Sensor and put the polling logic in the execute function
> > > for the SingularityOperator. That will mean the Operator task will last
> > > for the duration of the Singularity-managed task, which can be 4+ hours,
> > > and the majority of the time will be spent polling the API. I'll also
> > > have to write my own poll logic, which isn't terrible, but I won't get
> > > to use the work already written in the BaseSensorOperator.
> > > 2. Find a way to call back to the previous task in the event of Sensor
> > > failure. I'd like the flow to go "execute_singularity_task ->
> > > check_singularity_task"; if "check_singularity_task" is in the FAILED
> > > state, clear both "execute_singularity_task" and "check_singularity_task"
> > > and rerun from "execute_singularity_task" on.
> > > 3. Ask you guys for a better design.
> > >
> > > The end goal is to have the following:
> > > 0. The ability to launch and manage tasks through the Singularity API
> > > 1. The ability to retry on failure at any point in the DAG without human
> > > intervention
> > > 2. As simple a DAG as possible
> > >
> > > Here's a gist for the DAG:
> > > https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5
> > >
> > > Here's a gist for the Operator:
> > > https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3
> > >
> > > Here's a gist for the Sensor:
> > > https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
> > > (This is a crude implementation and doesn't handle all of the states. I
> > > figured that before I invested more time in making it more robust and
> > > elegant, I'd spend time figuring out whether this was the correct tool
> > > for the job.)
> > >
> > > Thanks!
> > >
> > > -- Steve
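Bolke's alternative moves the retry up to the DAG-run level. One DAG runs the SingularityOperator, and a second DAG monitors it and triggers a fresh dag_run of the first whenever it fails, so no task state has to be reset in the database. The plain-Python sketch below shows only the monitor's decision loop. `FakeScheduler`, `trigger_worker_run`, `final_state` and `monitor` are hypothetical names, not Airflow's API, and waiting for each run to finish is abstracted into `final_state`.

```python
from dataclasses import dataclass, field


@dataclass
class FakeScheduler:
    """Hypothetical in-memory record of runs of the worker DAG (not Airflow's API)."""

    planned_states: list  # final state of the 1st, 2nd, ... run, scripted for the demo
    runs: list = field(default_factory=list)

    def trigger_worker_run(self):
        run_id = f"run-{len(self.runs) + 1}"
        self.runs.append(run_id)
        return run_id

    def final_state(self, run_id):
        # Stands in for "wait until the run ends, then read its state".
        return self.planned_states[self.runs.index(run_id)]


def monitor(scheduler, run_id, max_reruns):
    """Monitor-DAG logic: on failure, start a brand-new worker run instead of
    resetting task state, and give up after max_reruns new runs."""
    reruns = 0
    while scheduler.final_state(run_id) == "FAILED":
        if reruns == max_reruns:
            raise RuntimeError(f"{run_id} failed; giving up after {reruns} reruns")
        run_id = scheduler.trigger_worker_run()
        reruns += 1
    return run_id


sched = FakeScheduler(planned_states=["FAILED", "FAILED", "FINISHED"])
first = sched.trigger_worker_run()
print(monitor(sched, first, max_reruns=3))  # run-3
print(sched.runs)  # ['run-1', 'run-2', 'run-3']
```

The cost of this design is a second DAG to maintain. The benefit is that each retry starts from a clean dag_run, with both the launch and the check re-executed together.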
