Hi Shubham,
You can implement a custom stream codec to define your own hashCode logic,
A simple StateFulStreamCodec
is as below. You specify indexes on which you want to compute the hashCode
in constructor of the stream codec,
and while computing hashCode you only use elements at those indexes.
public static class TupleStreamCodec extends
DefaultStatefulStreamCodec<Tuple>
{
private int[] indexes;
public TupleStreamCodec(int[] indexes) {
this.indexes = indexes;
}
@Override
public int getPartition(Tuple tuple)
{
int hashCode = 1;
for (int idx : indexes) {
hashCode = 31 * hashCode + tuple.get(i).hashCode();
}
return hashCode;
}
}
and you can set this stream codec at input port of the Counter operator.
dag.setInputPortAttribute(counter.inport,
Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0} ));
// partition based on first element
Note : This is sample implementation, does not handle out of index and
other errors :)
- Tushar.
On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <[email protected]>
wrote:
> Hi,
>
> I need some suggestions / pointers related to defining a custom
> partitioner.
>
> The operators in my application process a custom tuple class ( lets call it
> TUPLE) . This data type has a single field ArrayList.. So each tuple
> represents a list of values.
>
> For a typical word count problem, my dag would be
>
> WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> ->
> Console
>
> and if i were to use TUPLE, tokenizer will emit TUPLE that contains array
> list with contents <word,count>
>
> Now i wish to partition Counter and each instance should receive all tuples
> containing same word.
>
> I know that by default , hashCode() method of custom tuple class would be
> used , but in my case custom tuple class is an arrayList and i wish to
> specify that hashCode must be done on just the first field in ArrayList. In
> a generic case it could also be on multiple fields in array list.
>
> Do we have any examples that i could refer to ?
>
> Also can this be done at application level by setting an attribute ?
>
> Thanks,
> Shubham
>