For use case 1, is it possible to avoid changing the Context? Could we
have something along the lines of a "StramToNodeRequest"?
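
For illustration only (every name below is hypothetical, not an existing
Apex class), a request-style alternative might look roughly like:

    // Hypothetical request object carried to the master over the
    // existing heartbeat RPC, leaving the Context interface untouched:
    StramToNodeDagChangeRequest request = new StramToNodeDagChangeRequest();
    request.addOperator("o2", new Operator2());
    request.addStream("s1", "reader.output", "o2.input");
    request.shutdownOperator("o3");
    context.sendRequest(request);

That would keep the DAG-change plumbing in a message type rather than in
new Context methods.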

On Tue, Jun 21, 2016 at 11:09 AM Tushar Gosavi <tus...@datatorrent.com>
wrote:

> Hi All,
>
> We have seen a few use cases in the field which require Apex application
> scheduling based on some condition. This has also come up as part of
> Batch Support in Apex previously
> (
> http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E
> )
> . I am proposing the following functionality in Apex to help with
> scheduling and better resource utilization for batch jobs. Please
> provide your comments.
>
> Use case 1 - Dynamic DAG modification.
>
> Each operator in a DAG consumes YARN resources. Sometimes it is
> desirable to return those resources to YARN when no data is available
> for processing, and to redeploy the whole DAG once data starts to
> appear. For this to happen automatically, we will need some
> data-monitoring operators running in the DAG to trigger restart and
> shutdown of the other operators in the DAG.
>
> Apex already has such an API for dynamically changing the running DAG
> through the CLI. We could make a similar API available to operators,
> which would trigger DAG modification at runtime. This information can
> be passed to the master using the heartbeat RPC, and the master will
> make the required changes to the DAG. Let me know what you think about
> it; something like below:
> Context.beginDagChange();
> context.addOperator("o1");  <== launch operator from previous
> check-pointed state.
> context.addOperator("o2", new Operator2());  <== create a new operator.
> context.addStream("s1", "reader.output", "o1.input");
> context.shutdown("o3");  <== remove this and downstream operators from
> the DAG.
> context.apply();  <== DAG changes will be sent to the master, and the
> master will apply them.
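>
> To make the intent concrete, a data-monitoring operator using this
> proposed API might look like the sketch below (the API does not exist
> yet, and dataAvailable() and the operator/port names are made up):
>
> public class DataMonitor extends BaseOperator implements IdleTimeHandler {
>   @Override
>   public void handleIdleTime() {
>     if (dataAvailable()) {
>       // Data has appeared: redeploy the processing part of the DAG
>       // from its last checkpointed state.
>       context.beginDagChange();
>       context.addOperator("o1");  // restore from checkpoint
>       context.addStream("s1", "reader.output", "o1.input");
>       context.apply();
>     }
>   }
> }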
>
> Similarly, APIs for other functionality, such as locality settings,
> need to be provided.
>
>
> Use case 2 - Classic Batch Scheduling.
>
> Provide an API callable from an operator to launch a DAG. The operator
> will prepare a DAG object and submit it to YARN, and the DAG will be
> scheduled as a new application. This way, complex schedulers can be
> written as operators.
>
> public class SchedulerOperator implements Operator {
>    void handleIdleTime() {
>       // Check for conditions to start a job (for example, enough files
>       // are available, enough items are available in Kafka, or a
>       // scheduled time has been reached).
>       DAG dag = context.createDAG();
>       dag.addOperator(...);
>       dag.addOperator(...);
>       LaunchOptions lOptions = new LaunchOptions();
>       lOptions.oldId = ""; // resume from this checkpoint.
>       DagHandler dagHandler = context.submit(dag, lOptions);
>    }
> }
>
> DagHandler will have methods to monitor the final state of the
> application, or to kill the DAG:
> dagHandler.waitForCompletion();  <== wait till the DAG terminates.
> dagHandler.status();  <== get the status of the application.
> dagHandler.kill();  <== kill the running application.
> dagHandler.shutdown();  <== shut down the application gracefully.
>
> More complex scheduler operators could be written to manage
> workflows, i.e. DAGs of DAGs, using these APIs.
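>
> For example, a workflow operator might chain two batch stages (again a
> sketch against the proposed API; the DAG names, the status string, and
> the elided population steps are illustrative):
>
> DAG extractDag = context.createDAG();
> // ... populate extractDag with its operators and streams ...
> DagHandler extract = context.submit(extractDag, new LaunchOptions());
> extract.waitForCompletion();  // block until the first stage terminates
> if ("SUCCEEDED".equals(extract.status())) {
>   // First stage succeeded; launch the dependent stage.
>   DAG loadDag = context.createDAG();
>   // ... populate loadDag ...
>   context.submit(loadDag, new LaunchOptions());
> }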
>
> Regards,
> -Tushar.
>