Hi All,

I have done an initial prototype which allows a stat listener to specify DAG changes; the changes are applied asynchronously.
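The trigger logic boils down to a one-shot latch: watch a metric in the stats callback and, the first time it crosses a threshold, hand back a response carrying the DAG change. A minimal self-contained sketch of that pattern (the class names `Response` and `FileMonitorListener` here are simplified stand-ins, not the actual Apex StatsListener types):

```java
import java.util.Map;

// Stand-in for StatsListener.Response; in the prototype this would
// carry the DAGChangeSet instead of a plain string.
class Response {
  String dagDescription;
}

class FileMonitorListener {
  private boolean dagStarted = false; // one-shot latch
  private final int threshold;

  FileMonitorListener(int threshold) {
    this.threshold = threshold;
  }

  /** Returns a Response only once, when the metric first exceeds the threshold. */
  Response processStats(Map<String, Object> metrics) {
    Integer pending = (Integer)metrics.get("pendingFiles");
    if (pending != null && pending > threshold && !dagStarted) {
      dagStarted = true; // ensure the DAG is launched only once
      Response resp = new Response();
      resp.dagDescription = "wordcount dag for " + metrics.get("directory");
      return resp;
    }
    return null; // no DAG change requested
  }
}
```

The latch (`dagStarted`) is what prevents the listener from re-submitting the same DAG on every subsequent stats window.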
The changes involved are:

- Added a DAGChangeSet object, which extends DAG and supports methods to
  remove operators and streams.
- The stat listener returns this object in its Response, and the platform
  applies the changes specified in the response to the DAG.

The Apex changes:
https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1

The corresponding demo application, in which one operator monitors a
directory for files and launches the word-count DAG in the same application
master when files are available:
https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular

Example of a stat listener which monitors a metric and instructs the master
to start a DAG:

/** Look for more than 100 files in a directory before launching the DAG. */
@Override
public Response processStats(BatchedOperatorStats stats)
{
  for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
    // pendingFiles is an AutoMetric.
    Integer value = (Integer)ws.metrics.get("pendingFiles");
    LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
    if (value != null && value > 100 && !dagStarted) {
      dagStarted = true;
      Response resp = new Response();
      resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
      counter = 0;
      return resp;
    }
  }
  return null;
}

DAGChangeSet getWordCountDag(String dir)
{
  DAGChangeSet dag = new DAGChangeSet();
  LineByLineFileInputOperator reader = dag.addOperator("Reader", new LineByLineFileInputOperator());
  List<StatsListener> listeners = new ArrayList<>();
  listeners.add(this);
  dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
  reader.setDirectory(dir);
  LineSplitter splitter = dag.addOperator("Splitter", new LineSplitter());
  UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
  ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
  dag.addStream("s1", reader.output, splitter.input);
  dag.addStream("s2", splitter.words, counter.data);
  dag.addStream("s3", counter.count, out.input);
  return dag;
}

Let me know if this type of API is acceptable for launching a DAG. This is
an API to specify DAG changes; the scheduler functionality will use this
API.

Regards,
-Tushar.

On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <tho...@datatorrent.com> wrote:
> I like the idea of keeping heavy lifting and custom code out of the
> master, if possible. You find that split in responsibilities even in the
> case of partitioning (Kafka connector for example). The change that
> requires partitioning may be detected as byproduct of the regular
> processing in the container, the information relayed to the master, the
> action being taken there.
>
> We should separate all the different pieces and then decide where they run.
> There is detecting the need for a plan change, then effecting the change
> (which requires full DAG view and absolutely has to/should be in the
> master).
>
> Thomas
>
> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>
>> We have a couple of components that already run in master - partitioners,
>> stats listeners, metrics aggregators. The problem of crashing the master
>> is not specific to just the scheduler, isn't it?
>> ________________________________
>> From: Tushar Gosavi <tus...@datatorrent.com>
>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>> To: dev@apex.apache.org
>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
>>
>> I was thinking about avoiding running user code in master, as a crash
>> in master takes down all containers with it; hence I was going for the
>> scheduler as an operator. A crash in the scheduler won't kill the
>> application; the master can restart the scheduler and it can start
>> monitoring the job again and change the DAG when required. But this
>> will require communication between master and scheduler for monitoring
>> of operator status/stats.
>>
>> It is considerably easier to put scheduling functionality in master, as
>> we have access to operator stats and there is a communication channel
>> already opened between master and operators. And a custom scheduler can
>> be written as a shared stat listener, with additional API available to
>> the listener to add/remove/deploy/undeploy operators, etc.
>>
>> Regards,
>> - Tushar.
>>
>>
>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <tho...@datatorrent.com> wrote:
>> > Right, if it runs in the app master and does not rely on unmanaged
>> > external processes, then these requirements can be met.
>> >
>> > This capability seeks to avoid users having to deal with external
>> > schedulers or workflows if all they want is to split a DAG that is
>> > logically one application into multiple stages for resource optimization.
>> > This is not very different from the need to have elasticity in terms of
>> > partitions depending on the availability of input, as you point out.
>> >
>> >
>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>> >
>> >> Scheduling IMO belongs to the App master. Operators can influence it, for
>> >> eg. File splitter can indicate that there are no more files to process.
>> >>
>> >> I don’t understand how that can not integrate with all the aspects -
>> >> operability, fault tolerance and security.
>> >>
>> >> Chandni
>> >>
>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chin...@datatorrent.com> wrote:
>> >>
>> >> > I think its a good idea to have a scheduling operator when you need to
>> >> > start a part of the DAG when some trigger happens (for eg. FileSplitter
>> >> > identifying new files in FS) and otherwise bring it down to save
>> >> > resources.
>> >> >
>> >> > On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
>> >> >
>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>> >> >> completely independent of a DAG or an operator. It could be used by a
>> >> >> commandline tool running on your laptop, a script, or it could happen
>> >> >> to be used by an Operator running in a DAG and a StatsListener.
>> >> >>
>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <tho...@datatorrent.com> wrote:
>> >> >>
>> >> >> > Scheduling can be independent, although we have use cases where the
>> >> >> > scheduling depends on completion of processing (multi-staged batch
>> >> >> > jobs where unused resources need to be freed).
>> >> >> >
>> >> >> > Both can be accomplished with a stats listener.
>> >> >> >
>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>> >> >> > fragments as needed.
>> >> >> >
>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chan...@gmail.com> wrote:
>> >> >> >
>> >> >> > > Hi,
>> >> >> > > IMO scheduling a job can be independent of any operator while
>> >> >> > > StatsListeners are not. I understand that in a lot of cases
>> >> >> > > input/output operators will decide when the job ends but there can
>> >> >> > > be cases when scheduling can be independent of it.
>> >> >> > >
>> >> >> > > Thanks,
>> >> >> > > Chandni
>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com> wrote:
>> >> >> > >
>> >> >> > > > This looks like something that coordination-wise belongs into the
>> >> >> > > > master and can be done with a shared stats listener.
>> >> >> > > >
>> >> >> > > > The operator request/response protocol could be used to relay the
>> >> >> > > > data for the scheduling decisions.
>> >> >> > > >
>> >> >> > > > Thomas
>> >> >> > > >
>> >> >> > > >
>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>> >> >> > > >
>> >> >> > > > > Hi Tushar,
>> >> >> > > > >
>> >> >> > > > > I have some questions about the use case 2: Batch Support.
>> >> >> > > > > I don’t understand the advantages of providing batch support by
>> >> >> > > > > having an operator as a scheduler.
>> >> >> > > > >
>> >> >> > > > > An approach that seemed a little more straightforward to me was
>> >> >> > > > > to expose an API for a scheduler. If there is a scheduler set
>> >> >> > > > > then the master uses it and schedules operators. By default
>> >> >> > > > > there isn’t any scheduler and the job is run as it is now.
>> >> >> > > > >
>> >> >> > > > > Maybe this is too simplistic but can you please let me know why
>> >> >> > > > > having an operator as a scheduler is a better way?
>> >> >> > > > >
>> >> >> > > > > Thanks,
>> >> >> > > > > Chandni
>> >> >> > > > >
>> >> >> > > > >
>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:
>> >> >> > > > >
>> >> >> > > > > > Hi All,
>> >> >> > > > > >
>> >> >> > > > > > We have seen a few use cases in the field which require Apex
>> >> >> > > > > > application scheduling based on some condition. This has also
>> >> >> > > > > > come up as part of Batch Support in Apex previously
>> >> >> > > > > > (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>> >> >> > > > > > I am proposing the following functionality in Apex to help
>> >> >> > > > > > scheduling and better resource utilization for batch jobs.
>> >> >> > > > > > Please provide your comments.
>> >> >> > > > > >
>> >> >> > > > > > Usecase 1 - Dynamic DAG modification.
>> >> >> > > > > >
>> >> >> > > > > > Each operator in a DAG consumes yarn resources; sometimes it
>> >> >> > > > > > is desirable to return the resources to yarn when no data is
>> >> >> > > > > > available for processing, and deploy the whole DAG once data
>> >> >> > > > > > starts to appear. For this to happen automatically, we will
>> >> >> > > > > > need some data monitoring operators running in the DAG to
>> >> >> > > > > > trigger restart and shutdown of the operators in the DAG.
>> >> >> > > > > >
>> >> >> > > > > > Apex already has such an API to dynamically change the
>> >> >> > > > > > running DAG through the cli.
>> >> >> > > > > > We could provide a similar API available to operators which
>> >> >> > > > > > will trigger DAG modification at runtime. This information
>> >> >> > > > > > can be passed to the master using the heartbeat RPC, and the
>> >> >> > > > > > master will make the required changes to the DAG. Let me know
>> >> >> > > > > > what you think about it.. something like below.
>> >> >> > > > > >
>> >> >> > > > > > Context.beginDagChange();
>> >> >> > > > > > context.addOperator("o1") <== launch operator from previous check-pointed state.
>> >> >> > > > > > context.addOperator("o2", new Operator2()) <== create new operator
>> >> >> > > > > > context.addStream("s1", "reader.output", "o1.input");
>> >> >> > > > > > context.shutdown("o3"); <== delete this and downstream operators from the DAG.
>> >> >> > > > > > context.apply(); <== dag changes will be sent to master, and master will apply these changes.
>> >> >> > > > > >
>> >> >> > > > > > Similarly, API for other functionality such as locality
>> >> >> > > > > > settings needs to be provided.
>> >> >> > > > > >
>> >> >> > > > > >
>> >> >> > > > > > Usecase 2 - Classic Batch Scheduling.
>> >> >> > > > > >
>> >> >> > > > > > Provide an API callable from an operator to launch a DAG. The
>> >> >> > > > > > operator will prepare a dag object and submit it to yarn; the
>> >> >> > > > > > DAG will be scheduled as a new application. This way complex
>> >> >> > > > > > schedulers can be written as operators.
>> >> >> > > > > >
>> >> >> > > > > > public class SchedulerOperator implements Operator {
>> >> >> > > > > >   void handleIdleTime() {
>> >> >> > > > > >     // check conditions to start a job (for example, enough
>> >> >> > > > > >     // files available, enough items available in Kafka, or a
>> >> >> > > > > >     // time has been reached)
>> >> >> > > > > >     Dag dag = context.createDAG();
>> >> >> > > > > >     dag.addOperator();
>> >> >> > > > > >     dag.addOperator();
>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>> >> >> > > > > >     lOptions.oldId = ""; // start from this checkpoint.
>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>> >> >> > > > > >   }
>> >> >> > > > > > }
>> >> >> > > > > >
>> >> >> > > > > > DagHandler will have methods to monitor the final state of the
>> >> >> > > > > > application, or to kill the DAG:
>> >> >> > > > > > dagHandler.waitForCompletion() <== wait till the DAG terminates
>> >> >> > > > > > dagHandler.status() <== get the status of the application.
>> >> >> > > > > > dagHandler.kill() <== kill the running application.
>> >> >> > > > > > dagHandler.shutdown() <== shutdown the application.
>> >> >> > > > > >
>> >> >> > > > > > More complex Scheduler operators could be written to manage
>> >> >> > > > > > workflows, i.e. DAGs of DAGs, using these APIs.
>> >> >> > > > > >
>> >> >> > > > > > Regards,
>> >> >> > > > > > -Tushar.
>> >> >> > > > >
>> >> >> > > > > ________________________________________________________
>> >> >> > > > >
>> >> >> > > > > The information contained in this e-mail is confidential and/or
>> >> >> > > > > proprietary to Capital One and/or its affiliates and may only be
>> >> >> > > > > used solely in performance of work or services for Capital One.
>> >> >> > > > > The information transmitted herewith is intended only for use by
>> >> >> > > > > the individual or entity to which it is addressed.
>> >> >> > > > > If the reader of this message is not the intended recipient,
>> >> >> > > > > you are hereby notified that any review, retransmission,
>> >> >> > > > > dissemination, distribution, copying or other use of, or taking
>> >> >> > > > > of any action in reliance upon this information is strictly
>> >> >> > > > > prohibited. If you have received this communication in error,
>> >> >> > > > > please contact the sender and delete the material from your
>> >> >> > > > > computer.