I think it's a good idea to have a scheduling operator when you need to start a part of the DAG when some trigger fires (e.g., a FileSplitter identifying new files in the file system) and otherwise bring it down to save resources.
On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:

I am in agreement with Chandni. Scheduling a batch job is an API completely independent of a DAG or an operator. It could be used by a command-line tool running on your laptop, by a script, or it could happen to be used by an operator running in a DAG and a StatsListener.

On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <tho...@datatorrent.com> wrote:

Scheduling can be independent, although we have use cases where the scheduling depends on completion of processing (multi-staged batch jobs where unused resources need to be freed).

Both can be accomplished with a stats listener.

There can be a "scheduling operator" that brings up and removes DAG fragments as needed.

On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chan...@gmail.com> wrote:

Hi,
IMO scheduling a job can be independent of any operator, while StatsListeners are not. I understand that in a lot of cases input/output operators will decide when the job ends, but there can be cases when scheduling is independent of that.

Thanks,
Chandni

On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com> wrote:

This looks like something that, coordination-wise, belongs in the master and can be done with a shared stats listener.

The operator request/response protocol could be used to relay the data for the scheduling decisions.

Thomas

On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:

Hi Tushar,

I have some questions about use case 2: Batch Support. I don't understand the advantages of providing batch support by having an operator as a scheduler.
An approach that seemed a little more straightforward to me was to expose an API for a scheduler. If a scheduler is set, then the master uses it to schedule operators. By default there isn't any scheduler and the job runs as it does now.

Maybe this is too simplistic, but can you please let me know why having an operator as a scheduler is a better way?

Thanks,
Chandni

On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:

Hi All,

We have seen a few use cases in the field which require Apex application scheduling based on some condition. This has also come up previously as part of Batch Support in Apex (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E). I am proposing the following functionality in Apex to help with scheduling and better resource utilization for batch jobs. Please provide your comments.

Usecase 1 - Dynamic DAG modification.

Each operator in a DAG consumes YARN resources. Sometimes it is desirable to return the resources to YARN when no data is available for processing, and to deploy the whole DAG once data starts to appear. For this to happen automatically, we will need some data-monitoring operators running in the DAG to trigger restart and shutdown of the operators in the DAG.

Apex already has such an API for dynamically changing the running DAG through the CLI. We could make a similar API available to operators, which would trigger DAG modification at runtime.
This information can be passed to the master using the heartbeat RPC, and the master will make the required changes to the DAG. Let me know what you think about it. Something like below:

Context.beginDagChange();
context.addOperator("o1"); <== launch operator from previous check-pointed state.
context.addOperator("o2", new Operator2()); <== create new operator.
context.addStream("s1", "reader.output", "o1.input");
context.shutdown("o3"); <== delete this and downstream operators from the DAG.
context.apply(); <== DAG changes will be sent to the master, and the master will apply them.

Similarly, APIs for other functionality such as locality settings need to be provided.

Usecase 2 - Classic Batch Scheduling.

Provide an API callable from an operator to launch a DAG. The operator will prepare a DAG object and submit it to YARN; the DAG will be scheduled as a new application. This way complex schedulers can be written as operators.

public class SchedulerOperator implements Operator {
  void handleIdleTime() {
    // check for conditions to start a job (for example, enough files are
    // available, enough items are available in Kafka, or a given time has been reached)
    Dag dag = context.createDAG();
    dag.addOperator(...);
    dag.addOperator(...);
    LaunchOptions lOptions = new LaunchOptions();
    lOptions.oldId = ""; // start from this checkpoint.
    DagHandler dagHandler = context.submit(dag, lOptions);
  }
}

DagHandler will have methods to monitor the final state of the application, or to kill the DAG:

dagHandler.waitForCompletion() <== wait till the DAG terminates.
dagHandler.status() <== get the status of the application.
dagHandler.kill() <== kill the running application.
dagHandler.shutdown() <== shut down the application.

More complex scheduler operators could be written to manage workflows, i.e. DAGs of DAGs, using these APIs.

Regards,
-Tushar.
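To make the control flow of the proposed scheduler concrete, here is a minimal, self-contained Java sketch. None of this is an existing Apex API: DagHandler, schedule(), and the file-count threshold condition are all stand-ins invented for illustration, mirroring the handleIdleTime() idea above (check a condition, submit a job, track its handle).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mock-up of the proposed scheduling flow; the real
// proposal would submit a DAG to YARN via context.submit(dag, options).
public class SchedulerSketch {

    // Stand-in for the proposed DagHandler: tracks one submitted "DAG".
    static class DagHandler {
        private String state = "RUNNING";
        String status() { return state; }
        void shutdown() { state = "FINISHED"; }
    }

    // Minimal scheduler decision: launch a job only once enough input
    // files have accumulated, as in the handleIdleTime() example.
    static List<DagHandler> schedule(int pendingFiles, int threshold) {
        List<DagHandler> launched = new ArrayList<>();
        if (pendingFiles >= threshold) {
            launched.add(new DagHandler()); // context.submit(...) in the proposal
        }
        return launched;
    }

    public static void main(String[] args) {
        // Not enough input yet: nothing is launched, resources stay free.
        assert schedule(2, 5).isEmpty();
        // Threshold crossed: one application is submitted and monitored.
        List<DagHandler> jobs = schedule(7, 5);
        assert jobs.size() == 1;
        assert jobs.get(0).status().equals("RUNNING");
        jobs.get(0).shutdown();
        assert jobs.get(0).status().equals("FINISHED");
    }
}
```

A "DAG of DAGs" workflow scheduler would extend the same loop: hold several DagHandler instances and use their status to decide when to submit downstream DAGs.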