Hi Matthias,

I did save it. The changes are under Public Interfaces: point 2 adds a
partitions() method to KeyQueryMetadata, and an IllegalArgumentException is
now thrown when StreamPartitioner#partitions returns multiple partitions
only for FK-join, instead of for both FK-join and IQ as decided earlier.

The background is that for IQ, if users have multicast records to
multiple partitions during ingestion but the fetch returns only a single
partition, the result would be wrong. That's why the restriction was lifted
for IQ, and why KeyQueryMetadata now has an additional partitions()
method to expose all of those partitions.
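
To illustrate the IQ concern, here is a rough sketch only, not the actual
Streams code. The class IqMulticastSketch and the method partitionsFor are
hypothetical names, and the "hash partition plus the next one" rule is just
an assumed example of a multicasting partitioner. The point is that the
metadata for a key has to carry a set of partitions, not a single one.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class IqMulticastSketch {

    // Hypothetical stand-in for a multicasting partitioner: each key is
    // written to its hash partition and to the next one (wrapping around).
    static Set<Integer> partitionsFor(String key, int numPartitions) {
        int first = Math.floorMod(key.hashCode(), numPartitions);
        int second = (first + 1) % numPartitions;
        return new TreeSet<>(Arrays.asList(first, second));
    }

    public static void main(String[] args) {
        // A lookup that reported only one of these partitions would miss
        // the copy of the record stored in the other one.
        System.out.println(partitionsFor("a", 4));
    }
}
```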

FK-join has a similar case, but during review it was felt that
FK-join on its own is fairly complicated and we don't need this feature
right away, so the restriction still exists.
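
Roughly, the FK-join restriction amounts to a check like the one below. This
is a sketch under my own assumptions: FkJoinPartitionCheck and
requireAtMostOne are hypothetical names, and the real check in the PR may be
placed and worded differently.

```java
import java.util.Set;

public class FkJoinPartitionCheck {

    // Hypothetical guard: FK-join accepts at most one target partition and
    // rejects a multicast result with an IllegalArgumentException.
    static Set<Integer> requireAtMostOne(Set<Integer> partitions) {
        if (partitions.size() > 1) {
            throw new IllegalArgumentException(
                "FK-join does not support multicasting, got partitions: " + partitions);
        }
        return partitions;
    }
}
```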

Thanks!
Sagar.


On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:

> I don't see any update on the wiki about it. Did you forget to hit "save"?
>
> Can you also provide some background? I am not sure right now if I
> understand the proposed changes?
>
>
> -Matthias
>
> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> > Thanks Sagar, this makes sense to me -- we clearly need additional
> changes
> > to
> > avoid breaking IQ when using this feature, but I agree with continuing to
> > restrict
> > FKJ since they wouldn't stop working without it, and would become much
> > harder
> > to reason about (than they already are) if we did enable them to use it.
> >
> > And of course, they can still multicast the final results of a FKJ, they
> > just can't
> > mess with the internal workings of it in this way.
> >
> > On Tue, Dec 6, 2022 at 9:48 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> >> Hi All,
> >>
> >> I made a couple of edits to the KIP which came up during the code
> review.
> >> Changes at a high level are:
> >>
> >> 1) KeyQueryMetadata enhanced to have a new method called partitions().
> >> 2) Lifting the restriction of a single partition for IQ. Now the
> >> restriction holds only for FK Join.
> >>
> >> Updated KIP:
> >>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>
> >> Thanks!
> >> Sagar.
> >>
> >> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com>
> wrote:
> >>
> >>> Thanks Bruno,
> >>>
> >>> Marking this as accepted.
> >>>
> >>> Thanks everyone for their comments/feedback.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org>
> >> wrote:
> >>>
> >>>> Hi Sagar,
> >>>>
> >>>> Thanks for the update and the PR!
> >>>>
> >>>> +1 (binding)
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 10.09.22 18:57, Sagar wrote:
> >>>>> Hi Bruno,
> >>>>>
> >>>>> Thanks, I think these changes make sense to me. I have updated the
> KIP
> >>>>> accordingly.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> I would not drop the support for dropping records. I would also not
> >>>>>> return null from partitions(). Maybe an Optional can help here. An
> >>>> empty
> >>>>>> Optional would mean to use the default partitioning behavior of the
> >>>>>> producer. So we would have:
> >>>>>>
> >>>>>> - non-empty Optional, non-empty list of integers: partitions to send
> >>>> the
> >>>>>> record to
> >>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>> - empty Optional: use default behavior
> >>>>>>
> >>>>>> What do others think?
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>> Hello Bruno/Chris,
> >>>>>>>
> >>>>>>> Since these are the last set of changes (I am assuming, haha), it
> >> would
> >>>> be
> >>>>>>> great if you could review the 2 options from above so that we can
> >>>> close
> >>>>>> the
> >>>>>>> voting. Of course I am happy to incorporate any other requisite
> >>>> changes.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks Bruno for the great points.
> >>>>>>>>
> >>>>>>>> I see 2 options here =>
> >>>>>>>>
> >>>>>>>> 1) As Chris suggested, drop the support for dropping records in
> the
> >>>>>>>> partitioner. That way, an empty list could signify the usage of a
> >>>>>> default
> >>>>>>>> partitioner. Also, if the deprecated partition() method returns
> >> null
> >>>>>>>> thereby signifying the default partitioner, the partitions() can
> >>>> return
> >>>>>> an
> >>>>>>>> empty list, i.e. the default partitioner.
> >>>>>>>>
> >>>>>>>> 2) OR we treat a null return type of partitions() method to
> signify
> >>>> the
> >>>>>>>> usage of the default partitioner. In the default implementation of
> >>>>>>>> partitions() method, if partition() returns null, then even
> >>>> partitions()
> >>>>>>>> can return null (instead of an empty list). The RecordCollectorImpl
> >>>> code
> >>>>>> can
> >>>>>>>> also be modified accordingly. @Chris, to your point, we can even
> >> drop
> >>>>>> the
> >>>>>>>> support of dropping of records. It came up during KIP discussion,
> >>>> and I
> >>>>>>>> thought it might be a useful feature. Let me know what you think.
> >>>>>>>>
> >>>>>>>> 3) Lastly about the partition number check. I wanted to avoid the
> >>>>>> throwing
> >>>>>>>> of exception so I thought adding it might be a useful feature. But
> >> as
> >>>>>> you
> >>>>>>>> pointed out, if it can break backwards compatibility, it's better
> >> to
> >>>>>> remove
> >>>>>>>> it.
> >>>>>>>>
> >>>>>>>> Thanks!
> >>>>>>>> Sagar.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton
> >>>> <chr...@aiven.io.invalid>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we
> >> actually
> >>>>>> need
> >>>>>>>>> support for dropping records in the partitioner? It doesn't seem
> >>>>>> necessary
> >>>>>>>>> based on the motivation for the KIP. If we remove that feature,
> we
> >>>>>> could
> >>>>>>>>> handle null and/or empty lists by using the default partitioning,
> >>>>>>>>> equivalent to how we handle null return values from the existing
> >>>>>> partition
> >>>>>>>>> method today.
> >>>>>>>>>
> >>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <
> cado...@apache.org
> >>>
> >>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Sagar,
> >>>>>>>>>>
> >>>>>>>>>> Thank you for the updates!
> >>>>>>>>>>
> >>>>>>>>>> I do not intend to prolong this vote thread more than needed,
> >> but I
> >>>>>>>>>> still have some points.
> >>>>>>>>>>
> >>>>>>>>>> The deprecated partition method can return null if the default
> >>>>>>>>>> partitioning logic of the producer should be used.
> >>>>>>>>>> With the new method partitions() it seems that it is not
> possible
> >>>> to
> >>>>>> use
> >>>>>>>>>> the default partitioning logic, anymore.
> >>>>>>>>>>
> >>>>>>>>>> Also, in the default implementation of method partitions(), a
> >>>> record
> >>>>>>>>>> that would use the default partitioning logic in method
> >> partition()
> >>>>>>>>>> would be dropped, which would break backward compatibility since
> >>>>>> Streams
> >>>>>>>>>> would always call the new method partitions() even though the
> >> users
> >>>>>>>>>> still implement the deprecated method partition().
> >>>>>>>>>>
> >>>>>>>>>> I have a last point that we should probably discuss on the PR
> and
> >>>> not
> >>>>>> on
> >>>>>>>>>> the KIP but since you added the code in the KIP I need to
> mention
> >>>> it.
> >>>>>> I
> >>>>>>>>>> do not think you should check the validity of the partition
> >> number
> >>>>>> since
> >>>>>>>>>> the ProducerRecord does the same check and throws an exception.
> >> If
> >>>>>>>>>> Streams adds the same check but does not throw, the behavior is
> >> not
> >>>>>>>>>> backward compatible.
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>>>>>> Thanks Bruno/Chris,
> >>>>>>>>>>>
> >>>>>>>>>>> I also agree that it might be better to keep it simple like the
> way
> >>>>>> Chris
> >>>>>>>>>>> suggested. I have updated the KIP accordingly. I made a couple of
> >>>> minor
> >>>>>>>>>>> changes to the KIP:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) One of them being the change of return type of partitions
> >>>> method
> >>>>>>>>> from
> >>>>>>>>>>> List to Set. This is to ensure that in case the implementation
> >> of
> >>>>>>>>>>> StreamPartitioner is buggy and ends up returning duplicate
> >>>>>>>>>>> partition numbers, we won't have duplicates thereby not trying
> >> to
> >>>>>>>>> send to
> >>>>>>>>>>> the same partition multiple times due to this.
> >>>>>>>>>>> 2) I also added a check to send the record only to valid
> >> partition
> >>>>>>>>>> numbers
> >>>>>>>>>>> and log and drop when the partition number is invalid. This is
> >>>> again
> >>>>>>>>> to
> >>>>>>>>>>> prevent errors for cases when the StreamPartitioner
> >> implementation
> >>>>>> has
> >>>>>>>>>> some
> >>>>>>>>>>> bugs (since there are no validations as such).
> >>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion
> >>>> from
> >>>>>>>>>> Bruno.
> >>>>>>>>>>> 4) I updated the default implementation of partitions method
> >>>> based on
> >>>>>>>>> the
> >>>>>>>>>>> great catch from Chris!
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know if it looks fine now.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Sagar.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <
> >> cado...@apache.org
> >>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I am in favour of discarding the sugar for broadcasting and leaving
> >>>> the
> >>>>>>>>>>>> broadcasting to the implementation as Chris suggests. I think
> >>>> that
> >>>>>> is
> >>>>>>>>>>>> the cleanest option.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting
> >> to
> >>>>>> all
> >>>>>>>>>>>> topic
> >>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases
> >>>> (empty
> >>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach
> would
> >>>>>>>>> address
> >>>>>>>>>>>> that
> >>>>>>>>>>>>> nicely.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It's also worth noting that there's no hard requirement to
> add
> >>>>>> sugar
> >>>>>>>>>> for
> >>>>>>>>>>>>> broadcasting to all topic partitions since the API already
> >>>> provides
> >>>>>>>>> the
> >>>>>>>>>>>>> number of topic partitions available when calling a stream
> >>>>>>>>> partitioner.
> >>>>>>>>>>>> If
> >>>>>>>>>>>>> we can't find a clean way to add this support, it might be
> >>>> better
> >>>>>> to
> >>>>>>>>>>>> leave
> >>>>>>>>>>>>> it out and just let people implement this themselves with a
> >>>> line of
> >>>>>>>>>> Java
> >>>>>>>>>>>> 8
> >>>>>>>>>>>>> streams:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>          return IntStream.range(0,
> >>>>>>>>>>>>> numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Also worth noting that there may be a bug in the default
> >>>>>>>>> implementation
> >>>>>>>>>>>> for
> >>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it
> doesn't
> >>>>>>>>> appear
> >>>>>>>>>> to
> >>>>>>>>>>>>> correctly pick up on null return values from the partition
> >>>> method
> >>>>>>>>> and
> >>>>>>>>>>>>> translate them into empty lists.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <
> >>>> cado...@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I do not see an issue with using an empty list together with
> >> an
> >>>>>>>>> empty
> >>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list
> >>>> would
> >>>>>>>>> just
> >>>>>>>>>>>>>> say that there is no partition to which the record should be
> >>>> sent.
> >>>>>>>>> If
> >>>>>>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all
> >>>>>>>>> partitions.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could
> return
> >>>> an
> >>>>>>>>>> object
> >>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have
> >>>> values
> >>>>>>>>>> SOME,
> >>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions
> >>>>>>>>> contains
> >>>>>>>>>> the
> >>>>>>>>>>>>>> partitions to which to send the record. If the value of the
> >>>> enum
> >>>>>> is
> >>>>>>>>>> ALL
> >>>>>>>>>>>>>> or NONE, the list of partitions is not used and might be
> even
> >>>> null
> >>>>>>>>>>>>>> (since in that case the list should not be used and it would
> >>>> be a
> >>>>>>>>> bug
> >>>>>>>>>> to
> >>>>>>>>>>>>>> use it).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I agree using null does seem error prone. However I think
> >>>> using a
> >>>>>>>>>>>>>> singleton
> >>>>>>>>>>>>>>> list of [-1] might be better in terms of usability, I am
> >>>> saying
> >>>>>>>>> this
> >>>>>>>>>>>>>>> because the KIP also has a provision to return an empty
> list
> >>>> to
> >>>>>>>>> refer
> >>>>>>>>>>>> to
> >>>>>>>>>>>>>>> dropping the record. So, an empty optional and an empty
> list
> >>>> have
> >>>>>>>>>>>> totally
> >>>>>>>>>>>>>>> different meanings which could get confusing.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I also concur with this, having an Optional in the type
> >>>> makes it
> >>>>>>>>>> very
> >>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of
> >>>> value
> >>>>>>>>> (or
> >>>>>>>>>> in
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>>>>>> w: aiven.io e: matthew.dedetr...@aiven.io
> >>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org,
> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions()
> to
> >>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the
> >>>>>>>>> broadcast
> >>>>>>>>>>>>>>>>> value. IMO, The chances that an implementation returns
> >> null
> >>>> due
> >>>>>>>>> to
> >>>>>>>>>> a
> >>>>>>>>>>>>>> bug
> >>>>>>>>>>>>>>>>> is much higher than that an implementation returns an
> >> empty
> >>>>>>>>>> Optional
> >>>>>>>>>>>>>> due
> >>>>>>>>>>>>>>>>> to a bug.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
