Thanks David. I'll try to create an implementation for Deduper which uses WindowedOperator. Will open a PR soon for review.
~ Bhupesh

On Fri, Jul 8, 2016 at 2:23 AM, David Yan <[email protected]> wrote:

> Hi Bhupesh,
>
> I just added the method setFixedLateness(long millis) to
> AbstractWindowedOperator in my PR. This will allow you to specify the
> lateness with respect to the timestamp from the window ID without watermark
> tuples from upstream.
>
> David
>
> On Thu, Jul 7, 2016 at 11:49 AM, David Yan <[email protected]> wrote:
>
> > Hi Bhupesh,
> >
> > Yes, the windowed operator currently depends on the watermark tuples
> > upstream for any "lateness" related operation. If there is no watermark,
> > nothing will be considered late. We can add support for lateness handling
> > without incoming watermark tuples. Let me add that to the pull request.
> >
> > David
> >
> > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <[email protected]> wrote:
> >
> >> Hi David,
> >>
> >> Thanks for your reply.
> >>
> >> If I am to use a windowed operator for the Dedup operator, there should be
> >> some operator (upstream to Deduper) which sends the watermark tuples. These
> >> tuples (along with allowed lateness), will be the ones deciding which
> >> incoming tuples are too late and will be dropped. I have the following
> >> questions:
> >>
> >> Is a windowed operator (which needs watermarks) dependent upon some other
> >> operator for these tuples? What happens when there are no watermark tuples
> >> sent from upstream?
> >>
> >> Can a windowed operator "*assume*" the watermark tuples based on some
> >> notion of time? For example, can the Deduper, use the streaming window time
> >> as the reference to advance the watermark?
> >>
> >> Thanks.
> >>
> >> ~ Bhupesh
> >>
> >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <[email protected]> wrote:
> >>
> >> > Hi Bhupesh,
> >> >
> >> > FYI, there is a JIRA open for a scalable implementation of WindowedStorage
> >> > and WindowedKeyedStorage:
> >> >
> >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> >> >
> >> > We expect either to use ManagedState directly, or Spillable structures,
> >> > which in turn use ManagedState.
> >> >
> >> > I'm not very familiar with the dedup operator, but in order to use the
> >> > WindowedOperator, it sounds to me that we can use SlidingWindows with an
> >> > implementation of WindowedKeyedStorage that uses a Bloom filter to cover
> >> > most of the false cases.
> >> >
> >> > David
> >> >
> >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <[email protected]> wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I have looked into Windowing concepts from Apache Beam and the PR #319 by
> >> > > David. Looks like there are a lot of advanced concepts which could be used
> >> > > by operators using event time windowing.
> >> > > Additionally I also looked at the Managed State implementation.
> >> > >
> >> > > One of the things I noticed is that there is an overlap of functionality
> >> > > between Managed State and Windowing Support in terms of the following:
> >> > >
> >> > >    - *Discarding / Dropping of tuples* from the system - Managed State uses
> >> > >    the concept of expiry while a Windowed operator uses the concepts of
> >> > >    Watermarks and allowed lateness. If I try to reconcile the above two, it
> >> > >    seems like Managed State (through TimeBucketAssigner) is trying to
> >> > >    implement some sort of implicit heuristic Watermarks based on either the
> >> > >    user supplied time or the event time.
> >> > >    - *Global Window* support - Once we have an option to disable purging in
> >> > >    Managed State, it will have similar semantics to the Global Window option
> >> > >    in Windowing support.
> >> > >
> >> > > If I understand correctly, is the suggestion to implement the Dedup
> >> > > operator as a Windowed operator and to use managed state only as a storage
> >> > > medium (through WindowedStorage)? What could be a better way of going
> >> > > about this?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <[email protected]> wrote:
> >> > >
> >> > > > Hi Thomas,
> >> > > >
> >> > > > I agree that the case of processing bounded data is a special case of
> >> > > > unbounded data.
> >> > > > The difference I was pointing out was in terms of expiry. This is not
> >> > > > applicable in case of bounded data sets, while unbounded data sets will
> >> > > > inherently use expiry for limiting the amount of data to be stored.
> >> > > >
> >> > > > For idempotency when applying expiry on the streaming data, I need to
> >> > > > explore more on using the window timestamp that you proposed as opposed
> >> > > > to the system time which I was planning to use.
> >> > > >
> >> > > > Thanks.
> >> > > > ~ Bhupesh
> >> > > >
> >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <[email protected]> wrote:
> >> > > >
> >> > > >> Bhupesh,
> >> > > >>
> >> > > >> Why is there a distinction between bounded and unbounded data? I see the
> >> > > >> former as a special case of the latter?
> >> > > >>
> >> > > >> When rewinding the stream or reprocessing the stream in another run the
> >> > > >> operator should produce the same result.
> >> > > >>
> >> > > >> This operator should be idempotent also.
> >> > > >> That implies that code does not
> >> > > >> rely on current system time but the window timestamp instead.
> >> > > >>
> >> > > >> All of this should be accomplished by using the windowing support:
> >> > > >> https://github.com/apache/apex-malhar/pull/319
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Thomas
> >> > > >>
> >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <[email protected]> wrote:
> >> > > >>
> >> > > >> > Hi All,
> >> > > >> >
> >> > > >> > I want to validate the use cases for de-duplication that will be going as
> >> > > >> > part of this implementation.
> >> > > >> >
> >> > > >> >    - *Bounded data set*
> >> > > >> >       - This is de-duplication for bounded data. For example, data sets
> >> > > >> >       which are old or fixed or which may not have a time field at
> >> > > >> >       all. Example: Last year's transaction records or Customer data etc.
> >> > > >> >       - Concept of expiry is not needed as this is bounded data set.
> >> > > >> >    - *Unbounded data set*
> >> > > >> >       - This is de-duplication of online streaming data
> >> > > >> >       - Expiry is needed because here incoming tuples may arrive later than
> >> > > >> >       what they are expected. Expiry is always computed by taking the
> >> > > >> >       difference in System time and the Event time.
> >> > > >> >
> >> > > >> > Any feedback is appreciated.
> >> > > >> >
> >> > > >> > Thanks.
> >> > > >> >
> >> > > >> > ~ Bhupesh
> >> > > >> >
> >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <[email protected]> wrote:
> >> > > >> >
> >> > > >> > > Hi All,
> >> > > >> > >
> >> > > >> > > I am working on adding a De-duplication operator in Malhar library based
> >> > > >> > > on managed state APIs.
> >> > > >> > > I will be working off the already created JIRA -
> >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
> >> > > >> > > pull request for an AbstractDeduper here:
> >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> >> > > >> > >
> >> > > >> > > I am planning to include the following features in the first version:
> >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> >> > > >> > > correlation holds.
> >> > > >> > > 2. Option to maintain order of incoming tuples.
> >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples
> >> > > >> > > respectively.
> >> > > >> > >
> >> > > >> > > Thanks.
> >> > > >> > >
> >> > > >> > > ~ Bhupesh
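
For illustration only, here is a minimal sketch of the idea discussed in this thread: deciding expiry from the streaming window timestamp plus a fixed lateness, rather than from system time or upstream watermark tuples, so that replaying the same windows gives the same result. It does not use the Malhar API. The class FixedLatenessDeduper, its methods, and the Result enum are hypothetical names made up for this sketch, and it assumes the Tuple_Key -> Tuple_Time correlation from the original proposal (a duplicate carries the same event time as the first occurrence).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; NOT the Malhar AbstractWindowedOperator or Deduper API. */
class FixedLatenessDeduper {
    enum Result { UNIQUE, DUPLICATE, EXPIRED }

    private final long fixedLatenessMillis;
    // key -> event time of the first occurrence seen
    private final Map<String, Long> seen = new HashMap<>();
    // Timestamp of the current streaming window; stands in for a watermark.
    private long windowTimestampMillis = 0L;

    FixedLatenessDeduper(long fixedLatenessMillis) {
        this.fixedLatenessMillis = fixedLatenessMillis;
    }

    /** Advance the time reference using the window timestamp, never System time. */
    void beginWindow(long windowTimestampMillis) {
        this.windowTimestampMillis = windowTimestampMillis;
        long horizon = windowTimestampMillis - fixedLatenessMillis;
        // Purge keys that can no longer be matched: any later copy would be EXPIRED.
        seen.values().removeIf(eventTime -> eventTime < horizon);
    }

    /** Classify one tuple by its key and event time. */
    Result process(String key, long eventTimeMillis) {
        if (eventTimeMillis < windowTimestampMillis - fixedLatenessMillis) {
            return Result.EXPIRED;
        }
        if (seen.putIfAbsent(key, eventTimeMillis) != null) {
            return Result.DUPLICATE;
        }
        return Result.UNIQUE;
    }

    public static void main(String[] args) {
        FixedLatenessDeduper deduper = new FixedLatenessDeduper(1_000);
        deduper.beginWindow(10_000);                      // horizon is 9_000
        System.out.println(deduper.process("a", 9_500));  // UNIQUE
        System.out.println(deduper.process("a", 9_500));  // DUPLICATE
        System.out.println(deduper.process("b", 8_000));  // EXPIRED
    }
}
```

Because the only time input is the window timestamp, feeding the same windows and tuples through a fresh instance yields the same classifications, which is the idempotency property Thomas asks for. A real operator would keep the seen-keys state in ManagedState or a WindowedKeyedStorage instead of an in-memory map.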
