Hi All,

We have seen a few use cases in the field which require Apex application
scheduling based on some condition. This also came up previously as part
of the Batch Support in Apex discussion
(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
I am proposing the following functionality in Apex to help with scheduling
and better resource utilization for batch jobs. Please provide your
comments.

Use case 1 - Dynamic DAG modification.

Each operator in a DAG consumes YARN resources. Sometimes it is
desirable to return those resources to YARN when no data is available
for processing, and to redeploy the whole DAG once data starts to
appear. For this to happen automatically, we will need some data
monitoring operators running in the DAG to trigger restart and shutdown
of the other operators in the DAG.

Apex already has such an API for dynamically changing a running DAG
through the CLI. We could make a similar API available to operators,
which would trigger DAG modification at runtime. This information can be
passed to the master using the heartbeat RPC, and the master will make
the required changes to the DAG. Let me know what you think about it;
something like below:
context.beginDagChange();
context.addOperator("o1");  <== launch the operator from its previous checkpointed state
context.addOperator("o2", new Operator2());  <== create a new operator
context.addStream("s1", "reader.output", "o1.input");
context.shutdown("o3");  <== delete this operator and its downstream operators from the DAG
context.apply();  <== the DAG changes will be sent to the master, which will apply them
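
To make it concrete, a data monitoring operator for use case 1 could drive
this roughly as below. This is only a sketch on top of the proposed API:
DataMonitorOperator, dataAvailable(), the operator names "reader"/"writer"
and the way the operator obtains "context" are all placeholders, not part
of the proposal itself.

public class DataMonitorOperator extends BaseOperator implements InputOperator
{
  private boolean processingDeployed = true;

  @Override
  public void emitTuples()
  {
    if (processingDeployed && !dataAvailable()) {
      // no input left: remove the processing operators and return their containers to YARN
      context.beginDagChange();
      context.shutdown("reader");   // also removes the operators downstream of "reader"
      context.apply();
      processingDeployed = false;
    } else if (!processingDeployed && dataAvailable()) {
      // data showed up again: redeploy the processing part from its checkpointed state
      context.beginDagChange();
      context.addOperator("reader");
      context.addOperator("writer");
      context.addStream("s1", "reader.output", "writer.input");
      context.apply();
      processingDeployed = true;
    }
  }
}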

Similarly, APIs for other functionality, such as locality settings,
need to be provided.
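For example (purely illustrative; the exact method is not part of this
proposal yet), something along the lines of:

context.setLocality("s1", DAG.Locality.CONTAINER_LOCAL);  // hypothetical method; DAG.Locality is the existing Apex enum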


Use case 2 - Classic Batch Scheduling.

Provide an API, callable from an operator, to launch a DAG. The operator
will prepare a DAG object and submit it to YARN, and the DAG will be
scheduled as a new application. This way, complex schedulers can be
written as operators.

public class SchedulerOperator implements Operator {
   public void handleIdleTime() {
      // check for conditions to start a job (for example: enough files are
      // available, enough items are available in Kafka, or the scheduled time has been reached)
      Dag dag = context.createDAG();
      dag.addOperator(...);
      dag.addOperator(...);
      LaunchOptions lOptions = new LaunchOptions();
      lOptions.oldId = "";  // resume from the checkpoint of the application with this id
      DagHandler dagHandler = context.submit(dag, lOptions);
   }
}

DagHandler will have methods to monitor the final state of the
application, or to kill or shut down the DAG:
dagHandler.waitForCompletion();  <== wait until the DAG terminates
dagHandler.status();  <== get the status of the application
dagHandler.kill();  <== kill the running application
dagHandler.shutdown();  <== shut down the application

More complex scheduler operators could be written to manage
workflows, i.e. DAGs of DAGs, using these APIs.
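
As a rough illustration of that (again only a sketch on top of the proposed
API; buildIngestDag(), buildAggregateDag() and the DagState values are
placeholders, not part of this proposal), a simple two-stage workflow could
be driven like this from inside a scheduler operator:

// stage 1: ingest raw data as one application
DagHandler ingest = context.submit(buildIngestDag(), new LaunchOptions());
ingest.waitForCompletion();

// stage 2: aggregate the ingested data, but only if stage 1 succeeded
if (ingest.status() == DagState.SUCCEEDED) {
  DagHandler aggregate = context.submit(buildAggregateDag(), new LaunchOptions());
  aggregate.waitForCompletion();
} else {
  // stage 1 failed; a real scheduler could retry it or raise an alert here
}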

Regards,
-Tushar.
