Hi All,

I have created an initial Deduper implementation based on Managed State and opened a PR to get feedback from the community. This is based on the initial PR by @chandnisingh.
Please help review this PR: https://github.com/apache/apex-malhar/pull/335

Note that this is based entirely on the support provided by Managed State. If this approach is acceptable to the community, I can proceed to the additional features, such as partitioning and fault tolerance support.

Any feedback is appreciated.

Thanks.
~ Bhupesh

On Thu, Jul 7, 2016 at 11:18 AM, Bhupesh Chawda wrote:
> Hi David,
>
> Thanks for your reply.
>
> If I use a windowed operator for the Dedup operator, there must be some operator upstream of the Deduper that sends the watermark tuples. These tuples (along with the allowed lateness) decide which incoming tuples are too late and will be dropped. I have the following questions:
>
> Does a windowed operator (which needs watermarks) depend on some other operator for these tuples? What happens when no watermark tuples are sent from upstream?
>
> Can a windowed operator "*assume*" the watermark tuples based on some notion of time? For example, can the Deduper use the streaming window time as the reference to advance the watermark?
>
> Thanks.
>
> ~ Bhupesh
>
> On Thu, Jul 7, 2016 at 4:07 AM, David Yan wrote:
>
>> Hi Bhupesh,
>>
>> FYI, there is a JIRA open for a scalable implementation of WindowedStorage and WindowedKeyedStorage:
>>
>> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>>
>> We expect to use either ManagedState directly or Spillable structures, which in turn use ManagedState.
>>
>> I'm not very familiar with the dedup operator, but in order to use the WindowedOperator, it sounds to me that we can use SlidingWindows with an implementation of WindowedKeyedStorage that uses a Bloom filter to cover most of the false cases.
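The Bloom-filter idea in David's reply can be illustrated with a minimal, self-contained Python sketch. It is not the Malhar `WindowedKeyedStorage` API: the class names `BloomFilter` and `BloomBackedDeduper`, the filter size and hash count, and the in-memory `set` standing in for Managed State are all hypothetical. A Bloom filter never gives a false negative, so a "not present" answer lets the deduper skip the exact storage lookup; only a "maybe present" answer requires one.

```python
import hashlib
from typing import Iterator, Set


class BloomFilter:
    """Minimal Bloom filter: num_hashes positions in a num_bits-slot array."""

    def __init__(self, num_bits: int = 1024, num_hashes: int = 3) -> None:
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(num_bits)  # one byte per slot, for simplicity

    def _positions(self, key: str) -> Iterator[int]:
        # Derive each position from a SHA-256 digest salted with the hash index.
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{key}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key: str) -> None:
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key: str) -> bool:
        # False means "definitely never added"; True means "possibly added".
        return all(self.bits[pos] for pos in self._positions(key))


class BloomBackedDeduper:
    """Hypothetical deduper with a Bloom filter in front of an exact key store."""

    def __init__(self) -> None:
        self.bloom = BloomFilter()
        self.store: Set[str] = set()  # stand-in for Managed State / spillable storage
        self.store_lookups = 0        # how often the exact store had to be consulted

    def is_duplicate(self, key: str) -> bool:
        if self.bloom.might_contain(key):
            # Only a "maybe" from the filter costs an exact lookup.
            self.store_lookups += 1
            if key in self.store:
                return True
        self.bloom.add(key)
        self.store.add(key)
        return False


deduper = BloomBackedDeduper()
print([deduper.is_duplicate(k) for k in ["a", "b", "a", "c", "b"]])
# → [False, False, True, False, True]
```

A Bloom filter cannot delete keys, so with SlidingWindows one plausible design (an assumption, not something stated in this thread) is to keep one filter per window and discard it when the window expires.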
>>
>> David
>>
>> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda wrote:
>>
>> > Hi All,
>> >
>> > I have looked into the windowing concepts from Apache Beam and PR #319 by David. It looks like there are a lot of advanced concepts that could be used by operators relying on event-time windowing. I also looked at the Managed State implementation.
>> >
>> > One thing I noticed is that Managed State and the windowing support overlap in functionality in the following areas:
>> >
>> > - *Discarding / dropping of tuples* from the system: Managed State uses the concept of expiry, while a windowed operator uses the concepts of watermarks and allowed lateness. If I try to reconcile the two, it seems that Managed State (through TimeBucketAssigner) implements a sort of implicit, heuristic watermark based on either the user-supplied time or the event time.
>> > - *Global Window* support: once we have an option to disable purging in Managed State, it will have semantics similar to the Global Window option in the windowing support.
>> >
>> > If I understand correctly, is the suggestion to implement the Dedup operator as a windowed operator and to use Managed State only as a storage medium (through WindowedStorage)? What would be a better way to go about this?
>> >
>> > Thanks.
>> >
>> > ~ Bhupesh
>> >
>> > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > I agree that processing bounded data is a special case of processing unbounded data.
>> > > The difference I was pointing out is in terms of expiry. Expiry does not apply to bounded data sets, while unbounded data sets inherently use expiry to limit the amount of data that must be stored.
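To make the point about expiry bounding storage concrete, here is a minimal Python sketch of a time-bucketed key store. `ExpiringKeyStore`, `bucket_span`, and `expiry_span` are hypothetical names, and this is not how Managed State or `TimeBucketAssigner` is implemented; it only illustrates the idea. Buckets that lie entirely behind `reference_time - expiry_span` are dropped, which behaves much like the implicit watermark described in the Jul 4 message. The reference time is passed in (for example, a window timestamp) rather than read from the system clock, in line with the idempotency concern raised in this thread.

```python
from typing import Dict, Set


class ExpiringKeyStore:
    """Hypothetical time-bucketed key store with expiry (not the Managed State API)."""

    def __init__(self, bucket_span: int, expiry_span: int) -> None:
        self.bucket_span = bucket_span  # width of one time bucket
        self.expiry_span = expiry_span  # how far behind the reference time data is kept
        self.buckets: Dict[int, Set[str]] = {}  # bucket index -> keys in that bucket

    def _bucket(self, event_time: int) -> int:
        return event_time // self.bucket_span

    def is_expired(self, event_time: int, reference_time: int) -> bool:
        # Anything older than the horizon is treated as too late (cf. a watermark).
        return event_time < reference_time - self.expiry_span

    def put_if_absent(self, key: str, event_time: int) -> bool:
        """Add key to its bucket; return False if it was already present.

        Relies on the Tuple_Key -> Tuple_Time correlation: every copy of a key
        carries the same event time, so it always maps to the same bucket.
        """
        keys = self.buckets.setdefault(self._bucket(event_time), set())
        if key in keys:
            return False
        keys.add(key)
        return True

    def purge(self, reference_time: int) -> None:
        """Drop every bucket that lies entirely behind the expiry horizon.

        reference_time is supplied by the caller; the system clock is never read.
        """
        horizon = reference_time - self.expiry_span
        expired = [b for b in self.buckets if (b + 1) * self.bucket_span <= horizon]
        for b in expired:
            del self.buckets[b]


store = ExpiringKeyStore(bucket_span=10, expiry_span=20)
for t in range(100):
    store.put_if_absent(f"k{t}", t)
print(len(store.buckets))     # → 10
store.purge(reference_time=100)
print(sorted(store.buckets))  # → [8, 9]
```

Only the buckets still inside the expiry horizon survive a purge, so the retained state stays proportional to `expiry_span` rather than to the length of the stream.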
>> > >
>> > > For idempotency when applying expiry to the streaming data, I need to explore using the window timestamp that you proposed, as opposed to the system time that I was planning to use.
>> > >
>> > > Thanks.
>> > > ~ Bhupesh
>> > >
>> > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise wrote:
>> > >
>> > >> Bhupesh,
>> > >>
>> > >> Why is there a distinction between bounded and unbounded data? I see the former as a special case of the latter.
>> > >>
>> > >> When rewinding the stream or reprocessing it in another run, the operator should produce the same result.
>> > >>
>> > >> This operator should also be idempotent. That implies that the code does not rely on the current system time but on the window timestamp instead.
>> > >>
>> > >> All of this should be accomplished by using the windowing support:
>> > >> https://github.com/apache/apex-malhar/pull/319
>> > >>
>> > >> Thanks,
>> > >> Thomas
>> > >>
>> > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda wrote:
>> > >>
>> > >> > Hi All,
>> > >> >
>> > >> > I want to validate the use cases for de-duplication that will be part of this implementation.
>> > >> >
>> > >> > - *Bounded data set*
>> > >> >   - This is de-duplication of bounded data, for example data sets that are old or fixed, or that may not have a time field at all. Examples: last year's transaction records, customer data, etc.
>> > >> >   - The concept of expiry is not needed, as this is a bounded data set.
>> > >> > - *Unbounded data set*
>> > >> >   - This is de-duplication of online streaming data.
>> > >> >   - Expiry is needed because incoming tuples may arrive later than expected. Expiry is always computed from the difference between the system time and the event time.
>> > >> >
>> > >> > Any feedback is appreciated.
>> > >> >
>> > >> > Thanks.
>> > >> >
>> > >> > ~ Bhupesh
>> > >> >
>> > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda wrote:
>> > >> >
>> > >> > > Hi All,
>> > >> > >
>> > >> > > I am working on adding a de-duplication operator to the Malhar library, based on the Managed State APIs. I will be working off the existing JIRA, https://issues.apache.org/jira/browse/APEXMALHAR-1701, and the initial pull request for an AbstractDeduper: https://github.com/apache/apex-malhar/pull/260/files
>> > >> > >
>> > >> > > I am planning to include the following features in the first version:
>> > >> > > 1. Time-based de-duplication. Assumption: the Tuple_Key -> Tuple_Time correlation holds.
>> > >> > > 2. Option to maintain the order of incoming tuples.
>> > >> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples, respectively.
>> > >> > >
>> > >> > > Thanks.
>> > >> > >
>> > >> > > ~ Bhupesh
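The first-version features listed in the Jun 27 message (time-based de-duplication with separate duplicate and expired outputs) can be sketched in Python, combined with Thomas's suggestion of measuring expiry against the window timestamp rather than the system time. `TimeBasedDeduper`, `DedupOutputs`, `process_window`, `run`, and `expiry_span` are hypothetical names; plain lists stand in for the operator's output ports and a dictionary stands in for Managed State.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Record = Tuple[str, int]  # (tuple key, event time)


@dataclass
class DedupOutputs:
    """Stand-ins for the unique, duplicate, and expired output ports."""
    unique: List[Record] = field(default_factory=list)
    duplicate: List[Record] = field(default_factory=list)
    expired: List[Record] = field(default_factory=list)


class TimeBasedDeduper:
    """Hypothetical time-based deduper, not the Malhar operator.

    Assumes the Tuple_Key -> Tuple_Time correlation holds: every copy of a key
    carries the same event time.
    """

    def __init__(self, expiry_span: int) -> None:
        self.expiry_span = expiry_span
        self.seen: Dict[str, int] = {}  # key -> event time; stand-in for Managed State

    def process_window(self, window_ts: int, records: List[Record]) -> DedupOutputs:
        out = DedupOutputs()
        # Expiry is measured against the window timestamp, never the system clock.
        horizon = window_ts - self.expiry_span
        # Forget keys that are now behind the horizon, bounding retained state.
        self.seen = {k: t for k, t in self.seen.items() if t >= horizon}
        for key, event_time in records:  # arrival order is kept in each output list
            if event_time < horizon:
                out.expired.append((key, event_time))
            elif key in self.seen:
                out.duplicate.append((key, event_time))
            else:
                self.seen[key] = event_time
                out.unique.append((key, event_time))
        return out


def run(windows: List[Tuple[int, List[Record]]]) -> List[DedupOutputs]:
    deduper = TimeBasedDeduper(expiry_span=15)
    return [deduper.process_window(ts, records) for ts, records in windows]


windows = [
    (100, [("a", 95), ("b", 98), ("a", 95)]),
    (110, [("b", 98), ("c", 85), ("d", 109)]),
]
first, second = run(windows)
print(first.unique, first.duplicate)     # → [('a', 95), ('b', 98)] [('a', 95)]
print(second.duplicate, second.expired)  # → [('b', 98)] [('c', 85)]
print(run(windows) == run(windows))      # → True
```

Because no wall-clock time is consulted, replaying the same windows yields identical outputs, which is the idempotency property Thomas asked for. Whether the reference should be the streaming window timestamp or a watermark sent by an upstream operator is the open question in the Jul 7 message; this sketch simply assumes the former.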
