Same for me! Does anyone know whether this is expected, or could it be a bug? Thanks. -Bhupesh
On Thu, Feb 18, 2016 at 6:38 PM, Shubham Pathak <[email protected]> wrote:

> Extending the custom stream codec from DefaultKryoStreamCodec worked for my
> case!
>
> Thanks,
> Shubham
>
> On Thu, Feb 18, 2016 at 12:48 AM, Tushar Gosavi <[email protected]>
> wrote:
>
> > Hi Bhupesh,
> >
> > I also did some experiments and found that a stream codec extended
> > from DefaultStatefulStreamCodec does not work. Extending the custom stream
> > codec from DefaultKryoStreamCodec worked.
> >
> > Can anyone explain the difference between
> > DefaultStatefulStreamCodec and DefaultKryoStreamCodec? And does the platform
> > allow writing a custom stream codec extended from DefaultStatefulStreamCodec?
> > In the above case it did not work. If it is expected to work, then I will open
> > a Jira for the issue.
> >
> > Regards,
> > -Tushar.
> >
> > On Wed, Feb 17, 2016 at 6:32 PM, Bhupesh Chawda <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > The above code fragment is not working in my case as well.
> > >
> > > Basically I have a Tuple class which has a "key" component.
> > >
> > > Here is my dag:
> > > A -> B
> > >
> > > I need to be able to define key-based stream multiplexing (the same key goes to
> > > the same downstream partition) when the downstream operator B is partitioned.
> > > I tried implementing this by defining a custom stream codec extending
> > > DefaultStatefulStreamCodec where I define a getPartition() function which
> > > returns tuple.key().hashCode(). However, I get the same distribution
> > > irrespective of what I return in the getPartition() function. I am using
> > > the StatelessPartitioner for partitioning the downstream operator.
> > >
> > > Any ideas on what I am doing wrong?
> > >
> > > Thanks.
> > >
> > > -Bhupesh
> > >
> > > On Wed, Feb 10, 2016 at 6:38 PM, Shubham Pathak <[email protected]>
> > > wrote:
> > >
> > > > Thanks Tushar.
> > > >
> > > > I implemented the codec as per your suggestion.
> > > >
> > > > To test it I did the following:
> > > >
> > > > 1. Partitioned the Counter operator to have 2 instances using
> > > >
> > > > <configuration>
> > > >   <property>
> > > >     <name>dt.application.APPNAME.operator.counter.attr.PARTITIONER</name>
> > > >     <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value>
> > > >   </property>
> > > > </configuration>
> > > >
> > > > 2. The tokenizer emits <word,1>. So if I were to group the stream by
> > > > the second field (index 1), all words would go to the same instance of
> > > > the counter.
> > > >
> > > > dag.setInputPortAttribute(counter.input, PortContext.STREAM_CODEC,
> > > >     new TupleStreamCodec(new int[] { 1 }));
> > > >
> > > > 3. Overrode the getStreamCodec() method in the counter operator's input port to
> > > > return new TupleStreamCodec(new int[] { 1 });
> > > >
> > > > But I couldn't observe the desired results. I also added a few debug
> > > > statements in the TupleStreamCodec class but, surprisingly, I couldn't see them
> > > > in the logs.
> > > >
> > > > Am I missing something?
> > > >
> > > > Is the com.datatorrent.common.partitioner.StatelessPartitioner:2 overriding
> > > > the behavior of TupleStreamCodec?
> > > > But if I remove that, how would I specify how many partitions I need?
> > > >
> > > > I did take a look at the FraudDetectionDemo and YahooFinance demo. I noticed
> > > > that StreamCodec is being used by some operators the same way I am using it,
> > > > but nowhere could I see the number of partitions being defined for them (in
> > > > application.java or properties.xml).
> > > >
> > > > Could you guide me more on this?
> > > >
> > > > Thanks,
> > > > Shubham
> > > >
> > > > On Mon, Feb 8, 2016 at 4:36 PM, Tushar Gosavi <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi Shubham,
> > > > >
> > > > > You can implement a custom stream codec to define your own hashCode
> > > > > logic. A simple StatefulStreamCodec
> > > > > is as below.
> > > > > You specify the indexes on which you want to compute the hashCode
> > > > > in the constructor of the stream codec,
> > > > > and while computing the hashCode you only use the elements at those indexes.
> > > > >
> > > > >   public static class TupleStreamCodec extends
> > > > >       DefaultStatefulStreamCodec<Tuple>
> > > > >   {
> > > > >     // positions of the tuple fields that form the partitioning key
> > > > >     private int[] indexes;
> > > > >
> > > > >     public TupleStreamCodec(int[] indexes) {
> > > > >       this.indexes = indexes;
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public int getPartition(Tuple tuple)
> > > > >     {
> > > > >       int hashCode = 1;
> > > > >       for (int idx : indexes) {
> > > > >         // combine only the field at position idx into the hash
> > > > >         hashCode = 31 * hashCode + tuple.get(idx).hashCode();
> > > > >       }
> > > > >       return hashCode;
> > > > >     }
> > > > >   }
> > > > >
> > > > > and you can set this stream codec at the input port of the Counter operator.
> > > > >
> > > > >   dag.setInputPortAttribute(counter.inport,
> > > > >       Context.PortContext.STREAM_CODEC, new TupleStreamCodec(new int[] {0}));
> > > > >   // partition based on first element
> > > > >
> > > > > Note: This is a sample implementation; it does not handle out-of-range indexes and
> > > > > other errors :)
> > > > >
> > > > > - Tushar.
> > > > >
> > > > > On Mon, Feb 8, 2016 at 3:42 PM, Shubham Pathak <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I need some suggestions / pointers related to defining a custom
> > > > > > partitioner.
> > > > > >
> > > > > > The operators in my application process a custom tuple class (let's call it
> > > > > > TUPLE). This data type has a single field, an ArrayList. So each tuple
> > > > > > represents a list of values.
> > > > > >
> > > > > > For a typical word count problem, my dag would be
> > > > > >
> > > > > > WordGenerator -> <STRING> -> Tokenizer -> <TUPLE> -> Counter -> <TUPLE> -> Console
> > > > > >
> > > > > > and if I were to use TUPLE, the tokenizer will emit a TUPLE that contains
> > > > > > an array list with contents <word,count>.
> > > > > >
> > > > > > Now I wish to partition Counter, and each instance should receive all tuples
> > > > > > containing the same word.
> > > > > >
> > > > > > I know that by default the hashCode() method of the custom tuple class would be
> > > > > > used, but in my case the custom tuple class is an ArrayList and I wish to
> > > > > > specify that the hashCode must be computed on just the first field in the ArrayList.
> > > > > > In a generic case it could also be on multiple fields in the array list.
> > > > > >
> > > > > > Do we have any examples that I could refer to?
> > > > > >
> > > > > > Also, can this be done at the application level by setting an attribute?
> > > > > >
> > > > > > Thanks,
> > > > > > Shubham
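
For anyone who wants to check the hashing idea from this thread outside the platform, here is a minimal sketch in plain Java with no Apex dependency. The class name KeyPartitionSketch and both helper names are made up for illustration. In particular, the mask-based routing in partitionFor() is only an assumption about how a hash might be mapped to a partition, not confirmed platform behavior. It does not explain why the DefaultStatefulStreamCodec variant was ignored.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the Apex API.
final class KeyPartitionSketch {

    // Hash only the fields at the given positions, mirroring the
    // getPartition() logic quoted in this thread (with a null guard added).
    static int hashOf(List<?> fields, int[] indexes) {
        int hashCode = 1;
        for (int idx : indexes) {
            Object field = fields.get(idx);
            hashCode = 31 * hashCode + (field == null ? 0 : field.hashCode());
        }
        return hashCode;
    }

    // ASSUMPTION: route by masking the low bits of the hash.
    // partitionCount must be a power of two for this mask to be valid.
    static int partitionFor(int hash, int partitionCount) {
        return hash & (partitionCount - 1);
    }

    public static void main(String[] args) {
        List<Object> first = Arrays.<Object>asList("apple", 1);
        List<Object> second = Arrays.<Object>asList("apple", 7);
        List<Object> third = Arrays.<Object>asList("pear", 1);

        // Keyed on the word (index 0): equal words give equal hashes.
        int[] byWord = {0};
        for (List<Object> tuple : Arrays.asList(first, second, third)) {
            int partition = partitionFor(hashOf(tuple, byWord), 2);
            System.out.println(tuple + " by word -> partition " + partition);
        }

        // Keyed on the count (index 1): tuples with the same count collide,
        // which is the "everything goes to one instance" test from the thread.
        int[] byCount = {1};
        System.out.println(partitionFor(hashOf(first, byCount), 2)
            == partitionFor(hashOf(third, byCount), 2));
    }
}
```

With a codec keyed on index 0, tuples carrying the same word always produce the same hash, so any deterministic routing on that value keeps them together. With index 1 and a constant count, every tuple produces the same hash, which is why that setup was a reasonable test of whether the codec is being used at all.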
