Hi Chandni,

That is correct. Here is some explanation of the motivation for, and usage
of, that interface:

Goals:

The goal of SpillableComplexComponent is to provide an interface for
creating Spillable data structures. It is essentially the interface of a
factory class that produces Spillable data structures. Because it is a
factory interface, different backends can be plugged into operators very
easily.

Going into more detail for your use case, SpillableComplexComponent has two
factory methods, newSpillableByteMap and newSpillableByteArrayListMultimap,
which return implementations of the SpillableByteMap and
SpillableArrayListMultimap interfaces respectively.
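To illustrate the factory idea, here is a minimal sketch in Java. All names (`ComplexComponentSketch`, `SpillableMapSketch`, `SpillableMultimapSketch`, `InMemoryComplexComponentSketch`) are hypothetical, simplified stand-ins and not the actual Malhar signatures; the in-memory backend simply wraps `HashMap`s and never spills anything.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for SpillableByteMap and SpillableArrayListMultimap.
interface SpillableMapSketch<K, V> {
  void put(K key, V value);

  V get(K key);
}

interface SpillableMultimapSketch<K, V> {
  void put(K key, V value);

  List<V> get(K key);
}

// The factory: one method per data structure; the backend is hidden behind the interface.
interface ComplexComponentSketch {
  <K, V> SpillableMapSketch<K, V> newSpillableMap();

  <K, V> SpillableMultimapSketch<K, V> newSpillableMultimap();
}

// In-memory backend: plain HashMaps, nothing is ever spilled to disk.
class InMemoryComplexComponentSketch implements ComplexComponentSketch {
  @Override
  public <K, V> SpillableMapSketch<K, V> newSpillableMap() {
    Map<K, V> data = new HashMap<>();
    return new SpillableMapSketch<K, V>() {
      @Override
      public void put(K key, V value) {
        data.put(key, value);
      }

      @Override
      public V get(K key) {
        return data.get(key);
      }
    };
  }

  @Override
  public <K, V> SpillableMultimapSketch<K, V> newSpillableMultimap() {
    Map<K, List<V>> data = new HashMap<>();
    return new SpillableMultimapSketch<K, V>() {
      @Override
      public void put(K key, V value) {
        // Every value is kept, grouped under its key.
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
      }

      @Override
      public List<V> get(K key) {
        return data.getOrDefault(key, new ArrayList<>());
      }
    };
  }
}
```

A managed-state or HBase backend would be another implementation of `ComplexComponentSketch`; code that only holds the two data structure interfaces would not need to change.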

Usage:

Setting the backend on an operator:

myOperator.setStore(new InMemorySpillableComplexComponent());
//myOperator.setStore(new ManagedStateSpillableComplexComponent());
//myOperator.setStore(new HbaseSpillableComplexComponent());

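Using the factory in an operator:

@Override
public void setup(OperatorContext context)
{
   // The factory passed to setStore() decides which backend holds the data
   map = store.newSpillableByteMap();
}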

As you can see, you set the factory on an operator, and the operator then
uses the factory to create Spillable data structures. The operator is
agnostic to the store that manages the data for those data structures.
If you want the data stored in managed state, simply set a ManagedState
implementation of SpillableComplexComponent; if you want it stored in
Cassandra, simply set a Cassandra implementation of
SpillableComplexComponent on the operator. With this design, the code
using the spillable data structures is independent of the backend used to
store the data.
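A compact sketch of that injection pattern, assuming a hypothetical `MapStoreFactory` and two toy in-memory backends (`HashMapStore`, `TreeMapStore`) in place of real managed-state or Cassandra implementations. The hypothetical `CountingOperator` never names a backend; it only calls the factory in `setup()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical factory: the operator asks it for a map and never sees the backend.
interface MapStoreFactory {
  <K, V> Map<K, V> newMap();
}

// Two interchangeable toy backends, both in memory, purely for illustration.
class HashMapStore implements MapStoreFactory {
  @Override
  public <K, V> Map<K, V> newMap() {
    return new HashMap<>();
  }
}

class TreeMapStore implements MapStoreFactory {
  @Override
  public <K, V> Map<K, V> newMap() {
    return new TreeMap<>();
  }
}

// Backend-agnostic operator: the store is injected, the map is created in setup().
class CountingOperator {
  private MapStoreFactory store;
  private Map<String, Integer> counts;

  void setStore(MapStoreFactory store) {
    this.store = store;
  }

  void setup() {
    counts = store.newMap();
  }

  void process(String key) {
    counts.merge(key, 1, Integer::sum);
  }

  int count(String key) {
    return counts.getOrDefault(key, 0);
  }
}
```

Swapping the backend is a single `setStore(...)` call; `process()` and `count()` stay untouched.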

Thanks,
Tim

On Thu, May 19, 2016 at 9:29 PM, Chandni Singh wrote:

> Hey Tim,
>
> As I understand it, the Join operator needs a common abstraction for
> SpillableMap and SpillableArrayListMultimap.
>
> I suggested using SpillableComplexComponent. Is this correct?
>
> Thanks,
> Chandni
>
> On Wed, May 18, 2016 at 1:34 AM, Chandni Singh wrote:
>
> > Hi Chaitanya,
> >
> > I am NOT suggesting that you use the interface TimeSlicedBucketedState.
> >
> > I don't see the need for a JoinStore abstraction.
> >
> > There will be a SpillableArrayListMultimap implementation on which you
> > can set "ManagedTimeUnifiedStateImpl" as the persistent store.
> > The SpillableArrayListMultimap API is sufficient for this use case.
> >
> > You can use this implementation of SpillableArrayListMultimap directly
> > in the Join operator. Here is a simple example:
> >
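> > class InnerJoinOperator extends BaseOperator
> > {
> >    // Buffers stream-1 tuples by key; ManagedTimeUnifiedStateImpl persists them
> >    SpillableArrayListMultimap stream1Data =
> >        new SpillableArrayListMultimap(new ManagedTimeUnifiedStateImpl());
> >
> >    public final transient DefaultInputPort<Tuple> port1 = new DefaultInputPort<Tuple>()
> >    {
> >        @Override
> >        public void process(Tuple tuple)
> >        {
> >            stream1Data.put(tuple.getKey(), tuple.getVal());
> >        }
> >    };
> > }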
> >
> >
> > Chandni
> >
> >
> >
> >
> > On Wed, May 18, 2016 at 1:00 AM, Chaitanya Chebolu wrote:
> >
> >> Hi Chandni,
> >>
> >>    I think you are suggesting the interface "TimeSlicedBucketedState". I
> >> feel it is tightly coupled with Managed State.
> >>    In the "TimeSlicedBucketedState" abstraction, the bucketId parameter
> >> relates to Managed State and is not needed for the join operator.
> >>
> >> Regards,
> >> Chaitanya
> >>
> >> On Wed, May 18, 2016 at 12:51 PM, Chandni Singh wrote:
> >>
> >> > Chaitanya,
> >> >
> >> > SpillableArrayListMultimap will give you a similar abstraction.
> >> >
> >> > Why do we need another abstraction, "JoinStore"?
> >> >
> >> > Chandni
> >> >
> >> > On Tue, May 17, 2016 at 11:30 PM, Chaitanya Chebolu wrote:
> >> >
> >> > > Chandni,
> >> > >
> >> > >    JoinStore is an interface consisting of the following methods:
> >> > > 1) boolean put(Object key, long time, Object value) => Insert the
> >> > > (Key, Value) pair into the store.
> >> > > 2) List<> getTuples(Object key) => Return the list of values in the
> >> > > store to which the specified key is mapped.
> >> > >
> >> > >    JoinStore is pluggable into the join operator. The join operator
> >> > > exposes the following properties:
> >> > > JoinStore leftStore, rightStore.
> >> > >
> >> > > By default, leftStore and rightStore would be JoinStores using
> >> > > spillable data structures over ManagedState. To integrate a different
> >> > > store, the user implements JoinStore and sets it on the join operator.
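A minimal Java rendering of the JoinStore interface proposed above, for illustration only. `List<Object>` is assumed for the unspecified `List<>` element type, and `JoinStoreSketch`, `InMemoryJoinStore` and `JoinOperatorSketch` are hypothetical names, not the existing `JoinStore` in com.datatorrent.lib.join.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the two proposed methods; List<Object> is an assumed element type.
interface JoinStoreSketch {
  boolean put(Object key, long time, Object value);

  List<Object> getTuples(Object key);
}

// Hypothetical in-memory implementation; a spillable one would plug in the same way.
class InMemoryJoinStore implements JoinStoreSketch {
  private final Map<Object, List<Object>> data = new HashMap<>();

  @Override
  public boolean put(Object key, long time, Object value) {
    // time is ignored here; a time-bucketed store would use it to place and expire entries.
    return data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  @Override
  public List<Object> getTuples(Object key) {
    return Collections.unmodifiableList(data.getOrDefault(key, Collections.emptyList()));
  }
}

// The operator exposes both stores as pluggable properties.
class JoinOperatorSketch {
  JoinStoreSketch leftStore = new InMemoryJoinStore();
  JoinStoreSketch rightStore = new InMemoryJoinStore();
}
```

Under this design a spillable implementation over ManagedState would replace `InMemoryJoinStore` as the default, and users could assign their own implementation to `leftStore` or `rightStore`.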
> >> > >
> >> > >
> >> > > Tim,
> >> > >
> >> > >    I am not planning a different implementation.
> >> > >    Here, I proposed plugging SpillableArrayListMultiMap/SpillableMap
> >> > > over managed state. I think this is what you are developing. Please
> >> > > correct me if I am wrong.
> >> > >
> >> > > Regards,
> >> > > Chaitanya
> >> > >
> >> > > On Tue, May 17, 2016 at 11:24 PM, Timothy Farkas wrote:
> >> > >
> >> > > > Chaitanya,
> >> > > >
> >> > > > Are you planning to use the SpillableMap and SpillableArrayListMultiMap
> >> > > > that are in development, or separate implementations? If you are
> >> > > > planning to use separate implementations, can you please explain why
> >> > > > they are needed?
> >> > > >
> >> > > > Thanks,
> >> > > > Tim
> >> > > >
> >> > > > On Tue, May 17, 2016 at 10:36 AM, Chaitanya Chebolu wrote:
> >> > > >
> >> > > > > Hi Chandni,
> >> > > > >
> >> > > > >   I am suggesting two stores, categorized by key type:
> >> > > > > (1) If the key is a primary key, the store would be ManagedJoinStore
> >> > > > > or a JoinStore using SpillableMap.
> >> > > > > (2) If the key is not a primary key, the store would be a JoinStore
> >> > > > > using SpillableArrayListMultiMap.
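The practical difference between the two cases, sketched with plain `java.util` collections standing in for SpillableMap and SpillableArrayListMultiMap. It assumes, for illustration, that a repeated primary key replaces the earlier value while a non-primary key accumulates all of its values; `KeyedStores` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-ins for the two store kinds; real spillable structures would persist to managed state.
class KeyedStores {
  // Primary key: at most one value per key, a later put replaces the earlier one (assumed).
  final Map<String, String> primaryStore = new HashMap<>();
  // Non-primary key: every value is kept, grouped by key.
  final Map<String, List<String>> multiStore = new HashMap<>();

  void put(boolean isPrimaryKey, String key, String value) {
    if (isPrimaryKey) {
      primaryStore.put(key, value);
    } else {
      multiStore.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }
  }
}
```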
> >> > > > >
> >> > > > > Regards,
> >> > > > > Chaitanya
> >> > > > >
> >> > > > > On Tue, May 17, 2016 at 10:46 PM, Chandni Singh wrote:
> >> > > > >
> >> > > > > > Hi Chaitanya,
> >> > > > > >
> >> > > > > > In the last discussion, we decided that the Join operation needs
> >> > > > > > Spillable data structures, specifically SpillableArrayListMultiMap
> >> > > > > > for the Join operator.
> >> > > > > >
> >> > > > > > Tim created ticket APEXMALHAR-2070 to address this. The pull request
> >> > > > > > for an in-memory implementation already exists:
> >> > > > > > https://github.com/apache/incubator-apex-malhar/pull/262
> >> > > > > >
> >> > > > > > As commented on the ticket, Tim is working on an implementation of
> >> > > > > > SpillableArrayListMultiMap that uses ManagedState.
> >> > > > > >
> >> > > > > > The ManagedJoinStore seems to me like a duplicate of these spillable
> >> > > > > > data structures. I think we should avoid creating multiple things
> >> > > > > > that provide the same basic functionality.
> >> > > > > >
> >> > > > > > Can you please help review the spillable data structures pull
> >> > > > > > request and point out anything you need for Join that is missing
> >> > > > > > there?
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Chandni
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > On Mon, May 16, 2016 at 10:05 PM, Chaitanya Chebolu wrote:
> >> > > > > >
> >> > > > > > > Hi All,
> >> > > > > > >
> >> > > > > > >   Please go through this design and share your suggestions.
> >> > > > > > >
> >> > > > > > > Regards,
> >> > > > > > > Chaitanya
> >> > > > > > >
> >> > > > > > > On Thu, May 12, 2016 at 2:56 PM, Mohit Jotwani wrote:
> >> > > > > > >
> >> > > > > > > > +1.
> >> > > > > > > >
> >> > > > > > > > This is a common use case in batch scenarios, and it would be great
> >> > > > > > > > to have it in streaming as well, using managed state and the
> >> > > > > > > > SpillableMultiMap structure.
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > > Mohit
> >> > > > > > > >
> >> > > > > > > > On Thu, May 12, 2016 at 2:38 PM, Yogi Devendra wrote:
> >> > > > > > > >
> >> > > > > > > > > +1 for the Proposal.
> >> > > > > > > > >
> >> > > > > > > > > This will be useful for joins that need to hold large amounts of
> >> > > > > > > > > intermediate data.
> >> > > > > > > > >
> >> > > > > > > > > ~ Yogi
> >> > > > > > > > >
> >> > > > > > > > > On 9 May 2016 at 14:05, Chaitanya Chebolu wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi,
> >> > > > > > > > > >
> >> > > > > > > > > >    The Malhar library has an in-memory join operator, which cannot
> >> > > > > > > > > > process large amounts of data because of checkpointing and memory
> >> > > > > > > > > > bottlenecks. To avoid these, I am proposing an inner join operator
> >> > > > > > > > > > using Managed State, which was recently added to Malhar.
> >> > > > > > > > > >
> >> > > > > > > > > > Details of the inner join operator, Managed State, and the design
> >> > > > > > > > > > of an inner join operator using Managed State are given below:
> >> > > > > > > > > >
> >> > > > > > > > > > Inner Join Operator
> >> > > > > > > > > > --------------------------
> >> > > > > > > > > > The join operator pairs tuples from two relational streams that
> >> > > > > > > > > > match the join condition and emits the paired tuples.
> >> > > > > > > > > >
> >> > > > > > > > > > For example, let S1 and S2 be two input streams with the join
> >> > > > > > > > > > condition
> >> > > > > > > > > >     (S1.a.key = S2.b.key) and |S1.a.time - S2.b.time| < t1
> >> > > > > > > > > > where a and b are tuples arriving on S1 and S2 respectively, and
> >> > > > > > > > > > t1 is the time period.
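The join condition written as a Java predicate. This is a sketch assuming a hypothetical `Tuple` type with `key`, `time` and `value` fields, with `t1` expressed in the same unit as `time`. Note the strict inequality: tuples exactly `t1` apart do not match.

```java
import java.util.Objects;

// A tuple with a join key, an event time and a payload (field names are hypothetical).
final class Tuple {
  final Object key;
  final long time;
  final Object value;

  Tuple(Object key, long time, Object value) {
    this.key = key;
    this.time = time;
    this.value = value;
  }
}

final class JoinCondition {
  private final long t1; // window length, same unit as Tuple.time

  JoinCondition(long t1) {
    this.t1 = t1;
  }

  // (a.key = b.key) and |a.time - b.time| < t1
  boolean matches(Tuple a, Tuple b) {
    return Objects.equals(a.key, b.key) && Math.abs(a.time - b.time) < t1;
  }
}
```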
> >> > > > > > > > > >
> >> > > > > > > > > > Based on this join condition, the join operator needs to store
> >> > > > > > > > > > tuples for time t1. Let JS1 and JS2 be the respective stores in
> >> > > > > > > > > > which tuples from S1 and S2 are kept.
> >> > > > > > > > > >
> >> > > > > > > > > > Let (k1, v1) be a tuple arriving on stream S1. The join operation
> >> > > > > > > > > > takes the following steps:
> >> > > > > > > > > >
> >> > > > > > > > > >    1. Insert (k1, v1) into the store JS1.
> >> > > > > > > > > >    2. For key k1, get the values from store JS2.
> >> > > > > > > > > >    3. If step 2 returns any values, apply the join conditions.
> >> > > > > > > > > >
> >> > > > > > > > > > The same procedure applies to a tuple on stream S2, with JS1 and
> >> > > > > > > > > > JS2 swapped in the steps above.
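The three steps above, plus the symmetric case for S2, sketched as an in-memory symmetric hash join. This is an illustration under stated assumptions, not the proposed operator: JS1 and JS2 are plain `HashMap`s of lists, keys and values are strings, and matches are collected into a list as "S1value,S2value" strings. `SymmetricJoin` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the join steps; JS1/JS2 are plain multimaps, nothing expires or spills.
class SymmetricJoin {
  // Each stored entry keeps its event time so the time condition can be checked later.
  private static final class Entry {
    final long time;
    final String value;

    Entry(long time, String value) {
      this.time = time;
      this.value = value;
    }
  }

  private final Map<String, List<Entry>> js1 = new HashMap<>();
  private final Map<String, List<Entry>> js2 = new HashMap<>();
  private final long t1;
  final List<String> output = new ArrayList<>();

  SymmetricJoin(long t1) {
    this.t1 = t1;
  }

  void processS1(String key, long time, String value) {
    join(js1, js2, key, time, value, true);
  }

  void processS2(String key, long time, String value) {
    join(js2, js1, key, time, value, false);
  }

  private void join(Map<String, List<Entry>> own, Map<String, List<Entry>> other,
                    String key, long time, String value, boolean fromS1) {
    // Step 1: insert (k, v) into this stream's store.
    own.computeIfAbsent(key, k -> new ArrayList<>()).add(new Entry(time, value));
    // Step 2: look up the same key in the other stream's store.
    List<Entry> candidates = other.get(key);
    if (candidates == null) {
      return;
    }
    // Step 3: apply the time part of the condition; emit S1's value first.
    for (Entry e : candidates) {
      if (Math.abs(time - e.time) < t1) {
        output.add(fromS1 ? value + "," + e.value : e.value + "," + value);
      }
    }
  }
}
```

Nothing is evicted here, so both stores grow without bound; keeping them bounded by t1 and out of memory is the job the proposal assigns to Managed State.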
> >> > > > > > > > > >
> >> > > > > > > > > > Managed State
> >> > > > > > > > > > --------------------
> >> > > > > > > > > > Managed State is an incrementally checkpointed, fault-tolerant
> >> > > > > > > > > > key-value data structure. For more information about Managed State,
> >> > > > > > > > > > see:
> >> > > > > > > > > >
> >> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/org/apache/apex/malhar/lib/state/managed
> >> > > > > > > > > >
> >> > > > > > > > > > Abstract Join Operator
> >> > > > > > > > > > --------------------------------
> >> > > > > > > > > > An abstract implementation of the join operator is available in the
> >> > > > > > > > > > com.datatorrent.lib.join package. JoinStore, the store interface
> >> > > > > > > > > > used by the join operator, is in the same package.
> >> > > > > > > > > > For more details about the join operator, see:
> >> > > > > > > > > >
> >> > > > > > > > > > https://github.com/apache/incubator-apex-malhar/tree/master/library/src/main/java/com/datatorrent/lib/join
> >> > > > > > > > > >
> >> > > > > > > > > > Design of Join Operator Using Managed State
> >> > > > > > > > > > -------------------------------------------------------------
> >> > > > > > > > > > We need to provide a concrete store using Managed State, along
> >> > > > > > > > > > these lines:
> >> > > > > > > > > >
> >> > > > > > > > > > public class ManagedJoinStore extends ManagedTimeStateImpl
> >> > > > > > > > > >     implements JoinStore
> >> > > > > > > > > > {
> >> > > > > > > > > >   // JoinStore methods implemented on top of Managed State
> >> > > > > > > > > > }
> >> > > > > > > > > >
> >> > > > > > > > > > Data in the managed store would be a map of byte[] to byte[].
> >> > > > > > > > > >
> >> > > > > > > > > > If the value is a list, inserting (k1, v1) into the store takes the
> >> > > > > > > > > > following steps:
> >> > > > > > > > > > (1) For the key k1, get the current value from the store. (If the
> >> > > > > > > > > > key is not in memory, this means searching all the time buckets.)
> >> > > > > > > > > > (2) Convert that value to a list.
> >> > > > > > > > > > (3) Add the value v1 to the list.
> >> > > > > > > > > > (4) Convert the new list to a slice.
> >> > > > > > > > > > (5) Insert (k1, new slice) into Managed State.
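A sketch of why these read-modify-write steps are costly, assuming a plain `HashMap<ByteBuffer, byte[]>` in place of Managed State (the `ByteBuffer` wrapper gives byte-array keys content-based equality) and a hypothetical length-prefixed encoding via `DataOutputStream`. `ByteListStore` is a hypothetical name. Each append decodes and re-encodes the whole list, so the cost grows with the number of values per key.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A byte[] -> byte[] store standing in for Managed State.
class ByteListStore {
  private final Map<ByteBuffer, byte[]> state = new HashMap<>();

  // Steps (1)-(5): read the serialized list, decode, append, re-encode, write back.
  void append(String key, String value) {
    ByteBuffer k = ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8));
    List<String> list = decode(state.get(k)); // (1) + (2)
    list.add(value);                          // (3)
    state.put(k, encode(list));               // (4) + (5)
  }

  List<String> get(String key) {
    return decode(state.get(ByteBuffer.wrap(key.getBytes(StandardCharsets.UTF_8))));
  }

  // Encoding: element count, then each string in modified UTF-8.
  private static byte[] encode(List<String> list) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(list.size());
      for (String s : list) {
        out.writeUTF(s);
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  private static List<String> decode(byte[] data) {
    List<String> list = new ArrayList<>();
    if (data == null) {
      return list; // key not present yet
    }
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int n = in.readInt();
      for (int i = 0; i < n; i++) {
        list.add(in.readUTF());
      }
      return list;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Avoiding this whole-list rewrite on every insert is presumably one reason to prefer a multimap structure for list values.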
> >> > > > > > > > > >
> >> > > > > > > > > > Storing values as a list has been discussed here:
> >> > > > > > > > > > https://issues.apache.org/jira/browse/APEXMALHAR-2026
> >> > > > > > > > > >
> >> > > > > > > > > > Based on the above JIRA, I am suggesting a SpillableMultiMap data
> >> > > > > > > > > > structure for lists of values. Let this store be
> >> > > > > > > > > > SpillableMultiMapJoinStore.
> >> > > > > > > > > >
> >> > > > > > > > > > Based on the above details, the store is chosen by key type:
> >> > > > > > > > > >
> >> > > > > > > > > >    Key            Store
> >> > > > > > > > > >    -------------  --------------------------
> >> > > > > > > > > >    Primary        ManagedJoinStore
> >> > > > > > > > > >    Not Primary    SpillableMultiMapJoinStore
> >> > > > > > > > > >
> >> > > > > > > > > > The join operator would expose the following additional
> >> > > > > > > > > > properties:
> >> > > > > > > > > >
> >> > > > > > > > > >    - isPrimaryKey - whether the key is a primary key.
> >> > > > > > > > > >    - noOfBuckets - number of key buckets; this parameter is
> >> > > > > > > > > >      required by Managed State.
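A hypothetical sketch of how the two properties might be used. The property names come from the proposal; the `ManagedJoinConfig` class, the store-name strings and the hash-modulo key-to-bucket mapping are assumptions for illustration, not Managed State's actual bucketing.

```java
// Hypothetical operator configuration; property names follow the proposal, the rest is assumed.
class ManagedJoinConfig {
  private boolean isPrimaryKey;
  private int noOfBuckets = 1;

  void setIsPrimaryKey(boolean isPrimaryKey) {
    this.isPrimaryKey = isPrimaryKey;
  }

  void setNoOfBuckets(int noOfBuckets) {
    if (noOfBuckets <= 0) {
      throw new IllegalArgumentException("noOfBuckets must be positive");
    }
    this.noOfBuckets = noOfBuckets;
  }

  // Store choice following the key-type table in the proposal.
  String storeName() {
    return isPrimaryKey ? "ManagedJoinStore" : "SpillableMultiMapJoinStore";
  }

  // Assumed key-to-bucket mapping: hash modulo bucket count, always in [0, noOfBuckets).
  long bucketFor(Object key) {
    return Math.floorMod(key.hashCode(), noOfBuckets);
  }
}
```

`Math.floorMod` is used instead of `%` so that keys with negative hash codes still land in a valid bucket.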
> >> > > > > > > > > >
> >> > > > > > > > > > Please provide your suggestions.
> >> > > > > > > > > >
> >> > > > > > > > > > Regards,
> >> > > > > > > > > > Chaitanya
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>
