Hi All,

I have opened a review pull request for dynamic dag modification
through stats listener (https://github.com/apache/apex-core/pull/393).
Please review and provide
comments/suggestions.

It provides following functionality
- StatsListener can access the opearator name for easily detecting
which opearator stats are being processed.
- StatsListener can create a instance of object through which it can
submit dag modifications to the engine.
- StatsListener can return dag changes as a response to engine.
- PlanModifier is modified to take a DAG and apply it on the existing
running DAG and deploy the changes.

The following functionality is not working yet.

- The new opearator does not start from the correct windowId
(https://issues.apache.org/jira/browse/APEXCORE-532)
- Relanched application failed to start when it was killed after
dynamic dag modification.
- There is no support for resuming operator from previous state when
they were removed. This could be achived through
  readig state through external storage on setup.
- persist operator support is not present for newly added streams.

The demo application using the feature is available at
https://github.com/tushargosavi/apex-dynamic-scheduling

There are two variations of WordCount application. The first variation
detects the presence of
new files and start a disconnected DAG to process the data.
(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/WordCountApp.java)

The second application
(https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java),
starts with reader operator, and provides pendingFiles as auto-metric
to stat listener. On detecting pending files it attaches splitter
counter and output
operator to the read operator. Once files are processed the splitter,
counter and output operators are removed and
added back again if new data files are added into the directory.

Regards,
-Tushar.


On Mon, Aug 1, 2016 at 6:10 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
> Hi All,
>
> I was able to prototype an simple word count application, which will
> start with just a single file reader operator. File reader operator
> will emit pendingFiles as metric to StatsListener. The statslistener
> will change DAG once enough files are available. The listener will return
> plan change to add word splitter, counter and console operator to the
> reader and complete the DAG for wordcount.
>
> After 120 windows of inactivity, the three operators will be removed
> from DAG again. When new set of files are added these operators are
> added back again.
>
> The high level proposal document:
> https://docs.google.com/document/d/1gpy7NFxd6te1lTXN1-2r8WHRjg83zEM8aqpPAXivFRQ/edit?usp=sharing
>
> The prototype code is at :
> https://github.com/tushargosavi/apex-dynamic-scheduling/
> The Application files are
> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/FileStatListenerSameDag.java
> https://github.com/tushargosavi/apex-dynamic-scheduling/blob/master/src/main/java/com/datatorrent/wordcount/ExtendApp.java
>
> Please provide your feedback.
>
> Some challenges yet to resolve are
> - Restoring operator state from previously removed operator.
> - Handling cuncurrent modifications to DAG from multiple StatsListener.
> - Making DAG changes persistent, user should be able to restart the
> application if application was killed with modified dag.
>
> Thanks,
> -Tushar.
>
> On Fri, Jul 8, 2016 at 6:48 PM, Tushar Gosavi <tus...@datatorrent.com> wrote:
>> Hi All,
>>
>> I have dome some initial prototype which allows stat listener to
>> specify dag changes, and the dag changes are applied asynchronously.
>>
>> The changes involved are
>> - Add DagChangeSet object which is inherited from DAG, supporting
>> methods to remove
>>   operator and streams.
>>
>> - The stat listener will return this object in Response, and platform
>> will apply changes specified in response to the DAG.
>>
>>
>> The Apex changes
>> https://github.com/apache/apex-core/compare/master...tushargosavi:scheduler?expand=1
>>
>> The correspondign Demo application, which one operator monitors the
>> directory for files, and launch the wordcount DAG in
>> same application master when files are available.
>> https://github.com/tushargosavi/incubator-apex-malhar/tree/178ad0c763b48b32dfb1041d4d1c6d5da5fbb7fb/demos/wordcount/src/main/java/com/datatorrent/demos/wordcount/schedular
>>
>> Example of stat listerner which monitors a metric and instruct master
>> to start a dag.
>>
>>  /** look for more than 100 files in a directory, before lauching the DAG */
>> @Override
>>   public Response processStats(BatchedOperatorStats stats)
>>   {
>>     for(Stats.OperatorStats ws: stats.getLastWindowedStats()) {
>>       // pendingFiles is autometric.
>>       Integer value = (Integer)ws.metrics.get("pendingFiles");
>>       LOG.info("stats recevied for {} pendingFiles {}",
>> stats.getOperatorId(), value);
>>       if (value != null  && value > 100 && !dagStarted) {
>>         dagStarted = true;
>>         Response resp = new Response();
>>         resp.dag = getWordCountDag((String)ws.metrics.get("directory"));
>>         counter = 0;
>>         return resp;
>>       }
>>     }
>>     return null;
>>   }
>>
>>   DAGChangeSet getWordCountDag(String dir)
>>     {
>>       DAGChangeSet dag = new DAGChangeSet();
>>       LineByLineFileInputOperator reader = dag.addOperator("Reader",
>> new LineByLineFileInputOperator());
>>       List<StatsListener> listeners = new ArrayList<>();
>>       listeners.add(this);
>>       
>> dag.getMeta(reader).getAttributes().put(Context.OperatorContext.STATS_LISTENERS,
>> listeners);
>>       reader.setDirectory(dir);
>>       LineSplitter splitter = dag.addOperator("SplitteR", new 
>> LineSplitter());
>>       UniqueCounter<String> counter = dag.addOperator("Counter", new
>> UniqueCounter<String>());
>>       ConsoleOutputOperator out = dag.addOperator("Output", new
>> ConsoleOutputOperator());
>>       dag.addStream("s1", reader.output, splitter.input);
>>       dag.addStream("s2", splitter.words, counter.data);
>>       dag.addStream("s3", counter.count, out.input);
>>       return dag;
>>     }
>>
>> Let me know if this type of API is acceptable for launching the DAG.
>> This is an API to specify DAG changes. The scheduler functionality
>> will use
>> this API.
>>
>>
>> Regards,
>> -Tushar.
>>
>> On Thu, Jun 23, 2016 at 2:48 AM, Thomas Weise <tho...@datatorrent.com> wrote:
>>> I like the idea of keeping heavy lifting and custom code out of the master,
>>> if possible. You find that split in responsibilities even in the case of
>>> partitioning (Kafka connector for example). The change that requires
>>> partitioning may be detected as byproduct of the regular processing in the
>>> container, the information relayed to the master, the action being taken
>>> there.
>>>
>>> We should separate all the different pieces and then decide where they run.
>>> There is detecting the need for a plan change, then effecting the change
>>> (which requires full DAG view and absolutely has to/should be in the
>>> master).
>>>
>>> Thomas
>>>
>>> On Wed, Jun 22, 2016 at 12:03 PM, Singh, Chandni <
>>> chandni.si...@capitalone.com> wrote:
>>>
>>>> We have couple of components that already run in  master - partitioners,
>>>> stats listeners,  metrics aggregators.  The problem of crashing the master
>>>> is not specific to just scheduler, isn't it?
>>>> ________________________________
>>>> From: Tushar Gosavi <tus...@datatorrent.com>
>>>> Sent: Wednesday, June 22, 2016 2:32:39 PM
>>>> To: dev@apex.apache.org
>>>> Subject: Re: APEXCORE-408 : Ability to schedule Sub-DAG from running
>>>> application
>>>>
>>>> I was thinking about avoiding running user code in master, As a crash
>>>> in master takes down all containers with it. hence was going for
>>>> scheduler as an operator, crash in scheduler won't kill the
>>>> application, master can restart the scheduler back and it can start
>>>> monitoring the job again and change the DAG when required. But this
>>>> will require communication between master and scheduler for monitoring
>>>> of operator status/stats.
>>>>
>>>> It is considerably easy to put scheduling functionality in master, as
>>>> we have access to operator stats and there is communication channel
>>>> already opened between master and operators. And custom scheduler can
>>>> be written as shared stat listener, with additional API available to
>>>> listener to add/remove/deploy/undeploy etc.. operators.
>>>>
>>>> Regards,
>>>> - Tushar.
>>>>
>>>>
>>>> On Wed, Jun 22, 2016 at 11:02 PM, Thomas Weise <tho...@datatorrent.com>
>>>> wrote:
>>>> > Right, if it runs in the app master and does not rely on unmanaged
>>>> external
>>>> > processes, then these requirements can be met.
>>>> >
>>>> > This capability seeks to avoid users having to deal with external
>>>> > schedulers or workflows if all they want is to split a DAG that is
>>>> > logically one application into multiple stages for resource optimization.
>>>> > This is not very different from the need to have elasticity in terms of
>>>> > partitions depending to the availability of input, as you point out.
>>>> >
>>>> >
>>>> > On Wed, Jun 22, 2016 at 10:23 AM, Singh, Chandni <
>>>> > chandni.si...@capitalone.com> wrote:
>>>> >
>>>> >> Scheduling IMO belongs to App master. Operators can influence it, for
>>>> eg.
>>>> >> File splitter can indicate that no more file to process.
>>>> >>
>>>> >> I don’t understand how that can not integrate with all the aspects-
>>>> >> operability, fault tolerance and security.
>>>> >>
>>>> >> Chandni
>>>> >>
>>>> >> On 6/22/16, 10:08 AM, "Chinmay Kolhatkar" <chin...@datatorrent.com>
>>>> wrote:
>>>> >>
>>>> >> >I think its a good idea to have a scheduling operator when you need to
>>>> >> >start a part of the DAG when some trigger happens (for eg. FileSplitter
>>>> >> >identifying new files in FS) and otherwise bring it down to save
>>>> >> >resources.
>>>> >> >
>>>> >> >On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
>>>> >> >timothytiborfar...@gmail.com> wrote:
>>>> >> >
>>>> >> >> I am in agreement with Chandni. Scheduling a batch job is an API
>>>> >> >>completely
>>>> >> >> independent of a DAG or an operator. It could be used by a
>>>> commandline
>>>> >> >>tool
>>>> >> >> running on your laptop, a script, or it could happen to be used by an
>>>> >> >> Operator running in a DAG and a StatsListener.
>>>> >> >>
>>>> >> >> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <
>>>> tho...@datatorrent.com>
>>>> >> >> wrote:
>>>> >> >>
>>>> >> >> > Scheduling can be independent, although we have use cases where the
>>>> >> >> > scheduling depends on completion of processing (multi-staged batch
>>>> >> >>jobs
>>>> >> >> > where unused resources need to be freed).
>>>> >> >> >
>>>> >> >> > Both can be accomplished with a stats listener.
>>>> >> >> >
>>>> >> >> > There can be a "scheduling operator" that brings up and removes DAG
>>>> >> >> > fragments as needed.
>>>> >> >> >
>>>> >> >> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh
>>>> >> >><singh.chan...@gmail.com>
>>>> >> >> > wrote:
>>>> >> >> >
>>>> >> >> > > Hi,
>>>> >> >> > > IMO scheduling a job can be independent of any operator while
>>>> >> >> > > StatsListeners are not.  I understand that in a lot of cases
>>>> >> >> input/output
>>>> >> >> > > operators will decide when the job ends but there can be cases
>>>> when
>>>> >> >> > > scheduling can be independent of it.
>>>> >> >> > >
>>>> >> >> > > Thanks,
>>>> >> >> > > Chandni
>>>> >> >> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com
>>>> >
>>>> >> >> wrote:
>>>> >> >> > >
>>>> >> >> > > > This looks like something that coordination wise belongs into
>>>> the
>>>> >> >> > master
>>>> >> >> > > > and can be done with a shared stats listener.
>>>> >> >> > > >
>>>> >> >> > > > The operator request/response protocol could be used the relay
>>>> the
>>>> >> >> data
>>>> >> >> > > for
>>>> >> >> > > > the scheduling decisions.
>>>> >> >> > > >
>>>> >> >> > > > Thomas
>>>> >> >> > > >
>>>> >> >> > > >
>>>> >> >> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
>>>> >> >> > > > chandni.si...@capitalone.com> wrote:
>>>> >> >> > > >
>>>> >> >> > > > > Hi Tushar,
>>>> >> >> > > > >
>>>> >> >> > > > > I have some questions about the use case 2: Batch Support
>>>> >> >> > > > > I don¹t understand the advantages of providing batch support
>>>> by
>>>> >> >> > having
>>>> >> >> > > an
>>>> >> >> > > > > operator as a scheduler.
>>>> >> >> > > > >
>>>> >> >> > > > > An approach that seemed a little more straightforward to me
>>>> was
>>>> >> >>to
>>>> >> >> > > expose
>>>> >> >> > > > > an API for scheduler. If there is a scheduler set then the
>>>> >> >>master
>>>> >> >> > uses
>>>> >> >> > > > and
>>>> >> >> > > > > schedules operators. By default there isn¹t any scheduler and
>>>> >> >>the
>>>> >> >> job
>>>> >> >> > > is
>>>> >> >> > > > > run as it is now.
>>>> >> >> > > > >
>>>> >> >> > > > > Maybe this is too simplistic but can you please let me know
>>>> why
>>>> >> >> > having
>>>> >> >> > > an
>>>> >> >> > > > > operator as a scheduler is a better way?
>>>> >> >> > > > >
>>>> >> >> > > > > Thanks,
>>>> >> >> > > > > Chandni
>>>> >> >> > > > >
>>>> >> >> > > > >
>>>> >> >> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <
>>>> tus...@datatorrent.com>
>>>> >> >> > wrote:
>>>> >> >> > > > >
>>>> >> >> > > > > >Hi All,
>>>> >> >> > > > > >
>>>> >> >> > > > > >We have seen few use cases in field which require Apex
>>>> >> >>application
>>>> >> >> > > > > >scheduling based on some condition. This has also came up as
>>>> >> >>part
>>>> >> >> of
>>>> >> >> > > > > >Batch Support in Apex previously
>>>> >> >> > > > > >(
>>>> >> >> > > > >
>>>> >> >> > > >
>>>> >> >> > >
>>>> >> >> >
>>>> >> >>
>>>> >> >>
>>>> >>
>>>> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
>>>> >> >> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
>>>> >> >> 40mail.gmail.com
>>>> >> >> > > %3E)
>>>> >> >> > > > > >. I am proposing following functionality in Apex to help
>>>> >> >> scheduling
>>>> >> >> > > > > >and better resource utilization for batch jobs. Please
>>>> provide
>>>> >> >> your
>>>> >> >> > > > > >comments.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Usecase 1 - Dynamic Dag modification.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Each operator in DAG consumes yarn resources, sometimes it
>>>> is
>>>> >> >> > > > > >desirable to return the resources to yarn when no data is
>>>> >> >> available
>>>> >> >> > > > > >for processing, and deploy whole DAG once data starts to
>>>> >> >>appear.
>>>> >> >> For
>>>> >> >> > > > > >this to happen automatically, we will need some data
>>>> monitoring
>>>> >> >> > > > > >operators running in the DAG to trigger restart and
>>>> shutdown of
>>>> >> >> the
>>>> >> >> > > > > >operators in the DAG.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Apex already have such api to dynamically change the running
>>>> >> >>dag
>>>> >> >> > > > > >through cli. We could provide similar API available to
>>>> >> >>operators
>>>> >> >> > which
>>>> >> >> > > > > >will trigger dag modification at runtime. This information
>>>> can
>>>> >> >>be
>>>> >> >> > > > > >passed to master using heartbeat RPC and master will make
>>>> >> >> > > > > >required changed to the DAG. let me know what do you think
>>>> >> >>about
>>>> >> >> > it..
>>>> >> >> > > > > >something like below.
>>>> >> >> > > > > >Context.beginDagChange();
>>>> >> >> > > > > >context.addOperator("o1") <== launch operator from previous
>>>> >> >> > > > check-pointed
>>>> >> >> > > > > >state.
>>>> >> >> > > > > >context.addOperator("o2", new Operator2()) <== create new
>>>> >> >>operator
>>>> >> >> > > > > >context.addStream("s1", "reader.output", "o1.input");
>>>> >> >> > > > > >context.shutdown("o3"); <== delete this and downstream
>>>> >> >>operators
>>>> >> >> > from
>>>> >> >> > > > the
>>>> >> >> > > > > >DAG.
>>>> >> >> > > > > >context.apply();  <== dag changes will be send to master,
>>>> and
>>>> >> >> master
>>>> >> >> > > > > >will apply these changes.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Similarly API for other functionalities such as locality
>>>> >> >>settings
>>>> >> >> > > > > >needs to be provided.
>>>> >> >> > > > > >
>>>> >> >> > > > > >
>>>> >> >> > > > > >Usecase 2 - Classic Batch Scheduling.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Provide an API callable from operator to launch a DAG. The
>>>> >> >> operator
>>>> >> >> > > > > >will prepare an dag object and submit it to the yarn, the
>>>> DAG
>>>> >> >>will
>>>> >> >> > be
>>>> >> >> > > > > >scheduled as a new application. This way complex schedulers
>>>> >> >>can be
>>>> >> >> > > > > >written as operators.
>>>> >> >> > > > > >
>>>> >> >> > > > > >public SchedulerOperator implements Operator {
>>>> >> >> > > > > >   void handleIdleTime() {
>>>> >> >> > > > > >      // check of conditions to start a job (for example
>>>> enough
>>>> >> >> > files
>>>> >> >> > > > > >available, enough items are available in kafa, or time has
>>>> >> >>reached
>>>> >> >> > > > > >     Dag dag = context.createDAG();
>>>> >> >> > > > > >     dag.addOperator();
>>>> >> >> > > > > >     dag.addOperator();
>>>> >> >> > > > > >     LaunchOptions lOptions = new LaunchOptions();
>>>> >> >> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
>>>> >> >> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
>>>> >> >> > > > > >   }
>>>> >> >> > > > > >}
>>>> >> >> > > > > >
>>>> >> >> > > > > >DagHandler will have methods to monitor the final state of
>>>> >> >> > > > > >application, or to kill the DAG
>>>> >> >> > > > > >dagHandler.waitForCompletion() <== wait till the DAG
>>>> terminates
>>>> >> >> > > > > >dagHandler.status()  <== get the status of application.
>>>> >> >> > > > > >dagHandler.kill() <== kill the running application.
>>>> >> >> > > > > >dagHandler.shutdown() <== shutdown the application.
>>>> >> >> > > > > >
>>>> >> >> > > > > >The more complex Scheduler operators could be written to
>>>> manage
>>>> >> >> the
>>>> >> >> > > > > >workflows, i.e DAG of DAGs. using these APIs.
>>>> >> >> > > > > >
>>>> >> >> > > > > >Regards,
>>>> >> >> > > > > >-Tushar.
>>>> >> >> > > > >
>>>> >> >> > > > > ________________________________________________________
>>>> >> >> > > > >
>>>> >> >> > > > > The information contained in this e-mail is confidential
>>>> and/or
>>>> >> >> > > > > proprietary to Capital One and/or its affiliates and may
>>>> only be
>>>> >> >> used
>>>> >> >> > > > > solely in performance of work or services for Capital One.
>>>> The
>>>> >> >> > > > information
>>>> >> >> > > > > transmitted herewith is intended only for use by the
>>>> individual
>>>> >> >>or
>>>> >> >> > > entity
>>>> >> >> > > > > to which it is addressed. If the reader of this message is
>>>> not
>>>> >> >>the
>>>> >> >> > > > intended
>>>> >> >> > > > > recipient, you are hereby notified that any review,
>>>> >> >>retransmission,
>>>> >> >> > > > > dissemination, distribution, copying or other use of, or
>>>> taking
>>>> >> >>of
>>>> >> >> > any
>>>> >> >> > > > > action in reliance upon this information is strictly
>>>> >> >>prohibited. If
>>>> >> >> > you
>>>> >> >> > > > > have received this communication in error, please contact the
>>>> >> >> sender
>>>> >> >> > > and
>>>> >> >> > > > > delete the material from your computer.
>>>> >> >> > > > >
>>>> >> >> > > > >
>>>> >> >> > > >
>>>> >> >> > >
>>>> >> >> >
>>>> >> >>
>>>> >>
>>>> >> ________________________________________________________
>>>> >>
>>>> >> The information contained in this e-mail is confidential and/or
>>>> >> proprietary to Capital One and/or its affiliates and may only be used
>>>> >> solely in performance of work or services for Capital One. The
>>>> information
>>>> >> transmitted herewith is intended only for use by the individual or
>>>> entity
>>>> >> to which it is addressed. If the reader of this message is not the
>>>> intended
>>>> >> recipient, you are hereby notified that any review, retransmission,
>>>> >> dissemination, distribution, copying or other use of, or taking of any
>>>> >> action in reliance upon this information is strictly prohibited. If you
>>>> >> have received this communication in error, please contact the sender and
>>>> >> delete the material from your computer.
>>>> >>
>>>> ________________________________________________________
>>>>
>>>> The information contained in this e-mail is confidential and/or
>>>> proprietary to Capital One and/or its affiliates and may only be used
>>>> solely in performance of work or services for Capital One. The information
>>>> transmitted herewith is intended only for use by the individual or entity
>>>> to which it is addressed. If the reader of this message is not the intended
>>>> recipient, you are hereby notified that any review, retransmission,
>>>> dissemination, distribution, copying or other use of, or taking of any
>>>> action in reliance upon this information is strictly prohibited. If you
>>>> have received this communication in error, please contact the sender and
>>>> delete the material from your computer.
>>>>

Reply via email to