Hi Victoria,

Here's the other concern I mentioned. I didn't bother
bringing it up before because it would be obviated by the
last proposal.

I'm wondering about the need to add four new overloads and a
new config object. We already have 20 KTable#join overloads,
out of a total of 47 KTable operations. In other words,
after this proposal, almost 50% of the KTable interface (24
of 51 operations) will be devoted to slightly different
variations of join().

This kind of proliferation has a number of downsides, which
I documented in my proposal to fix it here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar

We decided in the past to defer completely refactoring all
of the Streams DSL because we didn't foresee things becoming
much worse than they were at the time. Not to harp on this
particular KIP, but it does sort of seem like we're
approaching critical mass with 24 join overloads.

What do you think about seeking some kind of middle ground
by pulling more of the optional join arguments into the
proposed TableJoined interface? After all, starting to chip
away at this problem was the idea behind making these config
objects implement NamedOperation to begin with.

Specifically, this idea would be to add just one overload:
KTable#join(TableJoined<K, KO, VO, VR>)

And the TableJoined interface would have one static method
with the required `KTable<KO, VO> other` and `ValueJoiner<V,
VO, VR> joiner` arguments.

The rest of the arguments (isLeft, fkExtractor,
materialized, etc.) would be settable using `withXYZ` builder
methods.

If that proposal isn't clear, I'd be happy to provide a code
sketch.
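
For concreteness, a rough sketch of that shape follows. It is only an
illustration under stated assumptions, not the KIP's API: KTable and
ValueJoiner are stand-in interfaces so the snippet compiles on its own,
the names withForeignKeyExtractor, withLeftJoin and withName are
placeholders for the `withXYZ` methods, and the class carries an extra V
type parameter (beyond <K, KO, VO, VR>) because the joiner and the
extractor both need to mention it.

```java
import java.util.function.Function;

final class TableJoinedSketch {

    // Minimal stand-ins for the real Kafka Streams types (assumption:
    // only here so the sketch compiles without any dependency).
    interface KTable<K, V> {}

    interface ValueJoiner<V, VO, VR> {
        VR apply(V left, VO right);
    }

    // Hypothetical config object: required arguments go through with(...),
    // optional ones through withXYZ methods that each return a modified copy.
    static final class TableJoined<K, V, KO, VO, VR> {
        final KTable<KO, VO> other;
        final ValueJoiner<V, VO, VR> joiner;
        final Function<V, KO> foreignKeyExtractor; // null = primary-key join
        final boolean leftJoin;
        final String name;

        private TableJoined(final KTable<KO, VO> other,
                            final ValueJoiner<V, VO, VR> joiner,
                            final Function<V, KO> foreignKeyExtractor,
                            final boolean leftJoin,
                            final String name) {
            this.other = other;
            this.joiner = joiner;
            this.foreignKeyExtractor = foreignKeyExtractor;
            this.leftJoin = leftJoin;
            this.name = name;
        }

        // The single static factory, taking only the required arguments.
        static <K, V, KO, VO, VR> TableJoined<K, V, KO, VO, VR> with(
                final KTable<KO, VO> other,
                final ValueJoiner<V, VO, VR> joiner) {
            return new TableJoined<>(other, joiner, null, false, null);
        }

        TableJoined<K, V, KO, VO, VR> withForeignKeyExtractor(
                final Function<V, KO> extractor) {
            return new TableJoined<>(other, joiner, extractor, leftJoin, name);
        }

        TableJoined<K, V, KO, VO, VR> withLeftJoin(final boolean isLeft) {
            return new TableJoined<>(other, joiner, foreignKeyExtractor, isLeft, name);
        }

        TableJoined<K, V, KO, VO, VR> withName(final String newName) {
            return new TableJoined<>(other, joiner, foreignKeyExtractor, leftJoin, newName);
        }
    }

    public static void main(final String[] args) {
        final KTable<String, String> customers = new KTable<String, String>() {};

        // Explicit type arguments, since K cannot be inferred from the arguments.
        final TableJoined<String, String, String, String, String> joined =
            TableJoined.<String, String, String, String, String>with(
                    customers, (order, customer) -> order + " for " + customer)
                .withForeignKeyExtractor(order -> order.split(":")[0])
                .withLeftJoin(true)
                .withName("orders-by-customer");

        System.out.println(joined.name + " leftJoin=" + joined.leftJoin);
    }
}
```

With something like this, KTable would need only the one new
join(TableJoined) overload, and any future optional argument becomes
another `withXYZ` method on the config object instead of another
family of overloads.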

Thanks in advance for your consideration, and thanks again
for driving this,
-John


On Wed, 2021-09-22 at 17:34 -0500, John Roesler wrote:
> Thanks, Guozhang!
> 
> I agree it would be bigger in scope to approach it that way.
> We certainly would need to implement a way for the
> partitioner to flow down the topology, like we do with
> serdes and grace period information. It's not super complex,
> but not trivial either.
> 
> I think we've always been mildly uncomfortable with
> assumption (1), but we just kind of hope for the best for
> performance reasons. But that's because the alternative we
> considered in the past was to proactively repartition all
> inputs. If the question is instead just getting a
> partitioner and validating that the partitioning of the
> table is correct, I think the performance question is much
> different. However, we have also gone a really long time
> with assumption (1) in place, and I don't believe it has
> ever been an issue for anyone.
> 
> I appreciate your consideration. I still think it would be
> nice to approach it more from a data provenance perspective,
> but I don't feel strongly about it. I'm happy to leave it up
> to Victoria's discretion.
> 
> I have one more comment about the KIP, but I'll send a
> separate message for clarity.
> 
> Thanks again for considering it,
> -John
> 
> On Wed, 2021-09-22 at 14:50 -0700, Guozhang Wang wrote:
> > Thanks Victoria for writing the KIP! I think this was a miss when we
> > designed KIP-213 and should be fixed in syntax. Regarding how large its
> > scope should be, here are my thoughts:
> > 
> > 1) Today Streams does not take any indicator of how the input stream/table
> > is partitioned; instead it simply assumes that the input stream/table is
> > always partitioned by the key already. This is by design: we let the
> > users make sure this is always the case. For internal repartitioning
> > like `groupBy`s, Streams does guarantee that the repartition topics are
> > partitioned by the new grouping keys, but nevertheless the assumption still
> > holds: for all stream/table entities defined from an input topic (whether
> > external or internal), that topic is partitioned by the stream/table key,
> > and hence at the moment we do not require any partitioners to be passed in
> > since we do not need them.
> > 
> > 2) For all operators that need to write to a partition, today the
> > partitioners are either defined by the operator logic itself (think:
> > groupBys, where partitioner is hard-coded as by the grouping-key), or
> > user-customized (think: the #through/to APIs). We do not have any scenarios
> > where we need to "inherit" partitioners from parent operators, until
> > FK-joins. Here, we need to make sure: a) the left table A's input topic and
> > the internal response topic are co-partitioned; b) the right table B's
> > input topic and the internal subscription topic are co-partitioned. Of
> > course, if both left table A's and right table B's input topics are
> > partitioned by the default partitioner, then both hold. But the assumption
> > above only requires that the "topic is partitioned by the key", not
> > requiring "the topic is partitioned by the key, following exactly the
> > default partitioner mechanism" (e.g. in
> > https://issues.apache.org/jira/browse/KAFKA-13261, the issue arises when a
> > different partitioner which is not based on hashing of the bytes is used,
> > which still guarantees "the input topic is partitioned by key").
> > 
> > So I feel that if assumption 1) above should still hold in Streams in the
> > long run, it's not very meaningful to require the source tables
> > to indicate their partitioners; we only need to require the FK join operator
> > itself to make sure the co-partition conditions a) and b) above hold. Of
> > course the easiest way is to require users to pass in the partitioners for
> > those two internal topics, which they have to make sure are the same as the
> > input partitioners. We can also have a slightly more complicated approach with
> > some "inheritance" rule for partitioners when they are given at the
> > sink (we have similar rules for serde inheritance throughout the topology),
> > but that only fixes the "#through / #repartition" cases, not the
> > source "builder#table" cases -- i.e. we still need to require users to
> > indicate partitioners which we can hopefully inherit within the topology.
> > 
> > I agree that for IQ, it would be beneficial if we had the partitioner for
> > each table/stream entity so that IQ itself does not need the partitioners
> > to be specified, but to make this fix work, we'd probably need both 1)
> > source table/stream partitioner specification and 2) an inheritance rule
> > (otherwise we'd have to force users to specify the same for #repartition
> > etc. as well?). Given that rationale, I'm slightly leaning towards the
> > current proposal in the KIP doc, i.e. just fixing for this FK operator
> > only, with the easy approach that requires users themselves to set
> > partitioners accordingly.
> > 
> > 
> > 
> > Guozhang
> > 
> > 
> > 
> > On Wed, Sep 22, 2021 at 10:29 AM John Roesler <vvcep...@apache.org> wrote:
> > 
> > > Thanks for the KIP, Victoria!
> > > 
> > > This is a great catch. As far as my memory serves, we
> > > totally missed this case in the design of FK joins.
> > > 
> > > tl;dr: I'm wondering if it might be better to instead
> > > introduce a way to specify the partitioning scheme when we
> > > create a KTable and then just use it when we join, rather
> > > than to specify it in the join itself.
> > > 
> > > I was initially surprised to see the proposal to add these
> > > partitioners in the join operation itself rather than
> > > inheriting them from the tables on both sides, but I
> > > reviewed the Streams DSL and see that we _cannot_ inherit it
> > > because almost all the time, the input KTables' partitioning
> > > is not known!
> > > 
> > > It seems like the only times we specify a partitioner on a
> > > DSL object is:
> > > 1. when we produce to an output topic via KStream#to or
> > > KStream#through.
> > > 2. when we repartition via KStream#repartition
> > > 
> > > These are both specifying the partitioner to use in output
> > > operations (i.e., we are telling Streams the partitioner *to
> > > use*); there's currently only one situation in which we have
> > > to _inform_ streams about the partitioning of a KTable or
> > > KStream:
> > > 3. when we issue a key-based query via IQ, we need to know
> > > the partitioner, so the IQ interface allows us to pass in a
> > > custom partitioner with the query.
> > > 
> > > This is a bit weird. Taking a step back, the partitioning
> > > scheme is a property of the table itself, not of the query
> > > (or the join). Specifying a table property as part of a
> > > query (or join) on the table seems to be an indication that
> > > the KTable definition itself is lacking something.
> > > 
> > > Perhaps a more comprehensive approach would be to add an
> > > optional StreamPartitioner parameter to StreamsBuilder#table.
> > > Then, queries and joins (and anything else that is
> > > partitioner-sensitive now and in the future) could simply
> > > inherit the partitioner via the topology.
> > > 
> > > WDYT?
> > > 
> > > Thanks,
> > > -John
> > > 
> > > On Wed, 2021-09-22 at 10:43 -0400, Bill Bejeck wrote:
> > > > Hi Victoria,
> > > > 
> > > > Thanks for the KIP; this is a beneficial addition.
> > > > 
> > > > I'm a +1 on the KIP and the changes made:
> > > > 
> > > >    - Using a config object TableJoined
> > > >    - Limiting the static methods to two
> > > > 
> > > > I have an additional "wild" thought about rolling the `Function<V, KO>
> > > > foreignKeyExtractor` and `Materialized<K, VR, KeyValueStore<Bytes,
> > > > byte[]>>` into TableJoined to align and reduce the number of KTable
> > > > interfaces.  But I don't have a strong opinion on this; I am curious to see
> > > > what others think about this possibility.
> > > > 
> > > > -Bill
> > > > 
> > > > On Tue, Sep 21, 2021 at 2:52 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > 
> > > > > Thanks for updating the KIP.
> > > > > 
> > > > > One nit:
> > > > > 
> > > > > > The existing methods which accept Named will be marked for deprecation
> > > > > > in 4.0.
> > > > > 
> > > > > We can skip `in 4.0`. (1) The next release will be 3.1 (not 4.0) and (2)
> > > > > a KIP could always slip into a future release.
> > > > > 
> > > > > 
> > > > > About `TableJoined`: It seems you propose to add static methods for all
> > > > > possible parameter combinations. We usually try to avoid this to keep the
> > > > > number of methods low; if we add too many methods, it defeats the
> > > > > purpose of using a "builder like" config object.
> > > > > 
> > > > > To me, it seems sufficient to only have two static methods:
> > > > > 
> > > > > > as(final String name);
> > > > > 
> > > > > and
> > > > > 
> > > > > > with(final StreamPartitioner<K, Void> partitioner,
> > > > > >      final StreamPartitioner<KO, Void> otherPartitioner);
> > > > > 
> > > > > The second one should allow passing in `null` to set only one of the two
> > > > > partitioners.
> > > > > 
> > > > > Curious to hear what others think.
> > > > > 
> > > > > 
> > > > > -Matthias
> > > > > 
> > > > > On 9/20/21 8:27 PM, Victoria Xia wrote:
> > > > > > Hi Matthias,
> > > > > > 
> > > > > > Thanks for having a look at the KIP! I've updated it with your suggestion
> > > > > > to introduce a new `TableJoined` object with partitioners of type
> > > > > > `StreamPartitioner<K, Void>` and `StreamPartitioner<KO, Void>`, and to
> > > > > > deprecate the existing FK join methods which accept a `Named` object
> > > > > > accordingly. I agree it makes sense to keep the number of join interfaces
> > > > > > smaller.
> > > > > > 
> > > > > > Thanks,
> > > > > > Victoria
> > > > > > 
> > > > > > On Sat, Sep 18, 2021 at 11:07 AM Matthias J. Sax <mj...@apache.org> wrote:
> > > > > > 
> > > > > > > Thanks for the KIP Victoria.
> > > > > > > 
> > > > > > > As pointed out on the Jira ticket by you, using `<K,V>` and `<KO,VO>` as
> > > > > > > partitioner types does not really work, because we don't have access to
> > > > > > > the right value on the left side nor have we access to the left value on
> > > > > > > the right hand side. -- I like your idea to use `Void` as value types to
> > > > > > > make it clear to the users that partitioning must be done on the key
> > > > > > > only.
> > > > > > > 
> > > > > > > For the proposed public API change, I would propose not to pass the
> > > > > > > partitioners directly, but to introduce a config object (similar to
> > > > > > > `Joined` for stream-table joins, and `StreamJoined` for stream-stream
> > > > > > > joins). This new object could also implement `NamedOperation` and thus
> > > > > > > replace `Named`. To this end, we would deprecate the existing methods
> > > > > > > using `Named` and replace them with the new methods. Net benefit is,
> > > > > > > that we don't get more overloads (after we removed the deprecated ones).
> > > > > > > 
> > > > > > > Not sure how we want to call the new object. Maybe `TableJoined` in
> > > > > > > alignment to `StreamJoined`?
> > > > > > > 
> > > > > > > 
> > > > > > > -Matthias
> > > > > > > 
> > > > > > > On 9/15/21 3:36 PM, Victoria Xia wrote:
> > > > > > > > Hi,
> > > > > > > > 
> > > > > > > > I've opened a small KIP for adding Kafka Streams support for foreign key
> > > > > > > > joins on tables with custom partitioners:
> > > > > > > > 
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-775%3A+Custom+partitioners+in+foreign+key+joins
> > > > > > > > 
> > > > > > > > Feedback appreciated. Thanks!
> > > > > > > > 
> > > > > > > > - Victoria
> > > > > > > > 
> > > > > > > 
> > > > > > 
> > > > > 
> > > 
> > > 
> > > 
> > 
> 
> 

