Hi Liquan,

Thanks for getting back to me and pointing me to the Confluent docs. Based
on what I read and my own assumptions, I'd expect the data consumed from
the output topic to be:

A:1
A:2
A:3
A:4
A:5
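
To spell out that expectation, here's a toy model in plain Java. There's no
Kafka involved, and the class and method names are made up for illustration.
It treats every input record as an independent event and emits a running
count per key:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the behaviour I expected (plain Java, no Kafka involved):
// every input record is an independent event, so the running count for a
// key goes up by one each time that key is seen.
public class EventCountSketch {

    /** Returns one "key:count" entry per input record, in arrival order. */
    static List<String> countEvents(List<String> keys) {
        Map<String, Long> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String key : keys) {
            long updated = counts.merge(key, 1L, Long::sum); // +1 for every record
            emitted.add(key + ":" + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Five records keyed "A", like the producer loop in my original mail.
        countEvents(Collections.nCopies(5, "A")).forEach(System.out::println);
    }
}
```

Fed five records keyed A, it prints A:1 through A:5, i.e. the output above.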

What am I missing?
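
My best guess so far, and it is only a guess: the input topic is read as a
table, so each new record for A replaces the previous row (the "updates, not
events" point you made). If count() handles each replacement by first adding
the new row to its group and then subtracting the old row, emitting a result
after each step, the numbers line up. The toy model below is built on exactly
that assumption. It is plain Java, not Kafka Streams code, and the names are
hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model (plain Java, no Kafka involved) of counting over a table.
// ASSUMPTION: each update to an existing key is applied as two steps,
// "add the new row to its group" then "subtract the old row from its group",
// with a result emitted after each step. This is my guess at the mechanism,
// not Kafka Streams code; the class and method names are hypothetical.
public class KTableCountSketch {

    static List<String> countTable(List<String> keys, List<Integer> values) {
        Map<String, Integer> table = new HashMap<>(); // latest value per input key
        Map<String, Long> counts = new HashMap<>();   // number of rows per group
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            String key = keys.get(i);
            // Upsert: a record with an existing key replaces the previous row.
            Integer oldValue = table.put(key, values.get(i));
            // My selector returns the input key, so old and new rows share a group.
            String group = key;
            long afterAdd = counts.merge(group, 1L, Long::sum);
            emitted.add(group + ":" + afterAdd);
            if (oldValue != null) {
                long afterSubtract = counts.merge(group, -1L, Long::sum);
                emitted.add(group + ":" + afterSubtract);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same input as my producer loop: key "A" with values 0..4.
        List<String> keys = Collections.nCopies(5, "A");
        List<Integer> values = Arrays.asList(0, 1, 2, 3, 4);
        countTable(keys, values).forEach(System.out::println);
    }
}
```

Fed the same five records, it prints A:1, A:2, A:1, A:2, A:1, A:2, A:1, A:2,
A:1, which is exactly what I see on the count topic. If the guess is right,
the final A:1 is the real answer: the table only ever holds one row for key
A, so count() is counting rows per key rather than records seen. Is that the
right way to think about it?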

Thanks,
Damian

On Sun, 17 Apr 2016 at 19:20 Liquan Pei <liquan...@gmail.com> wrote:

> Hi Damian,
>
> I am new to KStreams as well, so my answer might not be 100% precise. In
> a KTable, records with the same key are treated as updates rather than as
> independent events, so an aggregation on the same key will do some
> de-duplication. The docs for the tech preview contain some explanation of
> this behavior:
>
>
> http://docs.confluent.io/2.1.0-alpha1/streams/concepts.html#ktable-changelog-stream
>
> http://docs.confluent.io/2.1.0-alpha1/streams/quickstart.html#inspect-the-output-data
>
> Maybe we can update the Javadoc to make this behavior more explicit?
>
> Thanks,
> Liquan
>
> On Sun, Apr 17, 2016 at 9:59 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi,
> >
> > I'm slightly confused by KTable.count(..). The javadoc says:
> >
> >  Count number of records of this stream by the selected key into a new
> > instance of {@link KTable}.
> >
> > So, if I send 5 records with the same key to the input topic, as
> > below:
> >
> > final KafkaProducer<String, Integer> producer = new KafkaProducer<>(
> >         producerProperties, new StringSerializer(), new IntegerSerializer());
> > for (int i = 0; i < 5; i++) {
> >     producer.send(new ProducerRecord<>("input", "A", i));
> > }
> > producer.flush();
> >
> > and then set up my stream like so:
> >
> > final KStreamBuilder builder = new KStreamBuilder();
> > final KTable<String, Integer> table = builder.table(Serdes.String(),
> >         Serdes.Integer(), "input");
> > final KTable<String, Long> count = table.count(
> >         new KeyValueMapper<String, Integer, String>() {
> >             @Override
> >             public String apply(final String key, final Integer value) {
> >                 return key;
> >             }
> >         }, Serdes.String(), Serdes.Integer(), "count");
> >
> > count.to(Serdes.String(), Serdes.Long(), "count");
> >
> > And then, when I consume the data from the "count" topic, I thought I
> > should eventually get a record where the key is A and the value is 5,
> > i.e., the number of times the key A was seen in the input stream.
> > However, this is not the case. What I receive on the count topic is:
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> > A:2
> > A:1
> >
> > Is this expected behaviour? Have I misunderstood how count is supposed
> > to work?
> >
> > Also, KTable.count(KeyValueMapper<K,V,K1> selector, String name) throws
> > a NullPointerException.
> >
> > Thanks,
> > Damian
> >
>
>
>
> --
> Liquan Pei
> Software Engineer, Confluent Inc
>
