Becket,

This is a clever approach for to ensure that only one thing communicates
the metadata so even if it is stale, the entire group has the same view.
However, the big assumption this makes is that the coordinator is that one
process that has the ability to know the metadata for group members, which
does not work for any non-consumer use case.

I wonder if we may be complicating the design of 95% use cases for the
remaining 5%. For instance, how many times do people create and remove
topics or even add partitions? We operated LI clusters for a long time and
this wasn't a frequent event that would need us to optimize this design for.

Also, this is something we can easily validate by running a few tests on
the patch and I suggest we wait for that.

Thanks,
Neha

On Sat, Aug 15, 2015 at 9:14 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Jiangjie,
>
> I was thinking about the same problem. When metadata is changing
> frequently, the clients may not be able to ever find agreement on the
> current state. The server doesn't have this problem, as you say, because it
> can just take a snapshot and send that to the clients. Adding a dampening
> setting to the client would help if churn is sporadic, but not if it is
> steady. I think the metadata would have to be changing really frequently
> for this to be a problem practically, but this is a case where the
> server-side approach has an advantage.
>
> Including the metadata in the join group response would require making the
> subscription available to the coordinator, right? We lose a little bit of
> the generality of the protocol, but it might not be too bad since most use
> cases for reusing this protocol have a similar need for metadata (and they
> can always pass an empty subscription if they don't). We've gone back and
> forth a few times on this, and I'm generally not opposed. It might help if
> we try to quantify the impact of the metadata churn in practice. I think
> the rate of change would have to be in the low seconds for this to be a
> real problem. It does seem nice though that we have a way to manage even
> this much churn when we do this.
>
> -Jason
>
>
>
> On Fri, Aug 14, 2015 at 9:03 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
>
> > Ewen,
> >
> > I agree that if there is a churn in metadata, the consumers need several
> > rounds of rebalances to succeed. The difference I am thinking is that
> with
> > coordinator as single source of truth, we can let the consumer finish one
> > round of rebalance, work for a while and start the next round of
> rebalance.
> > If we purely depend on the consumers to synchronize by themselves based
> on
> > different metadata sources, is it possible we have some groups spending a
> > lot of time on rebalancing but not able to make too much progress in
> > consuming?
> >
> > I'm thinking can we let consumers to fetch metadata only from their
> > coordinator? So the rebalance can be done in the following way:
> >
> > 1. Consumers refresh their metadata periodically
> > 2. If one of the consumer sees a change in metadata that triggers a
> > rebalance, it sends JoinGroupRequest to coordinator.
> > 3. Once the coordinator receives the first JoinGroupRequest of a group,
> it
> > takes a snapshot of current metadata and the group enters
> prepare-rebalance
> > state.
> > 4. The metadata snapshot will be used for this round of rebalance. i.e.
> the
> > metadata snapshot will be sent to consumers in JoinGroupResponse.
> > 4.1 If the consumers are subscribing to explicit topic lists (not regex),
> > the JoinGroupResponse needs only contain the metadata of all topics the
> > group is interested.
> > 4.2 If the consumers are subscribing using regex, all the topic metadata
> > will be returned to the consumer.
> > 5. Consumers get JoinGroupResponse, refresh metadata using the metadata
> in
> > JoinGroupResponse, run algorithm to assign partitions and start consume.
> > 6. Go back to 1.
> >
> > The benefit here is that we can let rebalance finish in one round, and
> all
> > the rest of changes will be captured in next consumer metadata refresh -
> so
> > we get group commit. One concern might be letting consumer refresh
> metadata
> > from coordinator might cause issue for big consumer groups. Maybe that is
> > OK because metadata refresh is infrequent.
> >
> > This approach actually is very similar to what is proposed now: rebalance
> > is triggered by metadata refresh, consumer provides subscription list to
> > pass around. The only difference is that we don't need metadata hash
> > anymore because the metadata is guaranteed to be the same. Replacing
> > metadata hash with actual metadata will not have too much overhead for
> > small subscription groups. There will be some overhead for regex
> > subscriptions, but this can save the potential extra round of metadata
> > fetch and will only occur when consumer see metadata change, which is
> > infrequent.
> >
> > Any thoughts?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> > On Fri, Aug 14, 2015 at 12:57 PM, Ewen Cheslack-Postava <
> e...@confluent.io
> > >
> > wrote:
> >
> > > On Fri, Aug 14, 2015 at 10:59 AM, Jiangjie Qin
> <j...@linkedin.com.invalid
> > >
> > > wrote:
> > >
> > > > Neha and Ewen,
> > > >
> > > > About the metadata change frequency. I guess it really depends on how
> > > > frequent the metadata change might occur. If we run Kafka as a
> > service, I
> > > > can see that happens from time to time. As I can imagine people will
> > > create
> > > > some topic, test and maybe delete the topic in some automated test.
> If
> > > so,
> > > > the proposed protocol might be a little bit vulnerable.
> > > >
> > > > More specifically the scenario I am thinking is:
> > > > 1. Consumer 0 periodically refresh metadata and detected a metadata
> > > change.
> > > > So it sends a JoinGroupRequest with metadata_hash_0.
> > > > 2. Consumer 1 was notified by controller to start a rebalance, so it
> > > > refreshes its metadata and send a JoingGroupRequest with
> > metadata_hash_1,
> > > > which is different from metadata hash 0.
> > > > 3. Rebalance failed and both consumer refresh there metadata again
> from
> > > > different brokers.
> > > > 4. Depending on the metadata change frequency(or some admin operation
> > > like
> > > > partition reassigment), they may or may not have the same metadata
> > > > returned, so the restart from 3 again.
> > > >
> > > > I agree that step 4 might not be a big concern if consumers updates
> > > > metadata at almost the same time, but I'm a little bit worried
> whether
> > > that
> > > > assumption really stands because we do not have control over how
> > frequent
> > > > the metadata can change.
> > > >
> > > >
> > > Is this really that different from what would happen if the coordinator
> > > distributed the metadata to consumers? In that case you would trivially
> > > have everyone in a consistent state, but those metadata changes would
> > still
> > > cause churn and require JoinGroup rounds, during which processing is
> > > stalled for the nodes that are waiting on other members to re-join the
> > > group.
> > >
> > > -Ewen
> > >
> > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Fri, Aug 14, 2015 at 2:03 AM, Ewen Cheslack-Postava <
> > > e...@confluent.io>
> > > > wrote:
> > > >
> > > > > On Thu, Aug 13, 2015 at 11:07 PM, Neha Narkhede <n...@confluent.io
> >
> > > > wrote:
> > > > >
> > > > > > Becket,
> > > > > >
> > > > > > As you say, the metadata hash addresses the concern you
> originally
> > > > raised
> > > > > > about large topic subscriptions. Can you please list other
> problems
> > > you
> > > > > are
> > > > > > raising more clearly? It is more helpful to know problems that
> the
> > > > > proposal
> > > > > > does not address or addresses poorly.
> > > > > >
> > > > > > Regarding other things you said -
> > > > > >
> > > > > > it is required that each
> > > > > > > consumer refresh their metadata before sending a
> JoinGroupRequest
> > > > > > >
> > > > > >
> > > > > > This is required for wildcard topic subscriptions anyway. So this
> > > > > proposal
> > > > > > does not introduce a regression. We had agreed earlier that it
> does
> > > not
> > > > > > make sense for the server to deserialize regular expressions sent
> > by
> > > > the
> > > > > > consumer.
> > > > > >
> > > > >
> > > > > I don't think consumers need to do a metadata refresh before
> sending
> > a
> > > > > JoinGroupRequest. Metadata changes that affect assignment are rare
> --
> > > it
> > > > > requires changing the number of partitions in a topic. But you
> might
> > > > send a
> > > > > JoinGroupRequest simply because a new member is trying to join the
> > > group.
> > > > > That case is presumably much more common.
> > > > >
> > > > > I think it's actually a good idea to have the first JoinGroup cycle
> > > fail
> > > > in
> > > > > some cases, and has little impact. Lets say the metadata does
> change
> > > > > because partitions are added. Then we might fail in the first
> round,
> > > but
> > > > > then all members detect that issue *immediately*, refresh their
> > > metadata,
> > > > > and submit a new join group request. This second cycle does not
> > > require a
> > > > > full heartbeat cycle. It happens much more quickly because everyone
> > > > > detected the inconsistency based on the first JoinGroupResponse.
> The
> > > > > inconsistency should be resolved very quickly (barring other
> failures
> > > > like
> > > > > a member leaving mid-rebalance)
> > > >
> > > >
> > > >
> > > >
> > > > > >
> > > > > >
> > > > > > > the metadata might still be inconsistent if there is a topic or
> > > > > partition
> > > > > > > change because the
> > > > > > > UpdateMetadataRequest from controller might be handled at
> > different
> > > > > time.
> > > > > > >
> > > > > >
> > > > > > Topic metadata does not change frequently and even if it did, a
> > > couple
> > > > > > rebalance attempts will be needed whether the coordinator drives
> > the
> > > > > > assignments or the consumer. Because guess how the coordinator
> > knows
> > > > > about
> > > > > > the topic metadata changes -- indirectly through either a zk
> > callback
> > > > or
> > > > > > UpdateMetadataRequest, so it is completely possible the
> coordinator
> > > > sees
> > > > > > the topic metadata changes in batches, not all at once.
> > > > > >
> > > > >
> > > > > > On Thu, Aug 13, 2015 at 10:50 PM, Neha Narkhede <
> n...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Ewen/Jason,
> > > > > > >
> > > > > > > The metadata hash is a clever approach and certainly addresses
> > the
> > > > > > problem
> > > > > > > of large metadata for consumers like mirror maker. Few
> comments -
> > > > > > >
> > > > > > >
> > > > > > >    1. In the interest of simplifying the format of the
> consumer's
> > > > > > >    metadata - Why not just always include only the topic names
> in
> > > the
> > > > > > metadata
> > > > > > >    followed by the metadata hash? If the metadata hash check
> > > > succeeds,
> > > > > > each
> > > > > > >    consumer uses the # of partitions it had fetched. If it
> > fails, a
> > > > > > rebalance
> > > > > > >    happens and the metadata is not used anyway.
> > > > > >
> > > > >
> > > > > Doing this requires that every consumer always fetch the full
> > metadata.
> > > > The
> > > > > most common use case is consumers that just want to consume one or
> a
> > > > couple
> > > > > of topics, in which case grabbing all metadata for the entire
> cluster
> > > is
> > > > > wasteful. If I subscribe only to topic A, why make all consumers
> grab
> > > > > metadata for the entire topic (and need to rebalance every time it
> > > > > changes!). Including the # of partitions for each topic lets you
> > avoid
> > > > > having to grab the global set of metadata.
> > > > >
> > > > > So if you're just subscribing to one or a couple of topics, why not
> > > just
> > > > > compute the hash by filtering out everything but the topics you are
> > > > > subscribed to? The problem there is if you ever add/remove
> > > subscriptions
> > > > > and want to support rolling upgrades. If the group was subscribed
> to
> > > > topic
> > > > > A, but later changes require subscribing to A + B, then to achieve
> a
> > > > > seamless rolling upgrade would require one (old) consumer to be
> > > > subscribing
> > > > > to A and one (new) consumer to be subscribing to A+B. If we
> computed
> > > > > metadata hashes based on filtered metadata, those two would
> disagree
> > > and
> > > > we
> > > > > could not perform assignment while the upgrade was in progress.
> > > > >
> > > > > The solution is to differentiate between the cases when a very
> small
> > > > amount
> > > > > of the metadata is needed (one or a couple of topic subscriptions;
> > > > > communicate and share this via metadata in the JoinGroup protocol)
> vs
> > > > when
> > > > > *all* the metadata is needed (regex subscription; verify agreement
> > via
> > > > > hash).
> > > > >
> > > >
> > > > >
> > > > >
> > > > > > >    2. Do you need a topic list and topic regex to be separate?
> A
> > > > single
> > > > > > >    topic or list of topics can be expressed as a regex.
> > > > > >
> > > > >
> > > > > See above note about collecting all metadata when you really only
> > need
> > > it
> > > > > for 1 or 2 topics. There's probably some debate to be had about
> > whether
> > > > > this cost would be too high -- every consumer would need to request
> > the
> > > > > metadata for all topics, and they'd need to request that all every
> > time
> > > > > they might be out of date.
> > > > >
> > > > Are we going to allow consumers in the same group to subscribe to
> > > different
> > > > topic set? If we do, we need to let them refresh metadata for all the
> > > > topics a group is consuming from. If we don't then in the protocol we
> > > only
> > > > need a subscription set hash.
> > > >
> > > > >
> > > > >
> > > > > > >    3. Let's include a version explicitly at the beginning of
> the
> > > > > > >    ProtocolMetadata. The version dictates how to deserialize
> the
> > > > > > >    ProtocolMetadata blob and is consistent with the rest of
> > Kafka.
> > > > > >
> > > > >
> > > > > If I'm understanding correctly, in JoinGroupRequest I would change
> > > > >
> > > > > GroupProtocols          => [Protocol ProtocolMetadata]
> > > > >
> > > > > to
> > > > >
> > > > > GroupProtocols          => [Protocol ProtocolVersion
> > ProtocolMetadata]
> > > > >
> > > > > We had been talking about just baking the version into the Protocol
> > > > field,
> > > > > but making it separate seems perfectly reasonable to me. Jason, any
> > > issue
> > > > > with splitting the version out into a separate field like this?
> > > > >
> > > > > >
> > > > > > > That can simplify the metadata format to the following:
> > > > > > >
> > > > > > > GroupType => "consumer"
> > > > > > >>
> > > > > > >> Protocol => AssignmentStrategy
> > > > > > >>   AssignmentStrategy   => String
> > > > > > >>
> > > > > > >> ProtocolMetadata => Version Subscription
> > > AssignmentStrategyMetadata
> > > > > > >
> > > > > > >     Version                    => String
> > > > > > >
> > > > > > >   Subscription                 => TopicRegex MetadataHash
> > > > > > >>     TopicRegex                 => String
> > > > > > >>     MetadataHash               => String
> > > > > > >>   AssignmentStrategyMetadata   => bytes
> > > > > >
> > > > > >
> > > > > > >
> > > > > > > On Thu, Aug 13, 2015 at 6:28 PM, Jiangjie Qin
> > > > > <j...@linkedin.com.invalid
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Ewen and Jason,
> > > > > > >>
> > > > > > >> Thanks for the reply. Sorry I missed the metadata hash. Yes,
> > that
> > > > is a
> > > > > > >> clever approach and would solve my concern about the data
> > passing
> > > > > > around.
> > > > > > >> I
> > > > > > >> can see both pros and cons from doing this, though. The
> > advantage
> > > is
> > > > > we
> > > > > > >> don't need the topic metadata in JoinGroupResponse anymore.
> The
> > > > > downside
> > > > > > >> is
> > > > > > >> that now rebalance have extra dependency on the consensus of
> > > > metadata
> > > > > of
> > > > > > >> all consumers, which is obtained separately. So it is required
> > > that
> > > > > each
> > > > > > >> consumer refresh their metadata before sending a
> > JoinGroupRequest,
> > > > > > >> otherwise in some cases (e.g. wildcard consumers) will almost
> > > > > certainly
> > > > > > >> fail for the first rebalance attempt. Even if we do that,
> since
> > > the
> > > > > > >> consumers are getting metadata from different brokers, the
> > > metadata
> > > > > > might
> > > > > > >> still be inconsistent if there is a topic or partition change
> > > > because
> > > > > > the
> > > > > > >> UpdateMetadataRequest from controller might be handled at
> > > different
> > > > > > time.
> > > > > > >> Just want to make sure we think through the cases so the
> > protocol
> > > > does
> > > > > > not
> > > > > > >> cause us unexpected issues.
> > > > > > >>
> > > > > > >> About the number of consumers, I think with the current
> > liveliness
> > > > > > >> definition, we can tolerate churns by bumping up the session
> > > > timeout.
> > > > > > Also
> > > > > > >> I guess we will see an increasing number of consumers for new
> > > > > consumer,
> > > > > > >> because every the old consumer thread will probably become a
> > > > consumer.
> > > > > > >>
> > > > > > >> It is a valid concern for consumers that have large
> subscription
> > > > set.
> > > > > > This
> > > > > > >> might not be avoided though for client side assignment
> approach.
> > > One
> > > > > > >> solution is having topic names associate with a topic ID. And
> > only
> > > > use
> > > > > > >> topic ID in JoinGroupRequest and JoinGroupResponse, There is a
> > > > > > discussion
> > > > > > >> thread about this to solve the topic renaming case but this
> is a
> > > > > > >> completely
> > > > > > >> different discussion.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >>
> > > > > > >> Jiangjie (Becket) Qin
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Aug 13, 2015 at 2:14 PM, Jason Gustafson <
> > > > ja...@confluent.io>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Thanks Jiangjie, that information helps. I agree the
> protocol
> > > must
> > > > > > >> consider
> > > > > > >> > scalability. My point was that the synchronization barrier
> in
> > > the
> > > > > > >> current
> > > > > > >> > protocol already effectively limits the number of consumers
> > > since
> > > > it
> > > > > > >> > provides no way to gracefully handle churn. It wouldn't be
> > worth
> > > > > > >> worrying
> > > > > > >> > about scaling up to 100,000 members, for example, because
> > > there's
> > > > no
> > > > > > way
> > > > > > >> > the group would be stable. So we just need to set some clear
> > > > > > >> expectations
> > > > > > >> > on the size we can scale to, and that can help inform the
> > > > discussion
> > > > > > on
> > > > > > >> the
> > > > > > >> > size of messages in this protocol.
> > > > > > >> >
> > > > > > >> > Ewen and I were discussing this morning along similar lines
> to
> > > > what
> > > > > > >> you're
> > > > > > >> > suggesting. However, even if the coordinator decides on the
> > > > metadata
> > > > > > for
> > > > > > >> > the group, each member still needs to communicate its
> > > > subscriptions
> > > > > to
> > > > > > >> the
> > > > > > >> > rest of the group. This is nice for the regex case since the
> > > regex
> > > > > is
> > > > > > >> > probably small, but if the members have a large topic list,
> > then
> > > > we
> > > > > > have
> > > > > > >> > the same problem. One thing I was thinking about was whether
> > we
> > > > > really
> > > > > > >> need
> > > > > > >> > to handle different subscriptions for every member. If the
> > > > > coordinator
> > > > > > >> > could guarantee that all members had the same subscription,
> > then
> > > > > there
> > > > > > >> > would be no need for the coordinator to return the
> > subscriptions
> > > > for
> > > > > > >> each
> > > > > > >> > member. However, this would prevent graceful upgrades. We
> > might
> > > be
> > > > > > able
> > > > > > >> to
> > > > > > >> > fix that problem by allowing the consumer to provide two
> > > > > subscriptions
> > > > > > >> to
> > > > > > >> > allowing rolling updates, but that starts to sound pretty
> > nasty.
> > > > > > >> >
> > > > > > >> > -Jason
> > > > > > >> >
> > > > > > >> > On Thu, Aug 13, 2015 at 1:41 PM, Jiangjie Qin
> > > > > > <j...@linkedin.com.invalid
> > > > > > >> >
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Jason,
> > > > > > >> > >
> > > > > > >> > > The protocol has to consider the scalability. The protocol
> > in
> > > > the
> > > > > > wiki
> > > > > > >> > > means the JoinGroupResoponse size would be:
> > > > > > >> > > NumberOfTopics * (AvgTopicNameLength + 4) *
> > > > (NumberOfConsumers)^2
> > > > > > >> > >
> > > > > > >> > > To give some real number, we have 26-node Mirror Maker
> > > cluster,
> > > > > each
> > > > > > >> > with 4
> > > > > > >> > > consumers. That is 104 consumers using regex ".*". And
> most
> > of
> > > > our
> > > > > > >> > clusters
> > > > > > >> > > have around 3000 topics, whose topic name are typically
> > around
> > > > 20
> > > > > > >> > > characters.
> > > > > > >> > >
> > > > > > >> > > I think the key issue for client side partition assignment
> > > logic
> > > > > is
> > > > > > to
> > > > > > >> > make
> > > > > > >> > > sure 1) all the clients run the same algorithm. 2) all the
> > > > clients
> > > > > > >> make
> > > > > > >> > > decision on the same topic metadata. The second purpose
> can
> > be
> > > > > done
> > > > > > by
> > > > > > >> > > simply letting coordinator provide the topic metadata and
> > all
> > > > then
> > > > > > >> member
> > > > > > >> > > information as source of truth. Is it necessary to pass
> > topic
> > > > > > >> metadata of
> > > > > > >> > > each consumer around? Can we keep the protocol metadata
> > field
> > > > > > >> completely
> > > > > > >> > > independent of topic metadata? I think In the
> > > JoinGroupResponse,
> > > > > we
> > > > > > >> > should
> > > > > > >> > > have only one copy of topic metadata provided by
> coordinator
> > > and
> > > > > is
> > > > > > >> > outside
> > > > > > >> > > of protocol metadata. If user decides to put some metadata
> > in
> > > > the
> > > > > > >> > > JoinGroupRequest and let coordinator pass around, they are
> > > > > > responsible
> > > > > > >> > for
> > > > > > >> > > understanding the risk.
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > >
> > > > > > >> > > Jiangjie (Becket) Qin
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Thu, Aug 13, 2015 at 12:41 PM, Jason Gustafson <
> > > > > > ja...@confluent.io
> > > > > > >> >
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Onur and Jiangjie,
> > > > > > >> > > >
> > > > > > >> > > > I've updated that wiki with a proposal to add regex
> > > > > subscriptions
> > > > > > to
> > > > > > >> > the
> > > > > > >> > > > consumer metadata. Can you have a look to see if it
> > > addresses
> > > > > your
> > > > > > >> > > concern?
> > > > > > >> > > > In general, I think we should be a little careful when
> we
> > > are
> > > > > > >> talking
> > > > > > >> > > about
> > > > > > >> > > > the scalability of the protocol. Regardless of whether
> > > > > assignment
> > > > > > is
> > > > > > >> > done
> > > > > > >> > > > on the server or the client, the protocol assumes a
> > > relatively
> > > > > > >> stable
> > > > > > >> > > > configuration. When the number of consumers increases
> > > beyond a
> > > > > > >> certain
> > > > > > >> > > > limit, then membership churn becomes a major concern.
> > > > Similarly
> > > > > > >> there
> > > > > > >> > is
> > > > > > >> > > a
> > > > > > >> > > > notion of metadata churn when topics are added, deleted,
> > or
> > > > > > >> resized. If
> > > > > > >> > > > either membership or metadata changes, then the protocol
> > > > forces
> > > > > > all
> > > > > > >> > > > consumers to stop consumption and rejoin the group. If
> > this
> > > > > > happens
> > > > > > >> > often
> > > > > > >> > > > enough, then it can severely impact the ability of the
> > > > consumer
> > > > > to
> > > > > > >> make
> > > > > > >> > > > progress. The point is that the protocol may already be
> > > > unsuited
> > > > > > to
> > > > > > >> > cases
> > > > > > >> > > > where there are either a large number of consumers or
> > > topics.
> > > > I
> > > > > > >> wonder
> > > > > > >> > if
> > > > > > >> > > > you guys can share your thoughts about your scaling
> > > > > expectations?
> > > > > > >> > > >
> > > > > > >> > > > -Jason
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Aug 12, 2015 at 12:28 PM, Jason Gustafson <
> > > > > > >> ja...@confluent.io>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hey Jiangjie,
> > > > > > >> > > > >
> > > > > > >> > > > > That's a great point. In the worst case (the mirror
> > maker
> > > > > case I
> > > > > > >> > > guess),
> > > > > > >> > > > > the join group response can be massive. This would be
> > > > > especially
> > > > > > >> > deadly
> > > > > > >> > > > > when there is a lot of churn in the group (e.g. in a
> > > rolling
> > > > > > >> > upgrade).
> > > > > > >> > > > The
> > > > > > >> > > > > current protocol is not great for this case either,
> but
> > > it's
> > > > > > >> > > > significantly
> > > > > > >> > > > > better. Here are a couple ways to deal with the size:
> > > > > > >> > > > >
> > > > > > >> > > > > 1. First, we could have the coordinator compress the
> > > > > responses.
> > > > > > >> This
> > > > > > >> > > > would
> > > > > > >> > > > > probably be pretty effective if applied across the
> > > metadata
> > > > > from
> > > > > > >> all
> > > > > > >> > > > > members.
> > > > > > >> > > > >
> > > > > > >> > > > > 2. I think the regex case is the main problem. Is that
> > > > right?
> > > > > We
> > > > > > >> > could
> > > > > > >> > > > > extend the metadata to allow the consumer to embed its
> > > regex
> > > > > > >> > > subscription
> > > > > > >> > > > > in the metadata directly (note this might be a good
> idea
> > > > > > >> regardless
> > > > > > >> > of
> > > > > > >> > > > the
> > > > > > >> > > > > rest of this proposal). To support regex on the
> > consumer,
> > > we
> > > > > > must
> > > > > > >> > fetch
> > > > > > >> > > > > metadata for all topics. Rather than having all regex
> > > > > > subscribers
> > > > > > >> > embed
> > > > > > >> > > > all
> > > > > > >> > > > > of this metadata in their join group requests, they
> > could
> > > > > > instead
> > > > > > >> > > embed a
> > > > > > >> > > > > hash of it. Then after the join group responses are
> > > > received,
> > > > > > they
> > > > > > >> > just
> > > > > > >> > > > > need to check that the hashes are the same. If there
> is
> > a
> > > > > > mismatch
> > > > > > >> > > (which
> > > > > > >> > > > > should only occur when topics are created, deleted, or
> > > > > resized),
> > > > > > >> then
> > > > > > >> > > the
> > > > > > >> > > > > group members must refetch the metadata and rejoin the
> > > > group.
> > > > > > >> This is
> > > > > > >> > > > also
> > > > > > >> > > > > how the current protocol behaves when there is a
> change
> > in
> > > > the
> > > > > > >> topic
> > > > > > >> > > > > metadata affecting the group--someone (either the
> > > > coordinator
> > > > > or
> > > > > > >> the
> > > > > > >> > > > > consumer) detects the change and forces the group to
> > > > > rebalance.
> > > > > > >> > > > >
> > > > > > >> > > > > What do you think?
> > > > > > >> > > > >
> > > > > > >> > > > > (Also I think adding groupId/generationId to fetch and
> > > > produce
> > > > > > >> > requests
> > > > > > >> > > > > seems like an interesting line of thought.)
> > > > > > >> > > > >
> > > > > > >> > > > > -Jason
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Wed, Aug 12, 2015 at 10:57 AM, Jiangjie Qin
> > > > > > >> > > <j...@linkedin.com.invalid
> > > > > > >> > > > >
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > >> Hey Ewen,
> > > > > > >> > > > >>
> > > > > > >> > > > >> Onur and I discussed this a little bit more. And we
> are
> > > > still
> > > > > > >> > worrying
> > > > > > >> > > > >> about passing all the metadata of all consumers
> around.
> > > > > > >> > > > >>
> > > > > > >> > > > >> Let's say I have a cluster has 10,000 topics, the
> > average
> > > > > topic
> > > > > > >> name
> > > > > > >> > > > >> length
> > > > > > >> > > > >> is 10 bytes. In this case, the opaque metadata will
> > have
> > > > 10 *
> > > > > > >> > 10,000 =
> > > > > > >> > > > >> 100KB for topic name, for each topic, there is a
> 4-byte
> > > > > integer
> > > > > > >> of
> > > > > > >> > > > number
> > > > > > >> > > > >> of partitions, that's another 40KB. So one global
> topic
> > > > > > metadata
> > > > > > >> > will
> > > > > > >> > > > have
> > > > > > >> > > > >> 140KB data. If I have 100 consumers who are using
> > > wildcard
> > > > to
> > > > > > >> > consume
> > > > > > >> > > > from
> > > > > > >> > > > >> all the topics. That means the protocol metadata end
> up
> > > in
> > > > > the
> > > > > > >> > > > >> JoinGroupResponse will be 140KB * 100 = 14MB. And the
> > > > > > >> > > JoinGroupResponse
> > > > > > >> > > > >> will need to be sent to 100 different consumers, that
> > > means
> > > > > > 14MB
> > > > > > >> *
> > > > > > >> > > 100 =
> > > > > > >> > > > >> 1.4GB need to be sent by the consumer coordinator for
> > one
> > > > > > >> rebalance.
> > > > > > >> > > How
> > > > > > >> > > > >> would that work?
> > > > > > >> > > > >>
> > > > > > >> > > > >> Also, having two consumers (old owner and new owner)
> > > > > consuming
> > > > > > >> from
> > > > > > >> > > the
> > > > > > >> > > > >> same partition might also be a problem. e.g. people
> are
> > > > > > updating
> > > > > > >> > > > database.
> > > > > > >> > > > >> One thing might worth doing is to add GroupId and
> > > > Generation
> > > > > ID
> > > > > > >> to
> > > > > > >> > > > >> ProducerRequest and FetchRequest as well. This will
> > also
> > > > help
> > > > > > >> with
> > > > > > >> > the
> > > > > > >> > > > >> single producer use case. However, this is probably
> > > > > orthogonal
> > > > > > to
> > > > > > >> > this
> > > > > > >> > > > >> thread given the current new consumer also has this
> > > problem
> > > > > > and I
> > > > > > >> > > > believe
> > > > > > >> > > > >> we need to fix it.
> > > > > > >> > > > >>
> > > > > > >> > > > >> Thanks,
> > > > > > >> > > > >>
> > > > > > >> > > > >> Jiangjie (Becket) Qin
> > > > > > >> > > > >>
> > > > > > >> > > > >> On Tue, Aug 11, 2015 at 11:43 PM, Ewen
> > Cheslack-Postava <
> > > > > > >> > > > >> e...@confluent.io>
> > > > > > >> > > > >> wrote:
> > > > > > >> > > > >>
> > > > > > >> > > > >> > On Tue, Aug 11, 2015 at 11:29 PM, Jiangjie Qin
> > > > > > >> > > > >> <j...@linkedin.com.invalid>
> > > > > > >> > > > >> > wrote:
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > > Ewen,
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Thanks for the explanation.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > For (1), I am more concerned about the failure
> case
> > > > > instead
> > > > > > >> of
> > > > > > >> > > > normal
> > > > > > >> > > > >> > case.
> > > > > > >> > > > >> > > What if a consumer somehow was kick out of a
> group
> > > but
> > > > is
> > > > > > >> still
> > > > > > >> > > > >> consuming
> > > > > > >> > > > >> > > and committing offsets? Does that mean the new
> > owner
> > > > and
> > > > > > old
> > > > > > >> > owner
> > > > > > >> > > > >> might
> > > > > > >> > > > >> > > potentially consuming from and committing offsets
> > for
> > > > the
> > > > > > >> same
> > > > > > >> > > > >> partition?
> > > > > > >> > > > >> > > In the old consumer, this won't happen because
> the
> > > new
> > > > > > >> consumer
> > > > > > >> > > will
> > > > > > >> > > > >> not
> > > > > > >> > > > >> > be
> > > > > > >> > > > >> > > able to start consumption unless the previous
> owner
> > > has
> > > > > > >> released
> > > > > > >> > > its
> > > > > > >> > > > >> > > ownership. Basically, without the ownership
> > > guarantee,
> > > > I
> > > > > > >> don't
> > > > > > >> > see
> > > > > > >> > > > how
> > > > > > >> > > > >> > the
> > > > > > >> > > > >> > > communication among consumers themselves alone
> can
> > > > solve
> > > > > > the
> > > > > > >> > > problem
> > > > > > >> > > > >> > here.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > The generation ID check still applies to offset
> > > commits.
> > > > If
> > > > > > >> one of
> > > > > > >> > > the
> > > > > > >> > > > >> > consumers is kicked out and misbehaving, it can
> > > obviously
> > > > > > still
> > > > > > >> > > fetch
> > > > > > >> > > > >> and
> > > > > > >> > > > >> > process messages, but offset commits will not work
> > > since
> > > > it
> > > > > > >> will
> > > > > > >> > not
> > > > > > >> > > > >> have
> > > > > > >> > > > >> > the current generation ID.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > For (2) and (3), now I understand how metadata
> are
> > > > used.
> > > > > > But
> > > > > > >> I
> > > > > > >> > > still
> > > > > > >> > > > >> > don't
> > > > > > >> > > > >> > > see why should we let the consumers to pass the
> > topic
> > > > > > >> > information
> > > > > > >> > > > >> across
> > > > > > >> > > > >> > > instead of letting coordinator give the
> > information.
> > > > The
> > > > > > >> single
> > > > > > >> > > > >> producer
> > > > > > >> > > > >> > > use case does not solve the ownership problem in
> > > > abnormal
> > > > > > >> case
> > > > > > >> > > > either,
> > > > > > >> > > > >> > > which seems to be a little bit vulnerable.
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > One of the goals here was to generalize group
> > > membership
> > > > so
> > > > > > we
> > > > > > >> > can,
> > > > > > >> > > > for
> > > > > > >> > > > >> > example, use it for balancing Copycat tasks across
> > > > workers.
> > > > > > >> > There's
> > > > > > >> > > no
> > > > > > >> > > > >> > topic subscription info in that case. The metadata
> > for
> > > > > > copycat
> > > > > > >> > > workers
> > > > > > >> > > > >> > would instead need to somehow indicate the current
> > set
> > > of
> > > > > > tasks
> > > > > > >> > that
> > > > > > >> > > > >> need
> > > > > > >> > > > >> > to be assigned to workers. By making the metadata
> > > > > completely
> > > > > > >> > opaque
> > > > > > >> > > to
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > protocol, it becomes more generally useful since it
> > > > focuses
> > > > > > >> > squarely
> > > > > > >> > > > on
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > group membership problem, allowing for that
> > additional
> > > > bit
> > > > > of
> > > > > > >> > > metadata
> > > > > > >> > > > >> so
> > > > > > >> > > > >> > you don't just get a list of members, but also get
> a
> > > > little
> > > > > > >> bit of
> > > > > > >> > > > info
> > > > > > >> > > > >> > about each of them.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > A different option that we explored is to use a
> sort
> > of
> > > > > mixed
> > > > > > >> > model
> > > > > > >> > > --
> > > > > > >> > > > >> > still bake all the topic subscriptions directly
> into
> > > the
> > > > > > >> protocol
> > > > > > >> > > but
> > > > > > >> > > > >> also
> > > > > > >> > > > >> > include metadata. That would allow us to maintain
> the
> > > > > > existing
> > > > > > >> > > > >> > coordinator-driven approach to handling the
> metadata
> > > and
> > > > > > change
> > > > > > >> > > events
> > > > > > >> > > > >> like
> > > > > > >> > > > >> > the ones Onur pointed out. Then something like the
> > > > Copycat
> > > > > > >> workers
> > > > > > >> > > > would
> > > > > > >> > > > >> > just not fill in any topic subscriptions and it
> would
> > > be
> > > > > > >> handled
> > > > > > >> > as
> > > > > > >> > > a
> > > > > > >> > > > >> > degenerate case. Based on the way I explained that
> we
> > > can
> > > > > > >> handle
> > > > > > >> > > those
> > > > > > >> > > > >> > types of events, I personally feel its cleaner and
> a
> > > > nicer
> > > > > > >> > > > >> generalization
> > > > > > >> > > > >> > to not include the subscriptions in the join group
> > > > > protocol,
> > > > > > >> > making
> > > > > > >> > > it
> > > > > > >> > > > >> part
> > > > > > >> > > > >> > of the metadata instead.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > For the single producer case, are you saying it
> > doesn't
> > > > > solve
> > > > > > >> > > > ownership
> > > > > > >> > > > >> in
> > > > > > >> > > > >> > the abnormal case because a producer that doesn't
> > know
> > > it
> > > > > has
> > > > > > >> been
> > > > > > >> > > > >> kicked
> > > > > > >> > > > >> > out of the group yet can still produce data even
> > though
> > > > it
> > > > > > >> > shouldn't
> > > > > > >> > > > be
> > > > > > >> > > > >> > able to anymore? I definitely agree that that is a
> > risk
> > > > --
> > > > > > this
> > > > > > >> > > > >> provides a
> > > > > > >> > > > >> > way to get closer to a true single-writer, but
> there
> > > are
> > > > > > >> > definitely
> > > > > > >> > > > >> still
> > > > > > >> > > > >> > failure modes that this does not address.
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > -Ewen
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Thanks,
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > Jiangjie (Becket) Qin
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > On Tue, Aug 11, 2015 at 11:06 PM, Ewen
> > > > Cheslack-Postava <
> > > > > > >> > > > >> > e...@confluent.io
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > wrote:
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > > On Tue, Aug 11, 2015 at 10:15 PM, Jiangjie Qin
> > > > > > >> > > > >> > <j...@linkedin.com.invalid
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > wrote:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > > Hi Jason,
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Thanks for writing this up. It would be
> useful
> > to
> > > > > > >> generalize
> > > > > > >> > > the
> > > > > > >> > > > >> > group
> > > > > > >> > > > >> > > > > concept. I have a few questions below.
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > 1. In old consumer actually the partition
> > > > assignment
> > > > > > are
> > > > > > >> > done
> > > > > > >> > > by
> > > > > > >> > > > >> > > > consumers
> > > > > > >> > > > >> > > > > themselves. We used zookeeper to guarantee
> > that a
> > > > > > >> partition
> > > > > > >> > > will
> > > > > > >> > > > >> only
> > > > > > >> > > > >> > > be
> > > > > > >> > > > >> > > > > consumed by one consumer thread who
> > successfully
> > > > > > claimed
> > > > > > >> its
> > > > > > >> > > > >> > ownership.
> > > > > > >> > > > >> > > > > Does the new protocol plan to provide the
> same
> > > > > > guarantee?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Once you have all the metadata from all the
> > > > consumers,
> > > > > > >> > > assignment
> > > > > > >> > > > >> > should
> > > > > > >> > > > >> > > > just be a simple function mapping that
> > > > Map<ConsumerId,
> > > > > > >> > Metadata>
> > > > > > >> > > > to
> > > > > > >> > > > >> > > > Map<ConsumerId, List<TopicPartition>>. If
> > everyone
> > > is
> > > > > > >> > consistent
> > > > > > >> > > > in
> > > > > > >> > > > >> > > > computing that, you don't need ZK involved at
> > all.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > In practice, this shouldn't be that hard to
> > ensure
> > > > for
> > > > > > most
> > > > > > >> > > > >> assignment
> > > > > > >> > > > >> > > > strategies just by having decent unit testing
> on
> > > > them.
> > > > > > You
> > > > > > >> > just
> > > > > > >> > > > >> have to
> > > > > > >> > > > >> > > do
> > > > > > >> > > > >> > > > things like ensure your assignment strategy
> sorts
> > > > lists
> > > > > > >> into a
> > > > > > >> > > > >> > consistent
> > > > > > >> > > > >> > > > order.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > You do give up the ability to use some
> techniques
> > > > (e.g.
> > > > > > any
> > > > > > >> > > > >> randomized
> > > > > > >> > > > >> > > > algorithm if you can't distribute the seed w/
> the
> > > > > > metadata)
> > > > > > >> > and
> > > > > > >> > > > it's
> > > > > > >> > > > >> > true
> > > > > > >> > > > >> > > > that nothing validates the assignment, but if
> > that
> > > > > > >> assignment
> > > > > > >> > > > >> algorithm
> > > > > > >> > > > >> > > > step is kept simple, small, and well tested,
> the
> > > risk
> > > > > is
> > > > > > >> very
> > > > > > >> > > > >> minimal.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > 2. It looks that both JoinGroupRequest and
> > > > > > >> JoinGroupResponse
> > > > > > >> > > has
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > > ProtocolMetadata.AssignmentStrategyMetadata,
> > what
> > > > > would
> > > > > > >> be
> > > > > > >> > the
> > > > > > >> > > > >> > metadata
> > > > > > >> > > > >> > > > be
> > > > > > >> > > > >> > > > > sent and returned by coordinator? How will
> the
> > > > > > >> coordinator
> > > > > > >> > > > handle
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > > metadata?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > The coordinator is basically just blindly
> > > > broadcasting
> > > > > > all
> > > > > > >> of
> > > > > > >> > it
> > > > > > >> > > > to
> > > > > > >> > > > >> > group
> > > > > > >> > > > >> > > > members so they have a consistent view.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > So from the coordinators perspective, it sees
> > > > something
> > > > > > >> like:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Consumer 1 -> JoinGroupRequest with
> > GroupProtocols
> > > =
> > > > [
> > > > > > >> > > "consumer"
> > > > > > >> > > > >> > > > <Consumer1 opaque byte[]>]
> > > > > > >> > > > >> > > > Consumer 2 -> JoinGroupRequest with
> > GroupProtocols
> > > =
> > > > [
> > > > > > >> > > "consumer"
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Then, in the responses would look like:
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Consumer 1 <- JoinGroupResponse with
> > GroupProtocol
> > > =
> > > > > > >> > "consumer"
> > > > > > >> > > > and
> > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > byte[]>,
> > > > > > >> > Consumer
> > > > > > >> > > 2
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > > Consumer 2 <- JoinGroupResponse with
> > GroupProtocol
> > > =
> > > > > > >> > "consumer"
> > > > > > >> > > > and
> > > > > > >> > > > >> > > > GroupMembers = [ Consumer 1 <Consumer1 opaque
> > > > byte[]>,
> > > > > > >> > Consumer
> > > > > > >> > > 2
> > > > > > >> > > > >> > > > <Consumer2 opaque byte[]>]
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > So all the responses include all the metadata
> for
> > > > every
> > > > > > >> member
> > > > > > >> > > in
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > group, and everyone can use that to
> consistently
> > > > decide
> > > > > > on
> > > > > > >> > > > >> assignment.
> > > > > > >> > > > >> > > The
> > > > > > >> > > > >> > > > broker doesn't care and cannot even understand
> > the
> > > > > > metadata
> > > > > > >> > > since
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > data
> > > > > > >> > > > >> > > > format for it is dependent on the assignment
> > > strategy
> > > > > > being
> > > > > > >> > > used.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > As another example that is *not* a consumer,
> > let's
> > > > say
> > > > > > you
> > > > > > >> > just
> > > > > > >> > > > >> want to
> > > > > > >> > > > >> > > > have a single writer in the group which
> everyone
> > > will
> > > > > > >> forward
> > > > > > >> > > > >> requests
> > > > > > >> > > > >> > > to.
> > > > > > >> > > > >> > > > To accomplish this, you could use a very dumb
> > > > > assignment
> > > > > > >> > > strategy:
> > > > > > >> > > > >> > there
> > > > > > >> > > > >> > > is
> > > > > > >> > > > >> > > > no metadata (empty byte[]) and all we care
> about
> > is
> > > > who
> > > > > > is
> > > > > > >> the
> > > > > > >> > > > first
> > > > > > >> > > > >> > > member
> > > > > > >> > > > >> > > > in the group (e.g. when IDs are sorted
> > > > > > lexicographically).
> > > > > > >> > That
> > > > > > >> > > > >> member
> > > > > > >> > > > >> > is
> > > > > > >> > > > >> > > > selected as the writer. In that case, we
> actually
> > > > just
> > > > > > care
> > > > > > >> > > about
> > > > > > >> > > > >> the
> > > > > > >> > > > >> > > > membership list, there's no additional info
> about
> > > > each
> > > > > > >> member
> > > > > > >> > > that
> > > > > > >> > > > >> is
> > > > > > >> > > > >> > > > required to determine who is the writer.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > > 3. Do you mean that the number of partitions
> in
> > > > > > >> > > > JoinGroupResponse
> > > > > > >> > > > >> > will
> > > > > > >> > > > >> > > be
> > > > > > >> > > > >> > > > > the max partition number of a topic among all
> > the
> > > > > > >> reported
> > > > > > >> > > > >> partition
> > > > > > >> > > > >> > > > number
> > > > > > >> > > > >> > > > > by consumers? Is there any reason not just
> let
> > > > > > >> Coordinator
> > > > > > >> > to
> > > > > > >> > > > >> return
> > > > > > >> > > > >> > > the
> > > > > > >> > > > >> > > > > number of partitions of a topic in its
> metadata
> > > > > cache?
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > Nothing from the embedded protocol is touched
> by
> > > the
> > > > > > >> broker.
> > > > > > >> > The
> > > > > > >> > > > >> broker
> > > > > > >> > > > >> > > > just collects opaque bytes of metadata, does
> the
> > > > > > selection
> > > > > > >> of
> > > > > > >> > > the
> > > > > > >> > > > >> > > strategy
> > > > > > >> > > > >> > > > if multiple are supported by some consumers,
> and
> > > then
> > > > > > >> returns
> > > > > > >> > > that
> > > > > > >> > > > >> > opaque
> > > > > > >> > > > >> > > > metadata for all the members back to every
> > member.
> > > In
> > > > > > that
> > > > > > >> way
> > > > > > >> > > > they
> > > > > > >> > > > >> all
> > > > > > >> > > > >> > > > have a consistent view of the group. For
> regular
> > > > > > consumers,
> > > > > > >> > that
> > > > > > >> > > > >> view
> > > > > > >> > > > >> > of
> > > > > > >> > > > >> > > > the group includes information about how many
> > > > > partitions
> > > > > > >> each
> > > > > > >> > > > >> consumer
> > > > > > >> > > > >> > > > currently thinks the topics it is subscribed to
> > > has.
> > > > > > These
> > > > > > >> > could
> > > > > > >> > > > be
> > > > > > >> > > > >> > > > inconsistent due to out of date metadata and it
> > > would
> > > > > be
> > > > > > >> up to
> > > > > > >> > > the
> > > > > > >> > > > >> > > > assignment strategy on the *client* to resolve
> > > that.
> > > > As
> > > > > > you
> > > > > > >> > > point
> > > > > > >> > > > >> out,
> > > > > > >> > > > >> > in
> > > > > > >> > > > >> > > > that case they could just take the max value
> that
> > > any
> > > > > > >> consumer
> > > > > > >> > > > >> reported
> > > > > > >> > > > >> > > > seeing and use that. The consumers that notice
> > that
> > > > > their
> > > > > > >> > > metadata
> > > > > > >> > > > >> had
> > > > > > >> > > > >> > a
> > > > > > >> > > > >> > > > smaller # of partitions should also trigger a
> > > > metadata
> > > > > > >> update
> > > > > > >> > > when
> > > > > > >> > > > >> they
> > > > > > >> > > > >> > > see
> > > > > > >> > > > >> > > > someone else observing a larger # of
> partitions.
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Thanks,
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > Jiangjie (Becket) Qin
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > On Tue, Aug 11, 2015 at 1:19 PM, Jason
> > Gustafson
> > > <
> > > > > > >> > > > >> ja...@confluent.io
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> > > > > wrote:
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > > > > Hi Kafka Devs,
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > One of the nagging issues in the current
> > design
> > > > of
> > > > > > the
> > > > > > >> new
> > > > > > >> > > > >> consumer
> > > > > > >> > > > >> > > has
> > > > > > >> > > > >> > > > > > been the need to support a variety of
> > > assignment
> > > > > > >> > strategies.
> > > > > > >> > > > >> We've
> > > > > > >> > > > >> > > > > > encountered this in particular in the
> design
> > of
> > > > > > copycat
> > > > > > >> > and
> > > > > > >> > > > the
> > > > > > >> > > > >> > > > > processing
> > > > > > >> > > > >> > > > > > framework (KIP-28). From what I understand,
> > > Samza
> > > > > > also
> > > > > > >> > has a
> > > > > > >> > > > >> number
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > use
> > > > > > >> > > > >> > > > > > cases with custom assignment needs. The new
> > > > > consumer
> > > > > > >> > > protocol
> > > > > > >> > > > >> > > supports
> > > > > > >> > > > >> > > > > new
> > > > > > >> > > > >> > > > > > assignment strategies by hooking them into
> > the
> > > > > > broker.
> > > > > > >> For
> > > > > > >> > > > many
> > > > > > >> > > > >> > > > > > environments, this is a major pain and in
> > some
> > > > > > cases, a
> > > > > > >> > > > >> > non-starter.
> > > > > > >> > > > >> > > It
> > > > > > >> > > > >> > > > > > also challenges the validation that the
> > > > coordinator
> > > > > > can
> > > > > > >> > > > provide.
> > > > > > >> > > > >> > For
> > > > > > >> > > > >> > > > > > example, some assignment strategies call
> for
> > > > > > >> partitions to
> > > > > > >> > > be
> > > > > > >> > > > >> > > assigned
> > > > > > >> > > > >> > > > > > multiple times, which means that the
> > > coordinator
> > > > > can
> > > > > > >> only
> > > > > > >> > > > check
> > > > > > >> > > > >> > that
> > > > > > >> > > > >> > > > > > partitions have been assigned at least
> once.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > To solve these issues, we'd like to propose
> > > > moving
> > > > > > >> > > assignment
> > > > > > >> > > > to
> > > > > > >> > > > >> > the
> > > > > > >> > > > >> > > > > > client. I've written a wiki which outlines
> > some
> > > > > > >> protocol
> > > > > > >> > > > >> changes to
> > > > > > >> > > > >> > > > > achieve
> > > > > > >> > > > >> > > > > > this:
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >>
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal
> > > > > > >> > > > >> > > > > > .
> > > > > > >> > > > >> > > > > > To summarize briefly, instead of the
> > > coordinator
> > > > > > >> assigning
> > > > > > >> > > the
> > > > > > >> > > > >> > > > partitions
> > > > > > >> > > > >> > > > > > itself, all subscriptions are forwarded to
> > each
> > > > > > member
> > > > > > >> of
> > > > > > >> > > the
> > > > > > >> > > > >> group
> > > > > > >> > > > >> > > > which
> > > > > > >> > > > >> > > > > > then decides independently which partitions
> > it
> > > > > should
> > > > > > >> > > consume.
> > > > > > >> > > > >> The
> > > > > > >> > > > >> > > > > protocol
> > > > > > >> > > > >> > > > > > provides a mechanism for the coordinator to
> > > > > validate
> > > > > > >> that
> > > > > > >> > > all
> > > > > > >> > > > >> > > consumers
> > > > > > >> > > > >> > > > > use
> > > > > > >> > > > >> > > > > > the same assignment strategy, but it does
> not
> > > > > ensure
> > > > > > >> that
> > > > > > >> > > the
> > > > > > >> > > > >> > > resulting
> > > > > > >> > > > >> > > > > > assignment is "correct." This provides a
> > > powerful
> > > > > > >> > capability
> > > > > > >> > > > for
> > > > > > >> > > > >> > > users
> > > > > > >> > > > >> > > > to
> > > > > > >> > > > >> > > > > > control the full data flow on the client
> > side.
> > > > They
> > > > > > >> > control
> > > > > > >> > > > how
> > > > > > >> > > > >> > data
> > > > > > >> > > > >> > > is
> > > > > > >> > > > >> > > > > > written to partitions through the
> Partitioner
> > > > > > interface
> > > > > > >> > and
> > > > > > >> > > > they
> > > > > > >> > > > >> > > > control
> > > > > > >> > > > >> > > > > > how data is consumed through the assignment
> > > > > strategy,
> > > > > > >> all
> > > > > > >> > > > >> without
> > > > > > >> > > > >> > > > > touching
> > > > > > >> > > > >> > > > > > the server.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Of course nothing comes for free. In
> > > particular,
> > > > > this
> > > > > > >> > change
> > > > > > >> > > > >> > removes
> > > > > > >> > > > >> > > > the
> > > > > > >> > > > >> > > > > > ability of the coordinator to validate that
> > > > commits
> > > > > > are
> > > > > > >> > made
> > > > > > >> > > > by
> > > > > > >> > > > >> > > > consumers
> > > > > > >> > > > >> > > > > > who were assigned the respective partition.
> > > This
> > > > > > might
> > > > > > >> not
> > > > > > >> > > be
> > > > > > >> > > > >> too
> > > > > > >> > > > >> > bad
> > > > > > >> > > > >> > > > > since
> > > > > > >> > > > >> > > > > > we retain the ability to validate the
> > > generation
> > > > > id,
> > > > > > >> but
> > > > > > >> > it
> > > > > > >> > > > is a
> > > > > > >> > > > >> > > > > potential
> > > > > > >> > > > >> > > > > > concern. We have considered alternative
> > > protocols
> > > > > > which
> > > > > > >> > add
> > > > > > >> > > a
> > > > > > >> > > > >> > second
> > > > > > >> > > > >> > > > > > round-trip to the protocol in order to give
> > the
> > > > > > >> > coordinator
> > > > > > >> > > > the
> > > > > > >> > > > >> > > ability
> > > > > > >> > > > >> > > > > to
> > > > > > >> > > > >> > > > > > confirm the assignment. As mentioned above,
> > the
> > > > > > >> > coordinator
> > > > > > >> > > is
> > > > > > >> > > > >> > > somewhat
> > > > > > >> > > > >> > > > > > limited in what it can actually validate,
> but
> > > > this
> > > > > > >> would
> > > > > > >> > > > return
> > > > > > >> > > > >> its
> > > > > > >> > > > >> > > > > ability
> > > > > > >> > > > >> > > > > > to validate commits. The tradeoff is that
> it
> > > > > > increases
> > > > > > >> the
> > > > > > >> > > > >> > protocol's
> > > > > > >> > > > >> > > > > > complexity which means more ways for the
> > > protocol
> > > > > to
> > > > > > >> fail
> > > > > > >> > > and
> > > > > > >> > > > >> > > > > consequently
> > > > > > >> > > > >> > > > > > more edge cases in the code.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > It also misses an opportunity to generalize
> > the
> > > > > group
> > > > > > >> > > > membership
> > > > > > >> > > > >> > > > protocol
> > > > > > >> > > > >> > > > > > for additional use cases. In fact, after
> > you've
> > > > > gone
> > > > > > to
> > > > > > >> > the
> > > > > > >> > > > >> trouble
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > > moving assignment to the client, the main
> > thing
> > > > > that
> > > > > > is
> > > > > > >> > left
> > > > > > >> > > > in
> > > > > > >> > > > >> > this
> > > > > > >> > > > >> > > > > > protocol is basically a general group
> > > management
> > > > > > >> > capability.
> > > > > > >> > > > >> This
> > > > > > >> > > > >> > is
> > > > > > >> > > > >> > > > > > exactly what is needed for a few cases that
> > are
> > > > > > >> currently
> > > > > > >> > > > under
> > > > > > >> > > > >> > > > > discussion
> > > > > > >> > > > >> > > > > > (e.g. copycat or single-writer producer).
> > We've
> > > > > taken
> > > > > > >> this
> > > > > > >> > > > >> further
> > > > > > >> > > > >> > > step
> > > > > > >> > > > >> > > > > in
> > > > > > >> > > > >> > > > > > the proposal and attempted to envision what
> > > that
> > > > > > >> general
> > > > > > >> > > > >> protocol
> > > > > > >> > > > >> > > might
> > > > > > >> > > > >> > > > > > look like and how it could be used both by
> > the
> > > > > > consumer
> > > > > > >> > and
> > > > > > >> > > > for
> > > > > > >> > > > >> > some
> > > > > > >> > > > >> > > of
> > > > > > >> > > > >> > > > > > these other cases.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Anyway, since time is running out on the
> new
> > > > > > consumer,
> > > > > > >> we
> > > > > > >> > > have
> > > > > > >> > > > >> > > perhaps
> > > > > > >> > > > >> > > > > one
> > > > > > >> > > > >> > > > > > last chance to consider a significant
> change
> > in
> > > > the
> > > > > > >> > protocol
> > > > > > >> > > > >> like
> > > > > > >> > > > >> > > this,
> > > > > > >> > > > >> > > > > so
> > > > > > >> > > > >> > > > > > have a look at the wiki and share your
> > > thoughts.
> > > > > I've
> > > > > > >> no
> > > > > > >> > > doubt
> > > > > > >> > > > >> that
> > > > > > >> > > > >> > > > some
> > > > > > >> > > > >> > > > > > ideas seem clearer in my mind than they do
> on
> > > > > paper,
> > > > > > so
> > > > > > >> > ask
> > > > > > >> > > > >> > questions
> > > > > > >> > > > >> > > > if
> > > > > > >> > > > >> > > > > > there is any confusion.
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > > > Thanks!
> > > > > > >> > > > >> > > > > > Jason
> > > > > > >> > > > >> > > > > >
> > > > > > >> > > > >> > > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > > > --
> > > > > > >> > > > >> > > > Thanks,
> > > > > > >> > > > >> > > > Ewen
> > > > > > >> > > > >> > > >
> > > > > > >> > > > >> > >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > --
> > > > > > >> > > > >> > Thanks,
> > > > > > >> > > > >> > Ewen
> > > > > > >> > > > >> >
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > Thanks,
> > > > > > > Neha
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > Thanks,
> > > > > > Neha
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > Thanks,
> > > > > Ewen
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
>



-- 
Thanks,
Neha

Reply via email to