Hi All,

I have updated the design document as per the review comments on the pull request (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx). Please provide your suggestions.
- Tushar.

On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> Hi All,
>
> I have opened a review pull request for dynamic DAG modification
> through the stats listener (https://github.com/apache/apex-core/pull/393).
> Please review and provide comments/suggestions.
>
> It provides the following functionality:
> - A StatsListener can access the operator name, for easily detecting
>   which operator's stats are being processed.
> - A StatsListener can create an instance of an object through which it
>   can submit DAG modifications to the engine.
> - A StatsListener can return DAG changes in its response to the engine.
> - PlanModifier is modified to take a DAG, apply it to the existing
>   running DAG, and deploy the changes.
>
> The following functionality is not working yet:
>
> - The new operator does not start from the correct windowId
>   (https://issues.apache.org/jira/browse/APEXCORE-532).
> - A relaunched application fails to start if it was killed after a
>   dynamic DAG modification.
> - There is no support for resuming an operator from its previous state
>   after it has been removed. This could be achieved by reading the
>   state from external storage during setup.
> - Persist-operator support is not present for newly added streams.
>
> A demo application using the feature is available at
> https://github.com/tushargosavi/apex-dynamic-scheduling
>
> There are two variations of the WordCount application. The first
> variation detects the presence of new files and starts a disconnected
> DAG to process the data.
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)
>
> The second application
> (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java)
> starts with only the reader operator and exposes pendingFiles as an
> auto-metric to the stats listener. On detecting pending files, it
> attaches the splitter, counter, and output operators to the reader
> operator. Once the files are processed, the splitter, counter, and
> output operators are removed; they are added back again when new data
> files arrive in the directory.
>
> Regards,
> -Tushar.
>
>
> On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>> Hi All,
>>
>> I was able to prototype a simple word count application which starts
>> with just a single file reader operator. The file reader operator
>> emits pendingFiles as a metric to the StatsListener. The stats
>> listener changes the DAG once enough files are available: it returns
>> a plan change that adds the word splitter, counter, and console
>> operators to the reader, completing the DAG for word count.
>>
>> After 120 windows of inactivity, the three operators are removed from
>> the DAG again. When a new set of files is added, these operators are
>> added back.
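>>
>> As a minimal sketch of the reader side, an operator can expose such a
>> value with the @AutoMetric annotation, and the platform delivers it to
>> the stats listener together with the window stats. (The class name and
>> the directory-scan logic below are illustrative, not the actual
>> prototype code.)
>>
>> import java.io.File;
>>
>> import com.datatorrent.api.AutoMetric;
>> import com.datatorrent.api.InputOperator;
>> import com.datatorrent.common.util.BaseOperator;
>>
>> public class MonitoringReader extends BaseOperator implements InputOperator
>> {
>>   private String directory;
>>
>>   @AutoMetric
>>   private int pendingFiles; // collected by the platform after each window
>>
>>   @Override
>>   public void emitTuples()
>>   {
>>     // The real operator would read files and emit lines; omitted in this sketch.
>>   }
>>
>>   @Override
>>   public void endWindow()
>>   {
>>     // Publish the current backlog as this window's metric value.
>>     File[] files = new File(directory).listFiles();
>>     pendingFiles = (files == null) ? 0 : files.length;
>>   }
>>
>>   public void setDirectory(String directory)
>>   {
>>     this.directory = directory;
>>   }
>> }
>>
>> The stats listener then reads the value via ws.metrics.get("pendingFiles"),
>> as in the processStats example quoted further down.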
>>
>> The high-level proposal document:
>> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>>
>> The prototype code is at:
>> https://github.com/tushargosavi/apex-dynamic-scheduling/
>> The application files are:
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
>> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>>
>> Please provide your feedback.
>>
>> Some challenges yet to be resolved:
>> - Restoring the state of a previously removed operator.
>> - Handling concurrent modifications to the DAG from multiple
>>   StatsListeners.
>> - Making DAG changes persistent; the user should be able to restart
>>   the application if it was killed with a modified DAG.
>>
>> Thanks,
>> -Tushar.
>>
>> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>>> Hi All,
>>>
>>> I have done some initial prototyping which allows a stats listener to
>>> specify DAG changes; the changes are applied asynchronously.
>>>
>>> The changes involved are:
>>> - Add a DAGChangeSet object, which inherits from DAG and supports
>>>   methods to remove operators and streams.
>>> - The stats listener returns this object in its Response, and the
>>>   platform applies the changes specified in the response to the DAG.
>>>
>>> The Apex changes:
>>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>>>
>>> The corresponding demo application, in which one operator monitors a
>>> directory for files and launches the word count DAG in the same
>>> application master when files are available:
>>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>>>
>>> An example of a stats listener which monitors a metric and instructs
>>> the master to start a DAG:
>>>
>>> /** Look for more than 100 files in a directory before launching the DAG. */
>>> @Override
>>> public Response processStats(BatchedOperatorStats stats)
>>> {
>>>   for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
>>>     // pendingFiles is an auto-metric.
>>>     Integer value = (Integer)ws.metrics.get("pendingFiles");
>>>     LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
>>>     if (value != null && value > 100 && !dagStarted) {
>>>       dagStarted = true;
>>>       Response resp = new Response();
>>>       resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>>>       counter = 0;
>>>       return resp;
>>>     }
>>>   }
>>>   return null;
>>> }
>>>
>>> DAGChangeSet getWordCountDag(String dir)
>>> {
>>>   DAGChangeSet dag = new DAGChangeSet();
>>>   LineByLineFileInputOperator reader =
>>>       dag.addOperator("Reader", new LineByLineFileInputOperator());
>>>   List<StatsListener> listeners = new ArrayList<>();
>>>   listeners.add(this);
>>>   dag.getMeta(reader).getAttributes()
>>>       .put(Context.OperatorContext.STATS_LISTENERS, listeners);
>>>   reader.setDirectory(dir);
>>>   LineSplitter splitter = dag.addOperator("Splitter", new LineSplitter());
>>>   UniqueCounter<String> counter =
>>>       dag.addOperator("Counter", new UniqueCounter<String>());
>>>   ConsoleOutputOperator out =
>>>       dag.addOperator("Output", new ConsoleOutputOperator());
>>>   dag.addStream("s1", reader.output, splitter.input);
>>>   dag.addStream("s2", splitter.words, counter.data);
>>>   dag.addStream("s3", counter.count, out.input);
>>>   return dag;
>>> }
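>>>
>>> For the teardown direction (removing the word-count stage again after
>>> a period of inactivity, as described earlier in this thread), a
>>> minimal sketch could look like the following. DAGChangeSet's removal
>>> methods are part of this proposal; the exact names used here
>>> (removeStream/removeOperator) and the idle-window bookkeeping are
>>> assumptions for illustration.
>>>
>>> /** Remove the word-count stage, e.g. after 120 idle windows (sketch). */
>>> private Response removeWordCountDag()
>>> {
>>>   DAGChangeSet dag = new DAGChangeSet();
>>>   // Remove the streams first, then the disconnected operators;
>>>   // the reader stays deployed and keeps reporting pendingFiles.
>>>   dag.removeStream("s1");
>>>   dag.removeStream("s2");
>>>   dag.removeStream("s3");
>>>   dag.removeOperator("Splitter");
>>>   dag.removeOperator("Counter");
>>>   dag.removeOperator("Output");
>>>   Response resp = new Response();
>>>   resp.dag = dag;
>>>   dagStarted = false; // allow the stage to be attached again later
>>>   return resp;
>>> }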
>>>
>>> Let me know whether this type of API is acceptable for launching the
>>> DAG. This is an API to specify DAG changes; the scheduler
>>> functionality will use this API.
>>>
>>> Regards,
>>> -Tushar.
>>>
>>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <tho...@datatorrent.com> wrote:
>>>> I like the idea of keeping heavy lifting and custom code out of the
>>>> master, if possible. You find that split in responsibilities even in
>>>> the case of partitioning (the Kafka connector, for example). The
>>>> change that requires partitioning may be detected as a byproduct of
>>>> the regular processing in the container, the information relayed to
>>>> the master, and the action taken there.
>>>>
>>>> We should separate all the different pieces and then decide where
>>>> they run. There is detecting the need for a plan change, and then
>>>> effecting the change (which requires the full DAG view and
>>>> absolutely has to/should be in the master).
>>>>
>>>> Thomas
>>>>
>>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>>>>
>>>>> We have a couple of components that already run in the master:
>>>>> partitioners, stats listeners, metrics aggregators. The problem of
>>>>> crashing the master is not specific to just the scheduler, is it?
>>>>> ________________________________
>>>>> From: Tushar Gosavi <tus...@datatorrent.com>
>>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>>>> To: dev@apex.apache.org
>>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
>>>>>
>>>>> I was thinking about avoiding running user code in the master, as a
>>>>> crash in the master takes down all containers with it. Hence I was
>>>>> going for the scheduler as an operator: a crash in the scheduler
>>>>> won't kill the application, and the master can restart the
>>>>> scheduler, which can then resume monitoring the job and change the
>>>>> DAG when required. But this will require communication between the
>>>>> master and the scheduler for monitoring of operator status/stats.
>>>>>
>>>>> It is considerably easier to put the scheduling functionality in
>>>>> the master, as we have access to operator stats and there is a
>>>>> communication channel already open between the master and the
>>>>> operators. A custom scheduler can then be written as a shared stats
>>>>> listener, with additional APIs available to the listener to
>>>>> add/remove/deploy/undeploy operators, and so on.
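>>>>>
>>>>> To illustrate the shared-listener idea with a sketch: the attribute
>>>>> below is the same STATS_LISTENERS used in the word-count code
>>>>> elsewhere in this thread, and FileStatListener stands in for any
>>>>> StatsListener implementation.
>>>>>
>>>>> // Sketch: share one listener instance across operators when
>>>>> // building the DAG, so a single object observes all their stats.
>>>>> List<StatsListener> shared = new ArrayList<>();
>>>>> shared.add(new FileStatListener());
>>>>> LineByLineFileInputOperator reader =
>>>>>     dag.addOperator("Reader", new LineByLineFileInputOperator());
>>>>> LineSplitter splitter = dag.addOperator("Splitter", new LineSplitter());
>>>>> dag.getMeta(reader).getAttributes()
>>>>>     .put(Context.OperatorContext.STATS_LISTENERS, shared);
>>>>> dag.getMeta(splitter).getAttributes()
>>>>>     .put(Context.OperatorContext.STATS_LISTENERS, shared);
>>>>> // In processStats(), stats.getOperatorId() tells the listener which
>>>>> // operator a batch of stats came from, so one listener can correlate
>>>>> // metrics across operators and return a single coordinated Response.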
>>>>>
>>>>> Regards,
>>>>> - Tushar.
>>>>>
>>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <tho...@datatorrent.com> wrote:
>>>>> > Right, if it runs in the app master and does not rely on unmanaged
>>>>> > external processes, then these requirements can be met.
>>>>> >
>>>>> > This capability seeks to avoid users having to deal with external
>>>>> > schedulers or workflows if all they want is to split a DAG that is
>>>>> > logically one application into multiple stages for resource
>>>>> > optimization. This is not very different from the need to have
>>>>> > elasticity in terms of partitions depending on the availability of
>>>>> > input, as you point out.
>>>>> >
>>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>>>>> >
>>>>> >> Scheduling IMO belongs to the App master. Operators can influence
>>>>> >> it; for example, the file splitter can indicate that there are no
>>>>> >> more files to process.
>>>>> >>
>>>>> >> I don't understand how that cannot integrate with all the
>>>>> >> aspects: operability, fault tolerance, and security.
>>>>> >>
>>>>> >> Chandni
>>>>> >>
>>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chin...@datatorrent.com> wrote:
>>>>> >>
>>>>> >> >I think it's a good idea to have a scheduling operator when you
>>>>> >> >need to start a part of the DAG when some trigger happens (for
>>>>> >> >example, FileSplitter identifying new files in the FS) and
>>>>> >> >otherwise bring it down to save resources.
>>>>> >> >
>>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <timothytiborfar...@gmail.com> wrote:
>>>>> >> >
>>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an
>>>>> >> >> API completely independent of a DAG or an operator. It could
>>>>> >> >> be used by a command-line tool running on your laptop, by a
>>>>> >> >> script, or it could happen to be used by an operator running
>>>>> >> >> in a DAG and a StatsListener.
>>>>> >> >>
>>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <tho...@datatorrent.com> wrote:
>>>>> >> >>
>>>>> >> >> > Scheduling can be independent, although we have use cases
>>>>> >> >> > where the scheduling depends on completion of processing
>>>>> >> >> > (multi-staged batch jobs where unused resources need to be
>>>>> >> >> > freed).
>>>>> >> >> >
>>>>> >> >> > Both can be accomplished with a stats listener.
>>>>> >> >> >
>>>>> >> >> > There can be a "scheduling operator" that brings up and
>>>>> >> >> > removes DAG fragments as needed.
>>>>> >> >> >
>>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chan...@gmail.com> wrote:
>>>>> >> >> >
>>>>> >> >> > > Hi,
>>>>> >> >> > > IMO scheduling a job can be independent of any operator,
>>>>> >> >> > > while StatsListeners are not. I understand that in a lot
>>>>> >> >> > > of cases input/output operators will decide when the job
>>>>> >> >> > > ends, but there can be cases where scheduling is
>>>>> >> >> > > independent of them.
>>>>> >> >> > >
>>>>> >> >> > > Thanks,
>>>>> >> >> > > Chandni
>>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com> wrote:
>>>>> >> >> > >
>>>>> >> >> > > > This looks like something that coordination-wise belongs
>>>>> >> >> > > > in the master and can be done with a shared stats
>>>>> >> >> > > > listener.
>>>>> >> >> > > >
>>>>> >> >> > > > The operator request/response protocol could be used to
>>>>> >> >> > > > relay the data for the scheduling decisions.
>>>>> >> >> > > >
>>>>> >> >> > > > Thomas
>>>>> >> >> > > >
>>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:
>>>>> >> >> > > >
>>>>> >> >> > > > > Hi Tushar,
>>>>> >> >> > > > >
>>>>> >> >> > > > > I have some questions about use case 2: Batch Support.
>>>>> >> >> > > > > I don't understand the advantages of providing batch
>>>>> >> >> > > > > support by having an operator as a scheduler.
>>>>> >> >> > > > >
>>>>> >> >> > > > > An approach that seemed a little more straightforward
>>>>> >> >> > > > > to me was to expose an API for a scheduler. If a
>>>>> >> >> > > > > scheduler is set, then the master uses it and
>>>>> >> >> > > > > schedules operators. By default there isn't any
>>>>> >> >> > > > > scheduler and the job runs as it does now.
>>>>> >> >> > > > >
>>>>> >> >> > > > > Maybe this is too simplistic, but can you please let
>>>>> >> >> > > > > me know why having an operator as a scheduler is a
>>>>> >> >> > > > > better way?
>>>>> >> >> > > > >
>>>>> >> >> > > > > Thanks,
>>>>> >> >> > > > > Chandni
>>>>> >> >> > > > >
>>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com> wrote:
>>>>> >> >> > > > >
>>>>> >> >> > > > > >Hi All,
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >We have seen a few use cases in the field which
>>>>> >> >> > > > > >require Apex application scheduling based on some
>>>>> >> >> > > > > >condition. This has also come up previously as part
>>>>> >> >> > > > > >of Batch Support in Apex
>>>>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
>>>>> >> >> > > > > >I am proposing the following functionality in Apex
>>>>> >> >> > > > > >to help scheduling and improve resource utilization
>>>>> >> >> > > > > >for batch jobs. Please provide your comments.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Use case 1 - Dynamic DAG modification.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Each operator in a DAG consumes YARN resources.
>>>>> >> >> > > > > >Sometimes it is desirable to return the resources to
>>>>> >> >> > > > > >YARN when no data is available for processing, and
>>>>> >> >> > > > > >to redeploy the whole DAG once data starts to appear.
>>>>> >> >> > > > > >For this to happen automatically, we will need some
>>>>> >> >> > > > > >data-monitoring operators running in the DAG to
>>>>> >> >> > > > > >trigger restart and shutdown of the other operators
>>>>> >> >> > > > > >in the DAG.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Apex already has such an API for dynamically changing
>>>>> >> >> > > > > >the running DAG through the CLI. We could make a
>>>>> >> >> > > > > >similar API available to operators to trigger DAG
>>>>> >> >> > > > > >modification at runtime. This information can be
>>>>> >> >> > > > > >passed to the master using the heartbeat RPC, and the
>>>>> >> >> > > > > >master will make the required changes to the DAG.
>>>>> >> >> > > > > >Let me know what you think about it; something like
>>>>> >> >> > > > > >below:
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >context.beginDagChange();
>>>>> >> >> > > > > >context.addOperator("o1");  <== launch the operator from its previous check-pointed state
>>>>> >> >> > > > > >context.addOperator("o2", new Operator2());  <== create a new operator
>>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>>>> >> >> > > > > >context.shutdown("o3");  <== remove this operator and its downstream operators from the DAG
>>>>> >> >> > > > > >context.apply();  <== the DAG changes are sent to the master, which applies them
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Similarly, APIs for other functionality, such as
>>>>> >> >> > > > > >locality settings, need to be provided.
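>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Applied to the word-count scenario discussed
>>>>> >> >> > > > > >elsewhere in this thread, a call sequence against
>>>>> >> >> > > > > >this proposed (still hypothetical) API might look
>>>>> >> >> > > > > >like:
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >// Attach the word-count stage to an already-running reader (sketch).
>>>>> >> >> > > > > >context.beginDagChange();
>>>>> >> >> > > > > >context.addOperator("Splitter", new LineSplitter());
>>>>> >> >> > > > > >context.addOperator("Counter", new UniqueCounter<String>());
>>>>> >> >> > > > > >context.addOperator("Output", new ConsoleOutputOperator());
>>>>> >> >> > > > > >context.addStream("s1", "Reader.output", "Splitter.input");
>>>>> >> >> > > > > >context.addStream("s2", "Splitter.words", "Counter.data");
>>>>> >> >> > > > > >context.addStream("s3", "Counter.count", "Output.input");
>>>>> >> >> > > > > >context.apply();  // sent to the master and applied there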
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Use case 2 - Classic Batch Scheduling.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Provide an API, callable from an operator, to launch
>>>>> >> >> > > > > >a DAG. The operator will prepare a DAG object and
>>>>> >> >> > > > > >submit it to YARN; the DAG will be scheduled as a new
>>>>> >> >> > > > > >application. This way, complex schedulers can be
>>>>> >> >> > > > > >written as operators.
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >public class SchedulerOperator implements Operator
>>>>> >> >> > > > > >{
>>>>> >> >> > > > > >  void handleIdleTime()
>>>>> >> >> > > > > >  {
>>>>> >> >> > > > > >    // Check the conditions for starting a job (for example, enough files
>>>>> >> >> > > > > >    // are available, enough items are available in Kafka, or a given time
>>>>> >> >> > > > > >    // has been reached).
>>>>> >> >> > > > > >    Dag dag = context.createDAG();
>>>>> >> >> > > > > >    dag.addOperator(...);
>>>>> >> >> > > > > >    dag.addOperator(...);
>>>>> >> >> > > > > >    LaunchOptions lOptions = new LaunchOptions();
>>>>> >> >> > > > > >    lOptions.oldId = ""; // start from this checkpoint
>>>>> >> >> > > > > >    DagHandler dagHandler = context.submit(dag, lOptions);
>>>>> >> >> > > > > >  }
>>>>> >> >> > > > > >}
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >DagHandler will have methods to monitor the final
>>>>> >> >> > > > > >state of the application, or to kill the DAG:
>>>>> >> >> > > > > >dagHandler.waitForCompletion()  <== wait till the DAG terminates
>>>>> >> >> > > > > >dagHandler.status()  <== get the status of the application
>>>>> >> >> > > > > >dagHandler.kill()  <== kill the running application
>>>>> >> >> > > > > >dagHandler.shutdown()  <== shut the application down
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >More complex scheduler operators could be written on
>>>>> >> >> > > > > >top of these APIs to manage workflows, i.e., DAGs of
>>>>> >> >> > > > > >DAGs.
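>>>>> >> >> > > > > >
>>>>> >> >> > > > > >For instance, a minimal two-stage workflow sketch
>>>>> >> >> > > > > >against this proposed (still hypothetical) API,
>>>>> >> >> > > > > >freeing stage-1 resources before stage 2 starts:
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Dag stage1 = context.createDAG();
>>>>> >> >> > > > > >// ... populate stage1 with read/transform operators ...
>>>>> >> >> > > > > >DagHandler h1 = context.submit(stage1, new LaunchOptions());
>>>>> >> >> > > > > >h1.waitForCompletion();  // stage-1 containers are returned to YARN here
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Dag stage2 = context.createDAG();
>>>>> >> >> > > > > >// ... populate stage2 with operators that consume stage 1's output ...
>>>>> >> >> > > > > >DagHandler h2 = context.submit(stage2, new LaunchOptions());
>>>>> >> >> > > > > >h2.waitForCompletion();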
>>>>> >> >> > > > > >
>>>>> >> >> > > > > >Regards,
>>>>> >> >> > > > > >-Tushar.