Hey Vinoth,

I started going over the KIP again yesterday. There are a lot of
updates, and I didn't finish my feedback in one day. I'm working on it
now.

Thanks,
John

On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Wondering if anyone has thoughts on these changes? I liked that the new
> metadata fetch APIs provide all the information at once with consistent
> naming.
>
> Any guidance on what you would like to be discussed or fleshed out more
> before we call a VOTE?
>
> On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> <navinder_b...@yahoo.com.invalid> wrote:
>
> > Hi,
> > We have made some edits in the KIP (
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > after due deliberation on the agreed design to support the new query
> > design. This includes the new public API to query offset/time lag
> > information and other details related to querying standby tasks, which
> > came up once we thought the design through in detail.
> >
> >
> >
> >    - Addition of new config, “lag.fetch.interval.ms”, to configure the
> > interval at which the time/offset lag is fetched
> >    - Addition of new class StoreLagInfo to store the periodically obtained
> > time/offset lag
> >    - Addition of two new functions in KafkaStreams, List<StoreLagInfo>
> > allLagInfo() and List<StoreLagInfo> lagInfoForStore(String storeName) to
> > return the lag information for an instance and a store respectively
> >    - Addition of new class KeyQueryMetadata. We need topicPartition for
> > each key to be matched with the lag API for the topic partition. One way is
> > to add new functions and fetch topicPartition from StreamsMetadataState but
> > we thought having one call and fetching StreamsMetadata and topicPartition
> > is cleaner.
> >    - Renaming partitionsForHost to activePartitionsForHost in
> > StreamsMetadataState and partitionsByHostState to
> > activePartitionsByHostState in StreamsPartitionAssignor
> >    - We have also added the pseudo code of how all the changes will exist
> > together and support the new querying APIs
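
To make the proposed lag API a bit more concrete, here is a rough Java sketch of how StoreLagInfo, allLagInfo() and lagInfoForStore(String storeName) could fit together. Only those three names come from the list above. The fields (storeName, partition, offsetLag, timeLagMs), the LagRegistry holder class and its update() method are assumptions made for illustration, and the int partition is a stand-in for a real TopicPartition. This is not the KafkaStreams implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical shape of the proposed StoreLagInfo; all field names are assumptions.
final class StoreLagInfo {
    final String storeName;
    final int partition;   // stand-in for a TopicPartition
    final long offsetLag;  // records behind the changelog end offset
    final long timeLagMs;  // milliseconds behind the latest changelog record

    StoreLagInfo(String storeName, int partition, long offsetLag, long timeLagMs) {
        this.storeName = storeName;
        this.partition = partition;
        this.offsetLag = offsetLag;
        this.timeLagMs = timeLagMs;
    }
}

// Stand-in for the per-instance holder; in the KIP the two query methods
// would be exposed on KafkaStreams rather than on a class like this.
final class LagRegistry {
    private final List<StoreLagInfo> current = new ArrayList<>();

    // Assumed to be called by a periodic refresh (every lag.fetch.interval.ms).
    synchronized void update(List<StoreLagInfo> latest) {
        current.clear();
        current.addAll(latest);
    }

    // Lag for every store partition known to this instance.
    synchronized List<StoreLagInfo> allLagInfo() {
        return Collections.unmodifiableList(new ArrayList<>(current));
    }

    // Lag for the partitions of one named store; empty if the store is unknown.
    synchronized List<StoreLagInfo> lagInfoForStore(String storeName) {
        List<StoreLagInfo> result = new ArrayList<>();
        for (StoreLagInfo info : current) {
            if (info.storeName.equals(storeName)) {
                result.add(info);
            }
        }
        return result;
    }
}
```

Returning copies keeps callers from seeing a half-updated list while the periodic refresh runs; whether the real implementation would snapshot like this is an open design choice.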
> >
> > Please let me know if anything is pending now, before a vote can be
> > started on this.
> >
> >     On Saturday, 26 October, 2019, 05:41:44 pm IST, Navinder
> > Brar <navinder_b...@yahoo.com.invalid> wrote:
> >
> >  >> Since there are two soft votes for separate active/standby API
> > methods, I also change my position on that. Fine with 2 separate
> > methods. Once we remove the lag information from these APIs, returning a
> > List is less attractive, since the ordering has no special meaning now.
> >
> > Agreed, now that we are not returning lag, I am also sold on having two
> > separate functions. We already have one which returns streamsMetadata for
> > active tasks, and now we can add another one for standbys.
> >
> >
> >
> >     On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> >
> >  +1 to Sophie's suggestion. Having both lag in terms of time and offsets is
> > good and makes for a more complete API.
> >
> > Since there are two soft votes for separate active/standby API methods, I
> > also change my position on that. Fine with 2 separate methods.
> > Once we remove the lag information from these APIs, returning a List is
> > less attractive, since the ordering has no special meaning now.
> >
> > >> lag in offsets vs time: Having both, as suggested by Sophie would of
> > course be best. What is a little unclear to me is, how in details are we
> > going to compute both?
> > @navinder maybe the next step is to flesh out these details and surface
> > any larger changes we need to make, if need be.
> >
> > Any other details we need to cover, before a VOTE can be called on this?
> >
> >
> > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com> wrote:
> >
> > > I am jumping in a little late here.
> > >
> > > Overall I agree with the proposal to push decision making on what/how to
> > > query in the query layer.
> > >
> > > For point 5 from above, I'm slightly in favor of having a new method,
> > > "standbyMetadataForKey()" or something similar.
> > > Because even if we return all tasks in one list, the user will still have
> > > to perform some filtering to separate the different tasks, so I don't
> > > feel making two calls is a burden, and IMHO it makes things more
> > > transparent for the user.
> > > If the final vote is for using an "isActive" field, I'm good with that as
> > > well.
> > >
> > > Just my 2 cents.
> > >
> > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > I think now we are aligned on almost all the design parts. Summarising
> > > > below what has been discussed above and we have a general consensus on.
> > > >
> > > >
> > > >    - Rather than broadcasting lag across all nodes at rebalancing/with
> > > > the heartbeat, we will just return a list of all available standbys in
> > > > the system, and the user can make an IQ query to any of those nodes,
> > > > which will return the response together with the offset and time lag.
> > > > Based on that, the user can decide whether to return the response or
> > > > call another standby.
> > > >
> > > >    - The current metadata query frequency will not change. It will be
> > > > the same as it is now, i.e. before each query.
> > > >
> > > >    - For fetching List<StreamsMetadata> in StreamsMetadataState.java
> > > > and List<QueryableStoreProvider> in StreamThreadStateStoreProvider.java
> > > > (which will return all active stores which are running/restoring and
> > > > replica stores which are running), we will add new functions and not
> > > > disturb the existing functions
> > > >
> > > >    - There is no need to add new StreamsConfig for implementing this
> > > > KIP
> > > >
> > > >    - We will add standbyPartitionsByHost in AssignmentInfo and
> > > > StreamsMetadataState which would change the existing rebuildMetadata()
> > > > and setPartitionsByHostState()
> > > >
> > > >
> > > >
> > > > If anyone has any more concerns please feel free to add. After that,
> > > > I will be initiating a vote.
> > > > ~Navinder
> > > >
> > > >    On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias J. Sax <
> > > > matth...@confluent.io> wrote:
> > > >
> > > >  Just to close the loop @Vinoth:
> > > >
> > > > > 1. IIUC John intends to add (or we can do this in this KIP) lag
> > > > > information to AssignmentInfo, which gets sent to every participant.
> > > >
> > > > As explained by John, currently KIP-441 plans to only report the
> > > > information to the leader. But I guess, with the new proposal to not
> > > > broadcast this information, this concern is invalidated anyway.
> > > > > 2. At least I was under the assumption that it can be called per
> > > > > query, since the API docs don't seem to suggest otherwise. Do you see
> > > > > any potential issues if we call this every query? (we should benchmark
> > > > > this nonetheless)
> > > >
> > > > I did not see a real issue if people refresh the metadata frequently,
> > > > because it would be a local call. My main point was that this would
> > > > change the current usage pattern of the API, and we would clearly need
> > > > to communicate this change. Similar to (1), this concern is invalidated
> > > > anyway.
> > > >
> > > >
> > > > @John: I think it's a great idea to get rid of reporting lag, and
> > > > pushing the decision making process about "what to query" into the
> > > > query serving layer itself. This simplifies the overall design of this
> > > > KIP significantly, and actually aligns very well with the idea that
> > > > Kafka Streams (as it is a library) should only provide the basic
> > > > building blocks. Many of the questions I raised are invalidated by this.
> > > >
> > > >
> > > >
> > > > Some questions are still open though:
> > > >
> > > > > 10) Do we need to distinguish between active(restoring) and standby
> > > > > tasks? Or could we treat both as the same?
> > > >
> > > >
> > > > @Vinoth: about (5). I see your point about multiple calls vs a single
> > > > call. I still slightly prefer multiple calls, but it's highly
> > > > subjective and I would also be fine to add an #isActive() method. Would
> > > > be good to get feedback from others.
> > > >
> > > >
> > > > For (14), ie, lag in offsets vs time: Having both, as suggested by
> > > > Sophie would of course be best. What is a little unclear to me is, how
> > > > in details are we going to compute both?
> > > >
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > >
> > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > > > > > Just to chime in on the "report lag vs timestamp difference" issue, I
> > > > > > would actually advocate for both. As mentioned already, time
> > > > > > difference is probably a lot easier and/or more useful to reason about
> > > > > > in terms of "freshness" of the state. But in the case when all queried
> > > > > > stores are far behind, lag could be used to estimate the recovery
> > > > > > velocity. You can then get a (pretty rough) idea of when a store might
> > > > > > be ready, and wait until around then to query again.
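
The recovery-velocity idea can be sketched as simple arithmetic over two offset-lag observations: velocity = (earlier lag - later lag) / elapsed time, and the rough time to catch up is the later lag divided by that velocity. The Java below is only an illustration under that assumption. RecoveryEstimator and its method are hypothetical names, and returning -1 when the lag is not shrinking is a convention chosen here, not anything from the KIP.

```java
// Hypothetical helper: estimates how long a lagging store needs to catch up,
// based on two offset-lag samples taken at different wall-clock times.
final class RecoveryEstimator {

    // Returns the estimated milliseconds until the lag reaches zero,
    // 0 if the store is already caught up, or -1 if no estimate is possible
    // (samples out of order, or the lag is not shrinking).
    static long estimateMillisToCatchUp(long earlierLag, long earlierTimeMs,
                                        long laterLag, long laterTimeMs) {
        if (laterLag <= 0) {
            return 0;
        }
        long elapsedMs = laterTimeMs - earlierTimeMs;
        long recovered = earlierLag - laterLag;
        if (elapsedMs <= 0 || recovered <= 0) {
            return -1;
        }
        // velocity = recovered / elapsedMs (records per ms);
        // eta = laterLag / velocity = laterLag * elapsedMs / recovered
        return (long) Math.ceil((double) laterLag * elapsedMs / recovered);
    }
}
```

For example, a store that went from 10,000 to 8,000 records behind in one second is recovering at 2 records per millisecond, so the remaining 8,000 records would take roughly 4,000 ms, assuming the active keeps writing at a rate that leaves the velocity unchanged.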
> > > > >
> > > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > >> I think I agree with John's recent reasoning as well: instead of
> > > > > >> letting the storeMetadataAPI return the staleness information,
> > > > > >> letting the client query either active or standby and letting the
> > > > > >> standby query response include both the values + timestamp (or lag,
> > > > > >> as in diffs of timestamps) would actually be more intuitive -- not
> > > > > >> only is the streams client simpler, from the user's perspective they
> > > > > >> also do not need to periodically refresh their staleness information
> > > > > >> from the client, but only need to make decisions on the fly whenever
> > > > > >> they need to query.
> > > > > >>
> > > > > >> Again the standby replica then needs to know the current active
> > > > > >> task's timestamp, which can be found from the log end record's encoded
> > > > > >> timestamp; today standby tasks do not read that specific record, but
> > > > > >> only refresh their knowledge of the log end offset, but I think
> > > > > >> refreshing the latest record timestamp is not a very bad request to
> > > > > >> add on the standby replicas.
> > > > >>
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <vchan...@confluent.io>
> > > > > >> wrote:
> > > > >>
> > > > > >>> +1 As someone implementing a query routing layer, there is already a
> > > > > >>> need to have mechanisms in place to do healthchecks/failure detection
> > > > > >>> to detect failures for queries, while Streams rebalancing eventually
> > > > > >>> kicks in the background.
> > > > > >>> So, pushing this complexity to the IQ client app keeps Streams simpler
> > > > > >>> as well. IQs will potentially be issued an order of magnitude more
> > > > > >>> frequently, so this can achieve good freshness for the lag information.
> > > > > >>>
> > > > > >>> I would like to add however, that we would also need to introduce APIs
> > > > > >>> in the KafkaStreams class, for obtaining lag information for all stores
> > > > > >>> local to that host. This is for the IQs to relay back with the
> > > > > >>> response/its own heartbeat mechanism.
> > > > >>>
> > > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <j...@confluent.io> wrote:
> > > > >>>
> > > > >>>> Hi all,
> > > > >>>>
> > > > > >>>> I've been mulling over this KIP, and I think I was on the wrong
> > > > > >>>> track earlier with regard to task lags. Tl;dr: I don't think we should
> > > > > >>>> add lags at all to the metadata API (and also not to the
> > > > > >>>> AssignmentInfo protocol message).
> > > > >>>>
> > > > > >>>> Like I mentioned early on, reporting lag via
> > > > > >>>> SubscriptionInfo/AssignmentInfo would only work while rebalances
> > > > > >>>> are happening. Once the group stabilizes, no members would be notified
> > > > > >>>> of each others' lags anymore. I had been thinking that the solution
> > > > > >>>> would be the heartbeat proposal I mentioned earlier, but that proposal
> > > > > >>>> would have reported the heartbeats of the members only to the leader
> > > > > >>>> member (the one who makes assignments). To be useful in the context of
> > > > > >>>> _this_ KIP, we would also have to report the lags in the heartbeat
> > > > > >>>> responses to _all_ members. This is a concern to me because now _all_
> > > > > >>>> the lags get reported to _all_ the members on _every_ heartbeat... a
> > > > > >>>> lot of chatter.
> > > > >>>>
> > > > > >>>> Plus, the proposal for KIP-441 is only to report the lags of each
> > > > > >>>> _task_. This is the sum of the lags of all the stores in the task.
> > > > > >>>> But this would be insufficient for KIP-535. For this KIP, we would
> > > > > >>>> want the lag specifically of the store we want to query. So this
> > > > > >>>> means, we have to report the lags of all the stores of all the
> > > > > >>>> members to every member... even more chatter!
> > > > >>>>
> > > > > >>>> The final nail in the coffin to me is that IQ clients would have
> > > > > >>>> to start refreshing their metadata quite frequently to stay up to
> > > > > >>>> date on the lags, which adds even more overhead to the system.
> > > > >>>>
> > > > > >>>> Consider a strawman alternative: we bring KIP-535 back to
> > > > > >>>> extending the metadata API to tell the client the active and standby
> > > > > >>>> replicas for the key in question (not including any "staleness/lag"
> > > > > >>>> restriction, just returning all the replicas). Then, the client
> > > > > >>>> picks a replica and sends the query. The server returns the current lag
> > > > > >>>> along with the response (maybe in an HTTP header or something).
> > > > > >>>> Then, the client keeps a map of its last observed lags for each
> > > > > >>>> replica, and uses this information to prefer fresher replicas.
> > > > >>>>
> > > > > >>>> OR, if it wants only to query the active replica, it would throw
> > > > > >>>> an error on any lag response greater than zero, refresh its
> > > > > >>>> metadata by re-querying the metadata API, and try again with the
> > > > > >>>> current active replica.
> > > > >>>>
> > > > > >>>> This way, the lag information will be super fresh for the client,
> > > > > >>>> and we keep the Metadata API / Assignment,Subscription / and Heartbeat
> > > > > >>>> as slim as possible.
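
A minimal sketch of the client-side bookkeeping in the strawman: the client remembers the last lag each replica reported and prefers the freshest one. Everything here is an assumption for illustration: LagAwareReplicaPicker and its method names are hypothetical, replicas are identified by plain "host:port" strings, and a replica that has never been queried is treated as maximally stale so that list order (for example, active first) decides among unknowns.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client-side helper for an IQ routing layer.
final class LagAwareReplicaPicker {
    // Last lag each replica reported alongside a query response.
    private final Map<String, Long> lastObservedLag = new HashMap<>();

    // Record the lag returned with a response from the given replica.
    void observe(String host, long lag) {
        lastObservedLag.put(host, lag);
    }

    // Pick the candidate with the smallest last observed lag.
    // Never-queried replicas count as Long.MAX_VALUE; ties keep the earlier
    // entry in the list. Returns null if there are no candidates.
    String pick(List<String> candidates) {
        String best = null;
        long bestLag = Long.MAX_VALUE;
        for (String host : candidates) {
            long lag = lastObservedLag.getOrDefault(host, Long.MAX_VALUE);
            if (best == null || lag < bestLag) {
                best = host;
                bestLag = lag;
            }
        }
        return best;
    }

    // Strict mode: a caller that only wants the active replica rejects any
    // response whose reported lag is greater than zero.
    static boolean acceptableForActiveOnly(long reportedLag) {
        return reportedLag == 0;
    }
}
```

A caller would pass the replicas returned by the metadata API to pick(), send the query, then feed the lag from the response back in through observe(). In strict mode, a rejected response would trigger a metadata refresh and a retry against the current active.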
> > > > >>>>
> > > > > >>>> Side note: I do think that some time soon, we'll have to add a
> > > > > >>>> library for IQ server/clients. I think that this logic will start to
> > > > > >>>> get pretty complex.
> > > > >>>>
> > > > >>>> I hope this thinking is reasonably clear!
> > > > >>>> Thanks again,
> > > > >>>> -John
> > > > >>>>
> > > > >>>> Does that
> > > > >>>>
> > > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <vchan...@confluent.io>
> > > > > >>>> wrote:
> > > > >>>>>
> > > > >>>>> Responding to the points raised by Matthias
> > > > >>>>>
> > > > > >>>>> 1. IIUC John intends to add (or we can do this in this KIP) lag
> > > > > >>>>> information to AssignmentInfo, which gets sent to every participant.
> > > > > >>>>>
> > > > > >>>>> 2. At least I was under the assumption that it can be called per
> > > > > >>>>> query, since the API docs don't seem to suggest otherwise. Do you see
> > > > > >>>>> any potential issues if we call this every query? (we should
> > > > > >>>>> benchmark this nonetheless)
> > > > > >>>>>
> > > > > >>>>> 4. Agree. metadataForKey() implicitly would return the active
> > > > > >>>>> host metadata (as it was before). We should also document this in that
> > > > > >>>>> API's javadoc, given we have another method(s) that returns more host
> > > > > >>>>> metadata now.
> > > > > >>>>>
> > > > > >>>>> 5. While I see the point, the app/caller has to make two
> > > > > >>>>> different API calls to obtain active/standby and potentially do the
> > > > > >>>>> same set of operations to query the state. I personally still like a
> > > > > >>>>> method like isActive() better, but don't have strong opinions.
> > > > > >>>>>
> > > > > >>>>> 9. If we do expose the lag information, could we just leave it
> > > > > >>>>> up to the caller to decide whether it errors out or not and not make
> > > > > >>>>> the decision within Streams? i.e. we don't need a new config
> > > > > >>>>>
> > > > > >>>>> 14. +1. If it's easier to do right away. We started with number
> > > > > >>>>> of records, following the lead from KIP-441
> > > > >>>>>
> > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>
> > > > >>>>>>
> > > > > >>>>>> Thanks, everyone for taking a look. Some very cool ideas have
> > > > > >>>>>> flowed in.
> > > > > >>>>>>
> > > > > >>>>>>>> There was a follow-on idea I POCed to continuously share lag
> > > > > >>>>>>>> information in the heartbeat protocol
> > > > > >>>>>> +1 that would be great, I will update the KIP assuming this work
> > > > > >>>>>> will finish soon
> > > > > >>>>>>
> > > > > >>>>>>>> I think that adding a new method to StreamsMetadataState and
> > > > > >>>>>>>> deprecating the existing method is the best way to go; we just
> > > > > >>>>>>>> can't change the return types of any existing methods.
> > > > > >>>>>> +1 on this, we will add new methods for users who would be
> > > > > >>>>>> interested in querying back a list of possible options to query from
> > > > > >>>>>> and leave the current function getStreamsMetadataForKey() untouched
> > > > > >>>>>> for users who want absolute consistency.
> > > > > >>>>>>
> > > > > >>>>>>>> why not just always return all available metadata (including
> > > > > >>>>>>>> active/standby or lag) and let the caller decide to which node
> > > > > >>>>>>>> they want to route the query
> > > > > >>>>>> +1. I think this makes sense as from a user standpoint there is no
> > > > > >>>>>> difference b/w an active and a standby if both have the same lag. In
> > > > > >>>>>> fact users would be able to use this API to reduce query load on
> > > > > >>>>>> actives, so returning all available options along with the current
> > > > > >>>>>> lag in each would make sense, leaving it to the user how they want to
> > > > > >>>>>> use this data. This has another added advantage. If a user queries
> > > > > >>>>>> any random machine for a key and that machine has a replica for the
> > > > > >>>>>> partition (where the key belongs), the user might choose to serve the
> > > > > >>>>>> data from there itself (if it doesn’t lag much) rather than finding
> > > > > >>>>>> the active and making an IQ to that. This would save some critical
> > > > > >>>>>> time in serving for some applications.
> > > > > >>>>>>
> > > > > >>>>>>>> Adding the lag in terms of timestamp diff comparing the
> > > > > >>>>>>>> committed offset.
> > > > > >>>>>> +1 on this, I think it’s more readable. But as John said the
> > > > > >>>>>> function allMetadataForKey() is just returning the possible options
> > > > > >>>>>> from where users can query a key, so we can even drop the parameter
> > > > > >>>>>> enableReplicaServing/tolerableDataStaleness and just return all the
> > > > > >>>>>> streamsMetadata containing that key along with the offset limit.
> > > > > >>>>>>
> > > > > >>>>>> Answering the questions posted by Matthias in sequence.
> > > > > >>>>>> 1. @John can you please comment on this one.
> > > > > >>>>>> 2. Yeah the usage pattern would include querying this prior to
> > > > > >>>>>> every request
> > > > > >>>>>> 3. Will add the changes to StreamsMetadata in the KIP, would
> > > > > >>>>>> include changes in rebuildMetadata() etc.
> > > > > >>>>>> 4. Makes sense, already addressed above
> > > > > >>>>>> 5. Is it important from a user perspective whether they are
> > > > > >>>>>> querying an active(processing), active(restoring), or a standby task,
> > > > > >>>>>> if we have a way of denoting lag in a readable manner which signals
> > > > > >>>>>> to the user that this is the best node to query for fresh data?
> > > > > >>>>>> 6. Yes, I intend to return the actives and replicas in the same
> > > > > >>>>>> return list in allMetadataForKey()
> > > > > >>>>>> 7. tricky
> > > > > >>>>>> 8. yes, we need new functions to return activeRestoring and
> > > > > >>>>>> standbyRunning tasks.
> > > > > >>>>>> 9. StreamsConfig doesn’t look like much use to me since we are
> > > > > >>>>>> giving all possible options via this function, or they can use the
> > > > > >>>>>> existing function getStreamsMetadataForKey() and get just the active
> > > > > >>>>>> 10. I think treat them both the same and let the lag do the talking
> > > > > >>>>>> 11. We are just sending them the option to query from in
> > > > > >>>>>> allMetadataForKey(), which doesn’t include any handle. We then query
> > > > > >>>>>> that machine for the key where it calls allStores() and tries to find
> > > > > >>>>>> the task in activeRunning/activeRestoring/standbyRunning and adds the
> > > > > >>>>>> store handle here.
> > > > > >>>>>> 12. Need to verify, but during the exact point when the store is
> > > > > >>>>>> closed to transition it from restoring to running the queries will
> > > > > >>>>>> fail. The caller in such a case can have their own configurable
> > > > > >>>>>> retries to check again or try the replica if a call to the active
> > > > > >>>>>> fails
> > > > > >>>>>> 13. I think KIP-216 is working on those lines, we might not need
> > > > > >>>>>> a few of those exceptions since now the basic idea of this KIP is to
> > > > > >>>>>> support IQ during rebalancing.
> > > > > >>>>>> 14. Addressed above, agreed it looks more readable.
> > > > >>>>>>
> > > > >>>>>>
> > > > > >>>>>>    On Tuesday, 22 October, 2019, 08:39:07 pm IST, Matthias J. Sax
> > > > > >>>>>> <matth...@confluent.io> wrote:
> > > > >>>>>>
> > > > >>>>>>  One more thought:
> > > > >>>>>>
> > > > > >>>>>> 14) Is specifying the allowed lag in number of records a useful
> > > > > >>>>>> way for users to declare how stale an instance is allowed to be? Would
> > > > > >>>>>> it be more intuitive for users to specify the allowed lag in time
> > > > > >>>>>> units (would event time or processing time be better)? It seems hard
> > > > > >>>>>> for users to reason how "fresh" a store really is when number of
> > > > > >>>>>> records is used.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> -Matthias
> > > > >>>>>>
> > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> > > > >>>>>>> Some more follow up thoughts:
> > > > >>>>>>>
> > > > > >>>>>>> 11) If we get a store handle of an active(restoring) task, and
> > > > > >>>>>>> the task transits to running, does the store handle become invalid
> > > > > >>>>>>> and a new one must be retrieved? Or can we "switch it out" underneath
> > > > > >>>>>>> -- for this case, how does the user know when they start to query the
> > > > > >>>>>>> up-to-date state?
> > > > > >>>>>>>
> > > > > >>>>>>> 12) Standby tasks will have the store open in regular mode,
> > > > > >>>>>>> while active(restoring) tasks open stores in "upgrade mode" for more
> > > > > >>>>>>> efficient bulk loading. When we switch the store into active mode, we
> > > > > >>>>>>> close it and reopen it. What is the impact if we query the store
> > > > > >>>>>>> during restore? What is the impact if we close the store to transit
> > > > > >>>>>>> to running (eg, there might be open iterators)?
> > > > > >>>>>>>
> > > > > >>>>>>> 13) Do we need to introduce new exception types? Compare KIP-216
> > > > > >>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> > > > > >>>>>>> that aims to improve the user experience with regard to IQ
> > > > > >>>>>>> exceptions.
> > > > >>>>>>>
> > > > >>>>>>>
> > > > >>>>>>> -Matthias
> > > > >>>>>>>
> > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> > > > >>>>>>>> Thanks for the KIP.
> > > > >>>>>>>>
> > > > >>>>>>>> Couple of comments:
> > > > >>>>>>>>
> > > > > >>>>>>>> 1) With regard to KIP-441, my current understanding is that
> > > > > >>>>>>>> the lag information is only reported to the leader (please correct me
> > > > > >>>>>>>> if I am wrong). This seems to be quite a limitation to actually use
> > > > > >>>>>>>> the lag information.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 2) The idea of the metadata API is actually to get metadata
> > > > > >>>>>>>> once and only refresh the metadata if a store was migrated. The
> > > > > >>>>>>>> current proposal would require to get the metadata before each query.
> > > > > >>>>>>>> The KIP should describe the usage pattern and impact in more detail.
> > > > >>>>>>>>
> > > > > >>>>>>>> 3) Currently, the KIP does not list the public API changes in
> > > > > >>>>>>>> detail. Please list all methods you intend to deprecate and list all
> > > > > >>>>>>>> methods you intend to add (best, using a code-block markup -- compare
> > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> > > > > >>>>>>>> as an example)
> > > > > >>>>>>>>
> > > > > >>>>>>>> 4) Also note (as already pointed out by John), that we cannot
> > > > > >>>>>>>> have any breaking API changes. Thus, the API should be designed in a
> > > > > >>>>>>>> fully backward compatible manner.
> > > > >>>>>>>>
> > > > > >>>>>>>> 5) Returning a list of metadata objects makes it hard for the
> > > > > >>>>>>>> user to know if the first object refers to the active(processing),
> > > > > >>>>>>>> active(restoring), or a standby task. IMHO, we should be more
> > > > > >>>>>>>> explicit. For example, a metadata object could have a flag that one
> > > > > >>>>>>>> can test via `#isActive()`. Or maybe even better, we could keep the
> > > > > >>>>>>>> current API as-is and add something like `standbyMetadataForKey()`
> > > > > >>>>>>>> (and similar methods for others). Having just a flag `isActive()` is
> > > > > >>>>>>>> a little subtle and having new overloads would make the API much
> > > > > >>>>>>>> clearer (passing in a boolean flag does not seem to be a nice API).
> > > > > >>>>>>>>
> > > > > >>>>>>>> 6) Do you intend to return all standby metadata information at
> > > > > >>>>>>>> once, similar to `allMetadata()` -- seems to be useful.
> > > > >>>>>>>>
> > > > > >>>>>>>> 7) Even if the lag information is propagated to all instances,
> > > > > >>>>>>>> it will happen in an async manner. Hence, I am wondering if we should
> > > > > >>>>>>>> address this race condition (I think we should). The idea would be to
> > > > > >>>>>>>> check if a standby/active(restoring) task is actually still within
> > > > > >>>>>>>> the lag bounds when a query is executed and we would throw an
> > > > > >>>>>>>> exception if not.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 8) The current `KafkaStreams#state()` method only returns a
> > > > > >>>>>>>> handle to stores of active(processing) tasks. How can a user actually
> > > > > >>>>>>>> get a handle to a store of an active(restoring) or standby task for
> > > > > >>>>>>>> querying? Seems we should add a new method to get standby handles?
> > > > > >>>>>>>> Changing the semantics of the existing `state()` would be possible,
> > > > > >>>>>>>> but I think adding a new method is preferable?
> > > > >>>>>>>>
> > > > > >>>>>>>> 9) How does the user actually specify the acceptable lag? A
> > > > > >>>>>>>> global config via StreamsConfig (this would be a public API change
> > > > > >>>>>>>> that needs to be covered in the KIP)? Or on a per-store or even
> > > > > >>>>>>>> per-query basis for more flexibility? We could also have a global
> > > > > >>>>>>>> setting that is used as default and allow to overwrite it on a
> > > > > >>>>>>>> per-query basis.
> > > > > >>>>>>>>
> > > > > >>>>>>>> 10) Do we need to distinguish between active(restoring) and
> > > > > >>>>>>>> standby tasks? Or could we treat both as the same?
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> -Matthias
> > > > >>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> > > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into the
> > > > > >>>>>>>>>>> configuration at all, or even making it a parameter on
> > > > > >>>>>>>>>>> `allMetadataForKey`, why not just _always_ return all available
> > > > > >>>>>>>>>>> metadata (including active/standby or lag) and let the caller
> > > > > >>>>>>>>>>> decide to which node they want to route the query?
> > > > > >>>>>>>>> +1 on exposing lag information via the APIs. IMO without having
> > > > > >>>>>>>>> continuously updated/fresh lag information, its true value as a
> > > > > >>>>>>>>> signal for query routing decisions is much limited. But we can design
> > > > > >>>>>>>>> the API around this model and iterate? Longer term, we should have
> > > > > >>>>>>>>> continuously shared lag information.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>> more general to refactor it to "allMetadataForKey(long
> > > > > >>>>>>>>>>> tolerableDataStaleness, ...)", and when it's set to 0 it means
> > > > > >>>>>>>>>>> "active task only".
> > > > > >>>>>>>>> +1 IMO if we plan on having `enableReplicaServing`, it makes
> > > > > >>>>>>>>> sense to generalize based on dataStaleness. This seems complementary
> > > > > >>>>>>>>> to exposing the lag information itself.
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>>> This is actually not a public api change at all, and I'm
> > > > > >>>>>>>>>>> planning to implement it asap as a precursor to the rest of
> > > > > >>>>>>>>>>> KIP-441
> > > > > >>>>>>>>> +1 again. Do we have a concrete timeline for when this change
> > > > > >>>>>>>>> will land on master? I would like to get the implementation wrapped
> > > > > >>>>>>>>> up (as much as possible) by end of the month. :). But I agree this
> > > > > >>>>>>>>> sequencing makes sense.
> > > > >>>>>>>>>
> > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <wangg...@gmail.com>
> > > > > >>>>>>>>> wrote:
> > > > >>>>>>>>>
> > > > >>>>>>>>>> Hi Navinder,
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Thanks for the KIP, I have a high level question about the
> > > > >>>> proposed
> > > > >>>>>> API
> > > > >>>>>>>>>> regarding:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > >>>>>> enableReplicaServing...)"
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> I'm wondering if it's more general to refactor it to
> > > > >>>>>>>>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and
> > > > >> when
> > > > >>>> it's
> > > > >>>>>> set to
> > > > >>>>>>>>>> 0 it means "active task only". Behind the scene, we can have
> > > > >> the
> > > > >>>>>> committed
> > > > >>>>>>>>>> offsets to encode the stream time as well, so that when
> > > > >>> processing
> > > > >>>>>> standby
> > > > >>>>>>>>>> tasks the stream process knows not only the lag in terms of
> > > > >>>> offsets
> > > > >>>>>>>>>> comparing to the committed offset (internally we call it
> > > > >> offset
> > > > >>>>>> limit), but
> > > > >>>>>>>>>> also the lag in terms of timestamp diff comparing the
> > > > >> committed
> > > > >>>>>> offset.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Also encoding the timestamp as part of offset has other
> > > > >>> benefits
> > > > >>>> for
> > > > >>>>>>>>>> improving Kafka Streams time semantics as well, but for
> > > > >> KIP-535
> > > > >>>>>> itself I
> > > > >>>>>>>>>> think it can help giving users a more intuitive interface to
> > > > >>>> reason
> > > > >>>>>> about.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Guozhang
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <
> > > > >>> j...@confluent.io>
> > > > >>>>>> wrote:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>> Hey Navinder,
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks for the KIP! I've been reading over the discussion
> > > > >> thus
> > > > >>>> far,
> > > > >>>>>>>>>>> and I have a couple of thoughts to pile on as well:
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> It seems confusing to propose the API in terms of the
> > current
> > > > >>>> system
> > > > >>>>>>>>>>> state, but also propose how the API would look if/when
> > > > >> KIP-441
> > > > >>> is
> > > > >>>>>>>>>>> implemented. It occurs to me that the only part of KIP-441
> > > > >> that
> > > > >>>> would
> > > > >>>>>>>>>>> affect you is the availability of the lag information in
> > the
> > > > >>>>>>>>>>> SubscriptionInfo message. This is actually not a public api
> > > > >>>> change at
> > > > >>>>>>>>>>> all, and I'm planning to implement it asap as a precursor
> > to
> > > > >>> the
> > > > >>>> rest
> > > > >>>>>>>>>>> of KIP-441, so maybe you can just build on top of KIP-441
> > and
> > > > >>>> assume
> > > > >>>>>>>>>>> the lag information will be available. Then you could have
> > a
> > > > >>> more
> > > > >>>>>>>>>>> straightforward proposal (e.g., mention that you'd return
> > the
> > > > >>> lag
> > > > >>>>>>>>>>> information in AssignmentInfo as well as in the
> > > > >> StreamsMetadata
> > > > >>>> in
> > > > >>>>>>>>>>> some form, or make use of it in the API somehow).
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I'm partially motivated in that former point because it
> > seems
> > > > >>>> like
> > > > >>>>>>>>>>> understanding how callers would bound the staleness for
> > their
> > > > >>> use
> > > > >>>>>> case
> > > > >>>>>>>>>>> is _the_ key point for this KIP. FWIW, I think that adding
> > a
> > > > >>> new
> > > > >>>>>>>>>>> method to StreamsMetadataState and deprecating the existing
> > > > >>>> method is
> > > > >>>>>>>>>>> the best way to go; we just can't change the return types
> > of
> > > > >>> any
> > > > >>>>>>>>>>> existing methods.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable lag" into
> > the
> > > > >>>>>>>>>>> configuration at all, or even making it a parameter on
> > > > >>>>>>>>>>> `allMetadataForKey`, why not just _always_ return all
> > > > >> available
> > > > >>>>>>>>>>> metadata (including active/standby or lag) and let the
> > caller
> > > > >>>> decide
> > > > >>>>>>>>>>> to which node they want to route the query? This method
> > isn't
> > > > >>>> making
> > > > >>>>>>>>>>> any queries itself; it's merely telling you where the local
> > > > >>>> Streams
> > > > >>>>>>>>>>> instance _thinks_ the key in question is located. Just
> > > > >>> returning
> > > > >>>> all
> > > > >>>>>>>>>>> available information lets the caller implement any
> > semantics
> > > > >>>> they
> > > > >>>>>>>>>>> desire around querying only active stores, or standbys, or
> > > > >>>> recovering
> > > > >>>>>>>>>>> stores, or whatever.
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> One fly in the ointment, which you may wish to consider if
> > > > >>>> proposing
> > > > >>>>>>>>>>> to use lag information, is that the cluster would only
> > become
> > > > >>>> aware
> > > > >>>>>> of
> > > > >>>>>>>>>>> new lag information during rebalances. Even in the full
> > > > >>>> expression of
> > > > >>>>>>>>>>> KIP-441, this information would stop being propagated when
> > > > >> the
> > > > >>>>>> cluster
> > > > >>>>>>>>>>> achieves a balanced task distribution. There was a
> > follow-on
> > > > >>>> idea I
> > > > >>>>>>>>>>> POCed to continuously share lag information in the
> > heartbeat
> > > > >>>>>> protocol,
> > > > >>>>>>>>>>> which you might be interested in, if you want to make sure
> > > > >> that
> > > > >>>> nodes
> > > > >>>>>>>>>>> are basically _always_ aware of each others' lag on
> > different
> > > > >>>>>>>>>>> partitions: https://github.com/apache/kafka/pull/7096
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> Thanks again!
> > > > >>>>>>>>>>> -John
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on the same page. I will
> > > > >> add
> > > > >>>> some
> > > > >>>>>> of
> > > > >>>>>>>>>>> these explanations to the KIP as well. Have assigned the
> > > > >>>> KAFKA-6144
> > > > >>>>>> to
> > > > >>>>>>>>>>> myself and KAFKA-8994 is closed(by you). As suggested, we
> > > > >> will
> > > > >>>>>> replace
> > > > >>>>>>>>>>> "replica" with "standby".
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> In the new API,
> > > > >>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > >>>>>>>>>>> enableReplicaServing, String storeName, K key,
> > Serializer<K>
> > > > >>>>>>>>>>> keySerializer)" Do we really need a per key configuration?
> > > > >> or a
> > > > >>>> new
> > > > >>>>>>>>>>> StreamsConfig is good enough?>> Coming from experience,
> > when
> > > > >>>> teams
> > > > >>>>>> are
> > > > >>>>>>>>>>> building a platform with Kafka Streams and these API's
> > serve
> > > > >>>> data to
> > > > >>>>>>>>>>> multiple teams, we can't have a generalized config that
> > says
> > > > >>> as a
> > > > >>>>>>>>>> platform
> > > > >>>>>>>>>>> we will support stale reads or not. It should be the choice
> > > > >> of
> > > > >>>>>> someone
> > > > >>>>>>>>>> who
> > > > >>>>>>>>>>> is calling the API's to choose whether they are ok with
> > stale
> > > > >>>> reads
> > > > >>>>>> or
> > > > >>>>>>>>>> not.
> > > > >>>>>>>>>>> Makes sense?
> > > > >>>>>>>>>>>>    On Thursday, 17 October, 2019, 11:56:02 pm IST, Vinoth
> > > > >>>> Chandar <
> > > > >>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>  Looks like we are covering ground :)
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Only if it is within a permissible  range(say 10000) we
> > > > >> will
> > > > >>>> serve
> > > > >>>>>>>>>> from
> > > > >>>>>>>>>>>> Restoring state of active.
> > > > >>>>>>>>>>>> +1 on having a knob like this.. My reasoning is as
> > follows.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Looking at the Streams state as a read-only distributed kv
> > > > >>>> store.
> > > > >>>>>> With
> > > > >>>>>>>>>>>> num_standby = f , we should be able to tolerate f failures
> > > > >> and
> > > > >>>> if
> > > > >>>>>> there
> > > > >>>>>>>>>>> is
> > > > >>>>>>>>>>>> an (f+1)th failure, the system should be unavailable.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> A) So with num_standby=0, the system should be unavailable
> > > > >>> even
> > > > >>>> if
> > > > >>>>>>>>>> there
> > > > >>>>>>>>>>> is
> > > > >>>>>>>>>>>> 1 failure and that's my argument for not allowing querying
> > in
> > > > >>>>>>>>>> restoration
> > > > >>>>>>>>>>>> state, esp in this case it will be a total rebuild of the
> > > > >>> state
> > > > >>>>>> (which
> > > > >>>>>>>>>>> IMO
> > > > >>>>>>>>>>>> cannot be considered a normal fault free operational
> > state).
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> B) Even if there are standbys, say num_standby=2, if the
> > user
> > > > >>>> decides
> > > > >>>>>> to
> > > > >>>>>>>>>>> shut
> > > > >>>>>>>>>>>> down all 3 instances, then the only outcome should be
> > > > >>> unavailability
> > > > >>>>>> until
> > > > >>>>>>>>>>> all
> > > > >>>>>>>>>>>> of them come back or state is rebuilt on other nodes in
> > the
> > > > >>>>>> cluster. In
> > > > >>>>>>>>>>>> normal operations, f <= 2 and when a failure does happen
> > we
> > > > >>> can
> > > > >>>> then
> > > > >>>>>>>>>>> either
> > > > >>>>>>>>>>>> choose to be C over A and fail IQs until replication is
> > > > >> fully
> > > > >>>>>> caught up
> > > > >>>>>>>>>>> or
> > > > >>>>>>>>>>>> choose A over C by serving in restoring state as long as
> > lag
> > > > >>> is
> > > > >>>>>>>>>> minimal.
> > > > >>>>>>>>>>> If
> > > > >>>>>>>>>>>> even with f=1 say, all the standbys are lagging a lot due
> > to
> > > > >>>> some
> > > > >>>>>>>>>> issue,
> > > > >>>>>>>>>>>> then that should be considered a failure since that is
> > > > >>> different
> > > > >>>>>> from
> > > > >>>>>>>>>>>> normal/expected operational mode. Serving reads with
> > > > >> unbounded
> > > > >>>>>>>>>>> replication
> > > > >>>>>>>>>>>> lag and calling it "available" may not be very usable or
> > > > >> even
> > > > >>>>>> desirable
> > > > >>>>>>>>>>> :)
> > > > >>>>>>>>>>>> IMHO, since it gives the user no way to reason about the
> > app
> > > > >>>> that is
> > > > >>>>>>>>>>> going
> > > > >>>>>>>>>>>> to query this store.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> So there is definitely a need to distinguish between :
> > > > >>>> Replication
> > > > >>>>>>>>>>> catchup
> > > > >>>>>>>>>>>> while being in fault free state vs Restoration of state
> > when
> > > > >>> we
> > > > >>>> lose
> > > > >>>>>>>>>> more
> > > > >>>>>>>>>>>> than f standbys. This knob is a great starting point
> > towards
> > > > >>>> this.
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> If you agree with some of the explanation above, please
> > feel
> > > > >>>> free to
> > > > >>>>>>>>>>>> include it in the KIP as well since this is sort of our
> > > > >> design
> > > > >>>>>>>>>> principle
> > > > >>>>>>>>>>>> here..
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> Small nits :
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> - let's standardize on "standby" instead of "replica", KIP
> > > > >> or
> > > > >>>>>> code,  to
> > > > >>>>>>>>>>> be
> > > > >>>>>>>>>>>> consistent with rest of Streams code/docs?
> > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into KAFKA-6144 now and close
> > the
> > > > >>>> former?
> > > > >>>>>>>>>>>> Eventually need to consolidate KAFKA-6555 as well
> > > > >>>>>>>>>>>> - In the new API,
> > > > >>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key,
> > Serializer<K>
> > > > >>>>>>>>>>> keySerializer)" Do
> > > > >>>>>>>>>>>> we really need a per key configuration? or a new
> > > > >> StreamsConfig
> > > > >>>> is
> > > > >>>>>> good
> > > > >>>>>>>>>>>> enough?
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of the discussions we
> > > > >> have
> > > > >>>> had
> > > > >>>>>> in
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>>>> KIP.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve queries from
> > > > >>>> Active(Running)
> > > > >>>>>>>>>>>>> partition. For case t2, we are planning to return
> > > > >>>>>>>>>> List<StreamsMetadata>
> > > > >>>>>>>>>>>>> such that it returns <StreamsMetadata(A),
> > > > >> StreamsMetadata(B)>
> > > > >>>> so
> > > > >>>>>> that
> > > > >>>>>>>>>>> if IQ
> > > > >>>>>>>>>>>>> fails on A, the replica on B can serve the data by
> > enabling
> > > > >>>> serving
> > > > >>>>>>>>>>> from
> > > > >>>>>>>>>>>>> replicas. This still does not solve case t3 and t4 since
> > B
> > > > >>> has
> > > > >>>> been
> > > > >>>>>>>>>>>>> promoted to active but it is in Restoring state to
> > catchup
> > > > >>>> till A’s
> > > > >>>>>>>>>>> last
> > > > >>>>>>>>>>>>> committed position as we don’t serve from Restoring state
> > > > >> in
> > > > >>>> Active
> > > > >>>>>>>>>>> and new
> > > > >>>>>>>>>>>>> Replica on R is building itself from scratch. Both these
> > > > >>> cases
> > > > >>>> can
> > > > >>>>>> be
> > > > >>>>>>>>>>>>> solved if we start serving from Restoring state of active
> > > > >> as
> > > > >>>> well
> > > > >>>>>>>>>>> since it
> > > > >>>>>>>>>>>>> is almost equivalent to previous Active.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> There could be a case where all replicas of a partition
> > > > >>> become
> > > > >>>>>>>>>>> unavailable
> > > > >>>>>>>>>>>>> and active and all replicas of that partition are
> > building
> > > > >>>>>> themselves
> > > > >>>>>>>>>>> from
> > > > >>>>>>>>>>>>> scratch, in this case, the state in Active is far behind
> > > > >> even
> > > > >>>>>> though
> > > > >>>>>>>>>>> it is
> > > > >>>>>>>>>>>>> in Restoring state. To cater to such cases that we don’t
> > > > >>> serve
> > > > >>>> from
> > > > >>>>>>>>>>> this
> > > > >>>>>>>>>>>>> state we can either add another state before Restoring or
> > > > >>>> check the
> > > > >>>>>>>>>>>>> difference between last committed offset and current
> > > > >>> position.
> > > > >>>> Only
> > > > >>>>>>>>>> if
> > > > >>>>>>>>>>> it
> > > > >>>>>>>>>>>>> is within a permissible range (say 10000) we will serve
> > > > >> from
> > > > >>>>>>>>>> Restoring
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>>>> state of Active.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>    On Wednesday, 16 October, 2019, 10:01:35 pm IST,
> > Vinoth
> > > > >>>> Chandar
> > > > >>>>>> <
> > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>  Thanks for the updates on the KIP, Navinder!
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> Few comments
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> - AssignmentInfo is not public API? But we will change
> > it
> > > > >>> and
> > > > >>>> thus
> > > > >>>>>>>>>>> need to
> > > > >>>>>>>>>>>>> increment the version and test for version_probing etc.
> > > > >> Good
> > > > >>> to
> > > > >>>>>>>>>>> separate
> > > > >>>>>>>>>>>>> that from StreamsMetadata changes (which is public API)
> > > > >>>>>>>>>>>>> - From what I see, there is going to be choice between
> > the
> > > > >>>>>> following
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>  A) introducing a new *KafkaStreams::allMetadataForKey()
> > > > >> *API
> > > > >>>> that
> > > > >>>>>>>>>>>>> potentially returns List<StreamsMetadata> ordered from
> > most
> > > > >>>> up to
> > > > >>>>>> date
> > > > >>>>>>>>>>> to
> > > > >>>>>>>>>>>>> least up to date replicas. Today we cannot fully implement
> > > > >>> this
> > > > >>>>>>>>>>> ordering,
> > > > >>>>>>>>>>>>> since all we know is which hosts are active and which are
> > > > >>>> standbys.
> > > > >>>>>>>>>>>>> However, this aligns well with the future. KIP-441 adds
> > the
> > > > >>> lag
> > > > >>>>>>>>>>> information
> > > > >>>>>>>>>>>>> to the rebalancing protocol. We could also sort replicas
> > > > >>> based
> > > > >>>> on
> > > > >>>>>> the
> > > > >>>>>>>>>>>>> reported lags eventually. This is fully backwards
> > compatible
> > > > >>> with
> > > > >>>>>>>>>>> existing
> > > > >>>>>>>>>>>>> clients. Only drawback I see is the naming of the
> > existing
> > > > >>>> method
> > > > >>>>>>>>>>>>> KafkaStreams::metadataForKey, not conveying the
> > distinction
> > > > >>>> that it
> > > > >>>>>>>>>>> simply
> > > > >>>>>>>>>>>>> returns the active replica i.e allMetadataForKey.get(0).
> > > > >>>>>>>>>>>>>  B) Change KafkaStreams::metadataForKey() to return a
> > List.
> > > > >>>> It's a
> > > > >>>>>>>>>>> breaking
> > > > >>>>>>>>>>>>> change.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> I prefer A, since none of the semantics/behavior changes
> > > > >> for
> > > > >>>>>> existing
> > > > >>>>>>>>>>>>> users. Love to hear more thoughts. Can we also work this
> > > > >> into
> > > > >>>> the
> > > > >>>>>>>>>> KIP?
> > > > >>>>>>>>>>>>> I already implemented A to unblock myself for now. Seems
> > > > >>>> feasible
> > > > >>>>>> to
> > > > >>>>>>>>>>> do.
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth Chandar <
> > > > >>>>>>>>>> vchan...@confluent.io
> > > > >>>>>>>>>>>>
> > > > >>>>>>>>>>>>> wrote:
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> I get your point. But suppose there is a replica which
> > > > >> has
> > > > >>>> just
> > > > >>>>>>>>>>> become
> > > > >>>>>>>>>>>>>> active, so in that case replica will still be building
> > > > >>> itself
> > > > >>>> from
> > > > >>>>>>>>>>>>> scratch
> > > > >>>>>>>>>>>>>> and this active will go to restoring state till it
> > catches
> > > > >>> up
> > > > >>>> with
> > > > >>>>>>>>>>>>> previous
> > > > >>>>>>>>>>>>>> active, wouldn't serving from a restoring active make
> > more
> > > > >>>> sense
> > > > >>>>>>>>>>> than a
> > > > >>>>>>>>>>>>>> replica in such case.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> KIP-441 will change this behavior such that promotion to
> > > > >>>> active
> > > > >>>>>>>>>>> happens
> > > > >>>>>>>>>>>>>> based on how caught up a replica is. So, once we have
> > that
> > > > >>>> (work
> > > > >>>>>>>>>>> underway
> > > > >>>>>>>>>>>>>> already for 2.5 IIUC) and user sets
> > num.standby.replicas >
> > > > >>> 0,
> > > > >>>> then
> > > > >>>>>>>>>>> the
> > > > >>>>>>>>>>>>>> staleness window should not be that long as you
> > describe.
> > > > >>> IMO
> > > > >>>> if
> > > > >>>>>>>>>> user
> > > > >>>>>>>>>>>>> wants
> > > > >>>>>>>>>>>>>> availability for state, then they should configure
> > > > >>>>>>>>>>>>>> num.standby.replicas > 0. If
> > > > >>>>>>>>>>>>>> not, then on a node loss, few partitions would be
> > > > >>> unavailable
> > > > >>>> for
> > > > >>>>>> a
> > > > >>>>>>>>>>> while
> > > > >>>>>>>>>>>>>> (there are other ways to bring this window down, which I
> > > > >>> won't
> > > > >>>>>>>>>> bring
> > > > >>>>>>>>>>> in
> > > > >>>>>>>>>>>>>> here). We could argue for querying a restoring active
> > > > >> (say a
> > > > >>>> new
> > > > >>>>>>>>>> node
> > > > >>>>>>>>>>>>> added
> > > > >>>>>>>>>>>>>> to replace a faulty old node) based on AP vs CP
> > > > >> principles.
> > > > >>>> But
> > > > >>>>>> not
> > > > >>>>>>>>>>> sure
> > > > >>>>>>>>>>>>>> reading really really old values for the sake of
> > > > >>> availability
> > > > >>>> is
> > > > >>>>>>>>>>> useful.
> > > > >>>>>>>>>>>>> No
> > > > >>>>>>>>>>>>>> AP data system would be inconsistent for such a long
> > time
> > > > >> in
> > > > >>>>>>>>>>> practice.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> So, I still feel just limiting this to standby reads
> > > > >>> provides
> > > > >>>> best
> > > > >>>>>>>>>>>>>> semantics.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what others think as well.
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder Brar
> > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Hi Vinoth,
> > > > >>>>>>>>>>>>>>> Thanks for the feedback.
> > > > >>>>>>>>>>>>>>>  Can we link the JIRA, discussion thread also to the
> > > > >> KIP.>>
> > > > >>>>>> Added.
> > > > >>>>>>>>>>>>>>> Based on the discussion on KAFKA-6144, I was under the
> > > > >>>> impression
> > > > >>>>>>>>>>> that
> > > > >>>>>>>>>>>>>>> this KIP is also going to cover exposing of the standby
> > > > >>>>>>>>>> information
> > > > >>>>>>>>>>> in
> > > > >>>>>>>>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That
> > would
> > > > >>>> require
> > > > >>>>>> a
> > > > >>>>>>>>>>>>> public
> > > > >>>>>>>>>>>>>>> API change?>> Sure, I can add changes for 8994 in this
> > > > >> KIP
> > > > >>>> and
> > > > >>>>>>>>>> link
> > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> > > > >>>>>>>>>>>>>>>  KIP seems to be focussing on restoration when a new
> > node
> > > > >>> is
> > > > >>>>>>>>>> added.
> > > > >>>>>>>>>>>>>>> KIP-441 is underway and has some major changes proposed
> > > > >> for
> > > > >>>> this.
> > > > >>>>>>>>>> It
> > > > >>>>>>>>>>>>> would
> > > > >>>>>>>>>>>>>>> be good to clarify dependencies if any. Without
> > KIP-441,
> > > > >> I
> > > > >>>> am not
> > > > >>>>>>>>>>> very
> > > > >>>>>>>>>>>>> sure
> > > > >>>>>>>>>>>>>>> if we should allow reads from nodes in RESTORING state,
> > > > >>> which
> > > > >>>>>>>>>> could
> > > > >>>>>>>>>>>>> amount
> > > > >>>>>>>>>>>>>>> to many minutes/few hours of stale reads?  This is
> > > > >>> different
> > > > >>>> from
> > > > >>>>>>>>>>>>> allowing
> > > > >>>>>>>>>>>>>>> querying standby replicas, which could be mostly caught
> > > > >> up
> > > > >>>> and
> > > > >>>>>> the
> > > > >>>>>>>>>>>>>>> staleness window could be much smaller/tolerable. (once
> > > > >>>> again the
> > > > >>>>>>>>>>> focus
> > > > >>>>>>>>>>>>> on
> > > > >>>>>>>>>>>>>>> KAFKA-8994).>> I get your point. But suppose there is a
> > > > >>>> replica
> > > > >>>>>>>>>>> which
> > > > >>>>>>>>>>>>> has
> > > > >>>>>>>>>>>>>>> just become active, so in that case replica will still
> > be
> > > > >>>>>> building
> > > > >>>>>>>>>>>>> itself
> > > > >>>>>>>>>>>>>>> from scratch and this active will go to restoring state
> > > > >>> till
> > > > >>>> it
> > > > >>>>>>>>>>> catches
> > > > >>>>>>>>>>>>> up
> > > > >>>>>>>>>>>>>>> with previous active, wouldn't serving from a restoring
> > > > >>>> active
> > > > >>>>>>>>>> make
> > > > >>>>>>>>>>> more
> > > > >>>>>>>>>>>>>>> sense than a replica in such case.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a configuration to
> > > > >>> control
> > > > >>>>>> this.
> > > > >>>>>>>>>>> Some
> > > > >>>>>>>>>>>>>>> users may prefer errors to stale data. Can we also add
> > it
> > > > >>> to
> > > > >>>> the
> > > > >>>>>>>>>>> KIP?>>
> > > > >>>>>>>>>>>>>>> Will add this.
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> Regards,
> > > > >>>>>>>>>>>>>>> Navinder
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <
> > v...@confluent.io
> > > > >>>>> wrote:
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Hi Navinder,>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few thoughts>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion thread also to the
> > > > >> KIP>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> - Based on the discussion on KAFKA-6144, I was under
> > the
> > > > >>>>>>>>>> impression
> > > > >>>>>>>>>>>>>>> that>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> this KIP is also going to cover exposing of the
> > standby
> > > > >>>>>>>>>>> information in>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume KAFKA-8994 . That
> > would
> > > > >>>> require
> > > > >>>>>>>>>> a
> > > > >>>>>>>>>>>>>>> public>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> API change?>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> - KIP seems to be focussing on restoration when a new
> > > > >> node
> > > > >>>> is
> > > > >>>>>>>>>>> added.>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> KIP-441 is underway and has some major changes
> > proposed
> > > > >>> for
> > > > >>>>>> this.
> > > > >>>>>>>>>>> It
> > > > >>>>>>>>>>>>>>> would>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> be good to clarify dependencies if any. Without
> > > > >> KIP-441, I
> > > > >>>> am
> > > > >>>>>> not
> > > > >>>>>>>>>>> very
> > > > >>>>>>>>>>>>>>> sure>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> if we should allow reads from nodes in RESTORING
> > state,
> > > > >>>> which
> > > > >>>>>>>>>> could
> > > > >>>>>>>>>>>>>>> amount>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> to many minutes/few hours of stale reads?  This is
> > > > >>> different
> > > > >>>>>>>>>>>>>>> from allowing>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> querying standby replicas, which could be mostly
> > caught
> > > > >> up
> > > > >>>> and
> > > > >>>>>>>>>> the>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> staleness window could be much smaller/tolerable.
> > (once
> > > > >>>> again
> > > > >>>>>> the
> > > > >>>>>>>>>>> focus
> > > > >>>>>>>>>>>>>>> on>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> KAFKA-8994)>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> - Finally, we may need to introduce a configuration to
> > > > >>>> control
> > > > >>>>>>>>>>> this.
> > > > >>>>>>>>>>>>>>> Some>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> users may prefer errors to stale data. Can we also add
> > > > >> it
> > > > >>>> to the
> > > > >>>>>>>>>>> KIP?>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Thanks>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> Vinoth>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder Brar>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid>wrote:>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Hi,>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP to Allow state
> > stores
> > > > >> to
> > > > >>>> serve
> > > > >>>>>>>>>>>>> stale>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> reads during rebalance(>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > > > >>>>>>>>>>>>>>>>> ).>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> Thanks & Regards,Navinder>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>> LinkedIn>
> > > > >>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>>
> > > > >>>>>>>>>>>>>
> > > > >>>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> --
> > > > >>>>>>>>>> -- Guozhang
> > > > >>>>>>>>>>
> > > > >>>>>>>>>
> > > > >>>>>>>>
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > >
> > >
