This looks like something that, coordination-wise, belongs in the master and
can be done with a shared stats listener. The operator request/response
protocol could be used to relay the data for the scheduling decisions.

Thomas
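A minimal sketch of such a shared listener, assuming idle detection is based on
the processed-tuple moving average; IdleDetector and the idle-window counting
are illustrative, while StatsListener, processStats() and BatchedOperatorStats
are the existing Apex interfaces:

import java.io.Serializable;

import com.datatorrent.api.StatsListener;

public class IdleDetector implements StatsListener, Serializable
{
  private int idleWindows;

  @Override
  public Response processStats(BatchedOperatorStats stats)
  {
    // One listener instance shared by the input operators gives the master an
    // aggregate view of whether any data is flowing.
    if (stats.getTuplesProcessedPSMA() == 0) {
      idleWindows++;
    } else {
      idleWindows = 0;
    }
    // Once idleWindows crosses some threshold, the master could undeploy the
    // processing part of the DAG and redeploy it when tuples reappear.
    return new Response();
  }
}

The same listener instance can be attached to several operators, which is what
would let the master make the scheduling call centrally.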
On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni
<chandni.si...@capitalone.com> wrote:
> Hi Tushar,
>
> I have some questions about use case 2: Batch Support.
> I don't understand the advantages of providing batch support by having an
> operator as a scheduler.
>
> An approach that seemed a little more straightforward to me was to expose
> an API for a scheduler. If a scheduler is set, then the master uses it and
> schedules operators. By default there isn't any scheduler and the job is
> run as it is now.
>
> Maybe this is too simplistic, but can you please let me know why having an
> operator as a scheduler is a better way?
>
> Thanks,
> Chandni
>
> On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:
>
> > Hi All,
> >
> > We have seen a few use cases in the field which require Apex application
> > scheduling based on some condition. This has also come up as part of
> > Batch Support in Apex previously
> > (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> > I am proposing the following functionality in Apex to help with
> > scheduling and better resource utilization for batch jobs. Please
> > provide your comments.
> >
> > Usecase 1 - Dynamic DAG modification.
> >
> > Each operator in a DAG consumes YARN resources. Sometimes it is
> > desirable to return the resources to YARN when no data is available for
> > processing, and to deploy the whole DAG once data starts to appear. For
> > this to happen automatically, we will need some data-monitoring
> > operators running in the DAG to trigger restart and shutdown of the
> > operators in the DAG.
> >
> > Apex already has such an API to dynamically change the running DAG
> > through the CLI. We could make a similar API available to operators,
> > which would trigger DAG modification at runtime. This information can be
> > passed to the master using the heartbeat RPC, and the master will make
> > the required changes to the DAG. Let me know what you think about it.
> > Something like below:
> >
> > context.beginDagChange();
> > context.addOperator("o1"); <== launch operator from previous
> > check-pointed state.
> > context.addOperator("o2", new Operator2()); <== create a new operator.
> > context.addStream("s1", "reader.output", "o1.input");
> > context.shutdown("o3"); <== delete this and downstream operators from
> > the DAG.
> > context.apply(); <== dag changes will be sent to the master, and the
> > master will apply these changes.
> >
> > Similarly, APIs for other functionality such as locality settings need
> > to be provided.
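To make the intended use concrete, a data-monitoring operator driving these
calls could look roughly like the sketch below. DagChangeContext,
FileMonitorOperator and dataAvailable() are illustrative names; the dag-change
methods are the API proposed in the mail, not something Apex exposes today.

import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;

/** Hypothetical handle carrying the proposed dag-change calls. */
interface DagChangeContext
{
  void beginDagChange();
  void addOperator(String name);                      // relaunch from check-pointed state
  void addOperator(String name, Operator operator);   // create a new operator
  void addStream(String name, String sourcePort, String sinkPort);
  void shutdown(String name);                         // remove operator and its downstream
  void apply();                                       // ship staged changes to the master via heartbeat
}

public class FileMonitorOperator extends BaseOperator implements InputOperator
{
  private transient DagChangeContext dagContext;      // would be provided by the platform
  private boolean deployed;

  @Override
  public void emitTuples()
  {
    // Once data shows up, stage the DAG changes; the master applies them and
    // the processing operators come back up from their check-pointed state.
    if (!deployed && dataAvailable()) {
      dagContext.beginDagChange();
      dagContext.addOperator("o1");
      dagContext.addStream("s1", "reader.output", "o1.input");
      dagContext.apply();
      deployed = true;
    }
  }

  private boolean dataAvailable()
  {
    return false;  // placeholder: check directory listings, Kafka offsets, etc.
  }
}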
> >
> > Usecase 2 - Classic Batch Scheduling.
> >
> > Provide an API callable from an operator to launch a DAG. The operator
> > will prepare a DAG object and submit it to YARN, and the DAG will be
> > scheduled as a new application. This way complex schedulers can be
> > written as operators.
> >
> > public class SchedulerOperator implements Operator {
> >   void handleIdleTime() {
> >     // Check for conditions to start a job (for example, enough files
> >     // are available, enough items are available in Kafka, or a time
> >     // threshold has been reached).
> >     Dag dag = context.createDAG();
> >     dag.addOperator(...);
> >     dag.addOperator(...);
> >     LaunchOptions lOptions = new LaunchOptions();
> >     lOptions.oldId = ""; // start from this checkpoint.
> >     DagHandler dagHandler = context.submit(dag, lOptions);
> >   }
> > }
> >
> > DagHandler will have methods to monitor the final state of the
> > application, or to kill the DAG:
> > dagHandler.waitForCompletion() <== wait till the DAG terminates.
> > dagHandler.status() <== get the status of the application.
> > dagHandler.kill() <== kill the running application.
> > dagHandler.shutdown() <== shut down the application.
> >
> > More complex scheduler operators could be written to manage workflows,
> > i.e. DAGs of DAGs, using these APIs.
> >
> > Regards,
> > -Tushar.
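For illustration, the kind of sequencing such a scheduler operator could
implement with the proposed handles, e.g. a two-stage workflow where the second
DAG is only launched after the first terminates. All types here (Dag,
LaunchOptions, DagHandler) are the proposed ones above, plus an assumed
SchedulerContext carrying createDAG()/submit(); none of this is an existing
Apex class.

public class TwoStageWorkflow
{
  void run(SchedulerContext context) throws Exception
  {
    Dag stage1 = context.createDAG();
    // ... populate stage1 with its operators and streams ...
    DagHandler first = context.submit(stage1, new LaunchOptions());
    first.waitForCompletion();          // block until stage 1 terminates

    Dag stage2 = context.createDAG();
    // ... stage2 picks up stage1's output, e.g. files stage1 wrote ...
    DagHandler second = context.submit(stage2, new LaunchOptions());
    // The scheduler can poll second.status(), or call kill()/shutdown() if
    // some external condition changes.
  }
}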