3. Right now, we still get the assigned topic partitions as part of the
top-level Assignment object (the one that wraps AssignmentInfo) and use
that to convert taskIds back. This list contains only the assignments for
that particular instance. Attempting to also reverse map "all" the
taskIds in the Streams cluster, i.e. all the topic partitions in these
global assignment maps, was what proved problematic. By explicitly sending
the global assignment maps as actual topic partitions, the group
coordinator (i.e. the leader that computes the assignments) is able to
consistently enforce its view of the topic metadata. I still don't think a
change that forces you to reconsider semantics is needed just to save bits
on the wire. Maybe we can discuss this separately from this KIP?
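
For reference, the dictionary encoding of topic names mentioned further
down this thread could look roughly like the sketch below. This is only an
illustration under my own assumptions: TopicDictionarySketch and its
methods are hypothetical names, not the actual AssignmentInfo code, and the
real wire format is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not the actual AssignmentInfo code.
final class TopicDictionarySketch {
    // Maps each distinct topic name to a small integer index.
    final Map<String, Integer> indexByTopic = new HashMap<>();
    // Reverse lookup; this list would ship once alongside the encoded entries.
    final List<String> topicByIndex = new ArrayList<>();

    // Encodes a topic partition as {topicIndex, partition}, so the topic
    // name is written once no matter how many partitions refer to it.
    int[] encode(String topic, int partition) {
        Integer idx = indexByTopic.get(topic);
        if (idx == null) {
            idx = topicByIndex.size();
            indexByTopic.put(topic, idx);
            topicByIndex.add(topic);
        }
        return new int[] {idx, partition};
    }

    // Decodes an entry back to "topic-partition" using the dictionary.
    String decode(int[] encoded) {
        return topicByIndex.get(encoded[0]) + "-" + encoded[1];
    }
}
```

The point is that the receiver never has to map anything back through the
topology: it only needs the dictionary that came with the message, so the
leader's view of the topic metadata is what every member sees.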

4. There needs to be some caching/interval somewhere, though, since we
don't want to potentially make one Kafka read per IQ. But I think it's a
valid suggestion to make this call just synchronous and leave the caching,
or how often you want to call it, to the application. Would it be good to
then break up the APIs for time-based and offset-based lag? We can obtain
offset-based lag for free, and only incur the overhead of reading Kafka if
we want time-based lag?
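
To make the split concrete, here is a rough sketch of what I mean. All
names (LagSketch, CachedTailTimestamp) are hypothetical and not part of
KafkaStreams; the LongSupplier stands in for whatever read fetches the tail
record's timestamp, and clamping negative values to zero is my own
assumption about how to handle a stale cached timestamp.

```java
import java.util.function.LongSupplier;

// Hypothetical names for illustration; not part of the KafkaStreams API.
final class LagSketch {
    // Offset lag needs only two numbers the instance already tracks.
    static long offsetLag(long logEndOffset, long currentOffset) {
        return Math.max(0L, logEndOffset - currentOffset);
    }

    // Time lag needs the tail record's timestamp, which costs a read.
    // Clamped at zero (an assumption) in case the tail timestamp is stale.
    static long timeLagMs(long tailRecordTimestampMs, long lastAppliedTimestampMs) {
        return Math.max(0L, tailRecordTimestampMs - lastAppliedTimestampMs);
    }
}

// Application-side cache: invokes the expensive reader at most once per interval.
final class CachedTailTimestamp {
    private final LongSupplier tailTimestampReader; // stands in for the Kafka read
    private final long refreshIntervalMs;
    private long cachedValue;
    private long lastRefreshMs;
    private boolean loaded = false;

    CachedTailTimestamp(LongSupplier tailTimestampReader, long refreshIntervalMs) {
        this.tailTimestampReader = tailTimestampReader;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns the cached timestamp, refreshing it when the interval has elapsed.
    long get(long nowMs) {
        if (!loaded || nowMs - lastRefreshMs >= refreshIntervalMs) {
            cachedValue = tailTimestampReader.getAsLong();
            lastRefreshMs = nowMs;
            loaded = true;
        }
        return cachedValue;
    }
}
```

With something like this on the application side, Streams itself would not
need a refresh config: the offset-based call stays cheap, and the caller
decides how often the time-based path actually hits Kafka.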

On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Adding on to John's response to 3), can you clarify when and why exactly we
> cannot convert between taskIds and partitions? If that's really the case I
> don't feel confident that the StreamsPartitionAssignor is not full of
> bugs...
>
> It seems like it currently just encodes a list of all partitions (the
> assignment) and also a list of the corresponding task ids, duplicated to
> ensure each partition has the corresponding taskId at the same offset into
> the list. Why is that problematic?
>
>
> On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io> wrote:
>
> > Thanks, all, for considering the points!
> >
> > 3. Interesting. I have a vague recollection of that... Still, though,
> > it seems a little fishy. After all, we return the assignments
> > themselves as task ids, and the members have to map these to topic
> > partitions in order to configure themselves properly. If it's too
> > complicated to get this right, then how do we know that Streams is
> > computing the correct partitions at all?
> >
> > 4. How about just checking the log-end timestamp when you call the
> > method? Then, when you get an answer, it's as fresh as it could
> > possibly be. And as a user you have just one, obvious, "knob" to
> > configure how much overhead you want to devote to checking... If you
> > want to call the broker API less frequently, you just call the Streams
> > API less frequently. And you don't have to worry about the
> > relationship between your invocations of that method and the config
> > setting (e.g., you'll never get a negative number, which you could if
> > you check the log-end timestamp less frequently than you check the
> > lag).
> >
> > Thanks,
> > -John
> >
> > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > Thanks John for going through this.
> > >
> > >    - +1, makes sense
> > >    - +1, no issues there
> > > >    - Yeah, the initial patch I had submitted for K-7149
> > > >      (https://github.com/apache/kafka/pull/6935) to reduce the
> > > >      AssignmentInfo object had taskIds, but the merged PR had a
> > > >      similar size according to Vinoth and it was simpler. So if the
> > > >      end result is the same size, it would not make sense to pivot
> > > >      from the dictionary and move to taskIds again.
> > > >    - Not sure what a good default would be if we don't have a
> > > >      configurable setting. This gives users the flexibility to
> > > >      serve their requirements, as at the end of the day it would
> > > >      take CPU cycles. I am ok with starting with a default and
> > > >      seeing how it goes based on feedback.
> > >
> > > Thanks,
> > > Navinder
> > >     On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth Chandar <
> > vchan...@confluent.io> wrote:
> > >
> > > > 1. Was trying to spell them out separately, but it makes sense for
> > > > readability. Done.
> > > >
> > > > 2. No, I immediately agree :) .. makes sense. @navinder?
> > > >
> > > > 3. I actually attempted sending only taskIds while working on
> > > > KAFKA-7149. It's non-trivial to handle edge cases resulting from
> > > > newly added topic partitions and wildcarded topic entries. I ended up
> > > > simplifying it to just dictionary encoding the topic names to reduce
> > > > size. We can apply the same technique here for this map.
> > > > Additionally, we could also dictionary encode HostInfo, given it's
> > > > now repeated twice. I think this would save more space than having a
> > > > flag per topic partition entry. Lmk if you are okay with this.
> > > >
> > > > 4. This opens up a good discussion. Given we support time lag
> > > > estimates also, we need to read the tail record of the changelog
> > > > periodically (unlike offset lag, which we can potentially piggyback
> > > > on metadata in ConsumerRecord IIUC). We thought we should have a
> > > > config that controls how often this read happens? Let me know if
> > > > there is a simple way to get the timestamp value of the tail record
> > > > that we are missing.
> > >
> > > On Thu, Oct 31, 2019 at 12:58 PM John Roesler <j...@confluent.io>
> wrote:
> > >
> > > > Hey Navinder,
> > > >
> > > > Thanks for updating the KIP, it's a lot easier to see the current
> > > > state of the proposal now.
> > > >
> > > > A few remarks:
> > > > 1. I'm sure it was just an artifact of revisions, but you have two
> > > > separate sections where you list additions to the KafkaStreams
> > > > interface. Can you consolidate those so we can see all the additions
> > > > at once?
> > > >
> > > > 2. For messageLagEstimate, can I suggest "offsetLagEstimate" instead,
> > > > to be clearer that we're specifically measuring a number of offsets?
> > > > If you don't immediately agree, then I'd at least point out that we
> > > > usually refer to elements of Kafka topics as "records", not
> > > > "messages", so "recordLagEstimate" might be more appropriate.
> > > >
> > > > 3. The proposal mentions adding a map of the standby _partitions_ for
> > > > each host to AssignmentInfo. I assume this is designed to mirror the
> > > > existing "partitionsByHost" map. To keep the size of these metadata
> > > > messages down, maybe we can consider making two changes:
> > > > (a) for both actives and standbys, encode the _task ids_ instead of
> > > > _partitions_. Every member of the cluster has a copy of the topology,
> > > > so they can convert task ids into specific partitions on their own,
> > > > and task ids are only (usually) three characters.
> > > > (b) instead of encoding two maps (hostinfo -> actives AND hostinfo ->
> > > > standbys), which requires serializing all the hostinfos twice, maybe
> > > > we can pack them together in one map with a structured value
> (hostinfo
> > > > -> [actives,standbys]).
> > > > Both of these ideas still require bumping the protocol version to 6,
> > > > and they basically mean we drop the existing `PartitionsByHost` field
> > > > and add a new `TasksByHost` field with the structured value I
> > > > mentioned.
> > > >
> > > > 4. Can we avoid adding the new "lag refresh" config? The lags would
> > > > necessarily be approximate anyway, so adding the config seems to
> > > > increase the operational complexity of the system for little actual
> > > > benefit.
> > > >
> > > > Thanks for the pseudocode, by the way, it really helps visualize how
> > > > these new interfaces would play together. And thanks again for the
> > > > update!
> > > > -John
> > > >
> > > > On Thu, Oct 31, 2019 at 2:41 PM John Roesler <j...@confluent.io>
> > wrote:
> > > > >
> > > > > Hey Vinoth,
> > > > >
> > > > > I started going over the KIP again yesterday. There are a lot of
> > > > > updates, and I didn't finish my feedback in one day. I'm working on
> > it
> > > > > now.
> > > > >
> > > > > Thanks,
> > > > > John
> > > > >
> > > > > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <
> > vchan...@confluent.io>
> > > > wrote:
> > > > > >
> > > > > > Wondering if anyone has thoughts on these changes? I liked that
> > the new
> > > > > > metadata fetch APIs provide all the information at once with
> > consistent
> > > > > > naming..
> > > > > >
> > > > > > Any guidance on what you would like to be discussed or fleshed
> out
> > more
> > > > > > before we call a VOTE?
> > > > > >
> > > > > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> > > > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > > We have made some edits in the KIP
> > > > > > > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance)
> > > > > > > after due deliberation on the agreed design to support the new
> > query
> > > > > > > design. This includes the new public API to query offset/time
> lag
> > > > > > > information and other details related to querying standby tasks
> > > > which have
> > > > > > > come up after thinking of thorough details.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > >    - Addition of new config, “lag.fetch.interval.ms”, to
> > > > > > > >      configure the interval at which time/offset lag is
> > > > > > > >      fetched
> > > > > > > >    - Addition of new class StoreLagInfo to store the
> > > > > > > >      periodically obtained time/offset lag
> > > > > > > >    - Addition of two new functions in KafkaStreams,
> > > > > > > >      List<StoreLagInfo> allLagInfo() and List<StoreLagInfo>
> > > > > > > >      lagInfoForStore(String storeName), to return the lag
> > > > > > > >      information for an instance and a store respectively
> > > > > > > >    - Addition of new class KeyQueryMetadata. We need the
> > > > > > > >      topicPartition for each key to be matched with the lag
> > > > > > > >      API for the topic partition. One way is to add new
> > > > > > > >      functions and fetch topicPartition from
> > > > > > > >      StreamsMetadataState, but we thought having one call
> > > > > > > >      that fetches both StreamsMetadata and topicPartition is
> > > > > > > >      cleaner.
> > > > > > > >    - Renaming partitionsForHost to activePartitionsForHost
> > > > > > > >      in StreamsMetadataState and partitionsByHostState to
> > > > > > > >      activePartitionsByHostState in StreamsPartitionAssignor
> > > > > > > >    - We have also added the pseudo code of how all the
> > > > > > > >      changes will exist together and support the new
> > > > > > > >      querying APIs
> > > > > > >
> > > > > > > Please let me know if anything is pending now, before a vote
> can
> > be
> > > > > > > started on this.  On Saturday, 26 October, 2019, 05:41:44 pm
> IST,
> > > > Navinder
> > > > > > > Brar <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > >
> > > > > > > > >> Since there are two soft votes for separate active/standby
> > > > > > > > >> API methods, I also change my position on that. Fine with 2
> > > > > > > > >> separate methods. Once we remove the lag information from
> > > > > > > > >> these APIs, returning a List is less attractive, since the
> > > > > > > > >> ordering has no special meaning now.
> > > > > > > >
> > > > > > > > Agreed, now that we are not returning lag, I am also sold on
> > > > > > > > having two separate functions. We already have one which
> > > > > > > > returns streamsMetadata for active tasks, and now we can add
> > > > > > > > another one for standbys.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >    On Saturday, 26 October, 2019, 03:55:16 am IST, Vinoth
> > Chandar <
> > > > > > > vchan...@confluent.io> wrote:
> > > > > > >
> > > > > > >  +1 to Sophie's suggestion. Having both lag in terms of time
> and
> > > > offsets is
> > > > > > > good and makes for a more complete API.
> > > > > > >
> > > > > > > Since there are two soft votes for separate active/standby API
> > > > methods, I
> > > > > > > also change my position on that. Fine with 2 separate methods.
> > > > > > > Once we remove the lag information from these APIs, returning a
> > List
> > > > is
> > > > > > > less attractive, since the ordering has no special meaning now.
> > > > > > >
> > > > > > > >> lag in offsets vs time: Having both, as suggested by Sophie
> > would
> > > > of
> > > > > > > course be best. What is a little unclear to me is, how in
> details
> > > > are we
> > > > > > > going to compute both?
> > > > > > > @navinder may be next step is to flesh out these details and
> > surface
> > > > any
> > > > > > > larger changes we need to make if need be.
> > > > > > >
> > > > > > > Any other details we need to cover, before a VOTE can be called
> > on
> > > > this?
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <bbej...@gmail.com
> >
> > > > wrote:
> > > > > > >
> > > > > > > > I am jumping in a little late here.
> > > > > > > >
> > > > > > > > Overall I agree with the proposal to push decision making on
> > > > what/how to
> > > > > > > > query in the query layer.
> > > > > > > >
> > > > > > > > For point 5 from above, I'm slightly in favor of having a new
> > > > method,
> > > > > > > > "standbyMetadataForKey()" or something similar.
> > > > > > > > Because even if we return all tasks in one list, the user
> will
> > > > still have
> > > > > > > > to perform some filtering to separate the different tasks,
> so I
> > > > don't
> > > > > > > feel
> > > > > > > > making two calls is a burden, and IMHO makes things more
> > > > transparent for
> > > > > > > > the user.
> > > > > > > > If the final vote is for using an "isActive" field, I'm good
> > with
> > > > that as
> > > > > > > > well.
> > > > > > > >
> > > > > > > > Just my 2 cents.
> > > > > > > >
> > > > > > > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> > > > > > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > > >
> > > > > > > > > I think now we are aligned on almost all the design parts.
> > > > Summarising
> > > > > > > > > below what has been discussed above and we have a general
> > > > consensus on.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > >    - Rather than broadcasting lag across all nodes at
> > > > > > > > > >      rebalancing/with the heartbeat, we will just return
> > > > > > > > > >      a list of all available standbys in the system. The
> > > > > > > > > >      user can send an IQ to any of those nodes, which
> > > > > > > > > >      will return the response together with the offset
> > > > > > > > > >      and time lag. Based on that, the user can decide
> > > > > > > > > >      whether to return the response or call another
> > > > > > > > > >      standby.
> > > > > > > > > >    - The current metadata query frequency will not
> > > > > > > > > >      change. It will be the same as it is now, i.e.
> > > > > > > > > >      before each query.
> > > > > > > > > >    - For fetching List<StreamsMetadata> in
> > > > > > > > > >      StreamsMetadataState.java and
> > > > > > > > > >      List<QueryableStoreProvider> in
> > > > > > > > > >      StreamThreadStateStoreProvider.java (which will
> > > > > > > > > >      return all active stores which are
> > > > > > > > > >      running/restoring and replica stores which are
> > > > > > > > > >      running), we will add new functions and not disturb
> > > > > > > > > >      the existing functions.
> > > > > > > > > >    - There is no need to add a new StreamsConfig for
> > > > > > > > > >      implementing this KIP.
> > > > > > > > > >    - We will add standbyPartitionsByHost in
> > > > > > > > > >      AssignmentInfo and StreamsMetadataState, which
> > > > > > > > > >      would change the existing rebuildMetadata() and
> > > > > > > > > >      setPartitionsByHostState().
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > If anyone has any more concerns please feel free to add.
> Post
> > > > this I
> > > > > > > will
> > > > > > > > > be initiating a vote.
> > > > > > > > > ~Navinder
> > > > > > > > >
> > > > > > > > >    On Friday, 25 October, 2019, 12:05:29 pm IST, Matthias
> J.
> > Sax
> > > > <
> > > > > > > > > matth...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > >  Just to close the loop @Vinoth:
> > > > > > > > >
> > > > > > > > > > 1. IIUC John intends to add (or we can do this in this
> > KIP) lag
> > > > > > > > > information
> > > > > > > > > > to AssignmentInfo, which gets sent to every participant.
> > > > > > > > >
> > > > > > > > > As explained by John, currently KIP-441 plans to only
> report
> > the
> > > > > > > > > information to the leader. But I guess, with the new
> > proposal to
> > > > not
> > > > > > > > > broadcast this information anyway, this concern is
> > invalidated
> > > > anyway
> > > > > > > > >
> > > > > > > > > > 2. At-least I was under the assumption that it can be
> > called
> > > > per
> > > > > > > query,
> > > > > > > > > > since the API docs don't seem to suggest otherwise. Do
> you
> > see
> > > > any
> > > > > > > > > > potential issues if we call this every query? (we should
> > > > benchmark
> > > > > > > this
> > > > > > > > > > nonetheless)
> > > > > > > > >
> > > > > > > > > I did not see a real issue if people refresh the metadata
> > > > frequently,
> > > > > > > > > because it would be a local call. My main point was, that
> > this
> > > > would
> > > > > > > > > change the current usage pattern of the API, and we would
> > > > clearly need
> > > > > > > > > to communicate this change. Similar to (1), this concern in
> > > > invalidated
> > > > > > > > > anyway.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > @John: I think it's a great idea to get rid of reporting
> > lag, and
> > > > > > > > > pushing the decision making process about "what to query"
> > into
> > > > the
> > > > > > > query
> > > > > > > > > serving layer itself. This simplifies the overall design of
> > this
> > > > KIP
> > > > > > > > > significantly, and actually aligns very well with the idea
> > that
> > > > Kafka
> > > > > > > > > Streams (as it is a library) should only provide the basic
> > > > building
> > > > > > > > > block. Many of my raised questions are invalided by this.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Some questions are still open though:
> > > > > > > > >
> > > > > > > > > > 10) Do we need to distinguish between active(restoring)
> and
> > > > standby
> > > > > > > > > > tasks? Or could be treat both as the same?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > @Vinoth: about (5). I see your point about multiple calls
> vs
> > a
> > > > single
> > > > > > > > > call. I still slightly prefer multiple calls, but it's
> highly
> > > > > > > subjective
> > > > > > > > > and I would also be fine to add an #isActive() method.
> Would
> > be
> > > > good
> > > > > > > the
> > > > > > > > > get feedback from others.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > For (14), ie, lag in offsets vs time: Having both, as
> > suggested
> > > > by
> > > > > > > > > Sophie would of course be best. What is a little unclear to
> > me
> > > > is, how
> > > > > > > > > in details are we going to compute both?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -Matthias
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> > > > > > > > > > Just to chime in on the "report lag vs timestamp
> > difference"
> > > > issue, I
> > > > > > > > > would
> > > > > > > > > > actually advocate for both. As mentioned already, time
> > > > difference is
> > > > > > > > > > probably a lot easier and/or more useful to reason about
> in
> > > > terms of
> > > > > > > > > > "freshness"
> > > > > > > > > > of the state. But in the case when all queried stores are
> > far
> > > > behind,
> > > > > > > > lag
> > > > > > > > > > could
> > > > > > > > > > be used to estimate the recovery velocity. You can then
> > get a
> > > > (pretty
> > > > > > > > > rough)
> > > > > > > > > > idea of when a store might be ready, and wait until
> around
> > > > then to
> > > > > > > > query
> > > > > > > > > > again.
> > > > > > > > > >
> > > > > > > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <
> > > > wangg...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> I think I agree with John's recent reasoning as well:
> > instead
> > > > of
> > > > > > > > letting
> > > > > > > > > >> the storeMetadataAPI to return the staleness
> information,
> > > > letting
> > > > > > > the
> > > > > > > > > >> client to query either active or standby and letting
> > standby
> > > > query
> > > > > > > > > response
> > > > > > > > > >> to include both the values + timestamp (or lag, as in
> > diffs of
> > > > > > > > > timestamps)
> > > > > > > > > >> would actually be more intuitive -- not only the streams
> > > > client is
> > > > > > > > > simpler,
> > > > > > > > > >> from user's perspective they also do not need to
> > periodically
> > > > > > > refresh
> > > > > > > > > their
> > > > > > > > > >> staleness information from the client, but only need to
> > make
> > > > > > > decisions
> > > > > > > > > on
> > > > > > > > > >> the fly whenever they need to query.
> > > > > > > > > >>
> > > > > > > > > >> Again the standby replica then need to know the current
> > active
> > > > > > > task's
> > > > > > > > > >> timestamp, which can be found from the log end record's
> > > > encoded
> > > > > > > > > timestamp;
> > > > > > > > > >> today we standby tasks do not read that specific record,
> > but
> > > > only
> > > > > > > > > refresh
> > > > > > > > > >> its knowledge on the log end offset, but I think
> > refreshing
> > > > the
> > > > > > > latest
> > > > > > > > > >> record timestamp is not a very bad request to add on the
> > > > standby
> > > > > > > > > replicas.
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> Guozhang
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <
> > > > > > > vchan...@confluent.io
> > > > > > > > >
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >>> +1 As someone implementing a query routing layer, there
> > is
> > > > already
> > > > > > > a
> > > > > > > > > need
> > > > > > > > > >>> to have mechanisms in place to do healthchecks/failure
> > > > detection to
> > > > > > > > > >> detect
> > > > > > > > > >>> failures for queries, while Streams rebalancing
> > eventually
> > > > kicks in
> > > > > > > > the
> > > > > > > > > >>> background.
> > > > > > > > > >>> So, pushing this complexity to the IQ client app keeps
> > > > Streams
> > > > > > > > simpler
> > > > > > > > > as
> > > > > > > > > >>> well. IQs will be potentially issues at an order of
> > > > magnitude more
> > > > > > > > > >>> frequently and it can achieve good freshness for the
> lag
> > > > > > > information.
> > > > > > > > > >>>
> > > > > > > > > >>> I would like to add however, that we would also need to
> > > > introduce
> > > > > > > > apis
> > > > > > > > > in
> > > > > > > > > >>> KafkaStreams class, for obtaining lag information for
> all
> > > > stores
> > > > > > > > local
> > > > > > > > > to
> > > > > > > > > >>> that host. This is for the IQs to relay back with the
> > > > response/its
> > > > > > > > own
> > > > > > > > > >>> heartbeat mechanism.
> > > > > > > > > >>>
> > > > > > > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <
> > > > j...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > > >>>
> > > > > > > > > >>>> Hi all,
> > > > > > > > > >>>>
> > > > > > > > > >>>> I've been mulling about this KIP, and I think I was on
> > the
> > > > wrong
> > > > > > > > track
> > > > > > > > > >>>> earlier with regard to task lags. Tl;dr: I don't think
> > we
> > > > should
> > > > > > > add
> > > > > > > > > >>>> lags at all to the metadata API (and also not to the
> > > > > > > AssignmentInfo
> > > > > > > > > >>>> protocol message).
> > > > > > > > > >>>>
> > > > > > > > > >>>> Like I mentioned early on, reporting lag via
> > > > > > > > > >>>> SubscriptionInfo/AssignmentInfo would only work while
> > > > rebalances
> > > > > > > are
> > > > > > > > > >>>> happening. Once the group stabilizes, no members would
> > be
> > > > notified
> > > > > > > > of
> > > > > > > > > >>>> each others' lags anymore. I had been thinking that
> the
> > > > solution
> > > > > > > > would
> > > > > > > > > >>>> be the heartbeat proposal I mentioned earlier, but
> that
> > > > proposal
> > > > > > > > would
> > > > > > > > > >>>> have reported the heartbeats of the members only to
> the
> > > > leader
> > > > > > > > member
> > > > > > > > > > >>>> (the one who makes assignments). To be useful in the
> > > > > > > > > > >>>> context of _this_ KIP, we would also have to report
> > > > > > > > > > >>>> the lags in the heartbeat responses to _all_
> > > > > > > > > > >>>> members. This is a concern to me because now _all_
> > > > > > > > > > >>>> the lags get reported to _all_ the members on
> > > > > > > > > > >>>> _every_ heartbeat... a lot of chatter.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Plus, the proposal for KIP-441 is only to report the
> > lags
> > > > of each
> > > > > > > > > >>>> _task_. This is the sum of the lags of all the stores
> > in the
> > > > > > > tasks.
> > > > > > > > > >>>> But this would be insufficient for KIP-535. For this
> > kip,
> > > > we would
> > > > > > > > > >>>> want the lag specifically of the store we want to
> > query. So
> > > > this
> > > > > > > > > >>>> means, we have to report the lags of all the stores of
> > all
> > > > the
> > > > > > > > members
> > > > > > > > > >>>> to every member... even more chatter!
> > > > > > > > > >>>>
> > > > > > > > > >>>> The final nail in the coffin to me is that IQ clients
> > would
> > > > have
> > > > > > > to
> > > > > > > > > >>>> start refreshing their metadata quite frequently to
> > stay up
> > > > to
> > > > > > > date
> > > > > > > > on
> > > > > > > > > >>>> the lags, which adds even more overhead to the system.
> > > > > > > > > >>>>
> > > > > > > > > > >>>> Consider a strawman alternative: we bring KIP-535 back
> > > > > > > > > > >>>> to extending the metadata API to tell the client the
> > > > > > > > > > >>>> active and standby replicas for the key in question
> > > > > > > > > > >>>> (not including any "staleness/lag" restriction, just
> > > > > > > > > > >>>> returning all the replicas). Then, the client picks
> > > > > > > > > > >>>> a replica and sends the query. The server returns the
> > > > > > > > > > >>>> current lag along with the response (maybe in an HTTP
> > > > > > > > > > >>>> header or something). Then, the client keeps a map of
> > > > > > > > > > >>>> its last observed lags for each replica, and uses
> > > > > > > > > > >>>> this information to prefer fresher replicas.
> > > > > > > > > >>>>
> > > > > > > > > >>>> OR, if it wants only to query the active replica, it
> > would
> > > > throw
> > > > > > > an
> > > > > > > > > >>>> error on any lag response greater than zero, refreshes
> > its
> > > > > > > metadata
> > > > > > > > by
> > > > > > > > > >>>> re-querying the metadata API, and tries again with the
> > > > current
> > > > > > > > active
> > > > > > > > > >>>> replica.
> > > > > > > > > >>>>
> > > > > > > > > >>>> This way, the lag information will be super fresh for
> > the
> > > > client,
> > > > > > > > and
> > > > > > > > > >>>> we keep the Metadata API / Assignment,Subscription /
> and
> > > > Heartbeat
> > > > > > > > as
> > > > > > > > > >>>> slim as possible.
> > > > > > > > > >>>>
> > > > > > > > > >>>> Side note: I do think that some time soon, we'll have
> to
> > > > add a
> > > > > > > > library
> > > > > > > > > >>>> for IQ server/clients. I think that this logic will
> > start
> > > > to get
> > > > > > > > > >>>> pretty complex.
> > > > > > > > > >>>>
> > > > > > > > > >>>> I hope this thinking is reasonably clear!
> > > > > > > > > >>>> Thanks again,
> > > > > > > > > >>>> -John
> > > > > > > > > >>>>
> > > > > > > > > >>>> Does that
> > > > > > > > > >>>>
> > > > > > > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth Chandar <
> > > > > > > > > vchan...@confluent.io
> > > > > > > > > >>>
> > > > > > > > > >>>> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> Responding to the points raised by Matthias
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 1. IIUC John intends to add (or we can do this in
> this
> > > > KIP) lag
> > > > > > > > > >>>> information
> > > > > > > > > >>>>> to AssignmentInfo, which gets sent to every
> > participant.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 2. At-least I was under the assumption that it can be
> > > > called per
> > > > > > > > > >> query,
> > > > > > > > > >>>>> since the API docs don't seem to suggest otherwise.
> Do
> > you
> > > > see
> > > > > > > any
> > > > > > > > > >>>>> potential issues if we call this every query? (we
> > should
> > > > > > > benchmark
> > > > > > > > > >> this
> > > > > > > > > >>>>> nonetheless)
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 4. Agree. metadataForKey() implicitly would return
> the
> > > > active
> > > > > > > host
> > > > > > > > > >>>> metadata
> > > > > > > > > >>>>> (as it was before). We should also document this in
> > that
> > > > APIs
> > > > > > > > > >> javadoc,
> > > > > > > > > >>>>> given we have another method(s) that returns more
> host
> > > > metadata
> > > > > > > > now.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 5.  While I see the point, the app/caller has to make
> > two
> > > > > > > different
> > > > > > > > > >>> APIs
> > > > > > > > > >>>>> calls to obtain active/standby and potentially do the
> > same
> > > > set of
> > > > > > > > > >>>> operation
> > > > > > > > > >>>>> to query the state. I personally still like a method
> > like
> > > > > > > > isActive()
> > > > > > > > > >>>>> better, but don't have strong opinions.
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 9. If we do expose the lag information, could we just
> > > > leave it
> > > > > > > upto
> > > > > > > > > >> to
> > > > > > > > > >>>> the
> > > > > > > > > >>>>> caller to decide whether it errors out or not and not
> > make
> > > > the
> > > > > > > > > >> decision
> > > > > > > > > >>>>> within Streams? i.e we don't need a new config
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> 14. +1 . If it's easier to do right away. We started
> > with
> > > > number
> > > > > > > of
> > > > > > > > > >>>>> records, following the lead from KIP-441
> > > > > > > > > >>>>>
> > > > > > > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> > > > > > > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > > > > >>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> Thanks, everyone, for taking a look. Some very cool
> > > > > > > > > > >>>>>> ideas have flown in.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> >> There was a follow-on idea I POCed to continuously
> > > > > > > > > > >>>>>> >> share lag information in the heartbeat protocol
> > > > > > > > > > >>>>>> +1, that would be great. I will update the KIP
> > > > > > > > > > >>>>>> assuming this work will finish soon.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> >> I think that adding a new method to
> > > > > > > > > > >>>>>> >> StreamsMetadataState and deprecating the existing
> > > > > > > > > > >>>>>> >> method is the best way to go; we just can't change
> > > > > > > > > > >>>>>> >> the return types of any existing methods.
> > > > > > > > > > >>>>>> +1 on this. We will add new methods for users who
> > > > > > > > > > >>>>>> would be interested in querying back a list of
> > > > > > > > > > >>>>>> possible options to query from, and leave the current
> > > > > > > > > > >>>>>> function getStreamsMetadataForKey() untouched for
> > > > > > > > > > >>>>>> users who want absolute consistency.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> >> why not just always return all available metadata
> > > > > > > > > > >>>>>> >> (including active/standby or lag) and let the
> > > > > > > > > > >>>>>> >> caller decide to which node they want to route the
> > > > > > > > > > >>>>>> >> query
> > > > > > > > > > >>>>>> +1. I think this makes sense, as from a user
> > > > > > > > > > >>>>>> standpoint there is no difference b/w an active and a
> > > > > > > > > > >>>>>> standby if both have the same lag. In fact, users
> > > > > > > > > > >>>>>> would be able to use this API to reduce query load on
> > > > > > > > > > >>>>>> actives, so returning all available options along
> > > > > > > > > > >>>>>> with the current lag in each would make sense,
> > > > > > > > > > >>>>>> leaving it to the user how they want to use this
> > > > > > > > > > >>>>>> data. This has another added advantage. If a user
> > > > > > > > > > >>>>>> queries any random machine for a key and that machine
> > > > > > > > > > >>>>>> has a replica for the partition (where the key
> > > > > > > > > > >>>>>> belongs), the user might choose to serve the data
> > > > > > > > > > >>>>>> from there itself (if it doesn't lag much) rather
> > > > > > > > > > >>>>>> than finding the active and making an IQ to that.
> > > > > > > > > > >>>>>> This would save some critical time in serving for
> > > > > > > > > > >>>>>> some applications.
> > > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>>> Adding the lag in terms of timestamp diff
> comparing
> > the
> > > > > > > > > >> committed
> > > > > > > > > >>>>>> offset.+1 on this, I think it’s more readable. But
> as
> > > > John said
> > > > > > > > the
> > > > > > > > > >>>>>> function allMetadataForKey() is just returning the
> > > > possible
> > > > > > > > options
> > > > > > > > > >>>> from
> > > > > > > > > >>>>>> where users can query a key, so we can even drop the
> > > > parameter
> > > > > > > > > >>>>>> enableReplicaServing/tolerableDataStaleness and just
> > > > return all
> > > > > > > > the
> > > > > > > > > >>>>>> streamsMetadata containing that key along with the
> > offset
> > > > limit.
> > > > > > > > > > >>>>>> Answering the questions posted by Matthias in sequence.
> > > > > > > > > > >>>>>>
> > > > > > > > > > >>>>>> 1. @John can you please comment on this one.
> > > > > > > > > > >>>>>> 2. Yeah, the usage pattern would include querying this prior to every
> > > > > > > > > > >>>>>>    request.
> > > > > > > > > > >>>>>> 3. Will add the changes to StreamsMetadata in the KIP, would include
> > > > > > > > > > >>>>>>    changes in rebuildMetadata() etc.
> > > > > > > > > > >>>>>> 4. Makes sense, already addressed above.
> > > > > > > > > > >>>>>> 5. Is it important from a user perspective whether they are querying an
> > > > > > > > > > >>>>>>    active(processing), active(restoring), or a standby task, if we have
> > > > > > > > > > >>>>>>    a way of denoting lag in a readable manner which signals to the user
> > > > > > > > > > >>>>>>    that this is the best node to query for fresh data?
> > > > > > > > > > >>>>>> 6. Yes, I intend to return the actives and replicas in the same return
> > > > > > > > > > >>>>>>    list in allMetadataForKey().
> > > > > > > > > > >>>>>> 7. Tricky.
> > > > > > > > > > >>>>>> 8. Yes, we need new functions to return activeRestoring and
> > > > > > > > > > >>>>>>    standbyRunning tasks.
> > > > > > > > > > >>>>>> 9. StreamsConfig doesn’t look like it is of much use to me, since we are
> > > > > > > > > > >>>>>>    giving all possible options via this function, or they can use the
> > > > > > > > > > >>>>>>    existing function getStreamsMetadataForKey() and get just the active.
> > > > > > > > > > >>>>>> 10. I think we treat them both the same and let the lag do the talking.
> > > > > > > > > > >>>>>> 11. We are just sending them the options to query from in
> > > > > > > > > > >>>>>>     allMetadataForKey(), which doesn’t include any handle. We then query
> > > > > > > > > > >>>>>>     that machine for the key, where it calls allStores() and tries to
> > > > > > > > > > >>>>>>     find the task in activeRunning/activeRestoring/standbyRunning and
> > > > > > > > > > >>>>>>     adds the store handle here.
> > > > > > > > > > >>>>>> 12. Need to verify, but at the exact point when the store is closed to
> > > > > > > > > > >>>>>>     transition it from restoring to running, the queries will fail. The
> > > > > > > > > > >>>>>>     caller in such a case can have their own configurable retries to
> > > > > > > > > > >>>>>>     check again, or try the replica if a call to the active fails.
> > > > > > > > > > >>>>>> 13. I think KIP-216 is working on those lines. We might not need a few
> > > > > > > > > > >>>>>>     of those exceptions, since now the basic idea of this KIP is to
> > > > > > > > > > >>>>>>     support IQ during rebalancing.
> > > > > > > > > > >>>>>> 14. Addressed above, agreed it looks more readable.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>    On Tuesday, 22 October, 2019, 08:39:07 pm IST,
> > > > Matthias J.
> > > > > > > Sax
> > > > > > > > > >> <
> > > > > > > > > >>>>>> matth...@confluent.io> wrote:
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>  One more thought:
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> 14) Is specifying the allowed lag in number of
> > records a
> > > > useful
> > > > > > > > way
> > > > > > > > > >>> for
> > > > > > > > > >>>>>> users to declare how stale an instance is allowed to
> > be?
> > > > Would
> > > > > > > it
> > > > > > > > > >> be
> > > > > > > > > >>>>>> more intuitive for users to specify the allowed lag
> in
> > > > time
> > > > > > > units
> > > > > > > > > >>>> (would
> > > > > > > > > >>>>>> event time or processing time be better)? It seems
> > hard
> > > > for
> > > > > > > users
> > > > > > > > > >> to
> > > > > > > > > >>>>>> reason how "fresh" a store really is when number of
> > > > records is
> > > > > > > > > >> used.
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> -Matthias
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> > > > > > > > > >>>>>>> Some more follow up thoughts:
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> 11) If we get a store handle of an
> active(restoring)
> > > > task, and
> > > > > > > > > >> the
> > > > > > > > > >>>> task
> > > > > > > > > >>>>>>> transits to running, does the store handle become
> > > > invalid and a
> > > > > > > > > >> new
> > > > > > > > > >>>> one
> > > > > > > > > >>>>>>> must be retrieved? Or can we "switch it out"
> > underneath
> > > > -- for
> > > > > > > > > >> this
> > > > > > > > > >>>>>>> case, how does the user know when they start to
> > query the
> > > > > > > > > >>> up-to-date
> > > > > > > > > >>>>>> state?
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> 12) Standby tasks will have the store open in
> regular
> > > > mode,
> > > > > > > while
> > > > > > > > > >>>>>>> active(restoring) tasks open stores in "upgrade
> mode"
> > > > for more
> > > > > > > > > >>>> efficient
> > > > > > > > > >>>>>>> bulk loading. When we switch the store into active
> > mode,
> > > > we
> > > > > > > close
> > > > > > > > > >>> it
> > > > > > > > > >>>> and
> > > > > > > > > >>>>>>> reopen it. What is the impact if we query the store
> > > > during
> > > > > > > > > >> restore?
> > > > > > > > > >>>> What
> > > > > > > > > >>>>>>> is the impact if we close the store to transit to
> > > > running (eg,
> > > > > > > > > >>> there
> > > > > > > > > >>>>>>> might be open iterators)?
> > > > > > > > > >>>>>>>
> > > > > > > > > > >>>>>>> 13) Do we need to introduce new exception types? Compare KIP-216
> > > > > > > > > > >>>>>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors)
> > > > > > > > > > >>>>>>> that aims to improve the user experience with regard to IQ exceptions.
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> -Matthias
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> > > > > > > > > >>>>>>>> Thanks for the KIP.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> Couple of comments:
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> 1) With regard to KIP-441, my current
> understanding
> > is
> > > > that
> > > > > > > the
> > > > > > > > > >>> lag
> > > > > > > > > >>>>>>>> information is only reported to the leader (please
> > > > correct me
> > > > > > > > > >> if I
> > > > > > > > > >>>> am
> > > > > > > > > >>>>>>>> wrong). This seems to be quite a limitation to
> > actually
> > > > use
> > > > > > > the
> > > > > > > > > >>> lag
> > > > > > > > > >>>>>>>> information.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> 2) The idea of the metadata API is actually to get
> > > > metadata
> > > > > > > once
> > > > > > > > > >>> and
> > > > > > > > > >>>>>>>> only refresh the metadata if a store was migrated.
> > The
> > > > current
> > > > > > > > > >>>> proposal
> > > > > > > > > >>>>>>>> would require to get the metadata before each
> query.
> > > > The KIP
> > > > > > > > > >>> should
> > > > > > > > > >>>>>>>> describe the usage pattern and impact in more
> > detail.
> > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 3) Currently, the KIP does not list the public API changes in detail.
> > > > > > > > > > >>>>>>>> Please list all methods you intend to deprecate and list all methods you
> > > > > > > > > > >>>>>>>> intend to add (best, using a code-block markup -- compare
> > > > > > > > > > >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> > > > > > > > > > >>>>>>>> as an example).
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> 4) Also note (as already pointed out by John),
> that
> > we
> > > > cannot
> > > > > > > > > >> have
> > > > > > > > > >>>> any
> > > > > > > > > >>>>>>>> breaking API changes. Thus, the API should be
> > designed
> > > > in a
> > > > > > > > > >> fully
> > > > > > > > > >>>>>>>> backward compatible manner.
> > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 5) Returning a list of metadata objects makes it hard for users to know
> > > > > > > > > > >>>>>>>> if the first object refers to the active(processing), active(restoring),
> > > > > > > > > > >>>>>>>> or a standby task. IMHO, we should be more explicit. For example, a
> > > > > > > > > > >>>>>>>> metadata object could have a flag that one can test via `#isActive()`.
> > > > > > > > > > >>>>>>>> Or maybe even better, we could keep the current API as-is and add
> > > > > > > > > > >>>>>>>> something like `standbyMetadataForKey()` (and similar methods for the
> > > > > > > > > > >>>>>>>> others). Having just a flag `isActive()` is a little subtle, and having
> > > > > > > > > > >>>>>>>> new overloads would make the API much clearer (passing in a boolean flag
> > > > > > > > > > >>>>>>>> does not seem to be a nice API).
> > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 6) Do you intend to return all standby metadata information at once,
> > > > > > > > > > >>>>>>>> similar to `allMetadata()`? That seems to be useful.
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> 7) Even if the lag information is propagated to
> all
> > > > instances,
> > > > > > > > > >> it
> > > > > > > > > >>>> will
> > > > > > > > > >>>>>>>> happen in an async manner. Hence, I am wondering
> if
> > we
> > > > should
> > > > > > > > > >>>> address
> > > > > > > > > >>>>>>>> this race condition (I think we should). The idea
> > would
> > > > be to
> > > > > > > > > >>> check
> > > > > > > > > >>>> if a
> > > > > > > > > >>>>>>>> standby/active(restoring) task is actually still
> > within
> > > > the
> > > > > > > lag
> > > > > > > > > >>>> bounds
> > > > > > > > > >>>>>>>> when a query is executed and we would throw an
> > > > exception if
> > > > > > > not.
> > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 8) The current `KafkaStreams#state()` method only returns a handle to
> > > > > > > > > > >>>>>>>> stores of active(processing) tasks. How can a user actually get a handle
> > > > > > > > > > >>>>>>>> to a store of an active(restoring) or standby task for querying? Seems
> > > > > > > > > > >>>>>>>> we should add a new method to get standby handles? Changing the
> > > > > > > > > > >>>>>>>> semantics of the existing `state()` would be possible, but I think
> > > > > > > > > > >>>>>>>> adding a new method is preferable?
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> 9) How does the user actually specify the
> acceptable
> > > > lag? A
> > > > > > > > > >> global
> > > > > > > > > >>>>>>>> config via StreamsConfig (this would be a public
> API
> > > > change
> > > > > > > that
> > > > > > > > > >>>> needs
> > > > > > > > > >>>>>>>> to be covered in the KIP)? Or on a per-store or
> even
> > > > per-query
> > > > > > > > > >>>> basis for
> > > > > > > > > >>>>>>>> more flexibility? We could also have a global
> > setting
> > > > that is
> > > > > > > > > >> used
> > > > > > > > > >>>> as
> > > > > > > > > >>>>>>>> default and allow to overwrite it on a per-query
> > basis.
> > > > > > > > > >>>>>>>>
> > > > > > > > > > >>>>>>>> 10) Do we need to distinguish between active(restoring) and standby
> > > > > > > > > > >>>>>>>> tasks? Or could we treat both as the same?
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> -Matthias
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable
> > lag"
> > > > into
> > > > > > > the
> > > > > > > > > >>>>>>>>> configuration at all, or even making it a
> > parameter on
> > > > > > > > > >>>>>> `allMetadataForKey`,
> > > > > > > > > >>>>>>>>> why not just _always_ return all available
> metadata
> > > > > > > (including
> > > > > > > > > >>>>>>>>> active/standby or lag) and let the caller decide
> to
> > > > which
> > > > > > > node
> > > > > > > > > >>> they
> > > > > > > > > >>>>>> want to
> > > > > > > > > >>>>>>>>> route the query?
> > > > > > > > > >>>>>>>>> +1 on exposing lag information via the APIs. IMO
> > > > without
> > > > > > > having
> > > > > > > > > >>>>>>>>> continuously updated/fresh lag information, its
> > true
> > > > value
> > > > > > > as a
> > > > > > > > > >>>> signal
> > > > > > > > > >>>>>> for
> > > > > > > > > >>>>>>>>> query routing decisions is much limited. But we
> can
> > > > design
> > > > > > > the
> > > > > > > > > >>> API
> > > > > > > > > >>>>>> around
> > > > > > > > > >>>>>>>>> this model and iterate? Longer term, we should
> have
> > > > > > > > > >> continuously
> > > > > > > > > >>>>>> shared lag
> > > > > > > > > >>>>>>>>> information.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>>> more general to refactor it to
> > > > "allMetadataForKey(long
> > > > > > > > > >>>>>>>>> tolerableDataStaleness, ...)", and when it's set
> > to 0
> > > > it
> > > > > > > means
> > > > > > > > > >>>> "active
> > > > > > > > > >>>>>> task
> > > > > > > > > >>>>>>>>> only".
> > > > > > > > > >>>>>>>>> +1 IMO if we plan on having
> > `enableReplicaServing`, it
> > > > makes
> > > > > > > > > >>> sense
> > > > > > > > > >>>> to
> > > > > > > > > >>>>>>>>> generalize based on dataStaleness. This seems
> > > > complementary
> > > > > > > to
> > > > > > > > > >>>>>> exposing the
> > > > > > > > > >>>>>>>>> lag information itself.
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>>> This is actually not a public api change at
> all,
> > and
> > > > I'm
> > > > > > > > > >>>> planning to
> > > > > > > > > >>>>>>>>> implement it asap as a precursor to the rest of
> > KIP-441
> > > > > > > > > >>>>>>>>> +1 again. Do we have a concrete timeline for when
> > this
> > > > change
> > > > > > > > > >>> will
> > > > > > > > > >>>>>> land on
> > > > > > > > > >>>>>>>>> master? I would like to get the implementation
> > wrapped
> > > > up (as
> > > > > > > > > >>> much
> > > > > > > > > >>>> as
> > > > > > > > > >>>>>>>>> possible) by end of the month. :). But I agree
> this
> > > > > > > sequencing
> > > > > > > > > >>>> makes
> > > > > > > > > >>>>>>>>> sense..
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang Wang <
> > > > > > > > > >>> wangg...@gmail.com>
> > > > > > > > > >>>>>> wrote:
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>>> Hi Navinder,
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Thanks for the KIP, I have a high level question
> > > > about the
> > > > > > > > > >>>> proposed
> > > > > > > > > >>>>>> API
> > > > > > > > > >>>>>>>>>> regarding:
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > > > > > > >>>>>> enableReplicaServing...)"
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> I'm wondering if it's more general to refactor it to
> > > > > > > > > > >>>>>>>>>> "allMetadataForKey(long tolerableDataStaleness, ...)", and when it's
> > > > > > > > > > >>>>>>>>>> set to 0 it means "active task only". Behind the scenes, we can have
> > > > > > > > > > >>>>>>>>>> the committed offsets encode the stream time as well, so that when
> > > > > > > > > > >>>>>>>>>> processing standby tasks the stream process knows not only the lag in
> > > > > > > > > > >>>>>>>>>> terms of offsets compared to the committed offset (internally we call
> > > > > > > > > > >>>>>>>>>> it offset limit), but also the lag in terms of timestamp diff compared
> > > > > > > > > > >>>>>>>>>> to the committed offset.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > > >>>>>>>>>> Also, encoding the timestamp as part of the offset has other benefits
> > > > > > > > > > >>>>>>>>>> for improving Kafka Streams time semantics as well, but for KIP-535
> > > > > > > > > > >>>>>>>>>> itself I think it can help give users a more intuitive interface to
> > > > > > > > > > >>>>>>>>>> reason about.
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> Guozhang
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John Roesler <
> > > > > > > > > >>> j...@confluent.io>
> > > > > > > > > >>>>>> wrote:
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Hey Navinder,
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Thanks for the KIP! I've been reading over the
> > > > discussion
> > > > > > > > > >> thus
> > > > > > > > > >>>> far,
> > > > > > > > > >>>>>>>>>>> and I have a couple of thoughts to pile on as
> > well:
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> It seems confusing to propose the API in terms
> > of the
> > > > > > > current
> > > > > > > > > >>>> system
> > > > > > > > > >>>>>>>>>>> state, but also propose how the API would look
> > > > if/when
> > > > > > > > > >> KIP-441
> > > > > > > > > >>> is
> > > > > > > > > >>>>>>>>>>> implemented. It occurs to me that the only part
> > of
> > > > KIP-441
> > > > > > > > > >> that
> > > > > > > > > >>>> would
> > > > > > > > > >>>>>>>>>>> affect you is the availability of the lag
> > > > information in
> > > > > > > the
> > > > > > > > > >>>>>>>>>>> SubscriptionInfo message. This is actually not
> a
> > > > public api
> > > > > > > > > >>>> change at
> > > > > > > > > >>>>>>>>>>> all, and I'm planning to implement it asap as a
> > > > precursor
> > > > > > > to
> > > > > > > > > >>> the
> > > > > > > > > >>>> rest
> > > > > > > > > >>>>>>>>>>> of KIP-441, so maybe you can just build on top
> of
> > > > KIP-441
> > > > > > > and
> > > > > > > > > >>>> assume
> > > > > > > > > >>>>>>>>>>> the lag information will be available. Then you
> > > > could have
> > > > > > > a
> > > > > > > > > >>> more
> > > > > > > > > >>>>>>>>>>> straightforward proposal (e.g., mention that
> > you'd
> > > > return
> > > > > > > the
> > > > > > > > > >>> lag
> > > > > > > > > >>>>>>>>>>> information in AssignmentInfo as well as in the
> > > > > > > > > >> StreamsMetadata
> > > > > > > > > >>>> in
> > > > > > > > > >>>>>>>>>>> some form, or make use of it in the API
> somehow).
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> I'm partially motivated in that former point
> > because
> > > > it
> > > > > > > seems
> > > > > > > > > >>>> like
> > > > > > > > > >>>>>>>>>>> understanding how callers would bound the
> > staleness
> > > > for
> > > > > > > their
> > > > > > > > > >>> use
> > > > > > > > > >>>>>> case
> > > > > > > > > >>>>>>>>>>> is _the_ key point for this KIP. FWIW, I think
> > that
> > > > adding
> > > > > > > a
> > > > > > > > > >>> new
> > > > > > > > > >>>>>>>>>>> method to StreamsMetadataState and deprecating
> > the
> > > > existing
> > > > > > > > > >>>> method is
> > > > > > > > > >>>>>>>>>>> the best way to go; we just can't change the
> > return
> > > > types
> > > > > > > of
> > > > > > > > > >>> any
> > > > > > > > > >>>>>>>>>>> existing methods.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting "acceptable
> > lag"
> > > > into
> > > > > > > the
> > > > > > > > > >>>>>>>>>>> configuration at all, or even making it a
> > parameter
> > > > on
> > > > > > > > > >>>>>>>>>>> `allMetadataForKey`, why not just _always_
> > return all
> > > > > > > > > >> available
> > > > > > > > > >>>>>>>>>>> metadata (including active/standby or lag) and
> > let
> > > > the
> > > > > > > caller
> > > > > > > > > >>>> decide
> > > > > > > > > >>>>>>>>>>> to which node they want to route the query?
> This
> > > > method
> > > > > > > isn't
> > > > > > > > > >>>> making
> > > > > > > > > >>>>>>>>>>> any queries itself; it's merely telling you
> where
> > > > the local
> > > > > > > > > >>>> Streams
> > > > > > > > > >>>>>>>>>>> instance _thinks_ the key in question is
> located.
> > > > Just
> > > > > > > > > >>> returning
> > > > > > > > > >>>> all
> > > > > > > > > >>>>>>>>>>> available information lets the caller implement
> > any
> > > > > > > semantics
> > > > > > > > > >>>> they
> > > > > > > > > >>>>>>>>>>> desire around querying only active stores, or
> > > > standbys, or
> > > > > > > > > >>>> recovering
> > > > > > > > > >>>>>>>>>>> stores, or whatever.
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> One fly in the ointment, which you may wish to
> > > > consider if
> > > > > > > > > >>>> proposing
> > > > > > > > > >>>>>>>>>>> to use lag information, is that the cluster
> would
> > > > only
> > > > > > > become
> > > > > > > > > >>>> aware
> > > > > > > > > >>>>>> of
> > > > > > > > > >>>>>>>>>>> new lag information during rebalances. Even in
> > the
> > > > full
> > > > > > > > > >>>> expression of
> > > > > > > > > >>>>>>>>>>> KIP-441, this information would stop being
> > > > propagated when
> > > > > > > > > >> the
> > > > > > > > > >>>>>> cluster
> > > > > > > > > >>>>>>>>>>> achieves a balanced task distribution. There
> was
> > a
> > > > > > > follow-on
> > > > > > > > > >>>> idea I
> > > > > > > > > >>>>>>>>>>> POCed to continuously share lag information in
> > the
> > > > > > > heartbeat
> > > > > > > > > >>>>>> protocol,
> > > > > > > > > >>>>>>>>>>> which you might be interested in, if you want
> to
> > > > make sure
> > > > > > > > > >> that
> > > > > > > > > >>>> nodes
> > > > > > > > > > >>>>>>>>>>> are basically _always_ aware of each other's
> lag
> > on
> > > > > > > different
> > > > > > > > > >>>>>>>>>>> partitions:
> > > > https://github.com/apache/kafka/pull/7096
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> Thanks again!
> > > > > > > > > >>>>>>>>>>> -John
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder Brar
> > > > > > > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on the same
> > page.
> > > > I will
> > > > > > > > > >> add
> > > > > > > > > >>>> some
> > > > > > > > > >>>>>> of
> > > > > > > > > >>>>>>>>>>> these explanations to the KIP as well. Have
> > assigned
> > > > the
> > > > > > > > > >>>> KAFKA-6144
> > > > > > > > > >>>>>> to
> > > > > > > > > >>>>>>>>>>> myself and KAFKA-8994 is closed(by you). As
> > > > suggested, we
> > > > > > > > > >> will
> > > > > > > > > >>>>>> replace
> > > > > > > > > >>>>>>>>>>> "replica" with "standby".
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> In the new API, "StreamsMetadataState::allMetadataForKey(boolean
> > > > > > > > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key, Serializer<K>
> > > > > > > > > > >>>>>>>>>>>> keySerializer)" Do we really need a per key configuration? or a new
> > > > > > > > > > >>>>>>>>>>>> StreamsConfig is good enough?
> > > > > > > > > > >>>>>>>>>>>> >> Coming from experience, when teams are building a platform with
> > > > > > > > > > >>>>>>>>>>>> Kafka Streams and these APIs serve data to multiple teams, we can't
> > > > > > > > > > >>>>>>>>>>>> have a generalized config that says whether, as a platform, we will
> > > > > > > > > > >>>>>>>>>>>> support stale reads or not. It should be the choice of whoever is
> > > > > > > > > > >>>>>>>>>>>> calling the APIs whether they are ok with stale reads or not.
> > > > > > > > > > >>>>>>>>>>>> Makes sense?
> > > > > > > > > >>>>>>>>>>>>    On Thursday, 17 October, 2019, 11:56:02 pm
> > IST,
> > > > Vinoth
> > > > > > > > > >>>> Chandar <
> > > > > > > > > >>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>  Looks like we are covering ground :)
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> Only if it is within a permissible
> range(say
> > > > 10000) we
> > > > > > > > > >> will
> > > > > > > > > >>>> serve
> > > > > > > > > >>>>>>>>>> from
> > > > > > > > > >>>>>>>>>>>> Restoring state of active.
> > > > > > > > > >>>>>>>>>>>> +1 on having a knob like this.. My reasoning
> is
> > as
> > > > > > > follows.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Looking at the Streams state as a read-only
> > > > distributed kv
> > > > > > > > > >>>> store.
> > > > > > > > > >>>>>> With
> > > > > > > > > >>>>>>>>>>>> num_standby = f , we should be able to
> tolerate
> > f
> > > > failures
> > > > > > > > > >> and
> > > > > > > > > >>>> if
> > > > > > > > > >>>>>> there
> > > > > > > > > >>>>>>>>>>> is
> > > > > > > > > >>>>>>>>>>>> a f+1' failure, the system should be
> > unavailable.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> A) So with num_standby=0, the system should be unavailable even if
> > > > > > > > > > >>>>>>>>>>>> there is 1 failure, and that's my argument for not allowing querying
> > > > > > > > > > >>>>>>>>>>>> in restoration state, esp. since in this case it will be a total
> > > > > > > > > > >>>>>>>>>>>> rebuild of the state (which IMO cannot be considered a normal
> > > > > > > > > > >>>>>>>>>>>> fault-free operational state).
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>> B) Even if there are standbys, say num_standby=2, if the user decides
> > > > > > > > > > >>>>>>>>>>>> to shut down all 3 instances, then the only outcome should be
> > > > > > > > > > >>>>>>>>>>>> unavailability until all of them come back or state is rebuilt on
> > > > > > > > > > >>>>>>>>>>>> other nodes in the cluster. In normal operations, f <= 2, and when a
> > > > > > > > > > >>>>>>>>>>>> failure does happen we can then either choose C over A and fail IQs
> > > > > > > > > > >>>>>>>>>>>> until replication is fully caught up, or choose A over C by serving
> > > > > > > > > > >>>>>>>>>>>> in restoring state as long as lag is minimal. If, even with f=1 say,
> > > > > > > > > > >>>>>>>>>>>> all the standbys are lagging a lot due to some issue, then that
> > > > > > > > > > >>>>>>>>>>>> should be considered a failure, since that is different from the
> > > > > > > > > > >>>>>>>>>>>> normal/expected operational mode. Serving reads with unbounded
> > > > > > > > > > >>>>>>>>>>>> replication lag and calling it "available" may not be very usable or
> > > > > > > > > > >>>>>>>>>>>> even desirable :) IMHO, since it gives the user no way to reason
> > > > > > > > > > >>>>>>>>>>>> about the app that is going to query this store.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> So there is definitely a need to distinguish
> > > > between :
> > > > > > > > > >>>> Replication
> > > > > > > > > >>>>>>>>>>> catchup
> > > > > > > > > >>>>>>>>>>>> while being in fault free state vs Restoration
> > of
> > > > state
> > > > > > > when
> > > > > > > > > >>> we
> > > > > > > > > >>>> lose
> > > > > > > > > >>>>>>>>>> more
> > > > > > > > > >>>>>>>>>>>> than f standbys. This knob is a great starting
> > point
> > > > > > > towards
> > > > > > > > > >>>> this.
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> If you agree with some of the explanation
> above,
> > > > please
> > > > > > > feel
> > > > > > > > > >>>> free to
> > > > > > > > > >>>>>>>>>>>> include it in the KIP as well since this is
> > sort of
> > > > our
> > > > > > > > > >> design
> > > > > > > > > >>>>>>>>>> principle
> > > > > > > > > >>>>>>>>>>>> here..
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> Small nits :
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> - let's standardize on "standby" instead of
> > > > "replica", KIP
> > > > > > > > > >> or
> > > > > > > > > >>>>>> code,  to
> > > > > > > > > >>>>>>>>>>> be
> > > > > > > > > >>>>>>>>>>>> consistent with rest of Streams code/docs?
> > > > > > > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into KAFKA-6144 now
> > and
> > > > close
> > > > > > > the
> > > > > > > > > >>>> former?
> > > > > > > > > >>>>>>>>>>>> Eventually need to consolidate KAFKA-6555 as
> > well
> > > > > > > > > >>>>>>>>>>>> - In the new API,
> > > > > > > > > >>>> "StreamsMetadataState::allMetadataForKey(boolean
> > > > > > > > > >>>>>>>>>>>> enableReplicaServing, String storeName, K key,
> > > > > > > Serializer<K>
> > > > > > > > > >>>>>>>>>>> keySerializer)" Do
> > > > > > > > > >>>>>>>>>>>> we really need a per key configuration? or a
> new
> > > > > > > > > >> StreamsConfig
> > > > > > > > > >>>> is
> > > > > > > > > >>>>>> good
> > > > > > > > > >>>>>>>>>>>> enough?
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM Navinder Brar
> > > > > > > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of the
> > > > discussions we
> > > > > > > > > >> have
> > > > > > > > > >>>> had
> > > > > > > > > >>>>>> in
> > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>> KIP.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve queries
> > from
> > > > > > > > > >>>> Active(Running)
> > > > > > > > > >>>>>>>>>>>>> partition. For case t2, we are planning to
> > return
> > > > > > > > > >>>>>>>>>> List<StreamsMetadata>
> > > > > > > > > >>>>>>>>>>>>> such that it returns <StreamsMetadata(A),
> > > > > > > > > >> StreamsMetadata(B)>
> > > > > > > > > >>>> so
> > > > > > > > > >>>>>> that
> > > > > > > > > >>>>>>>>>>> if IQ
> > > > > > > > > >>>>>>>>>>>>> fails on A, the replica on B can serve the
> > data by
> > > > > > > enabling
> > > > > > > > > >>>> serving
> > > > > > > > > >>>>>>>>>>> from
> > > > > > > > > >>>>>>>>>>>>> replicas. This still does not solve case t3
> > and t4
> > > > since
> > > > > > > B
> > > > > > > > > >>> has
> > > > > > > > > >>>> been
> > > > > > > > > >>>>>>>>>>>>> promoted to active but it is in Restoring
> > state to
> > > > > > > catchup
> > > > > > > > > >>>> till A’s
> > > > > > > > > >>>>>>>>>>> last
> > > > > > > > > >>>>>>>>>>>>> committed position as we don’t serve from
> > > > Restoring state
> > > > > > > > > >> in
> > > > > > > > > >>>> Active
> > > > > > > > > >>>>>>>>>>> and new
> > > > > > > > > >>>>>>>>>>>>> Replica on R is building itself from scratch.
> > Both
> > > > these
> > > > > > > > > >>> cases
> > > > > > > > > >>>> can
> > > > > > > > > >>>>>> be
> > > > > > > > > >>>>>>>>>>>>> solved if we start serving from Restoring
> > state of
> > > > active
> > > > > > > > > >> as
> > > > > > > > > >>>> well
> > > > > > > > > >>>>>>>>>>> since it
> > > > > > > > > >>>>>>>>>>>>> is almost equivalent to previous Active.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> There could be a case where all replicas of a
> > > > partition
> > > > > > > > > >>> become
> > > > > > > > > >>>>>>>>>>> unavailable
> > > > > > > > > >>>>>>>>>>>>> and active and all replicas of that partition
> > are
> > > > > > > building
> > > > > > > > > >>>>>> themselves
> > > > > > > > > >>>>>>>>>>> from
> > > > > > > > > >>>>>>>>>>>>> scratch, in this case, the state in Active is
> > far
> > > > behind
> > > > > > > > > >> even
> > > > > > > > > >>>>>> though
> > > > > > > > > >>>>>>>>>>> it is
> > > > > > > > > >>>>>>>>>>>>> in Restoring state. To cater to such cases
> > that we
> > > > don’t
> > > > > > > > > >>> serve
> > > > > > > > > >>>> from
> > > > > > > > > >>>>>>>>>>> this
> > > > > > > > > >>>>>>>>>>>>> state we can either add another state before
> > > > Restoring or
> > > > > > > > > >>>> check the
> > > > > > > > > >>>>>>>>>>>>> difference between last committed offset and
> > > > current
> > > > > > > > > >>> position.
> > > > > > > > > >>>> Only
> > > > > > > > > >>>>>>>>>> if
> > > > > > > > > >>>>>>>>>>> it
> > > > > > > > > >>>>>>>>>>>>> is within a permissible range (say 10000) we
> > will
> > > > serve
> > > > > > > > > >> from
> > > > > > > > > >>>>>>>>>> Restoring
> > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>> state of Active.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>    On Wednesday, 16 October, 2019, 10:01:35
> pm
> > IST,
> > > > > > > Vinoth
> > > > > > > > > >>>> Chandar
> > > > > > > > > >>>>>> <
> > > > > > > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>  Thanks for the updates on the KIP, Navinder!
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> Few comments
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>> - AssignmentInfo is not public API? But we
> > will
> > > > change
> > > > > > > it
> > > > > > > > > >>> and
> > > > > > > > > >>>> thus
> > > > > > > > > >>>>>>>>>>> need to
> > > > > > > > > >>>>>>>>>>>>> increment the version and test for
> > version_probing
> > > > etc.
> > > > > > > > > >> Good
> > > > > > > > > >>> to
> > > > > > > > > >>>>>>>>>>> separate
> > > > > > > > > >>>>>>>>>>>>> that from StreamsMetadata changes (which is
> > public
> > > > API)
> > > > > > > > > >>>>>>>>>>>>> - From what I see, there is going to be
> choice
> > > > between
> > > > > > > the
> > > > > > > > > >>>>>> following
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>  A) introducing a new
> > > > *KafkaStreams::allMetadataForKey()
> > > > > > > > > >> *API
> > > > > > > > > >>>> that
> > > > > > > > > >>>>>>>>>>>>> potentially returns List<StreamsMetadata>
> > ordered
> > > > from
> > > > > > > most
> > > > > > > > > > >>>> up to
> > > > > > > > > >>>>>> date
> > > > > > > > > >>>>>>>>>>> to
> > > > > > > > > > >>>>>>>>>>>>> least up-to-date replicas. Today we cannot
> fully
> > > > implement
> > > > > > > > > >>> this
> > > > > > > > > >>>>>>>>>>> ordering,
> > > > > > > > > >>>>>>>>>>>>> since all we know is which hosts are active
> and
> > > > which are
> > > > > > > > > >>>> standbys.
> > > > > > > > > >>>>>>>>>>>>> However, this aligns well with the future.
> > KIP-441
> > > > adds
> > > > > > > the
> > > > > > > > > >>> lag
> > > > > > > > > >>>>>>>>>>> information
> > > > > > > > > >>>>>>>>>>>>> to the rebalancing protocol. We could also
> sort
> > > > replicas
> > > > > > > > > >>> based
> > > > > > > > > >>>> on
> > > > > > > > > >>>>>> the
> > > > > > > > > > >>>>>>>>>>>>> reported lags eventually. This is fully
> backwards
> > > > > > > compatible
> > > > > > > > > >>> with
> > > > > > > > > >>>>>>>>>>> existing
> > > > > > > > > >>>>>>>>>>>>> clients. Only drawback I see is the naming of
> > the
> > > > > > > existing
> > > > > > > > > >>>> method
> > > > > > > > > >>>>>>>>>>>>> KafkaStreams::metadataForKey, not conveying
> the
> > > > > > > distinction
> > > > > > > > > >>>> that it
> > > > > > > > > >>>>>>>>>>> simply
> > > > > > > > > >>>>>>>>>>>>> returns the active replica i.e
> > > > allMetadataForKey.get(0).
> > > > > > > > > >>>>>>>>>>>>>  B) Change KafkaStreams::metadataForKey() to
> > > > return a
> > > > > > > List.
> > > > > > > > > > >>>> It's a
> > > > > > > > > >>>>>>>>>>> breaking
> > > > > > > > > >>>>>>>>>>>>> change.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> I prefer A, since none of the
> > semantics/behavior
> > > > changes
> > > > > > > > > >> for
> > > > > > > > > >>>>>> existing
> > > > > > > > > >>>>>>>>>>>>> users. Love to hear more thoughts. Can we
> also
> > > > work this
> > > > > > > > > >> into
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>> KIP?
> > > > > > > > > >>>>>>>>>>>>> I already implemented A to unblock myself for
> > now.
> > > > Seems
> > > > > > > > > >>>> feasible
> > > > > > > > > >>>>>> to
> > > > > > > > > >>>>>>>>>>> do.
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM Vinoth
> > Chandar <
> > > > > > > > > >>>>>>>>>> vchan...@confluent.io
> > > > > > > > > >>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> I get your point. But suppose there is a
> > > > replica which
> > > > > > > > > >> has
> > > > > > > > > >>>> just
> > > > > > > > > >>>>>>>>>>> become
> > > > > > > > > >>>>>>>>>>>>>> active, so in that case replica will still
> be
> > > > building
> > > > > > > > > >>> itself
> > > > > > > > > >>>> from
> > > > > > > > > >>>>>>>>>>>>> scratch
> > > > > > > > > >>>>>>>>>>>>>> and this active will go to restoring state
> > till it
> > > > > > > catches
> > > > > > > > > >>> up
> > > > > > > > > >>>> with
> > > > > > > > > >>>>>>>>>>>>> previous
> > > > > > > > > >>>>>>>>>>>>>> active, wouldn't serving from a restoring
> > active
> > > > make
> > > > > > > more
> > > > > > > > > >>>> sense
> > > > > > > > > >>>>>>>>>>> than a
> > > > > > > > > >>>>>>>>>>>>>> replica in such case.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> KIP-441 will change this behavior such that
> > > > promotion to
> > > > > > > > > >>>> active
> > > > > > > > > >>>>>>>>>>> happens
> > > > > > > > > >>>>>>>>>>>>>> based on how caught up a replica is. So,
> once
> > we
> > > > have
> > > > > > > that
> > > > > > > > > >>>> (work
> > > > > > > > > >>>>>>>>>>> underway
> > > > > > > > > >>>>>>>>>>>>>> already for 2.5 IIUC) and user sets
> > > > > > > num.standby.replicas >
> > > > > > > > > >>> 0,
> > > > > > > > > >>>> then
> > > > > > > > > >>>>>>>>>>> the
> > > > > > > > > >>>>>>>>>>>>>> staleness window should not be that long as
> > you
> > > > > > > describe.
> > > > > > > > > >>> IMO
> > > > > > > > > >>>> if
> > > > > > > > > >>>>>>>>>> user
> > > > > > > > > >>>>>>>>>>>>> wants
> > > > > > > > > >>>>>>>>>>>>>> availability for state, then should
> configure
> > > > > > > > > >>>> num.standby.replicas
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>> 0.
> > > > > > > > > >>>>>>>>>>>>> If
> > > > > > > > > >>>>>>>>>>>>>> not, then on a node loss, few partitions
> > would be
> > > > > > > > > >>> unavailable
> > > > > > > > > >>>> for
> > > > > > > > > >>>>>> a
> > > > > > > > > >>>>>>>>>>> while
> > > > > > > > > >>>>>>>>>>>>>> (there are other ways to bring this window
> > down,
> > > > which I
> > > > > > > > > >>> won't
> > > > > > > > > >>>>>>>>>> bring
> > > > > > > > > >>>>>>>>>>> in
> > > > > > > > > >>>>>>>>>>>>>> here). We could argue for querying a
> restoring
> > > > active
> > > > > > > > > >> (say a
> > > > > > > > > >>>> new
> > > > > > > > > >>>>>>>>>> node
> > > > > > > > > >>>>>>>>>>>>> added
> > > > > > > > > >>>>>>>>>>>>>> to replace a faulty old node) based on AP vs
> > CP
> > > > > > > > > >> principles.
> > > > > > > > > >>>> But
> > > > > > > > > >>>>>> not
> > > > > > > > > >>>>>>>>>>> sure
> > > > > > > > > >>>>>>>>>>>>>> reading really really old values for the
> sake
> > of
> > > > > > > > > >>> availability
> > > > > > > > > >>>> is
> > > > > > > > > >>>>>>>>>>> useful.
> > > > > > > > > >>>>>>>>>>>>> No
> > > > > > > > > >>>>>>>>>>>>>> AP data system would be inconsistent for
> such
> > a
> > > > long
> > > > > > > time
> > > > > > > > > >> in
> > > > > > > > > >>>>>>>>>>> practice.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> So, I still feel just limiting this to
> standby
> > > > reads
> > > > > > > > > >>> provides
> > > > > > > > > >>>> best
> > > > > > > > > >>>>>>>>>>>>>> semantics.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what others
> > think
> > > > as well.
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM Navinder
> Brar
> > > > > > > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid> wrote:
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Hi Vinoth,
> > > > > > > > > >>>>>>>>>>>>>>> Thanks for the feedback.
> > > > > > > > > >>>>>>>>>>>>>>>  Can we link the JIRA, discussion thread
> > also to
> > > > the
> > > > > > > > > >> KIP.>>
> > > > > > > > > >>>>>> Added.
> > > > > > > > > >>>>>>>>>>>>>>> Based on the discussion on KAFKA-6144, I
> was
> > > > under the
> > > > > > > > > >>>> impression
> > > > > > > > > >>>>>>>>>>> that
> > > > > > > > > >>>>>>>>>>>>>>> this KIP is also going to cover exposing of
> > the
> > > > standby
> > > > > > > > > >>>>>>>>>> information
> > > > > > > > > >>>>>>>>>>> in
> > > > > > > > > >>>>>>>>>>>>>>> StreamsMetadata and thus subsume
> KAFKA-8994 .
> > > > That
> > > > > > > would
> > > > > > > > > >>>> require
> > > > > > > > > >>>>>> a
> > > > > > > > > >>>>>>>>>>>>> public
> > > > > > > > > >>>>>>>>>>>>>>> API change?>> Sure, I can add changes for
> > 8994
> > > > in this
> > > > > > > > > >> KIP
> > > > > > > > > >>>> and
> > > > > > > > > >>>>>>>>>> link
> > > > > > > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> > > > > > > > > >>>>>>>>>>>>>>>  KIP seems to be focussing on restoration
> > when a
> > > > new
> > > > > > > node
> > > > > > > > > >>> is
> > > > > > > > > >>>>>>>>>> added.
> > > > > > > > > >>>>>>>>>>>>>>> KIP-441 is underway and has some major
> > changes
> > > > proposed
> > > > > > > > > >> for
> > > > > > > > > >>>> this.
> > > > > > > > > >>>>>>>>>> It
> > > > > > > > > >>>>>>>>>>>>> would
> > > > > > > > > >>>>>>>>>>>>>>> be good to clarify dependencies if any.
> > Without
> > > > > > > KIP-441,
> > > > > > > > > >> I
> > > > > > > > > >>>> am not
> > > > > > > > > >>>>>>>>>>> very
> > > > > > > > > >>>>>>>>>>>>> sure
> > > > > > > > > >>>>>>>>>>>>>>> if we should allow reads from nodes in
> > RESTORING
> > > > state,
> > > > > > > > > >>> which
> > > > > > > > > >>>>>>>>>> could
> > > > > > > > > >>>>>>>>>>>>> amount
> > > > > > > > > >>>>>>>>>>>>>>> to many minutes/few hours of stale reads?
> > This
> > > > is
> > > > > > > > > >>> different
> > > > > > > > > >>>> from
> > > > > > > > > >>>>>>>>>>>>> allowing
> > > > > > > > > >>>>>>>>>>>>>>> querying standby replicas, which could be
> > mostly
> > > > caught
> > > > > > > > > >> up
> > > > > > > > > >>>> and
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>>>>>>> staleness window could be much
> > > > smaller/tolerable. (once
> > > > > > > > > >>>> again the
> > > > > > > > > >>>>>>>>>>> focus
> > > > > > > > > >>>>>>>>>>>>> on
> > > > > > > > > >>>>>>>>>>>>>>> KAFKA-8994).>> I get your point. But
> suppose
> > > > there is a
> > > > > > > > > >>>> replica
> > > > > > > > > >>>>>>>>>>> which
> > > > > > > > > >>>>>>>>>>>>> has
> > > > > > > > > >>>>>>>>>>>>>>> just become active, so in that case replica
> > will
> > > > still
> > > > > > > be
> > > > > > > > > >>>>>> building
> > > > > > > > > >>>>>>>>>>>>> itself
> > > > > > > > > >>>>>>>>>>>>>>> from scratch and this active will go to
> > > > restoring state
> > > > > > > > > >>> till
> > > > > > > > > >>>> it
> > > > > > > > > >>>>>>>>>>> catches
> > > > > > > > > >>>>>>>>>>>>> up
> > > > > > > > > >>>>>>>>>>>>>>> with previous active, wouldn't serving
> from a
> > > > restoring
> > > > > > > > > >>>> active
> > > > > > > > > >>>>>>>>>> make
> > > > > > > > > >>>>>>>>>>> more
> > > > > > > > > >>>>>>>>>>>>>>> sense than a replica in such case.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a
> > > > configuration to
> > > > > > > > > >>> control
> > > > > > > > > >>>>>> this.
> > > > > > > > > >>>>>>>>>>> Some
> > > > > > > > > >>>>>>>>>>>>>>> users may prefer errors to stale data. Can
> we
> > > > also add
> > > > > > > it
> > > > > > > > > >>> to
> > > > > > > > > >>>> the
> > > > > > > > > >>>>>>>>>>> KIP?>>
> > > > > > > > > >>>>>>>>>>>>>>> Will add this.
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>> Regards,
> > > > > > > > > >>>>>>>>>>>>>>> Navinder
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > > >>>>>>>>>>>>>>> On 2019/10/14 16:56:49, Vinoth Chandar <
> > > > > > > v...@confluent.io
> > > > > > > > > >>>>> wrote:
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Hi Navinder,>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few thoughts>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion thread
> > also
> > > > to the
> > > > > > > > > >> KIP>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> - Based on the discussion on KAFKA-6144, I
> > was
> > > > under
> > > > > > > the
> > > > > > > > > >>>>>>>>>> impression
> > > > > > > > > >>>>>>>>>>>>>>> that>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> this KIP is also going to cover exposing
> of
> > the
> > > > > > > standby
> > > > > > > > > >>>>>>>>>>> information in>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume
> KAFKA-8994
> > .
> > > > That
> > > > > > > would
> > > > > > > > > >>>> require
> > > > > > > > > >>>>>>>>>> a
> > > > > > > > > >>>>>>>>>>>>>>> public>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> API change?>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> - KIP seems to be focussing on restoration
> > when
> > > > a new
> > > > > > > > > >> node
> > > > > > > > > >>>> is
> > > > > > > > > >>>>>>>>>>> added.>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> KIP-441 is underway and has some major
> > changes
> > > > > > > proposed
> > > > > > > > > >>> for
> > > > > > > > > >>>>>> this.
> > > > > > > > > >>>>>>>>>>> It
> > > > > > > > > >>>>>>>>>>>>>>> would>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> be good to clarify dependencies if any.
> > Without
> > > > > > > > > >> KIP-441, I
> > > > > > > > > >>>> am
> > > > > > > > > >>>>>> not
> > > > > > > > > >>>>>>>>>>> very
> > > > > > > > > >>>>>>>>>>>>>>> sure>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> if we should allow reads from nodes in
> > RESTORING
> > > > > > > state,
> > > > > > > > > >>>> which
> > > > > > > > > >>>>>>>>>> could
> > > > > > > > > >>>>>>>>>>>>>>> amount>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> to many minutes/few hours of stale reads?
> > This
> > > > is
> > > > > > > > > >>> different
> > > > > > > > > > >>>>>>>>>>>>>>> from allowing>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> querying standby replicas, which could be
> > mostly
> > > > > > > caught
> > > > > > > > > >> up
> > > > > > > > > >>>> and
> > > > > > > > > >>>>>>>>>> the>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> staleness window could be much
> > > > smaller/tolerable.
> > > > > > > (once
> > > > > > > > > >>>> again
> > > > > > > > > >>>>>> the
> > > > > > > > > >>>>>>>>>>> focus
> > > > > > > > > >>>>>>>>>>>>>>> on>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> KAFKA-8994)>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> - Finally, we may need to introduce a
> > > > configuration to
> > > > > > > > > >>>> control
> > > > > > > > > >>>>>>>>>>> this.
> > > > > > > > > >>>>>>>>>>>>>>> Some>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> users may prefer errors to stale data. Can
> > we
> > > > also add
> > > > > > > > > >> it
> > > > > > > > > >>>> to the
> > > > > > > > > >>>>>>>>>>> KIP?>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Thanks>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> Vinoth>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM Navinder
> > Brar>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid>wrote:>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> Hi,>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP to Allow
> > state
> > > > > > > stores
> > > > > > > > > >> to
> > > > > > > > > >>>> serve
> > > > > > > > > >>>>>>>>>>>>> stale>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> reads during rebalance(>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> ).>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> Thanks & Regards,Navinder>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>> LinkedIn>
> > > > > > > > > >>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>> --
> > > > > > > > > >>>>>>>>>> -- Guozhang
> > > > > > > > > >>>>>>>>>>
> > > > > > > > > >>>>>>>>>
> > > > > > > > > >>>>>>>>
> > > > > > > > > >>>>>>>
> > > > > > > > > >>>>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>>
> > > > > > > > > >>>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> --
> > > > > > > > > >> -- Guozhang
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > >
> >
>
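
For reference, the lag check Navinder describes in the quoted thread (serve from a Restoring active only if the gap between the last committed offset and the current restore position is within a permissible range, "say 10000") could be sketched roughly as below. This is only an illustration under assumptions: the class name, method name, and parameters are hypothetical and are not part of Kafka Streams or of the KIP, and the threshold is passed in rather than read from any real config.

```java
// Hypothetical helper, not a Kafka Streams API: decides whether a restoring
// active is close enough to the previous active's committed position to serve IQ.
final class RestoringLagCheck {
    private RestoringLagCheck() {}

    /**
     * @param lastCommittedOffset last offset committed by the previous active (assumed known)
     * @param currentPosition     how far the restoring task has read in the changelog
     * @param maxAllowedLag       permissible lag, e.g. 10000 as suggested in the thread
     * @return true if the restoring task may serve reads
     */
    static boolean withinPermissibleLag(long lastCommittedOffset,
                                        long currentPosition,
                                        long maxAllowedLag) {
        // Clamp at zero in case the restore position is already past the recorded offset.
        long lag = Math.max(0L, lastCommittedOffset - currentPosition);
        return lag <= maxAllowedLag;
    }
}
```

With a threshold of 10000, a task restoring at position 45000 against a committed offset of 50000 would be allowed to serve, while one at position 10000 would not. Whether an offset-based or a time-based lag is the right measure is still the open question in this thread.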
