Hi Tushar,

I have some questions about use case 2: Batch Support.

I don't understand the advantages of providing batch support by having an operator as a scheduler.
An approach that seemed a little more straightforward to me was to expose a scheduler API. If a scheduler is set, the master uses it to schedule operators. By default there isn't any scheduler and the job runs as it does now.

Maybe this is too simplistic, but can you please let me know why having an operator as a scheduler is a better way?

Thanks,
Chandni

On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:

>Hi All,
>
>We have seen a few use cases in the field that require Apex application
>scheduling based on some condition. This has also come up as part of
>Batch Support in Apex previously
>(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>I am proposing the following functionality in Apex to help with scheduling
>and better resource utilization for batch jobs. Please provide your
>comments.
>
>Usecase 1 - Dynamic DAG modification.
>
>Each operator in the DAG consumes YARN resources. Sometimes it is
>desirable to return the resources to YARN when no data is available
>for processing, and to deploy the whole DAG once data starts to appear.
>For this to happen automatically, we will need some data-monitoring
>operators running in the DAG to trigger restart and shutdown of the
>operators in the DAG.
>
>Apex already has such an API to dynamically change the running DAG
>through the CLI. We could make a similar API available to operators, which
>would trigger DAG modification at runtime. This information can be
>passed to the master using the heartbeat RPC, and the master will make
>the required changes to the DAG. Let me know what you think about it.
>Something like below:
>context.beginDagChange();
>context.addOperator("o1"); <== launch operator from previous check-pointed state.
>context.addOperator("o2", new Operator2()); <== create new operator
>context.addStream("s1", "reader.output", "o1.input");
>context.shutdown("o3"); <== delete this and downstream operators from the DAG.
>context.apply(); <== DAG changes will be sent to the master, and the master will apply these changes.
>
>Similarly, APIs for other functionality, such as locality settings,
>need to be provided.
>
>
>Usecase 2 - Classic Batch Scheduling.
>
>Provide an API callable from an operator to launch a DAG. The operator
>will prepare a DAG object and submit it to YARN; the DAG will be
>scheduled as a new application. This way complex schedulers can be
>written as operators.
>
>public class SchedulerOperator implements Operator {
>  void handleIdleTime() {
>    // check the conditions to start a job (for example, enough files are
>    // available, enough items are available in Kafka, or the time has been reached)
>    Dag dag = context.createDAG();
>    dag.addOperator();
>    dag.addOperator();
>    LaunchOptions lOptions = new LaunchOptions();
>    lOptions.oldId = ""; // start from this checkpoint.
>    DagHandler dagHandler = context.submit(dag, lOptions);
>  }
>}
>
>DagHandler will have methods to monitor the final state of the
>application, or to kill the DAG:
>dagHandler.waitForCompletion() <== wait till the DAG terminates
>dagHandler.status() <== get the status of the application.
>dagHandler.kill() <== kill the running application.
>dagHandler.shutdown() <== shut down the application.
>
>More complex scheduler operators could be written using these APIs to
>manage workflows, i.e. a DAG of DAGs.
>
>Regards,
>-Tushar.
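
For reference, the alternative raised at the top of this reply (an optional scheduler that the master consults, with today's behaviour when none is set) could be sketched roughly as follows. This is only an illustration: `Scheduler`, `selectRunnable`, `Master` and `SchedulerSketch` are hypothetical names made up for the sketch, not existing Apex APIs, and the toy master just records operator names instead of requesting containers from YARN.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical hook (not an Apex API): decides which pending operators
// the master may deploy in the current round.
interface Scheduler {
    List<String> selectRunnable(List<String> pendingOperators);
}

// Toy stand-in for the application master; it only tracks operator names.
class Master {
    private final Scheduler scheduler; // null means "no scheduler set"
    private final List<String> deployed = new ArrayList<>();

    Master(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    // Without a scheduler, deploy everything at once (the current behaviour).
    // With a scheduler, deploy only what it selects for this round.
    List<String> deploy(List<String> pendingOperators) {
        List<String> chosen = (scheduler == null)
                ? pendingOperators
                : scheduler.selectRunnable(pendingOperators);
        deployed.addAll(chosen);
        return chosen;
    }

    List<String> deployed() {
        return deployed;
    }
}

class SchedulerSketch {
    public static void main(String[] args) {
        List<String> dag = Arrays.asList("reader", "parser", "writer");

        // Default: no scheduler, the whole DAG is deployed.
        Master plain = new Master(null);
        System.out.println(plain.deploy(dag)); // prints [reader, parser, writer]

        // Gated: a scheduler that releases only the first pending operator.
        Master gated = new Master(pending -> pending.subList(0, 1));
        System.out.println(gated.deploy(dag)); // prints [reader]
    }
}
```

With no scheduler the toy master deploys all three operators immediately; with the gating scheduler only `reader` is deployed in the first round, which is the kind of hook a batch scheduler could use to hold back downstream operators until data is available.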