Thanks.

I buy the argument about the lag for active tasks.

Nit: The KIP briefly mentions the deprecation of `metadataFoKey()`
methods -- those should be listed as `@deprecated` next to the newly
added methods to point this change out more visibly.

Nit: in the code example, why do we loop over `inSyncStandbys` ? Would
we not just query only the most up-to-date one?

Nit: Section "Compatibility, Deprecation, and Migration Plan" should
point out that two methods are deprecated and user can migrate their
code to use the two new methods instead.

Those nits only address the write-up of the KIP, not the actual design
that LGTM.


-Matthias




On 11/14/19 3:48 PM, Guozhang Wang wrote:
> 10/20: I think I'm aligned with John's replies as well.
> 
> Guozhang
> 
> On Fri, Nov 15, 2019 at 1:45 AM Vinoth Chandar <vchan...@confluent.io>
> wrote:
> 
>>> during restoring state the active might have some lag
>>
>> Great catch, yes.. we cannot assume lag = 0 for active. Lets report active
>> lag as well then. If active is too laggy, the app can then deem the store
>> partition unavailable (based on what the application is willing to
>> tolerate).
>>
>> @matthias do you agree? We can then begin the vote.
>>
>> On Thu, Nov 14, 2019 at 9:03 AM Navinder Brar
>> <navinder_b...@yahoo.com.invalid> wrote:
>>
>>> I agree with Vinoth and John on having "allLocalStoreOffsetLags()", all
>>> actives don't have 0 lag, as during restoring state the active might have
>>> some lag and one of the features of this KIP is to provide an option to
>>> query from active (which might be in restoring state).
>>> I will update the KIP with rejected alternatives and post this will start
>>> a vote if everyone agrees on this.
>>>     On Thursday, 14 November, 2019, 09:34:52 pm IST, John Roesler <
>>> j...@confluent.io> wrote:
>>>
>>>  Hi all,
>>>
>>> Thanks for the "reset", Vinoth. It brings some needed clarity to the
>>> discussion.
>>>
>>> 10. My 2 cents: we might as well include the lags for the active
>>> copies as well. This is a more context-free API. If we only include
>>> standbys, this choice won't make sense to users unless they understand
>>> that the active task cannot lag in the steady state, since it's the
>>> source of updates. This isn't a bad thing to realize, but it's just
>>> more mental overhead for the person who wants to list the lags for
>>> "all local stores".
>>>
>>> Another reason is that we could consider also reporting the lag for
>>> actives during recovery (when they would have non-zero lag). We don't
>>> have to now, but if we choose to call the method "standby lags", then
>>> we can't make this choice in the future.
>>>
>>> That said, it's just my opinion. I'm fine either way.
>>>
>>> 20. Vinoth's reply makes sense to me, fwiw.
>>>
>>> Beyond these two points, I'm happy with the current proposal.
>>>
>>> Thanks again,
>>> -John
>>>
>>> On Thu, Nov 14, 2019 at 4:48 AM Vinoth Chandar <vchan...@confluent.io>
>>> wrote:
>>>>
>>>> 10. I considered that. Had to pick one or the other. Can just return
>>>> standby too and rename method to may be “allLocalStandbyOffsetLags()”
>> to
>>>> have it explicit. (Standby should implicitly convey that we are talking
>>>> about stores)
>>>>
>>>> 20. What I meant was, we are returning HostInfo instead of
>>> StreamsMetadata
>>>> since thats sufficient to route query; same for “int partition “ vs
>> topic
>>>> partition before. Previously KeyQueryMetadata had similar structure but
>>>> used StreamsMetadata and TopicPartition objects to convey same
>>> information
>>>>
>>>> @navinder KIP is already upto date with the email I sent, except for
>> the
>>>> reasonings I was laying out. +1 on revisiting rejected alternatives.
>>>> Please make the follow up changes
>>>>
>>>> On Wed, Nov 13, 2019 at 9:12 PM Matthias J. Sax <matth...@confluent.io
>>>
>>>> wrote:
>>>>
>>>>> Thanks for the summary Vinoth!
>>>>>
>>>>> I buy the overall argument. Couple of clarification questions:
>>>>>
>>>>>
>>>>> 10. Why do we need to include the active stores in
>>>>> `allLocalStoreOffsetLags()`? Would it not be simpler to just return
>> lag
>>>>> for standbys?
>>>>>
>>>>>
>>>>> 20: What does
>>>>>
>>>>>> Thin the KeyQueryMetadata object to just contain the minimum
>>> information
>>>>>> needed.
>>>>>
>>>>> exaclty mean? What is the "minimum information needed" ?
>>>>>
>>>>>
>>>>> @Navinder: if you agree, can you update the KIP accoringly? With all
>>> the
>>>>> proposals, it's hard to keep track and it would be great to have the
>>>>> current proposal summarized in the wiki page.
>>>>>
>>>>> Please also update the "Rejected alternative" sections to avoid that
>> we
>>>>> cycle back to old proposal (including the reason _why_ they got
>>> rejected).
>>>>>
>>>>>
>>>>> Thanks a lot!
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>>
>>>>>
>>>>> On 11/13/19 7:10 PM, Vinoth Chandar wrote:
>>>>>> Given we have had a healthy discussion on this topic for a month
>> now
>>> and
>>>>>> still with many loose ends and open ended conversations, I thought
>> It
>>>>> would
>>>>>> be worthwhile to take a step back and re-evaluate everything in the
>>>>> context
>>>>>> of the very real use-case and its specific scenarios.
>>>>>>
>>>>>> First, let's remind ourselves of the query routing flow of the
>>> streams
>>>>>> application ("app" here on)
>>>>>>
>>>>>>    1. queries get routed to any random streams instance in the
>>> cluster
>>>>>>    ("router" here on)
>>>>>>    2. router then uses Streams metadata to pick active/standby
>>> instances
>>>>>>    for that key's store/partition
>>>>>>    3. router instance also maintains global lag information for all
>>>>> stores
>>>>>>    and all their partitions, by a gossip/broadcast/heartbeat
>> protocol
>>>>> (done
>>>>>>    outside of Streams framework), but using
>>> KafkaStreams#allMetadata()
>>>>> for
>>>>>>    streams instance discovery.
>>>>>>    4. router then uses information in 2 & 3 to determine which
>>> instance
>>>>> to
>>>>>>    send the query to  : always picks active instance if alive or
>> the
>>> most
>>>>>>    in-sync live standby otherwise.
>>>>>>
>>>>>> Few things to note :
>>>>>>
>>>>>> A) We choose to decouple how the lag information is obtained
>> (control
>>>>>> plane) from query path (data plane), since that provides more
>>> flexibility
>>>>>> in designing the control plane. i.e pick any or combination of
>>> gossip,
>>>>>> N-way broadcast, control the rate of propagation, piggybacking on
>>> request
>>>>>> responses
>>>>>> B) Since the app needs to do its own control plane, talking to
>> other
>>>>>> instances directly for failure detection & exchanging other
>>> metadata, we
>>>>>> can leave the lag APIs added to KafkaStreams class itself local and
>>>>> simply
>>>>>> return lag for all store/partitions on that instance.
>>>>>> C) Streams preserves its existing behavior of instances only
>> talking
>>> to
>>>>>> each other through the Kafka brokers.
>>>>>> D) Since the router treats active/standby differently, it would be
>>> good
>>>>> for
>>>>>> the KafkaStreams APIs to hand them back explicitly, with no
>>> additional
>>>>>> logic needed for computing them. Specifically, the router only
>> knows
>>> two
>>>>>> things - key and store and if we just return a
>>>>> Collection<StreamsMetadata>
>>>>>> back, it cannot easily tease apart active and standby. Say, a
>> streams
>>>>>> instance hosts the same store as both active and standby for
>>> different
>>>>>> partitions, matching by just storename the app will find it in both
>>>>> active
>>>>>> and standby lists.
>>>>>> E) From above, we assume the global lag estimate (lag per store
>> topic
>>>>>> partition) are continuously reported amongst application instances
>>> and
>>>>>> already available on the router during step 2 above. Hence,
>>> attaching lag
>>>>>> APIs to StreamsMetadata is unnecessary and does not solve the needs
>>>>> anyway.
>>>>>> F) Currently returned StreamsMetadata object is really information
>>> about
>>>>> a
>>>>>> streams instance, that is not very specific to the key being
>> queried.
>>>>>> Specifically, router has no knowledge of the topic partition a
>> given
>>> key
>>>>>> belongs, this is needed for matching to the global lag information
>> in
>>>>> step
>>>>>> 4 above (and as also the example code in the KIP showed before).
>> The
>>>>>> StreamsMetadata, since it's about the instance itself, would
>> contain
>>> all
>>>>>> topic partitions and stores on that instance, not specific to the
>>> given
>>>>> key.
>>>>>> G) A cleaner API would thin the amount of information returned to
>>>>>> specifically the given key and store - which is partition the key
>>> belongs
>>>>>> to, active host info, list of standby host info. KeyQueryMetadata
>> was
>>>>>> proposed in this spirit, but still hung onto StreamsMetadata as the
>>>>> member
>>>>>> field(s) which leaks more than what's required to route.
>>>>>> H) For this use case, its sufficient to just support offsetLag
>>> initially,
>>>>>> without needing time based lag information right away.  So we
>>> probably
>>>>>> don't need a new top level StoreLagInfo class anymore.
>>>>>> I) On whether the local lag information is cached within Streams or
>>> App
>>>>>> code, its again preferable and be necessary for the App to be able
>> to
>>>>>> invoke the lag api from an app thread and keep a cached state in
>>> memory,
>>>>> to
>>>>>> be piggybacked with request responses. If this information is also
>>> cached
>>>>>> inside Streams, it does no harm anyway.
>>>>>>
>>>>>> Based on this reasoning, I have made the following updates to the
>> KIP
>>>>>>
>>>>>> - Drop the proposed KafkaStreams#allStandbyMetadata() and
>>>>>> KafkaStreams#allStandbyMetadataForStore() methods, with intent of
>>>>>> introducing minimal APIs to support the usage described above
>>>>>> - Drop the KafkaStreams#lagInfoForStore() method and just stick to
>>> one
>>>>>> method that returns all the local store partition's lags.
>>>>>> - Rename allLagInfo() to allLocalStoreOffsetLags() to be explicit
>>> that
>>>>> the
>>>>>> lags are local and only on offsets.
>>>>>> - Drop StoreLagInfo class
>>>>>> - allLocalStoreOffsetLags() returns Map<String, Map<Integer,
>> Long>>,
>>>>> which
>>>>>> maps a store name to a map containing partition to offset lag info.
>>> It
>>>>> will
>>>>>> include both active and standby store partitions. active always has
>>> 0 lag
>>>>>> since its caught up with changelog topic.
>>>>>> - Thin the KeyQueryMetadata object to just contain the minimum
>>>>> information
>>>>>> needed. Rename allMetadataForKey() method variants to
>>>>> queryMetadataForKey()
>>>>>> variants that return KeyQueryMetadata
>>>>>> - Propose deprecating two current methods that return metadata
>> based
>>> on
>>>>> key
>>>>>> :  KafkaStreams#metadataForKey(..), to get more users to use the
>> new
>>>>>> queryMetadataForKey APIs
>>>>>> - We will still have to enhance StreamsMetadata with fields for
>>>>>> standbyTopicPartitions and standbyStateStoreNames, since that is a
>>> core
>>>>>> object that gets updated upon rebalance.
>>>>>>
>>>>>>
>>>>>> Please let us know if this is agreeable.  We will also work some of
>>> this
>>>>>> discussion into the background/proposed changes sections, upon
>>> feedback.
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 12, 2019 at 9:17 AM Vinoth Chandar <
>>> vchan...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> In all, is everyone OK with
>>>>>>>
>>>>>>>  - Dropping KeyQueryMetadata, and the allMetadataForKey() apis
>>>>>>>  - Dropping allLagInfo() from KafkaStreams class, Drop
>> StoreLagInfo
>>>>> class
>>>>>>>  - Add offsetLag(store, key, serializer) -> Optional<Long> &
>>>>>>> offsetLag(store, key, partitioner) -> Optional<Long> to
>>> StreamsMetadata
>>>>>>>  - Duplicate the current methods for standbyMetadata in
>>> KafkaStreams :
>>>>>>> allStandbyMetadata(), allStandbyMetadataForStore(), two variants
>> of
>>>>>>> standbyMetadataForKey(),
>>>>>>>
>>>>>>>
>>>>>>> Responses to Guozhang:
>>>>>>>
>>>>>>> 1.1 Like I mentioned before, the allStandbyMetadata() and
>>>>>>> allStandbyMetadataForStore() complement existing allMetadata() and
>>>>>>> allMetadataForStore(), since we don't want to change behavior of
>>>>> existing
>>>>>>> APIs. Based on discussions so far, if we decide to drop
>>>>> KeyQueryMetadata,
>>>>>>> then we will need to introduce 4 equivalents for standby metadata
>> as
>>>>>>> Matthias mentioned.
>>>>>>> 1.2 I am okay with pushing lag information to a method on
>>>>> StreamsMetadata
>>>>>>> (Personally, I won't design it like that, but happy to live with
>> it)
>>>>> like
>>>>>>> what Matthias suggested. But assuming topic name <=> store name
>>>>> equivalency
>>>>>>> for mapping this seems like a broken API to me. If all of Streams
>>> code
>>>>> were
>>>>>>> written like this, I can understand. But I don't think its the
>>> case? I
>>>>>>> would not be comfortable making such assumptions outside of public
>>> APIs.
>>>>>>>>> look into each one's standby partition / stores to tell which
>> one
>>>>>>> StreamsMetadata is corresponding to the instance who holds a
>>> specific
>>>>> key
>>>>>>> as standby, yes, but I feel this one extra iteration is worth to
>>> avoid
>>>>>>> introducing a new class.
>>>>>>> This sort of thing would lead to non-standardized/potentially
>> buggy
>>>>> client
>>>>>>> implementations, for something I expect the system would hand me
>>>>> directly.
>>>>>>> I don't personally feel introducing a new class is so bad, to
>>> warrant
>>>>> the
>>>>>>> user to do all this matching. Given the current APIs are not
>>> explicitly
>>>>>>> named to denote active metadata, it gives us a chance to build
>>> something
>>>>>>> more direct and clear IMO. If we do allMetadataForKey apis, then
>> we
>>>>> should
>>>>>>> clearly separate active and standby ourselves. Alternate is
>> separate
>>>>> active
>>>>>>> and standby APIs as Matthias suggests, which I can make peace
>> with.
>>>>>>>
>>>>>>> 1.3 Similar as above. In Streams code, we treat topic partitions
>> and
>>>>> store
>>>>>>> names separately?.
>>>>>>> 2.1 I think most databases build replication using logical
>> offsets,
>>> not
>>>>>>> time. Time lag can be a nice to have feature, but offset lag is
>>> fully
>>>>>>> sufficient for a lot of use-cases.
>>>>>>> 2.2.1 We could support a lagInfoForStores() batch api. makes
>> sense.
>>>>>>>
>>>>>>>
>>>>>>> Responses to Matthias :
>>>>>>> (100) Streams can still keep the upto date version in memory and
>>>>>>> implementation could be for now just reading this already
>> refreshed
>>>>> value.
>>>>>>> Designing the API, with intent  of pushing this to the user keeps
>>> doors
>>>>>>> open for supporting time based lag in the future.
>>>>>>> (101) I am not sure what the parameters of evaluating approaches
>>> here
>>>>> is.
>>>>>>> Generally, when I am handed a Metadata object, I don't expect to
>>> further
>>>>>>> query it for more information semantically. I would not also force
>>> user
>>>>> to
>>>>>>> make separate calls for active and standby metadata.
>>>>>>> Well, that may be just me. So sure, we can push this into
>>>>> StreamsMetadata
>>>>>>> if everyone agrees!
>>>>>>> +1 on duplicating all 4 methods for standbys in this case.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 12, 2019 at 4:12 AM Navinder Brar
>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>    - Looking back, I agree that 2 calls to StreamsMetadata to
>> fetch
>>>>>>>> StreamsMetadata and then using something like ‘long
>>>>>>>> StreamsMetadata#offsetLag(store, key)’ which Matthias suggested
>> is
>>>>> better
>>>>>>>> than introducing a new public API i.e. ‘KeyQueryMetadata’. I will
>>>>> change
>>>>>>>> the KIP accordingly.
>>>>>>>>    - >> I am actually not even sure, why we added
>>>>>>>> `StreamsMetadata#topicPartitions()` originally
>>>>>>>> I think it is helpful in showing which host holds which source
>>> topic
>>>>>>>> partitions for /instances endpoint and when you query a key, you
>>> need
>>>>> to
>>>>>>>> match the partition on which the key belongs to with the hosts
>>> holding
>>>>>>>> source topic partitions for that partition. Is there any other
>> way
>>> to
>>>>> get
>>>>>>>> this info?
>>>>>>>>
>>>>>>>>    On Tuesday, 12 November, 2019, 03:55:16 am IST, Guozhang Wang
>> <
>>>>>>>> wangg...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>  Regarding 1.2: StreamsMetadata is 1-1 mapped to the streams
>>>>> instances, so
>>>>>>>> 1) allMetadata would still return the same number of
>>> StreamsMetadata in
>>>>>>>> collection, just that within the StreamsMetadata now you have new
>>> APIs
>>>>> to
>>>>>>>> access standby partitions / stores. So I think it would not be a
>>>>> breaking
>>>>>>>> change to the public API to not include `allStandbyMetadata` and
>> `
>>>>>>>> allStandbyMetadataForStore` but still rely on
>>> `allMetadata(ForStore)`?
>>>>>>>>
>>>>>>>> Regarding 1.1: Good point about the partition number. But I'm
>> still
>>>>>>>> wondering if it is definitely necessary to introduce a new
>>>>>>>> `KeyQueryMetadata`
>>>>>>>> interface class. E.g. suppose our function signature is
>>>>>>>>
>>>>>>>> Collection<StreamsMetadata> allMetadataForKey
>>>>>>>>
>>>>>>>> When you get the collection of StreamsMetadata you need to
>> iterate
>>> over
>>>>>>>> the
>>>>>>>> collection and look into each one's standby partition / stores to
>>> tell
>>>>>>>> which one StreamsMetadata is corresponding to the instance who
>>> holds a
>>>>>>>> specific key as standby, yes, but I feel this one extra iteration
>>> is
>>>>> worth
>>>>>>>> to avoid introducing a new class.
>>>>>>>>
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sat, Nov 9, 2019 at 10:04 PM Matthias J. Sax <
>>> matth...@confluent.io
>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I agree, that we might want to drop the time-base lag for the
>>> initial
>>>>>>>>> implementation. There is no good way to get this information
>>> without a
>>>>>>>>> broker side change.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> (100) For the offset lag information, I don't see a reason why
>>> the app
>>>>>>>>> should drive when this information is updated though, because KS
>>> will
>>>>>>>>> update this information anyway (once per `commit.interval.ms`
>> --
>>> and
>>>>>>>>> updating it more frequently does not make sense, as it most
>> likely
>>>>> won't
>>>>>>>>> change more frequently anyway).
>>>>>>>>>
>>>>>>>>> If you all insist that the app should drive it, I can live with
>>> it,
>>>>> but
>>>>>>>>> I think it makes the API unnecessarily complex without a
>> benefit.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> (101) I still don't understand why we need to have
>>> `KeyQueryMetadata`
>>>>>>>>> though. Note, that an instance can only report lag for it's
>> local
>>>>>>>>> stores, but not remote stores as it does not know to what
>> offset a
>>>>>>>>> remote standby has caught up to.
>>>>>>>>>
>>>>>>>>>> Because we needed to return the topicPartition the key belongs
>>> to, in
>>>>>>>>>> order to correlate with the lag information from the other set
>> of
>>>>>>>> APIs.
>>>>>>>>>
>>>>>>>>> My suggestion is to get the lag information from
>>> `StreamsMetadata` --
>>>>>>>>> which partition the store belongs to can be completely
>>> encapsulated
>>>>>>>>> within KS as all information is local, and I don't think we need
>>> to
>>>>>>>>> expose it to the user at all.
>>>>>>>>>
>>>>>>>>> We can just add `long StreamsMetadata#offsetLag(store, key)`. If
>>> the
>>>>>>>>> store is local we return its lag, if it's remote we return `-1`
>>> (ie,
>>>>>>>>> UNKNOWN). As an alternative, we can change the return type to
>>>>>>>>> `Optional<Long>`. This works for active and standby task alike.
>>>>>>>>>
>>>>>>>>> Note, that a user must verify if `StreamsMeatadata` is for
>> itself
>>>>>>>>> (local) or remote anyway. We only need to provide a way that
>>> allows
>>>>>>>>> users to distinguish between active an standby. (More below.)
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am actually not even sure, why we added
>>>>>>>>> `StreamsMetadata#topicPartitions()` originally -- seems pretty
>>>>> useless.
>>>>>>>>> Can we deprecate it as side cleanup in this KIP? Or do I miss
>>>>> something?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> (102)
>>>>>>>>>
>>>>>>>>>> There are basically 2 reasons. One is that instead of having
>> two
>>>>>>>>> functions, one to get StreamsMetadata for active and one for
>>> replicas.
>>>>>>>> We
>>>>>>>>> are fetching both in a single call and we have a way to get only
>>>>> active
>>>>>>>> or
>>>>>>>>> only replicas from the KeyQueryMetadata object(just like
>>> isStandby()
>>>>> and
>>>>>>>>> isActive() discussion we had earlier)
>>>>>>>>>
>>>>>>>>> I understand, that we need two methods. However, I think we can
>>>>> simplify
>>>>>>>>> the API and not introduce `KeyQueryMetadata`, but just
>> "duplicate"
>>>>> all 4
>>>>>>>>> existing methods for standby tasks:
>>>>>>>>>
>>>>>>>>> // note that `standbyMetadataForKey` return a Collection in
>>> contrast
>>>>> to
>>>>>>>>> existing `metadataForKey`
>>>>>>>>>
>>>>>>>>>>  Collection<StreamsMetadata> allStandbyMetadata()
>>>>>>>>>>  Collection<StreamsMetadata> allStandbyMetadataForStore(String
>>>>>>>>> storeName)
>>>>>>>>>>  Collection<StreamsMetadata> metadataForKey(String storeName, K
>>> key,
>>>>>>>>> Serializer<K> keySerializer)
>>>>>>>>>>  Collection<StreamsMetadata> metadataForKey(String storeName, K
>>> key,
>>>>>>>>> StreamPartitioner<? super K, ?> partitioner)
>>>>>>>>>
>>>>>>>>> Because the existing methods return all active metadata, there
>> is
>>> no
>>>>>>>>> reason to return `KeyQueryMetadata` as it's more complicated to
>>> get
>>>>> the
>>>>>>>>> standby metadata. With `KeyQueryMetadata` the user needs to make
>>> more
>>>>>>>>> calls to get the metadata:
>>>>>>>>>
>>>>>>>>>  KafkaStreams#allMetadataForKey()
>>>>>>>>>              #getActive()
>>>>>>>>>
>>>>>>>>>  KafkaStreams#allMetadataForKey()
>>>>>>>>>              #getStandby()
>>>>>>>>>
>>>>>>>>> vs:
>>>>>>>>>
>>>>>>>>>  KafkaStreams#metadataForKey()
>>>>>>>>>
>>>>>>>>>  KafkaStreams#standbyMetadataForKey()
>>>>>>>>>
>>>>>>>>> The wrapping of both within `KeyQueryMetadata` does not seem to
>>>>> provide
>>>>>>>>> any benefit but increase our public API surface.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @Guozhang:
>>>>>>>>>
>>>>>>>>> (1.1. + 1.2.) From my understanding `allMetadata()` (and other
>>>>> existing
>>>>>>>>> methods) will only return the metadata of _active_ tasks for
>>> backward
>>>>>>>>> compatibility reasons. If we would return standby metadata,
>>> existing
>>>>>>>>> code would potentially "break" because the code might pick a
>>> standby
>>>>> to
>>>>>>>>> query a key without noticing.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 11/8/19 6:07 AM, Navinder Brar wrote:
>>>>>>>>>> Thanks, Guozhang for going through it again.
>>>>>>>>>>
>>>>>>>>>>    - 1.1 & 1.2: The main point of adding topicPartition in
>>>>>>>>> KeyQueryMetadata is not topicName, but the partition number. I
>>> agree
>>>>>>>>> changelog topicNames and store names will have 1-1 mapping but
>> we
>>> also
>>>>>>>> need
>>>>>>>>> the partition number of the changelog for which are calculating
>>> the
>>>>> lag.
>>>>>>>>> Now we can add partition number in StreamsMetadata but it will
>> be
>>>>>>>>> orthogonal to the definition of StreamsMetadata i.e.-
>> “Represents
>>> the
>>>>>>>> state
>>>>>>>>> of an instance (process) in a {@link KafkaStreams} application.”
>>> If
>>>>> we
>>>>>>>> add
>>>>>>>>> partition number in this, it doesn’t stay metadata for an
>>> instance,
>>>>>>>> because
>>>>>>>>> now it is storing the partition information for a key being
>>> queried.
>>>>> So,
>>>>>>>>> having “KeyQueryMetadata” simplifies this as now it contains all
>>> the
>>>>>>>>> metadata and also changelog and partition information for which
>> we
>>>>> need
>>>>>>>> to
>>>>>>>>> calculate the lag.
>>>>>>>>>>
>>>>>>>>>> Another way is having another function in parallel to
>>> metadataForKey,
>>>>>>>>> which returns the partition number for the key being queried.
>> But
>>> then
>>>>>>>> we
>>>>>>>>> would need 2 calls to StreamsMetadataState, once to fetch
>>> metadata and
>>>>>>>>> another to fetch partition number. Let me know if any of these
>> two
>>>>> ways
>>>>>>>>> seem more intuitive than KeyQueryMetadata then we can try to
>>> converge
>>>>> on
>>>>>>>>> one.
>>>>>>>>>>    - 1.3:  Again, it is required for the partition number. We
>> can
>>>>> drop
>>>>>>>>> store name though.
>>>>>>>>>>    - 2.1: I think this was done in accordance with the opinion
>>> from
>>>>>>>> John
>>>>>>>>> as time lag would be better implemented with a broker level
>>> change and
>>>>>>>>> offset change is readily implementable. @vinoth?
>>>>>>>>>>    - 2.2.1: Good point.  +1
>>>>>>>>>>    - 2.2.2: I am not well aware of it, @vinoth any comments?
>>>>>>>>>>    - 3.1: I think we have already agreed on dropping this, we
>>> need to
>>>>>>>>> KIP. Also, is there any opinion on lagInfoForStore(String
>>> storeName)
>>>>> vs
>>>>>>>>> lagInfoForStore(String storeName, int partition)
>>>>>>>>>>    - 3.2: But in functions such as onAssignment(),
>>>>>>>>> onPartitionsAssigned(), for standbyTasks also the
>> topicPartitions
>>> we
>>>>> use
>>>>>>>>> are input topic partitions and not changelog partitions. Would
>>> this be
>>>>>>>>> breaking from that semantics?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>    On Thursday, 7 November, 2019, 11:33:19 pm IST, Guozhang
>> Wang
>>> <
>>>>>>>>> wangg...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>  Hi Navinder, Vinoth, thanks for the updated KIP!
>>>>>>>>>>
>>>>>>>>>> Read through the discussions so far and made another pass on
>> the
>>> wiki
>>>>>>>>> page,
>>>>>>>>>> and here are some more comments:
>>>>>>>>>>
>>>>>>>>>> 1. About the public APIs:
>>>>>>>>>>
>>>>>>>>>> 1.1. It is not clear to me how allStandbyMetadataForStore
>>>>>>>>>> and allStandbyMetadata would be differentiated from the
>> original
>>> APIs
>>>>>>>>> given
>>>>>>>>>> that we will augment StreamsMetadata to include both active and
>>>>>>>> standby
>>>>>>>>>> topic-partitions and store names, so I think we can still use
>>>>>>>> allMetadata
>>>>>>>>>> and allMetadataForStore to get the collection of instance
>>> metadata
>>>>>>>> that
>>>>>>>>>> host the store both as active and standbys. Are there any
>>> specific
>>>>> use
>>>>>>>>>> cases where we ONLY want to get the standby's metadata? And
>> even
>>> if
>>>>>>>> there
>>>>>>>>>> are, we can easily filter it out from the allMetadata /
>>>>>>>>> allMetadataForStore
>>>>>>>>>> right?
>>>>>>>>>>
>>>>>>>>>> 1.2. Similarly I'm wondering for allMetadataForKey, can we
>>> return the
>>>>>>>>> same
>>>>>>>>>> type: "Collection<StreamsMetadata>" which includes 1 for
>> active,
>>> and
>>>>>>>> N-1
>>>>>>>>>> for standbys, and callers can easily identify them by looking
>>> inside
>>>>>>>> the
>>>>>>>>>> StreamsMetadata objects? In addition I feel the
>> "topicPartition"
>>>>> field
>>>>>>>>>> inside "KeyQueryMetadata" is not very important since the
>>> changelog
>>>>>>>>>> topic-name is always 1-1 mapping to the store name, so as long
>>> as the
>>>>>>>>> store
>>>>>>>>>> name matches, the changelog topic name should always match
>> (i.e.
>>> in
>>>>>>>>>> the pseudo code, just checking store names should be
>>> sufficient). If
>>>>>>>> all
>>>>>>>>> of
>>>>>>>>>> the above assumption is true, I think we can save us from
>>> introducing
>>>>>>>> one
>>>>>>>>>> more public class here.
>>>>>>>>>>
>>>>>>>>>> 1.3. Similarly in StoreLagInfo, seems not necessary to include
>>> the
>>>>>>>> topic
>>>>>>>>>> partition name in addition to the store name.
>>>>>>>>>>
>>>>>>>>>> 2. About querying store lags: we've discussed about separating
>>> the
>>>>>>>>> querying
>>>>>>>>>> of the lag information and the querying of the host information
>>> so I
>>>>>>>>> still
>>>>>>>>>> support having separate APIs here. More thoughts:
>>>>>>>>>>
>>>>>>>>>> 2.1. Compared with offsets, I'm wondering would time-difference
>>> be
>>>>>>>> more
>>>>>>>>>> intuitive for users to define the acceptable "staleness"? More
>>>>>>>> strictly,
>>>>>>>>>> are there any scenarios where we would actually prefer offsets
>>> over
>>>>>>>>>> timestamps except that the timestamps are not available?
>>>>>>>>>>
>>>>>>>>>> 2.2. I'm also a bit leaning towards not putting the burden of
>>>>>>>>> periodically
>>>>>>>>>> refreshing our lag and caching it (and introducing another
>>> config) on
>>>>>>>> the
>>>>>>>>>> streams side but document clearly its cost and let users to
>>> consider
>>>>>>>> its
>>>>>>>>>> call frequency; of course in terms of implementation there are
>>> some
>>>>>>>>>> optimizations we can consider:
>>>>>>>>>>
>>>>>>>>>> 1) for restoring active tasks, the log-end-offset is read once
>>> since
>>>>>>>> it
>>>>>>>>> is
>>>>>>>>>> not expected to change, and that offset / timestamp can be
>>> remembered
>>>>>>>> for
>>>>>>>>>> lag calculation and we do not need to refresh again;
>>>>>>>>>> 2) for standby tasks,  there's a "Map<TopicPartition, Long>
>>>>>>>>>> endOffsets(Collection<TopicPartition> partitions)" in
>>> KafkaConsumer
>>>>> to
>>>>>>>>>> batch a list of topic-partitions in one round-trip, and we can
>>> use
>>>>>>>> that
>>>>>>>>> to
>>>>>>>>>> let our APIs be sth. like "lagInfoForStores(Collection<String>
>>>>>>>>> storeNames)"
>>>>>>>>>> to enable the batching effects.
>>>>>>>>>>
>>>>>>>>>> 3. Misc.:
>>>>>>>>>>
>>>>>>>>>> 3.1 There's a typo on the pseudo code "globalLagInforation".
>>> Also it
>>>>>>>>> seems
>>>>>>>>>> not describing how that information is collected (personally I
>>> also
>>>>>>>> feel
>>>>>>>>>> one "lagInfoForStores" is sufficient).
>>>>>>>>>> 3.2 Note there's a slight semantical difference between active
>>> and
>>>>>>>>>> standby's "partitions" inside StreamsMetadata, for active tasks
>>> the
>>>>>>>>>> partitions are actually input topic partitions for the task:
>>> some of
>>>>>>>> them
>>>>>>>>>> may also act as changelog topics but these are exceptional
>>> cases; for
>>>>>>>>>> standby tasks the "standbyTopicPartitions" are actually the
>>> changelog
>>>>>>>>>> topics of the task. So maybe renaming it to
>>>>>>>> "standbyChangelogPartitions"
>>>>>>>>> to
>>>>>>>>>> differentiate it?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Overall I think this would be a really good KIP to add to
>>> Streams,
>>>>>>>> thank
>>>>>>>>>> you so much!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Wed, Nov 6, 2019 at 8:47 PM Navinder Brar
>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> +1 on implementing offset based lag for now and push
>> time-based
>>> lag
>>>>>>>> to a
>>>>>>>>>>> later point in time when broker changes are done. Although
>>>>> time-based
>>>>>>>>> lag
>>>>>>>>>>> enhances the readability, it would not be a make or break
>>> change for
>>>>>>>>>>> implementing this KIP.
>>>>>>>>>>>
>>>>>>>>>>> Vinoth has explained the role of KeyQueryMetadata, let me in
>>> add in
>>>>>>>> my 2
>>>>>>>>>>> cents as well.
>>>>>>>>>>>    - There are basically 2 reasons. One is that instead of
>>> having
>>>>> two
>>>>>>>>>>> functions, one to get StreamsMetadata for active and one for
>>>>>>>> replicas.
>>>>>>>>> We
>>>>>>>>>>> are fetching both in a single call and we have a way to get
>> only
>>>>>>>> active
>>>>>>>>> or
>>>>>>>>>>> only replicas from the KeyQueryMetadata object(just like
>>> isStandby()
>>>>>>>> and
>>>>>>>>>>> isActive() discussion we had earlier)
>>>>>>>>>>>    - Since even after fetching the metadata now we have a
>>>>> requirement
>>>>>>>>> of
>>>>>>>>>>> fetching the topicPartition for which the query came:- to
>> fetch
>>> lag
>>>>>>>> for
>>>>>>>>>>> that specific topicPartition. Instead of having another call
>> to
>>>>> fetch
>>>>>>>>> the
>>>>>>>>>>> partition from StreamsMetadataState we thought using one
>> single
>>> call
>>>>>>>> and
>>>>>>>>>>> fetching partition and all metadata would be better.
>>>>>>>>>>>    - Another option was to change StreamsMetadata object and
>> add
>>>>>>>>>>> topicPartition in that for which the query came but it doesn’t
>>> make
>>>>>>>>> sense
>>>>>>>>>>> in terms of semantics as it StreamsMetadata. Also,
>>> KeyQueryMetadata
>>>>>>>>>>> represents all the metadata for the Key being queried, i.e.
>> the
>>>>>>>>> partition
>>>>>>>>>>> it belongs to and the list of StreamsMetadata(hosts) active or
>>>>>>>> replica
>>>>>>>>>>> where the key could be found.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>    On Thursday, 7 November, 2019, 01:53:36 am IST, Vinoth
>>> Chandar <
>>>>>>>>>>> vchan...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>  +1 to John, suggestion on Duration/Instant and dropping the
>>> API to
>>>>>>>>> fetch
>>>>>>>>>>> all store's lags. However, I do think we need to return lags
>> per
>>>>>>>> topic
>>>>>>>>>>> partition. So not sure if single return value would work? We
>>> need
>>>>>>>> some
>>>>>>>>> new
>>>>>>>>>>> class that holds a TopicPartition and Duration/Instant
>> variables
>>>>>>>>> together?
>>>>>>>>>>>
>>>>>>>>>>> 10) Because we needed to return the topicPartition the key
>>> belongs
>>>>>>>> to,
>>>>>>>>> in
>>>>>>>>>>> order to correlate with the lag information from the other set
>>> of
>>>>>>>> APIs.
>>>>>>>>>>> Otherwise, we don't know which topic partition's lag estimate
>> to
>>>>>>>> use. We
>>>>>>>>>>> tried to illustrate this on the example code. StreamsMetadata
>> is
>>>>>>>> simply
>>>>>>>>>>> capturing state of a streams host/instance, where as
>>> TopicPartition
>>>>>>>>> depends
>>>>>>>>>>> on the key passed in. This is a side effect of our decision to
>>>>>>>> decouple
>>>>>>>>> lag
>>>>>>>>>>> based filtering on the metadata apis.
>>>>>>>>>>>
>>>>>>>>>>> 20) Goes back to the previous point. We needed to return
>>> information
>>>>>>>>> that
>>>>>>>>>>> is key specific, at which point it seemed natural for the
>>>>>>>>> KeyQueryMetadata
>>>>>>>>>>> to contain active, standby, topic partition for that key. If
>> we
>>>>>>>> merely
>>>>>>>>>>> returned a standbyMetadataForKey() ->
>>> Collection<StreamsMetadata>
>>>>>>>>> standby,
>>>>>>>>>>> an active metadataForKey() -> StreamsMetadata, and new
>>>>>>>>>>> getTopicPartition(key) -> topicPartition object back to the
>>> caller,
>>>>>>>> then
>>>>>>>>>>> arguably you could do the same kind of correlation. IMO having
>>> a the
>>>>>>>>>>> KeyQueryMetadata class to encapsulate all this is a friendlier
>>> API.
>>>>>>>>>>>  allStandbyMetadata() and allStandbyMetadataForStore() are
>> just
>>>>>>>> counter
>>>>>>>>>>> parts for metadataForStore() and allMetadata() that we
>> introduce
>>>>>>>> mostly
>>>>>>>>> for
>>>>>>>>>>> consistent API semantics. (their presence implicitly could
>> help
>>>>>>>> denote
>>>>>>>>>>> metadataForStore() is for active instances. Happy to drop them
>>> if
>>>>>>>> their
>>>>>>>>>>> utility is not clear)
>>>>>>>>>>>
>>>>>>>>>>> 30) This would assume we refresh all the standby lag
>> information
>>>>>>>> every
>>>>>>>>>>> time we query for that StreamsMetadata for a specific store?
>> For
>>>>> time
>>>>>>>>> based
>>>>>>>>>>> lag, this will involve fetching the tail kafka record at once
>>> from
>>>>>>>>> multiple
>>>>>>>>>>> kafka topic partitions? I would prefer not to couple them like
>>> this
>>>>>>>> and
>>>>>>>>>>> have the ability to make granular store (or even topic
>> partition
>>>>>>>> level)
>>>>>>>>>>> fetches for lag information.
>>>>>>>>>>>
>>>>>>>>>>> 32) I actually prefer John's suggestion to let the application
>>> drive
>>>>>>>> the
>>>>>>>>>>> lag fetches/updation and not have flags as the KIP current
>>> points
>>>>> to.
>>>>>>>>> Are
>>>>>>>>>>> you reexamining that position?
>>>>>>>>>>>
>>>>>>>>>>> On fetching lag information, +1 we could do this much more
>>>>>>>> efficiently
>>>>>>>>> with
>>>>>>>>>>> a broker changes. Given I don't yet have a burning need for
>> the
>>> time
>>>>>>>>> based
>>>>>>>>>>> lag, I think we can sequence the APIs such that the offset
>> based
>>>>> ones
>>>>>>>>> are
>>>>>>>>>>> implemented first, while we have a broker side change?
>>>>>>>>>>> Given we decoupled the offset and time based lag API, I am
>>> willing
>>>>> to
>>>>>>>>> drop
>>>>>>>>>>> the time based lag functionality (since its not needed right
>>> away
>>>>>>>> for my
>>>>>>>>>>> use-case). @navinder . thoughts?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 5, 2019 at 11:10 PM Matthias J. Sax <
>>>>>>>> matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Navinder,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks for updating the KIP. Couple of follow up questions:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> (10) Why do we need to introduce the class
>> `KeyQueryMetadata`?
>>>>>>>>>>>>
>>>>>>>>>>>> (20) Why do we introduce the two methods
>> `allMetadataForKey()`?
>>>>>>>> Would
>>>>>>>>> it
>>>>>>>>>>>> not be simpler to add `Collection<StreamMetatdata>
>>>>>>>>>>>> standbyMetadataForKey(...)`. This would align with new
>> methods
>>>>>>>>>>>> `#allStandbyMetadata()` and `#allStandbyMetadataForStore()`?
>>>>>>>>>>>>
>>>>>>>>>>>> (30) Why do we need the class `StoreLagInfo` -- it seems
>>> simpler to
>>>>>>>>> just
>>>>>>>>>>>> extend `StreamMetadata` with the corresponding attributes and
>>>>>>>> methods
>>>>>>>>>>>> (of active task, the lag would always be reported as zero)
>>>>>>>>>>>>
>>>>>>>>>>>> (32) Via (30) we can avoid the two new methods
>> `#allLagInfo()`
>>> and
>>>>>>>>>>>> `#lagInfoForStore()`, too, reducing public API and making it
>>>>>>>> simpler to
>>>>>>>>>>>> use the feature.
>>>>>>>>>>>>
>>>>>>>>>>>> Btw: If we make `StreamMetadata` thread safe, the lag
>>> information
>>>>>>>> can
>>>>>>>>> be
>>>>>>>>>>>> updated in the background without the need that the
>> application
>>>>>>>>>>>> refreshes its metadata. Hence, the user can get active and/or
>>>>>>>> standby
>>>>>>>>>>>> metadata once, and only needs to refresh it, if a rebalance
>>>>>>>> happened.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> About point (4) of the previous thread: I was also thinking
>>> about
>>>>>>>>>>>> when/how to update the time-lag information, and I agree that
>>> we
>>>>>>>> should
>>>>>>>>>>>> not update it for each query.
>>>>>>>>>>>>
>>>>>>>>>>>> "How": That we need to fetch the last record is a little bit
>>>>>>>>>>>> unfortunate, but I don't see any other way without a broker
>>> change.
>>>>>>>> One
>>>>>>>>>>>> issue I still see is with "exactly-once" -- if transaction
>>> markers
>>>>>>>> are
>>>>>>>>>>>> in the topic, the last message is not at offset "endOffset -
>>> 1" and
>>>>>>>> as
>>>>>>>>>>>> multiple transaction markers might be after each other, it's
>>>>> unclear
>>>>>>>>> how
>>>>>>>>>>>> to identify the offset of the last record... Thoughts?
>>>>>>>>>>>>
>>>>>>>>>>>> Hence, it might be worth to look into a broker change as a
>>>>> potential
>>>>>>>>>>>> future improvement. It might be possible that the broker
>>> caches the
>>>>>>>>>>>> latest timestamp per partition to serve this data
>> efficiently,
>>>>>>>> similar
>>>>>>>>>>>> to `#endOffset()`.
>>>>>>>>>>>>
>>>>>>>>>>>> "When": We refresh the end-offset information based on the
>>>>>>>>>>>> `commit.interval.ms` -- doing it more often is not really
>>> useful,
>>>>>>>> as
>>>>>>>>>>>> state store caches will most likely buffer up all writes to
>>>>>>>> changelogs
>>>>>>>>>>>> anyway and are only flushed on commit (including a flush of
>> the
>>>>>>>>>>>> producer). Hence, I would suggest to update the time-lag
>>>>> information
>>>>>>>>>>>> based on the same strategy in the background. This way there
>>> is no
>>>>>>>>>>>> additional config or methods and the user does not need to
>>> worry
>>>>>>>> about
>>>>>>>>>>>> it at all.
>>>>>>>>>>>>
>>>>>>>>>>>> To avoid refresh overhead if we don't need it (a user might
>>> not use
>>>>>>>> IQ
>>>>>>>>>>>> to begin with), it might be worth to maintain an internal
>> flag
>>>>>>>>>>>> `updateTimeLagEnabled` that is set to `false` initially and
>>> only
>>>>>>>> set to
>>>>>>>>>>>> `true` on the first call of a user to get standby-metadata.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 11/4/19 5:13 PM, Vinoth Chandar wrote:
>>>>>>>>>>>>>>>  I'm having some trouble wrapping my head around what race
>>>>>>>>> conditions
>>>>>>>>>>>>> might occur, other than the fundamentally broken state in
>>> which
>>>>>>>>>>> different
>>>>>>>>>>>>> instances are running totally different topologies.
>>>>>>>>>>>>> 3. @both Without the topic partitions that the tasks can map
>>> back
>>>>>>>> to,
>>>>>>>>>>> we
>>>>>>>>>>>>> have to rely on topology/cluster metadata in each Streams
>>> instance
>>>>>>>> to
>>>>>>>>>>> map
>>>>>>>>>>>>> the task back. If the source topics are wild carded for e,g
>>> then
>>>>>>>> each
>>>>>>>>>>>>> instance could have different source topics in topology,
>>> until the
>>>>>>>>> next
>>>>>>>>>>>>> rebalance happens. You can also read my comments from here
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>
>>>
>> https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> seems hard to imagine how encoding arbitrarily long topic
>>> names
>>>>>>>> plus
>>>>>>>>>>> an
>>>>>>>>>>>>> integer for the partition number could be as efficient as
>> task
>>>>> ids,
>>>>>>>>>>> which
>>>>>>>>>>>>> are just two integers.
>>>>>>>>>>>>> 3. if you still have concerns about the efficacy of
>> dictionary
>>>>>>>>>>> encoding,
>>>>>>>>>>>>> happy to engage. The link above also has some benchmark
>> code I
>>>>>>>> used.
>>>>>>>>>>>>> Theoretically, we would send each topic name atleast once,
>> so
>>> yes
>>>>>>>> if
>>>>>>>>>>> you
>>>>>>>>>>>>> compare a 10-20 character topic name + an integer to two
>>> integers,
>>>>>>>> it
>>>>>>>>>>>> will
>>>>>>>>>>>>> be more bytes. But its constant overhead proportional to
>> size
>>> of
>>>>>>>> topic
>>>>>>>>>>>> name
>>>>>>>>>>>>> and with 4,8,12, partitions the size difference between
>>> baseline
>>>>>>>>>>>> (version 4
>>>>>>>>>>>>> where we just repeated topic names for each topic partition)
>>> and
>>>>>>>> the
>>>>>>>>>>> two
>>>>>>>>>>>>> approaches becomes narrow.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Plus, Navinder is going to implement a bunch of protocol
>>> code
>>>>>>>> that
>>>>>>>>> we
>>>>>>>>>>>>> might just want to change when the discussion actually does
>>> take
>>>>>>>>> place,
>>>>>>>>>>>> if
>>>>>>>>>>>>> ever.
>>>>>>>>>>>>>>> it'll just be a mental burden for everyone to remember
>> that
>>> we
>>>>>>>> want
>>>>>>>>>>> to
>>>>>>>>>>>>> have this follow-up discussion.
>>>>>>>>>>>>> 3. Is n't people changing same parts of code and tracking
>>> follow
>>>>>>>> ups a
>>>>>>>>>>>>> common thing, we need to deal with anyway?  For this KIP, is
>>> n't
>>>>> it
>>>>>>>>>>>> enough
>>>>>>>>>>>>> to reason about whether the additional map on top of the
>> topic
>>>>>>>>>>> dictionary
>>>>>>>>>>>>> would incur more overhead than the sending task_ids? I don't
>>> think
>>>>>>>>> it's
>>>>>>>>>>>>> case, both of them send two integers. As I see it, we can
>> do a
>>>>>>>>> separate
>>>>>>>>>>>>> follow up to (re)pursue the task_id conversion and get it
>>> working
>>>>>>>> for
>>>>>>>>>>>> both
>>>>>>>>>>>>> maps within the next release?
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Can you elaborate on "breaking up the API"? It looks like
>>> there
>>>>>>>> are
>>>>>>>>>>>>> already separate API calls in the proposal, one for
>> time-lag,
>>> and
>>>>>>>>>>> another
>>>>>>>>>>>>> for offset-lag, so are they not already broken up?
>>>>>>>>>>>>> The current APIs (e.g lagInfoForStore) for lags return
>>>>> StoreLagInfo
>>>>>>>>>>>> objects
>>>>>>>>>>>>> which has both time and offset lags. If we had separate
>> APIs,
>>> say
>>>>>>>> (e.g
>>>>>>>>>>>>> offsetLagForStore(), timeLagForStore()), we can implement
>>> offset
>>>>>>>>>>> version
>>>>>>>>>>>>> using the offset lag that the streams instance already
>> tracks
>>> i.e
>>>>>>>> no
>>>>>>>>>>> need
>>>>>>>>>>>>> for external calls. The time based lag API would incur the
>>> kafka
>>>>>>>> read
>>>>>>>>>>> for
>>>>>>>>>>>>> the timestamp. makes sense?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Based on the discussions so far, I only see these two
>> pending
>>>>>>>> issues
>>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>> aligned on. Is there any other open item people want to
>> bring
>>> up?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <
>>>>>>>>>>> sop...@confluent.io
>>>>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regarding 3) I'm wondering, does your concern still apply
>>> even
>>>>> now
>>>>>>>>>>>>>> that the pluggable PartitionGrouper interface has been
>>>>> deprecated?
>>>>>>>>>>>>>> Now that we can be sure that the DefaultPartitionGrouper is
>>> used
>>>>>>>> to
>>>>>>>>>>>>>> generate
>>>>>>>>>>>>>> the taskId -> partitions mapping, we should be able to
>>> convert
>>>>> any
>>>>>>>>>>>> taskId
>>>>>>>>>>>>>> to any
>>>>>>>>>>>>>> partitions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <
>>> j...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hey Vinoth, thanks for the reply!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3.
>>>>>>>>>>>>>>> I get that it's not the main focus of this KIP, but if
>> it's
>>> ok,
>>>>>>>> it
>>>>>>>>>>>>>>> would be nice to hash out this point right now. It only
>>> came up
>>>>>>>>>>>>>>> because this KIP-535 is substantially extending the
>> pattern
>>> in
>>>>>>>>>>>>>>> question. If we push it off until later, then the
>> reviewers
>>> are
>>>>>>>>> going
>>>>>>>>>>>>>>> to have to suspend their concerns not just while voting
>> for
>>> the
>>>>>>>> KIP,
>>>>>>>>>>>>>>> but also while reviewing the code. Plus, Navinder is going
>>> to
>>>>>>>>>>>>>>> implement a bunch of protocol code that we might just want
>>> to
>>>>>>>> change
>>>>>>>>>>>>>>> when the discussion actually does take place, if ever.
>>> Finally,
>>>>>>>>> it'll
>>>>>>>>>>>>>>> just be a mental burden for everyone to remember that we
>>> want to
>>>>>>>>> have
>>>>>>>>>>>>>>> this follow-up discussion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It makes sense what you say... the specific assignment is
>>>>> already
>>>>>>>>>>>>>>> encoded in the "main" portion of the assignment, not in
>> the
>>>>>>>>>>> "userdata"
>>>>>>>>>>>>>>> part. It also makes sense that it's simpler to reason
>> about
>>>>>>>> races if
>>>>>>>>>>>>>>> you simply get all the information about the topics and
>>>>>>>> partitions
>>>>>>>>>>>>>>> directly from the assignor, rather than get the partition
>>> number
>>>>>>>>> from
>>>>>>>>>>>>>>> the assignor and the topic name from your own a priori
>>> knowledge
>>>>>>>> of
>>>>>>>>>>>>>>> the topology. On the other hand, I'm having some trouble
>>>>>>>> wrapping my
>>>>>>>>>>>>>>> head around what race conditions might occur, other than
>> the
>>>>>>>>>>>>>>> fundamentally broken state in which different instances
>> are
>>>>>>>> running
>>>>>>>>>>>>>>> totally different topologies. Sorry, but can you remind us
>>> of
>>>>> the
>>>>>>>>>>>>>>> specific condition?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To the efficiency counterargument, it seems hard to
>> imagine
>>> how
>>>>>>>>>>>>>>> encoding arbitrarily long topic names plus an integer for
>>> the
>>>>>>>>>>>>>>> partition number could be as efficient as task ids, which
>>> are
>>>>>>>> just
>>>>>>>>>>> two
>>>>>>>>>>>>>>> integers. It seems like this would only be true if topic
>>> names
>>>>>>>> were
>>>>>>>>> 4
>>>>>>>>>>>>>>> characters or less.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 4.
>>>>>>>>>>>>>>> Yeah, clearly, it would not be a good idea to query the
>>> metadata
>>>>>>>>>>>>>>> before every single IQ query. I think there are plenty of
>>>>>>>>> established
>>>>>>>>>>>>>>> patterns for distributed database clients to follow. Can
>> you
>>>>>>>>>>> elaborate
>>>>>>>>>>>>>>> on "breaking up the API"? It looks like there are already
>>>>>>>> separate
>>>>>>>>>>> API
>>>>>>>>>>>>>>> calls in the proposal, one for time-lag, and another for
>>>>>>>> offset-lag,
>>>>>>>>>>>>>>> so are they not already broken up? FWIW, yes, I agree, the
>>>>> offset
>>>>>>>>> lag
>>>>>>>>>>>>>>> is already locally known, so we don't need to build in an
>>> extra
>>>>>>>>>>>>>>> synchronous broker API call, just one for the time-lag.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks again for the discussion,
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <
>>>>>>>>>>> vchan...@confluent.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 3. Right now, we still get the topic partitions assigned
>>> as a
>>>>>>>> part
>>>>>>>>>>> of
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> top level Assignment object (the one that wraps
>>> AssignmentInfo)
>>>>>>>> and
>>>>>>>>>>>> use
>>>>>>>>>>>>>>>> that to convert taskIds back. This list of only contains
>>>>>>>>> assignments
>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>> that particular instance. Attempting to also reverse map
>>> for
>>>>>>>> "all"
>>>>>>>>>>> the
>>>>>>>>>>>>>>>> tasksIds in the streams cluster i.e all the topic
>>> partitions in
>>>>>>>>>>> these
>>>>>>>>>>>>>>>> global assignment maps was what was problematic. By
>>> explicitly
>>>>>>>>>>> sending
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>> global assignment maps as actual topic partitions,  group
>>>>>>>>>>> coordinator
>>>>>>>>>>>>>>> (i.e
>>>>>>>>>>>>>>>> the leader that computes the assignment's ) is able to
>>>>>>>> consistently
>>>>>>>>>>>>>>> enforce
>>>>>>>>>>>>>>>> its view of the topic metadata. Still don't think doing
>>> such a
>>>>>>>>>>> change
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>> forces you to reconsider semantics, is not needed to save
>>> bits
>>>>>>>> on
>>>>>>>>>>>> wire.
>>>>>>>>>>>>>>> May
>>>>>>>>>>>>>>>> be we can discuss this separately from this KIP?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 4. There needs to be some caching/interval somewhere
>> though
>>>>>>>> since
>>>>>>>>> we
>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>> want to make 1 kafka read per 1 IQ potentially. But I
>>> think its
>>>>>>>> a
>>>>>>>>>>>> valid
>>>>>>>>>>>>>>>> suggestion, to make this call just synchronous and leave
>>> the
>>>>>>>>> caching
>>>>>>>>>>>> or
>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>> often you want to call to the application. Would it be
>>> good to
>>>>>>>> then
>>>>>>>>>>>>>> break
>>>>>>>>>>>>>>>> up the APIs for time and offset based lag?  We can obtain
>>>>> offset
>>>>>>>>>>> based
>>>>>>>>>>>>>>> lag
>>>>>>>>>>>>>>>> for free? Only incur the overhead of reading kafka if we
>>> want
>>>>>>>> time
>>>>>>>>>>>>>>>> based lags?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
>>>>>>>>>>>>>> sop...@confluent.io>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Adding on to John's response to 3), can you clarify when
>>> and
>>>>>>>> why
>>>>>>>>>>>>>>> exactly we
>>>>>>>>>>>>>>>>> cannot
>>>>>>>>>>>>>>>>> convert between taskIds and partitions? If that's really
>>> the
>>>>>>>> case
>>>>>>>>> I
>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>> feel confident
>>>>>>>>>>>>>>>>> that the StreamsPartitionAssignor is not full of bugs...
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It seems like it currently just encodes a list of all
>>>>>>>> partitions
>>>>>>>>>>> (the
>>>>>>>>>>>>>>>>> assignment) and also
>>>>>>>>>>>>>>>>> a list of the corresponding task ids, duplicated to
>> ensure
>>>>> each
>>>>>>>>>>>>>>> partition
>>>>>>>>>>>>>>>>> has the corresponding
>>>>>>>>>>>>>>>>> taskId at the same offset into the list. Why is that
>>>>>>>> problematic?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <
>>>>>>>> j...@confluent.io>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks, all, for considering the points!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. Interesting. I have a vague recollection of that...
>>> Still,
>>>>>>>>>>>>>> though,
>>>>>>>>>>>>>>>>>> it seems a little fishy. After all, we return the
>>> assignments
>>>>>>>>>>>>>>>>>> themselves as task ids, and the members have to map
>>> these to
>>>>>>>>> topic
>>>>>>>>>>>>>>>>>> partitions in order to configure themselves properly.
>> If
>>> it's
>>>>>>>> too
>>>>>>>>>>>>>>>>>> complicated to get this right, then how do we know that
>>>>>>>> Streams
>>>>>>>>> is
>>>>>>>>>>>>>>>>>> computing the correct partitions at all?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 4. How about just checking the log-end timestamp when
>> you
>>>>> call
>>>>>>>>> the
>>>>>>>>>>>>>>>>>> method? Then, when you get an answer, it's as fresh as
>> it
>>>>>>>> could
>>>>>>>>>>>>>>>>>> possibly be. And as a user you have just one, obvious,
>>> "knob"
>>>>>>>> to
>>>>>>>>>>>>>>>>>> configure how much overhead you want to devote to
>>> checking...
>>>>>>>> If
>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>> want to call the broker API less frequently, you just
>>> call
>>>>> the
>>>>>>>>>>>>>>> Streams
>>>>>>>>>>>>>>>>>> API less frequently. And you don't have to worry about
>>> the
>>>>>>>>>>>>>>>>>> relationship between your invocations of that method
>> and
>>> the
>>>>>>>>>>> config
>>>>>>>>>>>>>>>>>> setting (e.g., you'll never get a negative number,
>> which
>>> you
>>>>>>>>> could
>>>>>>>>>>>>>> if
>>>>>>>>>>>>>>>>>> you check the log-end timestamp less frequently than
>> you
>>>>> check
>>>>>>>>> the
>>>>>>>>>>>>>>>>>> lag).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks John for going through this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>    - +1, makes sense
>>>>>>>>>>>>>>>>>>>    - +1, no issues there
>>>>>>>>>>>>>>>>>>>    - Yeah the initial patch I had submitted for
>> K-7149(
>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/pull/6935) to reduce
>>>>>>>>>>>>>> assignmentInfo
>>>>>>>>>>>>>>>>>> object had taskIds but the merged PR had similar size
>>>>>>>> according
>>>>>>>>> to
>>>>>>>>>>>>>>> Vinoth
>>>>>>>>>>>>>>>>>> and it was simpler so if the end result is of same
>> size,
>>> it
>>>>>>>> would
>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> make
>>>>>>>>>>>>>>>>>> sense to pivot from dictionary and again move to
>> taskIDs.
>>>>>>>>>>>>>>>>>>>    - Not sure about what a good default would be if we
>>> don't
>>>>>>>>>>>>>> have a
>>>>>>>>>>>>>>>>>> configurable setting. This gives the users the
>>> flexibility to
>>>>>>>> the
>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> serve their requirements as at the end of the day it
>>> would
>>>>>>>> take
>>>>>>>>>>> CPU
>>>>>>>>>>>>>>>>> cycles.
>>>>>>>>>>>>>>>>>> I am ok with starting it with a default and see how it
>>> goes
>>>>>>>> based
>>>>>>>>>>>>>>> upon
>>>>>>>>>>>>>>>>>> feedback.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>> Navinder
>>>>>>>>>>>>>>>>>>>    On Friday, 1 November, 2019, 03:46:42 am IST,
>> Vinoth
>>>>>>>> Chandar
>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>>>>> vchan...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>  1. Was trying to spell them out separately. but makes
>>> sense
>>>>>>>>> for
>>>>>>>>>>>>>>>>>>> readability. done
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. No I immediately agree :) .. makes sense.
>> @navinder?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 3. I actually attempted only sending taskIds while
>>> working
>>>>> on
>>>>>>>>>>>>>>>>> KAFKA-7149.
>>>>>>>>>>>>>>>>>>> Its non-trivial to handle edges cases resulting from
>>> newly
>>>>>>>> added
>>>>>>>>>>>>>>> topic
>>>>>>>>>>>>>>>>>>> partitions and wildcarded topic entries. I ended up
>>>>>>>> simplifying
>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>> dictionary encoding the topic names to reduce size. We
>>> can
>>>>>>>> apply
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>> same
>>>>>>>>>>>>>>>>>>> technique here for this map. Additionally, we could
>> also
>>>>>>>>>>>>>> dictionary
>>>>>>>>>>>>>>>>>> encode
>>>>>>>>>>>>>>>>>>> HostInfo, given its now repeated twice. I think this
>>> would
>>>>>>>> save
>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>> space
>>>>>>>>>>>>>>>>>>> than having a flag per topic partition entry. Lmk if
>>> you are
>>>>>>>>> okay
>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 4. This opens up a good discussion. Given we support
>>> time
>>>>> lag
>>>>>>>>>>>>>>> estimates
>>>>>>>>>>>>>>>>>>> also, we need to read the tail record of the changelog
>>>>>>>>>>>>>> periodically
>>>>>>>>>>>>>>>>>> (unlike
>>>>>>>>>>>>>>>>>>> offset lag, which we can potentially piggyback on
>>> metadata
>>>>> in
>>>>>>>>>>>>>>>>>>> ConsumerRecord IIUC). we thought we should have a
>> config
>>>>> that
>>>>>>>>>>>>>>> control
>>>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>> often this read happens? Let me know if there is a
>>> simple
>>>>>>>> way to
>>>>>>>>>>>>>>> get
>>>>>>>>>>>>>>>>>>> timestamp value of the tail record that we are
>> missing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 12:58 PM John Roesler <
>>>>>>>>> j...@confluent.io
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hey Navinder,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for updating the KIP, it's a lot easier to see
>>> the
>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>> state of the proposal now.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> A few remarks:
>>>>>>>>>>>>>>>>>>>> 1. I'm sure it was just an artifact of revisions, but
>>> you
>>>>>>>> have
>>>>>>>>>>>>>>> two
>>>>>>>>>>>>>>>>>>>> separate sections where you list additions to the
>>>>>>>> KafkaStreams
>>>>>>>>>>>>>>>>>>>> interface. Can you consolidate those so we can see
>> all
>>> the
>>>>>>>>>>>>>>> additions
>>>>>>>>>>>>>>>>>>>> at once?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2. For messageLagEstimate, can I suggest
>>>>> "offsetLagEstimate"
>>>>>>>>>>>>>>> instead,
>>>>>>>>>>>>>>>>>>>> to be clearer that we're specifically measuring a
>>> number of
>>>>>>>>>>>>>>> offsets?
>>>>>>>>>>>>>>>>>>>> If you don't immediately agree, then I'd at least
>>> point out
>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>> usually refer to elements of Kafka topics as
>>> "records", not
>>>>>>>>>>>>>>>>>>>> "messages", so "recordLagEstimate" might be more
>>>>>>>> appropriate.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 3. The proposal mentions adding a map of the standby
>>>>>>>>>>>>>>> _partitions_ for
>>>>>>>>>>>>>>>>>>>> each host to AssignmentInfo. I assume this is
>> designed
>>> to
>>>>>>>>>>>>>> mirror
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> existing "partitionsByHost" map. To keep the size of
>>> these
>>>>>>>>>>>>>>> metadata
>>>>>>>>>>>>>>>>>>>> messages down, maybe we can consider making two
>>> changes:
>>>>>>>>>>>>>>>>>>>> (a) for both actives and standbys, encode the _task
>>> ids_
>>>>>>>>>>>>>> instead
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>> _partitions_. Every member of the cluster has a copy
>>> of the
>>>>>>>>>>>>>>> topology,
>>>>>>>>>>>>>>>>>>>> so they can convert task ids into specific partitions
>>> on
>>>>>>>> their
>>>>>>>>>>>>>>> own,
>>>>>>>>>>>>>>>>>>>> and task ids are only (usually) three characters.
>>>>>>>>>>>>>>>>>>>> (b) instead of encoding two maps (hostinfo -> actives
>>> AND
>>>>>>>>>>>>>>> hostinfo ->
>>>>>>>>>>>>>>>>>>>> standbys), which requires serializing all the
>> hostinfos
>>>>>>>> twice,
>>>>>>>>>>>>>>> maybe
>>>>>>>>>>>>>>>>>>>> we can pack them together in one map with a
>> structured
>>>>> value
>>>>>>>>>>>>>>>>> (hostinfo
>>>>>>>>>>>>>>>>>>>> -> [actives,standbys]).
>>>>>>>>>>>>>>>>>>>> Both of these ideas still require bumping the
>> protocol
>>>>>>>> version
>>>>>>>>>>>>>>> to 6,
>>>>>>>>>>>>>>>>>>>> and they basically mean we drop the existing
>>>>>>>> `PartitionsByHost`
>>>>>>>>>>>>>>> field
>>>>>>>>>>>>>>>>>>>> and add a new `TasksByHost` field with the structured
>>> value
>>>>>>>> I
>>>>>>>>>>>>>>>>>>>> mentioned.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 4. Can we avoid adding the new "lag refresh" config?
>>> The
>>>>>>>> lags
>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>> necessarily be approximate anyway, so adding the
>> config
>>>>>>>> seems
>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>> increase the operational complexity of the system for
>>>>> little
>>>>>>>>>>>>>>> actual
>>>>>>>>>>>>>>>>>>>> benefit.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for the pseudocode, by the way, it really
>> helps
>>>>>>>>>>>>>> visualize
>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>>>>>>> these new interfaces would play together. And thanks
>>> again
>>>>>>>> for
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>> update!
>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 2:41 PM John Roesler <
>>>>>>>>>>>>>> j...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hey Vinoth,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I started going over the KIP again yesterday. There
>>> are a
>>>>>>>> lot
>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>> updates, and I didn't finish my feedback in one day.
>>> I'm
>>>>>>>>>>>>>>> working on
>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <
>>>>>>>>>>>>>>>>>> vchan...@confluent.io>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Wondering if anyone has thoughts on these changes?
>> I
>>>>> liked
>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>> the new
>>>>>>>>>>>>>>>>>>>>>> metadata fetch APIs provide all the information at
>>> once
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>> consistent
>>>>>>>>>>>>>>>>>>>>>> naming..
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Any guidance on what you would like to be discussed
>>> or
>>>>>>>>>>>>>>> fleshed
>>>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>>>> before we call a VOTE?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
>>>>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>> We have made some edits in the KIP(
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>> after due deliberation on the agreed design to
>>> support
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> new
>>>>>>>>>>>>>>>>>> query
>>>>>>>>>>>>>>>>>>>>>>> design. This includes the new public API to query
>>>>>>>>>>>>>>> offset/time
>>>>>>>>>>>>>>>>> lag
>>>>>>>>>>>>>>>>>>>>>>> information and other details related to querying
>>>>> standby
>>>>>>>>>>>>>>> tasks
>>>>>>>>>>>>>>>>>>>> which have
>>>>>>>>>>>>>>>>>>>>>>> come up after thinking of thorough details.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>    - Addition of new config, “
>> lag.fetch.interval.ms”
>>> to
>>>>>>>>>>>>>>>>>> configure
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> interval of time/offset lag
>>>>>>>>>>>>>>>>>>>>>>>    - Addition of new class StoreLagInfo to store
>> the
>>>>>>>>>>>>>>>>> periodically
>>>>>>>>>>>>>>>>>>>> obtained
>>>>>>>>>>>>>>>>>>>>>>> time/offset lag
>>>>>>>>>>>>>>>>>>>>>>>    - Addition of two new functions in
>> KafkaStreams,
>>>>>>>>>>>>>>>>>>>> List<StoreLagInfo>
>>>>>>>>>>>>>>>>>>>>>>> allLagInfo() and List<StoreLagInfo>
>>>>>>>>>>>>>> lagInfoForStore(String
>>>>>>>>>>>>>>>>>>>> storeName) to
>>>>>>>>>>>>>>>>>>>>>>> return the lag information for an instance and a
>>> store
>>>>>>>>>>>>>>>>>> respectively
>>>>>>>>>>>>>>>>>>>>>>>    - Addition of new class KeyQueryMetadata. We
>> need
>>>>>>>>>>>>>>>>>> topicPartition
>>>>>>>>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>>> each key to be matched with the lag API for the
>>> topic
>>>>>>>>>>>>>>>>> partition.
>>>>>>>>>>>>>>>>>> One
>>>>>>>>>>>>>>>>>>>> way is
>>>>>>>>>>>>>>>>>>>>>>> to add new functions and fetch topicPartition from
>>>>>>>>>>>>>>>>>>>> StreamsMetadataState but
>>>>>>>>>>>>>>>>>>>>>>> we thought having one call and fetching
>>> StreamsMetadata
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> topicPartition
>>>>>>>>>>>>>>>>>>>>>>> is more cleaner.
>>>>>>>>>>>>>>>>>>>>>>>    -
>>>>>>>>>>>>>>>>>>>>>>> Renaming partitionsForHost to
>>> activePartitionsForHost in
>>>>>>>>>>>>>>>>>>>> StreamsMetadataState
>>>>>>>>>>>>>>>>>>>>>>> and partitionsByHostState to
>>> activePartitionsByHostState
>>>>>>>>>>>>>>>>>>>>>>> in StreamsPartitionAssignor
>>>>>>>>>>>>>>>>>>>>>>>    - We have also added the pseudo code of how all
>>> the
>>>>>>>>>>>>>>> changes
>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>> exist
>>>>>>>>>>>>>>>>>>>>>>> together and support the new querying APIs
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Please let me know if anything is pending now,
>>> before a
>>>>>>>>>>>>>>> vote
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> started on this.  On Saturday, 26 October, 2019,
>>>>> 05:41:44
>>>>>>>>>>>>>>> pm
>>>>>>>>>>>>>>>>> IST,
>>>>>>>>>>>>>>>>>>>> Navinder
>>>>>>>>>>>>>>>>>>>>>>> Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>  >> Since there are two soft votes for separate
>>>>>>>>>>>>>>> active/standby
>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>>>> methods, I also change my position on that. Fine
>>> with 2
>>>>>>>>>>>>>>>>> separate
>>>>>>>>>>>>>>>>>>>>>>> methods. Once we remove the lag information from
>>> these
>>>>>>>>>>>>>>> APIs,
>>>>>>>>>>>>>>>>>>>> returning a
>>>>>>>>>>>>>>>>>>>>>>> List is less attractive, since the ordering has no
>>>>>>>>>>>>>> special
>>>>>>>>>>>>>>>>>> meaning
>>>>>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>>>>>>> Agreed, now that we are not returning lag, I am
>> also
>>>>> sold
>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>> having
>>>>>>>>>>>>>>>>>>>> two
>>>>>>>>>>>>>>>>>>>>>>> separate functions. We already have one which
>>> returns
>>>>>>>>>>>>>>>>>>>> streamsMetadata for
>>>>>>>>>>>>>>>>>>>>>>> active tasks, and now we can add another one for
>>>>>>>>>>>>>> standbys.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>    On Saturday, 26 October, 2019, 03:55:16 am IST,
>>>>> Vinoth
>>>>>>>>>>>>>>>>>> Chandar <
>>>>>>>>>>>>>>>>>>>>>>> vchan...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>  +1 to Sophie's suggestion. Having both lag in
>>> terms of
>>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> offsets is
>>>>>>>>>>>>>>>>>>>>>>> good and makes for a more complete API.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Since there are two soft votes for separate
>>>>>>>>>>>>>> active/standby
>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>> methods, I
>>>>>>>>>>>>>>>>>>>>>>> also change my position on that. Fine with 2
>>> separate
>>>>>>>>>>>>>>> methods.
>>>>>>>>>>>>>>>>>>>>>>> Once we remove the lag information from these
>> APIs,
>>>>>>>>>>>>>>> returning a
>>>>>>>>>>>>>>>>>> List
>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> less attractive, since the ordering has no special
>>>>>>>>>>>>>> meaning
>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> lag in offsets vs time: Having both, as
>> suggested
>>> by
>>>>>>>>>>>>>>> Sophie
>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>> of
>>>>>>>>>>>>>>>>>>>>>>> course be best. What is a little unclear to me is,
>>> how
>>>>> in
>>>>>>>>>>>>>>>>> details
>>>>>>>>>>>>>>>>>>>> are we
>>>>>>>>>>>>>>>>>>>>>>> going to compute both?
>>>>>>>>>>>>>>>>>>>>>>> @navinder may be next step is to flesh out these
>>> details
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>> surface
>>>>>>>>>>>>>>>>>>>> any
>>>>>>>>>>>>>>>>>>>>>>> larger changes we need to make if need be.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Any other details we need to cover, before a VOTE
>>> can be
>>>>>>>>>>>>>>> called
>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <
>>>>>>>>>>>>>>> bbej...@gmail.com
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I am jumping in a little late here.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Overall I agree with the proposal to push
>> decision
>>>>>>>>>>>>>>> making on
>>>>>>>>>>>>>>>>>>>> what/how to
>>>>>>>>>>>>>>>>>>>>>>>> query in the query layer.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For point 5 from above, I'm slightly in favor of
>>> having
>>>>>>>>>>>>>>> a new
>>>>>>>>>>>>>>>>>>>> method,
>>>>>>>>>>>>>>>>>>>>>>>> "standbyMetadataForKey()" or something similar.
>>>>>>>>>>>>>>>>>>>>>>>> Because even if we return all tasks in one list,
>>> the
>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>> still have
>>>>>>>>>>>>>>>>>>>>>>>> to perform some filtering to separate the
>> different
>>>>>>>>>>>>>>> tasks,
>>>>>>>>>>>>>>>>> so I
>>>>>>>>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>>>>>> feel
>>>>>>>>>>>>>>>>>>>>>>>> making two calls is a burden, and IMHO makes
>> things
>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>> transparent for
>>>>>>>>>>>>>>>>>>>>>>>> the user.
>>>>>>>>>>>>>>>>>>>>>>>> If the final vote is for using an "isActive"
>>> field, I'm
>>>>>>>>>>>>>>> good
>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>> that as
>>>>>>>>>>>>>>>>>>>>>>>> well.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Just my 2 cents.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
>>>>>>>>>>>>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I think now we are aligned on almost all the
>>> design
>>>>>>>>>>>>>>> parts.
>>>>>>>>>>>>>>>>>>>> Summarising
>>>>>>>>>>>>>>>>>>>>>>>>> below what has been discussed above and we have
>> a
>>>>>>>>>>>>>>> general
>>>>>>>>>>>>>>>>>>>> consensus on.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>    - Rather than broadcasting lag across all
>>> nodes at
>>>>>>>>>>>>>>>>>>>> rebalancing/with
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> heartbeat, we will just return a list of all
>>>>>>>>>>>>>> available
>>>>>>>>>>>>>>>>>> standby’s
>>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>>>>>> system and the user can make IQ query any of
>> those
>>>>>>>>>>>>>>> nodes
>>>>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>>>> return
>>>>>>>>>>>>>>>>>>>>>>>>> the response, and the lag and offset time. Based
>>> on
>>>>>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>> user
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>> decide
>>>>>>>>>>>>>>>>>>>>>>>>> if he wants to return the response back or call
>>>>>>>>>>>>>> another
>>>>>>>>>>>>>>>>>> standby.
>>>>>>>>>>>>>>>>>>>>>>>>>    -  The current metadata query frequency will
>>> not
>>>>>>>>>>>>>>> change.
>>>>>>>>>>>>>>>>>> It
>>>>>>>>>>>>>>>>>>>> will be
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> same as it does now, i.e. before each query.
>>>>>>>>
>>
> 
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to