Yes, they are related in the sense that if we always materialize a source
KTable, then we can remove the `sendOldValues` flag entirely, since it would
always be true. But since 3911 is a rather big change, I'd prefer to
complete this ticket first and refactor it when we decide to work on 3911
later.
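
For reference, the filter cases from earlier in this thread (quoted below)
could be sketched roughly as follows. This is only an illustration of the
case analysis, not the actual KTableFilter code: the `FilterSketch` class,
the `Change` holder and the `filterChange` method are hypothetical names
made up for this sketch, and it assumes "send old value" is enabled.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative sketch only; this is NOT the actual KTableFilter implementation.
public class FilterSketch {

    // Hypothetical stand-in for an {old -> new} pair; either side may be null.
    static final class Change<V> {
        final V oldValue;
        final V newValue;

        Change(V oldValue, V newValue) {
            this.oldValue = oldValue;
            this.newValue = newValue;
        }

        @Override
        public String toString() {
            return "{" + oldValue + " -> " + newValue + "}";
        }
    }

    // Returns the pair to forward downstream, or empty when nothing needs to be sent.
    static <V> Optional<Change<V>> filterChange(Change<V> change, Predicate<V> predicate) {
        // A side survives only if it is non-null and passes the predicate.
        V oldKept = (change.oldValue != null && predicate.test(change.oldValue)) ? change.oldValue : null;
        V newKept = (change.newValue != null && predicate.test(change.newValue)) ? change.newValue : null;

        // Neither side survives: downstream never held this key, so skip it
        // (the filtered-out branches of cases 3a and 3b, and case 3c-2).
        if (oldKept == null && newKept == null) {
            return Optional.empty();
        }
        // Otherwise forward the pair with any filtered-out side replaced by null
        // (cases 3c-1, 3c-3, 3c-4 and the "otherwise" branches of 3a and 3b).
        return Optional.of(new Change<V>(oldKept, newKept));
    }

    public static void main(String[] args) {
        Predicate<Integer> lessThanTwo = v -> v < 2;
        // Pairs for the source stream <a: 1>, <b: 2>, <a: 3> used in the thread.
        System.out.println(filterChange(new Change<Integer>(null, 1), lessThanTwo)); // Optional[{null -> 1}]
        System.out.println(filterChange(new Change<Integer>(null, 2), lessThanTwo)); // Optional.empty
        System.out.println(filterChange(new Change<Integer>(1, 3), lessThanTwo));    // Optional[{1 -> null}]
    }
}
```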

Feel free to link these two tickets though.

Guozhang

On Tue, Jun 28, 2016 at 9:47 AM, Philippe Derome <phder...@gmail.com> wrote:

> Is this point of view consistent with the new ticket 3911 (Enforce KTable
> materialisation) just submitted by Eno. T?
>
> Should the two tickets be linked somehow if they are related?
> My concern is that the overhead of requesting the source KTable to be
> materialized (i.e. creating a state store, and sending the {old -> new}
> pair instead of the new value only) may be overwhelming compared with its
> potential benefit of reducing the downstream traffic.
>
> Guozhang
>
> On Sun, Jun 26, 2016 at 8:58 AM, Philippe Derome <phder...@gmail.com>
> wrote:
>
> > Guozhang,
> >
> > would you say it's advisable to initialize KTableFilter.sendOldValues to
> > true instead of false? That's what I see that can trigger your described
> > case 3 to potentially desirable effect, but I didn't include it into pull
> > request. If left to default value of false, I don't know what mechanism
> > should override it to true.
> >
> > Phil
> >
> > On Sun, Jun 26, 2016 at 12:07 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > > Thanks! You can follow this step-by-step guidance to contribute to Kafka
> > > > via github.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome <phder...@gmail.com>
> > > wrote:
> > >
> > > > > I have a one-line solution for this in KTableFilter.java and about 5-6 lines
> > > > > of changes to the existing unit test KTableFilterTest.testSendingOldValue. I
> > > > > included those lines with context in the JIRA. I am struggling a bit with
> > > > > github, being new to it and to how to do a proper pull request, so hopefully
> > > > > that can be followed up by you? I had the streams test suite pass, aside from
> > > > > a few cases that pertain specifically to this JIRA, as assumptions have now
> > > > > changed.
> > > >
> > > > On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Philippe,
> > > > >
> > > > > > Great, since you agree with my reasoning, I have created a JIRA ticket for
> > > > > > optimizing KTableFilter (feel free to pick it up if you are interested in
> > > > > > contributing):
> > > > >
> > > > > https://issues.apache.org/jira/browse/KAFKA-3902
> > > > >
> > > > > > About case 3-c-1), what I meant is that since the predicate returns true on
> > > > > > both, the resulting pair would just be the same as the original pair.
> > > > >
> > > > > > KIP-63 itself is a rather big story, but it has one connection
> > > > > > to this JIRA: with caching you can dedup some records with the same key.
> > > > > > For example, if the input records to the KTable are:
> > > > > >
> > > > > > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
> > > > > >
> > > > > > and the KTable is materialized into a state store with a cache on top of it,
> > > > > > then the resulting downstream records could be:
> > > > > >
> > > > > > <a: {null -> 1}>, <a: {1 -> 6}> ...
> > > > > >
> > > > > > instead of
> > > > > >
> > > > > > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ...
> > > > > >
> > > > > > So if it is piped to a filter() operator, then even less data will be
> > > > > > produced.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <phder...@gmail.com>
> > > > > > wrote:
> > > > >
> > > > > > Yes, it looks very good. Your detailed explanation appears
> > compelling
> > > > > > enough to reveal that some of the details of the complexity of a
> > > > streams
> > > > > > system are probably inherent complexity (not that I dared assume
> it
> > > was
> > > > > > "easy" but I could afford to be conveniently unaware). It took me
> > 30
> > > > > > minutes to grasp this latest response.
> > > > > >
> > > > > > There might be a typo in your email for case 3.c.1) as I would
> > think
> > > we
> > > > > > should send the most recent pair as opposed to original, in any
> > event
> > > > it
> > > > > > does not materially impact your presentation.
> > > > > >
> > > > > > Your case 3a) is really what triggered my line of questioning and
> I
> > > > found
> > > > > > the current behaviour vexing as it may lead to some undesirable
> and
> > > > > > necessary filter (see Michael G. Noll's fix in
> > > UserRegionLambdaExample
> > > > at
> > > > > > the very end trying to weed out null) used to output to topic to
> > > > console.
> > > > > > Without looking at design, it seemed self-evident to me that the
> > 3a)
> > > > > > behaviour had to be implemented ( from my point of view with the
> > code
> > > > > > example I was looking at, it simply means never say to delete a
> key
> > > > that
> > > > > > was never created, simply don't "create a deleted" key).
> > > > > >
> > > > > > Likewise cases 3 b,c look very reasonable.
> > > > > >
> > > > > > Just out of curiosity, did you effectively just restate the
> essence
> > > of
> > > > > > KIP-63 in a more approachable language I could understand or is
> > > KIP-63
> > > > > > really a different beast?
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > Hello Philippe,
> > > > > > >
> > > > > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > > > > specifically and how we can improve on that:
> > > > > > >
> > > > > > > > 1. Some context: when a KTable participates in a downstream operator (e.g.
> > > > > > > > if that operator is an aggregation), then we need to materialize this
> > > > > > > > KTable and send both its old value and its new value as a pair {old ->
> > > > > > > > new} to the downstream operator. In practice it usually needs to send the
> > > > > > > > pair.
> > > > > > >
> > > > > > > > So let's discuss them separately. Take the following example source
> > > > > > > > stream for your KTable:
> > > > > > > >
> > > > > > > > <a: 1>, <b: 2>, <a: 3> ...
> > > > > > > >
> > > > > > > > When the KTable needs to be materialized, it will transform the source
> > > > > > > > messages into the pairs:
> > > > > > > >
> > > > > > > > <a: {null -> 1}>, <b: {null -> 2}>, <a: {1 -> 3}>
> > > > > > >
> > > > > > > > 2. If "send old value" is not enabled, then when the filter predicate
> > > > > > > > returns false, we MUST send a <key: null> to the downstream operator to
> > > > > > > > indicate that this key is being filtered out of the table. Otherwise, for
> > > > > > > > example if your filter is "value < 2", then the updated value <a: 3> will
> > > > > > > > just be dropped, and downstream would still hold the stale <a: 1>,
> > > > > > > > resulting in incorrect semantics.
> > > > > > > >
> > > > > > > > If it returns true we should still send the original <key: value> to
> > > > > > > > downstream operators.
> > > > > > >
> > > > > > > > 3. If "send old value" is enabled, then there are a couple of cases we can
> > > > > > > > consider:
> > > > > > > >
> > > > > > > >     a. If the old value is <key: null> and the new value is <key: not-null>,
> > > > > > > >     and the filter predicate returns false for the new value, then it is
> > > > > > > >     safe to optimize and not send anything to the downstream operator,
> > > > > > > >     since we know there was no value for the key previously anyway;
> > > > > > > >     otherwise we send the original pair.
> > > > > > > >
> > > > > > > >     b. If the old value is <key: not-null> and the new value is <key: null>,
> > > > > > > >     indicating a delete of this key, and the filter predicate returns false
> > > > > > > >     for the old value, then it is safe to optimize and not send anything
> > > > > > > >     to the downstream operator, since we know that the old value has
> > > > > > > >     already been filtered out in a previous message; otherwise we send the
> > > > > > > >     original pair.
> > > > > > > >
> > > > > > > >     c. If both old and new values are not null, and:
> > > > > > > >
> > > > > > > >
> > > > > > > >   1) the predicate returns true on both, send the original pair;
> > > > > > > >
> > > > > > > >   2) the predicate returns false on both, we can optimize and not send
> > > > > > > >   anything;
> > > > > > > >
> > > > > > > >   3) the predicate returns true on old and false on new, send
> > > > > > > >   key: {old -> null};
> > > > > > > >
> > > > > > > >   4) the predicate returns false on old and true on new, send
> > > > > > > >   key: {null -> new};
> > > > > > >
> > > > > > > > Does this sound good to you?
> > > > > > >
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <phder...@gmail.com>
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks a lot for the detailed feedback, its clarity and the
> > > > reference
> > > > > > to
> > > > > > > > KIP-63, which however is for the most part above my head for
> > now.
> > > > > > > >
> > > > > > > > Having said that, I still hold the view that the behaviour I
> > > > > presented
> > > > > > is
> > > > > > > > undesirable and hardly defensible and we may have no choice
> but
> > > to
> > > > > > agree
> > > > > > > to
> > > > > > > > disagree and it could be a sterile discussion to keep at it
> and
> > > > > > > addressing
> > > > > > > > KIP-63 and other issues are more important than my brief
> > > > observation.
> > > > > > > >
> > > > > > > > What follows supports my point of view that the filter method
> > is
> > > > not
> > > > > > > > behaving as expected and I'd still think it's a defect,
> > however I
> > > > am
> > > > > > > > guarded with my observation admitting my status of "total
> > newbie"
> > > > at
> > > > > > > stream
> > > > > > > > processing and Kafka.
> > > > > > > >
> > > > > > > > if we rewrite the code snippet I provided from
> > > > > > > > > KTable<String, *String*> regionCounts = userRegions
> > > > > > > > >      .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > > > > > >      .count("CountsByRegion")
> > > > > > > > >      .filter((regionName, count) -> false)
> > > > > > > > >      .mapValues(count -> count.toString());
> > > > > > > > >
> > > > > > > > > to
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > KTable<String, Long> regionCounts1 = userRegions
> > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > > > > > >     .count("CountsByRegion");
> > > > > > > > >
> > > > > > > > > KTable<String, String> regionCounts = regionCounts1
> > > > > > > > >     .filter((regionName, count) -> false)
> > > > > > > > >     .mapValues(count -> count.toString());
> > > > > > > >
> > > > > > > >
> > > > > > > > It becomes clear that regionCounts1 could build up plenty of
> > keys
> > > > > with
> > > > > > > > valid Long counts, normal behaviour
> > > > > > > >
> > > > > > > >  (I think you call this a node in the topology in KIP-63 and
> > > > > > > > regionCounts is a successor node).
> > > > > > > >
> > > > > > > > These regionCounts1 keys are then exposed to evaluation of
> > KTable
> > > > > > > > regionCounts as an input. But why should there be any key
> > created
> > > > in
> > > > > > > > KTable regionCounts that has a false filter? In other words,
> > the
> > > > > > > > "optimization"
> > > > > > > >
> > > > > > > > seems really compelling here: do not create a key before that
> > key
> > > > > > > > becomes relevant. The key with a null value is valid and
> > relevant
> > > > in
> > > > > > > > regionCounts1 but not regionCounts. By a programming
> > composition
> > > > > > > > argument, the original block
> > > > > > > >
> > > > > > > > of code I presented should be equivalent to the broken down
> one
> > > in
> > > > > two
> > > > > > > > blocks here (and I guess that's saying 1 unified node in the
> > > > topology
> > > > > > > > should be equivalent to a chain of 2 nodes represented below
> > if I
> > > > > > > > understand the terminology right).
> > > > > > > >
> > > > > > > > The contents of regionCounts should not change depending on
> the
> > > set
> > > > > of
> > > > > > > > keys present in regionCounts1 if we view this
> > > > > > > >
> > > > > > > > from a functional programming point of view (it's as if we
> are
> > > > > > > > carrying garbage collected objects into regionCounts), which
> > > seems
> > > > > > > > natural considering the method filter that is pervasive in
> FP.
> > > > > > > >
> > > > > > > > Here regionCounts is totally oblivious that aggregation took
> > > place
> > > > > > > > previously in regionCounts1 and that's fine (KIP-63 talks
> much
> > > > about
> > > > > > > > aggregation but I don't really care about, I care about the
> 2nd
> > > > node
> > > > > > > > and the behaviour of filter).
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <wangg...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hello Philippe,
> > > > > > > > >
> > > > > > > > > > I think your question is really two-fold:
> > > > > > > > > >
> > > > > > > > > > 1. What is the semantic difference between a KTable and a KStream, and
> > > > > > > > > > more specifically how should we interpret (key, null) in a KTable?
> > > > > > > > >
> > > > > > > > > You can find some explanations in this documentation:
> > > > > > > > >
> > > > > > > > > > http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
> > > > > > > > >
> > > > > > > > > > Note that a KTable itself is still a stream behind the scenes, although it
> > > > > > > > > > may be materialized when necessary. Specifically to your question, (key,
> > > > > > > > > > null) can be treated as a tombstone on the specified key, and when this
> > > > > > > > > > KTable stream is materialized, it will result in a "delete" on the
> > > > > > > > > > materialized view.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > 2. As for the "filter" operator, yes it will generate a large amount of
> > > > > > > > > > (key, null) records, which indicate "delete" in the resulting KTable, and
> > > > > > > > > > hence large traffic to the piped topic. But we are working on KIP-63, which
> > > > > > > > > > unifies the caching mechanism in the `KTable.to` operator as well, so that
> > > > > > > > > > de-duping can be done in this operator and hence the outgoing traffic can
> > > > > > > > > > be largely reduced:
> > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Guozhang
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <phder...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > > I made a modification of Confluent's latest example
> > > > > > > > > > > UserRegionLambdaExample. See relevant code at end of email.
> > > > > > > > > >
> > > > > > > > > > > Am I correct in understanding that KTable semantics should be similar to a
> > > > > > > > > > > store-backed cache of a view (as per Wikipedia on materialized views) or
> > > > > > > > > > > similar to Oracle's materialized views and indexed views? More
> > > > > > > > > > > specifically, I am looking at when a (key, null value) pair can make it
> > > > > > > > > > > into a KTable on generating the table from a valid KStream with a false
> > > > > > > > > > > filter.
> > > > > > > > > >
> > > > > > > > > > > Here's the relevant code modified from the example, for which I observed
> > > > > > > > > > > that all keys within userRegions are sent out to topic LargeRegions with a
> > > > > > > > > > > null value. I would think that both the regionCounts KTable and topic
> > > > > > > > > > > LargeRegions should be empty so that the cached view agrees with the
> > > > > > > > > > > intended query (a query with an intentionally empty result set, as the
> > > > > > > > > > > filter is intentionally false: 1 >= 2).
> > > > > > > > > > >
> > > > > > > > > > > I am not sure I understand the implications properly as I am new, but it
> > > > > > > > > > > seems possible that a highly selective filter on a large incoming stream
> > > > > > > > > > > would result in high memory usage for regionCounts and hence the stream
> > > > > > > > > > > application.
> > > > > > > > > >
> > > > > > > > > > > KTable<String, *String*> regionCounts = userRegions
> > > > > > > > > > >     // Count by region
> > > > > > > > > > >     // We do not need to specify any explicit serdes because the key
> > > > > > > > > > >     // and value types do not change
> > > > > > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > > > > > > > >     .count("CountsByRegion")
> > > > > > > > > > >     // discard any regions FOR SAKE OF EXAMPLE
> > > > > > > > > > >     .filter((regionName, count) -> *1 >= 2*)
> > > > > > > > > > >     .mapValues(count -> count.toString());
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > KStream<String, *String*> regionCountsForConsole = regionCounts.toStream();
> > > > > > > > > > >
> > > > > > > > > > > regionCountsForConsole.to(stringSerde, *stringSerde*, "LargeRegions");
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > -- Guozhang
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
