Is anyone able to give useful pointers to Jon?

+Ash Berlin-Taylor <a...@apache.org> +kaxiln...@gmail.com
<kaxiln...@gmail.com> +Driesprong, Fokko <fo...@driesprong.frl> , +Jarek
Potiuk <jarek.pot...@polidea.com> tagging you just because I know you all
and feel safe about not being judged :D

On Tue, Oct 15, 2019 at 1:41 PM Jonathan Miles <j...@cybus.co.uk> wrote:

> Hi all,
>
> Posting this to dev instead of users because it crosses into framework
> territory.
>
> I've been using Airflow for six months or so and I'm starting to think
> about how to better manage Airflow tasks that are proxies for compute
> tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy
> enough use a DAG with the various existing Emr...Operators to create
> clusters, add steps and tear down. However, with large numbers of
> parallel steps it's hard to manage the creation of EMR clusters, reuse
> them in various steps and potentially even dynamically scale the EMR
> cluster count. I want something more akin to a producer/consumer queue
> for EMR steps. Before I write an AIP, I want to see if anyone's aware of
> any other work or development in this area?
>
> *Example Problem 1*: cluster reuse.
>
> A workflow might need to spin up and execute steps on multiple EMR
> clusters of different sizes, e.g.
>
> - EMR Cluster A
> -- Phase 1 EMR Steps
> -- Phase 2 EMR Steps
> -- Phase 3 EMR Steps
> -- Phase 4 EMR Steps
>
> - EMR Cluster B
> -- Phase 5 EMR Steps
> -- Phase 6 EMR Steps
>
> - EMR Cluster C
> -- Phase 7 EMR Steps
> -- Phase 8 EMR Steps
>
> The above steps are serial, requiring the previous phase to finish. The
> most basic way to model it in a DAG is to use EmrCreateJobFlowOperator
> for a cluster, then a set of EmrAddStepsOperator and EmrStepSensor pairs
> for each phase and finally terminate the cluster with
> EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for
> AddSteps/TerminateJobFlow and to fetch the step ids for the StepSensor.
>
> - EmrCreateJobFlowOperator
> - Phase 1: EmrAddStepsOperator + EmrStepSensor
> - Phase 2: EmrAddStepsOperator + EmrStepSensor
> - Phase 3: EmrAddStepsOperator + EmrStepSensor
> - Phase 4: EmrAddStepsOperator + EmrStepSensor
> - EmrTerminateJobFlowOperator
>
> One problem here is that if the underlying EMR cluster fails at any time
> - e.g. it could be using spot EC2 instances and AWS capacity runs out -
> someone needs to manually attend to the failed task instance: restart
> the EMR cluster; then reset the state of the failed AddSteps/StepSensor
> task pairs. It needs close supervision.
>
> There are other ways to model this workflow in a DAG with different
> trade-offs, e.g.
>
> 1. Put each phase in a SubDag then create and terminate the cluster for
> each phase, with any failed task causing the whole SubDag to retry. But
> this adds extra total duration due to stopping/starting the cluster for
> each phase, which is not insignificant.
>
> 2. Write custom operators. One to represent an EMR cluster as a
> EmrSubDagOperator to create and eventually terminate cluster. Then
> another another to create sub-tasks that use XCom to fetch the cluster
> id from the "parent" SubDag, add the EMR steps and wait.
>
> Ideally, however, I'd like Airflow to manage this itself: it could know
> that task instances require certain resources or resource instances,
> then start/stop them as required.
>
> *Example Problem 2*: parallel tasks.
>
> A workflow might be split into many parallel steps, e.g. for different
> data partitions or bins.
>
> - Phase P EMR Steps
> -- Parallel EMR step for bin 0
> -- ...
> -- Parallel EMR step for bin N
>
> The above steps are independent, where they compute a subset of the
> larger problem, so can be run in parallel. A basic way to model this as
> a DAG is to create as many branches as the desired parallelism level,
> e.g. for parallelism of two:
>
> - EmrCreateJobFlowOperator
> -- Bin 0: EmrAddStepsOperator + EmrStepSensor
> -- ...
> -- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
> -- EmrTerminateJobFlowOperator
>
> - EmrCreateJobFlowOperator
> -- Bin 1: EmrAddStepsOperator + EmrStepSensor
> -- ...
> -- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
> -- EmrTerminateJobFlowOperator
>
> This has the same management problem as the previous example when
> clusters fail, but also another challenge: the parallelism level is
> statically coded into the DAG topology. It's hard to scale up/down and
> rebalance the tasks for the bins.
>
> I could create a separate "EMR cluster" management service outside of
> Airflow and write a custom Airflow operator to put EMR steps into a
> queue then have that service auto-scale depending on queue depth etc. If
> the queue were Celery, it starts to look like a specialised Airflow
> Executor.
>
> *Solutions*:
>
> Without yet doing much research, I've considered some competing
> solutions for both problems:
>
> 1. Service for managing resources. Custom operator to communicate with
> the service and schedule an atomic set of "EMR steps" with a given EMR
> cluster specification. Service decides whether or not to reuse an
> existing cluster or spin up a new one, can auto-scale etc. We can
> represent both serial or parallel step patterns in the Airflow DAG itself.
>
> 2. Build resource management into Airflow
>
> 2a. Allow tasks to specify *resource* dependencies. As written above, a
> new dimension of dependencies that lets Airflow manage when instances of
> a resource should be spun up or otherwise acquired.
>
> 2b. Allow Airflow to have multiple Executors (could be an implementation
> of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but
> tasks are run on a specialised kind of executor that understands EMR steps.
>
> Any thoughts? Has anyone worked on this set of problems before? I'm
> specifically looking at EMR right now, but I suspect there are many
> other use-cases.
>
> Regards,
>
> Jon
>
>

Reply via email to