For use case 1, is it possible to avoid changing the Context? Can we have something along the lines of "StramToNodeRequest"?
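To make the idea concrete, a rough sketch of what a request-style API might look like follows. Note that DagChangeRequest, its builder, and submitDagChange() are hypothetical names, not existing Apex classes; the point is only that the operator hands the engine an immutable request that rides on the existing heartbeat RPC, instead of mutating the Context:

    // Hypothetical sketch: DagChangeRequest and submitDagChange() do not exist today.
    DagChangeRequest request = DagChangeRequest.builder()
        .addOperator("o1")                           // redeploy from previous checkpointed state
        .addOperator("o2", new Operator2())          // create a new operator
        .addStream("s1", "reader.output", "o1.input")
        .removeOperator("o3")                        // remove o3 and its downstream operators
        .build();
    // Hypothetical engine hook; the request would be forwarded to the master on the next heartbeat.
    context.submitDagChange(request);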
On Tue, Jun 21, 2016 at 11:09 AM Tushar Gosavi <tus...@datatorrent.com> wrote:
> Hi All,
>
> We have seen a few use cases in the field which require Apex application
> scheduling based on some condition. This has also come up as part of
> Batch Support in Apex previously
> (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> I am proposing the following functionality in Apex to help with scheduling
> and better resource utilization for batch jobs. Please provide your
> comments.
>
> Use case 1 - Dynamic DAG modification.
>
> Each operator in a DAG consumes YARN resources. Sometimes it is
> desirable to return the resources to YARN when no data is available
> for processing, and to deploy the whole DAG once data starts to appear.
> For this to happen automatically, we will need some data-monitoring
> operators running in the DAG to trigger restart and shutdown of the
> operators in the DAG.
>
> Apex already has such an API to dynamically change the running DAG
> through the CLI. We could make a similar API available to operators, which
> would trigger DAG modification at runtime. This information can be
> passed to the master using the heartbeat RPC, and the master will make
> the required changes to the DAG. Let me know what you think about it.
> Something like below:
>
> Context.beginDagChange();
> context.addOperator("o1");  <== launch operator from previous check-pointed state
> context.addOperator("o2", new Operator2());  <== create new operator
> context.addStream("s1", "reader.output", "o1.input");
> context.shutdown("o3");  <== delete this and downstream operators from the DAG
> context.apply();  <== DAG changes will be sent to the master, and the master
>                       will apply these changes
>
> Similarly, APIs for other functionality such as locality settings
> need to be provided.
>
>
> Use case 2 - Classic Batch Scheduling.
>
> Provide an API callable from an operator to launch a DAG. The operator
> will prepare a DAG object and submit it to YARN; the DAG will be
> scheduled as a new application. This way complex schedulers can be
> written as operators.
>
> public class SchedulerOperator implements Operator {
>   void handleIdleTime() {
>     // check for conditions to start a job (for example enough files are
>     // available, enough items are available in Kafka, or a scheduled time
>     // has been reached)
>     Dag dag = context.createDAG();
>     dag.addOperator(...);
>     dag.addOperator(...);
>     LaunchOptions lOptions = new LaunchOptions();
>     lOptions.oldId = "";  // start from this checkpoint
>     DagHandler dagHandler = context.submit(dag, lOptions);
>   }
> }
>
> DagHandler will have methods to monitor the final state of the
> application, or to kill the DAG:
> dagHandler.waitForCompletion()  <== wait till the DAG terminates
> dagHandler.status()  <== get the status of the application
> dagHandler.kill()  <== kill the running application
> dagHandler.shutdown()  <== shut down the application
>
> More complex scheduler operators could be written using these APIs to
> manage workflows, i.e. DAGs of DAGs.
>
> Regards,
> -Tushar.
>