I need help designing a DAG

High level problem:
I need a way to launch tasks through an API and manage their state; when
they fail, I need them to be retried automatically.

What's involved:
We use Singularity (https://github.com/HubSpot/Singularity) to launch tasks
on Mesos; these can be standalone containers or Spark jobs that run for
hours.

What I've done so far:
I've written an Operator for interacting with the Singularity API, and I can
launch a task that Singularity manages. I then need to wait and poll the
API for changes to the task state. The task can be in a few states, but the
most important are FINISHED and FAILED. So I wrote a Sensor that takes the
task UID passed through XCom from the SingularityOperator and, on each poll,
checks the task's state. If the task reaches FINISHED, everything is great
and the DAG moves along.
The problem happens when the Singularity task fails: the SingularitySensor
fails, which is fine, but I don't know of a way to tell the upstream
SingularityOperator task to re-execute, so the DAG is stuck.
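A minimal sketch of the per-poll check the Sensor performs, written as plain Python so it runs outside Airflow. It assumes a hypothetical `get_task_state(task_uid)` callable standing in for the Singularity API lookup, and treats every state other than FINISHED and FAILED as "still running" (the RUNNING value below is illustrative only). In a real `BaseSensorOperator` subclass this logic would sit in `poke()`, which returns True to succeed, False to keep waiting, and fails the task if it raises.

```python
from typing import Callable

# FINISHED and FAILED are the states named in the post; every other state
# is treated as "still running" here (an assumption for this sketch).
SUCCESS_STATE = "FINISHED"
FAILURE_STATE = "FAILED"


class SingularityTaskFailed(Exception):
    """Hypothetical exception raised when the polled task is FAILED."""


def check_task_state(task_uid: str, get_task_state: Callable[[str], str]) -> bool:
    """One poll of the task: True = done, False = keep waiting, raise = failed.

    `get_task_state` is a hypothetical stand-in for the Singularity API call
    that looks up the current state of `task_uid`.
    """
    state = get_task_state(task_uid)
    if state == SUCCESS_STATE:
        return True
    if state == FAILURE_STATE:
        raise SingularityTaskFailed(f"Singularity task {task_uid} failed")
    return False


# Usage with a fake API that reports RUNNING twice, then FINISHED.
fake_states = iter(["RUNNING", "RUNNING", "FINISHED"])
print([check_task_state("task-123", lambda uid: next(fake_states)) for _ in range(3)])
# [False, False, True]
```

Raising on FAILED, rather than returning False, is what makes the sensor fail immediately instead of waiting out its timeout; it does not, by itself, re-run the upstream launch.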

Options I'm considering to resolve this problem:
1. Remove the Sensor and put the polling logic in the execute function of
the SingularityOperator. That means the Operator task will last for the
duration of the Singularity-managed task, which can be 4+ hours, and most
of that time will be spent polling the API. I'll also have to write my own
polling logic, which isn't terrible, but I won't get to reuse the work
already done in BaseSensorOperator.
2. Find a way to call back to the previous task in the event of Sensor
failure; I'd like the flow to go "execute_singularity_task ->
check_singularity_task"; if "check_singularity_task" is in the FAILED
state, clear both "execute_singularity_task" and "check_singularity_task"
and rerun from "execute_singularity_task" on.
3. Ask you guys for a better design
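Options 1 and 2 both amount to treating "launch" and "check" as one unit that is retried together. A minimal sketch of that behaviour in plain Python, so it runs outside Airflow: `launch_task` and `get_task_state` are hypothetical stand-ins for the Singularity API calls, and `sleep` is injectable so the example runs instantly. If this logic lived inside a single operator's `execute()`, the hand-written `run_with_retries` loop would normally be replaced by Airflow's own `retries`/`retry_delay` task arguments, since a retry of that one task would relaunch the Singularity job.

```python
import time
from typing import Callable


class SingularityTaskFailed(Exception):
    """Hypothetical exception for a Singularity task that ended in FAILED."""


def launch_and_wait(
    launch_task: Callable[[], str],
    get_task_state: Callable[[str], str],
    poke_interval: float = 60.0,
    timeout: float = 6 * 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Launch one task, then poll until FINISHED (return its uid) or FAILED (raise)."""
    task_uid = launch_task()
    waited = 0.0
    while waited <= timeout:
        state = get_task_state(task_uid)
        if state == "FINISHED":
            return task_uid
        if state == "FAILED":
            raise SingularityTaskFailed(task_uid)
        sleep(poke_interval)  # any other state: wait and poll again
        waited += poke_interval
    raise TimeoutError(f"Singularity task {task_uid} did not finish within {timeout}s")


def run_with_retries(attempt: Callable[[], str], retries: int = 3) -> str:
    """Re-run the whole launch+poll unit when the task FAILS, up to `retries` times."""
    for attempt_no in range(retries + 1):
        try:
            return attempt()
        except SingularityTaskFailed:
            if attempt_no == retries:
                raise  # out of retries: let the failure surface
    raise ValueError("retries must be >= 0")


# Usage: the first launched task fails, the relaunched one finishes.
launches = []


def fake_launch() -> str:
    launches.append(f"task-{len(launches) + 1}")
    return launches[-1]


fake_states = {
    "task-1": iter(["RUNNING", "FAILED"]),
    "task-2": iter(["RUNNING", "FINISHED"]),
}
result = run_with_retries(
    lambda: launch_and_wait(fake_launch, lambda uid: next(fake_states[uid]), sleep=lambda s: None)
)
print(result, launches)  # task-2 ['task-1', 'task-2']
```

The trade-off is the one described in option 1: the task occupies a worker slot for the whole run, but a failure anywhere in launch-or-poll is handled by a single retry mechanism.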

The end goal is to have the following:
0. The ability to launch and manage tasks through the Singularity API
1. The ability to retry on failure at any point in the DAG without human
intervention
2. As simple a DAG as possible

Here's a gist for the DAG:
https://gist.github.com/sannessa/dea05f743a1250c1e5e8a8e10c49d7b5

Here's a gist for the Operator:
https://gist.github.com/sannessa/7652c97de3c99426663d9541b2abeba3

Here's a gist for the Sensor:
https://gist.github.com/sannessa/14a427ee55f90ec2dff60e038e93edb5
(This is a crude implementation and doesn't handle all of the states. I
figured that before investing more time in making it more robust and
elegant, I'd find out whether this is the right tool for the job.)

Thanks!

-- Steve
