Hi All,

I have created an initial Deduper implementation based on Managed State and
opened a PR to get feedback from the community. This is based on the
initial PR by @chandnisingh.

Please help review this PR: https://github.com/apache/apex-malhar/pull/335

Note that this is based entirely on the support provided by Managed State.
If this is acceptable to the community, then I can proceed to other
additive support like the partitioning ability and fault tolerance support
etc.

Any feedback is appreciated.

Thanks.
~ Bhupesh

On Thu, Jul 7, 2016 at 11:18 AM, Bhupesh Chawda <[email protected]> wrote:

> Hi David,
>
> Thanks for your reply.
>
> If I am to use a windowed operator for the Dedup operator, there should be
> some operator (upstream to Deduper) which sends the watermark tuples. These
> tuples (along with allowed lateness), will be the ones deciding which
> incoming tuples are too late and will be dropped. I have the following
> questions:
>
> Is a windowed operator (which needs watermarks) dependent upon some other
> operator for these tuples? What happens when there are no watermark tuples
> sent from upstream?
>
> Can a windowed operator "*assume*" the watermark tuples based on some
> notion of time? For example, can the Deduper, use the streaming window time
> as the reference to advance the watermark?
>
> Thanks.
>
> ~ Bhupesh
>
> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <[email protected]> wrote:
>
>> Hi Bhupesh,
>>
>> FYI, there is a JIRA open for a scalable implementation of WindowedStorage
>> and WindowedKeyedStorage:
>>
>> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>>
>> We expect either to use ManagedState directly, or Spillable structures,
>> which in turn uses ManagedState.
>>
>> I'm not very familiar with the dedup operator. but in order to use the
>> WindowedOperator, it sounds to me that we can use SlidingWindows with an
>> implementation of WindowedKeyedStorage that uses a Bloom filter to cover
>> most of the false cases.
>>
>> David
>>
>> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <[email protected]>
>> wrote:
>>
>> > Hi All,
>> >
>> > I have looked into Windowing concepts from Apache Beam and the PR #319
>> by
>> > David. Looks like there are a lot of advanced concepts which could be
>> used
>> > by operators using event time windowing.
>> > Additionally I also looked at the Managed State implementation.
>> >
>> > One of the things I noticed is that there is an overlap of functionality
>> > between Managed State and Windowing Support in terms of the following:
>> >
>> >    - *Discarding / Dropping of tuples* from the system - Managed State
>> uses
>> >    the concept of expiry while a Windowed operator uses the concepts of
>> >    Watermarks and allowed lateness. If I try to reconcile the above
>> two, it
>> >    seems like Managed State (through TimeBucketAssigner) is trying to
>> >    implement some sort of implicit heuristic Watermarks based on either
>> the
>> >    user supplied time or the event time.
>> >    - *Global Window* support - Once we have an option to disable
>> purging in
>> >    Managed State, it will have similar semantics to the Global Window
>> > option
>> >    in Windowing support.
>> >
>> > If I understand correctly, is the suggestion to implement the Dedup
>> > operator as a Windowed operator and to use managed state only as a
>> storage
>> > medium (through WindowedStorage) ? What could be a better way of going
>> > about this?
>> >
>> > Thanks.
>> >
>> > ~ Bhupesh
>> >
>> > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <[email protected]>
>> > wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > I agree that the case of processing bounded data is a special case of
>> > > unbounded data.
>> > > Th difference I was pointing out was in terms of expiry. This is not
>> > > applicable in case of bounded data sets, while unbounded data sets
>> will
>> > > inherently use expiry for limiting the amount of data to be stored.
>> > >
>> > > For idempotency when applying expiry on the streaming data, I need to
>> > > explore more on the using the window timestamp that you proposed as
>> > opposed
>> > > to the system time which I was planning to use.
>> > >
>> > > Thanks.
>> > > ~ Bhupesh
>> > >
>> > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <[email protected]
>> >
>> > > wrote:
>> > >
>> > >> Bhupesh,
>> > >>
>> > >> Why is there a distinction between bounded and unbounded data? I see
>> the
>> > >> former as a special case of the latter?
>> > >>
>> > >> When rewinding the stream or reprocessing the stream in another run
>> the
>> > >> operator should produce the same result.
>> > >>
>> > >> This operator should be idempotent also. That implies that code does
>> not
>> > >> rely on current system time but the window timestamp instead.
>> > >>
>> > >> All of this should be accomplished by using the windowing support:
>> > >> https://github.com/apache/apex-malhar/pull/319
>> > >>
>> > >> Thanks,
>> > >> Thomas
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
>> > [email protected]>
>> > >> wrote:
>> > >>
>> > >> > Hi All,
>> > >> >
>> > >> > I want to validate the use cases for de-duplication that will be
>> going
>> > >> as
>> > >> > part of this implementation.
>> > >> >
>> > >> >    - *Bounded data set*
>> > >> >       - This is de-duplication for bounded data. For example, data
>> > sets
>> > >> >       which are old or fixed or which may not have a time field at
>> > >> > all. Example:
>> > >> >       Last year's transaction records or Customer data etc.
>> > >> >       - Concept of expiry is not needed as this is bounded data
>> set.
>> > >> >       - *Unbounded data set*
>> > >> >       - This is de-duplication of online streaming data
>> > >> >       - Expiry is needed because here incoming tuples may arrive
>> later
>> > >> than
>> > >> >       what they are expected. Expiry is always computed by taking
>> the
>> > >> > difference
>> > >> >       in System time and the Event time.
>> > >> >
>> > >> > Any feedback is appreciated.
>> > >> >
>> > >> > Thanks.
>> > >> >
>> > >> > ~ Bhupesh
>> > >> >
>> > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
>> > >> [email protected]>
>> > >> > wrote:
>> > >> >
>> > >> > > Hi All,
>> > >> > >
>> > >> > > I am working on adding a De-duplication operator in Malhar
>> library
>> > >> based
>> > >> > > on managed state APIs. I will be working off the already created
>> > JIRA
>> > >> -
>> > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
>> > initial
>> > >> > > pull request for an AbstractDeduper here:
>> > >> > > https://github.com/apache/apex-malhar/pull/260/files
>> > >> > >
>> > >> > > I am planning to include the following features in the first
>> > version:
>> > >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
>> > >> > > correlation holds.
>> > >> > > 2. Option to maintain order of incoming tuples.
>> > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
>> tuples
>> > >> > > respectively.
>> > >> > >
>> > >> > > Thanks.
>> > >> > >
>> > >> > > ~ Bhupesh
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Reply via email to