Hey Vinoth,

Really sorry, I just remembered that I started a reply earlier today,
but got side-tracked.

Regarding the AssignmentInfo extension:

Your explanation for this point makes sense. I was incorrectly
thinking that the cluster metadata was shared with all members, but
now I see it's only given to the assignor. I agree now that the
assignor basically has to encode this information in the userdata
field if it wants the members to have it. Thanks for your patience in
explaining and linking the relevant history.

Given this constraint, the encoding part of the discussion is moot.
Regardless, the detail you provided does make sense to me.

I'm now in favor of the proposal for extending AssignmentInfo.


Regarding "breaking up the API":

Ah, my mistake. Yes, it sounds like this would be a good idea. I just
took another look at KafkaStreams. Since there's no method for getting
all the local stores, perhaps we can skip the "get the lags for all
stores" method, and just add two new methods to the KafkaStreams
interface like this:

KafkaStreams {
// existing
<T> T store(String storeName, QueryableStoreType<T> queryableStoreType)

// new
/* Report the current amount by which the local store lags behind the
changelog tail. This is an indicator of how fresh the local copy of a
store is with respect to the active copy. Always 0 for stores in
active tasks. */
long storeChangelogOffsetLag(String storeName)

/* Report the time difference between the last consumed changelog
record's timestamp and the changelog tail record's timestamp. This is
an indicator of how fresh the local copy of a store is with respect to
the active copy. Always Duration.ZERO for stores in active tasks. */
Duration storeChangelogTimeLag(String storeName)
}

Note, I'm not insisting on this interface, just proposing it to
potentially minimize back-and-forth. Here's the reasoning:
* Since this API is no longer reporting lags for all stores, just
local ones, it makes sense to try and stick close to the `store(name,
type)` method. This also brings the new methods down to two. If others
think there's a use case for getting all the stores' lags, then we can
also propose to add corresponding `all*Lags` methods that return
`Map<storeName, lag>`.
* I also just realized that we were proposing to add
`timeLagEstimateMs()` as a `long`, but as a project, we have a larger
evolution to migrate to `Duration` and `Instant` where applicable. I
think it makes sense to do that in this case.

How does this sound?
Thanks,
-John



