I think its a good idea to have a scheduling operator when you need to
start a part of the DAG when some trigger happens (for eg. FileSplitter
identifying new files in FS) and otherwise bring it down to save resources.

On Wed, Jun 22, 2016 at 9:54 AM, Timothy Farkas <
timothytiborfar...@gmail.com> wrote:

> I am in agreement with Chandni. Scheduling a batch job is an API completely
> independent of a DAG or an operator. It could be used by a commandline tool
> running on your laptop, a script, or it could happen to be used by an
> Operator running in a DAG and a StatsListener.
>
> On Wed, Jun 22, 2016 at 9:28 AM, Thomas Weise <tho...@datatorrent.com>
> wrote:
>
> > Scheduling can be independent, although we have use cases where the
> > scheduling depends on completion of processing (multi-staged batch jobs
> > where unused resources need to be freed).
> >
> > Both can be accomplished with a stats listener.
> >
> > There can be a "scheduling operator" that brings up and removes DAG
> > fragments as needed.
> >
> > On Tue, Jun 21, 2016 at 6:51 PM, Chandni Singh <singh.chan...@gmail.com>
> > wrote:
> >
> > > Hi,
> > > IMO scheduling a job can be independent of any operator while
> > > StatsListeners are not.  I understand that in a lot of cases
> input/output
> > > operators will decide when the job ends but there can be cases when
> > > scheduling can be independent of it.
> > >
> > > Thanks,
> > > Chandni
> > > On Jun 21, 2016 12:12 PM, "Thomas Weise" <tho...@datatorrent.com>
> wrote:
> > >
> > > > This looks like something that coordination wise belongs into the
> > master
> > > > and can be done with a shared stats listener.
> > > >
> > > > The operator request/response protocol could be used the relay the
> data
> > > for
> > > > the scheduling decisions.
> > > >
> > > > Thomas
> > > >
> > > >
> > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> > > > chandni.si...@capitalone.com> wrote:
> > > >
> > > > > Hi Tushar,
> > > > >
> > > > > I have some questions about the use case 2: Batch Support
> > > > > I don¹t understand the advantages of providing batch support by
> > having
> > > an
> > > > > operator as a scheduler.
> > > > >
> > > > > An approach that seemed a little more straightforward to me was to
> > > expose
> > > > > an API for scheduler. If there is a scheduler set then the master
> > uses
> > > > and
> > > > > schedules operators. By default there isn¹t any scheduler and the
> job
> > > is
> > > > > run as it is now.
> > > > >
> > > > > Maybe this is too simplistic but can you please let me know why
> > having
> > > an
> > > > > operator as a scheduler is a better way?
> > > > >
> > > > > Thanks,
> > > > > Chandni
> > > > >
> > > > >
> > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" <tus...@datatorrent.com>
> > wrote:
> > > > >
> > > > > >Hi All,
> > > > > >
> > > > > >We have seen few use cases in field which require Apex application
> > > > > >scheduling based on some condition. This has also came up as part
> of
> > > > > >Batch Support in Apex previously
> > > > > >(
> > > > >
> > > >
> > >
> >
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDP
> > > > > >XNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%
> 40mail.gmail.com
> > > %3E)
> > > > > >. I am proposing following functionality in Apex to help
> scheduling
> > > > > >and better resource utilization for batch jobs. Please provide
> your
> > > > > >comments.
> > > > > >
> > > > > >Usecase 1 - Dynamic Dag modification.
> > > > > >
> > > > > >Each operator in DAG consumes yarn resources, sometimes it is
> > > > > >desirable to return the resources to yarn when no data is
> available
> > > > > >for processing, and deploy whole DAG once data starts to appear.
> For
> > > > > >this to happen automatically, we will need some data monitoring
> > > > > >operators running in the DAG to trigger restart and shutdown of
> the
> > > > > >operators in the DAG.
> > > > > >
> > > > > >Apex already have such api to dynamically change the running dag
> > > > > >through cli. We could provide similar API available to operators
> > which
> > > > > >will trigger dag modification at runtime. This information can be
> > > > > >passed to master using heartbeat RPC and master will make
> > > > > >required changed to the DAG. let me know what do you think about
> > it..
> > > > > >something like below.
> > > > > >Context.beginDagChange();
> > > > > >context.addOperator("o1") <== launch operator from previous
> > > > check-pointed
> > > > > >state.
> > > > > >context.addOperator("o2", new Operator2()) <== create new operator
> > > > > >context.addStream("s1", "reader.output", "o1.input");
> > > > > >context.shutdown("o3"); <== delete this and downstream operators
> > from
> > > > the
> > > > > >DAG.
> > > > > >context.apply();  <== dag changes will be send to master, and
> master
> > > > > >will apply these changes.
> > > > > >
> > > > > >Similarly API for other functionalities such as locality settings
> > > > > >needs to be provided.
> > > > > >
> > > > > >
> > > > > >Usecase 2 - Classic Batch Scheduling.
> > > > > >
> > > > > >Provide an API callable from operator to launch a DAG. The
> operator
> > > > > >will prepare an dag object and submit it to the yarn, the DAG will
> > be
> > > > > >scheduled as a new application. This way complex schedulers can be
> > > > > >written as operators.
> > > > > >
> > > > > >public SchedulerOperator implements Operator {
> > > > > >   void handleIdleTime() {
> > > > > >      // check of conditions to start a job (for example enough
> > files
> > > > > >available, enough items are available in kafa, or time has reached
> > > > > >     Dag dag = context.createDAG();
> > > > > >     dag.addOperator();
> > > > > >     dag.addOperator();
> > > > > >     LaunchOptions lOptions = new LaunchOptions();
> > > > > >     lOptions.oldId = ""; // start for this checkpoint.
> > > > > >     DagHandler dagHandler = context.submit(dag, lOptions);
> > > > > >   }
> > > > > >}
> > > > > >
> > > > > >DagHandler will have methods to monitor the final state of
> > > > > >application, or to kill the DAG
> > > > > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > > > > >dagHandler.status()  <== get the status of application.
> > > > > >dagHandler.kill() <== kill the running application.
> > > > > >dagHandler.shutdown() <== shutdown the application.
> > > > > >
> > > > > >The more complex Scheduler operators could be written to manage
> the
> > > > > >workflows, i.e DAG of DAGs. using these APIs.
> > > > > >
> > > > > >Regards,
> > > > > >-Tushar.
> > > > >
> > > > > ________________________________________________________
> > > > >
> > > > > The information contained in this e-mail is confidential and/or
> > > > > proprietary to Capital One and/or its affiliates and may only be
> used
> > > > > solely in performance of work or services for Capital One. The
> > > > information
> > > > > transmitted herewith is intended only for use by the individual or
> > > entity
> > > > > to which it is addressed. If the reader of this message is not the
> > > > intended
> > > > > recipient, you are hereby notified that any review, retransmission,
> > > > > dissemination, distribution, copying or other use of, or taking of
> > any
> > > > > action in reliance upon this information is strictly prohibited. If
> > you
> > > > > have received this communication in error, please contact the
> sender
> > > and
> > > > > delete the material from your computer.
> > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to