Thanks! You can follow this step-by-step guidance to contribute to Kafka
via github.

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest


Guozhang


On Sat, Jun 25, 2016 at 8:40 PM, Philippe Derome <phder...@gmail.com> wrote:

> I have a 1 liner solution for this in KTableFilter.java and about 5-6 lines
> changes to existing unit test KTableFilterTest.testSendingOldValue. I
> included those lines with context in the JIRA. I am struggling a bit with
> github being new to it and how to do a proper pull request so hopefully
> that can be followed up by you? I had the streams test suite pass aside for
> a few cases that pertain specifically to this JIRA as assumptions have now
> changed.
>
> On Sat, Jun 25, 2016 at 1:14 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Philippe,
> >
> > Great, since you agree with my reasonings, I have created a JIRA ticket
> for
> > optimizing KTableFilter (feel free to pick it up if you are interested in
> > contributing):
> >
> > https://issues.apache.org/jira/browse/KAFKA-3902
> >
> > About case 3-c-1), what I meant is that since "predicate return true on
> > both",
> > the resulted pair would just be the same as the original pair.
> >
> > About KIP-63, itself is a rather big story, but it has one correspondence
> > to this JIRA: with caching you can dedup some records with the same key,
> > for example in the input records to the KTable is:
> >
> > <a: 1>, <a: 2>, <a: 3>, <a: 4>, <a: 5>, <a: 6> ...
> >
> > And the KTable is materialized into a state store with cache on top of
> it,
> > then the resulted downstream could be:
> >
> > <a: {null -> 1}>, <a: {1 -> 6}> ...
> >
> > Instead of
> >
> > <a: {null -> 1}>, <a: {1 -> 2}>, <a: {2 -> 3}>, ... <a: {5 -> 6}> ...
> >
> > So if it is piped to a filter() operator, then even less data will be
> > produced.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 24, 2016 at 5:58 PM, Philippe Derome <phder...@gmail.com>
> > wrote:
> >
> > > Yes, it looks very good. Your detailed explanation appears compelling
> > > enough to reveal that some of the details of the complexity of a
> streams
> > > system are probably inherent complexity (not that I dared assume it was
> > > "easy" but I could afford to be conveniently unaware). It took me 30
> > > minutes to grasp this latest response.
> > >
> > > There might be a typo in your email for case 3.c.1) as I would think we
> > > should send the most recent pair as opposed to original, in any event
> it
> > > does not materially impact your presentation.
> > >
> > > Your case 3a) is really what triggered my line of questioning and I
> found
> > > the current behaviour vexing as it may lead to some undesirable and
> > > necessary filter (see Michael G. Noll's fix in UserRegionLambdaExample
> at
> > > the very end trying to weed out null) used to output to topic to
> console.
> > > Without looking at design, it seemed self-evident to me that the 3a)
> > > behaviour had to be implemented ( from my point of view with the code
> > > example I was looking at, it simply means never say to delete a key
> that
> > > was never created, simply don't "create a deleted" key).
> > >
> > > Likewise cases 3 b,c look very reasonable.
> > >
> > > Just out of curiosity, did you effectively just restate the essence of
> > > KIP-63 in a more approachable language I could understand or is KIP-63
> > > really a different beast?
> > >
> > >
> > >
> > > On Fri, Jun 24, 2016 at 5:45 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hello Philippe,
> > > >
> > > > Very good points, let me dump my thoughts about "KTable.filter"
> > > > specifically and how we can improve on that:
> > > >
> > > > 1. Some context: when a KTable participates in a downstream operators
> > > (e.g.
> > > > if that operator is an aggregation), then we need to materialize this
> > > > KTable and send both its old value as well as new value as a pair
> {old
> > ->
> > > > new} to the downstream operator. In practice it usually needs to send
> > the
> > > > pair.
> > > >
> > > > So let's discuss about them separately, take the following example
> > source
> > > > stream for your KTable
> > > >
> > > > <a: 1>, <b: 2>, <a: 3> ...
> > > >
> > > > When the KTable needs to be materialized, it will transform the
> source
> > > > messages into the pairs of:
> > > >
> > > > <a: {null -> 1}>, <b: {nul -> 2}>, <a: {1 -> 3}>
> > > >
> > > > 2. If "send old value" is not enabled, then when the filter predicate
> > > > returns false, we MUST send a <key: null> to the downstream operator
> to
> > > > indicate that this key is being filtered in the table. Otherwise, for
> > > > example if your filter is "value < 2", then the updated value <a: 3>
> > will
> > > > just be filtered, resulting in incorrect semantics.
> > > >
> > > > If it returns true we should still send the original <key: value> to
> > > > downstream operators.
> > > >
> > > > 3. If "send old value" is enabled, then there are a couple of cases
> we
> > > can
> > > > consider:
> > > >
> > > >     a. If old value is <key: null> and new value is <key: not-null>,
> > and
> > > > the filter predicate return false for the new value, then in this
> case
> > it
> > > > is safe to optimize and not returning anything to the downstream
> > > operator,
> > > > since in this case we know there is no value for the key previously
> > > > anyways; otherwise we send the original pair.
> > > >
> > > >     b. If old value is <key: not-null> and new value is <key: null>,
> > > > indicating to delete this key, and the filter predicate return false
> > for
> > > > the old value, then in this case it is safe to optimize and not
> > returning
> > > > anything to the downstream operator, since we know that the old value
> > has
> > > > already been filtered in a previous message; otherwise we send the
> > > original
> > > > pair.
> > > >
> > > >     c. If both old and new values are not null, and:
> > > >
> > > >
> > > >   1) predicate return true on both, send the original pair;
> > > >
> > > >   2) predicate return false on both, we can optimize and do not send
> > > > anything;
> > > >
> > > >   3) predicate return true on old and false on new, send the key:
> {old
> > ->
> > > > null};
> > > >
> > > >   4) predicate return false on old and true on new, send the key:
> {null
> > > ->
> > > > new};
> > > >
> > > > Does this sounds good to you?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Jun 23, 2016 at 6:17 PM, Philippe Derome <phder...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Thanks a lot for the detailed feedback, its clarity and the
> reference
> > > to
> > > > > KIP-63, which however is for the most part above my head for now.
> > > > >
> > > > > Having said that, I still hold the view that the behaviour I
> > presented
> > > is
> > > > > undesirable and hardly defensible and we may have no choice but to
> > > agree
> > > > to
> > > > > disagree and it could be a sterile discussion to keep at it and
> > > > addressing
> > > > > KIP-63 and other issues are more important than my brief
> observation.
> > > > >
> > > > > What follows supports my point of view that the filter method is
> not
> > > > > behaving as expected and I'd still think it's a defect, however I
> am
> > > > > guarded with my observation admitting my status of "total newbie"
> at
> > > > stream
> > > > > processing and Kafka.
> > > > >
> > > > > if we rewrite the code snippet I provided from
> > > > > KTable<String, *String*> regionCounts = userRegions
> > > > >      .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > >      .count("CountsByRegion")
> > > > >      .filter((regionName, count) -> false)
> > > > >      .mapValues(count -> count.toString());
> > > > >
> > > > > to
> > > > >
> > > > >
> > > > > KTable<String, Long> regionCounts1 = userRegions
> > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > >     .count("CountsByRegion");
> > > > >
> > > > > KTable<String, String> regionCounts = regionCounts1
> > > > >     .filter((regionName, count) -> false)
> > > > >     .mapValues(count -> count.toString());
> > > > >
> > > > >
> > > > > It becomes clear that regionCounts1 could build up plenty of keys
> > with
> > > > > valid Long counts, normal behaviour
> > > > >
> > > > >  (I think you call this a node in the topology in KIP-63 and
> > > > > regionCounts is a successor node).
> > > > >
> > > > > These regionCounts1 keys are then exposed to evaluation of KTable
> > > > > regionCounts as an input. But why should there be any key created
> in
> > > > > KTable regionCounts that has a false filter? In other words, the
> > > > > "optimization"
> > > > >
> > > > > seems really compelling here: do not create a key before that key
> > > > > becomes relevant. The key with a null value is valid and relevant
> in
> > > > > regionCounts1 but not regionCounts. By a programming composition
> > > > > argument, the original block
> > > > >
> > > > > of code I presented should be equivalent to the broken down one in
> > two
> > > > > blocks here (and I guess that's saying 1 unified node in the
> topology
> > > > > should be equivalent to a chain of 2 nodes represented below if I
> > > > > understand the terminology right).
> > > > >
> > > > > The contents of regionCounts should not change depending on the set
> > of
> > > > > keys present in regionCounts1 if we view this
> > > > >
> > > > > from a functional programming point of view (it's as if we are
> > > > > carrying garbage collected objects into regionCounts), which seems
> > > > > natural considering the method filter that is pervasive in FP.
> > > > >
> > > > > Here regionCounts is totally oblivious that aggregation took place
> > > > > previously in regionCounts1 and that's fine (KIP-63 talks much
> about
> > > > > aggregation but I don't really care about, I care about the 2nd
> node
> > > > > and the behaviour of filter).
> > > > >
> > > > >
> > > > > On Thu, Jun 23, 2016 at 6:13 PM, Guozhang Wang <wangg...@gmail.com
> >
> > > > wrote:
> > > > >
> > > > > > Hello Philippe,
> > > > > >
> > > > > > I think your question is really in two-folds:
> > > > > >
> > > > > > 1. What is the semantic difference between a KTable and a
> KStream,
> > > and
> > > > > more
> > > > > > specifically how should we interpret (key, null) in KTable?
> > > > > >
> > > > > > You can find some explanations in this documentation:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://docs.confluent.io/3.0.0/streams/concepts.html#ktable-changelog-stream
> > > > > >
> > > > > > Note that KTable itself is still a stream behind the scene,
> > although
> > > it
> > > > > may
> > > > > > be materialized when necessary. And specifically to your
> question,
> > > > (key,
> > > > > > null) can be treated as a tombstone on the specified key, and
> when
> > > this
> > > > > > KTable stream is materialized, it will result in a "delete" on
> > > > > materialized
> > > > > > view.
> > > > > >
> > > > > >
> > > > > > 2. As for the "filter" operator, yes it will generate a large
> > amount
> > > of
> > > > > > (key, null) records which indicates "delete" in the resulted
> > KTable,
> > > > and
> > > > > > hence large traffic to the piped topic. But we are working on
> > KIP-63
> > > > > which
> > > > > > unifies the caching mechanism in the `KTable.to` operator as well
> > so
> > > > that
> > > > > > de-duping can be done in this operator and hence the outgoing
> > traffic
> > > > can
> > > > > > be largely reduced:
> > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-63:+Unify+store+and+downstream+caching+in+streams
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Thu, Jun 23, 2016 at 5:50 AM, Philippe Derome <
> > phder...@gmail.com
> > > >
> > > > > > wrote:
> > > > > >
> > > > > > > I made a modification of latest Confluent's example
> > > > > > > UserRegionLambdaExample. See relevant code at end of email.
> > > > > > >
> > > > > > > Am I correct in understanding that KTable semantics should be
> > > similar
> > > > > to
> > > > > > a
> > > > > > > store-backed cache of a view as (per wikipedia on materialized
> > > views)
> > > > > or
> > > > > > > similar to Oracle's materialized views and indexed views? More
> > > > > > > specifically, I am looking at when a (key, null value) pair can
> > > make
> > > > it
> > > > > > > into KTable on generating table from a valid KStream with a
> false
> > > > > filter.
> > > > > > >
> > > > > > > Here's relevant code modified from example for which I observed
> > > that
> > > > > all
> > > > > > > keys within userRegions are sent out to topic LargeRegions
> with a
> > > > null
> > > > > > > value. I would think that both regionCounts KTable and topic
> > > > > LargeRegions
> > > > > > > should be empty so that the cached view agrees with the
> intended
> > > > query
> > > > > (a
> > > > > > > query with an intentional empty result set as the filter is
> > > > > intentionally
> > > > > > > false as 1 >= 2).
> > > > > > >
> > > > > > > I am not sure I understand implications properly as I am new
> but
> > it
> > > > > seems
> > > > > > > possible that  a highly selective filter from a large incoming
> > > stream
> > > > > > would
> > > > > > > result in high memory usage for regionCounts and hence the
> stream
> > > > > > > application.
> > > > > > >
> > > > > > > KTable<String, *String*> regionCounts = userRegions
> > > > > > >     // Count by region
> > > > > > >     // We do not need to specify any explicit serdes because
> the
> > > key
> > > > > > > and value types do not change
> > > > > > >     .groupBy((userId, region) -> KeyValue.pair(region, region))
> > > > > > >     .count("CountsByRegion")
> > > > > > >     // discard any regions FOR SAKE OF EXAMPLE
> > > > > > >     .filter((regionName, count) -> *1 >= 2*)
> > > > > > >     .mapValues(count -> count.toString());
> > > > > > >
> > > > > > >
> > > > > > > KStream<String, *String*> regionCountsForConsole =
> > > > > > regionCounts.toStream();
> > > > > > >
> > > > > > > regionCountsForConsole.to(stringSerde, *stringSerde*,
> > > > "LargeRegions");
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang

Reply via email to