That's a great idea, providing a generic way to do this. I feel like
standalone sensors are somewhat misused in the framework: they're
better suited as triggers that fire when an external source becomes
ready (e.g. a new S3 file appears) than for polling for the completion
of a previous task (e.g. EmrStepSensor), precisely because of the lack
of atomicity on retries discussed earlier in the thread. The
BaseAsyncOperator (I'd prefer BaseAtomicOperator, or modifying the
existing BaseSensorOperator), by contrast, removes the need for that
misuse of sensors. Ideally, all operators should be able to use
pre/action/sense/post hooks.
It still doesn't remove the need to write a combined EmrAddStep +
Sensor operator, but it helps make all of these kinds of
implementations more consistent.
I've added some comments to the PR, thanks for bringing it up!
Jon
On 15/10/2019 20:04, Jarek Potiuk wrote:
I think it could be solved in a more consistent and future-proof way.
There is a new proposal for Base Async operators from Jacob:
https://github.com/apache/airflow/pull/6210
This is an interesting approach that might solve the problem in a slightly
different way. Rather than combining operators and sensors, it introduces
an Async Operator that has two stages: "initiate operation" and a sensor-like
"wait for the operator to complete". In this model, many of the long-running
operators we have now could easily be rewritten using the Async Operator
model, and you could run them either in sync mode (as they run now) or in
async mode, which would be equivalent to the "Operator" + "Sensor" pattern
that is indeed a bit problematic.
The EMRAddStep + Sensor could then be an Async Operator.
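A minimal sketch of the two-stage idea in plain Python, to make the sync-mode case concrete. TwoStageOperator, initiate(), poke() and FakeJob are hypothetical names for illustration, not the API from the PR above or from Airflow. Only sync mode is shown: both stages run inside one task, so a retry re-runs both.

```python
import time
from typing import Any


class TwoStageOperator:
    """Hypothetical base class: an "initiate" stage plus a sensor-like "wait" stage."""

    def __init__(self, poke_interval: float = 60.0, timeout: float = 3600.0) -> None:
        self.poke_interval = poke_interval  # seconds between completion checks
        self.timeout = timeout  # give up waiting after this many seconds

    def initiate(self) -> Any:
        """Start the long-running operation and return a handle to it."""
        raise NotImplementedError

    def poke(self, handle: Any) -> bool:
        """Return True once the operation identified by `handle` has finished."""
        raise NotImplementedError

    def execute(self) -> Any:
        # Sync mode: both stages run in the same task, so a retry of this
        # task starts again from initiate() rather than only re-waiting.
        handle = self.initiate()
        deadline = time.monotonic() + self.timeout
        while not self.poke(handle):
            if time.monotonic() > deadline:
                raise TimeoutError(f"operation {handle!r} did not finish in time")
            time.sleep(self.poke_interval)
        return handle


class FakeJob(TwoStageOperator):
    """Toy subclass: the 'job' reports completion on the third check."""

    def __init__(self) -> None:
        super().__init__(poke_interval=0.0)
        self.pokes = 0

    def initiate(self) -> str:
        return "job-1"

    def poke(self, handle: str) -> bool:
        self.pokes += 1
        return self.pokes >= 3


op = FakeJob()
handle = op.execute()
```

In async mode the scheduler would presumably run initiate() once and call poke() later in separate checks; how the handle is persisted between those calls is not modelled here.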
J.
On Tue, Oct 15, 2019 at 7:13 PM Jonathan Miles <j...@cybus.co.uk> wrote:
Yes, I often refer to this as the "atomicity problem": we usually want
all four tasks ("create", "add steps", "wait for steps", "terminate
cluster") to retry together and in order if any one of them fails (well,
the terminate one is questionable). In our current DAGs we resolved this
by putting the tasks in a SubDag and changing the on_retry_callback to
clear the state of the sub-tasks. But using SubDags makes navigating
the UI a bit of a pain, so we're planning to merge them into a single
custom operator soon. There's also the problem that, for big workflows,
this adds a lot of run time because of the overhead of starting and
stopping EMR clusters instead of reusing them. I'm about to send a
separate e-mail about that.
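One way to picture the atomicity requirement is a single callable that runs the cluster stages and retries them as a unit. This is a hypothetical sketch (run_cluster_job and the four stage callables are illustrative names, not Airflow APIs). It treats "terminate cluster" as cleanup in a finally block rather than as a retried stage, which is one possible answer to the question of how the terminate task should behave.

```python
from typing import Callable, List, Optional


def run_cluster_job(
    create_cluster: Callable[[], str],
    add_steps: Callable[[str], List[str]],
    wait_for_steps: Callable[[str, List[str]], None],
    terminate_cluster: Callable[[str], None],
    retries: int = 2,
) -> List[str]:
    """Run create/add/wait as one unit; any failure restarts from create."""
    last_error: Optional[Exception] = None
    for _ in range(retries + 1):
        cluster_id: Optional[str] = None
        try:
            cluster_id = create_cluster()
            step_ids = add_steps(cluster_id)
            wait_for_steps(cluster_id, step_ids)
            return step_ids
        except Exception as exc:  # any stage failing fails the whole attempt
            last_error = exc
        finally:
            # Terminate is cleanup, not a retried stage: it runs after every
            # attempt that got as far as creating a cluster.
            if cluster_id is not None:
                terminate_cluster(cluster_id)
    raise RuntimeError(f"cluster job failed after {retries + 1} attempts") from last_error


# Example run: the steps fail on the first cluster and succeed on the second.
calls: List[str] = []


def create() -> str:
    calls.append("create")
    return f"j-{calls.count('create')}"


def add(cluster_id: str) -> List[str]:
    calls.append("add")
    return ["s-1"]


def wait(cluster_id: str, step_ids: List[str]) -> None:
    calls.append("wait")
    if cluster_id == "j-1":
        raise RuntimeError("step failed")


def terminate(cluster_id: str) -> None:
    calls.append(f"terminate {cluster_id}")


result = run_cluster_job(create, add, wait, terminate)
```

The sample run records the order of calls: the first cluster is created, fails in the wait stage and is terminated, then the whole sequence starts again on a fresh cluster.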
I think it'd be great to have a combined operator upstream in the codebase!
Jon
On 14/10/2019 20:42, Daniel Mateus Pires wrote:
Hi there!
Would it make sense to add an operator that is both the EmrAddStep
operator and the step sensor?
In a past role we used Airflow heavily for all things EMR, and I found
myself writing an Operator that combined the emr_add_step operator and
the sensor. It made the pipelines simpler (fewer operator instances per
DAG), and retries were easy.
There is still value in keeping those two other classes around for when
we don't care about the result of an EMR step, or when we are polling for
the completion of an EMR step we did not start from Airflow. But for most
tasks, wouldn't a "merged operator" make sense?
Thanks!
Daniel