Hi All,

I have started working on this again. The top-level design document is located at
https://docs.google.com/document/d/1OugxutMYI-JwB9Z7hxjTWwiAKQsqbYPujDmgwlpPOXM/edit?usp=sharing

The WIP branch is https://github.com/tushargosavi/apex-core/tree/dag_schedule.redesign

@Thomas, can you please review the document and comment on the design approach?

Regards,
-Tushar.

On Tue, Jan 24, 2017 at 4:47 PM, Tushar Gosavi <[email protected]> wrote:
> Hi All,
>
> I have updated the design document as per the review comments on the pull
> request (https://issues.apache.org/jira/secure/attachment/12849094/dag.docx).
> Please provide your suggestions.
>
> - Tushar.
>
>
> On Fri, Sep 16, 2016 at 6:40 PM, Tushar Gosavi <[email protected]> wrote:
> > Hi All,
> >
> > I have opened a review pull request for dynamic DAG modification
> > through the stats listener (https://github.com/apache/apex-core/pull/393).
> > Please review and provide comments/suggestions.
> >
> > It provides the following functionality:
> > - A StatsListener can access the operator name, for easily detecting
> >   which operator's stats are being processed.
> > - A StatsListener can create an instance of an object through which it
> >   can submit DAG modifications to the engine.
> > - A StatsListener can return DAG changes as a response to the engine.
> > - PlanModifier is modified to take a DAG, apply it to the existing
> >   running DAG, and deploy the changes.
> >
> > The following functionality is not working yet:
> > - The new operator does not start from the correct windowId
> >   (https://issues.apache.org/jira/browse/APEXCORE-532).
> > - A relaunched application fails to start when it was killed after a
> >   dynamic DAG modification.
> > - There is no support for resuming operators from their previous state
> >   when they were removed. This could be achieved by reading the state
> >   from external storage on setup.
> > - Persist-operator support is not present for newly added streams.
> >
> > The demo application using the feature is available at
> > https://github.com/tushargosavi/apex-dynamic-scheduling
> >
> > There are two variations of the WordCount application. The first
> > variation detects the presence of new files and starts a disconnected
> > DAG to process the data
> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java).
> >
> > The second application
> > (https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java)
> > starts with a reader operator and provides pendingFiles as an
> > auto-metric to the stats listener. On detecting pending files, it
> > attaches splitter, counter, and output operators to the reader
> > operator. Once the files are processed, the splitter, counter, and
> > output operators are removed, and they are added back again if new
> > data files are added to the directory.
> >
> > Regards,
> > -Tushar.
> >
> >
> > On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <[email protected]> wrote:
> >> Hi All,
> >>
> >> I was able to prototype a simple word count application which starts
> >> with just a single file reader operator. The file reader operator
> >> emits pendingFiles as a metric to the StatsListener. The StatsListener
> >> changes the DAG once enough files are available: it returns a plan
> >> change that adds word splitter, counter, and console operators to the
> >> reader, completing the DAG for word count.
> >>
> >> After 120 windows of inactivity, the three operators are removed from
> >> the DAG again. When a new set of files is added, these operators are
> >> added back again.
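> >>
> >> [For illustration: a minimal sketch of what such a listener could look
> >> like. It assumes the Response.dag / DAGChangeSet mechanism from the
> >> prototype quoted further below; the buildWordCountExtension() and
> >> buildRemovalChangeSet() helpers are hypothetical placeholders.]
> >>
> >> // Imports from com.datatorrent.api (StatsListener, Stats) are omitted,
> >> // as in the snippets quoted below.
> >> public class FileStatsListener implements StatsListener, Serializable
> >> {
> >>   private boolean attached = false; // are splitter/counter/output deployed?
> >>   private int idleWindows = 0;      // stats batches seen with no pending files
> >>
> >>   @Override
> >>   public Response processStats(BatchedOperatorStats stats)
> >>   {
> >>     for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
> >>       Integer pending = (Integer)ws.metrics.get("pendingFiles");
> >>       if (pending != null && pending > 0) {
> >>         idleWindows = 0;
> >>         if (!attached) {
> >>           attached = true;
> >>           Response resp = new Response();
> >>           resp.dag = buildWordCountExtension(); // attach splitter, counter, output
> >>           return resp;
> >>         }
> >>       } else if (attached && ++idleWindows > 120) {
> >>         attached = false;
> >>         Response resp = new Response();
> >>         resp.dag = buildRemovalChangeSet(); // remove the three operators
> >>         return resp;
> >>       }
> >>     }
> >>     return null;
> >>   }
> >> }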
> >>
> >> The high-level proposal document:
> >> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
> >>
> >> The prototype code is at:
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/
> >> The application files are:
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
> >> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
> >>
> >> Please provide your feedback.
> >>
> >> Some challenges yet to be resolved are:
> >> - Restoring operator state from a previously removed operator.
> >> - Handling concurrent modifications to the DAG from multiple StatsListeners.
> >> - Making DAG changes persistent; the user should be able to restart the
> >>   application if it was killed with a modified DAG.
> >>
> >> Thanks,
> >> -Tushar.
> >>
> >> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <[email protected]> wrote:
> >>> Hi All,
> >>>
> >>> I have done some initial prototyping which allows a stats listener to
> >>> specify DAG changes; the DAG changes are applied asynchronously.
> >>>
> >>> The changes involved are:
> >>> - Add a DAGChangeSet object, which inherits from DAG, supporting
> >>>   methods to remove operators and streams.
> >>> - The stats listener returns this object in the Response, and the
> >>>   platform applies the changes specified in the response to the DAG.
> >>>
> >>> The Apex changes:
> >>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
> >>>
> >>> The corresponding demo application, in which one operator monitors a
> >>> directory for files and launches the word count DAG in the same
> >>> application master when files are available:
> >>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
> >>>
> >>> Example of a stats listener which monitors a metric and instructs the
> >>> master to start a DAG:
> >>>
> >>> /** Look for more than 100 files in a directory before launching the DAG. */
> >>> @Override
> >>> public Response processStats(BatchedOperatorStats stats)
> >>> {
> >>>   for (Stats.OperatorStats ws : stats.getLastWindowedStats()) {
> >>>     // pendingFiles is an auto-metric.
> >>>     Integer value = (Integer)ws.metrics.get("pendingFiles");
> >>>     LOG.info("stats received for {} pendingFiles {}", stats.getOperatorId(), value);
> >>>     if (value != null && value > 100 && !dagStarted) {
> >>>       dagStarted = true;
> >>>       Response resp = new Response();
> >>>       resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
> >>>       counter = 0;
> >>>       return resp;
> >>>     }
> >>>   }
> >>>   return null;
> >>> }
> >>>
> >>> DAGChangeSet getWordCountDag(String dir)
> >>> {
> >>>   DAGChangeSet dag = new DAGChangeSet();
> >>>   LineByLineFileInputOperator reader =
> >>>       dag.addOperator("Reader", new LineByLineFileInputOperator());
> >>>   List<StatsListener> listeners = new ArrayList<>();
> >>>   listeners.add(this);
> >>>   dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS, listeners);
> >>>   reader.setDirectory(dir);
> >>>   LineSplitter splitter = dag.addOperator("Splitter", new LineSplitter());
> >>>   UniqueCounter<String> counter = dag.addOperator("Counter", new UniqueCounter<String>());
> >>>   ConsoleOutputOperator out = dag.addOperator("Output", new ConsoleOutputOperator());
> >>>   dag.addStream("s1", reader.output, splitter.input);
> >>>   dag.addStream("s2", splitter.words, counter.data);
> >>>   dag.addStream("s3", counter.count, out.input);
> >>>   return dag;
> >>> }
> >>>
> >>> Let me know if this type of API is acceptable for launching the DAG.
> >>> This is an API to specify DAG changes. The scheduler functionality
> >>> will use this API.
> >>>
> >>> Regards,
> >>> -Tushar.
> >>>
> >>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <[email protected]> wrote:
> >>>> I like the idea of keeping heavy lifting and custom code out of the
> >>>> master, if possible. You find that split in responsibilities even in
> >>>> the case of partitioning (the Kafka connector, for example). The
> >>>> change that requires partitioning may be detected as a byproduct of
> >>>> the regular processing in the container, the information relayed to
> >>>> the master, and the action taken there.
> >>>>
> >>>> We should separate all the different pieces and then decide where
> >>>> they run. There is detecting the need for a plan change, then
> >>>> effecting the change (which requires the full DAG view and absolutely
> >>>> has to/should be in the master).
> >>>>
> >>>> Thomas
> >>>>
> >>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
> >>>> [email protected]> wrote:
> >>>>
> >>>>> We have a couple of components that already run in the master -
> >>>>> partitioners, stats listeners, metrics aggregators. The problem of
> >>>>> crashing the master is not specific to just the scheduler, is it?
> >>>>> ________________________________
> >>>>> From: Tushar Gosavi <[email protected]>
> >>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
> >>>>> To: [email protected]
> >>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
> >>>>> application
> >>>>>
> >>>>> I was thinking about avoiding running user code in the master, as a
> >>>>> crash in the master takes down all containers with it; hence I was
> >>>>> going for the scheduler as an operator. A crash in the scheduler
> >>>>> won't kill the application; the master can restart the scheduler,
> >>>>> and it can start monitoring the job again and change the DAG when
> >>>>> required. But this will require communication between the master and
> >>>>> the scheduler for monitoring of operator status/stats.
> >>>>>
> >>>>> It is considerably easier to put the scheduling functionality in the
> >>>>> master, as we have access to operator stats and there is a
> >>>>> communication channel already open between the master and the
> >>>>> operators. And a custom scheduler can be written as a shared stats
> >>>>> listener, with an additional API available to the listener to
> >>>>> add/remove/deploy/undeploy operators, etc.
> >>>>>
> >>>>> Regards,
> >>>>> - Tushar.
> >>>>>
> >>>>>
> >>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <[email protected]> wrote:
> >>>>> > Right, if it runs in the app master and does not rely on unmanaged
> >>>>> > external processes, then these requirements can be met.
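> >>>>>
> >>>>> [For illustration: a rough sketch of how the add/remove listener API
> >>>>> mentioned above might look when shrinking a DAG, using the
> >>>>> DAGChangeSet from the prototype. The removeOperator()/removeStream()
> >>>>> names and the processingComplete() helper are assumptions, not a
> >>>>> committed API.]
> >>>>>
> >>>>> public Response processStats(BatchedOperatorStats stats)
> >>>>> {
> >>>>>   if (processingComplete(stats)) {  // e.g. no more pending files
> >>>>>     DAGChangeSet dag = new DAGChangeSet();
> >>>>>     dag.removeStream("s1");         // detach the idle fragment first
> >>>>>     dag.removeOperator("Splitter"); // then free its yarn resources
> >>>>>     dag.removeOperator("Counter");
> >>>>>     dag.removeOperator("Output");
> >>>>>     Response resp = new Response();
> >>>>>     resp.dag = dag;
> >>>>>     return resp;
> >>>>>   }
> >>>>>   return null;
> >>>>> }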
> >>>>> >
> >>>>> > This capability seeks to avoid users having to deal with external
> >>>>> > schedulers or workflows if all they want is to split a DAG that is
> >>>>> > logically one application into multiple stages for resource
> >>>>> > optimization. This is not very different from the need to have
> >>>>> > elasticity in terms of partitions depending on the availability of
> >>>>> > input, as you point out.
> >>>>> >
> >>>>> >
> >>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
> >>>>> > [email protected]> wrote:
> >>>>> >
> >>>>> >> Scheduling IMO belongs to the App master. Operators can influence
> >>>>> >> it, for eg. the file splitter can indicate that there are no more
> >>>>> >> files to process.
> >>>>> >>
> >>>>> >> I don't understand how that cannot integrate with all the aspects -
> >>>>> >> operability, fault tolerance, and security.
> >>>>> >>
> >>>>> >> Chandni
> >>>>> >>
> >>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <[email protected]> wrote:
> >>>>> >>
> >>>>> >> >I think it's a good idea to have a scheduling operator when you
> >>>>> >> >need to start a part of the DAG when some trigger happens (for eg.
> >>>>> >> >FileSplitter identifying new files in the FS) and otherwise bring
> >>>>> >> >it down to save resources.
> >>>>> >> >
> >>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
> >>>>> >> >[email protected]> wrote:
> >>>>> >> >
> >>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
> >>>>> >> >> completely independent of a DAG or an operator. It could be used
> >>>>> >> >> by a command-line tool running on your laptop, or a script, or
> >>>>> >> >> it could happen to be used by an Operator running in a DAG and a
> >>>>> >> >> StatsListener.
> >>>>> >> >>
> >>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <[email protected]> wrote:
> >>>>> >> >>
> >>>>> >> >> > Scheduling can be independent, although we have use cases
> >>>>> >> >> > where the scheduling depends on completion of processing
> >>>>> >> >> > (multi-staged batch jobs where unused resources need to be
> >>>>> >> >> > freed).
> >>>>> >> >> >
> >>>>> >> >> > Both can be accomplished with a stats listener.
> >>>>> >> >> >
> >>>>> >> >> > There can be a "scheduling operator" that brings up and
> >>>>> >> >> > removes DAG fragments as needed.
> >>>>> >> >> >
> >>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
> >>>>> >> >> > <[email protected]> wrote:
> >>>>> >> >> >
> >>>>> >> >> > > Hi,
> >>>>> >> >> > > IMO scheduling a job can be independent of any operator,
> >>>>> >> >> > > while StatsListeners are not. I understand that in a lot of
> >>>>> >> >> > > cases input/output operators will decide when the job ends,
> >>>>> >> >> > > but there can be cases when scheduling is independent of
> >>>>> >> >> > > them.
> >>>>> >> >> > >
> >>>>> >> >> > > Thanks,
> >>>>> >> >> > > Chandni
> >>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <[email protected]> wrote:
> >>>>> >> >> > >
> >>>>> >> >> > > > This looks like something that coordination-wise belongs
> >>>>> >> >> > > > in the master and can be done with a shared stats
> >>>>> >> >> > > > listener.
> >>>>> >> >> > > >
> >>>>> >> >> > > > The operator request/response protocol could be used to
> >>>>> >> >> > > > relay the data for the scheduling decisions.
> >>>>> >> >> > > >
> >>>>> >> >> > > > Thomas
> >>>>> >> >> > > >
> >>>>> >> >> > > >
> >>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> >>>>> >> >> > > > [email protected]> wrote:
> >>>>> >> >> > > >
> >>>>> >> >> > > > > Hi Tushar,
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > I have some questions about use case 2: Batch Support.
> >>>>> >> >> > > > > I don't understand the advantages of providing batch
> >>>>> >> >> > > > > support by having an operator as a scheduler.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > An approach that seemed a little more straightforward to
> >>>>> >> >> > > > > me was to expose an API for a scheduler. If there is a
> >>>>> >> >> > > > > scheduler set, then the master uses it and schedules
> >>>>> >> >> > > > > operators. By default there isn't any scheduler, and the
> >>>>> >> >> > > > > job is run as it is now.
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Maybe this is too simplistic, but can you please let me
> >>>>> >> >> > > > > know why having an operator as a scheduler is a better
> >>>>> >> >> > > > > way?
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > Thanks,
> >>>>> >> >> > > > > Chandni
> >>>>> >> >> > > > >
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <[email protected]> wrote:
> >>>>> >> >> > > > >
> >>>>> >> >> > > > > >Hi All,
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >We have seen a few use cases in the field which require
> >>>>> >> >> > > > > >Apex application scheduling based on some condition.
> >>>>> >> >> > > > > >This has also come up as part of Batch Support in Apex
> >>>>> >> >> > > > > >previously
> >>>>> >> >> > > > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> >>>>> >> >> > > > > >I am proposing the following functionality in Apex to
> >>>>> >> >> > > > > >help scheduling and better resource utilization for
> >>>>> >> >> > > > > >batch jobs. Please provide your comments.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 1 - Dynamic DAG modification.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Each operator in a DAG consumes yarn resources.
> >>>>> >> >> > > > > >Sometimes it is desirable to return the resources to
> >>>>> >> >> > > > > >yarn when no data is available for processing, and to
> >>>>> >> >> > > > > >deploy the whole DAG once data starts to appear. For
> >>>>> >> >> > > > > >this to happen automatically, we will need some
> >>>>> >> >> > > > > >data-monitoring operators running in the DAG to trigger
> >>>>> >> >> > > > > >restart and shutdown of the operators in the DAG.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Apex already has such an API to dynamically change the
> >>>>> >> >> > > > > >running DAG through the CLI. We could make a similar
> >>>>> >> >> > > > > >API available to operators, which would trigger DAG
> >>>>> >> >> > > > > >modification at runtime.
> >>>>> >> >> > > > > >This information can be passed to the master using the
> >>>>> >> >> > > > > >heartbeat RPC, and the master will make the required
> >>>>> >> >> > > > > >changes to the DAG. Let me know what you think about
> >>>>> >> >> > > > > >it. Something like below:
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Context.beginDagChange();
> >>>>> >> >> > > > > >context.addOperator("o1"); <== launch operator from previously check-pointed state.
> >>>>> >> >> > > > > >context.addOperator("o2", new Operator2()); <== create a new operator
> >>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
> >>>>> >> >> > > > > >context.shutdown("o3"); <== remove this and downstream operators from the DAG.
> >>>>> >> >> > > > > >context.apply(); <== the DAG changes will be sent to the master, and the master will apply them.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Similarly, APIs for other functionality, such as
> >>>>> >> >> > > > > >locality settings, need to be provided.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Provide an API, callable from an operator, to launch a
> >>>>> >> >> > > > > >DAG. The operator will prepare a DAG object and submit
> >>>>> >> >> > > > > >it to yarn; the DAG will be scheduled as a new
> >>>>> >> >> > > > > >application. This way, complex schedulers can be
> >>>>> >> >> > > > > >written as operators.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >public class SchedulerOperator implements Operator {
> >>>>> >> >> > > > > >  void handleIdleTime() {
> >>>>> >> >> > > > > >    // Check for conditions to start a job (for example, enough files
> >>>>> >> >> > > > > >    // are available, enough items are available in kafka, or a given
> >>>>> >> >> > > > > >    // time has been reached).
> >>>>> >> >> > > > > >    Dag dag = context.createDAG();
> >>>>> >> >> > > > > >    dag.addOperator();
> >>>>> >> >> > > > > >    dag.addOperator();
> >>>>> >> >> > > > > >    LaunchOptions lOptions = new LaunchOptions();
> >>>>> >> >> > > > > >    lOptions.oldId = ""; // start from this checkpoint.
> >>>>> >> >> > > > > >    DagHandler dagHandler = context.submit(dag, lOptions);
> >>>>> >> >> > > > > >  }
> >>>>> >> >> > > > > >}
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >DagHandler will have methods to monitor the final
> >>>>> >> >> > > > > >state of the application, or to kill the DAG:
> >>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> >>>>> >> >> > > > > >dagHandler.status() <== get the status of the application.
> >>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
> >>>>> >> >> > > > > >dagHandler.shutdown() <== shut down the application.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >More complex scheduler operators could be written to
> >>>>> >> >> > > > > >manage workflows, i.e. DAGs of DAGs, using these APIs.
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >Regards,
> >>>>> >> >> > > > > >-Tushar.
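> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >[For illustration: a rough sketch of a two-stage
> >>>>> >> >> > > > > >workflow built on the proposed API. DagHandler,
> >>>>> >> >> > > > > >LaunchOptions, context.createDAG(), and context.submit()
> >>>>> >> >> > > > > >are the hypothetical APIs proposed above, not an
> >>>>> >> >> > > > > >existing Apex interface, and the
> >>>>> >> >> > > > > >buildIngestDag()/buildAggregateDag() helpers are
> >>>>> >> >> > > > > >placeholders.]
> >>>>> >> >> > > > > >
> >>>>> >> >> > > > > >public class TwoStageScheduler implements Operator {
> >>>>> >> >> > > > > >  void runWorkflow() {
> >>>>> >> >> > > > > >    // Stage 1: ingest, and wait for completion so the two
> >>>>> >> >> > > > > >    // stages never hold yarn resources at the same time.
> >>>>> >> >> > > > > >    DagHandler ingest = context.submit(buildIngestDag(), new LaunchOptions());
> >>>>> >> >> > > > > >    ingest.waitForCompletion();
> >>>>> >> >> > > > > >    // Stage 2: aggregate the output of stage 1.
> >>>>> >> >> > > > > >    DagHandler aggregate = context.submit(buildAggregateDag(), new LaunchOptions());
> >>>>> >> >> > > > > >    aggregate.waitForCompletion();
> >>>>> >> >> > > > > >  }
> >>>>> >> >> > > > > >}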
