Hi All,

I have also implemented a de-duplication operator using the Windowed
Operator. We now have two implementations: one using Managed State and
another using the Windowed Operator. Here are their details:

   1. *With Managed State*
      - The operator uses managed state as the storage for the buckets
        in which the tuples are stored.
      - *TimeBucketAssigner* assigns each incoming tuple to a bucket
        based on its event time. It is also used to decide whether a
        tuple has expired and should be sent to the expired port or
        dropped.
      - The *ManagedTimeUnifiedStateImpl* implementation of managed state
        is used. It only requires the user to specify the event time; a
        bucket is then assigned automatically. Bucket data is laid out
        on storage as: /operator_id/time_bucket
      - An advantage of the Managed State approach is that we don't have
        to assume any correlation between the event time and the
        de-duplication key of a tuple. For example, if we get two tuples
        (K1, T1) and (K1, T2), we can still use ManagedStateImpl and
        conclude that they are duplicates based on the key K1.
   2. *With Windowed Operator*
      - The operator uses WindowedOperatorImpl as the base operator.
      - Accumulation, for the deduper, amounts to storing a list of
        tuples in the data storage. Every time we get a unique tuple, we
        *accumulate* it in the list.
      - Event windows are modeled using the *TimeWindow* option. Although
        SlidingTimeWindows seems the more intuitive fit for data buckets,
        it is the costlier option, because the accumulation here is not
        a single aggregate value but a list of values in each bucket.
      - Watermarks are not assumed to be sent by an upstream operator
        (although it is fine if one sends them). The *fixedWatermark*
        feature is used to assume watermarks relative to the window time.
      - One issue I found with using WindowedOperator for dedup is that
        event time is tightly coupled with the de-duplication key. In the
        example above, (K1, T1) and (K1, T2) *might* be treated as two
        unique tuples, since T1 and T2 may fall into two different time
        buckets.
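
To make the key/time coupling concrete, here is a small self-contained
sketch. It is not the code from either PR: the class name
BucketCouplingSketch, the helper bucketOf, the 60-second bucket span and
the sample timestamps are all assumptions made purely for illustration.
It compares a duplicate check keyed on K alone with one keyed on
(time bucket, K).

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative only: a toy model, not the code from PR #335 or PR #343.
class BucketCouplingSketch {
  // Hypothetical bucket span chosen for the example.
  static final long BUCKET_SPAN_MILLIS = 60_000L;

  // Maps an event time to the index of the time bucket that contains it.
  static long bucketOf(long eventTimeMillis) {
    return Math.floorDiv(eventTimeMillis, BUCKET_SPAN_MILLIS);
  }

  // Key-only check: a tuple is a duplicate if its key was seen before,
  // whatever its event time.
  static final class KeyOnlyDeduper {
    private final Set<String> seen = new HashSet<>();

    boolean isDuplicate(String key, long eventTimeMillis) {
      return !seen.add(key);
    }
  }

  // Bucket-scoped check: a tuple is only compared with tuples whose event
  // time falls into the same time bucket.
  static final class BucketScopedDeduper {
    private final Set<String> seen = new HashSet<>();

    boolean isDuplicate(String key, long eventTimeMillis) {
      return !seen.add(bucketOf(eventTimeMillis) + "/" + key);
    }
  }

  public static void main(String[] args) {
    long t1 = 10_000L; // falls into bucket 0
    long t2 = 70_000L; // falls into bucket 1

    KeyOnlyDeduper byKey = new KeyOnlyDeduper();
    byKey.isDuplicate("K1", t1);
    System.out.println("key only, (K1, T2) duplicate? " + byKey.isDuplicate("K1", t2));

    BucketScopedDeduper byBucket = new BucketScopedDeduper();
    byBucket.isDuplicate("K1", t1);
    System.out.println("bucket scoped, (K1, T2) duplicate? " + byBucket.isDuplicate("K1", t2));
  }
}
```

With these sample values the key-only check reports (K1, T2) as a
duplicate, while the bucket-scoped check treats it as unique because T1
and T2 land in different buckets.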

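The idea of assuming watermarks relative to the window time can be
sketched in the same spirit. This is one plausible reading of the
discussion quoted below, not the AbstractWindowedOperator API: the class
FixedWatermarkSketch and its methods advanceTo and isTooLate are
hypothetical, and the rule "too late means event time is earlier than
watermark minus allowed lateness" is an assumption.

```java
// Illustrative only: a toy model, not the Apex windowed operator API.
class FixedWatermarkSketch {
  // How far the assumed watermark trails the window timestamp.
  private final long fixedWatermarkMillis;
  // Extra grace period behind the watermark; 0 drops every late tuple.
  private final long allowedLatenessMillis;
  // Long.MIN_VALUE means no watermark has been assumed yet.
  private long currentWatermark = Long.MIN_VALUE;

  FixedWatermarkSketch(long fixedWatermarkMillis, long allowedLatenessMillis) {
    this.fixedWatermarkMillis = fixedWatermarkMillis;
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  // Called once per streaming window with the timestamp derived from the
  // window ID. The watermark never moves backwards.
  void advanceTo(long windowTimestampMillis) {
    currentWatermark = Math.max(currentWatermark, windowTimestampMillis - fixedWatermarkMillis);
  }

  // A tuple is too late if it is older than the watermark by more than the
  // allowed lateness. Nothing is late before the first watermark.
  boolean isTooLate(long eventTimeMillis) {
    return currentWatermark != Long.MIN_VALUE
        && eventTimeMillis < currentWatermark - allowedLatenessMillis;
  }

  public static void main(String[] args) {
    FixedWatermarkSketch sketch = new FixedWatermarkSketch(5_000L, 0L);
    sketch.advanceTo(100_000L); // assumed watermark becomes 95000
    System.out.println("94999 too late? " + sketch.isTooLate(94_999L));
    System.out.println("95000 too late? " + sketch.isTooLate(95_000L));
  }
}
```

No upstream watermark tuples are needed in this model: the watermark is
derived from the window timestamp alone, and an allowed lateness of 0
means every tuple behind it is treated as late.
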
Here are the PRs for both of them.

   - Using Managed State: https://github.com/apache/apex-malhar/pull/335
   - Using Windowed Operator: https://github.com/apache/apex-malhar/pull/343

Please review them and suggest which approach should be the basis for the
final implementation, on top of which other features such as fault
tolerance, scalability and optimizations will be added.
Thanks.

~ Bhupesh

On Fri, Jul 8, 2016 at 11:30 PM, David Yan <[email protected]> wrote:

> No problem.
>
> By the way, I changed the method name to setFixedWatermark. And also, if
> you want to drop any tuples that are considered late, you need to set the
> allowed lateness to be 0.
>
> David
>
> On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <[email protected]> wrote:
>
> > Thanks David.
> > I'll try to create an implementation for Deduper which uses
> > WindowedOperator. Will open a PR soon for review.
> >
> > ~ Bhupesh
> >
> > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <[email protected]> wrote:
> >
> > > Hi Bhupesh,
> > >
> > > I just added the method setFixedLateness(long millis) to
> > > AbstractWindowedOperator in my PR. This will allow you to specify the
> > > lateness with respect to the timestamp from the window ID without
> > watermark
> > > tuples from upstream.
> > >
> > > David
> > >
> > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <[email protected]>
> > wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > Yes, the windowed operator currently depends on the watermark tuples
> > > > upstream for any "lateness" related operation. If there is no
> > watermark,
> > > > nothing will be considered late. We can add support for lateness
> > handling
> > > > without incoming watermark tuples. Let me add that to the pull
> request.
> > > >
> > > > David
> > > >
> > > >
> > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <[email protected]>
> > > > wrote:
> > > >
> > > >> Hi David,
> > > >>
> > > >> Thanks for your reply.
> > > >>
> > > >> If I am to use a windowed operator for the Dedup operator, there
> > should
> > > be
> > > >> some operator (upstream to Deduper) which sends the watermark
> tuples.
> > > >> These
> > > >> tuples (along with allowed lateness), will be the ones deciding
> which
> > > >> incoming tuples are too late and will be dropped. I have the
> following
> > > >> questions:
> > > >>
> > > >> Is a windowed operator (which needs watermarks) dependent upon some
> > > other
> > > >> operator for these tuples? What happens when there are no watermark
> > > tuples
> > > >> sent from upstream?
> > > >>
> > > >> Can a windowed operator "*assume*" the watermark tuples based on
> some
> > > >> notion of time? For example, can the Deduper, use the streaming
> window
> > > >> time
> > > >> as the reference to advance the watermark?
> > > >>
> > > >> Thanks.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <[email protected]>
> > > wrote:
> > > >>
> > > >> > Hi Bhupesh,
> > > >> >
> > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > >> WindowedStorage
> > > >> > and WindowedKeyedStorage:
> > > >> >
> > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > >> >
> > > >> > We expect either to use ManagedState directly, or Spillable
> > > structures,
> > > >> > which in turn uses ManagedState.
> > > >> >
> > > > >> > I'm not very familiar with the dedup operator, but in order to use
> > the
> > > >> > WindowedOperator, it sounds to me that we can use SlidingWindows
> > with
> > > an
> > > >> > implementation of WindowedKeyedStorage that uses a Bloom filter to
> > > cover
> > > >> > most of the false cases.
> > > >> >
> > > >> > David
> > > >> >
> > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> [email protected]>
> > > >> wrote:
> > > >> >
> > > >> > > Hi All,
> > > >> > >
> > > >> > > I have looked into Windowing concepts from Apache Beam and the
> PR
> > > >> #319 by
> > > >> > > David. Looks like there are a lot of advanced concepts which
> could
> > > be
> > > >> > used
> > > >> > > by operators using event time windowing.
> > > >> > > Additionally I also looked at the Managed State implementation.
> > > >> > >
> > > >> > > One of the things I noticed is that there is an overlap of
> > > >> functionality
> > > >> > > between Managed State and Windowing Support in terms of the
> > > following:
> > > >> > >
> > > >> > >    - *Discarding / Dropping of tuples* from the system - Managed
> > > State
> > > >> > uses
> > > >> > >    the concept of expiry while a Windowed operator uses the
> > concepts
> > > >> of
> > > >> > >    Watermarks and allowed lateness. If I try to reconcile the
> > above
> > > >> two,
> > > >> > it
> > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> trying
> > > to
> > > >> > >    implement some sort of implicit heuristic Watermarks based on
> > > >> either
> > > >> > the
> > > >> > >    user supplied time or the event time.
> > > >> > >    - *Global Window* support - Once we have an option to disable
> > > >> purging
> > > >> > in
> > > >> > >    Managed State, it will have similar semantics to the Global
> > > Window
> > > >> > > option
> > > >> > >    in Windowing support.
> > > >> > >
> > > >> > > If I understand correctly, is the suggestion to implement the
> > Dedup
> > > >> > > operator as a Windowed operator and to use managed state only
> as a
> > > >> > storage
> > > >> > > medium (through WindowedStorage) ? What could be a better way of
> > > going
> > > >> > > about this?
> > > >> > >
> > > >> > > Thanks.
> > > >> > >
> > > >> > > ~ Bhupesh
> > > >> > >
> > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > [email protected]>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi Thomas,
> > > >> > > >
> > > >> > > > I agree that the case of processing bounded data is a special
> > case
> > > >> of
> > > >> > > > unbounded data.
> > > > >> > > > The difference I was pointing out was in terms of expiry. This
> is
> > > not
> > > >> > > > applicable in case of bounded data sets, while unbounded data
> > sets
> > > >> will
> > > >> > > > inherently use expiry for limiting the amount of data to be
> > > stored.
> > > >> > > >
> > > >> > > > For idempotency when applying expiry on the streaming data, I
> > need
> > > >> to
> > > > >> > > > explore more on using the window timestamp that you
> proposed
> > > as
> > > >> > > opposed
> > > >> > > > to the system time which I was planning to use.
> > > >> > > >
> > > >> > > > Thanks.
> > > >> > > > ~ Bhupesh
> > > >> > > >
> > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > >> [email protected]>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Bhupesh,
> > > >> > > >>
> > > >> > > >> Why is there a distinction between bounded and unbounded
> data?
> > I
> > > >> see
> > > >> > the
> > > >> > > >> former as a special case of the latter?
> > > >> > > >>
> > > >> > > >> When rewinding the stream or reprocessing the stream in
> another
> > > run
> > > >> > the
> > > >> > > >> operator should produce the same result.
> > > >> > > >>
> > > >> > > >> This operator should be idempotent also. That implies that
> code
> > > >> does
> > > >> > not
> > > >> > > >> rely on current system time but the window timestamp instead.
> > > >> > > >>
> > > >> > > >> All of this should be accomplished by using the windowing
> > > support:
> > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > >> > > >>
> > > >> > > >> Thanks,
> > > >> > > >> Thomas
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > >> > > [email protected]>
> > > >> > > >> wrote:
> > > >> > > >>
> > > >> > > >> > Hi All,
> > > >> > > >> >
> > > >> > > >> > I want to validate the use cases for de-duplication that
> will
> > > be
> > > >> > going
> > > >> > > >> as
> > > >> > > >> > part of this implementation.
> > > >> > > >> >
> > > >> > > >> >    - *Bounded data set*
> > > >> > > >> >       - This is de-duplication for bounded data. For
> example,
> > > >> data
> > > >> > > sets
> > > >> > > >> >       which are old or fixed or which may not have a time
> > field
> > > >> at
> > > >> > > >> > all. Example:
> > > >> > > >> >       Last year's transaction records or Customer data etc.
> > > >> > > >> >       - Concept of expiry is not needed as this is bounded
> > data
> > > >> set.
> > > > >> > > >> >    - *Unbounded data set*
> > > >> > > >> >       - This is de-duplication of online streaming data
> > > >> > > >> >       - Expiry is needed because here incoming tuples may
> > > arrive
> > > >> > later
> > > >> > > >> than
> > > >> > > >> >       what they are expected. Expiry is always computed by
> > > taking
> > > >> > the
> > > >> > > >> > difference
> > > >> > > >> >       in System time and the Event time.
> > > >> > > >> >
> > > >> > > >> > Any feedback is appreciated.
> > > >> > > >> >
> > > >> > > >> > Thanks.
> > > >> > > >> >
> > > >> > > >> > ~ Bhupesh
> > > >> > > >> >
> > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > >> > > >> [email protected]>
> > > >> > > >> > wrote:
> > > >> > > >> >
> > > >> > > >> > > Hi All,
> > > >> > > >> > >
> > > >> > > >> > > I am working on adding a De-duplication operator in
> Malhar
> > > >> library
> > > >> > > >> based
> > > >> > > >> > > on managed state APIs. I will be working off the already
> > > >> created
> > > >> > > JIRA
> > > >> > > >> -
> > > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> and
> > > the
> > > >> > > initial
> > > >> > > >> > > pull request for an AbstractDeduper here:
> > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > >> > > >> > >
> > > >> > > >> > > I am planning to include the following features in the
> > first
> > > >> > > version:
> > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > > >> Tuple_Time
> > > >> > > >> > > correlation holds.
> > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > expired
> > > >> > tuples
> > > >> > > >> > > respectively.
> > > >> > > >> > >
> > > >> > > >> > > Thanks.
> > > >> > > >> > >
> > > >> > > >> > > ~ Bhupesh
> > > >> > > >> > >
> > > >> > > >> >
> > > >> > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>
