Hi Thomas,

For an input operator which is supposed to generate watermarks for downstream operators, I can think about the following watermarks that the operator can emit:
1. Time based watermarks (the high watermark / low watermark)
2. Number of tuple based watermarks (Every n tuples)
3. File based watermarks (Start file, end file)
4. Final watermark
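As a rough illustration of the list above, here is a minimal sketch of how such watermark kinds might be modeled. All names in it (`WatermarkKind`, `Watermark`, `CountingWatermarkSource`) are hypothetical and are not Apex or Malhar APIs; it only shows the "every n tuples" and "final" cases.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only; not part of Apex or Malhar.
enum WatermarkKind { TIME, TUPLE_COUNT, FILE_START, FILE_END, FINAL }

final class Watermark {
    final WatermarkKind kind;
    final String detail; // e.g. a timestamp, a tuple count, or a file name

    Watermark(WatermarkKind kind, String detail) {
        this.kind = kind;
        this.detail = detail;
    }

    @Override
    public String toString() {
        return kind + "(" + detail + ")";
    }
}

// Records a count-based watermark every n tuples and a final watermark on close.
final class CountingWatermarkSource {
    private final int every;
    private long seen;
    private final List<Watermark> emitted = new ArrayList<>();

    CountingWatermarkSource(int every) {
        if (every <= 0) {
            throw new IllegalArgumentException("every must be positive");
        }
        this.every = every;
    }

    // Called once per data tuple; the tuple itself is ignored in this sketch.
    void onTuple(Object tuple) {
        seen++;
        if (seen % every == 0) {
            emitted.add(new Watermark(WatermarkKind.TUPLE_COUNT, Long.toString(seen)));
        }
    }

    // Called when the (bounded) input is exhausted.
    void close() {
        emitted.add(new Watermark(WatermarkKind.FINAL, ""));
    }

    List<Watermark> emitted() {
        return emitted;
    }
}
```

In a real operator the watermarks would be sent downstream as control tuples rather than collected in a list; the list is only there to keep the sketch self-contained.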
File based watermarks seem to be applicable for batch (file based) as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?

~ Bhupesh

_______________________________________________________
Bhupesh Chawda
Software Engineer
E: bhup...@datatorrent.com | Twitter: @bhupeshsc
www.datatorrent.com | apex.apache.org

On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:

> I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
>
> More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or other monotonic increasing sequence) that other operators can generically work with.
>
> Note that even file input in many cases can produce time based watermarks, for example when you read part files that are bound by event time.
>
> Thanks,
> Thomas
>
> On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
>
> > For better understanding the use case for control tuples in batch, I am creating a prototype for a batch application using File Input and File Output operators.
> >
> > To enable basic batch processing for File IO operators, I am proposing the following changes to File input and output operators:
> > 1. File Input operator emits a watermark each time it opens and closes a file. These can be "start file" and "end file" watermarks which include the corresponding file names. The "start file" tuple should be sent before any of the data from that file flows.
> > 2. File Input operator can be configured to end the application after a single or n scans of the directory (a batch). This is where the operator emits the final watermark (the end of application control tuple).
> >    This will also shut down the application.
> > 3. The File output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. "End file" watermark forces a finalize on that file.
> >
> > The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
> >
> > There are a few challenges in the implementation where the input operator is partitioned. In this case, the correlation between the start/end for a file and the data tuples for that file is lost. Hence we need to maintain the filename as part of each tuple in the pipeline.
> >
> > The "start file" and "end file" control tuples in this example are temporary names for watermarks. We can have generic "start batch" / "end batch" tuples which could be used for other use cases as well. The Final watermark is common and serves the same purpose in each case.
> >
> > Please let me know your thoughts on this.
> >
> > ~ Bhupesh
> >
> > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> >
> > > Yes, this can be part of operator configuration. Given this, for a user to define a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, there can be other use cases that can be achieved other than batch.
> > >
> > > We may also need to take care of the following:
> > > 1. Make sure that the watermarks or control tuples are consistent across sources. Meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
> > > 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks.
> > >    For example, we may want to reset the operator behavior on arrival of some watermark tuple.
> > >
> > > ~ Bhupesh
> > >
> > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > >> The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.). The mechanism to generate the marks may depend on the type of source and the user needs to be able to influence/configure it.
> > >>
> > >> Thomas
> > >>
> > >> On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >>
> > >> > Hi Thomas,
> > >> >
> > >> > I am not sure that I completely understand your suggestion. Are you suggesting to broaden the scope of the proposal to treat all sources as bounded as well as unbounded?
> > >> >
> > >> > In case of Apex, we treat all sources as unbounded sources. Even a bounded source like the HDFS file source is treated as unbounded by means of scanning the input directory repeatedly.
> > >> >
> > >> > Let's consider HDFS file source for example:
> > >> > In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
> > >> >
> > >> > Please correct me if I misunderstand.
> > >> >
> > >> > ~ Bhupesh
> > >> >
> > >> > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:
> > >> >
> > >> > > Bhupesh,
> > >> > >
> > >> > > Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data.
> > >> > > In Beam for example, you can use the "global window" and the final watermark to accomplish what you are looking for. Batch is just a special case of streaming where the source emits the final watermark.
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >> > >
> > >> > > > Yes, if the user needs to develop a batch application, then batch aware operators need to be used in the application.
> > >> > > > The nature of the application is mostly controlled by the input and the output operators used in the application.
> > >> > > >
> > >> > > > For example, consider an application which needs to filter records in an input file and store the filtered records in another file. The nature of this app is to end once the entire file is processed. The following things are expected of the application:
> > >> > > >
> > >> > > > 1. Once the input data is over, finalize the output file from .tmp files. - Responsibility of output operator
> > >> > > > 2. End the application, once the data is read and processed - Responsibility of input operator
> > >> > > >
> > >> > > > These functions are essential to allow the user to do higher level operations like scheduling or running a workflow of batch applications.
> > >> > > >
> > >> > > > I am not sure about intermediate (processing) operators, as there is no change in their functionality for batch use cases. Perhaps, allowing running multiple batches in a single application may require similar changes in processing operators as well.
> > >> > > >
> > >> > > > ~ Bhupesh
> > >> > > >
> > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale <pri...@apache.org> wrote:
> > >> > > >
> > >> > > > > Will it make an impression on the user that, if he has a batch use case, he has to use batch aware operators only? If so, is that what we expect? I am not aware of how we implement the batch scenario, so this might be a basic question.
> > >> > > > >
> > >> > > > > -Priyanka
> > >> > > > >
> > >> > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > >> > > > >
> > >> > > > > > Hi All,
> > >> > > > > >
> > >> > > > > > While design / implementation for custom control tuples is ongoing, I thought it would be a good idea to consider its usefulness in one of the use cases - batch applications.
> > >> > > > > >
> > >> > > > > > This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that it is easy to use them in batch use cases. Naturally, this would be applicable for only a subset of operators like File, JDBC and NoSQL databases.
> > >> > > > > > For example, for a file based store, (say HDFS store), we could have FileBatchInput and FileBatchOutput operators which allow easy integration into a batch application. These operators would be extended from their existing implementations and would be "Batch Aware", in that they may understand the meaning of some specific control tuples that flow through the DAG.
> > >> > > > > > Start batch and end batch seem to be the obvious candidates that come to mind. On receipt of such control tuples, they may try to modify the behavior of the operator - to reinitialize some metrics or finalize an output file for example.
> > >> > > > > >
> > >> > > > > > We can discuss the potential control tuples and actions in detail, but first I would like to understand the views of the community for this proposal.
> > >> > > > > >
> > >> > > > > > ~ Bhupesh
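
To make the "batch aware" output behavior from the original proposal concrete, here is a minimal sketch under stated assumptions. `BatchAwareWriter` is a hypothetical class, not the proposed FileBatchOutput operator or any Malhar API, and it keeps records in memory instead of writing .tmp files. It shows only the control flow: a start-batch signal names the output, data is buffered, and an end-batch signal finalizes it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; stands in for an output operator that reacts to
// "start batch" / "end batch" control tuples. Not a Malhar API.
final class BatchAwareWriter {
    // Finalized outputs, keyed by the name given at startBatch.
    private final Map<String, List<String>> finalized = new LinkedHashMap<>();
    private String currentName;    // null when no batch is open
    private List<String> pending;  // plays the role of the .tmp file

    // "Start batch": initialize the output name for incoming tuples.
    void startBatch(String name) {
        if (currentName != null) {
            throw new IllegalStateException("batch already open: " + currentName);
        }
        currentName = name;
        pending = new ArrayList<>();
    }

    // Data tuple: only valid between startBatch and endBatch.
    void write(String record) {
        if (currentName == null) {
            throw new IllegalStateException("no open batch");
        }
        pending.add(record);
    }

    // "End batch": force a finalize of the current output.
    void endBatch() {
        if (currentName == null) {
            throw new IllegalStateException("no open batch");
        }
        finalized.put(currentName, pending);
        currentName = null;
        pending = null;
    }

    Map<String, List<String>> finalizedFiles() {
        return finalized;
    }
}
```

This sketch assumes a single, unpartitioned stream. As noted earlier in the thread, once the input operator is partitioned the start/end signals lose their correlation with the data tuples, so each tuple would need to carry its file name.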