Extending the custom stream codec from DefaultKryoStreamCodec worked in my
case!

Thanks,
Shubham

On Thu, Feb 18, 2016 at 12:48 AM, Tushar Gosavi <[email protected]>
wrote:

> Hi Bhupesh,
>
> I also did some experiments and found that a stream codec extended
> from DefaultStatefulStreamCodec does not work. Extending the custom stream
> codec from DefaultKryoStreamCodec worked.
>
> Can anyone explain the difference between DefaultStatefulStreamCodec and
> DefaultKryoStreamCodec? Does the platform allow writing a custom stream
> codec that extends DefaultStatefulStreamCodec? In the case above it did
> not work. If it is expected to work, I will open a JIRA for the issue.
>
> Regards,
> -Tushar.
>
>
>
> On Wed, Feb 17, 2016 at 6:32 PM, Bhupesh Chawda <[email protected]>
> wrote:
>
> > Hi,
> >
> > The code fragment above does not work in my case either.
> >
> > Basically, I have a Tuple class that has a "key" component.
> >
> > Here is my DAG:
> > A -> B
> >
> > I need to define key-based stream multiplexing (the same key goes to the
> > same downstream partition) when the downstream operator B is partitioned.
> > I tried implementing this by defining a custom stream codec extending
> > DefaultStatefulStreamCodec, in which I define a getPartition() function
> > that returns tuple.key().hashCode(). However, I get the same distribution
> > irrespective of what I return from getPartition(). I am using the
> > StatelessPartitioner to partition the downstream operator.
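A minimal, framework-free sketch of the key-based getPartition() described above. KeyedTuple, KeyPartitioning and route() are hypothetical names used only for illustration. route() models downstream routing as the partition hash masked down to its low bits; that masking scheme is an assumption about how the partitioner maps hashes to partitions, not something confirmed in this thread. Under that assumption, tuples with the same key always reach the same partition, whatever their other fields hold.

```java
// Hypothetical stand-in for the Tuple class in the message: only key() matters here.
final class KeyedTuple {
  private final String key;
  private final long value;

  KeyedTuple(String key, long value) {
    this.key = key;
    this.value = value;
  }

  String key() {
    return key;
  }

  long value() {
    return value;
  }
}

final class KeyPartitioning {
  // Same idea as the getPartition() in the message: hash the key only.
  static int getPartition(KeyedTuple tuple) {
    return tuple.key().hashCode();
  }

  // ASSUMPTION: routing is modelled as getPartition() masked to the low bits
  // needed for partitionCount (mask = 1 for 2 partitions, 3 for 4, 0 for 1).
  static int route(KeyedTuple tuple, int partitionCount) {
    int bits = 32 - Integer.numberOfLeadingZeros(partitionCount - 1);
    int mask = (1 << bits) - 1;
    return getPartition(tuple) & mask;
  }
}
```

With a mask like this only the low bits of getPartition() matter, so a hash that differs between keys only in its high bits would still send those keys to the same partition.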
> >
> > Any ideas on what I am doing wrong?
> >
> > Thanks.
> >
> > -Bhupesh
> >
> > On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <[email protected]>
> > wrote:
> >
> > > Thanks, Tushar.
> > >
> > > I implemented the codec as you suggested.
> > >
> > > To test it, I did the following:
> > >
> > > 1. Partitioned the Counter operator into 2 instances using:
> > >
> > >      <configuration>
> > >        <property>
> > >          <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
> > >          <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
> > >        </property>
> > >      </configuration>
> > >
> > > 2. The tokenizer emits <word,1>, so if I group the stream by the second
> > >    field (index 1), all words would go to the same instance of Counter:
> > >
> > >      dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
> > >          new TupleStreamCodec(new int[] { 1 }));
> > >
> > > 3. Overrode the getStreamCodec() method in the Counter operator's input
> > >    port to return new TupleStreamCodec(new int[] { 1 });
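A small, framework-free sketch of why grouping on index 1 should send every <word,1> tuple to the same Counter instance when the codec is actually applied. hashOn() reuses the combining formula from TupleStreamCodec.getPartition(); IndexHashDemo and distinctHashes() are hypothetical helpers. Because the second field is always 1, every tuple gets the same hash, while hashing on index 0 yields one hash per distinct word.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class IndexHashDemo {
  // Same combining formula as TupleStreamCodec.getPartition():
  // start at 1 and fold in each selected field with a multiplier of 31.
  static int hashOn(List<?> fields, int[] indexes) {
    int hashCode = 1;
    for (int idx : indexes) {
      hashCode = 31 * hashCode + fields.get(idx).hashCode();
    }
    return hashCode;
  }

  // Builds the tokenizer's <word,1> tuple for each word and counts how many
  // distinct hash values result when hashing on the given indexes.
  static int distinctHashes(List<String> words, int[] indexes) {
    Set<Integer> seen = new HashSet<>();
    for (String word : words) {
      List<Object> tuple = Arrays.asList(word, 1);
      seen.add(hashOn(tuple, indexes));
    }
    return seen.size();
  }
}
```

If the observed distribution is identical with index 0 and index 1, that suggests getPartition() is not being invoked at all, which would also fit the missing debug output.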
> > >
> > > But I couldn't observe the desired results. I also added a few debug
> > > statements to the TupleStreamCodec class, but surprisingly, I couldn't
> > > see them in the logs.
> > >
> > > Am I missing something?
> > >
> > > Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
> > > the behavior of TupleStreamCodec? But if I remove it, how would I
> > > specify how many partitions I need?
> > >
> > > I did take a look at the FraudDetectionDemo and YahooFinance demos. I
> > > noticed that StreamCodec is used by some operators the same way I am
> > > using it, but nowhere could I see the number of partitions being defined
> > > for them (in application.java or properties.xml).
> > >
> > > Could you give me more guidance on this?
> > >
> > > Thanks,
> > > Shubham
> > >
> > >
> > >
> > > On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <[email protected]>
> > > wrote:
> > >
> > > > Hi Shubham,
> > > >
> > > > You can implement a custom stream codec to define your own hashCode
> > > > logic. A simple StatefulStreamCodec is shown below. You specify the
> > > > indexes on which you want to compute the hashCode in the constructor
> > > > of the stream codec, and while computing the hashCode you use only the
> > > > elements at those indexes.
> > > >
> > > >
> > > >   public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
> > > >   {
> > > >     // indexes of the tuple fields that take part in the hash
> > > >     private int[] indexes;
> > > >
> > > >     public TupleStreamCodec(int[] indexes)
> > > >     {
> > > >       this.indexes = indexes;
> > > >     }
> > > >
> > > >     @Override
> > > >     public int getPartition(Tuple tuple)
> > > >     {
> > > >       // combine the selected fields with the same 31-based scheme List.hashCode() uses
> > > >       int hashCode = 1;
> > > >       for (int idx : indexes) {
> > > >         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
> > > >       }
> > > >       return hashCode;
> > > >     }
> > > >   }
> > > >
> > > > and you can set this stream codec on the input port of the Counter
> > > > operator:
> > > >
> > > >     // partition based on the first element
> > > >     dag.setInputPortAttribute(counter.inport,
> > > >         Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0}));
> > > >
> > > > Note: this is a sample implementation; it does not handle out-of-range
> > > > indexes and other errors :)
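One way to address that note is a bounds-checked version of the hash loop. This is a sketch, not part of Tushar's codec: SafeIndexHash is a hypothetical name, it throws on an out-of-range index, and it maps null fields to 0 via Objects.hashCode(). When every index is listed in order, the result equals the list's own hashCode(), because java.util.List specifies hashCode() with the same 31-based formula.

```java
import java.util.List;
import java.util.Objects;

final class SafeIndexHash {
  // Bounds-checked variant of the hash loop in TupleStreamCodec.getPartition().
  static int hashOn(List<?> fields, int[] indexes) {
    int hashCode = 1;
    for (int idx : indexes) {
      if (idx < 0 || idx >= fields.size()) {
        // Fail with a message naming the bad index and the tuple size.
        throw new IndexOutOfBoundsException(
            "index " + idx + " is outside a tuple of size " + fields.size());
      }
      // Objects.hashCode(null) returns 0, so null fields do not throw.
      hashCode = 31 * hashCode + Objects.hashCode(fields.get(idx));
    }
    return hashCode;
  }
}
```

Throwing surfaces a misconfigured index immediately; silently skipping bad indexes is the alternative if tuples may legitimately be shorter, at the cost of less predictable partitioning.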
> > > >
> > > >   - Tushar.
> > > >
> > > >
> > > > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I need some suggestions or pointers on defining a custom
> > > > > partitioner.
> > > > >
> > > > > The operators in my application process a custom tuple class (let's
> > > > > call it TUPLE). This data type has a single field, an ArrayList, so
> > > > > each tuple represents a list of values.
> > > > >
> > > > > For a typical word count problem, my DAG would be:
> > > > >
> > > > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> > > > >
> > > > > and if I use TUPLE, the tokenizer will emit a TUPLE containing an
> > > > > array list with the contents <word,count>.
> > > > >
> > > > > Now I wish to partition Counter so that each instance receives all
> > > > > tuples containing the same word.
> > > > >
> > > > > I know that by default the hashCode() method of the custom tuple
> > > > > class would be used, but in my case the custom tuple class is an
> > > > > ArrayList, and I wish to specify that the hashCode must be computed
> > > > > on just the first field of the ArrayList. In a generic case it could
> > > > > also be computed on multiple fields of the array list.
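To make the problem concrete: java.util.List specifies hashCode() over every element, so with the default hashCode() the same word paired with different counts hashes differently and may be routed to different Counter instances. The sketch below compares that with a hash on the first field only. DefaultListHashDemo is a hypothetical helper, and since the TUPLE class itself is not shown in this thread, a plain ArrayList stands in for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DefaultListHashDemo {
  // A plain ArrayList stands in for the TUPLE class, which is not shown in the thread.
  static List<Object> tuple(String word, int count) {
    return new ArrayList<>(Arrays.asList(word, count));
  }

  // Hash on the first field (the word) only, ignoring the count.
  static int firstFieldHash(List<?> tuple) {
    return tuple.get(0).hashCode();
  }
}
```

For <"apple",1> and <"apple",2>, the default list hashes differ, while firstFieldHash() returns the same value for both.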
> > > > >
> > > > > Do we have any examples that I could refer to?
> > > > >
> > > > > Also, can this be done at the application level by setting an
> > > > > attribute?
> > > > >
> > > > > Thanks,
> > > > > Shubham
> > > > >
> > > >
> > >
> >
>
