Hi All,

Resuming the discussion.
After some discussion, I have created a document that captures the requirements and a high-level design for supporting batch applications. The document consolidates the different threads of discussion and the aspects that are relevant to batch support. It is not yet a design document; it only captures the high-level steps, and I have tried to keep it brief and to the point. I will keep refining it based on your comments and ultimately turn it into a design document.

Here is the link to the document: https://docs.google.com/document/d/1qlyQJP80dOlWZeHwICMFA3D3jGG_T2NLhMfzScbuTwQ/edit?usp=sharing

Please provide your valuable feedback.

~ Bhupesh

On Tue, Feb 23, 2016 at 7:24 AM, David Yan <da...@datatorrent.com> wrote:

> For batch applications without checkpointing or iteration loops, what would be the significance of streaming windows and application windows?
>
> On Sun, Feb 14, 2016 at 10:33 PM, Thomas Weise <tho...@datatorrent.com> wrote:
>
> > Time to resume this discussion. I think it makes sense to look at the batch as the execution of a DAG from setup to teardown for all its operators, as suggested by Bhupesh and Sandeep. The DAG comes into existence when the batch begins and terminates when it is done.
> >
> > We have also seen from customers that there is demand for having the scheduler function built in when there is no external component already present. For example, a file or set of files could be identified as a "batch". While the application is idle, there is only a scheduler operator, which polls for files. Once work is ready, that operator would launch the DAG for processing (within the same application, but not connected through a stream). When processing is complete, that DAG terminates and returns the resources.
> >
> > As discussed, there is the need to be able to turn off checkpointing, which is different from setting a large checkpoint window.
> > No checkpointing means no incremental recovery and hence no need to keep data in buffers.
> >
> > There is also the need to relay a begin/end signal through the entire DAG. This is different from setup/shutdown. It is more like begin/endWindow, but there is only a single "window" in a batch.
> >
> > On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar <chin...@datatorrent.com> wrote:
> >
> > > Hi Thomas,
> > >
> > > A comment on the following from your previous mails:
> > >
> > > *An operator that identifies the batch boundary tells the engine about it and corresponding control tuples are submitted through the stream, leading to callbacks on downstream operators*
> > >
> > > This would mean there will be a single boundary definition of a batch in the application DAG. I think we should give individual operators the freedom to define what a batch is and produce callbacks accordingly.
> > >
> > > With that in mind, here is a quick sketch/suggestion of how it can be done:
> > >
> > > 1) The operator that needs to work on a batch can implement an interface, let's say BatchListener.
> > >
> > > 2) This will have 4 methods:
> > >    - *startBatch*
> > >    - *endBatch*
> > >    - *configureBatch*
> > >    - *callAtApplicationWindowBoundary* (maybe some better name??)
> > >
> > > 3) *configureBatch* will tell what the boundary of a batch is. It will be called right after setup OR activate, basically before the beginning of the stream. The return value will be set with the operator thread.
> > >
> > > 4) Based on the configuration, *startBatch* and *endBatch* will be called.
> > >
> > > 5) *callAtApplicationWindowBoundary* should return *true/false* based on whether the start/end batch calls should happen at an application window boundary or not.
> > > This is where the user can choose between having the platform take care of checkpointing the tuples within a window and handling that on their own.
> > >
> > > Thoughts?
> > >
> > > -Chinmay.
> > >
> > > On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <tho...@datatorrent.com> wrote:
> > >
> > > > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <sand...@datatorrent.com> wrote:
> > > >
> > > > > +1 for batch support in Apex. I would be interested in being part of this work.
> > > > >
> > > > > I would like to start with the basics and would like to know how one will define "batch" in the Apex context. Which of the following cases would be supported under batch:
> > > > >
> > > > > 1. A program completes a task and shuts itself down automatically once the task is complete. E.g. the program needs to copy a set of files from source to destination.
> > > > > 2. A program completes a task and then waits for a predefined time before polling for more work. E.g. the program copies all the files from the source location and then periodically checks, say every 1 hour, whether there are new files at the source and copies them.
> > > > > 3. A program completes a task and then polls every 1 hour as in case 2, but releases resources during the wait time.
> > > >
> > > > Yes, both 1. and 2. are valid use cases. I would not make a further distinction between 2. and 3. at this point.
> > > >
> > > > The ability to run an application that expands and shrinks as a self-contained unit can be a benefit, as otherwise you need an external scheduler (such as Oozie) just to launch jobs. The associated extra integration work may be brittle and an unwanted barrier for certain use cases.
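Chinmay's BatchListener proposal from earlier in the thread can be sketched in Java roughly as follows. Only the four method names come from his mail. The parameter and return types, the BatchBoundary class (here, a batch spans a fixed number of streaming windows), and the toy BatchDriver loop standing in for the engine are assumptions for illustration. None of them are part of the Apex API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical interface from the proposal; signatures are assumptions, not an Apex API. */
interface BatchListener {
  /** Called once after setup/activate, before any tuples arrive; declares the batch boundary. */
  BatchBoundary configureBatch();

  /** true: start/end calls are aligned to application window boundaries. */
  boolean callAtApplicationWindowBoundary();

  void startBatch(long batchId);

  void endBatch(long batchId);
}

/** Hypothetical boundary: a batch spans a fixed number of streaming windows. */
final class BatchBoundary {
  final int windowsPerBatch;

  BatchBoundary(int windowsPerBatch) {
    if (windowsPerBatch <= 0) {
      throw new IllegalArgumentException("windowsPerBatch must be positive");
    }
    this.windowsPerBatch = windowsPerBatch;
  }
}

/** Toy stand-in for the engine: drives a BatchListener over a sequence of streaming windows. */
final class BatchDriver {
  static void run(BatchListener op, int appWindowCount, int totalWindows) {
    int span = op.configureBatch().windowsPerBatch;
    if (op.callAtApplicationWindowBoundary()) {
      // Round the span up to a whole number of application windows.
      span = ((span + appWindowCount - 1) / appWindowCount) * appWindowCount;
    }
    long batchId = 0;
    for (int w = 0; w < totalWindows; w++) {
      if (w % span == 0) {
        op.startBatch(batchId);
      }
      // beginWindow(), tuple processing and endWindow() would run here.
      if (w % span == span - 1 || w == totalWindows - 1) {
        op.endBatch(batchId++);
      }
    }
  }
}

/** Example operator that records the callbacks it receives. */
final class RecordingOperator implements BatchListener {
  final List<String> events = new ArrayList<>();

  public BatchBoundary configureBatch() { return new BatchBoundary(3); }
  public boolean callAtApplicationWindowBoundary() { return true; }
  public void startBatch(long batchId) { events.add("start-" + batchId); }
  public void endBatch(long batchId) { events.add("end-" + batchId); }
}
```

When callAtApplicationWindowBoundary() returns true, the driver rounds the batch span up to a whole number of application windows. This is one possible reading of item 5: start/end calls then never fall in the middle of an application window.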
> > > > >
> > > > > The needs for each of the above will vary. I am putting down some basic requirements for each of them:
> > > > >
> > > > > 1. This case will need a mechanism to shut down automatically on completion of the task.
> > > > >
> > > > >     StartProgram()
> > > > >     StartBatch()
> > > > >     Streaming Application starts, runs and finishes
> > > > >     EndBatch()
> > > > >     EndProgram()
> > > > >
> > > > > 2. This will simply need a construct to wait for some time (say 10 minutes) or until some time (till 1pm).
> > > > >
> > > > >     StartProgram()
> > > > >     while(true)
> > > > >     {
> > > > >       StartBatch()
> > > > >       Streaming Application starts, runs and finishes
> > > > >       EndBatch()
> > > > >       WaitTill(time) or WaitFor(timeperiod)
> > > > >     }
> > > > >     EndProgram()
> > > > >
> > > > > 3. Apart from the wait construct, we also need support for releasing resources.
> > > > >
> > > > >     StartProgram()
> > > > >     while(true)
> > > > >     {
> > > > >       RestartFromSavedState() // if any state was saved previously
> > > > >       StartBatch()
> > > > >       Streaming Application starts, runs and finishes
> > > > >       EndBatch()
> > > > >       SaveState()
> > > > >       ReleaseResources()
> > > > >       WaitTill(time) or WaitFor(timeperiod)
> > > > >     }
> > > > >     EndProgram()
> > > > >
> > > > > All the constructs (WaitTill()/WaitFor(), RestartFromSavedState(), SaveState(), ReleaseResources()) could very well be part of StartBatch() or EndBatch(). I have listed them separately only for clarity.
> > > > >
> > > > > Another point to think about is the scheduler. A batch job is generally triggered as a cron job. Do we still see Apex jobs being triggered by cron, or would we like to include a scheduler within Apex that triggers jobs based on time, on some external trigger, or even by polling for events?
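Sandeep's three cases can be folded into a single driver loop. The Java sketch below is hypothetical: BatchJob, Mode, BatchProgram, Sleeper and RecordingJob are illustrative names, not Apex classes. The maxRounds bound replaces while(true) so the loop can be exercised in a test. The injected Sleeper stands in for WaitFor(timeperiod).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batch job; the state/resource hooks matter only for case 3. */
interface BatchJob {
  void startBatch();
  void runToCompletion(); // "streaming application starts, runs and finishes"
  void endBatch();
  default void restoreState() {}
  default void saveState() {}
  default void releaseResources() {}
}

enum Mode { RUN_ONCE, POLL, POLL_AND_RELEASE } // cases 1, 2 and 3

final class BatchProgram {
  /** Injected wait so tests need not block; stands in for WaitFor(timeperiod). */
  interface Sleeper { void sleep(Duration d); }

  static final Sleeper REAL_SLEEPER = d -> {
    try {
      Thread.sleep(d.toMillis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    }
  };

  /** Runs batches according to the mode; maxRounds bounds what would be while(true). */
  static int run(BatchJob job, Mode mode, Duration period, int maxRounds, Sleeper sleeper) {
    int rounds = 0;
    while (rounds < maxRounds) {
      if (mode == Mode.POLL_AND_RELEASE) {
        job.restoreState(); // RestartFromSavedState()
      }
      job.startBatch();
      job.runToCompletion();
      job.endBatch();
      rounds++;
      if (mode == Mode.RUN_ONCE) {
        break; // case 1: shut down once the task is complete
      }
      if (mode == Mode.POLL_AND_RELEASE) {
        job.saveState();
        job.releaseResources();
      }
      if (rounds < maxRounds) {
        sleeper.sleep(period); // no wait after the final round
      }
    }
    return rounds;
  }
}

/** Example job that logs the order of calls. */
final class RecordingJob implements BatchJob {
  final List<String> log = new ArrayList<>();

  public void startBatch() { log.add("start"); }
  public void runToCompletion() { log.add("run"); }
  public void endBatch() { log.add("end"); }
  @Override public void saveState() { log.add("save"); }
  @Override public void releaseResources() { log.add("release"); }
}
```

Passing BatchProgram.REAL_SLEEPER gives real periodic behavior. WaitTill(time) could be handled the same way by computing the Duration until the target time before each wait.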
> > > > >
> > > > > Regards,
> > > > > Sandeep
> > > > >
> > > > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > I think in the batch case, application windows may be transparent to the user application / operator logic. A batch can be thought of as one instantiation of an Apex DAG, from setup() to teardown() for all operators. Maybe we need to define a higher-level API that encapsulates a streaming application. Something like:
> > > > > >
> > > > > >     StartBatch()
> > > > > >     Streaming Application starts, runs and finishes
> > > > > >     EndBatch()
> > > > > >
> > > > > > The streaming application will run transparently with all the windowing / checkpointing logic that it currently has. Checkpointing large amounts of data may be avoided either by checkpointing at large intervals or by disabling checkpointing for the batch job. Additionally, the external trigger (existence of some file, etc.) can be controlled by the StartBatch() and EndBatch() calls. In batch use cases, the batch is usually done once the input has been processed completely. Example: in MapReduce, all splits processed means the batch job is done. Apex could support similar primitives to facilitate control management in the StartBatch() and EndBatch() methods.
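The "all splits processed means the batch is done" rule from the MapReduce analogy can be sketched as a small completion tracker that fires the end-of-batch action exactly once. SplitCompletionTracker is a hypothetical name and is not an Apex API. How splits are identified and reported (string IDs passed to splitProcessed) is also an assumption.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical helper (not an Apex API): tracks which input splits are still pending and
 * runs the end-of-batch action exactly once, when the last split has been processed.
 */
final class SplitCompletionTracker {
  private final Set<String> pending;
  private final Runnable onBatchDone;
  private boolean done;

  SplitCompletionTracker(Set<String> splitIds, Runnable onBatchDone) {
    this.pending = new HashSet<>(splitIds);
    this.onBatchDone = onBatchDone;
    if (pending.isEmpty()) {
      done = true;
      onBatchDone.run(); // empty input: the batch is trivially complete
    }
  }

  /** Marks one split as processed; duplicate, unknown or late reports are ignored. */
  synchronized void splitProcessed(String splitId) {
    if (done || !pending.remove(splitId)) {
      return;
    }
    if (pending.isEmpty()) {
      done = true;
      onBatchDone.run(); // e.g. EndBatch(): finalize output, add a Hive partition
    }
  }

  synchronized boolean isDone() {
    return done;
  }
}
```

Ignoring duplicate reports matters if a split can be re-processed after a failure: the end-of-batch action still runs only once.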
> > > > > >
> > > > > > -Bhupesh
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <tho...@datatorrent.com> wrote:
> > > > > >
> > > > > > > The following JIRA is open to enhance the support for batch:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/APEXCORE-235
> > > > > > >
> > > > > > > One of the challenges with batch on Apex today is that there isn't any native support to identify the begin/end of a batch and associate actions with it. For example, at the beginning we may want to fetch some data needed for all subsequent processing, and at the end perform some finalization action or push to an external system (add a partition to a Hive table or similar).
> > > > > > >
> > > > > > > Absent native support, the workaround is to add a bunch of ports and extra operators for propagation and synchronization purposes, which makes building the batch application with standard operators, or developing custom operators, rather difficult and inefficient.
> > > > > > >
> > > > > > > The span of a batch can also be seen as a user-defined window, with logic for begin and end. The current "application window" support is limited to a multiple of the streaming window on a per-operator basis. In the batch case, the boundary needs to be more flexible: user code needs to be able to determine begin/endWindow based on external data (existence of files, etc.).
> > > > > > >
> > > > > > > There is another commonality with the application window, and that's alignment of checkpointing.
> > > > > > > For batches where it is more efficient to redo the processing instead of checkpointing potentially large amounts of intermediate state for incremental recovery, it would be nice to be able to say "user window == checkpoint interval".
> > > > > > >
> > > > > > > This is to float the idea of having a window control that can be influenced by user code. An operator that identifies the batch boundary tells the engine about it, and corresponding control tuples are submitted through the stream, leading to callbacks on downstream operators. These control tuples should be able to carry contextual information that can be used in downstream operator logic (file names, schema information, etc.).
> > > > > > >
> > > > > > > I don't expect the current beginWindow/endWindow can be augmented in a backward-compatible way to accommodate this, but a similar optional interface could be supported to enable batch-aware operators and checkpointing optimization.
> > > > > > >
> > > > > > > Thoughts?
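The control-tuple idea in the first mail of the thread can be illustrated with a toy, single-threaded model. BatchControlTuple, BatchAware, ControlRouter, CountingWriter and the "file" context key are hypothetical names for this sketch. In the actual proposal the engine, not user code, would route control tuples and invoke the callbacks. The context map carries the kind of information mentioned there (file names, schema information).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical control tuple marking a batch boundary and carrying batch context. */
final class BatchControlTuple {
  enum Kind { BEGIN, END }

  final Kind kind;
  final long batchId;
  final Map<String, String> context; // e.g. file name, schema information

  BatchControlTuple(Kind kind, long batchId, Map<String, String> context) {
    this.kind = kind;
    this.batchId = batchId;
    this.context = Collections.unmodifiableMap(new HashMap<>(context));
  }
}

/** Hypothetical optional interface for batch-aware operators, separate from begin/endWindow. */
interface BatchAware {
  void beginBatch(long batchId, Map<String, String> context);
  void endBatch(long batchId, Map<String, String> context);
}

/** Toy stand-in for the engine: control tuples become callbacks, data tuples go to process. */
final class ControlRouter {
  static void deliver(List<Object> stream, BatchAware op, Consumer<Object> process) {
    for (Object item : stream) {
      if (item instanceof BatchControlTuple) {
        BatchControlTuple c = (BatchControlTuple) item;
        if (c.kind == BatchControlTuple.Kind.BEGIN) {
          op.beginBatch(c.batchId, c.context);
        } else {
          op.endBatch(c.batchId, c.context);
        }
      } else {
        process.accept(item);
      }
    }
  }
}

/** Example downstream operator: counts records per batch and finalizes on endBatch. */
final class CountingWriter implements BatchAware {
  final List<String> finalized = new ArrayList<>();
  private long count;

  public void beginBatch(long batchId, Map<String, String> context) {
    count = 0;
  }

  void process(Object tuple) {
    count++;
  }

  public void endBatch(long batchId, Map<String, String> context) {
    // A real operator might add a Hive partition or push results to an external system here.
    finalized.add(context.get("file") + ":" + count);
  }
}
```

Keeping BatchAware separate from beginWindow/endWindow follows the suggestion that batch awareness be an optional interface rather than a change to the existing window callbacks.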