Hello,

I wonder whether this PR goes some way towards solving your problem: https://github.com/apache/airflow/pull/6210. It improves resource usage when the operation is performed remotely.
Best regards,

On Thu, Oct 31, 2019 at 2:05 AM Aizhamal Nurmamat kyzy <[email protected]> wrote:
>
> Is anyone able to give useful pointers to Jon?
>
> +Ash Berlin-Taylor <[email protected]>, +Driesprong, Fokko <[email protected]>, +Jarek Potiuk <[email protected]> -- tagging you just because I know you all and feel safe about not being judged :D
>
> On Tue, Oct 15, 2019 at 1:41 PM Jonathan Miles <[email protected]> wrote:
> >
> > Hi all,
> >
> > Posting this to dev instead of users because it crosses into framework territory.
> >
> > I've been using Airflow for six months or so, and I'm starting to think about how to better manage Airflow tasks that are proxies for compute tasks running elsewhere, e.g. steps on Amazon EMR clusters. It's easy enough to use a DAG with the various existing Emr...Operators to create clusters, add steps and tear down. However, with large numbers of parallel steps it's hard to manage the creation of EMR clusters, reuse them across steps and potentially even scale the EMR cluster count dynamically. I want something more akin to a producer/consumer queue for EMR steps. Before I write an AIP, I want to see if anyone's aware of any other work or development in this area.
> >
> > *Example Problem 1*: cluster reuse.
> >
> > A workflow might need to spin up and execute steps on multiple EMR clusters of different sizes, e.g.
> >
> > - EMR Cluster A
> > -- Phase 1 EMR Steps
> > -- Phase 2 EMR Steps
> > -- Phase 3 EMR Steps
> > -- Phase 4 EMR Steps
> >
> > - EMR Cluster B
> > -- Phase 5 EMR Steps
> > -- Phase 6 EMR Steps
> >
> > - EMR Cluster C
> > -- Phase 7 EMR Steps
> > -- Phase 8 EMR Steps
> >
> > The above steps are serial, requiring the previous phase to finish.
> > The most basic way to model it in a DAG is to use EmrCreateJobFlowOperator for a cluster, then a set of EmrAddStepsOperator and EmrStepSensor pairs for each phase, and finally terminate the cluster with EmrTerminateJobFlowOperator. We can use XCom to fetch the cluster id for AddSteps/TerminateJobFlow and to fetch the step ids for the StepSensor.
> >
> > - EmrCreateJobFlowOperator
> > - Phase 1: EmrAddStepsOperator + EmrStepSensor
> > - Phase 2: EmrAddStepsOperator + EmrStepSensor
> > - Phase 3: EmrAddStepsOperator + EmrStepSensor
> > - Phase 4: EmrAddStepsOperator + EmrStepSensor
> > - EmrTerminateJobFlowOperator
> >
> > One problem here is that if the underlying EMR cluster fails at any time - e.g. it could be using spot EC2 instances and AWS capacity runs out - someone needs to manually attend to the failed task instance: restart the EMR cluster, then reset the state of the failed AddSteps/StepSensor task pairs. It needs close supervision.
> >
> > There are other ways to model this workflow in a DAG with different trade-offs, e.g.
> >
> > 1. Put each phase in a SubDag, then create and terminate the cluster for each phase, with any failed task causing the whole SubDag to retry. But this adds extra total duration due to stopping/starting the cluster for each phase, which is not insignificant.
> >
> > 2. Write custom operators. One to represent an EMR cluster as an EmrSubDagOperator that creates and eventually terminates the cluster. Then another to create sub-tasks that use XCom to fetch the cluster id from the "parent" SubDag, add the EMR steps and wait.
> >
> > Ideally, however, I'd like Airflow to manage this itself: it could know that task instances require certain resources or resource instances, then start/stop them as required.
> >
> > *Example Problem 2*: parallel tasks.
> >
> > A workflow might be split into many parallel steps, e.g. for different data partitions or bins.
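Returning to Example Problem 1 for a moment: the create/add-steps/sensor/terminate wiring described above could be sketched roughly as the DAG below. This is only a sketch under assumptions -- the import paths follow the `airflow.providers.amazon` package layout (they have moved between Airflow releases), and `JOB_FLOW_OVERRIDES` / `PHASE_STEPS` are placeholder names for a real cluster spec and step list.

```python
# Sketch of the serial-phase pattern: one cluster, four AddSteps/StepSensor
# phase pairs chained via XCom, then terminate. Placeholder specs throughout.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrAddStepsOperator,
    EmrCreateJobFlowOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

JOB_FLOW_OVERRIDES = {"Name": "phase-cluster"}  # placeholder cluster spec
PHASE_STEPS = [{"Name": "phase-step"}]          # placeholder EMR step list

with DAG("emr_serial_phases", start_date=datetime(2019, 10, 1), schedule_interval=None) as dag:
    create = EmrCreateJobFlowOperator(
        task_id="create_job_flow", job_flow_overrides=JOB_FLOW_OVERRIDES
    )
    # The cluster id comes back via XCom from the create task.
    cluster_id = "{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}"

    previous = create
    for phase in range(1, 5):
        add = EmrAddStepsOperator(
            task_id=f"phase_{phase}_add_steps", job_flow_id=cluster_id, steps=PHASE_STEPS
        )
        watch = EmrStepSensor(
            task_id=f"phase_{phase}_watch_step",
            job_flow_id=cluster_id,
            # Step ids come back via XCom from the matching AddSteps task.
            step_id=(
                "{{ task_instance.xcom_pull(task_ids='phase_%d_add_steps', "
                "key='return_value')[0] }}" % phase
            ),
        )
        previous >> add >> watch  # each phase waits for the previous sensor
        previous = watch

    terminate = EmrTerminateJobFlowOperator(task_id="terminate_job_flow", job_flow_id=cluster_id)
    previous >> terminate
```

This makes the supervision problem visible: if the cluster dies in phase 3, the create task has long since succeeded, so someone has to restart the cluster and clear task state by hand.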
> > - Phase P EMR Steps
> > -- Parallel EMR step for bin 0
> > -- ...
> > -- Parallel EMR step for bin N
> >
> > The above steps are independent - each computes a subset of the larger problem - so they can be run in parallel. A basic way to model this as a DAG is to create as many branches as the desired parallelism level, e.g. for parallelism of two:
> >
> > - EmrCreateJobFlowOperator
> > -- Bin 0: EmrAddStepsOperator + EmrStepSensor
> > -- ...
> > -- Bin max(even(B)): EmrAddStepsOperator + EmrStepSensor
> > -- EmrTerminateJobFlowOperator
> >
> > - EmrCreateJobFlowOperator
> > -- Bin 1: EmrAddStepsOperator + EmrStepSensor
> > -- ...
> > -- Bin max(odd(B)): EmrAddStepsOperator + EmrStepSensor
> > -- EmrTerminateJobFlowOperator
> >
> > This has the same management problem as the previous example when clusters fail, but also another challenge: the parallelism level is statically coded into the DAG topology. It's hard to scale up/down and rebalance the tasks across the bins.
> >
> > I could create a separate "EMR cluster" management service outside of Airflow and write a custom Airflow operator to put EMR steps into a queue, then have that service auto-scale depending on queue depth etc. If the queue were Celery, it starts to look like a specialised Airflow Executor.
> >
> > *Solutions*:
> >
> > Without yet doing much research, I've considered some competing solutions for both problems:
> >
> > 1. A service for managing resources, plus a custom operator to communicate with the service and schedule an atomic set of "EMR steps" against a given EMR cluster specification. The service decides whether or not to reuse an existing cluster or spin up a new one, can auto-scale etc. We can represent both serial and parallel step patterns in the Airflow DAG itself.
> >
> > 2. Build resource management into Airflow:
> >
> > 2a. Allow tasks to specify *resource* dependencies.
> > As written above, a new dimension of dependencies that lets Airflow manage when instances of a resource should be spun up or otherwise acquired.
> >
> > 2b. Allow Airflow to have multiple Executors (could be an implementation of 2a), e.g. EmrClusterExecutor. The scheduler still does its thing, but tasks are run on a specialised kind of executor that understands EMR steps.
> >
> > Any thoughts? Has anyone worked on this set of problems before? I'm specifically looking at EMR right now, but I suspect there are many other use-cases.
> >
> > Regards,
> >
> > Jon
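On the static-parallelism point in Example Problem 2: the even/odd split there is the two-cluster case of a general round-robin assignment, and making that assignment a parameter is what the static DAG topology prevents. A small sketch with a hypothetical helper (`partition_bins` is not part of Airflow, just an illustration) makes the rebalancing problem concrete:

```python
def partition_bins(num_bins, num_clusters):
    """Assign bin i to cluster i % num_clusters (round-robin).

    With num_clusters=2 this reproduces the even/odd split in the example:
    cluster 0 gets bins 0, 2, 4, ... and cluster 1 gets bins 1, 3, 5, ...
    Changing num_clusters rebalances every branch, which is exactly the
    topology change a static DAG cannot absorb at runtime.
    """
    return [list(range(c, num_bins, num_clusters)) for c in range(num_clusters)]

# Parallelism of two over six bins, as in the example topology:
print(partition_bins(6, 2))  # → [[0, 2, 4], [1, 3, 5]]
```

Because DAG shape is fixed at parse time, bumping `num_clusters` from 2 to 3 rewrites every branch of the graph, which is why a queue-based service or executor starts to look attractive.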
