Guozhang,

my latest commit would propose that semantics of your JIRA case 2 be
changed a little to suppress nulls when not sendingOldValues and not
materializing. When a table T2 is created first from another table T1 and
the filter does not match for the key k from T1, the invalid key k does not
enter T2 at all (no null). Ultimately, the code change is simpler and the
test results look more intuitive.

On Wed, Jun 29, 2016 at 6:55 PM, Philippe Derome <phder...@gmail.com> wrote:

> good.
>
> On Wed, Jun 29, 2016 at 6:44 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Yes, they are related in the sense that if we always materialize a source
>> KTable, then we can completely replace the `sendOldValues` as it will
>> always be true. But since 3911 is a rather big change, I'd prefer to
>> complete this ticket first, and refactor it when we decided to work on
>> 3911
>> later.
>>
>> Feel free to link these two tickets though.
>>
>> Guozhang
>>
>> On Tue, Jun 28, 2016 at 9:47 AM, Philippe Derome <phder...@gmail.com>
>> wrote:
>>
>> > Is this point of view consistent with new ticket 3911 (Enforce KTable
>> > materialisation ) just submitted by Eno. T?
>> >
>> > Should the two tickets be linked somehow if they are related?
>> > My concern is that, the overhead of requesting the source KTable to be
>> > materialized (i.e. creating a state store, and sending the {old -> new}
>> > pair instead of the new value only) may be over-whelming compared with
>> its
>> > potential benefits of reducing the downstream traffic.
>> >
>> > Guozhang
>> >
>> > On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome <phder...@gmail.com>
>> > wrote:
>> >
>> > > Guozhang,
>> > >
>> > > would you say it's advisable to initialize KTableFilter.sendOldValues
>> to
>> > > true instead of false? That's what I see that can trigger your
>> described
>> > > case 3 to potentially desirable effect, but I didn't include it into
>> pull
>> > > request. If left to default value of false, I don't know what
>> mechanism
>> > > should override it to true.
>> > >
>> > > Phil
>> > >
>> > > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks! You can follow this step-by-step guidance to contribute to
>> > Kafka
>> > > > via github.
>> > > >
>> > > >
>> > > >
>> > >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
>> > > >
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome <
>> phder...@gmail.com>
>> > > > wrote:
>> > > >
>> > > > > I have a 1 liner solution for this in KTableFilter.java and about
>> 5-6
>> > > > lines
>> > > > > changes to existing unit test
>> KTableFilterTest.testSendingOldValue. I
>> > > > > included those lines with context in the JIRA. I am struggling a
>> bit
>> > > with
>> > > > > github being new to it and how to do a proper pull request so
>> > hopefully
>> > > > > that can be followed up by you? I had the streams test suite pass
>> > aside
>> > > > for
>> > > > > a few cases that pertain specifically to this JIRA as assumptions
>> > have
>> > > > now
>> > > > > changed.
>> > > > >
>> > > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang <
>> wangg...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi Philippe,
>> > > > > >
>> > > > > > Great, since you agree with my reasonings, I have created a JIRA
>> > > ticket
>> > > > > for
>> > > > > > optimizing KTableFilter (feel free to pick it up if you are
>> > > interested
>> > > > in
>> > > > > > contributing):
>> > > > > >
>> > > > > > https://issues.apache.org/jira/browse/KAFKA-3902
>> > > > > >
>> > > > > > About case 3-c-1), what I meant is that since "predicate return
>> > true
>> > > on
>> > > > > > both",
>> > > > > > the resulted pair would just be the same as the original pair.
>> > > > > >
>> > > > > > About KIP-63, itself is a rather big story, but it has one
>> > > > correspondence
>> > > > > > to this JIRA: with caching you can dedup some records with the
>> same
>> > > > key,
>> > > > > > for example in the input records to the KTable is:
>> > > > > >
>> > > > > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
>> > > > > >
>> > > > > > And the KTable is materialized into a state store with cache on
>> top
>> > > of
>> > > > > it,
>> > > > > > then the resulted downstream could be:
>> > > > > >
>> > > > > > <a: {null -> 1}>, <a: {1 -> 6}> ...
>> > > > > >
>> > > > > > Instead of
>> > > > > >
>> > > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 ->
>> 6}>
>> > ...
>> > > > > >
>> > > > > > So if it is piped to a filter() operator, then even less data
>> will
>> > be
>> > > > > > produced.
>> > > > > >
>> > > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > >
>> > > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <
>> > phder...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > >
>> > > > > > > Yes, it looks very good. Your detailed explanation appears
>> > > compelling
>> > > > > > > enough to reveal that some of the details of the complexity
>> of a
>> > > > > streams
>> > > > > > > system are probably inherent complexity (not that I dared
>> assume
>> > it
>> > > > was
>> > > > > > > "easy" but I could afford to be conveniently unaware). It
>> took me
>> > > 30
>> > > > > > > minutes to grasp this latest response.
>> > > > > > >
>> > > > > > > There might be a typo in your email for case 3.c.1) as I would
>> > > think
>> > > > we
>> > > > > > > should send the most recent pair as opposed to original, in
>> any
>> > > event
>> > > > > it
>> > > > > > > does not materially impact your presentation.
>> > > > > > >
>> > > > > > > Your case 3a) is really what triggered my line of questioning
>> and
>> > I
>> > > > > found
>> > > > > > > the current behaviour vexing as it may lead to some
>> undesirable
>> > and
>> > > > > > > necessary filter (see Michael G. Noll's fix in
>> > > > UserRegionLambdaExample
>> > > > > at
>> > > > > > > the very end trying to weed out null) used to output to topic
>> to
>> > > > > console.
>> > > > > > > Without looking at design, it seemed self-evident to me that
>> the
>> > > 3a)
>> > > > > > > behaviour had to be implemented ( from my point of view with
>> the
>> > > code
>> > > > > > > example I was looking at, it simply means never say to delete
>> a
>> > key
>> > > > > that
>> > > > > > > was never created, simply don't "create a deleted" key).
>> > > > > > >
>> > > > > > > Likewise cases 3 b,c look very reasonable.
>> > > > > > >
>> > > > > > > Just out of curiosity, did you effectively just restate the
>> > essence
>> > > > of
>> > > > > > > KIP-63 in a more approachable language I could understand or
>> is
>> > > > KIP-63
>> > > > > > > really a different beast?
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <
>> > wangg...@gmail.com
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hello Philippe,
>> > > > > > > >
>> > > > > > > > Very good points, let me dump my thoughts about
>> "KTable.filter"
>> > > > > > > > specifically and how we can improve on that:
>> > > > > > > >
>> > > > > > > > 1. Some context: when a KTable participates in a downstream
>> > > > operators
>> > > > > > > (e.g.
>> > > > > > > > if that operator is an aggregation), then we need to
>> > materialize
>> > > > this
>> > > > > > > > KTable and send both its old value as well as new value as a
>> > pair
>> > > > > {old
>> > > > > > ->
>> > > > > > > > new} to the downstream operator. In practice it usually
>> needs
>> > to
>> > > > send
>> > > > > > the
>> > > > > > > > pair.
>> > > > > > > >
>> > > > > > > > So let's discuss about them separately, take the following
>> > > example
>> > > > > > source
>> > > > > > > > stream for your KTable
>> > > > > > > >
>> > > > > > > > <a: 1>, <b: 2>, <a: 3> ...
>> > > > > > > >
>> > > > > > > > When the KTable needs to be materialized, it will transform
>> the
>> > > > > source
>> > > > > > > > messages into the pairs of:
>> > > > > > > >
>> > > > > > > > <a: {null -> 1}>, <b: {nul -> 2}>, <a: {1 -> 3}>
>> > > > > > > >
>> > > > > > > > 2. If "send old value" is not enabled, then when the filter
>> > > > predicate
>> > > > > > > > returns false, we MUST send a <key: null> to the downstream
>> > > > operator
>> > > > > to
>> > > > > > > > indicate that this key is being filtered in the table.
>> > Otherwise,
>> > > > for
>> > > > > > > > example if your filter is "value < 2", then the updated
>> value
>> > <a:
>> > > > 3>
>> > > > > > will
>> > > > > > > > just be filtered, resulting in incorrect semantics.
>> > > > > > > >
>> > > > > > > > If it returns true we should still send the original <key:
>> > value>
>> > > > to
>> > > > > > > > downstream operators.
>> > > > > > > >
>> > > > > > > > 3. If "send old value" is enabled, then there are a couple
>> of
>> > > cases
>> > > > > we
>> > > > > > > can
>> > > > > > > > consider:
>> > > > > > > >
>> > > > > > > >     a. If old value is <key: null> and new value is <key:
>> > > > not-null>,
>> > > > > > and
>> > > > > > > > the filter predicate return false for the new value, then in
>> > this
>> > > > > case
>> > > > > > it
>> > > > > > > > is safe to optimize and not returning anything to the
>> > downstream
>> > > > > > > operator,
>> > > > > > > > since in this case we know there is no value for the key
>> > > previously
>> > > > > > > > anyways; otherwise we send the original pair.
>> > > > > > > >
>> > > > > > > >     b. If old value is <key: not-null> and new value is
>> <key:
>> > > > null>,
>> > > > > > > > indicating to delete this key, and the filter predicate
>> return
>> > > > false
>> > > > > > for
>> > > > > > > > the old value, then in this case it is safe to optimize and
>> not
>> > > > > > returning
>> > > > > > > > anything to the downstream operator, since we know that the
>> old
>> > > > value
>> > > > > > has
>> > > > > > > > already been filtered in a previous message; otherwise we
>> send
>> > > the
>> > > > > > > original
>> > > > > > > > pair.
>> > > > > > > >
>> > > > > > > >     c. If both old and new values are not null, and:
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >   1) predicate return true on both, send the original pair;
>> > > > > > > >
>> > > > > > > >   2) predicate return false on both, we can optimize and do
>> not
>> > > > send
>> > > > > > > > anything;
>> > > > > > > >
>> > > > > > > >   3) predicate return true on old and false on new, send the
>> > key:
>> > > > > {old
>> > > > > > ->
>> > > > > > > > null};
>> > > > > > > >
>> > > > > > > >   4) predicate return false on old and true on new, send the
>> > key:
>> > > > > {null
>> > > > > > > ->
>> > > > > > > > new};
>> > > > > > > >
>> > > > > > > > Does this sounds good to you?
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Guozhang
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <
>> > > > phder...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Thanks a lot for the detailed feedback, its clarity and
>> the
>> > > > > reference
>> > > > > > > to
>> > > > > > > > > KIP-63, which however is for the most part above my head
>> for
>> > > now.
>> > > > > > > > >
>> > > > > > > > > Having said that, I still hold the view that the
>> behaviour I
>> > > > > > presented
>> > > > > > > is
>> > > > > > > > > undesirable and hardly defensible and we may have no
>> choice
>> > but
>> > > > to
>> > > > > > > agree
>> > > > > > > > to
>> > > > > > > > > disagree and it could be a sterile discussion to keep at
>> it
>> > and
>> > > > > > > > addressing
>> > > > > > > > > KIP-63 and other issues are more important than my brief
>> > > > > observation.
>> > > > > > > > >
>> > > > > > > > > What follows supports my point of view that the filter
>> method
>> > > is
>> > > > > not
>> > > > > > > > > behaving as expected and I'd still think it's a defect,
>> > > however I
>> > > > > am
>> > > > > > > > > guarded with my observation admitting my status of "total
>> > > newbie"
>> > > > > at
>> > > > > > > > stream
>> > > > > > > > > processing and Kafka.
>> > > > > > > > >
>> > > > > > > > > if we rewrite the code snippet I provided from
>> > > > > > > > > KTable<String, *String*> regionCounts = userRegions
>> > > > > > > > >      .groupBy((userId, region) -> KeyValue.pair(region,
>> > > region))
>> > > > > > > > >      .count("CountsByRegion")
>> > > > > > > > >      .filter((regionName, count) -> false)
>> > > > > > > > >      .mapValues(count -> count.toString());
>> > > > > > > > >
>> > > > > > > > > to
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > KTable<String, Long> regionCounts1 = userRegions
>> > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region,
>> > region))
>> > > > > > > > >     .count("CountsByRegion");
>> > > > > > > > >
>> > > > > > > > > KTable<String, String> regionCounts = regionCounts1
>> > > > > > > > >     .filter((regionName, count) -> false)
>> > > > > > > > >     .mapValues(count -> count.toString());
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > It becomes clear that regionCounts1 could build up plenty
>> of
>> > > keys
>> > > > > > with
>> > > > > > > > > valid Long counts, normal behaviour
>> > > > > > > > >
>> > > > > > > > >  (I think you call this a node in the topology in KIP-63
>> and
>> > > > > > > > > regionCounts is a successor node).
>> > > > > > > > >
>> > > > > > > > > These regionCounts1 keys are then exposed to evaluation of
>> > > KTable
>> > > > > > > > > regionCounts as an input. But why should there be any key
>> > > created
>> > > > > in
>> > > > > > > > > KTable regionCounts that has a false filter? In other
>> words,
>> > > the
>> > > > > > > > > "optimization"
>> > > > > > > > >
>> > > > > > > > > seems really compelling here: do not create a key before
>> that
>> > > key
>> > > > > > > > > becomes relevant. The key with a null value is valid and
>> > > relevant
>> > > > > in
>> > > > > > > > > regionCounts1 but not regionCounts. By a programming
>> > > composition
>> > > > > > > > > argument, the original block
>> > > > > > > > >
>> > > > > > > > > of code I presented should be equivalent to the broken
>> down
>> > one
>> > > > in
>> > > > > > two
>> > > > > > > > > blocks here (and I guess that's saying 1 unified node in
>> the
>> > > > > topology
>> > > > > > > > > should be equivalent to a chain of 2 nodes represented
>> below
>> > > if I
>> > > > > > > > > understand the terminology right).
>> > > > > > > > >
>> > > > > > > > > The contents of regionCounts should not change depending
>> on
>> > the
>> > > > set
>> > > > > > of
>> > > > > > > > > keys present in regionCounts1 if we view this
>> > > > > > > > >
>> > > > > > > > > from a functional programming point of view (it's as if we
>> > are
>> > > > > > > > > carrying garbage collected objects into regionCounts),
>> which
>> > > > seems
>> > > > > > > > > natural considering the method filter that is pervasive in
>> > FP.
>> > > > > > > > >
>> > > > > > > > > Here regionCounts is totally oblivious that aggregation
>> took
>> > > > place
>> > > > > > > > > previously in regionCounts1 and that's fine (KIP-63 talks
>> > much
>> > > > > about
>> > > > > > > > > aggregation but I don't really care about, I care about
>> the
>> > 2nd
>> > > > > node
>> > > > > > > > > and the behaviour of filter).
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <
>> > > > wangg...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hello Philippe,
>> > > > > > > > > >
>> > > > > > > > > > I think your question is really in two-folds:
>> > > > > > > > > >
>> > > > > > > > > > 1. What is the semantic difference between a KTable and
>> a
>> > > > > KStream,
>> > > > > > > and
>> > > > > > > > > more
>> > > > > > > > > > specifically how should we interpret (key, null) in
>> KTable?
>> > > > > > > > > >
>> > > > > > > > > > You can find some explanations in this documentation:
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> >
>> http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
>> > > > > > > > > >
>> > > > > > > > > > Note that KTable itself is still a stream behind the
>> scene,
>> > > > > > although
>> > > > > > > it
>> > > > > > > > > may
>> > > > > > > > > > be materialized when necessary. And specifically to your
>> > > > > question,
>> > > > > > > > (key,
>> > > > > > > > > > null) can be treated as a tombstone on the specified
>> key,
>> > and
>> > > > > when
>> > > > > > > this
>> > > > > > > > > > KTable stream is materialized, it will result in a
>> "delete"
>> > > on
>> > > > > > > > > materialized
>> > > > > > > > > > view.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > 2. As for the "filter" operator, yes it will generate a
>> > large
>> > > > > > amount
>> > > > > > > of
>> > > > > > > > > > (key, null) records which indicates "delete" in the
>> > resulted
>> > > > > > KTable,
>> > > > > > > > and
>> > > > > > > > > > hence large traffic to the piped topic. But we are
>> working
>> > on
>> > > > > > KIP-63
>> > > > > > > > > which
>> > > > > > > > > > unifies the caching mechanism in the `KTable.to`
>> operator
>> > as
>> > > > well
>> > > > > > so
>> > > > > > > > that
>> > > > > > > > > > de-duping can be done in this operator and hence the
>> > outgoing
>> > > > > > traffic
>> > > > > > > > can
>> > > > > > > > > > be largely reduced:
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Guozhang
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <
>> > > > > > phder...@gmail.com
>> > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > I made a modification of latest Confluent's example
>> > > > > > > > > > > UserRegionLambdaExample. See relevant code at end of
>> > email.
>> > > > > > > > > > >
>> > > > > > > > > > > Am I correct in understanding that KTable semantics
>> > should
>> > > be
>> > > > > > > similar
>> > > > > > > > > to
>> > > > > > > > > > a
>> > > > > > > > > > > store-backed cache of a view as (per wikipedia on
>> > > > materialized
>> > > > > > > views)
>> > > > > > > > > or
>> > > > > > > > > > > similar to Oracle's materialized views and indexed
>> views?
>> > > > More
>> > > > > > > > > > > specifically, I am looking at when a (key, null value)
>> > pair
>> > > > can
>> > > > > > > make
>> > > > > > > > it
>> > > > > > > > > > > into KTable on generating table from a valid KStream
>> with
>> > a
>> > > > > false
>> > > > > > > > > filter.
>> > > > > > > > > > >
>> > > > > > > > > > > Here's relevant code modified from example for which I
>> > > > observed
>> > > > > > > that
>> > > > > > > > > all
>> > > > > > > > > > > keys within userRegions are sent out to topic
>> > LargeRegions
>> > > > > with a
>> > > > > > > > null
>> > > > > > > > > > > value. I would think that both regionCounts KTable and
>> > > topic
>> > > > > > > > > LargeRegions
>> > > > > > > > > > > should be empty so that the cached view agrees with
>> the
>> > > > > intended
>> > > > > > > > query
>> > > > > > > > > (a
>> > > > > > > > > > > query with an intentional empty result set as the
>> filter
>> > is
>> > > > > > > > > intentionally
>> > > > > > > > > > > false as 1 >= 2).
>> > > > > > > > > > >
>> > > > > > > > > > > I am not sure I understand implications properly as I
>> am
>> > > new
>> > > > > but
>> > > > > > it
>> > > > > > > > > seems
>> > > > > > > > > > > possible that  a highly selective filter from a large
>> > > > incoming
>> > > > > > > stream
>> > > > > > > > > > would
>> > > > > > > > > > > result in high memory usage for regionCounts and hence
>> > the
>> > > > > stream
>> > > > > > > > > > > application.
>> > > > > > > > > > >
>> > > > > > > > > > > KTable<String, *String*> regionCounts = userRegions
>> > > > > > > > > > >     // Count by region
>> > > > > > > > > > >     // We do not need to specify any explicit serdes
>> > > because
>> > > > > the
>> > > > > > > key
>> > > > > > > > > > > and value types do not change
>> > > > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region,
>> > > > region))
>> > > > > > > > > > >     .count("CountsByRegion")
>> > > > > > > > > > >     // discard any regions FOR SAKE OF EXAMPLE
>> > > > > > > > > > >     .filter((regionName, count) -> *1 >= 2*)
>> > > > > > > > > > >     .mapValues(count -> count.toString());
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > KStream<String, *String*> regionCountsForConsole =
>> > > > > > > > > > regionCounts.toStream();
>> > > > > > > > > > >
>> > > > > > > > > > > regionCountsForConsole.to(stringSerde, *stringSerde*,
>> > > > > > > > "LargeRegions");
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > --
>> > > > > > > > > > -- Guozhang
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > -- Guozhang
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > > >
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>

Reply via email to