Hi, IMO scheduling a job can be independent of any operator, while StatsListeners are not. I understand that in many cases input/output operators will decide when the job ends, but there can be cases where scheduling is independent of them.
Thanks,
Chandni

On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com> wrote:

> This looks like something that, coordination-wise, belongs in the master
> and can be done with a shared stats listener.
>
> The operator request/response protocol could be used to relay the data for
> the scheduling decisions.
>
> Thomas
>
> On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>
> > Hi Tushar,
> >
> > I have some questions about use case 2: Batch Support.
> > I don't understand the advantages of providing batch support by having an
> > operator as a scheduler.
> >
> > An approach that seemed a little more straightforward to me was to expose
> > an API for a scheduler. If a scheduler is set, then the master uses it and
> > schedules operators. By default there isn't any scheduler and the job runs
> > as it does now.
> >
> > Maybe this is too simplistic, but can you please let me know why having an
> > operator as a scheduler is a better way?
> >
> > Thanks,
> > Chandni
> >
> > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:
> >
> > > Hi All,
> > >
> > > We have seen a few use cases in the field which require Apex application
> > > scheduling based on some condition. This has also come up as part of
> > > Batch Support in Apex previously
> > > (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> > > I am proposing the following functionality in Apex to help with
> > > scheduling and better resource utilization for batch jobs. Please
> > > provide your comments.
> > >
> > > Use case 1 - Dynamic DAG modification.
> > >
> > > Each operator in a DAG consumes YARN resources. Sometimes it is
> > > desirable to return the resources to YARN when no data is available
> > > for processing, and redeploy the whole DAG once data starts to appear.
> > > For this to happen automatically, we will need some data-monitoring
> > > operators running in the DAG to trigger restart and shutdown of the
> > > operators in the DAG.
> > >
> > > Apex already has such an API for dynamically changing the running DAG
> > > through the CLI. We could make a similar API available to operators,
> > > which would trigger DAG modification at runtime. This information can be
> > > passed to the master using the heartbeat RPC, and the master will make
> > > the required changes to the DAG. Let me know what you think about it.
> > > Something like below:
> > >
> > > Context.beginDagChange();
> > > context.addOperator("o1");                  <== launch operator from previously check-pointed state
> > > context.addOperator("o2", new Operator2()); <== create new operator
> > > context.addStream("s1", "reader.output", "o1.input");
> > > context.shutdown("o3");                     <== remove this operator and its downstream operators from the DAG
> > > context.apply();                            <== DAG changes will be sent to the master, and the master will apply them
> > >
> > > Similarly, APIs for other functionality such as locality settings
> > > need to be provided.
> > >
> > > Use case 2 - Classic Batch Scheduling.
> > >
> > > Provide an API, callable from an operator, to launch a DAG. The operator
> > > will prepare a DAG object and submit it to YARN; the DAG will be
> > > scheduled as a new application. This way complex schedulers can be
> > > written as operators.
> > >
> > > public class SchedulerOperator implements Operator {
> > >   void handleIdleTime() {
> > >     // check for conditions to start a job (for example, enough files are
> > >     // available, enough items are available in Kafka, or a given time has been reached)
> > >     Dag dag = context.createDAG();
> > >     dag.addOperator(...);
> > >     dag.addOperator(...);
> > >     LaunchOptions lOptions = new LaunchOptions();
> > >     lOptions.oldId = ""; // start from this checkpoint
> > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > >   }
> > > }
> > >
> > > DagHandler will have methods to monitor the final state of the
> > > application, or to kill the DAG:
> > >
> > > dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > dagHandler.status()            <== get the status of the application
> > > dagHandler.kill()              <== kill the running application
> > > dagHandler.shutdown()          <== shut down the application
> > >
> > > More complex scheduler operators could be written to manage
> > > workflows, i.e. DAGs of DAGs, using these APIs.
> > >
> > > Regards,
> > > -Tushar.
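[Editor's note] The DAG-change API in use case 1 is only proposed, not implemented, so the snippet below is a minimal, self-contained sketch of how such a batched-change context might behave. `DagChangeContext` and its `Change` record are hypothetical names invented for illustration; in the actual proposal, `apply()` would relay the batch to the master over the heartbeat RPC instead of returning it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the proposed dynamic DAG modification API.
// None of these names exist in Apex today.
public class DagChangeContext {

    // One requested mutation, e.g. "addOperator o1" or "shutdown o3".
    public static final class Change {
        public final String kind;
        public final String target;
        Change(String kind, String target) {
            this.kind = kind;
            this.target = target;
        }
    }

    private final List<Change> pending = new ArrayList<>();

    // Record a request to launch an operator; a previously check-pointed
    // state would be resumed by the master if one exists.
    public void addOperator(String name) {
        pending.add(new Change("addOperator", name));
    }

    // Record a request to connect an output port to an input port.
    public void addStream(String name, String fromPort, String toPort) {
        pending.add(new Change("addStream", name + ":" + fromPort + "->" + toPort));
    }

    // Record a request to remove the named operator (and, per the
    // proposal, its downstream operators) from the DAG.
    public void shutdownOperator(String name) {
        pending.add(new Change("shutdown", name));
    }

    // In the proposal this would send all changes to the master in one
    // heartbeat; here it just hands back the recorded batch.
    public List<Change> apply() {
        List<Change> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        DagChangeContext ctx = new DagChangeContext();
        ctx.addOperator("o1");
        ctx.addStream("s1", "reader.output", "o1.input");
        ctx.shutdownOperator("o3");
        System.out.println("batched changes: " + ctx.apply().size()); // prints: batched changes: 3
    }
}
```

Batching the mutations and applying them atomically in `apply()` mirrors the `beginDagChange()`/`apply()` bracketing in the proposal: the master sees one consistent DAG edit rather than a sequence of partially-applied changes.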
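[Editor's note] Use case 2 can be illustrated with a similarly hedged sketch: a scheduler that waits until enough input has accumulated, then submits a batch DAG and receives a handle for monitoring it. `SchedulerSketch`, `SchedulerContext`, and the simplified `DagHandler` below are all stand-ins invented for this example; they are not part of any existing Apex API.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a scheduler operator: launch a batch DAG once
// enough input files have accumulated, as in Tushar's handleIdleTime()
// example. All types here are stand-ins for the proposed API.
public class SchedulerSketch {

    // Stand-in for the handle the proposal returns from context.submit().
    public interface DagHandler {
        String status(); // e.g. "RUNNING", "FINISHED"
        void kill();     // hard-stop the launched application
    }

    // Stand-in for the launch API the operator would call.
    public interface SchedulerContext {
        DagHandler submit(String dagName);
    }

    private final SchedulerContext context;
    private final int filesPerBatch;
    private final AtomicInteger pendingFiles = new AtomicInteger();
    private int launched = 0;

    public SchedulerSketch(SchedulerContext context, int filesPerBatch) {
        this.context = context;
        this.filesPerBatch = filesPerBatch;
    }

    // Data-monitoring side: e.g. a directory scanner reports a new file.
    public void fileArrived() {
        pendingFiles.incrementAndGet();
    }

    // Mirrors handleIdleTime() in the proposal: when the launch condition
    // holds (a full batch of files is ready), build and submit a DAG.
    public DagHandler handleIdleTime() {
        if (pendingFiles.get() < filesPerBatch) {
            return null; // not enough work yet; keep waiting
        }
        pendingFiles.addAndGet(-filesPerBatch);
        launched++;
        return context.submit("batch-" + launched);
    }

    public int launchedCount() {
        return launched;
    }
}
```

Keeping the launch condition inside the idle-time callback means the scheduler consumes no extra threads, which is in the spirit of the proposal: the scheduler is just another operator, and everything heavier (the launched batch DAG) runs as a separate YARN application monitored through the returned handle.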