
On Thu, Feb 23, 2017 at 12:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:

> Hi Thomas,
>
> My response inline:
>
> On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise <t...@apache.org> wrote:
>
> > Hi Bhupesh,
> >
> > This looks great. You use the watermark as a measure of completeness and
> > the window to isolate the state, which is how it should work.
> >
> > Questions/comments:
> >
> > Why does the count operator have a 2ms window when this should be driven by
> > the watermark from the input operator?
> >
> >
> In this example, we trigger at the watermark. So the count (windowed)
> operator accumulates the state until the watermark and then emits all the
> accumulated counts.
> 2 ms is not necessary; we can make it 1 ms. But in this case it's not the
> time duration that matters. The file input operator makes sure all tuples
> belonging to a file become part of the same window by giving all the tuples
> in that file the same timestamp. So all tuples in the first file go out with
> timestamp 0, the second file with timestamp 1, and so on.
>
>

Understood all but this line:

windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2)));

I wonder if there is an option to control this from the source. Maybe David
can take a look?
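
A minimal sketch of why the window duration matters here, assuming tumbling windows aligned at timestamp 0 (a window starts at `timestamp - timestamp % durationMillis`). The class and method names are hypothetical, and the alignment is an assumption for illustration rather than a statement about how the Malhar WindowedOperator assigns windows. Under that assumption, the per-file timestamps 0, 1, 2, ... get one window each with a 1 ms duration, while a 2 ms duration would place files 0 and 1 in the same window [0, 2).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper showing tumbling-window assignment for per-file timestamps. */
final class TumblingWindowSketch {
  private TumblingWindowSketch() {}

  /**
   * Start of the window containing the timestamp.
   * Assumption: windows are aligned at timestamp 0 and timestamps are non-negative;
   * this is not taken from the Malhar source.
   */
  static long windowStart(long timestampMillis, long durationMillis) {
    if (durationMillis <= 0) {
      throw new IllegalArgumentException("durationMillis must be positive");
    }
    return timestampMillis - (timestampMillis % durationMillis);
  }

  /** Window start for each file, where file i carries timestamp i. */
  static List<Long> windowStartsForFiles(int fileCount, long durationMillis) {
    List<Long> starts = new ArrayList<>();
    for (long fileTimestamp = 0; fileTimestamp < fileCount; fileTimestamp++) {
      starts.add(windowStart(fileTimestamp, durationMillis));
    }
    return starts;
  }

  public static void main(String[] args) {
    System.out.println(windowStartsForFiles(4, 1)); // [0, 1, 2, 3]: one window per file
    System.out.println(windowStartsForFiles(4, 2)); // [0, 0, 2, 2]: files share windows
  }
}
```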


>
> > I don't think there should be separate "windowed" connector operators. The
> > watermark support needs to be incorporated into existing operators and made
> > configurable. Windowing is a concept that the entire library needs to be
> > aware of. I see no reason to arrange classes in separate "window" packages
> > except those that are really specific to windowing support, such as the
> > watermark tuple.
> >
>
> I think making changes to the existing operators would make them too heavy
> and complex. I suggest we extend the existing operators and have new classes
> with just the logic for watermarks. This will also help keep bugs resulting
> from the new implementations isolated.
> We can keep these in the same package as the existing operators. Only the
> window-specific classes (like watermarks) will go into the window package.
>

Watermark support needs to be part of all connectors, just like
idempotency, fault tolerance, partitioning, etc. Well-written operators have
pluggable components and strategies. Inheritance is not the way to solve
this; it needs to be part of the abstract base classes.
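
One possible reading of "pluggable strategies in the abstract base classes", as a minimal sketch: the base input class holds an optional watermark strategy and consults it after every emitted tuple, so any connector extending it gets watermark support through configuration instead of a separate "windowed" subclass. All names (`WatermarkStrategy`, `EveryNTuplesStrategy`, `AbstractInputSketch`, `LongSourceSketch`) are hypothetical and are not part of the Apex or Malhar API; emitted items are recorded as strings in a list only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pluggable policy deciding when a watermark is due. */
interface WatermarkStrategy {
  /** Returns the watermark to emit after this tuple, or null if none is due. */
  Long afterTuple(long tupleTimestamp);
}

/** Example policy: a watermark every n tuples, carrying the latest timestamp seen. */
final class EveryNTuplesStrategy implements WatermarkStrategy {
  private final int n;
  private int count;
  private long maxTimestamp = Long.MIN_VALUE;

  EveryNTuplesStrategy(int n) {
    this.n = n;
  }

  @Override
  public Long afterTuple(long tupleTimestamp) {
    maxTimestamp = Math.max(maxTimestamp, tupleTimestamp);
    count++;
    if (count % n == 0) {
      return maxTimestamp;
    }
    return null;
  }
}

/** Hypothetical base class: watermark support is configuration, not a subclass. */
abstract class AbstractInputSketch<T> {
  private WatermarkStrategy watermarkStrategy; // null keeps plain streaming behavior
  final List<String> emitted = new ArrayList<>();

  void setWatermarkStrategy(WatermarkStrategy strategy) {
    this.watermarkStrategy = strategy;
  }

  /** Connector-specific: how to derive an event timestamp from a tuple. */
  protected abstract long timestampOf(T tuple);

  /** Called by the concrete connector for each tuple it reads. */
  protected void emit(T tuple) {
    emitted.add("data:" + tuple);
    if (watermarkStrategy != null) {
      Long watermark = watermarkStrategy.afterTuple(timestampOf(tuple));
      if (watermark != null) {
        emitted.add("watermark:" + watermark);
      }
    }
  }
}

/** Minimal concrete connector whose tuples are their own timestamps. */
final class LongSourceSketch extends AbstractInputSketch<Long> {
  @Override
  protected long timestampOf(Long tuple) {
    return tuple;
  }

  public static void main(String[] args) {
    LongSourceSketch source = new LongSourceSketch();
    source.setWatermarkStrategy(new EveryNTuplesStrategy(2));
    for (long t = 10; t <= 13; t++) {
      source.emit(t);
    }
    // [data:10, data:11, watermark:11, data:12, data:13, watermark:13]
    System.out.println(source.emitted);
  }
}
```

With no strategy configured, the connector behaves exactly as before, so the default streaming behavior is unchanged.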


>
>
>
> >
> > Why does the control tuple have an operatorId in it?
> >
>
> Operator id is not used in the current example, but it may help the user
> understand the originating partition for a watermark tuple. This applies to
> scenarios where we cannot distinguish between watermark tuples from
> different partitions, unlike the file-based watermarks where the file name is
> the distinguishing property.
>


It is not clear why it is important which physical operator generated a
watermark. I don't think anything downstream can rely on that, in general.
Could you provide a real example and document it?


>
>
> >
> > Once you make the changes to the operators, please also augment the
> > documentation and examples (in this case, the wordcount demo).
> >
>
> Sure.
>
>
>
> > Thanks,
> > Thomas
> >
> >
> >
> > On Wed, Feb 22, 2017 at 4:51 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > wrote:
> >
> > > Hi Thomas,
> > >
> > > Sorry for the delay.
> > > I agree that the watermark concept is general and is understood by
> > > intermediate transformations. The file name is additional information in
> > > the watermark which helps the start and end operators do batch-related
> > > work.
> > > As suggested, I have created a wordcount application which uses watermarks
> > > to create separate windows for each file by means of a long (timestamp).
> > > I am linking the source for reference:
> > >
> > > Watermarks:
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/
> > > library/src/main/java/org/apache/apex/malhar/lib/window/
> > > windowable/FileWatermark.java
> > >
> > > Extended File Input and Output operators:
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileInputOperator.java
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileOutputOperator.java
> > >
> > >
> > > WordCount Application:
> > >
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/test/java/org/apache/apex/malhar/lib/window/windowable/WindowedWordCount.java
> > >
> > >
> > > The input operator attaches a timestamp to each file, which allows the
> > > WindowedOperator to identify each file and keep its state in a distinct
> > > window.
> > >
> > > Additionally, using the extra file information, the application can
> > > store the counts in similarly named files at the destination.
> > >
> > >
> > > Thanks.
> > >
> > > _______________________________________________________
> > >
> > > Bhupesh Chawda
> > >
> > > Software Engineer
> > >
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > >
> > > www.datatorrent.com  |  apex.apache.org
> > >
> > >
> > >
> > > On Sat, Feb 18, 2017 at 10:24 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > I think this needs a generic watermark concept that is independent of
> > > > source and destination and can be understood by intermediate
> > > > transformations. File names don't meet this criterion.
> > > >
> > > > One possible approach is to have a monotonically increasing file sequence
> > > > (instead of time, if time is not applicable) that can be mapped to a
> > > > watermark. You can still tag the file name onto the control tuple as extra
> > > > information so that a file output operator that understands it can do
> > > > whatever it wants with it. But it should also work without it, let's say
> > > > when we write the output to the console.
> > > >
> > > > The key here is that you can demonstrate that an intermediate stateful
> > > > transformation will work. I would suggest trying wordcount per input file
> > > > with the window operator that emits the counts at the file boundary,
> > > > without knowing anything about files.
> > > >
> > > > Thanks,
> > > > Thomas
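
A minimal sketch of the suggestion above, assuming a watermark that carries a monotonically increasing file sequence plus an optional file name. `FileSequenceWatermark`, `WatermarkSinksSketch`, and the `.counts` / `part-` output naming are hypothetical. A console sink relies only on the sequence, while a file-aware sink uses the file name when present and falls back to the sequence otherwise.

```java
import java.util.Optional;

/** Hypothetical watermark: a monotonically increasing file sequence plus optional file name. */
final class FileSequenceWatermark {
  private final long sequence;   // what generic, file-agnostic operators rely on
  private final String fileName; // extra information for file-aware sinks; may be null

  FileSequenceWatermark(long sequence, String fileName) {
    this.sequence = sequence;
    this.fileName = fileName;
  }

  long sequence() {
    return sequence;
  }

  Optional<String> fileName() {
    return Optional.ofNullable(fileName);
  }
}

/** Two hypothetical sinks: only the file-aware one looks at the file name. */
final class WatermarkSinksSketch {
  private WatermarkSinksSketch() {}

  static String consoleLine(FileSequenceWatermark wm) {
    return "watermark " + wm.sequence();
  }

  static String outputFileName(FileSequenceWatermark wm) {
    return wm.fileName()
        .map(name -> name + ".counts")
        .orElse("part-" + wm.sequence() + ".counts");
  }

  public static void main(String[] args) {
    FileSequenceWatermark named = new FileSequenceWatermark(0, "input-a.txt");
    FileSequenceWatermark unnamed = new FileSequenceWatermark(1, null);
    System.out.println(consoleLine(named));      // watermark 0
    System.out.println(outputFileName(named));   // input-a.txt.counts
    System.out.println(outputFileName(unnamed)); // part-1.counts
  }
}
```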
> > > >
> > > >
> > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > wrote:
> > > >
> > > > > Hi Thomas,
> > > > >
> > > > > For an input operator that is supposed to generate watermarks for
> > > > > downstream operators, I can think of the following watermarks that the
> > > > > operator can emit:
> > > > > 1. Time-based watermarks (the high watermark / low watermark)
> > > > > 2. Tuple-count-based watermarks (every n tuples)
> > > > > 3. File-based watermarks (start file, end file)
> > > > > 4. Final watermark
> > > > >
> > > > > File-based watermarks seem to be applicable for batch (file-based) as
> > > > > well, and hence I thought of looking at these first. Does this seem to be
> > > > > in line with the thought process?
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > > > >
> > > > > > I don't think this should be designed based on a simplistic file
> > > > > > input-output scenario. It would be good to include a stateful
> > > > > > transformation based on event time.
> > > > > >
> > > > > > More complex pipelines contain stateful transformations that depend on
> > > > > > windowing and watermarks. I think we need a watermark concept that is
> > > > > > based on progress in event time (or another monotonically increasing
> > > > > > sequence) that other operators can generically work with.
> > > > > >
> > > > > > Note that even file input in many cases can produce time-based
> > > > > > watermarks, for example when you read part files that are bound by
> > > > > > event time.
> > > > > >
> > > > > > Thanks,
> > > > > > Thomas
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > wrote:
> > > > > >
> > > > > > > For a better understanding of the use case for control tuples in
> > > > > > > batch, I am creating a prototype for a batch application using the
> > > > > > > File Input and File Output operators.
> > > > > > >
> > > > > > > To enable basic batch processing for the File IO operators, I am
> > > > > > > proposing the following changes to the File input and output operators:
> > > > > > > 1. The File Input operator emits a watermark each time it opens and
> > > > > > > closes a file. These can be "start file" and "end file" watermarks which
> > > > > > > include the corresponding file names. The "start file" tuple should be
> > > > > > > sent before any of the data from that file flows.
> > > > > > > 2. The File Input operator can be configured to end the application
> > > > > > > after a single scan or n scans of the directory (a batch). This is where
> > > > > > > the operator emits the final watermark (the end-of-application control
> > > > > > > tuple). This will also shut down the application.
> > > > > > > 3. The File Output operator handles these control tuples. "Start file"
> > > > > > > initializes the file name for the incoming tuples. The "end file"
> > > > > > > watermark forces a finalize on that file.
> > > > > > >
> > > > > > > The user would be able to configure the operators to send only those
> > > > > > > watermarks that are needed in the application. If none of the options
> > > > > > > are configured, the operators behave as in a streaming application.
> > > > > > >
> > > > > > > There are a few challenges in the implementation when the input
> > > > > > > operator is partitioned. In this case, the correlation between the
> > > > > > > start/end markers for a file and the data tuples for that file is lost.
> > > > > > > Hence we need to maintain the file name as part of each tuple in the
> > > > > > > pipeline.
> > > > > > >
> > > > > > > The "start file" and "end file" control tuples in this example are
> > > > > > > temporary names for watermarks. We can have generic "start batch" /
> > > > > > > "end batch" tuples which could be used for other use cases as well. The
> > > > > > > final watermark is common and serves the same purpose in each case.
> > > > > > >
> > > > > > > Please let me know your thoughts on this.
> > > > > > >
> > > > > > > ~ Bhupesh
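
The ordering required by the proposal above (a "start file" marker before any data from that file, an "end file" marker after its last tuple, and a final watermark after the last scan) can be sketched as follows. `FileBatchSketch`, `BatchEvent`, and `Kind` are hypothetical types, not Malhar classes. The file name is carried on every data event, as suggested for the partitioned case, and the output side simply buffers lines per file and treats "end file" as the point where a real operator would finalize the file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the proposed start-file / end-file / final markers. */
final class FileBatchSketch {
  enum Kind { START_FILE, DATA, END_FILE, FINAL }

  /** One event on the stream; the file name travels with every data tuple. */
  static final class BatchEvent {
    final Kind kind;
    final String fileName; // null only for FINAL
    final String line;     // non-null only for DATA

    BatchEvent(Kind kind, String fileName, String line) {
      this.kind = kind;
      this.fileName = fileName;
      this.line = line;
    }

    @Override
    public String toString() {
      return kind + (fileName == null ? "" : "(" + fileName + ")") + (line == null ? "" : ":" + line);
    }
  }

  private FileBatchSketch() {}

  /** Input side: one directory scan, then the final watermark (single-scan batch). */
  static List<BatchEvent> scanOnce(Map<String, List<String>> files) {
    List<BatchEvent> out = new ArrayList<>();
    for (Map.Entry<String, List<String>> file : files.entrySet()) {
      out.add(new BatchEvent(Kind.START_FILE, file.getKey(), null)); // before any data
      for (String line : file.getValue()) {
        out.add(new BatchEvent(Kind.DATA, file.getKey(), line));
      }
      out.add(new BatchEvent(Kind.END_FILE, file.getKey(), null)); // after the last tuple
    }
    out.add(new BatchEvent(Kind.FINAL, null, null)); // end of application
    return out;
  }

  /** Output side: buffer lines per file; END_FILE is where a real sink would finalize. */
  static Map<String, List<String>> finalizeFiles(List<BatchEvent> events) {
    Map<String, List<String>> open = new LinkedHashMap<>();
    Map<String, List<String>> finalized = new LinkedHashMap<>();
    for (BatchEvent e : events) {
      switch (e.kind) {
        case START_FILE:
          open.put(e.fileName, new ArrayList<>());
          break;
        case DATA:
          open.get(e.fileName).add(e.line);
          break;
        case END_FILE:
          finalized.put(e.fileName, open.remove(e.fileName));
          break;
        default:
          break; // FINAL: nothing left to do in this sketch
      }
    }
    return finalized;
  }

  public static void main(String[] args) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("a.txt", List.of("x", "y"));
    files.put("b.txt", List.of("z"));
    List<BatchEvent> events = scanOnce(files);
    System.out.println(events);
    System.out.println(finalizeFiles(events)); // {a.txt=[x, y], b.txt=[z]}
  }
}
```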
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Yes, this can be part of the operator configuration. Given this,
> > > > > > > > defining a batch application would mean configuring the connectors
> > > > > > > > (mostly the input operator) in the application for the desired
> > > > > > > > behavior. Similarly, use cases other than batch can be achieved.
> > > > > > > >
> > > > > > > > We may also need to take care of the following:
> > > > > > > > 1. Make sure that the watermarks or control tuples are consistent
> > > > > > > > across sources, meaning an HDFS sink should be able to interpret the
> > > > > > > > watermark tuple sent out by, say, a JDBC source.
> > > > > > > > 2. In addition to I/O connectors, we should also look at the need for
> > > > > > > > processing operators to understand some of the control tuples /
> > > > > > > > watermarks. For example, we may want to reset the operator behavior
> > > > > > > > on arrival of some watermark tuple.
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > >
> > > > > > > >> The HDFS source can operate in two modes, bounded or unbounded. If
> > > > > > > >> you scan only once, then it should emit the final watermark after it
> > > > > > > >> is done. Otherwise it would emit watermarks based on a policy (file
> > > > > > > >> names etc.). The mechanism to generate the marks may depend on the
> > > > > > > >> type of source, and the user needs to be able to influence/configure
> > > > > > > >> it.
> > > > > > > >>
> > > > > > > >> Thomas
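
A minimal sketch of the two modes described above, assuming a single directory-scanning source with a configurable scan limit: in bounded mode it emits the final watermark after the configured number of scans and stops, while in unbounded mode it only emits per-scan watermarks. `ScanningSourceSketch` and `maxScans` are hypothetical, and encoding the final watermark as `Long.MAX_VALUE` is an assumption (a common convention, not taken from Apex); the per-scan watermark value stands in for whatever policy the user configures.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical directory-scanning source with bounded and unbounded modes. */
final class ScanningSourceSketch {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // assumed encoding of the final watermark

  private final int maxScans; // <= 0 means unbounded: keep scanning forever
  private int scansDone;
  private boolean finished;

  ScanningSourceSketch(int maxScans) {
    this.maxScans = maxScans;
  }

  boolean isFinished() {
    return finished;
  }

  /** Performs one scan and returns the watermarks emitted for it. */
  List<Long> scan() {
    List<Long> watermarks = new ArrayList<>();
    if (finished) {
      return watermarks; // bounded source already completed
    }
    scansDone++;
    watermarks.add((long) scansDone); // per-scan progress; the real policy is configurable
    if (maxScans > 0 && scansDone >= maxScans) {
      watermarks.add(FINAL_WATERMARK); // bounded mode: nothing more will arrive
      finished = true;
    }
    return watermarks;
  }

  public static void main(String[] args) {
    ScanningSourceSketch bounded = new ScanningSourceSketch(1);
    System.out.println(bounded.scan());   // [1, 9223372036854775807]
    System.out.println(bounded.scan());   // []
    ScanningSourceSketch unbounded = new ScanningSourceSketch(0);
    unbounded.scan();
    System.out.println(unbounded.scan()); // [2]
  }
}
```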
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Thomas,
> > > > > > > >> >
> > > > > > > >> > I am not sure that I completely understand your suggestion. Are you
> > > > > > > >> > suggesting broadening the scope of the proposal to treat all sources
> > > > > > > >> > as bounded as well as unbounded?
> > > > > > > >> >
> > > > > > > >> > In the case of Apex, we treat all sources as unbounded sources. Even
> > > > > > > >> > bounded sources like the HDFS file source are treated as unbounded by
> > > > > > > >> > means of scanning the input directory repeatedly.
> > > > > > > >> >
> > > > > > > >> > Let's consider the HDFS file source, for example:
> > > > > > > >> > In this case, if we treat it as a bounded source, we can define hooks
> > > > > > > >> > which allow us to detect the end of the file and send the "final
> > > > > > > >> > watermark". We could also consider the HDFS file source as a
> > > > > > > >> > streaming source and define hooks which send watermarks based on
> > > > > > > >> > different kinds of windows.
> > > > > > > >> >
> > > > > > > >> > Please correct me if I misunderstand.
> > > > > > > >> >
> > > > > > > >> > ~ Bhupesh
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > >> >
> > > > > > > >> > > Bhupesh,
> > > > > > > >> > >
> > > > > > > >> > > Please see how that can be solved in a unified way using windows
> > > > > > > >> > > and watermarks. It is bounded data vs. unbounded data. In Beam, for
> > > > > > > >> > > example, you can use the "global window" and the final watermark to
> > > > > > > >> > > accomplish what you are looking for. Batch is just a special case of
> > > > > > > >> > > streaming where the source emits the final watermark.
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > > Thomas
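
To illustrate "batch is just a special case of streaming", here is a minimal sketch of a word count in a single global window that emits only when the final watermark arrives; fed by an unbounded source that never sends it, the same operator just keeps accumulating. `GlobalWindowCountSketch` is hypothetical and mirrors neither Beam's nor Malhar's API, and the `Long.MAX_VALUE` encoding of the final watermark is an assumption.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical word count over one global window, triggered by the final watermark. */
final class GlobalWindowCountSketch {
  static final long FINAL_WATERMARK = Long.MAX_VALUE; // assumed encoding

  private final Map<String, Long> counts = new TreeMap<>();
  private Map<String, Long> result; // stays null until the final watermark arrives

  void onData(String word) {
    counts.merge(word, 1L, Long::sum);
  }

  void onWatermark(long watermark) {
    // Intermediate watermarks cannot close the global window; only the final one can.
    if (watermark == FINAL_WATERMARK && result == null) {
      result = new TreeMap<>(counts);
    }
  }

  Map<String, Long> emitted() {
    return result;
  }

  public static void main(String[] args) {
    GlobalWindowCountSketch op = new GlobalWindowCountSketch();
    for (String w : "a b a".split(" ")) {
      op.onData(w);
    }
    op.onWatermark(5);
    System.out.println(op.emitted()); // null: window still open
    op.onWatermark(FINAL_WATERMARK);
    System.out.println(op.emitted()); // {a=2, b=1}
  }
}
```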
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Yes, if the user needs to develop a batch application, then
> > > > > > > >> > > > batch-aware operators need to be used in the application.
> > > > > > > >> > > > The nature of the application is mostly controlled by the input
> > > > > > > >> > > > and the output operators used in the application.
> > > > > > > >> > > >
> > > > > > > >> > > > For example, consider an application which needs to filter
> > > > > > > >> > > > records in an input file and store the filtered records in
> > > > > > > >> > > > another file. The nature of this app is to end once the entire
> > > > > > > >> > > > file is processed. The following things are expected of the
> > > > > > > >> > > > application:
> > > > > > > >> > > >
> > > > > > > >> > > >    1. Once the input data is exhausted, finalize the output file
> > > > > > > >> > > >    from the .tmp files (responsibility of the output operator).
> > > > > > > >> > > >    2. End the application once the data is read and processed
> > > > > > > >> > > >    (responsibility of the input operator).
> > > > > > > >> > > >
> > > > > > > >> > > > These functions are essential to allow the user to do
> > > > > > > >> > > > higher-level operations like scheduling or running a workflow of
> > > > > > > >> > > > batch applications.
> > > > > > > >> > > >
> > > > > > > >> > > > I am not sure about intermediate (processing) operators, as there
> > > > > > > >> > > > is no change in their functionality for batch use cases. Perhaps
> > > > > > > >> > > > allowing multiple batches to run in a single application may
> > > > > > > >> > > > require similar changes in processing operators as well.
> > > > > > > >> > > >
> > > > > > > >> > > > ~ Bhupesh
> > > > > > > >> > > >
> > > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Will it give the user the impression that, if he has a batch
> > > > > > > >> > > > > use case, he has to use batch-aware operators only? If so, is
> > > > > > > >> > > > > that what we expect? I am not aware of how we implement the
> > > > > > > >> > > > > batch scenario, so this might be a basic question.
> > > > > > > >> > > > >
> > > > > > > >> > > > > -Priyanka
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > >> > > > > > Hi All,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > While the design / implementation for custom control tuples is
> > > > > > > >> > > > > > ongoing, I thought it would be a good idea to consider its
> > > > > > > >> > > > > > usefulness in one of the use cases: batch applications.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > This is a proposal to adapt / extend existing operators in the
> > > > > > > >> > > > > > Apache Apex Malhar library so that it is easy to use them in
> > > > > > > >> > > > > > batch use cases.
> > > > > > > >> > > > > > Naturally, this would be applicable to only a subset of
> > > > > > > >> > > > > > operators, like File, JDBC and NoSQL databases.
> > > > > > > >> > > > > > For example, for a file-based store (say, an HDFS store), we could
> > > > > > > >> > > > > > have FileBatchInput and FileBatchOutput operators which allow easy
> > > > > > > >> > > > > > integration into a batch application. These operators would be
> > > > > > > >> > > > > > extended from their existing implementations and would be "Batch
> > > > > > > >> > > > > > Aware", in that they may understand the meaning of some specific
> > > > > > > >> > > > > > control tuples that flow through the DAG. Start batch and end
> > > > > > > >> > > > > > batch seem to be the obvious candidates that come to mind. On
> > > > > > > >> > > > > > receipt of such control tuples, they may try to modify the
> > > > > > > >> > > > > > behavior of the operator, for example to reinitialize some
> > > > > > > >> > > > > > metrics or finalize an output file.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > We can discuss the potential control tuples and actions in
> > > > > > > >> > > > > > detail, but first I would like to understand the views of the
> > > > > > > >> > > > > > community on this proposal.
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > ~ Bhupesh
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
