Hi All,

Resuming the discussion.

After some discussion, I have created a document which captures the
requirements and a high level design for supporting batch applications. The
document consolidates different threads of discussion and aspects which are
relevant to batch support.

This is in no way a design document, but just captures the high level
steps. I have tried to keep it very brief and to the point. I will keep
refining the document depending on the comments to ultimately convert it to
a design document.

Here is the link to the document:
https://docs.google.com/document/d/1qlyQJP80dOlWZeHwICMFA3D3jGG_T2NLhMfzScbuTwQ/edit?usp=sharing

Please provide your valuable feedback.

~ Bhupesh

On Tue, Feb 23, 2016 at 7:24 AM, David Yan <da...@datatorrent.com> wrote:

> For batch applications without checkpointing or iteration loops, what would
> be the significance of streaming windows and application windows?
>
>
> On Sun, Feb 14, 2016 at 10:33 PM, Thomas Weise <tho...@datatorrent.com>
> wrote:
>
> > Time to resume this discussion. I think it makes sense to look at the
> batch
> > as execution of a DAG from setup to teardown for all its operators, as
> > suggested by Bhupesh and Sandeep. The DAG comes into existence when the
> > batch begins and terminates when it is done.
> >
> > We have also seen from customers that there is demand for having the
> > scheduler function built in, when there is no external component already
> > present. For example, a file or set of files could be identified as
> > "batch". As the application is idle, there is only a scheduler operator
> > which polls for files. Once work is ready, that operator would launch the
> > DAG for processing (within same application, but not connected through
> > stream). When processing is complete, that DAG terminates and returns the
> > resources.
> >
> > As discussed, there is the need to be able to turn off checkpointing,
> which
> > is different from setting a large checkpoint window. No checkpointing
> means
> > no incremental recovery and hence no need to keep data in buffers.
> >
> > There is also the need to relay begin/end signal through the entire DAG.
> > This is different from setup/shutdown. It is more like begin/endWindow,
> but
> > there is only a single "window" in a batch.
> >
> >
> > On Mon, Dec 28, 2015 at 10:36 PM, Chinmay Kolhatkar <
> > chin...@datatorrent.com
> > > wrote:
> >
> > > Hi Thomas,
> > >
> > > A comment on following in your previous mails:
> > >
> > >
> > >
> > > *An operator that identifies the batch boundary tells theengine about
> it
> > > and corresponding control tuples are submitted through thestream,
> leading
> > > to callbacks on downstream operators*
> > >
> > > This would mean there will be a single boundary definition of a batch
> in
> > > the application DAG.
> > > I think we should give freedom to individual operator to define what a
> > > batch is and produce a callbacks accordingly.
> > >
> > > Considering that in mind, here is a quick sketch/suggestion of how it
> can
> > > be done:
> > >
> > > 1) The operator that needs to work on a batch can implement an
> interface,
> > > lets say BatchListener.
> > >
> > > 2) This will have 4 methods:
> > > *    startBatch*
> > > *    endBatch*
> > > *    configureBatch*
> > > *    callAtApplicationWindowBoundary *(maybe some better name??)
> > >
> > > 3) *configureBatch* will tell what tell what is the boundary of a
> batch.
> > > This will be called right after setup OR activate, basically before
> > > beginning of the stream. The return value will be set with operator
> > thread.
> > >
> > > 4) Based on configuration, the *startBatch* and *endBatch* will be
> > called.
> > >
> > > 5) the *callAtApplicationWindowBoundary* should return *true/false*
> based
> > > on whether start/end batch calls should happen at application window
> > > boundary OR not. Here is where user can choose to take care of
> > > checkpointing of tuples within a windows by platform OR whether user
> > wants
> > > to do that of his own.
> > >
> > >
> > > Thoughts?
> > >
> > >
> > > -Chinmay.
> > >
> > >
> > > ~ Chinmay.
> > >
> > > On Tue, Dec 29, 2015 at 11:35 AM, Thomas Weise <tho...@datatorrent.com
> >
> > > wrote:
> > >
> > > > On Mon, Dec 28, 2015 at 7:01 AM, Sandeep Deshmukh <
> > > sand...@datatorrent.com
> > > > >
> > > > wrote:
> > > >
> > > > > +1 for batch support in Apex. I would be interested to be part of
> > this
> > > > > work.
> > > > >
> > > > > I would like to start with basics and would like to know how one
> will
> > > > > define "batch" in Apex context. Which of the following cases would
> be
> > > > > supported under batch:
> > > > >
> > > > >    1. A program completes a task and auto shutdown itself once the
> > task
> > > > is
> > > > >    complete. E.g.  the program needs to copy a set of files from
> > source
> > > > to
> > > > >    destination.
> > > > >    2. A program completes a task and then waits for pre-defined
> time
> > to
> > > > >    poll for something more to work on. E.g. the program copies all
> > the
> > > > > files
> > > > >    from source location and then periodically checks, say every 1
> > hour,
> > > > if
> > > > >    there are new files at the source and copies them.
> > > > >    3. A program completes a task and then polls every 1 hr as in
> > case 2
> > > > but
> > > > >    releases resources during wait time.
> > > > >
> > > > >
> > > >
> > > > Yes, both, 1. and 2. are valid use cases. I would not make a further
> > > > distinction between 2. and 3. at this point.
> > > >
> > > > Ability to run an application that expands and shrinks as self
> > contained
> > > > unit can be a benefit, as otherwise you need an external scheduler
> just
> > > to
> > > > launch jobs (such as Oozie). The associated extra integration work
> may
> > be
> > > > brittle and an unwanted barrier for certain use cases.
> > > >
> > > >
> > > >
> > > > > Needs for each of the above will vary. I am putting down some basic
> > > > > requirements for each of them
> > > > >
> > > > > 1. This case will need a mechanism to shutdown automatically on
> > > > completion
> > > > > of the task.
> > > > >
> > > > > StartProgram()
> > > > >     StartBatch()
> > > > >         Streaming Application starts, runs and finishes
> > > > >     EndBatch()
> > > > > EndProgram()
> > > > >
> > > > > 2. This will simply need a construct to wait for some time ( say 10
> > > > > minutes) or till some time ( till 1pm) .
> > > > >
> > > > > StartProgram()
> > > > > while(true)
> > > > > {
> > > > >     StartBatch()
> > > > >         Streaming Application starts, runs and finishes
> > > > >     EndBatch()
> > > > >     WaitTill(time) or WaitFor(timeperiod)
> > > > > }
> > > > > EndProgram()
> > > > >
> > > > > 3. Apart from wait construct, we also need release resources
> support
> > > > >
> > > > > StartProgram()
> > > > > while(true)
> > > > > {
> > > > >     RestartFromSavedState() // if any state is saved previously.
> > > > >     StartBatch()
> > > > >         Streaming Application starts, runs and finishes
> > > > >     EndBatch()
> > > > >     SaveState()
> > > > >     RelaseResources()
> > > > >     WaitTill(time) or WaitFor(timeperiod)
> > > > > }
> > > > > EndProgram()
> > > > >
> > > > >
> > > > > All the constructs : waitTime(), RestartFromSavedState(),
> SaveState()
> > > > > , RelaseResources()
> > > > > could be very well be part of StartBatch() or EndBatch(). I have
> put
> > > them
> > > > > separately for clear understanding only.
> > > > >
> > > > > Another point to think on would be scheduler. A batch job is
> > generally
> > > > > triggered as a cron job. Do we still see Apex jobs being triggered
> by
> > > > cron
> > > > > or would like to include a scheduler within Apex that will trigger
> > jobs
> > > > > based on time or on some external trigger or even polling for
> events.
> > > > >
> > > > > Regards
> > > > > Sandeep
> > > > >
> > > > > On Mon, Dec 28, 2015 at 5:11 PM, Bhupesh Chawda <
> > > bhup...@datatorrent.com
> > > > >
> > > > > wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > I think in the batch case, application windows may be transparent
> > to
> > > > the
> > > > > > user application / operator logic.  A batch can be thought of as
> > one
> > > > > > instantiation of a Apex Dag, from setup() to teardown() for all
> > > > > operators.
> > > > > > May be we need to define a higher level API which encapsulates a
> > > > > streaming
> > > > > > application.
> > > > > > Something like:
> > > > > >
> > > > > > StartBatch()
> > > > > >   Streaming Application starts, runs and finishes
> > > > > > EndBatch()
> > > > > >
> > > > > > The streaming application will run transparently with all the
> > > > windowing /
> > > > > > checkpointing logic that it currently does. Checkpointing large
> > > amounts
> > > > > of
> > > > > > data may be avoided by either checkpointing at large intervals or
> > > even
> > > > > > disabling checkpointing for the batch job.
> > > > > > Additionally, the external trigger (existence of some file etc. )
> > can
> > > > be
> > > > > > controlled by the StartBatch() and EndBatch() calls. In all the
> > batch
> > > > use
> > > > > > cases, it is usually the case that once the input is processed
> > > > > completely,
> > > > > > the batch is done. Example: In map reduce all splits processed
> > means
> > > > > batch
> > > > > > job is done. Similar primitives can be supported by Apex in order
> > to
> > > > > > facilitate the control management in the StartBatch() and
> > EndBatch()
> > > > > > methods.
> > > > > >
> > > > > > -Bhupesh
> > > > > >
> > > > > > On Mon, Dec 28, 2015 at 1:34 PM, Thomas Weise <
> > > tho...@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Following JIRA is open to enhance the support for batch:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/APEXCORE-235
> > > > > > >
> > > > > > > One of the challenges with batch on Apex today is that there
> > isn't
> > > > any
> > > > > > > native support to identify begin/end of batch and associate
> > actions
> > > > to
> > > > > > it.
> > > > > > > For example, at the beginning we may want to fetch some data
> > needed
> > > > for
> > > > > > all
> > > > > > > subsequent processing and at the end perform some finalization
> > > action
> > > > > or
> > > > > > > push to external system (add partition to Hive table or
> similar).
> > > > > > >
> > > > > > > Absent native support, the workaround is to add a bunch of
> ports
> > > and
> > > > > > extra
> > > > > > > operators for propagation and synchronization purposes, which
> > makes
> > > > > > > building the batch application with standard operators or
> > > development
> > > > > of
> > > > > > > custom operators rather difficult and inefficient.
> > > > > > >
> > > > > > > The span of a batch can also be seen as a user defined window,
> > with
> > > > > logic
> > > > > > > for begin and end. The current "application window" support is
> > > > limited
> > > > > > to a
> > > > > > > multiple of streaming window on a per operator basis. In the
> > batch
> > > > > case,
> > > > > > > the boundary needs to be more flexible - user code needs to be
> > able
> > > > to
> > > > > > > determine begin/endWindow based on external data (existence of
> > > files
> > > > > > etc.).
> > > > > > >
> > > > > > > There is another commonality with application window, and
> that's
> > > > > > alignment
> > > > > > > of checkpointing. For batches where it is more efficient to
> redo
> > > the
> > > > > > > processing instead of checkpointing potentially large amounts
> of
> > > > > > > intermediate state for incremental recovery, it would be nice
> to
> > be
> > > > > able
> > > > > > to
> > > > > > > say "user window == checkpoint interval".
> > > > > > >
> > > > > > > This is to float the idea of having a window control that can
> be
> > > > > > influenced
> > > > > > > by user code. An operator that identifies the batch boundary
> > tells
> > > > the
> > > > > > > engine about it and corresponding control tuples are submitted
> > > > through
> > > > > > the
> > > > > > > stream, leading to callbacks on downstream operators. These
> > control
> > > > > > > tuples should
> > > > > > > be able to carry contextual information that can be used in
> > > > downstream
> > > > > > > operator logic (file names, schema information etc.)
> > > > > > >
> > > > > > > I don't expect the current beginWindow/endWindow can be
> augmented
> > > in
> > > > a
> > > > > > > backward compatible way to accommodate this, but a similar
> > optional
> > > > > > > interface could be supported to enable batch aware operators
> and
> > > > > > > checkpointing optimization.
> > > > > > >
> > > > > > > Thoughts?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to