I don't think this is the way to go. A Global Window only means that the
timestamp does not matter (or that there is no timestamp). It does not
necessarily mean one large batch. Unless there is some notion of event time
for each file, you don't want to embed the file into the window itself.

If you want the result broken up by file name, and the files are to be
processed in parallel, I think making the file name part of the key is the
way to go. It would be very confusing if we somehow made the file part of
the window.
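
To illustrate the keying idea, here is a rough sketch in plain Python. It is
not the Apex windowed operator API; the function names and sample records
are made up for illustration. All records share one global window, and the
file name is simply part of the key:

```python
from collections import defaultdict


def count_words_per_file(records):
    """Count words in a single global window, keyed by (file_name, word).

    `records` is an iterable of (file_name, line) pairs in any order.
    """
    counts = defaultdict(int)
    for file_name, line in records:
        for word in line.split():
            counts[(file_name, word)] += 1
    return counts


def group_by_file(counts):
    """Regroup the keyed result so each file gets its own word-count dict."""
    per_file = defaultdict(dict)
    for (file_name, word), n in counts.items():
        per_file[file_name][word] = n
    return dict(per_file)


# Records from two files arrive interleaved, as they might with parallel readers.
records = [
    ("file1", "a b a"),
    ("file2", "b c"),
    ("file1", "c"),
    ("file2", "b"),
]
result = group_by_file(count_words_per_file(records))
print(result["file1"])
print(result["file2"])
```

Because the file name travels with the key, the interleaving of records
across files does not affect the per-file result, and no timestamps are
needed.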

Count-based windows are not implemented yet, and you're welcome to add that
feature. With count-based windows there would be no notion of time, and you
would probably only trigger at the end of each window. The watermark then
only matters for batch, since you need a way to know when the batch has
ended: if the count is 10 and the batch has, say, 105 tuples, you need a way
to close the last window with its 5 tuples.
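
A minimal sketch of that behavior in plain Python (again not Apex code; the
function name is hypothetical, and the end of the input iterator stands in
for the final watermark):

```python
def count_windows(tuples, size):
    """Yield windows of `size` tuples; flush a partial window at end of input."""
    window = []
    for item in tuples:
        window.append(item)
        if len(window) == size:
            # The window is complete at the n-th tuple, no watermark needed.
            yield window
            window = []
    # The input is exhausted, which plays the role of the final watermark:
    # close the last, partially filled window if there is one.
    if window:
        yield window


windows = list(count_windows(range(105), 10))
print(len(windows), len(windows[-1]))
```

With 105 tuples and a count of 10 this gives ten full windows plus one last
window holding the remaining 5 tuples.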

David

On Mon, Feb 27, 2017 at 2:41 AM, Bhupesh Chawda <bhup...@datatorrent.com>
wrote:

> Hi David,
>
> Thanks for your comments.
>
> The wordcount example that I created based on the windowed operator
> processes word counts per file (each file as a separate batch), i.e. it
> computes counts for each file and dumps them into separate files.
> As I understand it, the Global window is for one large batch, i.e. all
> incoming data falls into the same batch. This case could not be processed
> using the GlobalWindow option, as we need more than one window. Instead, I
> configured the windowed operator to have time windows of 1ms each and
> passed data for each file with increasing timestamps: (file1, 1), (file2,
> 2) and so on. Is there a better way of handling this scenario?
>
> Regarding (2 - count based windows), I think there is a trigger option to
> process count based windows. In case I want to process every 1000 tuples as
> a batch, I could set the Trigger option to CountTrigger with the
> accumulation set to Discarding. Is this correct?
>
> I agree that (4. Final Watermark) can be done using Global window.
>
> ~ Bhupesh
>
> _______________________________________________________
>
> Bhupesh Chawda
>
> E: bhup...@datatorrent.com | Twitter: @bhupeshsc
>
> www.datatorrent.com  |  apex.apache.org
>
>
>
> On Mon, Feb 27, 2017 at 12:18 PM, David Yan <david...@gmail.com> wrote:
>
> > I'm worried that we are making the watermark concept too complicated.
> >
> > Watermarks should simply tell you which windows can be considered
> > complete.
> >
> > Point 2 is basically a count-based window. Watermarks do not play a role
> > here because the window is always complete at the n-th tuple.
> >
> > If I understand correctly, point 3 is for batch processing of files. Unless
> > the files contain timed events, it sounds to me that this can be achieved
> > with just a Global Window. For signaling EOF, a watermark with a +infinity
> > timestamp can be used so that triggers will be fired upon receipt of that
> > watermark.
> >
> > Point 4, as mentioned above, can be achieved with a watermark with a
> > +infinity timestamp.
> >
> > David
> >
> >
> >
> >
> > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > wrote:
> >
> > > Hi Thomas,
> > >
> > > For an input operator which is supposed to generate watermarks for
> > > downstream operators, I can think about the following watermarks that the
> > > operator can emit:
> > > 1. Time based watermarks (the high watermark / low watermark)
> > > 2. Number of tuple based watermarks (Every n tuples)
> > > 3. File based watermarks (Start file, end file)
> > > 4. Final watermark
> > >
> > > File based watermarks seem to be applicable for batch (file based) as
> > > well, and hence I thought of looking at these first. Does this seem to be
> > > in line with the thought process?
> > >
> > > ~ Bhupesh
> > >
> > >
> > >
> > > _______________________________________________________
> > >
> > > Bhupesh Chawda
> > >
> > > Software Engineer
> > >
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > >
> > > www.datatorrent.com  |  apex.apache.org
> > >
> > >
> > >
> > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > I don't think this should be designed based on a simplistic file
> > > > input-output scenario. It would be good to include a stateful
> > > > transformation based on event time.
> > > >
> > > > More complex pipelines contain stateful transformations that depend on
> > > > windowing and watermarks. I think we need a watermark concept that is
> > > > based on progress in event time (or another monotonically increasing
> > > > sequence) that other operators can generically work with.
> > > >
> > > > Note that even file input in many cases can produce time based
> > > > watermarks, for example when you read part files that are bound by event
> > > > time.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > To better understand the use case for control tuples in batch, I am
> > > > > creating a prototype for a batch application using File Input and File
> > > > > Output operators.
> > > > >
> > > > > To enable basic batch processing for File IO operators, I am proposing
> > > > > the following changes to File input and output operators:
> > > > > 1. File Input operator emits a watermark each time it opens and closes a
> > > > > file. These can be "start file" and "end file" watermarks which include
> > > > > the corresponding file names. The "start file" tuple should be sent
> > > > > before any of the data from that file flows.
> > > > > 2. File Input operator can be configured to end the application after a
> > > > > single or n scans of the directory (a batch). This is where the operator
> > > > > emits the final watermark (the end of application control tuple). This
> > > > > will also shut down the application.
> > > > > 3. The File output operator handles these control tuples. "Start file"
> > > > > initializes the file name for the incoming tuples. "End file" watermark
> > > > > forces a finalize on that file.
> > > > >
> > > > > The user would be able to enable the operators to send only those
> > > > > watermarks that are needed in the application. If none of the options
> > > > > are configured, the operators behave as in a streaming application.
> > > > >
> > > > > There are a few challenges in the implementation where the input
> > > > > operator is partitioned. In this case, the correlation between the
> > > > > start/end for a file and the data tuples for that file is lost. Hence we
> > > > > need to maintain the filename as part of each tuple in the pipeline.
> > > > >
> > > > > The "start file" and "end file" control tuples in this example are
> > > > > temporary names for watermarks. We can have generic "start batch" /
> > > > > "end batch" tuples which could be used for other use cases as well. The
> > > > > Final watermark is common and serves the same purpose in each case.
> > > > >
> > > > > Please let me know your thoughts on this.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > >
> > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > wrote:
> > > > >
> > > > > > Yes, this can be part of operator configuration. Given this, for a
> > > > > > user to define a batch application would mean configuring the
> > > > > > connectors (mostly the input operator) in the application for the
> > > > > > desired behavior. Similarly, there can be other use cases that can be
> > > > > > achieved other than batch.
> > > > > >
> > > > > > We may also need to take care of the following:
> > > > > > 1. Make sure that the watermarks or control tuples are consistent
> > > > > > across sources. Meaning an HDFS sink should be able to interpret the
> > > > > > watermark tuple sent out by, say, a JDBC source.
> > > > > > 2. In addition to I/O connectors, we should also look at the need for
> > > > > > processing operators to understand some of the control tuples /
> > > > > > watermarks. For example, we may want to reset the operator behavior on
> > > > > > arrival of some watermark tuple.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> The HDFS source can operate in two modes, bounded or unbounded. If you
> > > > > >> scan only once, then it should emit the final watermark after it is
> > > > > >> done. Otherwise it would emit watermarks based on a policy (file names
> > > > > >> etc.). The mechanism to generate the marks may depend on the type of
> > > > > >> source and the user needs to be able to influence/configure it.
> > > > > >>
> > > > > >> Thomas
> > > > > >>
> > > > > >>
> > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >> > Hi Thomas,
> > > > > >> >
> > > > > >> > I am not sure that I completely understand your suggestion. Are you
> > > > > >> > suggesting to broaden the scope of the proposal to treat all sources
> > > > > >> > as bounded as well as unbounded?
> > > > > >> >
> > > > > >> > In case of Apex, we treat all sources as unbounded sources. Even a
> > > > > >> > bounded source like the HDFS file source is treated as unbounded by
> > > > > >> > means of scanning the input directory repeatedly.
> > > > > >> >
> > > > > >> > Let's consider HDFS file source for example:
> > > > > >> > In this case, if we treat it as a bounded source, we can define hooks
> > > > > >> > which allow us to detect the end of the file and send the "final
> > > > > >> > watermark". We could also consider HDFS file source as a streaming
> > > > > >> > source and define hooks which send watermarks based on different kinds
> > > > > >> > of windows.
> > > > > >> >
> > > > > >> > Please correct me if I misunderstand.
> > > > > >> >
> > > > > >> > ~ Bhupesh
> > > > > >> >
> > > > > >> >
> > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org>
> > > > > >> > wrote:
> > > > > >> >
> > > > > >> > > Bhupesh,
> > > > > >> > >
> > > > > >> > > Please see how that can be solved in a unified way using windows and
> > > > > >> > > watermarks. It is bounded data vs. unbounded data. In Beam for
> > > > > >> > > example, you can use the "global window" and the final watermark to
> > > > > >> > > accomplish what you are looking for. Batch is just a special case of
> > > > > >> > > streaming where the source emits the final watermark.
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > > Thomas
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Yes, if the user needs to develop a batch application, then batch
> > > > > >> > > > aware operators need to be used in the application.
> > > > > >> > > > The nature of the application is mostly controlled by the input and
> > > > > >> > > > the output operators used in the application.
> > > > > >> > > >
> > > > > >> > > > For example, consider an application which needs to filter records
> > > > > >> > > > in an input file and store the filtered records in another file. The
> > > > > >> > > > nature of this app is to end once the entire file is processed. The
> > > > > >> > > > following things are expected of the application:
> > > > > >> > > >
> > > > > >> > > >    1. Once the input data is over, finalize the output file from .tmp
> > > > > >> > > >    files. - Responsibility of output operator
> > > > > >> > > >    2. End the application, once the data is read and processed -
> > > > > >> > > >    Responsibility of input operator
> > > > > >> > > >
> > > > > >> > > > These functions are essential to allow the user to do higher level
> > > > > >> > > > operations like scheduling or running a workflow of batch
> > > > > >> > > > applications.
> > > > > >> > > >
> > > > > >> > > > I am not sure about intermediate (processing) operators, as there is
> > > > > >> > > > no change in their functionality for batch use cases. Perhaps,
> > > > > >> > > > allowing running multiple batches in a single application may require
> > > > > >> > > > similar changes in processing operators as well.
> > > > > >> > > >
> > > > > >> > > > ~ Bhupesh
> > > > > >> > > >
> > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > > > Will it give the user the impression that, if he has a batch use
> > > > > >> > > > > case, he has to use batch aware operators only? If so, is that what
> > > > > >> > > > > we expect? I am not aware of how we implement the batch scenario, so
> > > > > >> > > > > this might be a basic question.
> > > > > >> > > > >
> > > > > >> > > > > -Priyanka
> > > > > >> > > > >
> > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > >> > > > > wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi All,
> > > > > >> > > > > >
> > > > > >> > > > > > While design / implementation for custom control tuples is ongoing,
> > > > > >> > > > > > I thought it would be a good idea to consider its usefulness in one
> > > > > >> > > > > > of the use cases - batch applications.
> > > > > >> > > > > >
> > > > > >> > > > > > This is a proposal to adapt / extend existing operators in the
> > > > > >> > > > > > Apache Apex Malhar library so that it is easy to use them in batch
> > > > > >> > > > > > use cases. Naturally, this would be applicable for only a subset of
> > > > > >> > > > > > operators like File, JDBC and NoSQL databases.
> > > > > >> > > > > > For example, for a file based store, (say HDFS store), we could
> > > > > >> > > > > > have FileBatchInput and FileBatchOutput operators which allow easy
> > > > > >> > > > > > integration into a batch application. These operators would be
> > > > > >> > > > > > extended from their existing implementations and would be "Batch
> > > > > >> > > > > > Aware", in that they may understand the meaning of some specific
> > > > > >> > > > > > control tuples that flow through the DAG. Start batch and end batch
> > > > > >> > > > > > seem to be the obvious candidates that come to mind. On receipt of
> > > > > >> > > > > > such control tuples, they may try to modify the behavior of the
> > > > > >> > > > > > operator - to reinitialize some metrics or finalize an output file
> > > > > >> > > > > > for example.
> > > > > >> > > > > >
> > > > > >> > > > > > We can discuss the potential control tuples and actions in detail,
> > > > > >> > > > > > but first I would like to understand the views of the community for
> > > > > >> > > > > > this proposal.
> > > > > >> > > > > >
> > > > > >> > > > > > ~ Bhupesh
> > > > > >> > > > > >
> > > > > >> > > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
