On Thu, Feb 23, 2017 at 12:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> Hi Thomas,
>
> My response inline:
>
> On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise <t...@apache.org> wrote:
>
> > Hi Bhupesh,
> >
> > This looks great. You use the watermark as a measure of completeness and the window to isolate the state, which is how it should work.
> >
> > Questions/comments:
> >
> > Why does the count operator have a 2ms window when this should be driven by the watermark from the input operator?
> >
>
> In this example, we trigger at the watermark. So the count (windowed) operator accumulates the state until the watermark and then emits all the accumulated counts.
> 2 ms is not necessary; we can make it 1 ms. But in this case it's not the time duration that matters. The file input operator makes sure all tuples belonging to a file become part of the same window by giving those tuples the same timestamp. So all tuples in the first file go out with timestamp 0, the second file with timestamp 1, and so on.
>

Understood all but this line:

windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(2)));

I wonder if there is an option to control this from the source; maybe David can take a look?

> > I don't think there should be separate "windowed" connector operators. The watermark support needs to be incorporated into existing operators and be configurable. Windowing is a concept that the entire library needs to be aware of. I see no reason to arrange classes in separate "window" packages except those that are really specific to windowing support, such as the watermark tuple.
> >
>
> I think making changes to the existing operators would make them too heavy and complex. I suggest we extend the existing operators and have new classes with just the logic for watermarks. This will also help keep bugs resulting from the new implementations isolated.
> We can keep these in the same package as the existing operators.
> Just the window-specific classes (like watermarks) will go into the window package.
>

Watermark support needs to be part of all connectors, just like idempotency, fault tolerance, partitioning, etc. Well-written operators have pluggable components and strategies. Inheritance is not the way to solve this; it needs to be part of the abstract base classes.

> > Why does the control tuple have an operatorId in it?
> >
>
> The operator id is not used in the current example, but it may help the user understand which partition a watermark tuple originated from. This applies to scenarios where we cannot distinguish between watermark tuples from different partitions, unlike the file-based watermarks, where the file name is the distinguishing property.
>

It is not clear why it is important which physical operator generated a watermark. I don't think anything downstream can rely on that, in general. It would help if you could provide a real example and document it.

> > Once you make the changes to the operators, please also augment the documentation and examples (in this case the wordcount demo).
> >
>
> Sure.
>
> > Thanks,
> > Thomas
> >
> > On Wed, Feb 22, 2017 at 4:51 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> >
> > > Hi Thomas,
> > >
> > > Sorry for the delay.
> > > I agree that the watermark concept is general and is understood by intermediate transformations. The file name is additional information in the watermark, which helps the start and end operators do batch-related work.
> > > As suggested, I have created a wordcount application which uses watermarks to create separate windows for each file by means of a long (timestamp).
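To make the "pluggable strategies" point concrete, here is a minimal plain-Java sketch with no Apex dependencies: the abstract base class owns the watermark logic through a strategy object, and the per-file timestamps described in this thread (file 0 gets timestamp 0, file 1 gets timestamp 1) become just one strategy. `WatermarkStrategy`, `PerFileSequence`, `AbstractLineInput` and the string tuple encoding are hypothetical names for illustration, not Malhar APIs, and the convention that watermark n means "no tuple older than n will follow" is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pluggable strategy (not an Apex Malhar API): decides which
// timestamp data tuples get and which watermark follows each file.
interface WatermarkStrategy {
  long timestampFor(String fileName);

  /** Watermark to emit once the file is fully read, or -1 to emit none. */
  long watermarkAfter(String fileName);
}

// Plain streaming behaviour: no per-file timestamps, no watermarks.
class NoWatermarks implements WatermarkStrategy {
  public long timestampFor(String fileName) { return 0L; }
  public long watermarkAfter(String fileName) { return -1L; }
}

// One file per window: the n-th file gets timestamp n, and once it is read
// the watermark n + 1 says "nothing older than n + 1 will arrive".
class PerFileSequence implements WatermarkStrategy {
  private final Map<String, Long> sequence = new HashMap<>();

  public long timestampFor(String fileName) {
    Long seq = sequence.get(fileName);
    if (seq == null) {
      seq = (long) sequence.size();
      sequence.put(fileName, seq);
    }
    return seq;
  }

  public long watermarkAfter(String fileName) { return timestampFor(fileName) + 1; }
}

// Hypothetical abstract base: connector-specific reading is inherited,
// watermark behaviour is plugged in rather than subclassed.
abstract class AbstractLineInput {
  private WatermarkStrategy strategy = new NoWatermarks();

  void setWatermarkStrategy(WatermarkStrategy strategy) { this.strategy = strategy; }

  protected abstract List<String> readLines(String fileName);

  /** Emits "timestamp:line" data tuples and "WM:n" watermark tuples into out. */
  void processFile(String fileName, List<String> out) {
    long ts = strategy.timestampFor(fileName);
    for (String line : readLines(fileName)) out.add(ts + ":" + line);
    long wm = strategy.watermarkAfter(fileName);
    if (wm >= 0) out.add("WM:" + wm);
  }
}

class InMemoryInput extends AbstractLineInput {
  private final Map<String, List<String>> files;
  InMemoryInput(Map<String, List<String>> files) { this.files = files; }
  protected List<String> readLines(String fileName) { return files.get(fileName); }
}

class StrategyDemo {
  static List<String> run(WatermarkStrategy strategy) {
    Map<String, List<String>> files = new LinkedHashMap<>();
    files.put("a.txt", Arrays.asList("x y", "y"));
    files.put("b.txt", Arrays.asList("z"));
    InMemoryInput input = new InMemoryInput(files);
    input.setWatermarkStrategy(strategy);
    List<String> out = new ArrayList<>();
    for (String f : files.keySet()) input.processFile(f, out);
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(new PerFileSequence())); // [0:x y, 0:y, WM:1, 1:z, WM:2]
    System.out.println(run(new NoWatermarks()));    // [0:x y, 0:y, 0:z]
  }
}
```

Switching a connector between streaming and per-file batch behaviour then means configuring a different strategy rather than picking a different subclass.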
> > >
> > > I am linking the source for reference:
> > >
> > > Watermarks:
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/FileWatermark.java
> > >
> > > Extended File Input and Output operators:
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileInputOperator.java
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/main/java/org/apache/apex/malhar/lib/window/windowable/WindowedFileOutputOperator.java
> > >
> > > WordCount application:
> > > https://github.com/bhupeshchawda/apex-malhar/blob/batch-io-operators/library/src/test/java/org/apache/apex/malhar/lib/window/windowable/WindowedWordCount.java
> > >
> > > The input operator attaches a timestamp to each file, which allows the WindowedOperator to identify each file and keep its state in a distinct window.
> > >
> > > Additionally, using the extra file information, the application can store the counts in similarly named files at the destination.
> > >
> > > Thanks.
> > >
> > > _______________________________________________________
> > > Bhupesh Chawda
> > > Software Engineer
> > > E: bhup...@datatorrent.com | Twitter: @bhupeshsc
> > > www.datatorrent.com | apex.apache.org
> > >
> > > On Sat, Feb 18, 2017 at 10:24 PM, Thomas Weise <t...@apache.org> wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > I think this needs a generic watermark concept that is independent of source and destination and can be understood by intermediate transformations. File names don't meet this criterion.
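One way to meet that criterion, sketched under assumptions: the watermark itself carries only a monotonically increasing long (event time or a file sequence number), and the file name rides along as optional extra data. A console sink uses only the timestamp, while a file sink uses the name when it is present and falls back to a timestamp-derived name otherwise. All class names are hypothetical; in particular `FileTaggedWatermark` is not the `FileWatermark` class from the linked branch.

```java
// Hypothetical generic control tuple that every operator can interpret.
class Watermark {
  final long timestamp; // monotonically increasing: event time or a file sequence number

  Watermark(long timestamp) { this.timestamp = timestamp; }
}

// Same watermark plus optional extra information; operators that don't care ignore it.
class FileTaggedWatermark extends Watermark {
  final String fileName;

  FileTaggedWatermark(long timestamp, String fileName) {
    super(timestamp);
    this.fileName = fileName;
  }
}

interface WatermarkAwareSink {
  String onWatermark(Watermark wm);
}

// Needs nothing but the generic timestamp.
class ConsoleSink implements WatermarkAwareSink {
  public String onWatermark(Watermark wm) { return "flush up to " + wm.timestamp; }
}

// Uses the file name if the watermark carries one; otherwise still works.
class FileSink implements WatermarkAwareSink {
  public String onWatermark(Watermark wm) {
    String name = (wm instanceof FileTaggedWatermark)
        ? ((FileTaggedWatermark) wm).fileName + ".counts"
        : "window-" + wm.timestamp + ".counts";
    return "finalize " + name;
  }
}

class WatermarkDemo {
  public static void main(String[] args) {
    Watermark plain = new Watermark(1);
    Watermark tagged = new FileTaggedWatermark(1, "a.txt");
    System.out.println(new ConsoleSink().onWatermark(tagged)); // flush up to 1
    System.out.println(new FileSink().onWatermark(tagged));    // finalize a.txt.counts
    System.out.println(new FileSink().onWatermark(plain));     // finalize window-1.counts
  }
}
```

Intermediate operators only ever read `timestamp`, so they stay independent of where the watermark came from.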
> > > >
> > > > One possible approach is to have a monotonically increasing file sequence (instead of time, if time is not applicable) that can be mapped to a watermark. You can still tag the file name onto the control tuple as extra information, so that a file output operator that understands it can do whatever it wants with it. But it should also work without it, say when we write the output to the console.
> > > >
> > > > The key here is to demonstrate that an intermediate stateful transformation will work. I would suggest trying wordcount per input file, with the window operator emitting the counts at the file boundary without knowing anything about files.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > > On Sat, Feb 18, 2017 at 8:04 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > >
> > > > > Hi Thomas,
> > > > >
> > > > > For an input operator which is supposed to generate watermarks for downstream operators, I can think of the following watermarks that the operator can emit:
> > > > > 1. Time-based watermarks (the high watermark / low watermark)
> > > > > 2. Tuple-count-based watermarks (every n tuples)
> > > > > 3. File-based watermarks (start file, end file)
> > > > > 4. Final watermark
> > > > >
> > > > > File-based watermarks seem to be applicable for batch (file-based) use cases as well, and hence I thought of looking at these first. Does this seem to be in line with the thought process?
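The per-file wordcount suggested in this exchange can be modelled without any knowledge of files. Below is a simplified, hypothetical stand-in for a windowed operator: it assigns each tuple to a tumbling window aligned at 0 and emits a window's counts once the watermark reaches the window's end. It assumes the input behaves as described in this thread (file n's tuples carry timestamp n, followed by watermark n + 1). It is not Malhar's windowed operator implementation, whose window and trigger semantics may differ. Under these assumptions, a 1 ms window gives each file its own window, whereas a 2 ms window merges files 0 and 1 into one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified windowed word count (not a Malhar class).
// Assumed watermark convention: watermark w means no more tuples with timestamp < w.
class WindowedWordCount {
  private final long windowSize;
  // window start -> (word -> count); the outer TreeMap keeps windows ordered by start
  private final TreeMap<Long, Map<String, Long>> state = new TreeMap<>();
  final List<String> emitted = new ArrayList<>();

  WindowedWordCount(long windowSize) { this.windowSize = windowSize; }

  /** Tumbling windows aligned at 0: a tuple belongs to [start, start + size). */
  static long windowStart(long timestamp, long size) {
    return timestamp - Math.floorMod(timestamp, size);
  }

  void onData(long timestamp, String line) {
    Map<String, Long> counts =
        state.computeIfAbsent(windowStart(timestamp, windowSize), k -> new TreeMap<>());
    for (String word : line.split("\\s+")) {
      if (!word.isEmpty()) counts.merge(word, 1L, Long::sum);
    }
  }

  /** Trigger at the watermark: emit and discard every window that ends at or before it. */
  void onWatermark(long watermark) {
    while (!state.isEmpty() && state.firstKey() + windowSize <= watermark) {
      Map.Entry<Long, Map<String, Long>> window = state.pollFirstEntry();
      emitted.add("[" + window.getKey() + "," + (window.getKey() + windowSize) + ") "
          + window.getValue());
    }
  }
}

class WindowDemo {
  /** Two "files" as the input operator would present them; no file names involved. */
  static List<String> run(long windowSize) {
    WindowedWordCount op = new WindowedWordCount(windowSize);
    op.onData(0, "x y");
    op.onData(0, "y");
    op.onWatermark(1); // end of file 0
    op.onData(1, "z");
    op.onWatermark(2); // end of file 1
    return op.emitted;
  }

  public static void main(String[] args) {
    System.out.println(run(1)); // [[0,1) {x=1, y=2}, [1,2) {z=1}]
    System.out.println(run(2)); // [[0,2) {x=1, y=2, z=1}]
  }
}
```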
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Thu, Feb 16, 2017 at 10:37 AM, Thomas Weise <t...@apache.org> wrote:
> > > > >
> > > > > > I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time.
> > > > > >
> > > > > > More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept that is based on progress in event time (or another monotonically increasing sequence) that other operators can generically work with.
> > > > > >
> > > > > > Note that even file input can in many cases produce time-based watermarks, for example when you read part files that are bounded by event time.
> > > > > >
> > > > > > Thanks,
> > > > > > Thomas
> > > > > >
> > > > > > On Wed, Feb 15, 2017 at 4:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > >
> > > > > > > To better understand the use case for control tuples in batch, I am creating a prototype of a batch application using the File Input and File Output operators.
> > > > > > >
> > > > > > > To enable basic batch processing for the File IO operators, I am proposing the following changes to the File Input and Output operators:
> > > > > > > 1. The File Input operator emits a watermark each time it opens and closes a file.
> > > > > > >    These can be "start file" and "end file" watermarks, which include the corresponding file names. The "start file" tuple should be sent before any of the data from that file flows.
> > > > > > > 2. The File Input operator can be configured to end the application after a single scan or n scans of the directory (a batch). This is where the operator emits the final watermark (the end-of-application control tuple). This will also shut down the application.
> > > > > > > 3. The File Output operator handles these control tuples. "Start file" initializes the file name for the incoming tuples. The "end file" watermark forces a finalize on that file.
> > > > > > >
> > > > > > > The user would be able to enable the operators to send only those watermarks that are needed in the application. If none of the options are configured, the operators behave as in a streaming application.
> > > > > > >
> > > > > > > There are a few challenges in the implementation when the input operator is partitioned. In this case, the correlation between the start/end of a file and the data tuples for that file is lost. Hence we need to maintain the file name as part of each tuple in the pipeline.
> > > > > > >
> > > > > > > The "start file" and "end file" control tuples in this example are temporary names for watermarks. We can have generic "start batch" / "end batch" tuples, which could be used for other use cases as well. The final watermark is common and serves the same purpose in each case.
> > > > > > >
> > > > > > > Please let me know your thoughts on this.
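A minimal in-memory sketch of points 1 and 3 of this proposal together with the partitioning fix: because every tuple carries its file name, tuples from two input partitions may interleave and the output operator still buffers and finalizes each file correctly. `Tuple`, `BatchFileOutput` and the in-memory "files" are hypothetical; a real operator would write to HDFS and checkpoint its open buffers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical tuple model: every tuple carries its file name, so partitioned
// readers can interleave files without losing the file/data correlation.
class Tuple {
  enum Kind { START_FILE, DATA, END_FILE, FINAL }

  final Kind kind;
  final String file;
  final String line;

  private Tuple(Kind kind, String file, String line) {
    this.kind = kind;
    this.file = file;
    this.line = line;
  }

  static Tuple start(String file) { return new Tuple(Kind.START_FILE, file, null); }
  static Tuple data(String file, String line) { return new Tuple(Kind.DATA, file, line); }
  static Tuple end(String file) { return new Tuple(Kind.END_FILE, file, null); }
  static Tuple fin() { return new Tuple(Kind.FINAL, null, null); }
}

// Hypothetical output operator: "start file" opens a buffer, "end file" finalizes it,
// and the final watermark requests shutdown.
class BatchFileOutput {
  private final Map<String, List<String>> open = new LinkedHashMap<>();
  final Map<String, List<String>> finalized = new LinkedHashMap<>();
  boolean shutdownRequested;

  void process(Tuple t) {
    switch (t.kind) {
      case START_FILE:
        open.put(t.file, new ArrayList<>());
        break;
      case DATA: {
        List<String> buffer = open.get(t.file);
        if (buffer == null) {
          throw new IllegalStateException("data before start-file for " + t.file);
        }
        buffer.add(t.line);
        break;
      }
      case END_FILE:
        finalized.put(t.file, open.remove(t.file));
        break;
      case FINAL:
        shutdownRequested = true;
        break;
    }
  }
}

class BatchDemo {
  static BatchFileOutput run() {
    // Tuples from two partitions, interleaved as they might arrive downstream.
    List<Tuple> stream = Arrays.asList(
        Tuple.start("a"), Tuple.start("b"),
        Tuple.data("a", "1"), Tuple.data("b", "2"), Tuple.data("a", "3"),
        Tuple.end("b"), Tuple.end("a"), Tuple.fin());
    BatchFileOutput out = new BatchFileOutput();
    for (Tuple t : stream) out.process(t);
    return out;
  }

  public static void main(String[] args) {
    BatchFileOutput out = run();
    System.out.println(out.finalized + " shutdown=" + out.shutdownRequested);
    // {b=[2], a=[1, 3]} shutdown=true
  }
}
```

With a truly partitioned input, the output operator would also have to wait for the final tuple from every upstream partition before shutting down; that bookkeeping is left out here.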
> > > > > > >
> > > > > > > ~ Bhupesh
> > > > > > >
> > > > > > > On Wed, Jan 18, 2017 at 12:22 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > >
> > > > > > > > Yes, this can be part of the operator configuration. Given this, defining a batch application would, for a user, mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, use cases other than batch can be achieved.
> > > > > > > >
> > > > > > > > We may also need to take care of the following:
> > > > > > > > 1. Make sure that the watermarks or control tuples are consistent across sources, meaning an HDFS sink should be able to interpret the watermark tuple sent out by, say, a JDBC source.
> > > > > > > > 2. In addition to I/O connectors, we should also look at the need for processing operators to understand some of the control tuples / watermarks. For example, we may want to reset the operator behavior on arrival of some watermark tuple.
> > > > > > > >
> > > > > > > > ~ Bhupesh
> > > > > > > >
> > > > > > > > On Tue, Jan 17, 2017 at 9:59 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > >
> > > > > > > > > The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.).
> > > > > > > > > The mechanism to generate the marks may depend on the type of source, and the user needs to be able to influence/configure it.
> > > > > > > > >
> > > > > > > > > Thomas
> > > > > > > > >
> > > > > > > > > On Tue, Jan 17, 2017 at 5:03 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas,
> > > > > > > > > >
> > > > > > > > > > I am not sure that I completely understand your suggestion. Are you suggesting broadening the scope of the proposal to treat all sources as bounded as well as unbounded?
> > > > > > > > > >
> > > > > > > > > > In the case of Apex, we treat all sources as unbounded sources. Even bounded sources like the HDFS file source are treated as unbounded by scanning the input directory repeatedly.
> > > > > > > > > >
> > > > > > > > > > Let's consider the HDFS file source, for example:
> > > > > > > > > > In this case, if we treat it as a bounded source, we can define hooks which allow us to detect the end of the file and send the "final watermark". We could also consider the HDFS file source as a streaming source and define hooks which send watermarks based on different kinds of windows.
> > > > > > > > > >
> > > > > > > > > > Please correct me if I misunderstand.
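The bounded/unbounded distinction can be sketched as a scan-count setting on the source: a bounded source (here, "stop after maxScans scans") emits the final watermark when it is done, and a downstream operator that aggregates in a single global window can only produce its result at that point. With an unbounded source the global window never closes. The classes and the `maxScans` knob are assumptions for illustration, not the File Input operator's actual configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical directory-scanning source with a configurable scan policy.
class ScanningSource {
  private final int maxScans; // > 0: bounded (stop after maxScans); <= 0: unbounded
  private int scansDone;

  ScanningSource(int maxScans) { this.maxScans = maxScans; }

  /** One directory scan: emits the files found; returns false once the source is done. */
  boolean scan(List<String> filesFound, GlobalWindowCount downstream) {
    if (maxScans > 0 && scansDone >= maxScans) return false;
    for (String f : filesFound) downstream.onData(f);
    scansDone++;
    if (maxScans > 0 && scansDone == maxScans) {
      downstream.onFinalWatermark(); // bounded: no more data will ever arrive
      return false;
    }
    return true; // unbounded: a real source would emit policy-based watermarks here
  }
}

// Hypothetical downstream operator using a single "global window":
// it can only emit its result once the final watermark arrives.
class GlobalWindowCount {
  private long count;
  final List<String> emitted = new ArrayList<>();

  void onData(String file) { count++; }

  void onFinalWatermark() { emitted.add("files processed: " + count); }
}

class ScanDemo {
  static List<String> run(int maxScans, int rounds) {
    ScanningSource source = new ScanningSource(maxScans);
    GlobalWindowCount sink = new GlobalWindowCount();
    for (int i = 0; i < rounds; i++) {
      if (!source.scan(Arrays.asList("f" + i + "a", "f" + i + "b"), sink)) break;
    }
    return sink.emitted;
  }

  public static void main(String[] args) {
    System.out.println(run(2, 5)); // bounded: [files processed: 4]
    System.out.println(run(0, 5)); // unbounded: [] (the global window never closes)
  }
}
```

In this model "batch" is simply the configuration in which the source eventually emits the final watermark; the downstream operator is identical in both modes.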
> > > > > > > > > >
> > > > > > > > > > ~ Bhupesh
> > > > > > > > > >
> > > > > > > > > > On Mon, Jan 16, 2017 at 9:23 PM, Thomas Weise <t...@apache.org> wrote:
> > > > > > > > > >
> > > > > > > > > > > Bhupesh,
> > > > > > > > > > >
> > > > > > > > > > > Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam, for example, you can use the "global window" and the final watermark to accomplish what you are looking for. Batch is just a special case of streaming where the source emits the final watermark.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Thomas
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jan 16, 2017 at 1:02 AM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Yes, if the user needs to develop a batch application, then batch-aware operators need to be used in the application. The nature of the application is mostly controlled by the input and output operators used in it.
> > > > > > > > > > > >
> > > > > > > > > > > > For example, consider an application which needs to filter records in an input file and store the filtered records in another file. The nature of this app is to end once the entire file is processed.
> > > > > Following > > > > > > > >> things > > > > > > > >> > > are > > > > > > > >> > > > expected of the application: > > > > > > > >> > > > > > > > > > > >> > > > 1. Once the input data is over, finalize the output > > > file > > > > > from > > > > > > > >> .tmp > > > > > > > >> > > > files. - Responsibility of output operator > > > > > > > >> > > > 2. End the application, once the data is read and > > > > > processed - > > > > > > > >> > > > Responsibility of input operator > > > > > > > >> > > > > > > > > > > >> > > > These functions are essential to allow the user to do > > > higher > > > > > > level > > > > > > > >> > > > operations like scheduling or running a workflow of > > batch > > > > > > > >> applications. > > > > > > > >> > > > > > > > > > > >> > > > I am not sure about intermediate (processing) > operators, > > > as > > > > > > there > > > > > > > >> is no > > > > > > > >> > > > change in their functionality for batch use cases. > > > Perhaps, > > > > > > > allowing > > > > > > > >> > > > running multiple batches in a single application may > > > require > > > > > > > similar > > > > > > > >> > > > changes in processing operators as well. > > > > > > > >> > > > > > > > > > > >> > > > ~ Bhupesh > > > > > > > >> > > > > > > > > > > >> > > > On Mon, Jan 16, 2017 at 2:19 PM, Priyanka Gugale < > > > > > > > pri...@apache.org > > > > > > > >> > > > > > > > > >> > > > wrote: > > > > > > > >> > > > > > > > > > > >> > > > > Will it make an impression on user that, if he has a > > > batch > > > > > > > >> usecase he > > > > > > > >> > > has > > > > > > > >> > > > > to use batch aware operators only? If so, is that > what > > > we > > > > > > > expect? > > > > > > > >> I > > > > > > > >> > am > > > > > > > >> > > > not > > > > > > > >> > > > > aware of how do we implement batch scenario so this > > > might > > > > > be a > > > > > > > >> basic > > > > > > > >> > > > > question. 
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Priyanka
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda <bhup...@datatorrent.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi All,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > While the design / implementation of custom control tuples is ongoing, I thought it would be a good idea to consider their usefulness in one of the use cases: batch applications.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that they are easy to use in batch use cases. Naturally, this would be applicable to only a subset of operators, like File, JDBC and NoSQL databases.
> > > > > > > > > > > > > > For example, for a file-based store (say an HDFS store), we could have FileBatchInput and FileBatchOutput operators which allow easy integration into a batch application. These operators would be extended from their existing implementations and would be "Batch Aware", in that they may understand the meaning of some specific control tuples that flow through the DAG.
> > > > > > > > > > > > > > Start batch and end batch seem to be the obvious candidates that come to mind. On receipt of such control tuples, the operators may modify their behavior, for example to reinitialize some metrics or to finalize an output file.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We can discuss the potential control tuples and actions in detail, but first I would like to understand the community's views on this proposal.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > ~ Bhupesh
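A minimal sketch of the "Batch Aware" behaviour proposed in this original message: an operator that resets its metrics on a start-batch control tuple and reports them on end batch. `Control`, `BatchAwareFilter` and the string output are hypothetical; as later messages in the thread argue, the same effect could instead be driven by generic watermarks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical control tuples from the original proposal.
enum Control { START_BATCH, END_BATCH }

// Hypothetical "batch aware" filter: passes records containing a keyword,
// resets its metrics on START_BATCH and reports them on END_BATCH.
class BatchAwareFilter {
  private final String keyword;
  private long seen;
  private long passed;
  final List<String> output = new ArrayList<>();

  BatchAwareFilter(String keyword) { this.keyword = keyword; }

  void onControl(Control c) {
    switch (c) {
      case START_BATCH:
        seen = 0;
        passed = 0;
        break;
      case END_BATCH:
        output.add("batch done: " + passed + "/" + seen + " passed");
        break;
    }
  }

  void onData(String record) {
    seen++;
    if (record.contains(keyword)) {
      passed++;
      output.add(record);
    }
  }
}

class BatchAwareDemo {
  static List<String> run() {
    BatchAwareFilter filter = new BatchAwareFilter("err");
    filter.onControl(Control.START_BATCH);
    filter.onData("err: disk full");
    filter.onData("ok");
    filter.onControl(Control.END_BATCH);
    filter.onControl(Control.START_BATCH); // metrics reset for the second batch
    filter.onData("ok");
    filter.onControl(Control.END_BATCH);
    return filter.output;
  }

  public static void main(String[] args) {
    System.out.println(run());
    // [err: disk full, batch done: 1/2 passed, batch done: 0/1 passed]
  }
}
```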