Hi all,

I've been mulling over this KIP, and I think I was on the wrong track
earlier with regard to task lags. Tl;dr: I don't think we should add
lags at all to the metadata API (and also not to the AssignmentInfo
protocol message).

Like I mentioned early on, reporting lag via
SubscriptionInfo/AssignmentInfo would only work while rebalances are
happening. Once the group stabilizes, no members would be notified of
each others' lags anymore. I had been thinking that the solution would
be the heartbeat proposal I mentioned earlier, but that proposal would
have reported the heartbeats of the members only to the leader member
(the one who makes assignments). To be useful in the context of _this_
KIP, we would also have to report the lags in the heartbeat responses
of _all_ members. This is a concern to me because now _all_ the
lags get reported to _all_ the members on _every_ heartbeat... a lot
of chatter.

Plus, the proposal for KIP-441 is only to report the lags of each
_task_. This is the sum of the lags of all the stores in the task.
But this would be insufficient for KIP-535. For this KIP, we would
want the lag specifically of the store we want to query. So this
means, we have to report the lags of all the stores of all the members
to every member... even more chatter!

The final nail in the coffin to me is that IQ clients would have to
start refreshing their metadata quite frequently to stay up to date on
the lags, which adds even more overhead to the system.

Consider a strawman alternative: we bring KIP-535 back to extending
the metadata API to tell the client the active and standby replicas
for the key in question (not including any "staleness/lag"
restriction, just returning all the replicas). Then, the client picks
a replica and sends the query. The server returns the current lag
along with the response (maybe in an HTTP header or something). Then,
the client keeps a map of its last observed lags for each replica, and
uses this information to prefer fresher replicas.
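
To make the strawman a bit more concrete, here is a minimal sketch of
the client-side bookkeeping, assuming replicas are identified by a
host string and that every query response carries the lag of the
replica that served it. The class and method names are made up for
illustration; none of this is an existing Streams API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side cache of the lag each replica last reported. */
class LagAwareReplicaPicker {
    // host -> lag observed in the most recent response from that host
    private final Map<String, Long> lastObservedLag = new HashMap<>();

    /** Record the lag that a replica returned alongside a query response. */
    void recordLag(String host, long lag) {
        lastObservedLag.put(host, lag);
    }

    /**
     * Pick the replica with the smallest last-observed lag.
     * Replicas that were never queried count as lag 0, so each one
     * gets probed at least once. Ties go to the earlier list entry.
     */
    String pick(List<String> replicas) {
        if (replicas.isEmpty()) {
            throw new IllegalArgumentException("no replicas for key");
        }
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (String host : replicas) {
            long lag = lastObservedLag.getOrDefault(host, 0L);
            if (best == null || lag < bestLag) {
                best = host;
                bestLag = lag;
            }
        }
        return best;
    }
}
```

The caller would invoke pick() with whatever list the metadata API
returned, send the query, and feed the lag from the response back in
through recordLag(). Treating unknown replicas as lag 0 is just one
possible policy; a real client might prefer the active until it has
data on the standbys.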

OR, if it wants only to query the active replica, it would throw an
error on any lag response greater than zero, refresh its metadata by
re-querying the metadata API, and try again with the current active
replica.
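
A rough sketch of that active-only loop, under the assumption that
the metadata lookup and the remote query are supplied by the caller
as plain functions. The Response type, the parameter names, and the
bounded retry count are all hypothetical and only there to make the
example self-contained.

```java
import java.util.function.Function;
import java.util.function.Supplier;

class ActiveOnlyQuery {
    /** Hypothetical response: the value plus the lag the serving replica reported. */
    static final class Response {
        final String value;
        final long lag;
        Response(String value, long lag) {
            this.value = value;
            this.lag = lag;
        }
    }

    /**
     * @param activeHostLookup asks the metadata API which host is currently active
     * @param query            sends the query to the given host
     * @param maxAttempts      upper bound on the number of queries sent
     */
    static String queryActive(Supplier<String> activeHostLookup,
                              Function<String, Response> query,
                              int maxAttempts) {
        String host = activeHostLookup.get();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Response r = query.apply(host);
            if (r.lag == 0) {
                return r.value;               // served by a caught-up active
            }
            host = activeHostLookup.get();    // non-zero lag: refresh metadata, retry
        }
        throw new IllegalStateException(
            "no zero-lag response after " + maxAttempts + " attempts");
    }
}
```

Bounding the retries is a choice made for the sketch; the caller
could just as well back off and keep trying, or surface the error
immediately.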

This way, the lag information will be super fresh for the client, and
we keep the Metadata API, Assignment/Subscription, and Heartbeat as
slim as possible.

Side note: I do think that some time soon, we'll have to add a library
for IQ server/clients. I think that this logic will start to get
pretty complex.

I hope this thinking is reasonably clear!
Thanks again,
-John


On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Responding to the points raised by Matthias
>
> 1. IIUC John intends to add (or we can do this in this KIP) lag information
> to AssignmentInfo, which gets sent to every participant.
>
> 2. At least I was under the assumption that it can be called per query,
> since the API docs don't seem to suggest otherwise. Do you see any
> potential issues if we call this every query? (we should benchmark this
> nonetheless)
>
> 4. Agree. metadataForKey() implicitly would return the active host metadata
> (as it was before). We should also document this in that API's javadoc,
> given we have another method(s) that returns more host metadata now.
>
> 5.  While I see the point, the app/caller has to make two different API
> calls to obtain active/standby and potentially do the same set of operations
> to query the state. I personally still like a method like isActive()
> better, but don't have strong opinions.
>
> 9. If we do expose the lag information, could we just leave it up to the
> caller to decide whether it errors out or not and not make the decision
> within Streams? i.e we don't need a new config
>
> 14. +1 . If it's easier to do right away. We started with number of
> records, following the lead from KIP-441
>
> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>
> >
> > Thanks, everyone for taking a look. Some very cool ideas have flown in.
> >
> > >> There was a follow-on idea I POCed to continuously share lag
> > information in the heartbeat protocol
> > +1 that would be great, I will update the KIP assuming this work will
> > finish soon
> >
> > >> I think that adding a new method to StreamsMetadataState and
> > deprecating the existing method is the best way to go; we just can't change
> > the return types of any existing methods.
> > +1 on this, we will add new methods for users who would be interested in
> > querying back a list of possible options to query from and leave the
> > current function getStreamsMetadataForKey() untouched for users who want
> > absolute consistency.
> >
> > >> why not just always return all available metadata (including
> > active/standby or lag) and let the caller decide to which node they want to
> > route the query
> > +1. I think this makes sense as from a user standpoint there is no
> > difference b/w an active and a standby if both have same lag. In fact,
> > users would be able to use this API to reduce query load on actives, so
> > returning all available options along with the current lag in each would
> > make sense and leave it to user how they want to use this data. This has
> > another added advantage. If a user queries any random machine for a key and
> > that machine has a replica for the partition (where key belongs) user might
> > choose to serve the data from there itself (if it doesn’t lag much) rather
> > than finding the active and making an IQ to that. This would save some
> > critical time in serving for some applications.
> >
> > >> Adding the lag in terms of timestamp diff comparing the committed
> > offset.
> > +1 on this, I think it’s more readable. But as John said the
> > function allMetadataForKey() is just returning the possible options from
> > where users can query a key, so we can even drop the parameter
> > enableReplicaServing/tolerableDataStaleness and just return all the
> > streamsMetadata containing that key along with the offset limit.
> >
> > Answering the questions posted by Matthias in sequence.
> > 1. @John can you please comment on this one.
> > 2. Yeah the usage pattern would include querying this prior to every
> > request
> > 3. Will add the changes to StreamsMetadata in the KIP, would include
> > changes in rebuildMetadata() etc.
> > 4. Makes sense, already addressed above
> > 5. Is it important from a user perspective if they are querying an
> > active(processing), active(restoring), a standby task if we have a way of
> > denoting lag in a readable manner which kind of signifies the user that
> > this is the best node to query the fresh data.
> > 6. Yes, I intend to return the actives and replicas in the same return
> > list in allMetadataForKey()
> > 7. tricky
> > 8. yes, we need new functions to return activeRestoring and
> > standbyRunning tasks.
> > 9. StreamsConfig doesn’t look like of much use to me since we are giving
> > all possible options via this function, or they can use existing function
> > getStreamsMetadataForKey() and get just the active
> > 10. I think treat them both the same and let the lag do the talking
> > 11. We are just sending them the option to query from in
> > allMetadataForKey(), which doesn’t include any handle. We then query that
> > machine for the key where it calls allStores() and tries to find the task
> > in activeRunning/activeRestoring/standbyRunning and adds the store handle
> > here.
> > 12. Need to verify, but during the exact point when store is closed
> > to transition it from restoring to running the queries will fail. The
> > caller in such case can have their own configurable retries to check again
> > or try the replica if a call fails to active
> > 13. I think KIP-216 is working on those lines, we might not need few of
> > those exceptions since now the basic idea of this KIP is to support IQ
> > during rebalancing.
> > 14. Addressed above, agreed it looks more readable.
> >
> >
> >     On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax <
> > matth...@confluent.io> wrote:
> >
> >  One more thought:
> >
> > 14) Is specifying the allowed lag in number of records a useful way for
> > users to declare how stale an instance is allowed to be? Would it be
> > more intuitive for users to specify the allowed lag in time units (would
> > event time or processing time be better)? It seems hard for users to
> > reason how "fresh" a store really is when number of records is used.
> >
> >
> > -Matthias
> >
> > On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> > > Some more follow up thoughts:
> > >
> > > 11) If we get a store handle of an active(restoring) task, and the task
> > > transits to running, does the store handle become invalid and a new one
> > > must be retrieved? Or can we "switch it out" underneath -- for this
> > > case, how does the user know when they start to query the up-to-date
> > state?
> > >
> > > 12) Standby tasks will have the store open in regular mode, while
> > > active(restoring) tasks open stores in "upgrade mode" for more efficient
> > > bulk loading. When we switch the store into active mode, we close it and
> > > reopen it. What is the impact if we query the store during restore? What
> > > is the impact if we close the store to transit to running (eg, there
> > > might be open iterators)?
> > >
> > > 13) Do we need to introduce new exception types? Compare KIP-216
> > > (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> > )
> > > that aims to improve the user experience with regard to IQ exceptions.
> > >
> > >
> > > -Matthias
> > >
> > > On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> > >> Thanks for the KIP.
> > >>
> > >> Couple of comments:
> > >>
> > >> 1) With regard to KIP-441, my current understanding is that the lag
> > >> information is only reported to the leader (please correct me if I am
> > >> wrong). This seems to be quite a limitation to actually use the lag
> > >> information.
> > >>
> > >> 2) The idea of the metadata API is actually to get metadata once and
> > >> only refresh the metadata if a store was migrated. The current proposal
> > >> would require to get the metadata before each query. The KIP should
> > >> describe the usage pattern and impact in more detail.
> > >>
> > >> 3) Currently, the KIP does not list the public API changes in detail.
> > >> Please list all methods you intend to deprecate and list all methods you
> > >> intend to add (best, using a code-block markup -- compare
> > >>
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> > >> as an example)
> > >>
> > >> 4) Also note (as already pointed out by John), that we cannot have any
> > >> breaking API changes. Thus, the API should be designed in a fully
> > >> backward compatible manner.
> > >>
> > >> 5) Returning a list of metadata object makes it hard for user to know if
> > >> the first object refers to the active(processing), active(restoring), or
> > >> a standby task. IMHO, we should be more explicit. For example, a
> > >> metadata object could have a flag that one can test via `#isActive()`.
> > >> Or maybe even better, we could keep the current API as-is and add
> > >> something like `standbyMetadataForKey()` (and similar methods for
> > >> other). Having just a flag `isActive()` is a little subtle and having
> > >> new overloads would make the API much clearer (passing in a boolean flag
> > >> does not seem to be a nice API).
> > >>
> > >> 6) Do you intend to return all standby metadata information at once,
> > >> similar to `allMetadata()` -- seems to be useful.
> > >>
> > >> 7) Even if the lag information is propagated to all instances, it will
> > >> happen in an async manner. Hence, I am wondering if we should address
> > >> this race condition (I think we should). The idea would be to check if a
> > >> standby/active(restoring) task is actually still within the lag bounds
> > >> when a query is executed and we would throw an exception if not.
> > >>
> > >> 8) The current `KafkaStreams#state()` method only returns a handle to
> > >> stores of active(processing) tasks. How can a user actually get a handle
> > >> to a store of an active(restoring) or standby task for querying? Seems
> > >> we should add a new method to get standby handles? Changing the
> > >> semantics to existing `state()` would be possible, but I think adding a
> > >> new method is preferable?
> > >>
> > >> 9) How does the user actually specify the acceptable lag? A global
> > >> config via StreamsConfig (this would be a public API change that needs
> > >> to be covered in the KIP)? Or on a per-store or even per-query basis for
> > >> more flexibility? We could also have a global setting that is used as
> > >> default and allow to overwrite it on a per-query basis.
> > >>
> > >> 10) Do we need to distinguish between active(restoring) and standby
> > >> tasks? Or could we treat both as the same?
> > >>
> > >>
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> > >>>>> I'm wondering, rather than putting "acceptable lag" into the
> > >>> configuration at all, or even making it a parameter on
> > `allMetadataForKey`,
> > >>> why not just _always_ return all available metadata (including
> > >>> active/standby or lag) and let the caller decide to which node they
> > want to
> > >>> route the query?
> > >>> +1 on exposing lag information via the APIs. IMO without having
> > >>> continuously updated/fresh lag information, its true value as a signal
> > for
> > >>> query routing decisions is much limited. But we can design the API
> > around
> > >>> this model and iterate? Longer term, we should have continuously
> > shared lag
> > >>> information.
> > >>>
> > >>>>> more general to refactor it to "allMetadataForKey(long
> > >>> tolerableDataStaleness, ...)", and when it's set to 0 it means "active
> > task
> > >>> only".
> > >>> +1 IMO if we plan on having `enableReplicaServing`, it makes sense to
> > >>> generalize based on dataStaleness. This seems complementary to
> > exposing the
> > >>> lag information itself.
> > >>>
> > >>>>> This is actually not a public api change at all, and I'm planning to
> > >>> implement it asap as a precursor to the rest of KIP-441
> > >>> +1 again. Do we have a concrete timeline for when this change will
> > land on
> > >>> master? I would like to get the implementation wrapped up (as much as
> > >>> possible) by end of the month. :). But I agree this sequencing makes
> > >>> sense..
> > >>>
> > >>>
> > >>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >>>
> > >>>> Hi Navinder,
> > >>>>
> > >>>> Thanks for the KIP, I have a high level question about the proposed
> > API
> > >>>> regarding:
> > >>>>
> > >>>> "StreamsMetadataState::allMetadataForKey(boolean
> > enableReplicaServing...)"
> > >>>>
> > >>>> I'm wondering if it's more general to refactor it to
> > >>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's
> > set to
> > >>>> 0 it means "active task only". Behind the scene, we can have the
> > committed
> > >>>> offsets to encode the stream time as well, so that when processing
> > standby
> > >>>> tasks the stream process knows not only the lag in terms of offsets
> > >>>> comparing to the committed offset (internally we call it offset
> > limit), but
> > >>>> also the lag in terms of timestamp diff comparing the committed
> > offset.
> > >>>>
> > >>>> Also encoding the timestamp as part of offset have other benefits for
> > >>>> improving Kafka Streams time semantics as well, but for KIP-535
> > itself I
> > >>>> think it can help giving users a more intuitive interface to reason
> > about.
> > >>>>
> > >>>>
> > >>>> Guozhang
> > >>>>
> > >>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <j...@confluent.io>
> > wrote:
> > >>>>
> > >>>>> Hey Navinder,
> > >>>>>
> > >>>>> Thanks for the KIP! I've been reading over the discussion thus far,
> > >>>>> and I have a couple of thoughts to pile on as well:
> > >>>>>
> > >>>>> It seems confusing to propose the API in terms of the current system
> > >>>>> state, but also propose how the API would look if/when KIP-441 is
> > >>>>> implemented. It occurs to me that the only part of KIP-441 that would
> > >>>>> affect you is the availability of the lag information in the
> > >>>>> SubscriptionInfo message. This is actually not a public api change at
> > >>>>> all, and I'm planning to implement it asap as a precursor to the rest
> > >>>>> of KIP-441, so maybe you can just build on top of KIP-441 and assume
> > >>>>> the lag information will be available. Then you could have a more
> > >>>>> straightforward proposal (e.g., mention that you'd return the lag
> > >>>>> information in AssignmentInfo as well as in the StreamsMetadata in
> > >>>>> some form, or make use of it in the API somehow).
> > >>>>>
> > >>>>> I'm partially motivated in that former point because it seems like
> > >>>>> understanding how callers would bound the staleness for their use
> > case
> > >>>>> is _the_ key point for this KIP. FWIW, I think that adding a new
> > >>>>> method to StreamsMetadataState and deprecating the existing method is
> > >>>>> the best way to go; we just can't change the return types of any
> > >>>>> existing methods.
> > >>>>>
> > >>>>> I'm wondering, rather than putting "acceptable lag" into the
> > >>>>> configuration at all, or even making it a parameter on
> > >>>>> `allMetadataForKey`, why not just _always_ return all available
> > >>>>> metadata (including active/standby or lag) and let the caller decide
> > >>>>> to which node they want to route the query? This method isn't making
> > >>>>> any queries itself; it's merely telling you where the local Streams
> > >>>>> instance _thinks_ the key in question is located. Just returning all
> > >>>>> available information lets the caller implement any semantics they
> > >>>>> desire around querying only active stores, or standbys, or recovering
> > >>>>> stores, or whatever.
> > >>>>>
> > >>>>> One fly in the ointment, which you may wish to consider if proposing
> > >>>>> to use lag information, is that the cluster would only become aware
> > of
> > >>>>> new lag information during rebalances. Even in the full expression of
> > >>>>> KIP-441, this information would stop being propagated when the
> > cluster
> > >>>>> achieves a balanced task distribution. There was a follow-on idea I
> > >>>>> POCed to continuously share lag information in the heartbeat
> > protocol,
> > >>>>> which you might be interested in, if you want to make sure that nodes
> > >>>>> are basically _always_ aware of each others' lag on different
> > >>>>> partitions: https://github.com/apache/kafka/pull/7096
> > >>>>>
> > >>>>> Thanks again!
> > >>>>> -John
> > >>>>>
> > >>>>>
> > >>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > >>>>>>
> > >>>>>> Thanks, Vinoth. Looks like we are on the same page. I will add some
> > of
> > >>>>> these explanations to the KIP as well. Have assigned the KAFKA-6144
> > to
> > >>>>> myself and KAFKA-8994 is closed(by you). As suggested, we will
> > replace
> > >>>>> "replica" with "standby".
> > >>>>>>
> > >>>>>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > >>>>> enableReplicaServing, String storeName, K key, Serializer<K>
> > >>>>> keySerializer)" Do we really need a per key configuration? or a new
> > >>>>> StreamsConfig is good enough?>> Coming from experience, when teams
> > are
> > >>>>> building a platform with Kafka Streams and these API's serve data to
> > >>>>> multiple teams, we can't have a generalized config that says as a
> > >>>> platform
> > >>>>> we will support stale reads or not. It should be the choice of
> > someone
> > >>>> who
> > >>>>> is calling the API's to choose whether they are ok with stale reads
> > or
> > >>>> not.
> > >>>>> Makes sense?
> > >>>>>>    On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth Chandar <
> > >>>>> vchan...@confluent.io> wrote:
> > >>>>>>
> > >>>>>>  Looks like we are covering ground :)
> > >>>>>>
> > >>>>>>>> Only if it is within a permissible  range(say 10000) we will serve
> > >>>> from
> > >>>>>> Restoring state of active.
> > >>>>>> +1 on having a knob like this.. My reasoning is as follows.
> > >>>>>>
> > >>>>>> Looking at the Streams state as a read-only distributed kv store.
> > With
> > >>>>>> num_standby = f , we should be able to tolerate f failures and if
> > there
> > >>>>> is
> > >>>>>> a f+1' failure, the system should be unavailable.
> > >>>>>>
> > >>>>>> A) So with num_standby=0, the system should be unavailable even if
> > >>>> there
> > >>>>> is
> > >>>>>> 1 failure and thats my argument for not allowing querying in
> > >>>> restoration
> > >>>>>> state, esp in this case it will be a total rebuild of the state
> > (which
> > >>>>> IMO
> > >>>>>> cannot be considered a normal fault free operational state).
> > >>>>>>
> > >>>>>> B) Even there are standby's, say num_standby=2, if the user decides
> > to
> > >>>>> shut
> > >>>>>> down all 3 instances, then only outcome should be unavailability
> > until
> > >>>>> all
> > >>>>>> of them come back or state is rebuilt on other nodes in the
> > cluster. In
> > >>>>>> normal operations, f <= 2 and when a failure does happen we can then
> > >>>>> either
> > >>>>>> choose to be C over A and fail IQs until replication is fully
> > caught up
> > >>>>> or
> > >>>>>> choose A over C by serving in restoring state as long as lag is
> > >>>> minimal.
> > >>>>> If
> > >>>>>> even with f=1 say, all the standbys are lagging a lot due to some
> > >>>> issue,
> > >>>>>> then that should be considered a failure since that is different
> > from
> > >>>>>> normal/expected operational mode. Serving reads with unbounded
> > >>>>> replication
> > >>>>>> lag and calling it "available" may not be very usable or even
> > desirable
> > >>>>> :)
> > >>>>>> IMHO, since it gives the user no way to reason about the app that is
> > >>>>> going
> > >>>>>> to query this store.
> > >>>>>>
> > >>>>>> So there is definitely a need to distinguish between :  Replication
> > >>>>> catchup
> > >>>>>> while being in fault free state vs Restoration of state when we lose
> > >>>> more
> > >>>>>> than f standbys. This knob is a great starting point towards this.
> > >>>>>>
> > >>>>>> If you agree with some of the explanation above, please feel free to
> > >>>>>> include it in the KIP as well since this is sort of our design
> > >>>> principle
> > >>>>>> here..
> > >>>>>>
> > >>>>>> Small nits :
> > >>>>>>
> > >>>>>> - let's standardize on "standby" instead of "replica", KIP or
> > code,  to
> > >>>>> be
> > >>>>>> consistent with rest of Streams code/docs?
> > >>>>>> - Can we merge KAFKA-8994 into KAFKA-6144 now and close the former?
> > >>>>>> Eventually need to consolidate KAFKA-6555 as well
> > >>>>>> - In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > >>>>>> enableReplicaServing, String storeName, K key, Serializer<K>
> > >>>>> keySerializer)" Do
> > >>>>>> we really need a per key configuration? or a new StreamsConfig is
> > good
> > >>>>>> enough?
> > >>>>>>
> > >>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > >>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > >>>>>>
> > >>>>>>> @Vinoth, I have incorporated a few of the discussions we have had
> > in
> > >>>>> the
> > >>>>>>> KIP.
> > >>>>>>>
> > >>>>>>> In the current code, t0 and t1 serve queries from Active(Running)
> > >>>>>>> partition. For case t2, we are planning to return
> > >>>> List<StreamsMetadata>
> > >>>>>>> such that it returns <StreamsMetadata(A), StreamsMetadata(B)> so
> > that
> > >>>>> if IQ
> > >>>>>>> fails on A, the replica on B can serve the data by enabling serving
> > >>>>> from
> > >>>>>>> replicas. This still does not solve case t3 and t4 since B has been
> > >>>>>>> promoted to active but it is in Restoring state to catchup till A’s
> > >>>>> last
> > >>>>>>> committed position as we don’t serve from Restoring state in Active
> > >>>>> and new
> > >>>>>>> Replica on R is building itself from scratch. Both these cases can
> > be
> > >>>>>>> solved if we start serving from Restoring state of active as well
> > >>>>> since it
> > >>>>>>> is almost equivalent to previous Active.
> > >>>>>>>
> > >>>>>>> There could be a case where all replicas of a partition become
> > >>>>> unavailable
> > >>>>>>> and active and all replicas of that partition are building
> > themselves
> > >>>>> from
> > >>>>>>> scratch, in this case, the state in Active is far behind even
> > though
> > >>>>> it is
> > >>>>>>> in Restoring state. To cater to such cases that we don’t serve from
> > >>>>> this
> > >>>>>>> state we can either add another state before Restoring or check the
> > >>>>>>> difference between last committed offset and current position. Only
> > >>>> if
> > >>>>> it
> > >>>>>>> is within a permissible range (say 10000) we will serve from
> > >>>> Restoring
> > >>>>> the
> > >>>>>>> state of Active.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>>    On Wednesday, 16 October, 2019, 10:01:35 pm IST, Vinoth Chandar
> > <
> > >>>>>>> vchan...@confluent.io> wrote:
> > >>>>>>>
> > >>>>>>>  Thanks for the updates on the KIP, Navinder!
> > >>>>>>>
> > >>>>>>> Few comments
> > >>>>>>>
> > >>>>>>> - AssignmentInfo is not public API?. But we will change it and thus
> > >>>>> need to
> > >>>>>>> increment the version and test for version_probing etc. Good to
> > >>>>> separate
> > >>>>>>> that from StreamsMetadata changes (which is public API)
> > >>>>>>> - From what I see, there is going to be choice between the
> > following
> > >>>>>>>
> > >>>>>>>  A) introducing a new *KafkaStreams::allMetadataForKey() *API that
> > >>>>>>> potentially returns List<StreamsMetadata> ordered from most upto
> > date
> > >>>>> to
> > >>>>>>> least upto date replicas. Today we cannot fully implement this
> > >>>>> ordering,
> > >>>>>>> since all we know is which hosts are active and which are standbys.
> > >>>>>>> However, this aligns well with the future. KIP-441 adds the lag
> > >>>>> information
> > >>>>>>> to the rebalancing protocol. We could also sort replicas based on
> > the
> > >>>>>>> report lags eventually. This is fully backwards compatible with
> > >>>>> existing
> > >>>>>>> clients. Only drawback I see is the naming of the existing method
> > >>>>>>> KafkaStreams::metadataForKey, not conveying the distinction that it
> > >>>>> simply
> > >>>>>>> returns the active replica i.e allMetadataForKey.get(0).
> > >>>>>>>  B) Change KafkaStreams::metadataForKey() to return a List. Its a
> > >>>>> breaking
> > >>>>>>> change.
> > >>>>>>>
> > >>>>>>> I prefer A, since none of the semantics/behavior changes for
> > existing
> > >>>>>>> users. Love to hear more thoughts. Can we also work this into the
> > >>>> KIP?
> > >>>>>>> I already implemented A to unblock myself for now. Seems feasible
> > to
> > >>>>> do.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <
> > >>>> vchan...@confluent.io
> > >>>>>>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>>>> I get your point. But suppose there is a replica which has just
> > >>>>> become
> > >>>>>>>> active, so in that case replica will still be building itself from
> > >>>>>>> scratch
> > >>>>>>>> and this active will go to restoring state till it catches up with
> > >>>>>>> previous
> > >>>>>>>> active, wouldn't serving from a restoring active make more sense
> > >>>>> than a
> > >>>>>>>> replica in such case.
> > >>>>>>>>
> > >>>>>>>> KIP-441 will change this behavior such that promotion to active
> > >>>>> happens
> > >>>>>>>> based on how caught up a replica is. So, once we have that (work
> > >>>>> underway
> > >>>>>>>> already for 2.5 IIUC) and user sets num.standby.replicas > 0, then
> > >>>>> the
> > >>>>>>>> staleness window should not be that long as you describe. IMO if
> > >>>> user
> > >>>>>>> wants
> > >>>>>>>> availability for state, then should configure num.standby.replicas
> > >>>>>
> > >>>>> 0.
> > >>>>>>> If
> > >>>>>>>> not, then on a node loss, few partitions would be unavailable for
> > a
> > >>>>> while
> > >>>>>>>> (there are other ways to bring this window down, which I won't
> > >>>> bring
> > >>>>> in
> > >>>>>>>> here). We could argue for querying a restoring active (say a new
> > >>>> node
> > >>>>>>> added
> > >>>>>>>> to replace a faulty old node) based on AP vs CP principles. But
> > not
> > >>>>> sure
> > >>>>>>>> reading really really old values for the sake of availability is
> > >>>>> useful.
> > >>>>>>> No
> > >>>>>>>> AP data system would be inconsistent for such a long time in
> > >>>>> practice.
> > >>>>>>>>
> > >>>>>>>> So, I still feel just limiting this to standby reads provides best
> > >>>>>>>> semantics.
> > >>>>>>>>
> > >>>>>>>> Just my 2c. Would love to see what others think as well.
> > >>>>>>>>
> > >>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > >>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi Vinoth,
> > >>>>>>>>> Thanks for the feedback.
> > >>>>>>>>>  Can we link the JIRA, discussion thread also to the KIP.>>
> > Added.
> > >>>>>>>>> Based on the discussion on KAFKA-6144, I was under the impression
> > >>>>> that
> > >>>>>>>>> this KIP is also going to cover exposing of the standby
> > >>>> information
> > >>>>> in
> > >>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That would require
> > a
> > >>>>>>> public
> > >>>>>>>>> API change?>> Sure, I can add changes for 8994 in this KIP and
> > >>>> link
> > >>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> > >>>>>>>>>  KIP seems to be focussing on restoration when a new node is
> > >>>> added.
> > >>>>>>>>> KIP-441 is underway and has some major changes proposed for this.
> > >>>> It
> > >>>>>>> would
> > >>>>>>>>> be good to clarify dependencies if any. Without KIP-441, I am not
> > >>>>> very
> > >>>>>>> sure
> > >>>>>>>>> if we should allow reads from nodes in RESTORING state, which
> > >>>> could
> > >>>>>>> amount
> > >>>>>>>>> to many minutes/few hours of stale reads?  This is different from
> > >>>>>>> allowing
> > >>>>>>>>> querying standby replicas, which could be mostly caught up and
> > the
> > >>>>>>>>> staleness window could be much smaller/tolerable. (once again the
> > >>>>> focus
> > >>>>>>> on
> > >>>>>>>>> KAFKA-8994).>> I get your point. But suppose there is a replica
> > >>>>> which
> > >>>>>>> has
> > >>>>>>>>> just become active, so in that case replica will still be
> > building
> > >>>>>>> itself
> > >>>>>>>>> from scratch and this active will go to restoring state till it
> > >>>>> catches
> > >>>>>>> up
> > >>>>>>>>> with previous active, wouldn't serving from a restoring active
> > >>>> make
> > >>>>> more
> > >>>>>>>>> sense than a replica in such case.
> > >>>>>>>>>
> > >>>>>>>>> Finally, we may need to introduce a configuration to control
> > this.
> > >>>>> Some
> > >>>>>>>>> users may prefer errors to stale data. Can we also add it to the
> > >>>>> KIP?>>
> > >>>>>>>>> Will add this.
> > >>>>>>>>>
> > >>>>>>>>> Regards,
> > >>>>>>>>> Navinder
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <v...@confluent.io> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi Navinder,
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks for sharing the KIP! Few thoughts
> > >>>>>>>>>>
> > >>>>>>>>>> - Can we link the JIRA, discussion thread also to the KIP
> > >>>>>>>>>> - Based on the discussion on KAFKA-6144, I was under the impression that
> > >>>>>>>>>> this KIP is also going to cover exposing of the standby information in
> > >>>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That would require a public
> > >>>>>>>>>> API change?
> > >>>>>>>>>> - KIP seems to be focussing on restoration when a new node is added.
> > >>>>>>>>>> KIP-441 is underway and has some major changes proposed for this. It would
> > >>>>>>>>>> be good to clarify dependencies if any. Without KIP-441, I am not very sure
> > >>>>>>>>>> if we should allow reads from nodes in RESTORING state, which could amount
> > >>>>>>>>>> to many minutes/few hours of stale reads?  This is different from allowing
> > >>>>>>>>>> querying standby replicas, which could be mostly caught up and the
> > >>>>>>>>>> staleness window could be much smaller/tolerable. (once again the focus on
> > >>>>>>>>>> KAFKA-8994)
> > >>>>>>>>>> - Finally, we may need to introduce a configuration to control this. Some
> > >>>>>>>>>> users may prefer errors to stale data. Can we also add it to the KIP?
> > >>>>>>>>>>
> > >>>>>>>>>> Thanks
> > >>>>>>>>>> Vinoth
> > >>>>>>>>>>
> > >>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar
> > >>>>>>>>>> <na...@yahoo.com.invalid> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi,
> > >>>>>>>>>>> Starting a discussion on the KIP to Allow state stores to serve stale
> > >>>>>>>>>>> reads during rebalance (
> > >>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > >>>>>>>>>>> ).
> > >>>>>>>>>>> Thanks & Regards,Navinder
> > >>>>>>>>>>> LinkedIn
> > >>>>>>>>>>
> > >>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -- Guozhang
> > >>>>
> > >>>
> > >>
> > >
> >
