Dear Thomas, +1
This will help a lot in an operational scenario where data is not in motion all the time. In addition to what you have mentioned, we would need monitoring capabilities such as the execution of a batch (scheduled DAG) and the next batch picking up the processing from the previous state. This may require meta-data to flow with the tuples to monitor a batch. Regards, Mohit On Mon, Feb 15, 2016 at 12:03 PM, Thomas Weise <[email protected]> wrote: > Time to resume this discussion. I think it makes sense to look at the batch > as execution of a DAG from setup to teardown for all its operators, as > suggested by Bhupesh and Sandeep. The DAG comes into existence when the > batch begins and terminates when it is done. > > We have also seen from customers that there is demand for having the > scheduler function built in, when there is no external component already > present. For example, a file or set of files could be identified as > "batch". As the application is idle, there is only a scheduler operator > which polls for files. Once work is ready, that operator would launch the > DAG for processing (within same application, but not connected through > stream). When processing is complete, that DAG terminates and returns the > resources. > > As discussed, there is the need to be able to turn off checkpointing, which > is different from setting a large checkpoint window. No checkpointing means > no incremental recovery and hence no need to keep data in buffers. > > There is also the need to relay begin/end signal through the entire DAG. > This is different from setup/shutdown. It is more like begin/endWindow, but > there is only a single "window" in a batch. > > > On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar < > [email protected] > > wrote: > > > Hi Thomas, > > > > A comment on following in your previous mails: > > > > > > > > *An operator that identifies the batch boundary tells theengine about it > > and corresponding control tuples are submitted through thestream, leading > > to callbacks on downstream operators* > > > > This would mean there will be a single boundary definition of a batch in > > the application DAG. > > I think we should give freedom to individual operator to define what a > > batch is and produce a callbacks accordingly. > > > > Considering that in mind, here is a quick sketch/suggestion of how it can > > be done: > > > > 1) The operator that needs to work on a batch can implement an interface, > > lets say BatchListener. > > > > 2) This will have 4 methods: > > * startBatch* > > * endBatch* > > * configureBatch* > > * callAtApplicationWindowBoundary *(maybe some better name??) > > > > 3) *configureBatch* will tell what tell what is the boundary of a batch. > > This will be called right after setup OR activate, basically before > > beginning of the stream. The return value will be set with operator > thread. > > > > 4) Based on configuration, the *startBatch* and *endBatch* will be > called. > > > > 5) the *callAtApplicationWindowBoundary* should return *true/false* based > > on whether start/end batch calls should happen at application window > > boundary OR not. Here is where user can choose to take care of > > checkpointing of tuples within a windows by platform OR whether user > wants > > to do that of his own. > > > > > > Thoughts? > > > > > > -Chinmay. > > > > > > ~ Chinmay. > > > > On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <[email protected]> > > wrote: > > > > > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh < > > [email protected] > > > > > > > wrote: > > > > > > > +1 for batch support in Apex. I would be interested to be part of > this > > > > work. > > > > > > > > I would like to start with basics and would like to know how one will > > > > define "batch" in Apex context. Which of the following cases would be > > > > supported under batch: > > > > > > > > 1. A program completes a task and auto shutdown itself once the > task > > > is > > > > complete. E.g. the program needs to copy a set of files from > source > > > to > > > > destination. > > > > 2. A program completes a task and then waits for pre-defined time > to > > > > poll for something more to work on. E.g. the program copies all > the > > > > files > > > > from source location and then periodically checks, say every 1 > hour, > > > if > > > > there are new files at the source and copies them. > > > > 3. A program completes a task and then polls every 1 hr as in > case 2 > > > but > > > > releases resources during wait time. > > > > > > > > > > > > > > Yes, both, 1. and 2. are valid use cases. I would not make a further > > > distinction between 2. and 3. at this point. > > > > > > Ability to run an application that expands and shrinks as self > contained > > > unit can be a benefit, as otherwise you need an external scheduler just > > to > > > launch jobs (such as Oozie). The associated extra integration work may > be > > > brittle and an unwanted barrier for certain use cases. > > > > > > > > > > > > > Needs for each of the above will vary. I am putting down some basic > > > > requirements for each of them > > > > > > > > 1. This case will need a mechanism to shutdown automatically on > > > completion > > > > of the task. > > > > > > > > StartProgram() > > > > StartBatch() > > > > Streaming Application starts, runs and finishes > > > > EndBatch() > > > > EndProgram() > > > > > > > > 2. This will simply need a construct to wait for some time ( say 10 > > > > minutes) or till some time ( till 1pm) . > > > > > > > > StartProgram() > > > > while(true) > > > > { > > > > StartBatch() > > > > Streaming Application starts, runs and finishes > > > > EndBatch() > > > > WaitTill(time) or WaitFor(timeperiod) > > > > } > > > > EndProgram() > > > > > > > > 3. Apart from wait construct, we also need release resources support > > > > > > > > StartProgram() > > > > while(true) > > > > { > > > > RestartFromSavedState() // if any state is saved previously. > > > > StartBatch() > > > > Streaming Application starts, runs and finishes > > > > EndBatch() > > > > SaveState() > > > > RelaseResources() > > > > WaitTill(time) or WaitFor(timeperiod) > > > > } > > > > EndProgram() > > > > > > > > > > > > All the constructs : waitTime(), RestartFromSavedState(), SaveState() > > > > , RelaseResources() > > > > could be very well be part of StartBatch() or EndBatch(). I have put > > them > > > > separately for clear understanding only. > > > > > > > > Another point to think on would be scheduler. A batch job is > generally > > > > triggered as a cron job. Do we still see Apex jobs being triggered by > > > cron > > > > or would like to include a scheduler within Apex that will trigger > jobs > > > > based on time or on some external trigger or even polling for events. > > > > > > > > Regards > > > > Sandeep > > > > > > > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda < > > [email protected] > > > > > > > > wrote: > > > > > > > > > +1 > > > > > > > > > > I think in the batch case, application windows may be transparent > to > > > the > > > > > user application / operator logic. A batch can be thought of as > one > > > > > instantiation of a Apex Dag, from setup() to teardown() for all > > > > operators. > > > > > May be we need to define a higher level API which encapsulates a > > > > streaming > > > > > application. > > > > > Something like: > > > > > > > > > > StartBatch() > > > > > Streaming Application starts, runs and finishes > > > > > EndBatch() > > > > > > > > > > The streaming application will run transparently with all the > > > windowing / > > > > > checkpointing logic that it currently does. Checkpointing large > > amounts > > > > of > > > > > data may be avoided by either checkpointing at large intervals or > > even > > > > > disabling checkpointing for the batch job. > > > > > Additionally, the external trigger (existence of some file etc. ) > can > > > be > > > > > controlled by the StartBatch() and EndBatch() calls. In all the > batch > > > use > > > > > cases, it is usually the case that once the input is processed > > > > completely, > > > > > the batch is done. Example: In map reduce all splits processed > means > > > > batch > > > > > job is done. Similar primitives can be supported by Apex in order > to > > > > > facilitate the control management in the StartBatch() and > EndBatch() > > > > > methods. > > > > > > > > > > -Bhupesh > > > > > > > > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise < > > [email protected]> > > > > > wrote: > > > > > > > > > > > Following JIRA is open to enhance the support for batch: > > > > > > > > > > > > https://issues.apache.org/jira/browse/APEXCORE-235 > > > > > > > > > > > > One of the challenges with batch on Apex today is that there > isn't > > > any > > > > > > native support to identify begin/end of batch and associate > actions > > > to > > > > > it. > > > > > > For example, at the beginning we may want to fetch some data > needed > > > for > > > > > all > > > > > > subsequent processing and at the end perform some finalization > > action > > > > or > > > > > > push to external system (add partition to Hive table or similar). > > > > > > > > > > > > Absent native support, the workaround is to add a bunch of ports > > and > > > > > extra > > > > > > operators for propagation and synchronization purposes, which > makes > > > > > > building the batch application with standard operators or > > development > > > > of > > > > > > custom operators rather difficult and inefficient. > > > > > > > > > > > > The span of a batch can also be seen as a user defined window, > with > > > > logic > > > > > > for begin and end. The current "application window" support is > > > limited > > > > > to a > > > > > > multiple of streaming window on a per operator basis. In the > batch > > > > case, > > > > > > the boundary needs to be more flexible - user code needs to be > able > > > to > > > > > > determine begin/endWindow based on external data (existence of > > files > > > > > etc.). > > > > > > > > > > > > There is another commonality with application window, and that's > > > > > alignment > > > > > > of checkpointing. For batches where it is more efficient to redo > > the > > > > > > processing instead of checkpointing potentially large amounts of > > > > > > intermediate state for incremental recovery, it would be nice to > be > > > > able > > > > > to > > > > > > say "user window == checkpoint interval". > > > > > > > > > > > > This is to float the idea of having a window control that can be > > > > > influenced > > > > > > by user code. An operator that identifies the batch boundary > tells > > > the > > > > > > engine about it and corresponding control tuples are submitted > > > through > > > > > the > > > > > > stream, leading to callbacks on downstream operators. These > control > > > > > > tuples should > > > > > > be able to carry contextual information that can be used in > > > downstream > > > > > > operator logic (file names, schema information etc.) > > > > > > > > > > > > I don't expect the current beginWindow/endWindow can be augmented > > in > > > a > > > > > > backward compatible way to accommodate this, but a similar > optional > > > > > > interface could be supported to enable batch aware operators and > > > > > > checkpointing optimization. > > > > > > > > > > > > Thoughts? > > > > > > > > > > > > > > > > > > > > >
