Thanks Tushar.
I implemented the codec as you suggested.
To test it, I did the following:
1. Partitioned the Counter operator into 2 instances using
<configuration>
  <property>
    <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
    <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
  </property>
</configuration>
2. The tokenizer emits <word,1>, so if I group the stream by the
second field (index 1), all words should go to the same instance of the counter.
dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
    new TupleStreamCodec(new int[] { 1 }));
3. Overrode the getStreamCodec() method in the counter operator's input port to
return new TupleStreamCodec(new int[] { 1 });
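To make the expectation in step 2 concrete, here is a minimal plain-Java sketch of the routing I expect. It uses no Apex classes: IndexPartitionSketch, hashOnIndexes and partitionFor are hypothetical names, the hashing loop mirrors TupleStreamCodec.getPartition, and reducing the hash to a partition by masking its low bits is my assumption about how the platform picks an instance.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch only; every name in this class is hypothetical.
class IndexPartitionSketch {

  // Same loop as TupleStreamCodec.getPartition: only the fields at the
  // selected indexes contribute to the hash.
  static int hashOnIndexes(List<?> tuple, int[] indexes) {
    int hashCode = 1;
    for (int idx : indexes) {
      hashCode = 31 * hashCode + tuple.get(idx).hashCode();
    }
    return hashCode;
  }

  // Assumption: the hash is mapped to an instance by masking its low bits,
  // so partitionCount must be a power of two in this sketch.
  static int partitionFor(List<?> tuple, int[] indexes, int partitionCount) {
    return hashOnIndexes(tuple, indexes) & (partitionCount - 1);
  }

  public static void main(String[] args) {
    List<List<Object>> tuples = Arrays.asList(
        Arrays.<Object>asList("apple", 1),
        Arrays.<Object>asList("banana", 1),
        Arrays.<Object>asList("cherry", 1));
    for (List<Object> t : tuples) {
      System.out.println(t
          + " by index 1 -> " + partitionFor(t, new int[] {1}, 2)
          + ", by index 0 -> " + partitionFor(t, new int[] {0}, 2));
    }
  }
}
```

With index 1 the only hashed value is the count 1, so the hash is 31 * 1 + 1 = 32 for every tuple and all of them land on partition 0 of 2. With index 0 the partition depends on the word.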
But I couldn't observe the desired results. I also added a few debug
statements to the TupleStreamCodec class but, surprisingly, I couldn't see them
in the logs.
Am I missing something?
Is com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
the behavior of TupleStreamCodec?
But if I remove it, how would I specify how many partitions I need?
I did take a look at the FraudDetectionDemo and YahooFinance demos. I noticed
that some operators there use a StreamCodec the same way I do,
but nowhere could I see the number of partitions being defined for them (in
application.java or properties.xml).
Could you guide me more on this?
Thanks,
Shubham
On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <[email protected]>
wrote:
> Hi Shubham,
>
> You can implement a custom stream codec to define your own hashCode logic.
> A simple stateful stream codec
> is shown below. You specify the indexes on which you want to compute the hashCode
> in the constructor of the stream codec,
> and while computing the hashCode you use only the elements at those indexes.
>
>
> public static class TupleStreamCodec extends
>     DefaultStatefulStreamCodec<Tuple>
> {
>   // indexes of the tuple fields that take part in the hash
>   private int[] indexes;
>
>   public TupleStreamCodec(int[] indexes) {
>     this.indexes = indexes;
>   }
>
>   @Override
>   public int getPartition(Tuple tuple)
>   {
>     int hashCode = 1;
>     for (int idx : indexes) {
>       // only the fields at the selected indexes contribute
>       hashCode = 31 * hashCode + tuple.get(idx).hashCode();
>     }
>     return hashCode;
>   }
>
> }
>
> and you can set this stream codec on the input port of the Counter operator.
> dag.setInputPortAttribute(counter.inport,
>     Context.PortContext.STREAM_CODEC,
>     new TupleStreamCodec(new int[] {0})); // partition based on first element
>
> Note: This is a sample implementation; it does not handle out-of-range
> indexes or other errors :)
>
> - Tushar.
>
>
> On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <[email protected]>
> wrote:
>
> > Hi,
> >
> > I need some suggestions / pointers related to defining a custom
> > partitioner.
> >
> > The operators in my application process a custom tuple class (let's call it
> > TUPLE). This data type has a single field, an ArrayList, so each tuple
> > represents a list of values.
> >
> > For a typical word count problem, my dag would be
> >
> > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> >
> > and if I were to use TUPLE, the tokenizer would emit a TUPLE that contains an
> > array list with contents <word,count>.
> >
> > Now I wish to partition Counter so that each instance receives all tuples
> > containing the same word.
> >
> > I know that by default the hashCode() method of the custom tuple class would be
> > used, but in my case the custom tuple class wraps an ArrayList and I wish to
> > specify that the hashCode must be computed on just the first field in the ArrayList.
> > In a generic case it could also be on multiple fields in the array list.
> >
> > Do we have any examples that I could refer to?
> >
> > Also, can this be done at the application level by setting an attribute?
> >
> > Thanks,
> > Shubham
> >
>