On Tue, Nov 5, 2019 at 7:50 PM Vinoth Chandar <vchan...@confluent.io> wrote:
>
> Ping :) Any thoughts?
>
> On Mon, Nov 4, 2019 at 5:13 PM Vinoth Chandar <vchan...@confluent.io> wrote:
>
> > >>  I'm having some trouble wrapping my head around what race conditions
> > might occur, other than the fundamentally broken state in which different
> > instances are running totally different topologies.
> > 3. @both Without the topic partitions that the tasks can map back to, we
> > have to rely on topology/cluster metadata in each Streams instance to map
> > the task back. If the source topics are wild carded for e,g then each
> > instance could have different source topics in topology, until the next
> > rebalance happens. You can also read my comments from here
> > https://issues.apache.org/jira/browse/KAFKA-7149?focusedCommentId=16904106&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16904106
> >
> >
> > >> seems hard to imagine how encoding arbitrarily long topic names plus an
> > integer for the partition number could be as efficient as task ids, which
> > are just two integers.
> > 3. if you still have concerns about the efficacy of dictionary encoding,
> > happy to engage. The link above also has some benchmark code I used.
> > Theoretically, we would send each topic name atleast once, so yes if you
> > compare a 10-20 character topic name + an integer to two integers, it will
> > be more bytes. But its constant overhead proportional to size of topic name
> > and with 4,8,12, partitions the size difference between baseline (version 4
> > where we just repeated topic names for each topic partition) and the two
> > approaches becomes narrow.
> >
> > >>Plus, Navinder is going to implement a bunch of protocol code that we
> > might just want to change when the discussion actually does take place, if
> > ever.
> > >>it'll just be a mental burden for everyone to remember that we want to
> > have this follow-up discussion.
> > 3. Is n't people changing same parts of code and tracking follow ups a
> > common thing, we need to deal with anyway?  For this KIP, is n't it enough
> > to reason about whether the additional map on top of the topic dictionary
> > would incur more overhead than the sending task_ids? I don't think it's
> > case, both of them send two integers. As I see it, we can do a separate
> > follow up to (re)pursue the task_id conversion and get it working for both
> > maps within the next release?
> >
> > >>Can you elaborate on "breaking up the API"? It looks like there are
> > already separate API calls in the proposal, one for time-lag, and another
> > for offset-lag, so are they not already broken up?
> > The current APIs (e.g lagInfoForStore) for lags return StoreLagInfo
> > objects which has both time and offset lags. If we had separate APIs, say
> > (e.g offsetLagForStore(), timeLagForStore()), we can implement offset
> > version using the offset lag that the streams instance already tracks i.e
> > no need for external calls. The time based lag API would incur the kafka
> > read for the timestamp. makes sense?
> >
> > Based on the discussions so far, I only see these two pending issues to be
> > aligned on. Is there any other open item people want to bring up?
> >
> > On Mon, Nov 4, 2019 at 11:24 AM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> >> Regarding 3) I'm wondering, does your concern still apply even now
> >> that the pluggable PartitionGrouper interface has been deprecated?
> >> Now that we can be sure that the DefaultPartitionGrouper is used to
> >> generate
> >> the taskId -> partitions mapping, we should be able to convert any taskId
> >> to any
> >> partitions.
> >>
> >> On Mon, Nov 4, 2019 at 11:17 AM John Roesler <j...@confluent.io> wrote:
> >>
> >> > Hey Vinoth, thanks for the reply!
> >> >
> >> > 3.
> >> > I get that it's not the main focus of this KIP, but if it's ok, it
> >> > would be nice to hash out this point right now. It only came up
> >> > because this KIP-535 is substantially extending the pattern in
> >> > question. If we push it off until later, then the reviewers are going
> >> > to have to suspend their concerns not just while voting for the KIP,
> >> > but also while reviewing the code. Plus, Navinder is going to
> >> > implement a bunch of protocol code that we might just want to change
> >> > when the discussion actually does take place, if ever. Finally, it'll
> >> > just be a mental burden for everyone to remember that we want to have
> >> > this follow-up discussion.
> >> >
> >> > It makes sense what you say... the specific assignment is already
> >> > encoded in the "main" portion of the assignment, not in the "userdata"
> >> > part. It also makes sense that it's simpler to reason about races if
> >> > you simply get all the information about the topics and partitions
> >> > directly from the assignor, rather than get the partition number from
> >> > the assignor and the topic name from your own a priori knowledge of
> >> > the topology. On the other hand, I'm having some trouble wrapping my
> >> > head around what race conditions might occur, other than the
> >> > fundamentally broken state in which different instances are running
> >> > totally different topologies. Sorry, but can you remind us of the
> >> > specific condition?
> >> >
> >> > To the efficiency counterargument, it seems hard to imagine how
> >> > encoding arbitrarily long topic names plus an integer for the
> >> > partition number could be as efficient as task ids, which are just two
> >> > integers. It seems like this would only be true if topic names were 4
> >> > characters or less.
> >> >
> >> > 4.
> >> > Yeah, clearly, it would not be a good idea to query the metadata
> >> > before every single IQ query. I think there are plenty of established
> >> > patterns for distributed database clients to follow. Can you elaborate
> >> > on "breaking up the API"? It looks like there are already separate API
> >> > calls in the proposal, one for time-lag, and another for offset-lag,
> >> > so are they not already broken up? FWIW, yes, I agree, the offset lag
> >> > is already locally known, so we don't need to build in an extra
> >> > synchronous broker API call, just one for the time-lag.
> >> >
> >> > Thanks again for the discussion,
> >> > -John
> >> >
> >> > On Mon, Nov 4, 2019 at 11:17 AM Vinoth Chandar <vchan...@confluent.io>
> >> > wrote:
> >> > >
> >> > > 3. Right now, we still get the topic partitions assigned as a part of
> >> the
> >> > > top level Assignment object (the one that wraps AssignmentInfo) and
> >> use
> >> > > that to convert taskIds back. This list of only contains assignments
> >> for
> >> > > that particular instance. Attempting to also reverse map for "all" the
> >> > > tasksIds in the streams cluster i.e all the topic partitions in these
> >> > > global assignment maps was what was problematic. By explicitly sending
> >> > the
> >> > > global assignment maps as actual topic partitions,  group coordinator
> >> > (i.e
> >> > > the leader that computes the assignment's ) is able to consistently
> >> > enforce
> >> > > its view of the topic metadata. Still don't think doing such a change
> >> > that
> >> > > forces you to reconsider semantics, is not needed to save bits on
> >> wire.
> >> > May
> >> > > be we can discuss this separately from this KIP?
> >> > >
> >> > > 4. There needs to be some caching/interval somewhere though since we
> >> > don't
> >> > > want to make 1 kafka read per 1 IQ potentially. But I think its a
> >> valid
> >> > > suggestion, to make this call just synchronous and leave the caching
> >> or
> >> > how
> >> > > often you want to call to the application. Would it be good to then
> >> break
> >> > > up the APIs for time and offset based lag?  We can obtain offset based
> >> > lag
> >> > > for free? Only incur the overhead of reading kafka if we want time
> >> > > based lags?
> >> > >
> >> > > On Fri, Nov 1, 2019 at 2:49 PM Sophie Blee-Goldman <
> >> sop...@confluent.io>
> >> > > wrote:
> >> > >
> >> > > > Adding on to John's response to 3), can you clarify when and why
> >> > exactly we
> >> > > > cannot
> >> > > > convert between taskIds and partitions? If that's really the case I
> >> > don't
> >> > > > feel confident
> >> > > > that the StreamsPartitionAssignor is not full of bugs...
> >> > > >
> >> > > > It seems like it currently just encodes a list of all partitions
> >> (the
> >> > > > assignment) and also
> >> > > > a list of the corresponding task ids, duplicated to ensure each
> >> > partition
> >> > > > has the corresponding
> >> > > > taskId at the same offset into the list. Why is that problematic?
> >> > > >
> >> > > >
> >> > > > On Fri, Nov 1, 2019 at 12:39 PM John Roesler <j...@confluent.io>
> >> > wrote:
> >> > > >
> >> > > > > Thanks, all, for considering the points!
> >> > > > >
> >> > > > > 3. Interesting. I have a vague recollection of that... Still,
> >> though,
> >> > > > > it seems a little fishy. After all, we return the assignments
> >> > > > > themselves as task ids, and the members have to map these to topic
> >> > > > > partitions in order to configure themselves properly. If it's too
> >> > > > > complicated to get this right, then how do we know that Streams is
> >> > > > > computing the correct partitions at all?
> >> > > > >
> >> > > > > 4. How about just checking the log-end timestamp when you call the
> >> > > > > method? Then, when you get an answer, it's as fresh as it could
> >> > > > > possibly be. And as a user you have just one, obvious, "knob" to
> >> > > > > configure how much overhead you want to devote to checking... If
> >> you
> >> > > > > want to call the broker API less frequently, you just call the
> >> > Streams
> >> > > > > API less frequently. And you don't have to worry about the
> >> > > > > relationship between your invocations of that method and the
> >> config
> >> > > > > setting (e.g., you'll never get a negative number, which you
> >> could if
> >> > > > > you check the log-end timestamp less frequently than you check the
> >> > > > > lag).
> >> > > > >
> >> > > > > Thanks,
> >> > > > > -John
> >> > > > >
> >> > > > > On Thu, Oct 31, 2019 at 11:52 PM Navinder Brar
> >> > > > > <navinder_b...@yahoo.com.invalid> wrote:
> >> > > > > >
> >> > > > > > Thanks John for going through this.
> >> > > > > >
> >> > > > > >    - +1, makes sense
> >> > > > > >    - +1, no issues there
> >> > > > > >    - Yeah the initial patch I had submitted for K-7149(
> >> > > > > https://github.com/apache/kafka/pull/6935) to reduce
> >> assignmentInfo
> >> > > > > object had taskIds but the merged PR had similar size according to
> >> > Vinoth
> >> > > > > and it was simpler so if the end result is of same size, it would
> >> not
> >> > > > make
> >> > > > > sense to pivot from dictionary and again move to taskIDs.
> >> > > > > >    - Not sure about what a good default would be if we don't
> >> have a
> >> > > > > configurable setting. This gives the users the flexibility to the
> >> > users
> >> > > > to
> >> > > > > serve their requirements as at the end of the day it would take
> >> CPU
> >> > > > cycles.
> >> > > > > I am ok with starting it with a default and see how it goes based
> >> > upon
> >> > > > > feedback.
> >> > > > > >
> >> > > > > > Thanks,
> >> > > > > > Navinder
> >> > > > > >     On Friday, 1 November, 2019, 03:46:42 am IST, Vinoth
> >> Chandar <
> >> > > > > vchan...@confluent.io> wrote:
> >> > > > > >
> >> > > > > >  1. Was trying to spell them out separately. but makes sense for
> >> > > > > > readability. done
> >> > > > > >
> >> > > > > > 2. No I immediately agree :) .. makes sense. @navinder?
> >> > > > > >
> >> > > > > > 3. I actually attempted only sending taskIds while working on
> >> > > > KAFKA-7149.
> >> > > > > > Its non-trivial to handle edges cases resulting from newly added
> >> > topic
> >> > > > > > partitions and wildcarded topic entries. I ended up simplifying
> >> it
> >> > to
> >> > > > > just
> >> > > > > > dictionary encoding the topic names to reduce size. We can apply
> >> > the
> >> > > > same
> >> > > > > > technique here for this map. Additionally, we could also
> >> dictionary
> >> > > > > encode
> >> > > > > > HostInfo, given its now repeated twice. I think this would save
> >> > more
> >> > > > > space
> >> > > > > > than having a flag per topic partition entry. Lmk if you are
> >> okay
> >> > with
> >> > > > > > this.
> >> > > > > >
> >> > > > > > 4. This opens up a good discussion. Given we support time lag
> >> > estimates
> >> > > > > > also, we need to read the tail record of the changelog
> >> periodically
> >> > > > > (unlike
> >> > > > > > offset lag, which we can potentially piggyback on metadata in
> >> > > > > > ConsumerRecord IIUC). we thought we should have a config that
> >> > control
> >> > > > how
> >> > > > > > often this read happens? Let me know if there is a simple way to
> >> > get
> >> > > > > > timestamp value of the tail record that we are missing.
> >> > > > > >
> >> > > > > > On Thu, Oct 31, 2019 at 12:58 PM John Roesler <
> >> j...@confluent.io>
> >> > > > wrote:
> >> > > > > >
> >> > > > > > > Hey Navinder,
> >> > > > > > >
> >> > > > > > > Thanks for updating the KIP, it's a lot easier to see the
> >> current
> >> > > > > > > state of the proposal now.
> >> > > > > > >
> >> > > > > > > A few remarks:
> >> > > > > > > 1. I'm sure it was just an artifact of revisions, but you have
> >> > two
> >> > > > > > > separate sections where you list additions to the KafkaStreams
> >> > > > > > > interface. Can you consolidate those so we can see all the
> >> > additions
> >> > > > > > > at once?
> >> > > > > > >
> >> > > > > > > 2. For messageLagEstimate, can I suggest "offsetLagEstimate"
> >> > instead,
> >> > > > > > > to be clearer that we're specifically measuring a number of
> >> > offsets?
> >> > > > > > > If you don't immediately agree, then I'd at least point out
> >> that
> >> > we
> >> > > > > > > usually refer to elements of Kafka topics as "records", not
> >> > > > > > > "messages", so "recordLagEstimate" might be more appropriate.
> >> > > > > > >
> >> > > > > > > 3. The proposal mentions adding a map of the standby
> >> > _partitions_ for
> >> > > > > > > each host to AssignmentInfo. I assume this is designed to
> >> mirror
> >> > the
> >> > > > > > > existing "partitionsByHost" map. To keep the size of these
> >> > metadata
> >> > > > > > > messages down, maybe we can consider making two changes:
> >> > > > > > > (a) for both actives and standbys, encode the _task ids_
> >> instead
> >> > of
> >> > > > > > > _partitions_. Every member of the cluster has a copy of the
> >> > topology,
> >> > > > > > > so they can convert task ids into specific partitions on their
> >> > own,
> >> > > > > > > and task ids are only (usually) three characters.
> >> > > > > > > (b) instead of encoding two maps (hostinfo -> actives AND
> >> > hostinfo ->
> >> > > > > > > standbys), which requires serializing all the hostinfos twice,
> >> > maybe
> >> > > > > > > we can pack them together in one map with a structured value
> >> > > > (hostinfo
> >> > > > > > > -> [actives,standbys]).
> >> > > > > > > Both of these ideas still require bumping the protocol version
> >> > to 6,
> >> > > > > > > and they basically mean we drop the existing
> >> `PartitionsByHost`
> >> > field
> >> > > > > > > and add a new `TasksByHost` field with the structured value I
> >> > > > > > > mentioned.
> >> > > > > > >
> >> > > > > > > 4. Can we avoid adding the new "lag refresh" config? The lags
> >> > would
> >> > > > > > > necessarily be approximate anyway, so adding the config seems
> >> to
> >> > > > > > > increase the operational complexity of the system for little
> >> > actual
> >> > > > > > > benefit.
> >> > > > > > >
> >> > > > > > > Thanks for the pseudocode, by the way, it really helps
> >> visualize
> >> > how
> >> > > > > > > these new interfaces would play together. And thanks again for
> >> > the
> >> > > > > > > update!
> >> > > > > > > -John
> >> > > > > > >
> >> > > > > > > On Thu, Oct 31, 2019 at 2:41 PM John Roesler <
> >> j...@confluent.io>
> >> > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > Hey Vinoth,
> >> > > > > > > >
> >> > > > > > > > I started going over the KIP again yesterday. There are a
> >> lot
> >> > of
> >> > > > > > > > updates, and I didn't finish my feedback in one day. I'm
> >> > working on
> >> > > > > it
> >> > > > > > > > now.
> >> > > > > > > >
> >> > > > > > > > Thanks,
> >> > > > > > > > John
> >> > > > > > > >
> >> > > > > > > > On Thu, Oct 31, 2019 at 11:42 AM Vinoth Chandar <
> >> > > > > vchan...@confluent.io>
> >> > > > > > > wrote:
> >> > > > > > > > >
> >> > > > > > > > > Wondering if anyone has thoughts on these changes? I liked
> >> > that
> >> > > > > the new
> >> > > > > > > > > metadata fetch APIs provide all the information at once
> >> with
> >> > > > > consistent
> >> > > > > > > > > naming..
> >> > > > > > > > >
> >> > > > > > > > > Any guidance on what you would like to be discussed or
> >> > fleshed
> >> > > > out
> >> > > > > more
> >> > > > > > > > > before we call a VOTE?
> >> > > > > > > > >
> >> > > > > > > > > On Wed, Oct 30, 2019 at 10:31 AM Navinder Brar
> >> > > > > > > > > <navinder_b...@yahoo.com.invalid> wrote:
> >> > > > > > > > >
> >> > > > > > > > > > Hi,
> >> > > > > > > > > > We have made some edits in the KIP(
> >> > > > > > > > > >
> >> > > > > > >
> >> > > > >
> >> > > >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >> > > > > > > )
> >> > > > > > > > > > after due deliberation on the agreed design to support
> >> the
> >> > new
> >> > > > > query
> >> > > > > > > > > > design. This includes the new public API to query
> >> > offset/time
> >> > > > lag
> >> > > > > > > > > > information and other details related to querying
> >> standby
> >> > tasks
> >> > > > > > > which have
> >> > > > > > > > > > come up after thinking of thorough details.
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >    - Addition of new config, “lag.fetch.interval.ms” to
> >> > > > > configure
> >> > > > > > > the
> >> > > > > > > > > > interval of time/offset lag
> >> > > > > > > > > >    - Addition of new class StoreLagInfo to store the
> >> > > > periodically
> >> > > > > > > obtained
> >> > > > > > > > > > time/offset lag
> >> > > > > > > > > >    - Addition of two new functions in KafkaStreams,
> >> > > > > > > List<StoreLagInfo>
> >> > > > > > > > > > allLagInfo() and List<StoreLagInfo>
> >> lagInfoForStore(String
> >> > > > > > > storeName) to
> >> > > > > > > > > > return the lag information for an instance and a store
> >> > > > > respectively
> >> > > > > > > > > >    - Addition of new class KeyQueryMetadata. We need
> >> > > > > topicPartition
> >> > > > > > > for
> >> > > > > > > > > > each key to be matched with the lag API for the topic
> >> > > > partition.
> >> > > > > One
> >> > > > > > > way is
> >> > > > > > > > > > to add new functions and fetch topicPartition from
> >> > > > > > > StreamsMetadataState but
> >> > > > > > > > > > we thought having one call and fetching StreamsMetadata
> >> and
> >> > > > > > > topicPartition
> >> > > > > > > > > > is more cleaner.
> >> > > > > > > > > >    -
> >> > > > > > > > > > Renaming partitionsForHost to activePartitionsForHost in
> >> > > > > > > StreamsMetadataState
> >> > > > > > > > > > and partitionsByHostState to activePartitionsByHostState
> >> > > > > > > > > > in StreamsPartitionAssignor
> >> > > > > > > > > >    - We have also added the pseudo code of how all the
> >> > changes
> >> > > > > will
> >> > > > > > > exist
> >> > > > > > > > > > together and support the new querying APIs
> >> > > > > > > > > >
> >> > > > > > > > > > Please let me know if anything is pending now, before a
> >> > vote
> >> > > > can
> >> > > > > be
> >> > > > > > > > > > started on this.  On Saturday, 26 October, 2019,
> >> 05:41:44
> >> > pm
> >> > > > IST,
> >> > > > > > > Navinder
> >> > > > > > > > > > Brar <navinder_b...@yahoo.com.invalid> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > >  >> Since there are two soft votes for separate
> >> > active/standby
> >> > > > > API
> >> > > > > > > > > > methods, I also change my position on that. Fine with 2
> >> > > > separate
> >> > > > > > > > > > methods. Once we remove the lag information from these
> >> > APIs,
> >> > > > > > > returning a
> >> > > > > > > > > > List is less attractive, since the ordering has no
> >> special
> >> > > > > meaning
> >> > > > > > > now.
> >> > > > > > > > > > Agreed, now that we are not returning lag, I am also
> >> sold
> >> > on
> >> > > > > having
> >> > > > > > > two
> >> > > > > > > > > > separate functions. We already have one which returns
> >> > > > > > > streamsMetadata for
> >> > > > > > > > > > active tasks, and now we can add another one for
> >> standbys.
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >    On Saturday, 26 October, 2019, 03:55:16 am IST,
> >> Vinoth
> >> > > > > Chandar <
> >> > > > > > > > > > vchan...@confluent.io> wrote:
> >> > > > > > > > > >
> >> > > > > > > > > >  +1 to Sophie's suggestion. Having both lag in terms of
> >> > time
> >> > > > and
> >> > > > > > > offsets is
> >> > > > > > > > > > good and makes for a more complete API.
> >> > > > > > > > > >
> >> > > > > > > > > > Since there are two soft votes for separate
> >> active/standby
> >> > API
> >> > > > > > > methods, I
> >> > > > > > > > > > also change my position on that. Fine with 2 separate
> >> > methods.
> >> > > > > > > > > > Once we remove the lag information from these APIs,
> >> > returning a
> >> > > > > List
> >> > > > > > > is
> >> > > > > > > > > > less attractive, since the ordering has no special
> >> meaning
> >> > now.
> >> > > > > > > > > >
> >> > > > > > > > > > >> lag in offsets vs time: Having both, as suggested by
> >> > Sophie
> >> > > > > would
> >> > > > > > > of
> >> > > > > > > > > > course be best. What is a little unclear to me is, how
> >> in
> >> > > > details
> >> > > > > > > are we
> >> > > > > > > > > > going to compute both?
> >> > > > > > > > > > @navinder may be next step is to flesh out these details
> >> > and
> >> > > > > surface
> >> > > > > > > any
> >> > > > > > > > > > larger changes we need to make if need be.
> >> > > > > > > > > >
> >> > > > > > > > > > Any other details we need to cover, before a VOTE can be
> >> > called
> >> > > > > on
> >> > > > > > > this?
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > On Fri, Oct 25, 2019 at 1:51 PM Bill Bejeck <
> >> > bbej...@gmail.com
> >> > > > >
> >> > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > I am jumping in a little late here.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Overall I agree with the proposal to push decision
> >> > making on
> >> > > > > > > what/how to
> >> > > > > > > > > > > query in the query layer.
> >> > > > > > > > > > >
> >> > > > > > > > > > > For point 5 from above, I'm slightly in favor of
> >> having
> >> > a new
> >> > > > > > > method,
> >> > > > > > > > > > > "standbyMetadataForKey()" or something similar.
> >> > > > > > > > > > > Because even if we return all tasks in one list, the
> >> user
> >> > > > will
> >> > > > > > > still have
> >> > > > > > > > > > > to perform some filtering to separate the different
> >> > tasks,
> >> > > > so I
> >> > > > > > > don't
> >> > > > > > > > > > feel
> >> > > > > > > > > > > making two calls is a burden, and IMHO makes things
> >> more
> >> > > > > > > transparent for
> >> > > > > > > > > > > the user.
> >> > > > > > > > > > > If the final vote is for using an "isActive" field,
> >> I'm
> >> > good
> >> > > > > with
> >> > > > > > > that as
> >> > > > > > > > > > > well.
> >> > > > > > > > > > >
> >> > > > > > > > > > > Just my 2 cents.
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Fri, Oct 25, 2019 at 5:09 AM Navinder Brar
> >> > > > > > > > > > > <navinder_b...@yahoo.com.invalid> wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > > > I think now we are aligned on almost all the design
> >> > parts.
> >> > > > > > > Summarising
> >> > > > > > > > > > > > below what has been discussed above and we have a
> >> > general
> >> > > > > > > consensus on.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - Rather than broadcasting lag across all nodes
> >> at
> >> > > > > > > rebalancing/with
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > heartbeat, we will just return a list of all
> >> available
> >> > > > > standby’s
> >> > > > > > > in the
> >> > > > > > > > > > > > system and the user can make IQ query any of those
> >> > nodes
> >> > > > > which
> >> > > > > > > will
> >> > > > > > > > > > > return
> >> > > > > > > > > > > > the response, and the lag and offset time. Based on
> >> > which
> >> > > > > user
> >> > > > > > > can
> >> > > > > > > > > > decide
> >> > > > > > > > > > > > if he wants to return the response back or call
> >> another
> >> > > > > standby.
> >> > > > > > > > > > > >    -  The current metadata query frequency will not
> >> > change.
> >> > > > > It
> >> > > > > > > will be
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > same as it does now, i.e. before each query.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    -  For fetching list<StreamsMetadata> in
> >> > > > > > > StreamsMetadataState.java
> >> > > > > > > > > > and
> >> > > > > > > > > > > > List<QueryableStoreProvider> in
> >> > > > > > > StreamThreadStateStoreProvider.java
> >> > > > > > > > > > > (which
> >> > > > > > > > > > > > will return all active stores which are
> >> > running/restoring
> >> > > > and
> >> > > > > > > replica
> >> > > > > > > > > > > > stores which are running), we will add new functions
> >> > and
> >> > > > not
> >> > > > > > > disturb
> >> > > > > > > > > > the
> >> > > > > > > > > > > > existing functions
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - There is no need to add new StreamsConfig for
> >> > > > > implementing
> >> > > > > > > this
> >> > > > > > > > > > KIP
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    - We will add standbyPartitionsByHost in
> >> > AssignmentInfo
> >> > > > > and
> >> > > > > > > > > > > > StreamsMetadataState which would change the existing
> >> > > > > > > rebuildMetadata()
> >> > > > > > > > > > > and
> >> > > > > > > > > > > > setPartitionsByHostState()
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > If anyone has any more concerns please feel free to
> >> > add.
> >> > > > Post
> >> > > > > > > this I
> >> > > > > > > > > > will
> >> > > > > > > > > > > > be initiating a vote.
> >> > > > > > > > > > > > ~Navinder
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >    On Friday, 25 October, 2019, 12:05:29 pm IST,
> >> > Matthias
> >> > > > J.
> >> > > > > Sax
> >> > > > > > > <
> >> > > > > > > > > > > > matth...@confluent.io> wrote:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >  Just to close the loop @Vinoth:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > 1. IIUC John intends to add (or we can do this in
> >> > this
> >> > > > > KIP) lag
> >> > > > > > > > > > > > information
> >> > > > > > > > > > > > > to AssignmentInfo, which gets sent to every
> >> > participant.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > As explained by John, currently KIP-441 plans to
> >> only
> >> > > > report
> >> > > > > the
> >> > > > > > > > > > > > information to the leader. But I guess, with the new
> >> > > > > proposal to
> >> > > > > > > not
> >> > > > > > > > > > > > broadcast this information anyway, this concern is
> >> > > > > invalidated
> >> > > > > > > anyway
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > 2. At-least I was under the assumption that it
> >> can be
> >> > > > > called
> >> > > > > > > per
> >> > > > > > > > > > query,
> >> > > > > > > > > > > > > since the API docs don't seem to suggest
> >> otherwise.
> >> > Do
> >> > > > you
> >> > > > > see
> >> > > > > > > any
> >> > > > > > > > > > > > > potential issues if we call this every query? (we
> >> > should
> >> > > > > > > benchmark
> >> > > > > > > > > > this
> >> > > > > > > > > > > > > nonetheless)
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > I did not see a real issue if people refresh the
> >> > metadata
> >> > > > > > > frequently,
> >> > > > > > > > > > > > because it would be a local call. My main point was,
> >> > that
> >> > > > > this
> >> > > > > > > would
> >> > > > > > > > > > > > change the current usage pattern of the API, and we
> >> > would
> >> > > > > > > clearly need
> >> > > > > > > > > > > > to communicate this change. Similar to (1), this
> >> > concern in
> >> > > > > > > invalidated
> >> > > > > > > > > > > > anyway.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > @John: I think it's a great idea to get rid of
> >> > reporting
> >> > > > > lag, and
> >> > > > > > > > > > > > pushing the decision making process about "what to
> >> > query"
> >> > > > > into
> >> > > > > > > the
> >> > > > > > > > > > query
> >> > > > > > > > > > > > serving layer itself. This simplifies the overall
> >> > design of
> >> > > > > this
> >> > > > > > > KIP
> >> > > > > > > > > > > > significantly, and actually aligns very well with
> >> the
> >> > idea
> >> > > > > that
> >> > > > > > > Kafka
> >> > > > > > > > > > > > Streams (as it is a library) should only provide the
> >> > basic
> >> > > > > > > building
> >> > > > > > > > > > > > block. Many of my raised questions are invalided by
> >> > this.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > Some questions are still open though:
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > > 10) Do we need to distinguish between
> >> > active(restoring)
> >> > > > and
> >> > > > > > > standby
> >> > > > > > > > > > > > > tasks? Or could be treat both as the same?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > @Vinoth: about (5). I see your point about multiple
> >> > calls
> >> > > > vs
> >> > > > > a
> >> > > > > > > single
> >> > > > > > > > > > > > call. I still slightly prefer multiple calls, but
> >> it's
> >> > > > highly
> >> > > > > > > > > > subjective
> >> > > > > > > > > > > > and I would also be fine to add an #isActive()
> >> method.
> >> > > > Would
> >> > > > > be
> >> > > > > > > good
> >> > > > > > > > > > the
> >> > > > > > > > > > > > get feedback from others.
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > For (14), ie, lag in offsets vs time: Having both,
> >> as
> >> > > > > suggested
> >> > > > > > > by
> >> > > > > > > > > > > > Sophie would of course be best. What is a little
> >> > unclear to
> >> > > > > me
> >> > > > > > > is, how
> >> > > > > > > > > > > > in details are we going to compute both?
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > -Matthias
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > On 10/24/19 11:07 PM, Sophie Blee-Goldman wrote:
> >> > > > > > > > > > > > > Just to chime in on the "report lag vs timestamp
> >> > > > > difference"
> >> > > > > > > issue, I
> >> > > > > > > > > > > > would
> >> > > > > > > > > > > > > actually advocate for both. As mentioned already,
> >> > time
> >> > > > > > > difference is
> >> > > > > > > > > > > > > probably a lot easier and/or more useful to reason
> >> > about
> >> > > > in
> >> > > > > > > terms of
> >> > > > > > > > > > > > > "freshness"
> >> > > > > > > > > > > > > of the state. But in the case when all queried
> >> > stores are
> >> > > > > far
> >> > > > > > > behind,
> >> > > > > > > > > > > lag
> >> > > > > > > > > > > > > could
> >> > > > > > > > > > > > > be used to estimate the recovery velocity. You can
> >> > then
> >> > > > > get a
> >> > > > > > > (pretty
> >> > > > > > > > > > > > rough)
> >> > > > > > > > > > > > > idea of when a store might be ready, and wait
> >> until
> >> > > > around
> >> > > > > > > then to
> >> > > > > > > > > > > query
> >> > > > > > > > > > > > > again.
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > > On Thu, Oct 24, 2019 at 9:53 PM Guozhang Wang <
> >> > > > > > > wangg...@gmail.com>
> >> > > > > > > > > > > > wrote:
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > > >> I think I agree with John's recent reasoning as
> >> > well:
> >> > > > > instead
> >> > > > > > > of
> >> > > > > > > > > > > letting
> >> > > > > > > > > > > > >> the storeMetadataAPI to return the staleness
> >> > > > information,
> >> > > > > > > letting
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >> client to query either active or standby and
> >> letting
> >> > > > > standby
> >> > > > > > > query
> >> > > > > > > > > > > > response
> >> > > > > > > > > > > > >> to include both the values + timestamp (or lag,
> >> as
> >> > in
> >> > > > > diffs of
> >> > > > > > > > > > > > timestamps)
> >> > > > > > > > > > > > >> would actually be more intuitive -- not only the
> >> > streams
> >> > > > > > > client is
> >> > > > > > > > > > > > simpler,
> >> > > > > > > > > > > > >> from user's perspective they also do not need to
> >> > > > > periodically
> >> > > > > > > > > > refresh
> >> > > > > > > > > > > > their
> >> > > > > > > > > > > > >> staleness information from the client, but only
> >> > need to
> >> > > > > make
> >> > > > > > > > > > decisions
> >> > > > > > > > > > > > on
> >> > > > > > > > > > > > >> the fly whenever they need to query.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> Again the standby replica then need to know the
> >> > current
> >> > > > > active
> >> > > > > > > > > > task's
> >> > > > > > > > > > > > >> timestamp, which can be found from the log end
> >> > record's
> >> > > > > > > encoded
> >> > > > > > > > > > > > timestamp;
> >> > > > > > > > > > > > >> today we standby tasks do not read that specific
> >> > record,
> >> > > > > but
> >> > > > > > > only
> >> > > > > > > > > > > > refresh
> >> > > > > > > > > > > > >> its knowledge on the log end offset, but I think
> >> > > > > refreshing
> >> > > > > > > the
> >> > > > > > > > > > latest
> >> > > > > > > > > > > > >> record timestamp is not a very bad request to add
> >> > on the
> >> > > > > > > standby
> >> > > > > > > > > > > > replicas.
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> Guozhang
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> On Thu, Oct 24, 2019 at 5:43 PM Vinoth Chandar <
> >> > > > > > > > > > vchan...@confluent.io
> >> > > > > > > > > > > >
> >> > > > > > > > > > > > >> wrote:
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>> +1 As someone implementing a query routing
> >> layer,
> >> > there
> >> > > > > is
> >> > > > > > > already
> >> > > > > > > > > > a
> >> > > > > > > > > > > > need
> >> > > > > > > > > > > > >>> to have mechanisms in place to do
> >> > healthchecks/failure
> >> > > > > > > detection to
> >> > > > > > > > > > > > >> detect
> >> > > > > > > > > > > > >>> failures for queries, while Streams rebalancing
> >> > > > > eventually
> >> > > > > > > kicks in
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > >>> background.
> >> > > > > > > > > > > > >>> So, pushing this complexity to the IQ client app
> >> > keeps
> >> > > > > > > Streams
> >> > > > > > > > > > > simpler
> >> > > > > > > > > > > > as
> >> > > > > > > > > > > > >>> well. IQs will be potentially issues at an
> >> order of
> >> > > > > > > magnitude more
> >> > > > > > > > > > > > >>> frequently and it can achieve good freshness for
> >> > the
> >> > > > lag
> >> > > > > > > > > > information.
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> I would like to add however, that we would also
> >> > need to
> >> > > > > > > introduce
> >> > > > > > > > > > > apis
> >> > > > > > > > > > > > in
> >> > > > > > > > > > > > >>> KafkaStreams class, for obtaining lag
> >> information
> >> > for
> >> > > > all
> >> > > > > > > stores
> >> > > > > > > > > > > local
> >> > > > > > > > > > > > to
> >> > > > > > > > > > > > >>> that host. This is for the IQs to relay back
> >> with
> >> > the
> >> > > > > > > response/its
> >> > > > > > > > > > > own
> >> > > > > > > > > > > > >>> heartbeat mechanism.
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>> On Thu, Oct 24, 2019 at 3:12 PM John Roesler <
> >> > > > > > > j...@confluent.io>
> >> > > > > > > > > > > > wrote:
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>>> Hi all,
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> I've been mulling about this KIP, and I think I
> >> > was on
> >> > > > > the
> >> > > > > > > wrong
> >> > > > > > > > > > > track
> >> > > > > > > > > > > > >>>> earlier with regard to task lags. Tl;dr: I
> >> don't
> >> > think
> >> > > > > we
> >> > > > > > > should
> >> > > > > > > > > > add
> >> > > > > > > > > > > > >>>> lags at all to the metadata API (and also not
> >> to
> >> > the
> >> > > > > > > > > > AssignmentInfo
> >> > > > > > > > > > > > >>>> protocol message).
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> Like I mentioned early on, reporting lag via
> >> > > > > > > > > > > > >>>> SubscriptionInfo/AssignmentInfo would only work
> >> > while
> >> > > > > > > rebalances
> >> > > > > > > > > > are
> >> > > > > > > > > > > > >>>> happening. Once the group stabilizes, no
> >> members
> >> > would
> >> > > > > be
> >> > > > > > > notified
> >> > > > > > > > > > > of
> >> > > > > > > > > > > > >>>> each others' lags anymore. I had been thinking
> >> > that
> >> > > > the
> >> > > > > > > solution
> >> > > > > > > > > > > would
> >> > > > > > > > > > > > >>>> be the heartbeat proposal I mentioned earlier,
> >> but
> >> > > > that
> >> > > > > > > proposal
> >> > > > > > > > > > > would
> >> > > > > > > > > > > > >>>> have reported the heartbeats of the members
> >> only
> >> > to
> >> > > > the
> >> > > > > > > leader
> >> > > > > > > > > > > member
> >> > > > > > > > > > > > >>>> (the one who makes assignments). To be useful
> >> in
> >> > the
> >> > > > > > > context of
> >> > > > > > > > > > > _this_
> >> > > > > > > > > > > > >>>> KIP, we would also have to report the lags in
> >> the
> >> > > > > heartbeat
> >> > > > > > > > > > > responses
> >> > > > > > > > > > > > >>>> to of _all_ members. This is a concern to be
> >> > because
> >> > > > now
> >> > > > > > > _all_ the
> >> > > > > > > > > > > > >>>> lags get reported to _all_ the members on
> >> _every_
> >> > > > > > > heartbeat... a
> >> > > > > > > > > > lot
> >> > > > > > > > > > > > >>>> of chatter.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> Plus, the proposal for KIP-441 is only to
> >> report
> >> > the
> >> > > > > lags
> >> > > > > > > of each
> >> > > > > > > > > > > > >>>> _task_. This is the sum of the lags of all the
> >> > stores
> >> > > > > in the
> >> > > > > > > > > > tasks.
> >> > > > > > > > > > > > >>>> But this would be insufficient for KIP-535. For
> >> > this
> >> > > > > kip,
> >> > > > > > > we would
> >> > > > > > > > > > > > >>>> want the lag specifically of the store we want
> >> to
> >> > > > > query. So
> >> > > > > > > this
> >> > > > > > > > > > > > >>>> means, we have to report the lags of all the
> >> > stores of
> >> > > > > all
> >> > > > > > > the
> >> > > > > > > > > > > members
> >> > > > > > > > > > > > >>>> to every member... even more chatter!
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> The final nail in the coffin to me is that IQ
> >> > clients
> >> > > > > would
> >> > > > > > > have
> >> > > > > > > > > > to
> >> > > > > > > > > > > > >>>> start refreshing their metadata quite
> >> frequently
> >> > to
> >> > > > > stay up
> >> > > > > > > to
> >> > > > > > > > > > date
> >> > > > > > > > > > > on
> >> > > > > > > > > > > > >>>> the lags, which adds even more overhead to the
> >> > system.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> Consider a strawman alternative: we bring
> >> KIP-535
> >> > back
> >> > > > > to
> >> > > > > > > > > > extending
> >> > > > > > > > > > > > >>>> the metadata API to tell the client the active
> >> and
> >> > > > > standby
> >> > > > > > > > > > replicas
> >> > > > > > > > > > > > >>>> for the key in question (not including and
> >> > > > > "staleness/lag"
> >> > > > > > > > > > > > >>>> restriction, just returning all the replicas).
> >> > Then,
> >> > > > the
> >> > > > > > > client
> >> > > > > > > > > > > picks
> >> > > > > > > > > > > > >>>> a replica and sends the query. The server
> >> returns
> >> > the
> >> > > > > > > current lag
> >> > > > > > > > > > > > >>>> along with the response (maybe in an HTML
> >> header
> >> > or
> >> > > > > > > something).
> >> > > > > > > > > > > Then,
> >> > > > > > > > > > > > >>>> the client keeps a map of its last observed
> >> lags
> >> > for
> >> > > > > each
> >> > > > > > > replica,
> >> > > > > > > > > > > and
> >> > > > > > > > > > > > >>>> uses this information to prefer fresher
> >> replicas.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> OR, if it wants only to query the active
> >> replica,
> >> > it
> >> > > > > would
> >> > > > > > > throw
> >> > > > > > > > > > an
> >> > > > > > > > > > > > >>>> error on any lag response greater than zero,
> >> > refreshes
> >> > > > > its
> >> > > > > > > > > > metadata
> >> > > > > > > > > > > by
> >> > > > > > > > > > > > >>>> re-querying the metadata API, and tries again
> >> > with the
> >> > > > > > > current
> >> > > > > > > > > > > active
> >> > > > > > > > > > > > >>>> replica.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> This way, the lag information will be super
> >> fresh
> >> > for
> >> > > > > the
> >> > > > > > > client,
> >> > > > > > > > > > > and
> >> > > > > > > > > > > > >>>> we keep the Metadata API /
> >> > Assignment,Subscription /
> >> > > > and
> >> > > > > > > Heartbeat
> >> > > > > > > > > > > as
> >> > > > > > > > > > > > >>>> slim as possible.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> Side note: I do think that some time soon,
> >> we'll
> >> > have
> >> > > > to
> >> > > > > > > add a
> >> > > > > > > > > > > library
> >> > > > > > > > > > > > >>>> for IQ server/clients. I think that this logic
> >> > will
> >> > > > > start
> >> > > > > > > to get
> >> > > > > > > > > > > > >>>> pretty complex.
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> I hope this thinking is reasonably clear!
> >> > > > > > > > > > > > >>>> Thanks again,
> >> > > > > > > > > > > > >>>> -John
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> Does that
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>> On Wed, Oct 23, 2019 at 10:16 AM Vinoth
> >> Chandar <
> >> > > > > > > > > > > > vchan...@confluent.io
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>>> wrote:
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> Responding to the points raised by Matthias
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 1. IIUC John intends to add (or we can do
> >> this in
> >> > > > this
> >> > > > > > > KIP) lag
> >> > > > > > > > > > > > >>>> information
> >> > > > > > > > > > > > >>>>> to AssignmentInfo, which gets sent to every
> >> > > > > participant.
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 2. At-least I was under the assumption that it
> >> > can be
> >> > > > > > > called per
> >> > > > > > > > > > > > >> query,
> >> > > > > > > > > > > > >>>>> since the API docs don't seem to suggest
> >> > otherwise.
> >> > > > Do
> >> > > > > you
> >> > > > > > > see
> >> > > > > > > > > > any
> >> > > > > > > > > > > > >>>>> potential issues if we call this every query?
> >> (we
> >> > > > > should
> >> > > > > > > > > > benchmark
> >> > > > > > > > > > > > >> this
> >> > > > > > > > > > > > >>>>> nonetheless)
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 4. Agree. metadataForKey() implicitly would
> >> > return
> >> > > > the
> >> > > > > > > active
> >> > > > > > > > > > host
> >> > > > > > > > > > > > >>>> metadata
> >> > > > > > > > > > > > >>>>> (as it was before). We should also document
> >> this
> >> > in
> >> > > > > that
> >> > > > > > > APIs
> >> > > > > > > > > > > > >> javadoc,
> >> > > > > > > > > > > > >>>>> given we have another method(s) that returns
> >> more
> >> > > > host
> >> > > > > > > metadata
> >> > > > > > > > > > > now.
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 5.  While I see the point, the app/caller has
> >> to
> >> > make
> >> > > > > two
> >> > > > > > > > > > different
> >> > > > > > > > > > > > >>> APIs
> >> > > > > > > > > > > > >>>>> calls to obtain active/standby and potentially
> >> > do the
> >> > > > > same
> >> > > > > > > set of
> >> > > > > > > > > > > > >>>> operation
> >> > > > > > > > > > > > >>>>> to query the state. I personally still like a
> >> > method
> >> > > > > like
> >> > > > > > > > > > > isActive()
> >> > > > > > > > > > > > >>>>> better, but don't have strong opinions.
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 9. If we do expose the lag information, could
> >> we
> >> > just
> >> > > > > > > leave it
> >> > > > > > > > > > upto
> >> > > > > > > > > > > > >> to
> >> > > > > > > > > > > > >>>> the
> >> > > > > > > > > > > > >>>>> caller to decide whether it errors out or not
> >> > and not
> >> > > > > make
> >> > > > > > > the
> >> > > > > > > > > > > > >> decision
> >> > > > > > > > > > > > >>>>> within Streams? i.e we don't need a new config
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> 14. +1 . If it's easier to do right away. We
> >> > started
> >> > > > > with
> >> > > > > > > number
> >> > > > > > > > > > of
> >> > > > > > > > > > > > >>>>> records, following the lead from KIP-441
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>> On Wed, Oct 23, 2019 at 5:44 AM Navinder Brar
> >> > > > > > > > > > > > >>>>> <navinder_b...@yahoo.com.invalid> wrote:
> >> > > > > > > > > > > > >>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>> Thanks, everyone for taking a look. Some very
> >> > cool
> >> > > > > ideas
> >> > > > > > > have
> >> > > > > > > > > > > flown
> >> > > > > > > > > > > > >>> in.
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>>>> There was a follow-on idea I POCed to
> >> > continuously
> >> > > > > > > share lag
> >> > > > > > > > > > > > >>>>>> information in the heartbeat protocol+1 that
> >> > would
> >> > > > be
> >> > > > > > > great, I
> >> > > > > > > > > > > will
> >> > > > > > > > > > > > >>>> update
> >> > > > > > > > > > > > >>>>>> the KIP assuming this work will finish soon
> >> > > > > > > > > > > > >>>>>>>> I think that adding a new method to
> >> > > > > > > StreamsMetadataState and
> >> > > > > > > > > > > > >>>>>> deprecating the existing method isthe best
> >> way
> >> > to
> >> > > > go;
> >> > > > > we
> >> > > > > > > just
> >> > > > > > > > > > > can't
> >> > > > > > > > > > > > >>>> change
> >> > > > > > > > > > > > >>>>>> the return types of any existing methods.+1
> >> on
> >> > this,
> >> > > > > we
> >> > > > > > > will add
> >> > > > > > > > > > > > >> new
> >> > > > > > > > > > > > >>>>>> methods for users who would be interested in
> >> > > > querying
> >> > > > > > > back a
> >> > > > > > > > > > list
> >> > > > > > > > > > > > >> of
> >> > > > > > > > > > > > >>>>>> possible options to query from and leave the
> >> > current
> >> > > > > > > function
> >> > > > > > > > > > > > >>>>>> getStreamsMetadataForKey() untouched for
> >> users
> >> > who
> >> > > > > want
> >> > > > > > > absolute
> >> > > > > > > > > > > > >>>>>> consistency.
> >> > > > > > > > > > > > >>>>>>>> why not just always return all available
> >> > metadata
> >> > > > > > > (including
> >> > > > > > > > > > > > >>>>>> active/standby or lag) and let the caller
> >> > decide to
> >> > > > > which
> >> > > > > > > node
> >> > > > > > > > > > > they
> >> > > > > > > > > > > > >>>> want to
> >> > > > > > > > > > > > >>>>>> route the query+1. I think this makes sense
> >> as
> >> > from
> >> > > > a
> >> > > > > user
> >> > > > > > > > > > > > >> standpoint
> >> > > > > > > > > > > > >>>> there
> >> > > > > > > > > > > > >>>>>> is no difference b/w an active and a standby
> >> if
> >> > both
> >> > > > > have
> >> > > > > > > same
> >> > > > > > > > > > > lag,
> >> > > > > > > > > > > > >>>> Infact
> >> > > > > > > > > > > > >>>>>> users would be able to use this API to reduce
> >> > query
> >> > > > > load
> >> > > > > > > on
> >> > > > > > > > > > > > >> actives,
> >> > > > > > > > > > > > >>> so
> >> > > > > > > > > > > > >>>>>> returning all available options along with
> >> the
> >> > > > current
> >> > > > > > > lag in
> >> > > > > > > > > > each
> >> > > > > > > > > > > > >>>> would
> >> > > > > > > > > > > > >>>>>> make sense and leave it to user how they want
> >> > to use
> >> > > > > this
> >> > > > > > > data.
> >> > > > > > > > > > > > >> This
> >> > > > > > > > > > > > >>>> has
> >> > > > > > > > > > > > >>>>>> another added advantage. If a user queries
> >> any
> >> > > > random
> >> > > > > > > machine
> >> > > > > > > > > > for
> >> > > > > > > > > > > a
> >> > > > > > > > > > > > >>>> key and
> >> > > > > > > > > > > > >>>>>> that machine has a replica for the
> >> > partition(where
> >> > > > key
> >> > > > > > > belongs)
> >> > > > > > > > > > > > >> user
> >> > > > > > > > > > > > >>>> might
> >> > > > > > > > > > > > >>>>>> choose to serve the data from there
> >> itself(if it
> >> > > > > doesn’t
> >> > > > > > > lag
> >> > > > > > > > > > much)
> >> > > > > > > > > > > > >>>> rather
> >> > > > > > > > > > > > >>>>>> than finding the active and making an IQ to
> >> > that.
> >> > > > This
> >> > > > > > > would
> >> > > > > > > > > > save
> >> > > > > > > > > > > > >>> some
> >> > > > > > > > > > > > >>>>>> critical time in serving for some
> >> applications.
> >> > > > > > > > > > > > >>>>>>>> Adding the lag in terms of timestamp diff
> >> > > > comparing
> >> > > > > the
> >> > > > > > > > > > > > >> committed
> >> > > > > > > > > > > > >>>>>> offset.+1 on this, I think it’s more
> >> readable.
> >> > But
> >> > > > as
> >> > > > > > > John said
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>> function allMetadataForKey() is just
> >> returning
> >> > the
> >> > > > > > > possible
> >> > > > > > > > > > > options
> >> > > > > > > > > > > > >>>> from
> >> > > > > > > > > > > > >>>>>> where users can query a key, so we can even
> >> > drop the
> >> > > > > > > parameter
> >> > > > > > > > > > > > >>>>>> enableReplicaServing/tolerableDataStaleness
> >> and
> >> > just
> >> > > > > > > return all
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>> streamsMetadata containing that key along
> >> with
> >> > the
> >> > > > > offset
> >> > > > > > > limit.
> >> > > > > > > > > > > > >>>>>> Answering the questions posted by Matthias in
> >> > > > > sequence.
> >> > > > > > > > > > > > >>>>>> 1. @John can you please comment on this
> >> one.2.
> >> > Yeah
> >> > > > > the
> >> > > > > > > usage
> >> > > > > > > > > > > > >> pattern
> >> > > > > > > > > > > > >>>>>> would include querying this prior to every
> >> > request
> >> > > > 3.
> >> > > > > > > Will add
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>> changes
> >> > > > > > > > > > > > >>>>>> to StreamsMetadata in the KIP, would include
> >> > changes
> >> > > > > in
> >> > > > > > > > > > > > >>>> rebuildMetadata()
> >> > > > > > > > > > > > >>>>>> etc.4. Makes sense, already addressed above5.
> >> > Is it
> >> > > > > > > important
> >> > > > > > > > > > from
> >> > > > > > > > > > > > >> a
> >> > > > > > > > > > > > >>>> user
> >> > > > > > > > > > > > >>>>>> perspective if they are querying an
> >> > > > > active(processing),
> >> > > > > > > > > > > > >>>> active(restoring),
> >> > > > > > > > > > > > >>>>>> a standby task if we have away of denoting
> >> lag
> >> > in a
> >> > > > > > > readable
> >> > > > > > > > > > > manner
> >> > > > > > > > > > > > >>>> which
> >> > > > > > > > > > > > >>>>>> kind of signifies the user that this is the
> >> best
> >> > > > node
> >> > > > > to
> >> > > > > > > query
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>> fresh
> >> > > > > > > > > > > > >>>>>> data.6. Yes, I intend to return the actives
> >> and
> >> > > > > replicas
> >> > > > > > > in the
> >> > > > > > > > > > > > >> same
> >> > > > > > > > > > > > >>>> return
> >> > > > > > > > > > > > >>>>>> list in allMetadataForKey()7. tricky8. yes,
> >> we
> >> > need
> >> > > > > new
> >> > > > > > > > > > functions
> >> > > > > > > > > > > > >> to
> >> > > > > > > > > > > > >>>> return
> >> > > > > > > > > > > > >>>>>> activeRestoring and standbyRunning tasks.9.
> >> > > > > StreamsConfig
> >> > > > > > > > > > doesn’t
> >> > > > > > > > > > > > >>> look
> >> > > > > > > > > > > > >>>> like
> >> > > > > > > > > > > > >>>>>> of much use to me since we are giving all
> >> > possible
> >> > > > > > > options via
> >> > > > > > > > > > > this
> >> > > > > > > > > > > > >>>>>> function, or they can use existing function
> >> > > > > > > > > > > > >>> getStreamsMetadataForKey()
> >> > > > > > > > > > > > >>>> and
> >> > > > > > > > > > > > >>>>>> get just the active10. I think treat them
> >> both
> >> > the
> >> > > > > same
> >> > > > > > > and let
> >> > > > > > > > > > > the
> >> > > > > > > > > > > > >>>> lag do
> >> > > > > > > > > > > > >>>>>> the talking11. We are just sending them the
> >> > option
> >> > > > to
> >> > > > > > > query from
> >> > > > > > > > > > > in
> >> > > > > > > > > > > > >>>>>> allMetadataForKey(), which doesn’t include
> >> any
> >> > > > > handle. We
> >> > > > > > > then
> >> > > > > > > > > > > > >> query
> >> > > > > > > > > > > > >>>> that
> >> > > > > > > > > > > > >>>>>> machine for the key where it calls
> >> allStores()
> >> > and
> >> > > > > tries
> >> > > > > > > to find
> >> > > > > > > > > > > > >> the
> >> > > > > > > > > > > > >>>> task
> >> > > > > > > > > > > > >>>>>> in
> >> activeRunning/activeRestoring/standbyRunning
> >> > and
> >> > > > > adds
> >> > > > > > > the
> >> > > > > > > > > > store
> >> > > > > > > > > > > > >>>> handle
> >> > > > > > > > > > > > >>>>>> here. 12. Need to verify, but during the
> >> exact
> >> > point
> >> > > > > when
> >> > > > > > > store
> >> > > > > > > > > > is
> >> > > > > > > > > > > > >>>> closed
> >> > > > > > > > > > > > >>>>>> to transition it from restoring to running
> >> the
> >> > > > queries
> >> > > > > > > will
> >> > > > > > > > > > fail.
> >> > > > > > > > > > > > >> The
> >> > > > > > > > > > > > >>>>>> caller in such case can have their own
> >> > configurable
> >> > > > > > > retries to
> >> > > > > > > > > > > > >> check
> >> > > > > > > > > > > > >>>> again
> >> > > > > > > > > > > > >>>>>> or try the replica if a call fails to
> >> active13.
> >> > I
> >> > > > > think
> >> > > > > > > KIP-216
> >> > > > > > > > > > is
> >> > > > > > > > > > > > >>>> working
> >> > > > > > > > > > > > >>>>>> on those lines, we might not need few of
> >> those
> >> > > > > exceptions
> >> > > > > > > since
> >> > > > > > > > > > > now
> >> > > > > > > > > > > > >>> the
> >> > > > > > > > > > > > >>>>>> basic idea of this KIP is to support IQ
> >> during
> >> > > > > > > rebalancing.14.
> >> > > > > > > > > > > > >>>> Addressed
> >> > > > > > > > > > > > >>>>>> above, agreed it looks more readable.
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>>    On Tuesday, 22 October, 2019, 08:39:07 pm
> >> > IST,
> >> > > > > > > Matthias J.
> >> > > > > > > > > > Sax
> >> > > > > > > > > > > > >> <
> >> > > > > > > > > > > > >>>>>> matth...@confluent.io> wrote:
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>>  One more thought:
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>> 14) Is specifying the allowed lag in number
> >> of
> >> > > > > records a
> >> > > > > > > useful
> >> > > > > > > > > > > way
> >> > > > > > > > > > > > >>> for
> >> > > > > > > > > > > > >>>>>> users to declare how stale an instance is
> >> > allowed to
> >> > > > > be?
> >> > > > > > > Would
> >> > > > > > > > > > it
> >> > > > > > > > > > > > >> be
> >> > > > > > > > > > > > >>>>>> more intuitive for users to specify the
> >> allowed
> >> > lag
> >> > > > in
> >> > > > > > > time
> >> > > > > > > > > > units
> >> > > > > > > > > > > > >>>> (would
> >> > > > > > > > > > > > >>>>>> event time or processing time be better)? It
> >> > seems
> >> > > > > hard
> >> > > > > > > for
> >> > > > > > > > > > users
> >> > > > > > > > > > > > >> to
> >> > > > > > > > > > > > >>>>>> reason how "fresh" a store really is when
> >> > number of
> >> > > > > > > records is
> >> > > > > > > > > > > > >> used.
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>> -Matthias
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>>> On 10/21/19 9:02 PM, Matthias J. Sax wrote:
> >> > > > > > > > > > > > >>>>>>> Some more follow up thoughts:
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>> 11) If we get a store handle of an
> >> > > > active(restoring)
> >> > > > > > > task, and
> >> > > > > > > > > > > > >> the
> >> > > > > > > > > > > > >>>> task
> >> > > > > > > > > > > > >>>>>>> transits to running, does the store handle
> >> > become
> >> > > > > > > invalid and a
> >> > > > > > > > > > > > >> new
> >> > > > > > > > > > > > >>>> one
> >> > > > > > > > > > > > >>>>>>> must be retrieved? Or can we "switch it out"
> >> > > > > underneath
> >> > > > > > > -- for
> >> > > > > > > > > > > > >> this
> >> > > > > > > > > > > > >>>>>>> case, how does the user know when they
> >> start to
> >> > > > > query the
> >> > > > > > > > > > > > >>> up-to-date
> >> > > > > > > > > > > > >>>>>> state?
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>> 12) Standby tasks will have the store open
> >> in
> >> > > > regular
> >> > > > > > > mode,
> >> > > > > > > > > > while
> >> > > > > > > > > > > > >>>>>>> active(restoring) tasks open stores in
> >> "upgrade
> >> > > > mode"
> >> > > > > > > for more
> >> > > > > > > > > > > > >>>> efficient
> >> > > > > > > > > > > > >>>>>>> bulk loading. When we switch the store into
> >> > active
> >> > > > > mode,
> >> > > > > > > we
> >> > > > > > > > > > close
> >> > > > > > > > > > > > >>> it
> >> > > > > > > > > > > > >>>> and
> >> > > > > > > > > > > > >>>>>>> reopen it. What is the impact if we query
> >> the
> >> > store
> >> > > > > > > during
> >> > > > > > > > > > > > >> restore?
> >> > > > > > > > > > > > >>>> What
> >> > > > > > > > > > > > >>>>>>> is the impact if we close the store to
> >> transit
> >> > to
> >> > > > > > > running (eg,
> >> > > > > > > > > > > > >>> there
> >> > > > > > > > > > > > >>>>>>> might be open iterators)?
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>> 13) Do we need to introduced new exception
> >> > types?
> >> > > > > Compare
> >> > > > > > > > > > KIP-216
> >> > > > > > > > > > > > >>>>>>> (
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > >
> >> > > > >
> >> > > >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors
> >> > > > > > > > > > > > >>>>>> )
> >> > > > > > > > > > > > >>>>>>> that aims to improve the user experience
> >> with
> >> > > > regard
> >> > > > > to
> >> > > > > > > IQ
> >> > > > > > > > > > > > >>>> exceptions.
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>> -Matthias
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>> On 10/21/19 6:39 PM, Matthias J. Sax wrote:
> >> > > > > > > > > > > > >>>>>>>> Thanks for the KIP.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> Couple of comments:
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 1) With regard to KIP-441, my current
> >> > > > understanding
> >> > > > > is
> >> > > > > > > that
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>> lag
> >> > > > > > > > > > > > >>>>>>>> information is only reported to the leader
> >> > (please
> >> > > > > > > correct me
> >> > > > > > > > > > > > >> if I
> >> > > > > > > > > > > > >>>> am
> >> > > > > > > > > > > > >>>>>>>> wrong). This seems to be quite a
> >> limitation to
> >> > > > > actually
> >> > > > > > > use
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>> lag
> >> > > > > > > > > > > > >>>>>>>> information.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 2) The idea of the metadata API is actually
> >> > to get
> >> > > > > > > metadata
> >> > > > > > > > > > once
> >> > > > > > > > > > > > >>> and
> >> > > > > > > > > > > > >>>>>>>> only refresh the metadata if a store was
> >> > migrated.
> >> > > > > The
> >> > > > > > > current
> >> > > > > > > > > > > > >>>> proposal
> >> > > > > > > > > > > > >>>>>>>> would require to get the metadata before
> >> each
> >> > > > query.
> >> > > > > > > The KIP
> >> > > > > > > > > > > > >>> should
> >> > > > > > > > > > > > >>>>>>>> describe the usage pattern and impact in
> >> more
> >> > > > > detail.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 3) Currently, the KIP does not list the
> >> > public API
> >> > > > > > > changes in
> >> > > > > > > > > > > > >>>> detail.
> >> > > > > > > > > > > > >>>>>>>> Please list all methods you intend to
> >> > deprecate
> >> > > > and
> >> > > > > > > list all
> >> > > > > > > > > > > > >>>> methods you
> >> > > > > > > > > > > > >>>>>>>> intend to add (best, using a code-block
> >> > markup --
> >> > > > > > > compare
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > >
> >> > > > >
> >> > > >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-470%3A+TopologyTestDriver+test+input+and+output+usability+improvements
> >> > > > > > > > > > > > >>>>>>>> as an example)
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 4) Also note (as already pointed out by
> >> John),
> >> > > > that
> >> > > > > we
> >> > > > > > > cannot
> >> > > > > > > > > > > > >> have
> >> > > > > > > > > > > > >>>> any
> >> > > > > > > > > > > > >>>>>>>> breaking API changes. Thus, the API should
> >> be
> >> > > > > designed
> >> > > > > > > in a
> >> > > > > > > > > > > > >> fully
> >> > > > > > > > > > > > >>>>>>>> backward compatible manner.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 5) Returning a list of metadata object
> >> makes
> >> > it
> >> > > > hard
> >> > > > > > > for user
> >> > > > > > > > > > to
> >> > > > > > > > > > > > >>>> know if
> >> > > > > > > > > > > > >>>>>>>> the first object refers to the
> >> > active(processing),
> >> > > > > > > > > > > > >>>> active(restoring), or
> >> > > > > > > > > > > > >>>>>>>> a standby task. IMHO, we should be more
> >> > explicit.
> >> > > > > For
> >> > > > > > > > > > example, a
> >> > > > > > > > > > > > >>>>>>>> metadata object could have a flag that one
> >> can
> >> > > > test
> >> > > > > via
> >> > > > > > > > > > > > >>>> `#isActive()`.
> >> > > > > > > > > > > > >>>>>>>> Or maybe even better, we could keep the
> >> > current
> >> > > > API
> >> > > > > > > as-is and
> >> > > > > > > > > > > > >> add
> >> > > > > > > > > > > > >>>>>>>> something like `standbyMetadataForKey()`
> >> (and
> >> > > > > similar
> >> > > > > > > methods
> >> > > > > > > > > > > > >> for
> >> > > > > > > > > > > > >>>>>>>> other). Having just a flag `isActive()` is
> >> a
> >> > > > little
> >> > > > > > > subtle and
> >> > > > > > > > > > > > >>>> having
> >> > > > > > > > > > > > >>>>>>>> new overloads would make the API much
> >> clearer
> >> > > > > (passing
> >> > > > > > > in a
> >> > > > > > > > > > > > >>> boolean
> >> > > > > > > > > > > > >>>> flag
> >> > > > > > > > > > > > >>>>>>>> does not seem to be a nice API).
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 6) Do you intent to return all standby
> >> > metadata
> >> > > > > > > information at
> >> > > > > > > > > > > > >>> once,
> >> > > > > > > > > > > > >>>>>>>> similar to `allMetadata()` -- seems to be
> >> > useful.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 7) Even if the lag information is
> >> propagated
> >> > to
> >> > > > all
> >> > > > > > > instances,
> >> > > > > > > > > > > > >> it
> >> > > > > > > > > > > > >>>> will
> >> > > > > > > > > > > > >>>>>>>> happen in an async manner. Hence, I am
> >> > wondering
> >> > > > if
> >> > > > > we
> >> > > > > > > should
> >> > > > > > > > > > > > >>>> address
> >> > > > > > > > > > > > >>>>>>>> this race condition (I think we should).
> >> The
> >> > idea
> >> > > > > would
> >> > > > > > > be to
> >> > > > > > > > > > > > >>> check
> >> > > > > > > > > > > > >>>> if a
> >> > > > > > > > > > > > >>>>>>>> standby/active(restoring) task is actually
> >> > still
> >> > > > > within
> >> > > > > > > the
> >> > > > > > > > > > lag
> >> > > > > > > > > > > > >>>> bounds
> >> > > > > > > > > > > > >>>>>>>> when a query is executed and we would
> >> throw an
> >> > > > > > > exception if
> >> > > > > > > > > > not.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 8) The current `KafkaStreams#state()`
> >> method
> >> > only
> >> > > > > > > returns a
> >> > > > > > > > > > > > >> handle
> >> > > > > > > > > > > > >>>> to
> >> > > > > > > > > > > > >>>>>>>> stores of active(processing) tasks. How
> >> can a
> >> > user
> >> > > > > > > actually
> >> > > > > > > > > > get
> >> > > > > > > > > > > > >> a
> >> > > > > > > > > > > > >>>> handle
> >> > > > > > > > > > > > >>>>>>>> to an store of an active(restoring) or
> >> standby
> >> > > > task
> >> > > > > for
> >> > > > > > > > > > > > >> querying?
> >> > > > > > > > > > > > >>>> Seems
> >> > > > > > > > > > > > >>>>>>>> we should add a new method to get standby
> >> > handles?
> >> > > > > > > Changing
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>>>> semantics to existing `state()` would be
> >> > possible,
> >> > > > > but
> >> > > > > > > I think
> >> > > > > > > > > > > > >>>> adding a
> >> > > > > > > > > > > > >>>>>>>> new method is preferable?
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 9) How does the user actually specify the
> >> > > > acceptable
> >> > > > > > > lag? A
> >> > > > > > > > > > > > >> global
> >> > > > > > > > > > > > >>>>>>>> config via StreamsConfig (this would be a
> >> > public
> >> > > > API
> >> > > > > > > change
> >> > > > > > > > > > that
> >> > > > > > > > > > > > >>>> needs
> >> > > > > > > > > > > > >>>>>>>> to be covered in the KIP)? Or on a
> >> per-store
> >> > or
> >> > > > even
> >> > > > > > > per-query
> >> > > > > > > > > > > > >>>> basis for
> >> > > > > > > > > > > > >>>>>>>> more flexibility? We could also have a
> >> global
> >> > > > > setting
> >> > > > > > > that is
> >> > > > > > > > > > > > >> used
> >> > > > > > > > > > > > >>>> as
> >> > > > > > > > > > > > >>>>>>>> default and allow to overwrite it on a
> >> > per-query
> >> > > > > basis.
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> 10) Do we need to distinguish between
> >> > > > > active(restoring)
> >> > > > > > > and
> >> > > > > > > > > > > > >>> standby
> >> > > > > > > > > > > > >>>>>>>> tasks? Or could be treat both as the same?
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> -Matthias
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>> On 10/21/19 5:40 PM, Vinoth Chandar wrote:
> >> > > > > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting
> >> > "acceptable
> >> > > > > lag"
> >> > > > > > > into
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>>>>> configuration at all, or even making it a
> >> > > > > parameter on
> >> > > > > > > > > > > > >>>>>> `allMetadataForKey`,
> >> > > > > > > > > > > > >>>>>>>>> why not just _always_ return all available
> >> > > > metadata
> >> > > > > > > > > > (including
> >> > > > > > > > > > > > >>>>>>>>> active/standby or lag) and let the caller
> >> > decide
> >> > > > to
> >> > > > > > > which
> >> > > > > > > > > > node
> >> > > > > > > > > > > > >>> they
> >> > > > > > > > > > > > >>>>>> want to
> >> > > > > > > > > > > > >>>>>>>>> route the query?
> >> > > > > > > > > > > > >>>>>>>>> +1 on exposing lag information via the
> >> APIs.
> >> > IMO
> >> > > > > > > without
> >> > > > > > > > > > having
> >> > > > > > > > > > > > >>>>>>>>> continuously updated/fresh lag
> >> information,
> >> > its
> >> > > > > true
> >> > > > > > > value
> >> > > > > > > > > > as a
> >> > > > > > > > > > > > >>>> signal
> >> > > > > > > > > > > > >>>>>> for
> >> > > > > > > > > > > > >>>>>>>>> query routing decisions is much limited.
> >> But
> >> > we
> >> > > > can
> >> > > > > > > design
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>> API
> >> > > > > > > > > > > > >>>>>> around
> >> > > > > > > > > > > > >>>>>>>>> this model and iterate? Longer term, we
> >> > should
> >> > > > have
> >> > > > > > > > > > > > >> continuously
> >> > > > > > > > > > > > >>>>>> shared lag
> >> > > > > > > > > > > > >>>>>>>>> information.
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> more general to refactor it to
> >> > > > > > > "allMetadataForKey(long
> >> > > > > > > > > > > > >>>>>>>>> tolerableDataStaleness, ...)", and when
> >> it's
> >> > set
> >> > > > > to 0
> >> > > > > > > it
> >> > > > > > > > > > means
> >> > > > > > > > > > > > >>>> "active
> >> > > > > > > > > > > > >>>>>> task
> >> > > > > > > > > > > > >>>>>>>>> only".
> >> > > > > > > > > > > > >>>>>>>>> +1 IMO if we plan on having
> >> > > > > `enableReplicaServing`, it
> >> > > > > > > makes
> >> > > > > > > > > > > > >>> sense
> >> > > > > > > > > > > > >>>> to
> >> > > > > > > > > > > > >>>>>>>>> generalize based on dataStaleness. This
> >> seems
> >> > > > > > > complementary
> >> > > > > > > > > > to
> >> > > > > > > > > > > > >>>>>> exposing the
> >> > > > > > > > > > > > >>>>>>>>> lag information itself.
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> This is actually not a public api
> >> change at
> >> > > > all,
> >> > > > > and
> >> > > > > > > I'm
> >> > > > > > > > > > > > >>>> planning to
> >> > > > > > > > > > > > >>>>>>>>> implement it asap as a precursor to the
> >> rest
> >> > of
> >> > > > > KIP-441
> >> > > > > > > > > > > > >>>>>>>>> +1 again. Do we have a concrete timeline
> >> for
> >> > when
> >> > > > > this
> >> > > > > > > change
> >> > > > > > > > > > > > >>> will
> >> > > > > > > > > > > > >>>>>> land on
> >> > > > > > > > > > > > >>>>>>>>> master? I would like to get the
> >> > implementation
> >> > > > > wrapped
> >> > > > > > > up (as
> >> > > > > > > > > > > > >>> much
> >> > > > > > > > > > > > >>>> as
> >> > > > > > > > > > > > >>>>>>>>> possible) by end of the month. :). But I
> >> > agree
> >> > > > this
> >> > > > > > > > > > sequencing
> >> > > > > > > > > > > > >>>> makes
> >> > > > > > > > > > > > >>>>>>>>> sense..
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>> On Mon, Oct 21, 2019 at 2:56 PM Guozhang
> >> > Wang <
> >> > > > > > > > > > > > >>> wangg...@gmail.com>
> >> > > > > > > > > > > > >>>>>> wrote:
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> Hi Navinder,
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> Thanks for the KIP, I have a high level
> >> > question
> >> > > > > > > about the
> >> > > > > > > > > > > > >>>> proposed
> >> > > > > > > > > > > > >>>>>> API
> >> > > > > > > > > > > > >>>>>>>>>> regarding:
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > "StreamsMetadataState::allMetadataForKey(boolean
> >> > > > > > > > > > > > >>>>>> enableReplicaServing...)"
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> I'm wondering if it's more general to
> >> > refactor
> >> > > > it
> >> > > > > to
> >> > > > > > > > > > > > >>>>>>>>>> "allMetadataForKey(long
> >> > tolerableDataStaleness,
> >> > > > > > > ...)", and
> >> > > > > > > > > > > > >> when
> >> > > > > > > > > > > > >>>> it's
> >> > > > > > > > > > > > >>>>>> set to
> >> > > > > > > > > > > > >>>>>>>>>> 0 it means "active task only". Behind the
> >> > scene,
> >> > > > > we
> >> > > > > > > can have
> >> > > > > > > > > > > > >> the
> >> > > > > > > > > > > > >>>>>> committed
> >> > > > > > > > > > > > >>>>>>>>>> offsets to encode the stream time as
> >> well,
> >> > so
> >> > > > that
> >> > > > > > > when
> >> > > > > > > > > > > > >>> processing
> >> > > > > > > > > > > > >>>>>> standby
> >> > > > > > > > > > > > >>>>>>>>>> tasks the stream process knows not long
> >> the
> >> > lag
> >> > > > in
> >> > > > > > > terms of
> >> > > > > > > > > > > > >>>> offsets
> >> > > > > > > > > > > > >>>>>>>>>> comparing to the committed offset
> >> > (internally we
> >> > > > > call
> >> > > > > > > it
> >> > > > > > > > > > > > >> offset
> >> > > > > > > > > > > > >>>>>> limit), but
> >> > > > > > > > > > > > >>>>>>>>>> also the lag in terms of timestamp diff
> >> > > > comparing
> >> > > > > the
> >> > > > > > > > > > > > >> committed
> >> > > > > > > > > > > > >>>>>> offset.
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> Also encoding the timestamp as part of
> >> > offset
> >> > > > have
> >> > > > > > > other
> >> > > > > > > > > > > > >>> benefits
> >> > > > > > > > > > > > >>>> for
> >> > > > > > > > > > > > >>>>>>>>>> improving Kafka Streams time semantics as
> >> > well,
> >> > > > > but
> >> > > > > > > for
> >> > > > > > > > > > > > >> KIP-535
> >> > > > > > > > > > > > >>>>>> itself I
> >> > > > > > > > > > > > >>>>>>>>>> think it can help giving users a more
> >> > intuitive
> >> > > > > > > interface to
> >> > > > > > > > > > > > >>>> reason
> >> > > > > > > > > > > > >>>>>> about.
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> Guozhang
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> On Mon, Oct 21, 2019 at 12:30 PM John
> >> > Roesler <
> >> > > > > > > > > > > > >>> j...@confluent.io>
> >> > > > > > > > > > > > >>>>>> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> Hey Navinder,
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> Thanks for the KIP! I've been reading
> >> over
> >> > the
> >> > > > > > > discussion
> >> > > > > > > > > > > > >> thus
> >> > > > > > > > > > > > >>>> far,
> >> > > > > > > > > > > > >>>>>>>>>>> and I have a couple of thoughts to pile
> >> on
> >> > as
> >> > > > > well:
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> It seems confusing to propose the API in
> >> > terms
> >> > > > > of the
> >> > > > > > > > > > current
> >> > > > > > > > > > > > >>>> system
> >> > > > > > > > > > > > >>>>>>>>>>> state, but also propose how the API
> >> would
> >> > look
> >> > > > > > > if/when
> >> > > > > > > > > > > > >> KIP-441
> >> > > > > > > > > > > > >>> is
> >> > > > > > > > > > > > >>>>>>>>>>> implemented. It occurs to me that the
> >> only
> >> > part
> >> > > > > of
> >> > > > > > > KIP-441
> >> > > > > > > > > > > > >> that
> >> > > > > > > > > > > > >>>> would
> >> > > > > > > > > > > > >>>>>>>>>>> affect you is the availability of the
> >> lag
> >> > > > > > > information in
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>>>>>>> SubscriptionInfo message. This is
> >> actually
> >> > not
> >> > > > a
> >> > > > > > > public api
> >> > > > > > > > > > > > >>>> change at
> >> > > > > > > > > > > > >>>>>>>>>>> all, and I'm planning to implement it
> >> asap
> >> > as a
> >> > > > > > > precursor
> >> > > > > > > > > > to
> >> > > > > > > > > > > > >>> the
> >> > > > > > > > > > > > >>>> rest
> >> > > > > > > > > > > > >>>>>>>>>>> of KIP-441, so maybe you can just build
> >> on
> >> > top
> >> > > > of
> >> > > > > > > KIP-441
> >> > > > > > > > > > and
> >> > > > > > > > > > > > >>>> assume
> >> > > > > > > > > > > > >>>>>>>>>>> the lag information will be available.
> >> > Then you
> >> > > > > > > could have
> >> > > > > > > > > > a
> >> > > > > > > > > > > > >>> more
> >> > > > > > > > > > > > >>>>>>>>>>> straightforward proposal (e.g., mention
> >> > that
> >> > > > > you'd
> >> > > > > > > return
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>> lag
> >> > > > > > > > > > > > >>>>>>>>>>> information in AssignmentInfo as well as
> >> > in the
> >> > > > > > > > > > > > >> StreamsMetadata
> >> > > > > > > > > > > > >>>> in
> >> > > > > > > > > > > > >>>>>>>>>>> some form, or make use of it in the API
> >> > > > somehow).
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> I'm partially motivated in that former
> >> > point
> >> > > > > because
> >> > > > > > > it
> >> > > > > > > > > > seems
> >> > > > > > > > > > > > >>>> like
> >> > > > > > > > > > > > >>>>>>>>>>> understanding how callers would bound
> >> the
> >> > > > > staleness
> >> > > > > > > for
> >> > > > > > > > > > their
> >> > > > > > > > > > > > >>> use
> >> > > > > > > > > > > > >>>>>> case
> >> > > > > > > > > > > > >>>>>>>>>>> is _the_ key point for this KIP. FWIW, I
> >> > think
> >> > > > > that
> >> > > > > > > adding
> >> > > > > > > > > > a
> >> > > > > > > > > > > > >>> new
> >> > > > > > > > > > > > >>>>>>>>>>> method to StreamsMetadataState and
> >> > deprecating
> >> > > > > the
> >> > > > > > > existing
> >> > > > > > > > > > > > >>>> method is
> >> > > > > > > > > > > > >>>>>>>>>>> the best way to go; we just can't change
> >> > the
> >> > > > > return
> >> > > > > > > types
> >> > > > > > > > > > of
> >> > > > > > > > > > > > >>> any
> >> > > > > > > > > > > > >>>>>>>>>>> existing methods.
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> I'm wondering, rather than putting
> >> > "acceptable
> >> > > > > lag"
> >> > > > > > > into
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>>>>>>> configuration at all, or even making it
> >> a
> >> > > > > parameter
> >> > > > > > > on
> >> > > > > > > > > > > > >>>>>>>>>>> `allMetadataForKey`, why not just
> >> _always_
> >> > > > > return all
> >> > > > > > > > > > > > >> available
> >> > > > > > > > > > > > >>>>>>>>>>> metadata (including active/standby or
> >> lag)
> >> > and
> >> > > > > let
> >> > > > > > > the
> >> > > > > > > > > > caller
> >> > > > > > > > > > > > >>>> decide
> >> > > > > > > > > > > > >>>>>>>>>>> to which node they want to route the
> >> query?
> >> > > > This
> >> > > > > > > method
> >> > > > > > > > > > isn't
> >> > > > > > > > > > > > >>>> making
> >> > > > > > > > > > > > >>>>>>>>>>> any queries itself; it's merely telling
> >> you
> >> > > > where
> >> > > > > > > the local
> >> > > > > > > > > > > > >>>> Streams
> >> > > > > > > > > > > > >>>>>>>>>>> instance _thinks_ the key in question is
> >> > > > located.
> >> > > > > > > Just
> >> > > > > > > > > > > > >>> returning
> >> > > > > > > > > > > > >>>> all
> >> > > > > > > > > > > > >>>>>>>>>>> available information lets the caller
> >> > implement
> >> > > > > any
> >> > > > > > > > > > semantics
> >> > > > > > > > > > > > >>>> they
> >> > > > > > > > > > > > >>>>>>>>>>> desire around querying only active
> >> stores,
> >> > or
> >> > > > > > > standbys, or
> >> > > > > > > > > > > > >>>> recovering
> >> > > > > > > > > > > > >>>>>>>>>>> stores, or whatever.
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> One fly in the ointment, which you may
> >> > wish to
> >> > > > > > > consider if
> >> > > > > > > > > > > > >>>> proposing
> >> > > > > > > > > > > > >>>>>>>>>>> to use lag information, is that the
> >> cluster
> >> > > > would
> >> > > > > > > only
> >> > > > > > > > > > become
> >> > > > > > > > > > > > >>>> aware
> >> > > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > > >>>>>>>>>>> new lag information during rebalances.
> >> > Even in
> >> > > > > the
> >> > > > > > > full
> >> > > > > > > > > > > > >>>> expression of
> >> > > > > > > > > > > > >>>>>>>>>>> KIP-441, this information would stop
> >> being
> >> > > > > > > propagated when
> >> > > > > > > > > > > > >> the
> >> > > > > > > > > > > > >>>>>> cluster
> >> > > > > > > > > > > > >>>>>>>>>>> achieves a balanced task distribution.
> >> > There
> >> > > > was
> >> > > > > a
> >> > > > > > > > > > follow-on
> >> > > > > > > > > > > > >>>> idea I
> >> > > > > > > > > > > > >>>>>>>>>>> POCed to continuously share lag
> >> > information in
> >> > > > > the
> >> > > > > > > > > > heartbeat
> >> > > > > > > > > > > > >>>>>> protocol,
> >> > > > > > > > > > > > >>>>>>>>>>> which you might be interested in, if you
> >> > want
> >> > > > to
> >> > > > > > > make sure
> >> > > > > > > > > > > > >> that
> >> > > > > > > > > > > > >>>> nodes
> >> > > > > > > > > > > > >>>>>>>>>>> are basically _always_ aware of each
> >> > others'
> >> > > > lag
> >> > > > > on
> >> > > > > > > > > > different
> >> > > > > > > > > > > > >>>>>>>>>>> partitions:
> >> > > > > > > https://github.com/apache/kafka/pull/7096
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> Thanks again!
> >> > > > > > > > > > > > >>>>>>>>>>> -John
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> On Sat, Oct 19, 2019 at 6:06 AM Navinder
> >> > Brar
> >> > > > > > > > > > > > >>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
> >> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> Thanks, Vinoth. Looks like we are on
> >> the
> >> > same
> >> > > > > page.
> >> > > > > > > I will
> >> > > > > > > > > > > > >> add
> >> > > > > > > > > > > > >>>> some
> >> > > > > > > > > > > > >>>>>> of
> >> > > > > > > > > > > > >>>>>>>>>>> these explanations to the KIP as well.
> >> Have
> >> > > > > assigned
> >> > > > > > > the
> >> > > > > > > > > > > > >>>> KAFKA-6144
> >> > > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > > >>>>>>>>>>> myself and KAFKA-8994 is closed(by
> >> you). As
> >> > > > > > > suggested, we
> >> > > > > > > > > > > > >> will
> >> > > > > > > > > > > > >>>>>> replace
> >> > > > > > > > > > > > >>>>>>>>>>> "replica" with "standby".
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> In the new API,
> >> > > > > > > > > > > > >>> "StreamsMetadataState::allMetadataForKey(boolean
> >> > > > > > > > > > > > >>>>>>>>>>> enableReplicaServing, String storeName,
> >> K
> >> > key,
> >> > > > > > > > > > Serializer<K>
> >> > > > > > > > > > > > >>>>>>>>>>> keySerializer)" Do we really need a per
> >> key
> >> > > > > > > configuration?
> >> > > > > > > > > > > > >> or a
> >> > > > > > > > > > > > >>>> new
> >> > > > > > > > > > > > >>>>>>>>>>> StreamsConfig is good enough?>> Coming
> >> from
> >> > > > > > > experience,
> >> > > > > > > > > > when
> >> > > > > > > > > > > > >>>> teams
> >> > > > > > > > > > > > >>>>>> are
> >> > > > > > > > > > > > >>>>>>>>>>> building a platform with Kafka Streams
> >> and
> >> > > > these
> >> > > > > > > API's
> >> > > > > > > > > > serve
> >> > > > > > > > > > > > >>>> data to
> >> > > > > > > > > > > > >>>>>>>>>>> multiple teams, we can't have a
> >> generalized
> >> > > > > config
> >> > > > > > > that
> >> > > > > > > > > > says
> >> > > > > > > > > > > > >>> as a
> >> > > > > > > > > > > > >>>>>>>>>> platform
> >> > > > > > > > > > > > >>>>>>>>>>> we will support stale reads or not. It
> >> > should
> >> > > > be
> >> > > > > the
> >> > > > > > > choice
> >> > > > > > > > > > > > >> of
> >> > > > > > > > > > > > >>>>>> someone
> >> > > > > > > > > > > > >>>>>>>>>> who
> >> > > > > > > > > > > > >>>>>>>>>>> is calling the API's to choose whether
> >> > they are
> >> > > > > ok
> >> > > > > > > with
> >> > > > > > > > > > stale
> >> > > > > > > > > > > > >>>> reads
> >> > > > > > > > > > > > >>>>>> or
> >> > > > > > > > > > > > >>>>>>>>>> not.
> >> > > > > > > > > > > > >>>>>>>>>>> Makes sense?
> >> > > > > > > > > > > > >>>>>>>>>>>>    On Thursday, 17 October, 2019,
> >> > 11:56:02 pm
> >> > > > > IST,
> >> > > > > > > Vinoth
> >> > > > > > > > > > > > >>>> Chandar <
> >> > > > > > > > > > > > >>>>>>>>>>> vchan...@confluent.io> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>  Looks like we are covering ground :)
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>> Only if it is within a permissible
> >> > > > range(say
> >> > > > > > > 10000) we
> >> > > > > > > > > > > > >> will
> >> > > > > > > > > > > > >>>> serve
> >> > > > > > > > > > > > >>>>>>>>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>> Restoring state of active.
> >> > > > > > > > > > > > >>>>>>>>>>>> +1 on having a knob like this.. My
> >> > reasoning
> >> > > > is
> >> > > > > as
> >> > > > > > > > > > follows.
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> Looking at the Streams state as a
> >> > read-only
> >> > > > > > > distributed kv
> >> > > > > > > > > > > > >>>> store.
> >> > > > > > > > > > > > >>>>>> With
> >> > > > > > > > > > > > >>>>>>>>>>>> num_standby = f , we should be able to
> >> > > > tolerate
> >> > > > > f
> >> > > > > > > failures
> >> > > > > > > > > > > > >> and
> >> > > > > > > > > > > > >>>> if
> >> > > > > > > > > > > > >>>>>> there
> >> > > > > > > > > > > > >>>>>>>>>>> is
> >> > > > > > > > > > > > >>>>>>>>>>>> a f+1' failure, the system should be
> >> > > > > unavailable.
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> A) So with num_standby=0, the system
> >> > should be
> >> > > > > > > unavailable
> >> > > > > > > > > > > > >>> even
> >> > > > > > > > > > > > >>>> if
> >> > > > > > > > > > > > >>>>>>>>>> there
> >> > > > > > > > > > > > >>>>>>>>>>> is
> >> > > > > > > > > > > > >>>>>>>>>>>> 1 failure and thats my argument for not
> >> > > > allowing
> >> > > > > > > querying
> >> > > > > > > > > > in
> >> > > > > > > > > > > > >>>>>>>>>> restoration
> >> > > > > > > > > > > > >>>>>>>>>>>> state, esp in this case it will be a
> >> total
> >> > > > > rebuild
> >> > > > > > > of the
> >> > > > > > > > > > > > >>> state
> >> > > > > > > > > > > > >>>>>> (which
> >> > > > > > > > > > > > >>>>>>>>>>> IMO
> >> > > > > > > > > > > > >>>>>>>>>>>> cannot be considered a normal fault
> >> free
> >> > > > > operational
> >> > > > > > > > > > state).
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> B) Even there are standby's, say
> >> > > > num_standby=2,
> >> > > > > if
> >> > > > > > > the
> >> > > > > > > > > > user
> >> > > > > > > > > > > > >>>> decides
> >> > > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > > >>>>>>>>>>> shut
> >> > > > > > > > > > > > >>>>>>>>>>>> down all 3 instances, then only outcome
> >> > should
> >> > > > > be
> >> > > > > > > > > > > > >>> unavailability
> >> > > > > > > > > > > > >>>>>> until
> >> > > > > > > > > > > > >>>>>>>>>>> all
> >> > > > > > > > > > > > >>>>>>>>>>>> of them come back or state is rebuilt
> >> on
> >> > other
> >> > > > > > > nodes in
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>> cluster. In
> >> > > > > > > > > > > > >>>>>>>>>>>> normal operations, f <= 2 and when a
> >> > failure
> >> > > > > does
> >> > > > > > > happen
> >> > > > > > > > > > we
> >> > > > > > > > > > > > >>> can
> >> > > > > > > > > > > > >>>> then
> >> > > > > > > > > > > > >>>>>>>>>>> either
> >> > > > > > > > > > > > >>>>>>>>>>>> choose to be C over A and fail IQs
> >> until
> >> > > > > > > replication is
> >> > > > > > > > > > > > >> fully
> >> > > > > > > > > > > > >>>>>> caught up
> >> > > > > > > > > > > > >>>>>>>>>>> or
> >> > > > > > > > > > > > >>>>>>>>>>>> choose A over C by serving in restoring
> >> > state
> >> > > > as
> >> > > > > > > long as
> >> > > > > > > > > > lag
> >> > > > > > > > > > > > >>> is
> >> > > > > > > > > > > > >>>>>>>>>> minimal.
> >> > > > > > > > > > > > >>>>>>>>>>> If
> >> > > > > > > > > > > > >>>>>>>>>>>> even with f=1 say, all the standbys are
> >> > > > lagging
> >> > > > > a
> >> > > > > > > lot due
> >> > > > > > > > > > to
> >> > > > > > > > > > > > >>>> some
> >> > > > > > > > > > > > >>>>>>>>>> issue,
> >> > > > > > > > > > > > >>>>>>>>>>>> then that should be considered a
> >> failure
> >> > since
> >> > > > > that
> >> > > > > > > is
> >> > > > > > > > > > > > >>> different
> >> > > > > > > > > > > > >>>>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>> normal/expected operational mode.
> >> Serving
> >> > > > reads
> >> > > > > with
> >> > > > > > > > > > > > >> unbounded
> >> > > > > > > > > > > > >>>>>>>>>>> replication
> >> > > > > > > > > > > > >>>>>>>>>>>> lag and calling it "available" may not
> >> be
> >> > very
> >> > > > > > > usable or
> >> > > > > > > > > > > > >> even
> >> > > > > > > > > > > > >>>>>> desirable
> >> > > > > > > > > > > > >>>>>>>>>>> :)
> >> > > > > > > > > > > > >>>>>>>>>>>> IMHO, since it gives the user no way to
> >> > reason
> >> > > > > > > about the
> >> > > > > > > > > > app
> >> > > > > > > > > > > > >>>> that is
> >> > > > > > > > > > > > >>>>>>>>>>> going
> >> > > > > > > > > > > > >>>>>>>>>>>> to query this store.
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> So there is definitely a need to
> >> > distinguish
> >> > > > > > > between :
> >> > > > > > > > > > > > >>>> Replication
> >> > > > > > > > > > > > >>>>>>>>>>> catchup
> >> > > > > > > > > > > > >>>>>>>>>>>> while being in fault free state vs
> >> > Restoration
> >> > > > > of
> >> > > > > > > state
> >> > > > > > > > > > when
> >> > > > > > > > > > > > >>> we
> >> > > > > > > > > > > > >>>> lose
> >> > > > > > > > > > > > >>>>>>>>>> more
> >> > > > > > > > > > > > >>>>>>>>>>>> than f standbys. This knob is a great
> >> > starting
> >> > > > > point
> >> > > > > > > > > > towards
> >> > > > > > > > > > > > >>>> this.
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> If you agree with some of the
> >> explanation
> >> > > > above,
> >> > > > > > > please
> >> > > > > > > > > > feel
> >> > > > > > > > > > > > >>>> free to
> >> > > > > > > > > > > > >>>>>>>>>>>> include it in the KIP as well since
> >> this
> >> > is
> >> > > > > sort of
> >> > > > > > > our
> >> > > > > > > > > > > > >> design
> >> > > > > > > > > > > > >>>>>>>>>> principle
> >> > > > > > > > > > > > >>>>>>>>>>>> here..
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> Small nits :
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> - let's standardize on "standby"
> >> instead
> >> > of
> >> > > > > > > "replica", KIP
> >> > > > > > > > > > > > >> or
> >> > > > > > > > > > > > >>>>>> code,  to
> >> > > > > > > > > > > > >>>>>>>>>>> be
> >> > > > > > > > > > > > >>>>>>>>>>>> consistent with rest of Streams
> >> code/docs?
> >> > > > > > > > > > > > >>>>>>>>>>>> - Can we merge KAFKA-8994 into
> >> KAFKA-6144
> >> > now
> >> > > > > and
> >> > > > > > > close
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>> former?
> >> > > > > > > > > > > > >>>>>>>>>>>> Eventually need to consolidate
> >> KAFKA-6555
> >> > as
> >> > > > > well
> >> > > > > > > > > > > > >>>>>>>>>>>> - In the new API,
> >> > > > > > > > > > > > >>>>
> >> "StreamsMetadataState::allMetadataForKey(boolean
> >> > > > > > > > > > > > >>>>>>>>>>>> enableReplicaServing, String
> >> storeName, K
> >> > key,
> >> > > > > > > > > > Serializer<K>
> >> > > > > > > > > > > > >>>>>>>>>>> keySerializer)" Do
> >> > > > > > > > > > > > >>>>>>>>>>>> we really need a per key configuration?
> >> > or a
> >> > > > new
> >> > > > > > > > > > > > >> StreamsConfig
> >> > > > > > > > > > > > >>>> is
> >> > > > > > > > > > > > >>>>>> good
> >> > > > > > > > > > > > >>>>>>>>>>>> enough?
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>> On Wed, Oct 16, 2019 at 8:31 PM
> >> Navinder
> >> > Brar
> >> > > > > > > > > > > > >>>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
> >> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> @Vinoth, I have incorporated a few of
> >> the
> >> > > > > > > discussions we
> >> > > > > > > > > > > > >> have
> >> > > > > > > > > > > > >>>> had
> >> > > > > > > > > > > > >>>>>> in
> >> > > > > > > > > > > > >>>>>>>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>>>> KIP.
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> In the current code, t0 and t1 serve
> >> > queries
> >> > > > > from
> >> > > > > > > > > > > > >>>> Active(Running)
> >> > > > > > > > > > > > >>>>>>>>>>>>> partition. For case t2, we are
> >> planning
> >> > to
> >> > > > > return
> >> > > > > > > > > > > > >>>>>>>>>> List<StreamsMetadata>
> >> > > > > > > > > > > > >>>>>>>>>>>>> such that it returns
> >> <StreamsMetadata(A),
> >> > > > > > > > > > > > >> StreamsMetadata(B)>
> >> > > > > > > > > > > > >>>> so
> >> > > > > > > > > > > > >>>>>> that
> >> > > > > > > > > > > > >>>>>>>>>>> if IQ
> >> > > > > > > > > > > > >>>>>>>>>>>>> fails on A, the replica on B can serve
> >> > the
> >> > > > > data by
> >> > > > > > > > > > enabling
> >> > > > > > > > > > > > >>>> serving
> >> > > > > > > > > > > > >>>>>>>>>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>>> replicas. This still does not solve
> >> case
> >> > t3
> >> > > > > and t4
> >> > > > > > > since
> >> > > > > > > > > > B
> >> > > > > > > > > > > > >>> has
> >> > > > > > > > > > > > >>>> been
> >> > > > > > > > > > > > >>>>>>>>>>>>> promoted to active but it is in
> >> Restoring
> >> > > > > state to
> >> > > > > > > > > > catchup
> >> > > > > > > > > > > > >>>> till A’s
> >> > > > > > > > > > > > >>>>>>>>>>> last
> >> > > > > > > > > > > > >>>>>>>>>>>>> committed position as we don’t serve
> >> from
> >> > > > > > > Restoring state
> >> > > > > > > > > > > > >> in
> >> > > > > > > > > > > > >>>> Active
> >> > > > > > > > > > > > >>>>>>>>>>> and new
> >> > > > > > > > > > > > >>>>>>>>>>>>> Replica on R is building itself from
> >> > scratch.
> >> > > > > Both
> >> > > > > > > these
> >> > > > > > > > > > > > >>> cases
> >> > > > > > > > > > > > >>>> can
> >> > > > > > > > > > > > >>>>>> be
> >> > > > > > > > > > > > >>>>>>>>>>>>> solved if we start serving from
> >> Restoring
> >> > > > > state of
> >> > > > > > > active
> >> > > > > > > > > > > > >> as
> >> > > > > > > > > > > > >>>> well
> >> > > > > > > > > > > > >>>>>>>>>>> since it
> >> > > > > > > > > > > > >>>>>>>>>>>>> is almost equivalent to previous
> >> Active.
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> There could be a case where all
> >> replicas
> >> > of a
> >> > > > > > > partition
> >> > > > > > > > > > > > >>> become
> >> > > > > > > > > > > > >>>>>>>>>>> unavailable
> >> > > > > > > > > > > > >>>>>>>>>>>>> and active and all replicas of that
> >> > partition
> >> > > > > are
> >> > > > > > > > > > building
> >> > > > > > > > > > > > >>>>>> themselves
> >> > > > > > > > > > > > >>>>>>>>>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>>> scratch, in this case, the state in
> >> > Active is
> >> > > > > far
> >> > > > > > > behind
> >> > > > > > > > > > > > >> even
> >> > > > > > > > > > > > >>>>>> though
> >> > > > > > > > > > > > >>>>>>>>>>> it is
> >> > > > > > > > > > > > >>>>>>>>>>>>> in Restoring state. To cater to such
> >> > cases
> >> > > > > that we
> >> > > > > > > don’t
> >> > > > > > > > > > > > >>> serve
> >> > > > > > > > > > > > >>>> from
> >> > > > > > > > > > > > >>>>>>>>>>> this
> >> > > > > > > > > > > > >>>>>>>>>>>>> state we can either add another state
> >> > before
> >> > > > > > > Restoring or
> >> > > > > > > > > > > > >>>> check the
> >> > > > > > > > > > > > >>>>>>>>>>>>> difference between last committed
> >> offset
> >> > and
> >> > > > > > > current
> >> > > > > > > > > > > > >>> position.
> >> > > > > > > > > > > > >>>> Only
> >> > > > > > > > > > > > >>>>>>>>>> if
> >> > > > > > > > > > > > >>>>>>>>>>> it
> >> > > > > > > > > > > > >>>>>>>>>>>>> is within a permissible range (say
> >> > 10000) we
> >> > > > > will
> >> > > > > > > serve
> >> > > > > > > > > > > > >> from
> >> > > > > > > > > > > > >>>>>>>>>> Restoring
> >> > > > > > > > > > > > >>>>>>>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>>>> state of Active.
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>    On Wednesday, 16 October, 2019,
> >> > 10:01:35
> >> > > > pm
> >> > > > > IST,
> >> > > > > > > > > > Vinoth
> >> > > > > > > > > > > > >>>> Chandar
> >> > > > > > > > > > > > >>>>>> <
> >> > > > > > > > > > > > >>>>>>>>>>>>> vchan...@confluent.io> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>  Thanks for the updates on the KIP,
> >> > Navinder!
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> Few comments
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> - AssignmentInfo is not public API?.
> >> But
> >> > we
> >> > > > > will
> >> > > > > > > change
> >> > > > > > > > > > it
> >> > > > > > > > > > > > >>> and
> >> > > > > > > > > > > > >>>> thus
> >> > > > > > > > > > > > >>>>>>>>>>> need to
> >> > > > > > > > > > > > >>>>>>>>>>>>> increment the version and test for
> >> > > > > version_probing
> >> > > > > > > etc.
> >> > > > > > > > > > > > >> Good
> >> > > > > > > > > > > > >>> to
> >> > > > > > > > > > > > >>>>>>>>>>> separate
> >> > > > > > > > > > > > >>>>>>>>>>>>> that from StreamsMetadata changes
> >> (which
> >> > is
> >> > > > > public
> >> > > > > > > API)
> >> > > > > > > > > > > > >>>>>>>>>>>>> - From what I see, there is going to
> >> be
> >> > > > choice
> >> > > > > > > between
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>> following
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>  A) introducing a new
> >> > > > > > > *KafkaStreams::allMetadataForKey()
> >> > > > > > > > > > > > >> *API
> >> > > > > > > > > > > > >>>> that
> >> > > > > > > > > > > > >>>>>>>>>>>>> potentially returns
> >> List<StreamsMetadata>
> >> > > > > ordered
> >> > > > > > > from
> >> > > > > > > > > > most
> >> > > > > > > > > > > > >>>> upto
> >> > > > > > > > > > > > >>>>>> date
> >> > > > > > > > > > > > >>>>>>>>>>> to
> >> > > > > > > > > > > > >>>>>>>>>>>>> least upto date replicas. Today we
> >> cannot
> >> > > > fully
> >> > > > > > > implement
> >> > > > > > > > > > > > >>> this
> >> > > > > > > > > > > > >>>>>>>>>>> ordering,
> >> > > > > > > > > > > > >>>>>>>>>>>>> since all we know is which hosts are
> >> > active
> >> > > > and
> >> > > > > > > which are
> >> > > > > > > > > > > > >>>> standbys.
> >> > > > > > > > > > > > >>>>>>>>>>>>> However, this aligns well with the
> >> > future.
> >> > > > > KIP-441
> >> > > > > > > adds
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>> lag
> >> > > > > > > > > > > > >>>>>>>>>>> information
> >> > > > > > > > > > > > >>>>>>>>>>>>> to the rebalancing protocol. We could
> >> > also
> >> > > > sort
> >> > > > > > > replicas
> >> > > > > > > > > > > > >>> based
> >> > > > > > > > > > > > >>>> on
> >> > > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>>>> report lags eventually. This is fully
> >> > > > backwards
> >> > > > > > > > > > compatible
> >> > > > > > > > > > > > >>> with
> >> > > > > > > > > > > > >>>>>>>>>>> existing
> >> > > > > > > > > > > > >>>>>>>>>>>>> clients. Only drawback I see is the
> >> > naming of
> >> > > > > the
> >> > > > > > > > > > existing
> >> > > > > > > > > > > > >>>> method
> >> > > > > > > > > > > > >>>>>>>>>>>>> KafkaStreams::metadataForKey, not
> >> > conveying
> >> > > > the
> >> > > > > > > > > > distinction
> >> > > > > > > > > > > > >>>> that it
> >> > > > > > > > > > > > >>>>>>>>>>> simply
> >> > > > > > > > > > > > >>>>>>>>>>>>> returns the active replica i.e
> >> > > > > > > allMetadataForKey.get(0).
> >> > > > > > > > > > > > >>>>>>>>>>>>>  B) Change
> >> > KafkaStreams::metadataForKey() to
> >> > > > > > > return a
> >> > > > > > > > > > List.
> >> > > > > > > > > > > > >>>> Its a
> >> > > > > > > > > > > > >>>>>>>>>>> breaking
> >> > > > > > > > > > > > >>>>>>>>>>>>> change.
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> I prefer A, since none of the
> >> > > > > semantics/behavior
> >> > > > > > > changes
> >> > > > > > > > > > > > >> for
> >> > > > > > > > > > > > >>>>>> existing
> >> > > > > > > > > > > > >>>>>>>>>>>>> users. Love to hear more thoughts.
> >> Can we
> >> > > > also
> >> > > > > > > work this
> >> > > > > > > > > > > > >> into
> >> > > > > > > > > > > > >>>> the
> >> > > > > > > > > > > > >>>>>>>>>> KIP?
> >> > > > > > > > > > > > >>>>>>>>>>>>> I already implemented A to unblock
> >> > myself for
> >> > > > > now.
> >> > > > > > > Seems
> >> > > > > > > > > > > > >>>> feasible
> >> > > > > > > > > > > > >>>>>> to
> >> > > > > > > > > > > > >>>>>>>>>>> do.
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> On Tue, Oct 15, 2019 at 12:21 PM
> >> Vinoth
> >> > > > > Chandar <
> >> > > > > > > > > > > > >>>>>>>>>> vchan...@confluent.io
> >> > > > > > > > > > > > >>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> I get your point. But suppose there
> >> > is a
> >> > > > > > > replica which
> >> > > > > > > > > > > > >> has
> >> > > > > > > > > > > > >>>> just
> >> > > > > > > > > > > > >>>>>>>>>>> become
> >> > > > > > > > > > > > >>>>>>>>>>>>>> active, so in that case replica will
> >> > still
> >> > > > be
> >> > > > > > > building
> >> > > > > > > > > > > > >>> itself
> >> > > > > > > > > > > > >>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>>> scratch
> >> > > > > > > > > > > > >>>>>>>>>>>>>> and this active will go to restoring
> >> > state
> >> > > > > till it
> >> > > > > > > > > > catches
> >> > > > > > > > > > > > >>> up
> >> > > > > > > > > > > > >>>> with
> >> > > > > > > > > > > > >>>>>>>>>>>>> previous
> >> > > > > > > > > > > > >>>>>>>>>>>>>> active, wouldn't serving from a
> >> > restoring
> >> > > > > active
> >> > > > > > > make
> >> > > > > > > > > > more
> >> > > > > > > > > > > > >>>> sense
> >> > > > > > > > > > > > >>>>>>>>>>> than a
> >> > > > > > > > > > > > >>>>>>>>>>>>>> replica in such case.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>> KIP-441 will change this behavior
> >> such
> >> > that
> >> > > > > > > promotion to
> >> > > > > > > > > > > > >>>> active
> >> > > > > > > > > > > > >>>>>>>>>>> happens
> >> > > > > > > > > > > > >>>>>>>>>>>>>> based on how caught up a replica is.
> >> So,
> >> > > > once
> >> > > > > we
> >> > > > > > > have
> >> > > > > > > > > > that
> >> > > > > > > > > > > > >>>> (work
> >> > > > > > > > > > > > >>>>>>>>>>> underway
> >> > > > > > > > > > > > >>>>>>>>>>>>>> already for 2.5 IIUC) and user sets
> >> > > > > > > > > > num.standby.replicas >
> >> > > > > > > > > > > > >>> 0,
> >> > > > > > > > > > > > >>>> then
> >> > > > > > > > > > > > >>>>>>>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>>>>> staleness window should not be that
> >> > long as
> >> > > > > you
> >> > > > > > > > > > describe.
> >> > > > > > > > > > > > >>> IMO
> >> > > > > > > > > > > > >>>> if
> >> > > > > > > > > > > > >>>>>>>>>> user
> >> > > > > > > > > > > > >>>>>>>>>>>>> wants
> >> > > > > > > > > > > > >>>>>>>>>>>>>> availability for state, then should
> >> > > > configure
> >> > > > > > > > > > > > >>>> num.standby.replicas
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>> 0.
> >> > > > > > > > > > > > >>>>>>>>>>>>> If
> >> > > > > > > > > > > > >>>>>>>>>>>>>> not, then on a node loss, few
> >> partitions
> >> > > > > would be
> >> > > > > > > > > > > > >>> unavailable
> >> > > > > > > > > > > > >>>> for
> >> > > > > > > > > > > > >>>>>> a
> >> > > > > > > > > > > > >>>>>>>>>>> while
> >> > > > > > > > > > > > >>>>>>>>>>>>>> (there are other ways to bring this
> >> > window
> >> > > > > down,
> >> > > > > > > which I
> >> > > > > > > > > > > > >>> won't
> >> > > > > > > > > > > > >>>>>>>>>> bring
> >> > > > > > > > > > > > >>>>>>>>>>> in
> >> > > > > > > > > > > > >>>>>>>>>>>>>> here). We could argue for querying a
> >> > > > restoring
> >> > > > > > > active
> >> > > > > > > > > > > > >> (say a
> >> > > > > > > > > > > > >>>> new
> >> > > > > > > > > > > > >>>>>>>>>> node
> >> > > > > > > > > > > > >>>>>>>>>>>>> added
> >> > > > > > > > > > > > >>>>>>>>>>>>>> to replace a faulty old node) based
> >> on
> >> > AP vs
> >> > > > > CP
> >> > > > > > > > > > > > >> principles.
> >> > > > > > > > > > > > >>>> But
> >> > > > > > > > > > > > >>>>>> not
> >> > > > > > > > > > > > >>>>>>>>>>> sure
> >> > > > > > > > > > > > >>>>>>>>>>>>>> reading really really old values for
> >> the
> >> > > > sake
> >> > > > > of
> >> > > > > > > > > > > > >>> availability
> >> > > > > > > > > > > > >>>> is
> >> > > > > > > > > > > > >>>>>>>>>>> useful.
> >> > > > > > > > > > > > >>>>>>>>>>>>> No
> >> > > > > > > > > > > > >>>>>>>>>>>>>> AP data system would be inconsistent
> >> for
> >> > > > such
> >> > > > > a
> >> > > > > > > long
> >> > > > > > > > > > time
> >> > > > > > > > > > > > >> in
> >> > > > > > > > > > > > >>>>>>>>>>> practice.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>> So, I still feel just limiting this
> >> to
> >> > > > standby
> >> > > > > > > reads
> >> > > > > > > > > > > > >>> provides
> >> > > > > > > > > > > > >>>> best
> >> > > > > > > > > > > > >>>>>>>>>>>>>> semantics.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>> Just my 2c. Would love to see what
> >> > others
> >> > > > > think
> >> > > > > > > as well.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>> On Tue, Oct 15, 2019 at 5:34 AM
> >> Navinder
> >> > > > Brar
> >> > > > > > > > > > > > >>>>>>>>>>>>>> <navinder_b...@yahoo.com.invalid>
> >> > wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Hi Vinoth,
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Thanks for the feedback.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>  Can we link the JIRA, discussion
> >> > thread
> >> > > > > also to
> >> > > > > > > the
> >> > > > > > > > > > > > >> KIP.>>
> >> > > > > > > > > > > > >>>>>> Added.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Based on the discussion on
> >> KAFKA-6144,
> >> > I
> >> > > > was
> >> > > > > > > under the
> >> > > > > > > > > > > > >>>> impression
> >> > > > > > > > > > > > >>>>>>>>>>> that
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> this KIP is also going to cover
> >> > exposing of
> >> > > > > the
> >> > > > > > > standby
> >> > > > > > > > > > > > >>>>>>>>>> information
> >> > > > > > > > > > > > >>>>>>>>>>> in
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> StreamsMetadata and thus subsume
> >> > > > KAFKA-8994 .
> >> > > > > > > That
> >> > > > > > > > > > would
> >> > > > > > > > > > > > >>>> require
> >> > > > > > > > > > > > >>>>>> a
> >> > > > > > > > > > > > >>>>>>>>>>>>> public
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> API change?>> Sure, I can add
> >> changes
> >> > for
> >> > > > > 8994
> >> > > > > > > in this
> >> > > > > > > > > > > > >> KIP
> >> > > > > > > > > > > > >>>> and
> >> > > > > > > > > > > > >>>>>>>>>> link
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> KAFKA-6144 to KAFKA-8994 as well.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>  KIP seems to be focussing on
> >> > restoration
> >> > > > > when a
> >> > > > > > > new
> >> > > > > > > > > > node
> >> > > > > > > > > > > > >>> is
> >> > > > > > > > > > > > >>>>>>>>>> added.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> KIP-441 is underway and has some
> >> major
> >> > > > > changes
> >> > > > > > > proposed
> >> > > > > > > > > > > > >> for
> >> > > > > > > > > > > > >>>> this.
> >> > > > > > > > > > > > >>>>>>>>>> It
> >> > > > > > > > > > > > >>>>>>>>>>>>> would
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> be good to clarify dependencies if
> >> any.
> >> > > > > Without
> >> > > > > > > > > > KIP-441,
> >> > > > > > > > > > > > >> I
> >> > > > > > > > > > > > >>>> am not
> >> > > > > > > > > > > > >>>>>>>>>>> very
> >> > > > > > > > > > > > >>>>>>>>>>>>> sure
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> if we should allow reads from nodes
> >> in
> >> > > > > RESTORING
> >> > > > > > > state,
> >> > > > > > > > > > > > >>> which
> >> > > > > > > > > > > > >>>>>>>>>> could
> >> > > > > > > > > > > > >>>>>>>>>>>>> amount
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> to many minutes/few hours of stale
> >> > reads?
> >> > > > > This
> >> > > > > > > is
> >> > > > > > > > > > > > >>> different
> >> > > > > > > > > > > > >>>> from
> >> > > > > > > > > > > > >>>>>>>>>>>>> allowing
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> querying standby replicas, which
> >> could
> >> > be
> >> > > > > mostly
> >> > > > > > > caught
> >> > > > > > > > > > > > >> up
> >> > > > > > > > > > > > >>>> and
> >> > > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> staleness window could be much
> >> > > > > > > smaller/tolerable. (once
> >> > > > > > > > > > > > >>>> again the
> >> > > > > > > > > > > > >>>>>>>>>>> focus
> >> > > > > > > > > > > > >>>>>>>>>>>>> on
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> KAFKA-8994).>> I get your point. But
> >> > > > suppose
> >> > > > > > > there is a
> >> > > > > > > > > > > > >>>> replica
> >> > > > > > > > > > > > >>>>>>>>>>> which
> >> > > > > > > > > > > > >>>>>>>>>>>>> has
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> just become active, so in that case
> >> > replica
> >> > > > > will
> >> > > > > > > still
> >> > > > > > > > > > be
> >> > > > > > > > > > > > >>>>>> building
> >> > > > > > > > > > > > >>>>>>>>>>>>> itself
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> from scratch and this active will
> >> go to
> >> > > > > > > restoring state
> >> > > > > > > > > > > > >>> till
> >> > > > > > > > > > > > >>>> it
> >> > > > > > > > > > > > >>>>>>>>>>> catches
> >> > > > > > > > > > > > >>>>>>>>>>>>> up
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> with previous active, wouldn't
> >> serving
> >> > > > from a
> >> > > > > > > restoring
> >> > > > > > > > > > > > >>>> active
> >> > > > > > > > > > > > >>>>>>>>>> make
> >> > > > > > > > > > > > >>>>>>>>>>> more
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> sense than a replica in such case.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Finally, we may need to introduce a
> >> > > > > > > configuration to
> >> > > > > > > > > > > > >>> control
> >> > > > > > > > > > > > >>>>>> this.
> >> > > > > > > > > > > > >>>>>>>>>>> Some
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> users may prefer errors to stale
> >> data.
> >> > Can
> >> > > > we
> >> > > > > > > also add
> >> > > > > > > > > > it
> >> > > > > > > > > > > > >>> to
> >> > > > > > > > > > > > >>>> the
> >> > > > > > > > > > > > >>>>>>>>>>> KIP?>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Will add this.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Regards,
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Navinder
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> On2019/10/14 16:56:49, Vinoth
> >> Chandar <
> >> > > > > > > > > > v...@confluent.io
> >> > > > > > > > > > > > >>>>> wrote:
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Hi Navinder,>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks for sharing the KIP! Few
> >> > thoughts>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Can we link the JIRA, discussion
> >> > thread
> >> > > > > also
> >> > > > > > > to the
> >> > > > > > > > > > > > >> KIP>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Based on the discussion on
> >> > KAFKA-6144, I
> >> > > > > was
> >> > > > > > > under
> >> > > > > > > > > > the
> >> > > > > > > > > > > > >>>>>>>>>> impression
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> that>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> this KIP is also going to cover
> >> > exposing
> >> > > > of
> >> > > > > the
> >> > > > > > > > > > standby
> >> > > > > > > > > > > > >>>>>>>>>>> information in>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> StreamsMetadata and thus subsume
> >> > > > KAFKA-8994
> >> > > > > .
> >> > > > > > > That
> >> > > > > > > > > > would
> >> > > > > > > > > > > > >>>> require
> >> > > > > > > > > > > > >>>>>>>>>> a
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> public>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> API change?>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - KIP seems to be focussing on
> >> > restoration
> >> > > > > when
> >> > > > > > > a new
> >> > > > > > > > > > > > >> node
> >> > > > > > > > > > > > >>>> is
> >> > > > > > > > > > > > >>>>>>>>>>> added.>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> KIP-441 is underway and has some
> >> major
> >> > > > > changes
> >> > > > > > > > > > proposed
> >> > > > > > > > > > > > >>> for
> >> > > > > > > > > > > > >>>>>> this.
> >> > > > > > > > > > > > >>>>>>>>>>> It
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> would>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> be good to clarify dependencies if
> >> > any.
> >> > > > > Without
> >> > > > > > > > > > > > >> KIP-441, I
> >> > > > > > > > > > > > >>>> am
> >> > > > > > > > > > > > >>>>>> not
> >> > > > > > > > > > > > >>>>>>>>>>> very
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> sure>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> if we should allow reads from
> >> nodes in
> >> > > > > RESTORING
> >> > > > > > > > > > state,
> >> > > > > > > > > > > > >>>> which
> >> > > > > > > > > > > > >>>>>>>>>> could
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> amount>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> to many minutes/few hours of stale
> >> > reads?
> >> > > > > This
> >> > > > > > > is
> >> > > > > > > > > > > > >>> different
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> fromallowing>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> querying standby replicas, which
> >> > could be
> >> > > > > mostly
> >> > > > > > > > > > caught
> >> > > > > > > > > > > > >> up
> >> > > > > > > > > > > > >>>> and
> >> > > > > > > > > > > > >>>>>>>>>> the>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> staleness window could be much
> >> > > > > > > smaller/tolerable.
> >> > > > > > > > > > (once
> >> > > > > > > > > > > > >>>> again
> >> > > > > > > > > > > > >>>>>> the
> >> > > > > > > > > > > > >>>>>>>>>>> focus
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> on>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> KAFKA-8994)>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> - Finally, we may need to
> >> introduce a
> >> > > > > > > configuration to
> >> > > > > > > > > > > > >>>> control
> >> > > > > > > > > > > > >>>>>>>>>>> this.
> >> > > > > > > > > > > > >>>>>>>>>>>>>>> Some>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> users may prefer errors to stale
> >> > data. Can
> >> > > > > we
> >> > > > > > > also add
> >> > > > > > > > > > > > >> it
> >> > > > > > > > > > > > >>>> to the
> >> > > > > > > > > > > > >>>>>>>>>>> KIP?>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Thanks>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> Vinoth>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> On Sun, Oct 13, 2019 at 3:31 PM
> >> > Navinder
> >> > > > > Brar>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>> <na...@yahoo.com.invalid>wrote:>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Hi,>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Starting a discussion on the KIP
> >> to
> >> > Allow
> >> > > > > state
> >> > > > > > > > > > stores
> >> > > > > > > > > > > > >> to
> >> > > > > > > > > > > > >>>> serve
> >> > > > > > > > > > > > >>>>>>>>>>>>> stale>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> reads during rebalance(>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > >
> >> > > > >
> >> > > >
> >> >
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> ).>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> Thanks & Regards,Navinder>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>> LinkedIn>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>> --
> >> > > > > > > > > > > > >>>>>>>>>> -- Guozhang
> >> > > > > > > > > > > > >>>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>>
> >> > > > > > > > > > > > >>>>>>>>
> >> > > > > > > > > > > > >>>>>>>
> >> > > > > > > > > > > > >>>>>>
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>>
> >> > > > > > > > > > > > >>>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >> --
> >> > > > > > > > > > > > >> -- Guozhang
> >> > > > > > > > > > > > >>
> >> > > > > > > > > > > > >
> >> > > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > >
> >> > > > >
> >> > > >
> >> >
> >> >
> >>
> >

Reply via email to