Hmm, I'm not sure if we can optimize this case as well. Following your
example:

KTable T1 = builder.table("source-topic");
KTable T2 = T1.filter((key, value) -> value > 2);


And suppose the "source-topic" is piping the messages to T1 as: {a: 3}, {b:
5}, {a: 1}...

When {a: 3} is passed from T1 to T2, it passes the filter and hence is
forwarded to downstream operators right away. When {a: 1} is later passed
from T1 to T2, meaning "modify the value for key a from 3 to 1", it no
longer passes the filter, and hence in this case we need to forward an
{a: null} record downstream in order to indicate that the previously
forwarded {a: 3} has now been deleted in T2, right?
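
To make the scenario concrete, here is a minimal sketch in plain Java with
no Kafka Streams dependency. It only models a filter that cannot see old
values; it is not the actual KTableFilter code. The class name
FilterTombstoneSketch and the helpers update() and filterWithoutOldValues()
are hypothetical names made up for this illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class FilterTombstoneSketch {

    // Hypothetical helper: one table update; a null value means "delete this key".
    public static Map.Entry<String, Integer> update(String key, Integer value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    // Models a table filter that has no access to the old value of a key.
    // A failed predicate is forwarded as a tombstone, because the operator
    // cannot know whether an earlier value for that key already went downstream.
    public static List<Map.Entry<String, Integer>> filterWithoutOldValues(
            List<Map.Entry<String, Integer>> upstream, Predicate<Integer> predicate) {
        List<Map.Entry<String, Integer>> downstream = new ArrayList<>();
        for (Map.Entry<String, Integer> u : upstream) {
            Integer v = u.getValue();
            boolean keep = v != null && predicate.test(v);
            downstream.add(update(u.getKey(), keep ? v : null));
        }
        return downstream;
    }

    public static void main(String[] args) {
        // The updates arriving at T1 in the example above.
        List<Map.Entry<String, Integer>> t1 =
                Arrays.asList(update("a", 3), update("b", 5), update("a", 1));
        // Prints [a=3, b=5, a=null]
        System.out.println(filterWithoutOldValues(t1, v -> v > 2));
    }
}
```

The last record, a=null, is the tombstone that retracts the previously
forwarded {a: 3}. Dropping it would leave a stale a=3 in T2.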

Anyways, we could move our discussions to the PR to not swamp the dev
mailing list.


Guozhang


On Thu, Jun 30, 2016 at 5:43 AM, Philippe Derome <phder...@gmail.com> wrote:

> Guozhang,
>
> my latest commit would propose that semantics of your JIRA case 2 be
> changed a little to suppress nulls when not sendingOldValues and not
> materializing. When a table T2 is created first from another table T1 and
> the filter does not match for the key k from T1, the invalid key k does not
> enter T2 at all (no null). Ultimately, the code change is simpler and the
> test results look more intuitive.
>
> On Wed, Jun 29, 2016 at 6:55 PM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > good.
> >
> > On Wed, Jun 29, 2016 at 6:44 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Yes, they are related in the sense that if we always materialize a
> source
> >> KTable, then we can completely replace the `sendOldValues` as it will
> >> always be true. But since 3911 is a rather big change, I'd prefer to
> >> complete this ticket first, and refactor it when we decided to work on
> >> 3911
> >> later.
> >>
> >> Feel free to link these two tickets though.
> >>
> >> Guozhang
> >>
> >> On Tue, Jun 28, 2016 at 9:47 AM, Philippe Derome <phder...@gmail.com>
> >> wrote:
> >>
> >> > Is this point of view consistent with new ticket 3911 (Enforce KTable
> >> > materialisation ) just submitted by Eno. T?
> >> >
> >> > Should the two tickets be linked somehow if they are related?
> >> > My concern is that, the overhead of requesting the source KTable to be
> >> > materialized (i.e. creating a state store, and sending the {old ->
> new}
> >> > pair instead of the new value only) may be over-whelming compared with
> >> its
> >> > potential benefits of reducing the downstream traffic.
> >> >
> >> > Guozhang
> >> >
> >> > On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome <phder...@gmail.com>
> >> > wrote:
> >> >
> >> > > Guozhang,
> >> > >
> >> > > would you say it's advisable to initialize
> KTableFilter.sendOldValues
> >> to
> >> > > true instead of false? That's what I see that can trigger your
> >> described
> >> > > case 3 to potentially desirable effect, but I didn't include it into
> >> pull
> >> > > request. If left to default value of false, I don't know what
> >> mechanism
> >> > > should override it to true.
> >> > >
> >> > > Phil
> >> > >
> >> > > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang <wangg...@gmail.com
> >
> >> > > wrote:
> >> > >
> >> > > > Thanks! You can follow this step-by-step guidance to contribute to
> >> > Kafka
> >> > > > via github.
> >> > > >
> >> > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> >> > > >
> >> > > >
> >> > > > Guozhang
> >> > > >
> >> > > >
> >> > > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome <
> >> phder...@gmail.com>
> >> > > > wrote:
> >> > > >
> >> > > > > I have a 1 liner solution for this in KTableFilter.java and
> about
> >> 5-6
> >> > > > lines
> >> > > > > changes to existing unit test
> >> KTableFilterTest.testSendingOldValue. I
> >> > > > > included those lines with context in the JIRA. I am struggling a
> >> bit
> >> > > with
> >> > > > > github being new to it and how to do a proper pull request so
> >> > hopefully
> >> > > > > that can be followed up by you? I had the streams test suite
> pass
> >> > aside
> >> > > > for
> >> > > > > a few cases that pertain specifically to this JIRA as
> assumptions
> >> > have
> >> > > > now
> >> > > > > changed.
> >> > > > >
> >> > > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang <
> >> wangg...@gmail.com>
> >> > > > wrote:
> >> > > > >
> >> > > > > > Hi Philippe,
> >> > > > > >
> >> > > > > > Great, since you agree with my reasonings, I have created a
> JIRA
> >> > > ticket
> >> > > > > for
> >> > > > > > optimizing KTableFilter (feel free to pick it up if you are
> >> > > interested
> >> > > > in
> >> > > > > > contributing):
> >> > > > > >
> >> > > > > > https://issues.apache.org/jira/browse/KAFKA-3902
> >> > > > > >
> >> > > > > > About case 3-c-1), what I meant is that since "predicate return true on
> >> > > > > > both", the resulting pair would just be the same as the original pair.
> >> > > > > >
> >> > > > > > About KIP-63, it is itself a rather big story, but it has one
> >> > > > > > correspondence to this JIRA: with caching you can dedup some records
> >> > > > > > with the same key. For example, if the input records to the KTable are:
> >> > > > > >
> >> > > > > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
> >> > > > > >
> >> > > > > > And the KTable is materialized into a state store with a cache on top
> >> > > > > > of it, then the resulting downstream could be:
> >> > > > > >
> >> > > > > > <a: {null -> 1}>, <a: {1 -> 6}> ...
> >> > > > > >
> >> > > > > > Instead of
> >> > > > > >
> >> > > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ...
> >> > > > > >
> >> > > > > > So if it is piped to a filter() operator, then even less data will be
> >> > > > > > produced.
> >> > > > > >
> >> > > > > >
> >> > > > > > Guozhang
> >> > > > > >
> >> > > > > >
> >> > > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <
> >> > phder...@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > >
> >> > > > > > > Yes, it looks very good. Your detailed explanation appears
> >> > > compelling
> >> > > > > > > enough to reveal that some of the details of the complexity
> >> of a
> >> > > > > streams
> >> > > > > > > system are probably inherent complexity (not that I dared
> >> assume
> >> > it
> >> > > > was
> >> > > > > > > "easy" but I could afford to be conveniently unaware). It
> >> took me
> >> > > 30
> >> > > > > > > minutes to grasp this latest response.
> >> > > > > > >
> >> > > > > > > There might be a typo in your email for case 3.c.1) as I
> would
> >> > > think
> >> > > > we
> >> > > > > > > should send the most recent pair as opposed to original, in
> >> any
> >> > > event
> >> > > > > it
> >> > > > > > > does not materially impact your presentation.
> >> > > > > > >
> >> > > > > > > Your case 3a) is really what triggered my line of
> questioning
> >> and
> >> > I
> >> > > > > found
> >> > > > > > > the current behaviour vexing as it may lead to some
> >> undesirable
> >> > and
> >> > > > > > > necessary filter (see Michael G. Noll's fix in
> >> > > > UserRegionLambdaExample
> >> > > > > at
> >> > > > > > > the very end trying to weed out null) used to output to
> topic
> >> to
> >> > > > > console.
> >> > > > > > > Without looking at design, it seemed self-evident to me that
> >> the
> >> > > 3a)
> >> > > > > > > behaviour had to be implemented ( from my point of view with
> >> the
> >> > > code
> >> > > > > > > example I was looking at, it simply means never say to
> delete
> >> a
> >> > key
> >> > > > > that
> >> > > > > > > was never created, simply don't "create a deleted" key).
> >> > > > > > >
> >> > > > > > > Likewise cases 3 b,c look very reasonable.
> >> > > > > > >
> >> > > > > > > Just out of curiosity, did you effectively just restate the
> >> > essence
> >> > > > of
> >> > > > > > > KIP-63 in a more approachable language I could understand or
> >> is
> >> > > > KIP-63
> >> > > > > > > really a different beast?
> >> > > > > > >
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <
> >> > wangg...@gmail.com
> >> > > >
> >> > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > Hello Philippe,
> >> > > > > > > >
> >> > > > > > > > Very good points, let me dump my thoughts about
> >> "KTable.filter"
> >> > > > > > > > specifically and how we can improve on that:
> >> > > > > > > >
> >> > > > > > > > 1. Some context: when a KTable participates in a downstream operator
> >> > > > > > > > (e.g. if that operator is an aggregation), we need to materialize this
> >> > > > > > > > KTable and send both its old value and its new value as a pair
> >> > > > > > > > {old -> new} to the downstream operator. In practice it usually needs
> >> > > > > > > > to send the pair.
> >> > > > > > > >
> >> > > > > > > > So let's discuss the two cases separately. Take the following example
> >> > > > > > > > source stream for your KTable:
> >> > > > > > > >
> >> > > > > > > > <a: 1>, <b: 2>, <a: 3> ...
> >> > > > > > > >
> >> > > > > > > > When the KTable needs to be materialized, it will transform the source
> >> > > > > > > > messages into the pairs:
> >> > > > > > > >
> >> > > > > > > > <a: {null -> 1}>, <b: {null -> 2}>, <a: {1 -> 3}>
> >> > > > > > > >
> >> > > > > > > > 2. If "send old value" is not enabled, then when the filter predicate
> >> > > > > > > > returns false, we MUST send a <key: null> to the downstream operator to
> >> > > > > > > > indicate that this key is being filtered out of the table. Otherwise,
> >> > > > > > > > for example if your filter is "value < 2", the updated value <a: 3>
> >> > > > > > > > would just be dropped and downstream would still hold the stale <a: 1>,
> >> > > > > > > > resulting in incorrect semantics.
> >> > > > > > > >
> >> > > > > > > > If it returns true we should still send the original <key: value> to
> >> > > > > > > > downstream operators.
> >> > > > > > > >
> >> > > > > > > > 3. If "send old value" is enabled, then there are a couple of cases we
> >> > > > > > > > can consider:
> >> > > > > > > >
> >> > > > > > > >     a. If old value is <key: null> and new value is <key: not-null>,
> >> > > > > > > > and the filter predicate returns false for the new value, then it is
> >> > > > > > > > safe to optimize and not send anything to the downstream operator,
> >> > > > > > > > since we know there was no value for the key previously anyway;
> >> > > > > > > > otherwise we send the original pair.
> >> > > > > > > >
> >> > > > > > > >     b. If old value is <key: not-null> and new value is <key: null>,
> >> > > > > > > > indicating a delete of this key, and the filter predicate returns false
> >> > > > > > > > for the old value, then it is safe to optimize and not send anything to
> >> > > > > > > > the downstream operator, since we know that the old value has already
> >> > > > > > > > been filtered in a previous message; otherwise we send the original
> >> > > > > > > > pair.
> >> > > > > > > >
> >> > > > > > > >     c. If both old and new values are not null, and:
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >   1) predicate returns true on both, send the original pair;
> >> > > > > > > >
> >> > > > > > > >   2) predicate returns false on both, we can optimize and not send
> >> > > > > > > > anything;
> >> > > > > > > >
> >> > > > > > > >   3) predicate returns true on old and false on new, send the key:
> >> > > > > > > > {old -> null};
> >> > > > > > > >
> >> > > > > > > >   4) predicate returns false on old and true on new, send the key:
> >> > > > > > > > {null -> new};
> >> > > > > > > >
> >> > > > > > > > Does this sound good to you?
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > Guozhang
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <
> >> > > > phder...@gmail.com
> >> > > > > >
> >> > > > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Thanks a lot for the detailed feedback, its clarity and
> >> the
> >> > > > > reference
> >> > > > > > > to
> >> > > > > > > > > KIP-63, which however is for the most part above my head
> >> for
> >> > > now.
> >> > > > > > > > >
> >> > > > > > > > > Having said that, I still hold the view that the
> >> behaviour I
> >> > > > > > presented
> >> > > > > > > is
> >> > > > > > > > > undesirable and hardly defensible and we may have no
> >> choice
> >> > but
> >> > > > to
> >> > > > > > > agree
> >> > > > > > > > to
> >> > > > > > > > > disagree and it could be a sterile discussion to keep at
> >> it
> >> > and
> >> > > > > > > > addressing
> >> > > > > > > > > KIP-63 and other issues are more important than my brief
> >> > > > > observation.
> >> > > > > > > > >
> >> > > > > > > > > What follows supports my point of view that the filter
> >> method
> >> > > is
> >> > > > > not
> >> > > > > > > > > behaving as expected and I'd still think it's a defect,
> >> > > however I
> >> > > > > am
> >> > > > > > > > > guarded with my observation admitting my status of
> "total
> >> > > newbie"
> >> > > > > at
> >> > > > > > > > stream
> >> > > > > > > > > processing and Kafka.
> >> > > > > > > > >
> >> > > > > > > > > If we rewrite the code snippet I provided from
> >> > > > > > > > >
> >> > > > > > > > > KTable<String, *String*> regionCounts = userRegions
> >> > > > > > > > >      .groupBy((userId, region) -> KeyValue.pair(region, region))
> >> > > > > > > > >      .count("CountsByRegion")
> >> > > > > > > > >      .filter((regionName, count) -> false)
> >> > > > > > > > >      .mapValues(count -> count.toString());
> >> > > > > > > > >
> >> > > > > > > > > to
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > KTable<String, Long> regionCounts1 = userRegions
> >> > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >> > > > > > > > >     .count("CountsByRegion");
> >> > > > > > > > >
> >> > > > > > > > > KTable<String, String> regionCounts = regionCounts1
> >> > > > > > > > >     .filter((regionName, count) -> false)
> >> > > > > > > > >     .mapValues(count -> count.toString());
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > It becomes clear that regionCounts1 could build up
> plenty
> >> of
> >> > > keys
> >> > > > > > with
> >> > > > > > > > > valid Long counts, normal behaviour
> >> > > > > > > > >
> >> > > > > > > > >  (I think you call this a node in the topology in KIP-63
> >> and
> >> > > > > > > > > regionCounts is a successor node).
> >> > > > > > > > >
> >> > > > > > > > > These regionCounts1 keys are then exposed to evaluation
> of
> >> > > KTable
> >> > > > > > > > > regionCounts as an input. But why should there be any
> key
> >> > > created
> >> > > > > in
> >> > > > > > > > > KTable regionCounts that has a false filter? In other
> >> words,
> >> > > the
> >> > > > > > > > > "optimization"
> >> > > > > > > > >
> >> > > > > > > > > seems really compelling here: do not create a key before
> >> that
> >> > > key
> >> > > > > > > > > becomes relevant. The key with a null value is valid and
> >> > > relevant
> >> > > > > in
> >> > > > > > > > > regionCounts1 but not regionCounts. By a programming
> >> > > composition
> >> > > > > > > > > argument, the original block
> >> > > > > > > > >
> >> > > > > > > > > of code I presented should be equivalent to the broken
> >> down
> >> > one
> >> > > > in
> >> > > > > > two
> >> > > > > > > > > blocks here (and I guess that's saying 1 unified node in
> >> the
> >> > > > > topology
> >> > > > > > > > > should be equivalent to a chain of 2 nodes represented
> >> below
> >> > > if I
> >> > > > > > > > > understand the terminology right).
> >> > > > > > > > >
> >> > > > > > > > > The contents of regionCounts should not change depending
> >> on
> >> > the
> >> > > > set
> >> > > > > > of
> >> > > > > > > > > keys present in regionCounts1 if we view this
> >> > > > > > > > >
> >> > > > > > > > > from a functional programming point of view (it's as if
> we
> >> > are
> >> > > > > > > > > carrying garbage collected objects into regionCounts),
> >> which
> >> > > > seems
> >> > > > > > > > > natural considering the method filter that is pervasive
> in
> >> > FP.
> >> > > > > > > > >
> >> > > > > > > > > Here regionCounts is totally oblivious that aggregation
> >> took
> >> > > > place
> >> > > > > > > > > previously in regionCounts1 and that's fine (KIP-63
> talks
> >> > much
> >> > > > > about
> >> > > > > > > > > aggregation but I don't really care about, I care about
> >> the
> >> > 2nd
> >> > > > > node
> >> > > > > > > > > and the behaviour of filter).
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <
> >> > > > wangg...@gmail.com
> >> > > > > >
> >> > > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hello Philippe,
> >> > > > > > > > > >
> >> > > > > > > > > > I think your question is really twofold:
> >> > > > > > > > > >
> >> > > > > > > > > > 1. What is the semantic difference between a KTable and a KStream, and
> >> > > > > > > > > > more specifically how should we interpret (key, null) in a KTable?
> >> > > > > > > > > >
> >> > > > > > > > > > You can find some explanations in this documentation:
> >> > > > > > > > > >
> >> > > > > > > > > > http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
> >> > > > > > > > > >
> >> > > > > > > > > > Note that a KTable itself is still a stream behind the scenes, although
> >> > > > > > > > > > it may be materialized when necessary. And specifically to your
> >> > > > > > > > > > question, (key, null) can be treated as a tombstone on the specified
> >> > > > > > > > > > key, and when this KTable stream is materialized, it will result in a
> >> > > > > > > > > > "delete" on the materialized view.
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > 2. As for the "filter" operator, yes it will generate a large amount of
> >> > > > > > > > > > (key, null) records which indicate "delete" in the resulting KTable, and
> >> > > > > > > > > > hence large traffic to the piped topic. But we are working on KIP-63,
> >> > > > > > > > > > which unifies the caching mechanism in the `KTable.to` operator as well,
> >> > > > > > > > > > so that de-duping can be done in this operator and hence the outgoing
> >> > > > > > > > > > traffic can be largely reduced:
> >> > > > > > > > > >
> >> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > Guozhang
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <
> >> > > > > > phder...@gmail.com
> >> > > > > > > >
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > I made a modification of latest Confluent's example
> >> > > > > > > > > > > UserRegionLambdaExample. See relevant code at end of
> >> > email.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Am I correct in understanding that KTable semantics
> >> > should
> >> > > be
> >> > > > > > > similar
> >> > > > > > > > > to
> >> > > > > > > > > > a
> >> > > > > > > > > > > store-backed cache of a view as (per wikipedia on
> >> > > > materialized
> >> > > > > > > views)
> >> > > > > > > > > or
> >> > > > > > > > > > > similar to Oracle's materialized views and indexed
> >> views?
> >> > > > More
> >> > > > > > > > > > > specifically, I am looking at when a (key, null
> value)
> >> > pair
> >> > > > can
> >> > > > > > > make
> >> > > > > > > > it
> >> > > > > > > > > > > into KTable on generating table from a valid KStream
> >> with
> >> > a
> >> > > > > false
> >> > > > > > > > > filter.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Here's relevant code modified from example for
> which I
> >> > > > observed
> >> > > > > > > that
> >> > > > > > > > > all
> >> > > > > > > > > > > keys within userRegions are sent out to topic
> >> > LargeRegions
> >> > > > > with a
> >> > > > > > > > null
> >> > > > > > > > > > > value. I would think that both regionCounts KTable
> and
> >> > > topic
> >> > > > > > > > > LargeRegions
> >> > > > > > > > > > > should be empty so that the cached view agrees with
> >> the
> >> > > > > intended
> >> > > > > > > > query
> >> > > > > > > > > (a
> >> > > > > > > > > > > query with an intentional empty result set as the
> >> filter
> >> > is
> >> > > > > > > > > intentionally
> >> > > > > > > > > > > false as 1 >= 2).
> >> > > > > > > > > > >
> >> > > > > > > > > > > I am not sure I understand implications properly as
> I
> >> am
> >> > > new
> >> > > > > but
> >> > > > > > it
> >> > > > > > > > > seems
> >> > > > > > > > > > > possible that  a highly selective filter from a
> large
> >> > > > incoming
> >> > > > > > > stream
> >> > > > > > > > > > would
> >> > > > > > > > > > > result in high memory usage for regionCounts and
> hence
> >> > the
> >> > > > > stream
> >> > > > > > > > > > > application.
> >> > > > > > > > > > >
> >> > > > > > > > > > > KTable<String, *String*> regionCounts = userRegions
> >> > > > > > > > > > >     // Count by region
> >> > > > > > > > > > >     // We do not need to specify any explicit serdes because the key
> >> > > > > > > > > > >     // and value types do not change
> >> > > > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> >> > > > > > > > > > >     .count("CountsByRegion")
> >> > > > > > > > > > >     // discard any regions FOR SAKE OF EXAMPLE
> >> > > > > > > > > > >     .filter((regionName, count) -> *1 >= 2*)
> >> > > > > > > > > > >     .mapValues(count -> count.toString());
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > KStream<String, *String*> regionCountsForConsole = regionCounts.toStream();
> >> > > > > > > > > > >
> >> > > > > > > > > > > regionCountsForConsole.to(stringSerde, *stringSerde*, "LargeRegions");
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > --
> >> > > > > > > > > > -- Guozhang
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > >
> >> > > > > > > > --
> >> > > > > > > > -- Guozhang
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > >
> >> > > > > > --
> >> > > > > > -- Guozhang
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
