Hi,

The above code fragment does not work in my case either.
Basically, I have a Tuple class which has a "key" component. Here is my DAG:

A -> B

I need to be able to define key-based stream multiplexing (the same key goes to the same downstream partition) when the downstream operator B is partitioned. I tried implementing this by defining a custom stream codec extending DefaultStatefulStreamCodec, in which I define a getPartition() function that returns tuple.key().hashCode(). However, I get the same distribution irrespective of what I return from getPartition(). I am using the StatelessPartitioner to partition the downstream operator.

Any ideas on what I am doing wrong?

Thanks.
-Bhupesh

On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak wrote:

> Thanks, Tushar.
>
> I implemented the codec as per your suggestion.
>
> To test it, I did the following:
>
> 1. Partitioned the Counter operator to have 2 instances using
>
> <configuration>
>   <property>
>     <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
>     <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
>   </property>
> </configuration>
>
> 2. The tokenizer emits <word,1>. So if I were to group the stream by the second field (index 1), all words would go to the same instance of Counter.
>
> dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] { 1 }));
>
> 3. Overrode the getStreamCodec() method in the Counter operator's input port to return new TupleStreamCodec(new int[] { 1 });
>
> But I couldn't observe the desired results. I also added a few debug statements in the TupleStreamCodec class, but surprisingly, I couldn't see them in the logs.
>
> Am I missing something?
>
> Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding the behavior of TupleStreamCodec? But if I remove that, how would I specify how many partitions I need?
>
> I did take a look at the FraudDetectionDemo and YahooFinance demos.
> I noticed that StreamCodec is used by some operators the same way I am using it, but nowhere could I see the number of partitions being defined for them (in Application.java or properties.xml).
>
> Could you guide me more on this?
>
> Thanks,
> Shubham
>
> On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi wrote:
>
> > Hi Shubham,
> >
> > You can implement a custom stream codec to define your own hashCode logic. A simple StatefulStreamCodec is shown below. You specify the indexes on which you want to compute the hashCode in the constructor of the stream codec, and while computing the hashCode you only use the elements at those indexes.
> >
> > public static class TupleStreamCodec extends DefaultStatefulStreamCodec<Tuple>
> > {
> >   private int[] indexes;
> >
> >   public TupleStreamCodec(int[] indexes) {
> >     this.indexes = indexes;
> >   }
> >
> >   @Override
> >   public int getPartition(Tuple tuple)
> >   {
> >     // combine the hash codes of the selected fields only
> >     int hashCode = 1;
> >     for (int idx : indexes) {
> >       hashCode = 31 * hashCode + tuple.get(idx).hashCode();
> >     }
> >     return hashCode;
> >   }
> > }
> >
> > and you can set this stream codec on the input port of the Counter operator:
> >
> > dag.setInputPortAttribute(counter.inport, Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0})); // partition based on first element
> >
> > Note: this is a sample implementation; it does not handle out-of-index and other errors :)
> >
> > - Tushar.
> >
> > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak wrote:
> >
> > > Hi,
> > >
> > > I need some suggestions / pointers related to defining a custom partitioner.
> > >
> > > The operators in my application process a custom tuple class (let's call it TUPLE). This data type has a single field, an ArrayList, so each tuple represents a list of values.
> > >
> > > For a typical word count problem, my DAG would be
> > >
> > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> > >
> > > and if I were to use TUPLE, the Tokenizer would emit a TUPLE containing an array list with contents <word,count>.
> > >
> > > Now I wish to partition Counter so that each instance receives all tuples containing the same word.
> > >
> > > I know that by default the hashCode() method of the custom tuple class would be used, but in my case the custom tuple class is an ArrayList, and I wish to specify that the hash must be computed on just the first field in the ArrayList. In a generic case it could also be on multiple fields in the array list.
> > >
> > > Do we have any examples that I could refer to?
> > >
> > > Also, can this be done at the application level by setting an attribute?
> > >
> > > Thanks,
> > > Shubham
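For reference, here is a minimal, framework-free sketch of the key-hashing logic from Tushar's codec, applied to the <word,count> tuples described in this thread. It does not touch the Apex API at all: the tuple is modelled as a plain List, and the names KeyHashSketch, partitionHash and route are hypothetical. The routing step assumes a partition is chosen by masking the low-order bits of the getPartition() value (mask 1 for two partitions); that is an illustrative assumption, not a description of the engine's internals.

```java
import java.util.Arrays;
import java.util.List;

public class KeyHashSketch {
  // Hypothetical helper: combines the hash codes of the fields at the given
  // indexes, mirroring the loop in Tushar's getPartition() sketch.
  static int partitionHash(List<?> tuple, int[] indexes) {
    int hashCode = 1;
    for (int idx : indexes) {
      hashCode = 31 * hashCode + tuple.get(idx).hashCode();
    }
    return hashCode;
  }

  // Assumption: a partition is selected by masking the low-order bits of the
  // hash (mask = 1 for two partitions). Not taken from the Apex engine source.
  static int route(int hash, int mask) {
    return hash & mask;
  }

  public static void main(String[] args) {
    int mask = 1; // two partitions
    List<?>[] tuples = {
        Arrays.asList("apple", 1),
        Arrays.asList("banana", 1),
        Arrays.asList("apple", 1)
    };

    // Key on index 0 (the word): equal words always map to the same partition.
    for (List<?> t : tuples) {
      System.out.println("by word  " + t + " -> partition "
          + route(partitionHash(t, new int[] {0}), mask));
    }

    // Key on index 1 (the count): every <word,1> tuple hashes identically,
    // so all of them map to the same partition.
    for (List<?> t : tuples) {
      System.out.println("by count " + t + " -> partition "
          + route(partitionHash(t, new int[] {1}), mask));
    }
  }
}
```

Grouping on index 0 keeps both "apple" tuples together regardless of the count value, while grouping on index 1 with <word,1> input gives every tuple the same hash, which is the "all words go to one instance" behaviour Shubham expected to see.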
