Hi Liquan,

Thanks for getting back to me and pointing me to the Confluent docs. Based on
what I read and my own assumptions, I'd expect the data consumed from the
output topic to be:

A:1
A:2
A:3
A:4
A:5

What am I missing?

Thanks,
Damian

On Sun, 17 Apr 2016 at 19:20 Liquan Pei <liquan...@gmail.com> wrote:

> Hi Damian,
>
> I am new to KStreams as well, so my answer might not be 100% precise. In
> KTable, records with the same key are treated as updates instead of events.
> Thus aggregation on the same key will do some de-dup. The docs for the tech
> preview contain some explanation of this behavior:
>
> http://docs.confluent.io/2.1.0-alpha1/streams/concepts.html#ktable-changelog-stream
> http://docs.confluent.io/2.1.0-alpha1/streams/quickstart.html#inspect-the-output-data
>
> Maybe we can update the Javadoc to make this behavior more explicit?
>
> Thanks,
> Liquan
>
> On Sun, Apr 17, 2016 at 9:59 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi,
> >
> > I'm slightly confused by KTable.count(..). The javadoc says:
> >
> > Count number of records of this stream by the selected key into a new
> > instance of {@link KTable}.
> >
> > So, if I send 5 records with the same key to the input topic, as per
> > below:
> >
> >     final KafkaProducer<String, Integer> producer = new KafkaProducer<>(
> >             producerProperties, new StringSerializer(), new IntegerSerializer());
> >     for (int i = 0; i < 5; i++) {
> >         producer.send(new ProducerRecord<>("input", "A", i));
> >     }
> >     producer.flush();
> >
> > and then set up my stream like so:
> >
> >     final KStreamBuilder builder = new KStreamBuilder();
> >     final KTable<String, Integer> table =
> >             builder.table(Serdes.String(), Serdes.Integer(), "input");
> >     final KTable<String, Long> count = table.count(
> >             new KeyValueMapper<String, Integer, String>() {
> >                 @Override
> >                 public String apply(final String key, final Integer value) {
> >                     return key;
> >                 }
> >             }, Serdes.String(), Serdes.Integer(), "count");
> >
> >     count.to(Serdes.String(), Serdes.Long(), "count");
> >
> > and then consume the data from the "count" topic, I thought I should
> > eventually get a record where the key is A and the value is 5, i.e. the
> > number of times the key A was seen in the input stream. However, this is
> > not the case. What I receive on the count topic is:
> >
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> >
> > Is this expected behaviour? Have I misunderstood how count is supposed to
> > work?
> >
> > Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) causes a
> > NullPointerException.
> >
> > Thanks,
> > Damian
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
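
For anyone reading this thread later, here is a minimal plain-Java sketch (no Kafka dependency) of one possible reading of the "same key is treated as updates" explanation quoted above. It is not the Kafka Streams implementation. The class ChangelogCountSketch and the method simulate are hypothetical names, and the add-then-subtract ordering is an assumption chosen only because it reproduces the A:1 / A:2 sequence reported in the original message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class ChangelogCountSketch {

    // Feeds `updates` records with the same key into a table that keeps only
    // the latest value per key, and returns every count that would be emitted.
    public static List<String> simulate(String key, int updates) {
        Map<String, Integer> table = new HashMap<>(); // latest value per key
        Map<String, Long> counts = new HashMap<>();   // running count per key
        List<String> emitted = new ArrayList<>();

        for (int value = 0; value < updates; value++) {
            // put() returns the value being replaced, or null for a new key.
            Integer previous = table.put(key, value);

            // Assumption: the new value is counted first ...
            counts.merge(key, 1L, Long::sum);
            emitted.add(key + ":" + counts.get(key));

            // ... and the replaced value is then un-counted, since an update
            // does not add a row to the table.
            if (previous != null) {
                counts.merge(key, -1L, Long::sum);
                emitted.add(key + ":" + counts.get(key));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints: A:1 A:2 A:1 A:2 A:1 A:2 A:1 A:2 A:1
        System.out.println(String.join(" ", simulate("A", 5)));
    }
}
```

Under these assumptions the count for A settles at 1, because the table only ever holds one row for that key, and the intermediate A:2 records come from the add step of each update. Counting every record seen (A:1 through A:5) would correspond to treating the input as a stream of events rather than as a table.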