Hi Steve,

At the moment Airflow has no way to combine multiple tasks into one unit of 
work, e.g. when a task fails, return to the beginning of the set. We also 
don't currently expose resetting a task's state through the API. You could 
mimic this behaviour (warning: this really is a hack) by clearing the state 
of the earlier task in the database once you reach a failed state. I have 
never tried it, it certainly isn't clean, and it won't be supported in any 
way.
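
If you do want that hack anyway, it would look roughly like this. This is an 
untested sketch that writes straight to Airflow's metadata database; 
reset_upstream_task is just an illustrative name, not an existing helper:

    # WARNING: untested and unsupported. This writes directly to
    # Airflow's metadata database and may break between versions.
    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.utils.state import State

    def reset_upstream_task(dag_id, task_id, execution_date):
        """Clear an earlier task's state so the scheduler re-runs it."""
        session = settings.Session()
        ti = (session.query(TaskInstance)
                     .filter(TaskInstance.dag_id == dag_id,
                             TaskInstance.task_id == task_id,
                             TaskInstance.execution_date == execution_date)
                     .one())
        ti.state = State.NONE  # no state means eligible to be scheduled again
        session.commit()
        session.close()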

What you could do is split your DAG in two: one that runs your 
SingularityOperator and one that monitors it. If the task fails, the monitor 
can trigger a new dag_run for your first DAG.
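
Something along these lines, as a rough and untested sketch. I am assuming 
the SingularityOperator and SingularitySensor from your gists; the DAG 
names, task ids and the import path for your classes are illustrative:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    # illustrative import path for your own classes from the gists:
    # from singularity_plugin import SingularityOperator, SingularitySensor

    default_args = {'start_date': datetime(2017, 2, 1)}

    # DAG 1: fire-and-forget launch of the Singularity task.
    run_dag = DAG('singularity_run', default_args=default_args,
                  schedule_interval=None)
    launch = SingularityOperator(  # plus your operator's own kwargs
        task_id='execute_singularity_task', dag=run_dag)

    # DAG 2: poll the task; if the sensor fails, kick off DAG 1 again.
    monitor_dag = DAG('singularity_monitor', default_args=default_args,
                      schedule_interval=None)
    check = SingularitySensor(  # plus your sensor's own kwargs
        task_id='check_singularity_task', dag=monitor_dag)
    retrigger = TriggerDagRunOperator(
        task_id='retrigger_run',
        trigger_dag_id='singularity_run',
        python_callable=lambda context, dag_run_obj: dag_run_obj,
        trigger_rule='one_failed',  # run only when the sensor has failed
        dag=monitor_dag)
    retrigger.set_upstream(check)

The 'one_failed' trigger rule makes the retrigger task run only when the 
sensor fails, so a successful run simply ends the monitor DAG quietly.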

- Bolke
 
> On 1 Feb 2017, at 08:40, Steve Annessa <[email protected]> wrote:
> 
> I need help designing a DAG
> 
> High level problem:
> I need a way to launch tasks through an API and manage their state; when
> they fail, I need the ability to retry automatically.
> 
> What's involved:
> We use Singularity (https://github.com/HubSpot/Singularity) to launch tasks
> on Mesos which can be standalone containers or Spark jobs that run for
> hours.
> 
> What I've done so far:
> I've written an Operator for interacting with the Singularity API and I can
> launch a task that Singularity manages. I then need to wait and poll the
> API for changes to the task state. The task can be in a few states but the
> most important are FINISHED and FAILED. So I wrote a Sensor that polls the
> API and watches the task UID that was passed through XCom from the
> SingularityOperator; on each poll it checks the various states. If everything
> passes, everything is great and the DAG moves along.
> The problem happens when the Singularity task fails: the SingularitySensor
> will fail, which is fine, but I don't know of a way to tell the previous
> SingularityOperator task to re-execute, so the DAG is stuck.
> 
> Options I'm considering to resolve this problem:
> 1. Remove the Sensor and put the polling logic in the execute function for
> the SingularityOperator. That will mean the Operator task will last for the
> duration of the Singularity managed task which can be 4+ hours and the
> majority of the time will be spent polling the API. I'll also have to write
> my own poll logic, which isn't terrible but I won't get to use the work
> already written in the BaseSensorOperator.
> 2. Find a way to call back to the previous task in the event of Sensor
> failure; I'd like the flow to go "execute_singularity_task ->
> check_singularity_task"; if "check_singularity_task" is in the FAILED
> state, clear both "execute_singularity_task" and "check_singularity_task"
> and rerun from "execute_singularity_task" on.
> 3. Ask you guys for a better design
> 
> The end goal is to have the following:
> 0. The ability to launch and manage tasks through the Singularity API
> 1. The ability for retries on failure at any point in the DAG without human
> intervention
> 2. As simple a DAG as possible
> 
> Here's a gist for the DAG:
> https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5
> 
> Here's a gist for the Operator:
> https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3
> 
> Here's a gist for the Sensor:
> https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
> (This is a crude implementation and doesn't handle all of the states. I
> figured before I invested more time in making this more robust and elegant
> I'd spend time figuring out if this was the correct tool for the job.)
> 
> Thanks!
> 
> -- Steve
