Thanks for sharing, Matthias. I agree that it is indeed an anti-pattern to assume whether poll() returns data or not.
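To illustrate the point about not assuming that a single poll() returns data: a caller that needs records can loop until a non-empty batch arrives or its own deadline passes. Below is a minimal Python sketch of that idea, not Kafka client code. The helper name `poll_until_records`, the `poll` callable, and the convention that "no data" is an empty dict are all hypothetical stand-ins chosen for illustration.

```python
import time


def poll_until_records(poll, overall_timeout_s, per_poll_timeout_s=1.0,
                       clock=time.monotonic):
    """Call poll() until it returns a non-empty batch or the deadline passes.

    `poll` is a hypothetical stand-in for a consumer's poll call: it takes a
    timeout in seconds and returns a dict of records, which may be empty even
    if the timeout has not fully elapsed.
    """
    deadline = clock() + overall_timeout_s
    while True:
        remaining = deadline - clock()
        if remaining <= 0:
            return {}  # deadline reached without any records
        # Never block longer than the time left before the overall deadline.
        batch = poll(min(per_poll_timeout_s, remaining))
        if batch:
            return batch
        # An empty batch only means "nothing yet", so keep polling.


if __name__ == "__main__":
    # Simulated consumer: two early, empty returns, then a batch with data.
    responses = iter([{}, {}, {"topic-0": ["record"]}])
    print(poll_until_records(lambda timeout_s: next(responses),
                             overall_timeout_s=5.0))
```

Code that passes a very large timeout to a single poll() call, expecting it to block until data is available, could get the same effect with a loop of this shape.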
However, I checked all usages of poll() in the code base, and there is an interesting use case: poll() with a large timeout (see [1] and [2]). It implies that callers want poll() to block (effectively forever) until data is available.

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L443
[2] https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java#L232

Hence, I am starting to worry that client code like the aforementioned cases will break due to the behavior change :(

On 2021/02/03 22:59:09, "Matthias J. Sax" <mj...@apache.org> wrote:
> Thanks for your email John.
>
> I agree that it seems to be an anti-pattern to write code that makes
> assumptions if poll() returns data or not. Thus, we should fix-forward
> the system test from my point of view.
>
> From my understanding, the impact of KIP-695 is that we might return
> early from poll() (ie, before the timeout passed) with no data, only if
> an empty fetch request comes back and there is no other fetch request
> that did return data. Thus, for most cases, poll() should still return
> early and provide data. -- Thus, I have no concerns with the slight
> behavior change.
>
> Would be good to get input from others about this question though.
>
>
> -Matthias
>
>
> On 2/3/21 10:06 AM, John Roesler wrote:
> > Hello again all,
> >
> > I'm resurrecting this thread to discuss an issue that has
> > come up after merging the code for this KIP.
> >
> > The issue is that some of the system tests need to be
> > updated in the same way that this integration test needed to
> > be updated:
> > https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888
> >
> > This issue was reported here:
> > https://issues.apache.org/jira/browse/KAFKA-12268
> > and there is some preliminary discussion here:
> > https://github.com/apache/kafka/pull/10022
> >
> > First, let me offer my apologies for failing to catch this
> > before the merge. I'm sorry that it became Rajini's work to
> > track down the cause of the failure, when it was my
> > responsibility to ensure the feature was merged safely.
> >
> > To recap the situation:
> > Consumer#poll(Duration) will now return before the duration
> > expires even if there are no records returned if there is
> > some returned metadata.
> >
> > This behavior was important for KIP-695. In the situation
> > where we get no records back for some partition, Streams
> > needs to have the freshest possible information about
> > whether there are no new records on the broker, or whether
> > there are records on the broker that we still need to fetch.
> > If that's not clear, the KIP contains the full story.
> >
> > It's definitely a behavior change, but our rationale was
> > that it's an acceptable behavior change. Our big alternative
> > is to add a _new_ method to Consumer to
> > pollForRecordsOrMetadata(Duration) or something.
> >
> > It seems unreliable to expect the broker to return a
> > particular record within a particular timeout in general,
> > which is what these tests are doing. The broker can decide
> > for several reasons not to return data for a partition, but
> > return data for another partition instead.
> >
> > It seems like the only case where you might reasonably try
> > to rely on that is in a test, where you first write a record
> > to a partition, then you assign only that one partition to a
> > consumer, then you poll on the consumer, expecting it to
> > return the data you just wrote.
> >
> > So the $10 question here is whether we should support this
> > apparently artificial (testing-only) use case to the point
> > where it's worth adding a whole new method to the Consumer
> > interface.
> >
> > Thanks all,
> > John
> >
> > On Thu, 2020-12-17 at 13:18 -0600, John Roesler wrote:
> >> Thanks Jason,
> >>
> >> We would only return the metadata for the latest fetches.
> >> So, if someone wanted to use this to lazily maintain a
> >> client-side metadata map for all partitions, they'd have to
> >> store it separately and merge in new updates as they arrive.
> >>
> >> This way:
> >> 1. We don't need to increase the complexity of the client by
> >> storing that metadata
> >> 2. Users will be able to treat all returned metadata as
> >> "fresh" without having to reason about the timestamps.
> >> 3. All parts of the returned ConsumerRecords object have the
> >> same lifecycle: all the data and metadata are the results of
> >> the most recent round of fetch responses that had not been
> >> previously polled.
> >>
> >> Does that seem sensible to you? I'll update the KIP to
> >> clarify this.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Wed, 2020-12-16 at 10:29 -0800, Jason Gustafson wrote:
> >>> Hi John,
> >>>
> >>> Just one question. It wasn't very clear to me exactly when the metadata
> >>> would be returned in `ConsumerRecords`. Would we /always/ include the
> >>> metadata for all partitions that are assigned, or would it be based on the
> >>> latest fetches?
> >>>
> >>> Thanks,
> >>> Jason
> >>>
> >>> On Fri, Dec 11, 2020 at 4:07 PM John Roesler <vvcep...@apache.org> wrote:
> >>>
> >>>> Thanks, Guozhang!
> >>>>
> >>>> All of your feedback sounds good to me. I’ll update the KIP when I am
> >>>> able.
> >>>>
> >>>> 3) I believe it is the position after the fetch, but I will confirm. I
> >>>> think omitting position may render beginning and end offsets useless as
> >>>> well, which leaves only lag. That would be fine with me, but it also seems
> >>>> nice to supply this extra metadata since it is well defined and probably
> >>>> handy for others. Therefore, I’d go the route of specifying the exact
> >>>> semantics and keeping it.
> >>>>
> >>>> Thanks for the review,
> >>>> John
> >>>>
> >>>> On Fri, Dec 11, 2020, at 17:36, Guozhang Wang wrote:
> >>>>> Hello John,
> >>>>>
> >>>>> Thanks for the updates! I've made a pass on the KIP and also the POC PR,
> >>>>> here are some minor comments:
> >>>>>
> >>>>> 1) nit: "receivedTimestamp" -> it seems the metadata keep getting updated,
> >>>>> and we do not create a new object but just update the values in-place, so
> >>>>> maybe calling it `lastUpdateTimstamp` is better?
> >>>>>
> >>>>> 2) It will be great to verify in javadocs that the new API
> >>>>> "ConsumerRecords#metadata(): Map<TopicPartition, Metadata>" may return a
> >>>>> superset of TopicPartitions than the existing API that returns the data by
> >>>>> partitions, in case users assume their map key-entries would always be the
> >>>>> same.
> >>>>>
> >>>>> 3) The "position()" API of the call needs better clarification: is it the
> >>>>> current position AFTER the records are returned, or is it BEFORE the
> >>>>> records are returned? Personally I'd suggest we do not include it if it is
> >>>>> not used anywhere yet just to avoid possible misuse, but I'm fine if you
> >>>>> like to keep it still; in that case just clarify its semantics.
> >>>>>
> >>>>>
> >>>>> Other than that, I'm +1 on the KIP as well!
> >>>>>
> >>>>>
> >>>>> Guozhang
> >>>>>
> >>>>>
> >>>>> On Fri, Dec 11, 2020 at 8:15 AM Walker Carlson <wcarl...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Thanks for the KIP!
> >>>>>>
> >>>>>> +1 (non-binding)
> >>>>>>
> >>>>>> walker
> >>>>>>
> >>>>>> On Wed, Dec 9, 2020 at 11:40 AM Bruno Cadonna <br...@confluent.io> wrote:
> >>>>>>
> >>>>>>> Thanks for the KIP, John!
> >>>>>>>
> >>>>>>> +1 (non-binding)
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Bruno
> >>>>>>>
> >>>>>>> On 08.12.20 18:03, John Roesler wrote:
> >>>>>>>> Hello all,
> >>>>>>>>
> >>>>>>>> There hasn't been much discussion on KIP-695 so far, so I'd
> >>>>>>>> like to go ahead and call for a vote.
> >>>>>>>>
> >>>>>>>> As a reminder, the purpose of KIP-695 is to improve on the
> >>>>>>>> "task idling" feature we introduced in KIP-353. This KIP
> >>>>>>>> will allow Streams to offer deterministic time semantics in
> >>>>>>>> join-type topologies. For example, it makes sure that
> >>>>>>>> when you join two topics, we collate the topics by
> >>>>>>>> timestamp. That was always the intent with task idling
> >>>>>>>> (KIP-353), but it turns out the previous mechanism couldn't
> >>>>>>>> provide the desired semantics.
> >>>>>>>>
> >>>>>>>> The details are here:
> >>>>>>>> https://cwiki.apache.org/confluence/x/JSXZCQ
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> -John
> >>>>>>>>
> >>>>>
> >>>>> --
> >>>>> -- Guozhang