Is anyone able to give useful pointers to Jon? +Ash Berlin-Taylor <a...@apache.org> +kaxiln...@gmail.com <kaxiln...@gmail.com> +Driesprong, Fokko <fo...@driesprong.frl>, +Jarek Potiuk <jarek.pot...@polidea.com> - tagging you just because I know you all and feel safe about not being judged :D
On Tue, Oct 15, 2019 at 1:41 PM Jonathan Miles <j...@cybus.co.uk> wrote:
> Hi all,
>
> Posting this to dev instead of users because it crosses into framework territory.
>
> I've been using Airflow for six months or so, and I'm starting to think about how to better manage Airflow tasks that are proxies for compute tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy enough to use a DAG with the various existing Emr...Operators to create clusters, add steps and tear down. However, with large numbers of parallel steps it's hard to manage the creation of EMR clusters, reuse them across steps, and potentially even scale the EMR cluster count dynamically. I want something more akin to a producer/consumer queue for EMR steps. Before I write an AIP, I want to see if anyone is aware of other work or development in this area.
>
> *Example Problem 1*: cluster reuse.
>
> A workflow might need to spin up and execute steps on multiple EMR clusters of different sizes, e.g.
>
> - EMR Cluster A
> -- Phase 1 EMR Steps
> -- Phase 2 EMR Steps
> -- Phase 3 EMR Steps
> -- Phase 4 EMR Steps
>
> - EMR Cluster B
> -- Phase 5 EMR Steps
> -- Phase 6 EMR Steps
>
> - EMR Cluster C
> -- Phase 7 EMR Steps
> -- Phase 8 EMR Steps
>
> The phases are serial, each requiring the previous one to finish. The most basic way to model this in a DAG is to use EmrCreateJobFlowOperator for a cluster, then an EmrAddStepsOperator and EmrStepSensor pair for each phase, and finally terminate the cluster with EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for AddSteps/TerminateJobFlow and to fetch the step ids for the StepSensor.
>
> - EmrCreateJobFlowOperator
> - Phase 1: EmrAddStepsOperator + EmrStepSensor
> - Phase 2: EmrAddStepsOperator + EmrStepSensor
> - Phase 3: EmrAddStepsOperator + EmrStepSensor
> - Phase 4: EmrAddStepsOperator + EmrStepSensor
> - EmrTerminateJobFlowOperator
>
> One problem here is that if the underlying EMR cluster fails at any time - e.g. it could be using spot EC2 instances and AWS capacity runs out - someone needs to manually attend to the failed task instance: restart the EMR cluster, then reset the state of the failed AddSteps/StepSensor task pairs. It needs close supervision.
>
> There are other ways to model this workflow in a DAG with different trade-offs, e.g.
>
> 1. Put each phase in a SubDag, then create and terminate the cluster for each phase, with any failed task causing the whole SubDag to retry. But this adds extra total duration due to stopping/starting the cluster for each phase, which is not insignificant.
>
> 2. Write custom operators: one to represent an EMR cluster as an EmrSubDagOperator that creates and eventually terminates the cluster, and another to create sub-tasks that use XCom to fetch the cluster id from the "parent" SubDag, add the EMR steps and wait.
>
> Ideally, however, I'd like Airflow to manage this itself: it could know that task instances require certain resources or resource instances, then start/stop them as required.
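For anyone picking this up, the basic single-cluster pattern Jon describes above comes out roughly as follows with the Airflow 1.10 contrib EMR operators. This is only a sketch: the cluster spec, step definitions, DAG id and connection ids are placeholders rather than anything from his setup, and each phase is shown with a single step so one EmrStepSensor per phase is enough.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# Placeholder cluster spec and per-phase EMR step definitions.
JOB_FLOW_OVERRIDES = {"Name": "cluster-a"}
PHASE_STEPS = {
    phase: [{
        "Name": "phase %d" % phase,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar",
                          "Args": ["spark-submit", "phase_%d.py" % phase]},
    }]
    for phase in (1, 2, 3, 4)
}

# The cluster id is pushed to XCom by EmrCreateJobFlowOperator.
CLUSTER_ID = "{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}"

with DAG("emr_cluster_a", start_date=datetime(2019, 10, 1), schedule_interval=None) as dag:
    create = EmrCreateJobFlowOperator(
        task_id="create_job_flow",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
    )
    terminate = EmrTerminateJobFlowOperator(
        task_id="terminate_job_flow",
        job_flow_id=CLUSTER_ID,
        aws_conn_id="aws_default",
    )

    previous = create
    for phase, steps in sorted(PHASE_STEPS.items()):
        add = EmrAddStepsOperator(
            task_id="phase_%d_add_steps" % phase,
            job_flow_id=CLUSTER_ID,
            steps=steps,
            aws_conn_id="aws_default",
        )
        watch = EmrStepSensor(
            task_id="phase_%d_watch_step" % phase,
            job_flow_id=CLUSTER_ID,
            # EmrAddStepsOperator pushes the list of submitted step ids to XCom.
            step_id="{{ task_instance.xcom_pull(task_ids='phase_%d_add_steps')[0] }}" % phase,
            aws_conn_id="aws_default",
        )
        # Each phase starts only after the previous phase's step has finished.
        previous >> add >> watch
        previous = watch

    previous >> terminate

If the cluster dies part-way through, the failure mode Jon describes applies: the sensor for the in-flight phase fails, and someone has to re-create the cluster and clear the failed tasks by hand.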
> *Example Problem 2*: parallel tasks.
>
> A workflow might be split into many parallel steps, e.g. for different data partitions or bins.
>
> - Phase P EMR Steps
> -- Parallel EMR step for bin 0
> -- ...
> -- Parallel EMR step for bin N
>
> The above steps are independent, each computing a subset of the larger problem, so they can run in parallel. A basic way to model this as a DAG is to create as many branches as the desired parallelism level, e.g. for a parallelism of two:
>
> - EmrCreateJobFlowOperator
> -- Bin 0: EmrAddStepsOperator + EmrStepSensor
> -- ...
> -- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
> -- EmrTerminateJobFlowOperator
>
> - EmrCreateJobFlowOperator
> -- Bin 1: EmrAddStepsOperator + EmrStepSensor
> -- ...
> -- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
> -- EmrTerminateJobFlowOperator
>
> This has the same management problem as the previous example when clusters fail, but also another challenge: the parallelism level is statically coded into the DAG topology. It's hard to scale up/down and rebalance the tasks across the bins.
>
> I could create a separate "EMR cluster" management service outside of Airflow and write a custom Airflow operator to put EMR steps into a queue, then have that service auto-scale depending on queue depth, etc. If the queue were Celery, it would start to look like a specialised Airflow Executor.
>
> *Solutions*:
>
> Without yet doing much research, I've considered some competing solutions for both problems:
>
> 1. A service for managing resources, plus a custom operator to communicate with the service and schedule an atomic set of "EMR steps" with a given EMR cluster specification. The service decides whether to reuse an existing cluster or spin up a new one, can auto-scale, etc. We can represent both serial and parallel step patterns in the Airflow DAG itself.
>
> 2. Build resource management into Airflow.
>
> 2a. Allow tasks to specify *resource* dependencies. As written above, a new dimension of dependencies that lets Airflow manage when instances of a resource should be spun up or otherwise acquired.
>
> 2b. Allow Airflow to have multiple Executors (this could be an implementation of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but tasks are run on a specialised kind of executor that understands EMR steps.
>
> Any thoughts? Has anyone worked on this set of problems before? I'm specifically looking at EMR right now, but I suspect there are many other use-cases.
>
> Regards,
>
> Jon
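To make the static-parallelism point concrete, the Problem 2 shape comes out roughly like this with the same contrib operators (again only a sketch; the bin count, cluster count, cluster specs and step definitions are placeholders). Rebalancing bins across clusters means editing the loop, so task ids and the DAG topology change, which is exactly the pain Jon describes.

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

NUM_BINS = 8       # placeholder problem size
NUM_CLUSTERS = 2   # the parallelism level, fixed at DAG-definition time


def steps_for_bin(b):
    # Placeholder EMR step definition for one bin.
    return [{
        "Name": "bin %d" % b,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {"Jar": "command-runner.jar",
                          "Args": ["spark-submit", "job.py", "--bin", str(b)]},
    }]


with DAG("emr_phase_p", start_date=datetime(2019, 10, 1), schedule_interval=None) as dag:
    for c in range(NUM_CLUSTERS):
        cluster_id = "{{ task_instance.xcom_pull(task_ids='create_cluster_%d') }}" % c
        create = EmrCreateJobFlowOperator(
            task_id="create_cluster_%d" % c,
            job_flow_overrides={"Name": "phase-p-%d" % c},  # placeholder cluster spec
            aws_conn_id="aws_default",
            emr_conn_id="emr_default",
        )
        terminate = EmrTerminateJobFlowOperator(
            task_id="terminate_cluster_%d" % c,
            job_flow_id=cluster_id,
            aws_conn_id="aws_default",
            trigger_rule="all_done",  # tear the cluster down even if some bins fail
        )
        # Bins are statically assigned to clusters round-robin; changing NUM_CLUSTERS
        # or the assignment below rewrites the task ids and the topology.
        for b in range(c, NUM_BINS, NUM_CLUSTERS):
            add = EmrAddStepsOperator(
                task_id="bin_%d_add_steps" % b,
                job_flow_id=cluster_id,
                steps=steps_for_bin(b),
                aws_conn_id="aws_default",
            )
            watch = EmrStepSensor(
                task_id="bin_%d_watch_step" % b,
                job_flow_id=cluster_id,
                step_id="{{ task_instance.xcom_pull(task_ids='bin_%d_add_steps')[0] }}" % b,
                aws_conn_id="aws_default",
            )
            create >> add >> watch >> terminate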