Maybe another approach can be to add a new
"offsets.replica.fetch.max.bytes" config on the brokers.

On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <okara...@linkedin.com> wrote:

> I made a PR with a tweak to Jun's/Becket's proposal:
> https://github.com/apache/kafka/pull/1484
>
> It just tweaks the fetch behavior specifically for replicas fetching from
> the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes"
> is less than the __consumer_offset leader's "message.max.bytes" to take the
> max of the two.
>
> I'm honestly not that happy with this solution, as I'd rather not change
> the "replica.fetch.max.bytes" config from being a limit to a
> recommendation. I'd definitely be happy to hear other alternatives!
>
> On Sun, May 29, 2016 at 1:57 PM, Onur Karaman <
> onurkaraman.apa...@gmail.com> wrote:
>
>> Sorry I know next to nothing about Kafka Connect. I didn't understand the
>> Kafka Connect / MM idea you brought up. Can you go into more detail?
>>
>> Otherwise I think our remaining options are:
>> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for
>> __consumer_offsets topic and change the fetch behavior when message size
>> is
>> larger than fetch size
>> - option 6: support sending the regex over the wire instead of the fully
>> expanded topic subscriptions. This should cut down the message size from
>> the subscription side. Again this only helps when pattern-based
>> subscriptions are done.
>>
>> minor correction to an earlier comment I made regarding the message size:
>> message size ~ sum(s_i + a_i for i in range [1, |C|])
>>
>> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson <ja...@confluent.io>
>> wrote:
>>
>> > Hey Onur,
>> >
>> > Thanks for the investigation. It seems the conclusion is that the
>> compact
>> > format helps, but perhaps not enough to justify adding a new assignment
>> > schema? I'm not sure there's much more room for savings unless we change
>> > something more fundamental in the assignment approach. We spent some
>> time
>> > thinking before about whether we could let the consumers compute their
>> > assignment locally from a smaller set of information, but the difficulty
>> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka
>> > Connect has a similar problem where all the workers need to agree on
>> > connector configurations. Since all configs are stored in a single topic
>> > partition, the approach we take there is to propagate the offset in the
>> > assignment protocol. Not sure if we could do something similar for MM...
>> > Anyway, it seems like the best workaround at the moment is Jun's initial
>> > suggestion. What do you think?
>> >
>> > -Jason
>> >
>> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman <
>> > onurkaraman.apa...@gmail.com
>> > > wrote:
>> >
>> > > I gave the topic index assignment trick a try against the same
>> > environment.
>> > > The implementation just changed the assignment serialization and
>> > > deserialization logic. It didn't change SyncGroupResponse, meaning it
>> > > continues to exclude the subscription from the SyncGroupResponse and
>> > > assumes the member has kept track of its last subscription.
>> > >
>> > > Assignment topic indexing with compression:
>> > > 1 consumer 34346 bytes
>> > > 5 consumers 177687 bytes
>> > > 10 consumers 331897 bytes
>> > > 20 consumers 572467 bytes
>> > > 30 consumers 811269 bytes
>> > > 40 consumers 1047188 bytes * the tipping point
>> > > 50 consumers 1290092 bytes
>> > > 60 consumers 1527806 bytes
>> > > 70 consumers 1769259 bytes
>> > > 80 consumers 2000118 bytes
>> > > 90 consumers 2244392 bytes
>> > > 100 consumers 2482415 bytes
>> > >
>> > > Assignment topic indexing without compression:
>> > > 1 consumer 211904 bytes
>> > > 5 consumers 677184 bytes
>> > > 10 consumers 1211154 bytes * the tipping point
>> > > 20 consumers 2136196 bytes
>> > > 30 consumers 3061238 bytes
>> > > 40 consumers 3986280 bytes
>> > > 50 consumers 4911322 bytes
>> > > 60 consumers 5836284 bytes
>> > > 70 consumers 6761246 bytes
>> > > 80 consumers 7686208 bytes
>> > > 90 consumers 8611170 bytes
>> > > 100 consumers 9536132 bytes
>> > >
>> > > Assignment topic indexing seems to reduce the size by 500KB without
>> > > compression and 80KB with compression. So assignment topic indexing
>> makes
>> > > some difference in both with and without compression but in our case
>> was
>> > > not nearly enough.
>> > >
>> > > This can be explained by the fact that we aren't actually hitting the
>> > worst
>> > > case scenario of each consumer being assigned a partition from every
>> > topic.
>> > > The reason is simple: a topic can only fully span all the consumers
>> if it
>> > > has at least as many partitions as there are consumers. Given that
>> there
>> > > are 8 partitions per topic and we have 100 consumers, it makes sense
>> that
>> > > we aren't close to this worse case scenario where topic indexing would
>> > make
>> > > a bigger difference.
>> > >
>> > > I tweaked the group leader's assignment code to print out the
>> assignments
>> > > and found that each consumer was getting either 238 or 239 partitions.
>> > Each
>> > > of these partitions were from unique topics. So the consumers were
>> really
>> > > getting partitions from 239 topics instead of the full worst case
>> > scenario
>> > > of 3000 topics.
>> > >
>> > > On Wed, May 25, 2016 at 1:42 PM, Jason Gustafson <ja...@confluent.io>
>> > > wrote:
>> > >
>> > > > Gwen, Joel:
>> > > >
>> > > > That's correct. The protocol does allow us to give an assignor its
>> own
>> > > > assignment schema, but I think this will require a couple internal
>> > > changes
>> > > > to the consumer to make use of the full generality.
>> > > >
>> > > > One thing I'm a little uncertain about is whether we should use a
>> > > different
>> > > > protocol type. For a little context, the group membership protocol
>> > allows
>> > > > the client to provide a "protocol type" when joining the group to
>> > ensure
>> > > > that all members have some basic semantic compatibility. For
>> example,
>> > the
>> > > > consumer uses "consumer" and Kafka Connect uses "connect." Currently
>> > all
>> > > > assignors using the "consumer" protocol share a common schema for
>> > > > representing subscriptions and assignment. This is convenient for
>> tools
>> > > > (like consumer-groups.sh) since they just need to know how to parse
>> the
>> > > > "consumer" protocol type without knowing anything about the
>> assignors.
>> > So
>> > > > introducing another schema would break that assumption and we'd need
>> > > those
>> > > > tools to do assignor-specific parsing. Maybe this is OK?
>> Alternatively,
>> > > we
>> > > > could use a separate protocol type (e.g. "compact-consumer"), but
>> that
>> > > > seems less than desirable.
>> > > >
>> > > > Thanks,
>> > > > Jason
>> > > >
>> > > > On Wed, May 25, 2016 at 11:00 AM, Gwen Shapira <g...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > ah, right - we can add as many strategies as we want.
>> > > > >
>> > > > > On Wed, May 25, 2016 at 10:54 AM, Joel Koshy <jjkosh...@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > > Yes it would be a protocol bump.
>> > > > > > >
>> > > > > >
>> > > > > > Sorry - I'm officially confused. I think it may not be required
>> -
>> > > since
>> > > > > the
>> > > > > > more compact format would be associated with a new assignment
>> > > strategy
>> > > > -
>> > > > > > right?
>> > > > > >
>> > > > > >
>> > > > > > > smaller than the plaintext PAL, but the post-compressed binary
>> > PAL
>> > > is
>> > > > > > just
>> > > > > > > 25% smaller than the post-compressed plaintext PAL. IOW using
>> a
>> > > > symbol
>> > > > > > > table helps a lot but further compression on that already
>> compact
>> > > > > format
>> > > > > > > would yield only marginal return.
>> > > > > > >
>> > > > > >
>> > > > > > > So basically I feel we could get pretty far with a more
>> compact
>> > > field
>> > > > > > > format for assignment and if we do that then we would
>> potentially
>> > > not
>> > > > > > even
>> > > > > > > want to do any compression.
>> > > > > > >
>> > > > > >
>> > > > > > Also just wanted to add that this compression on the binary PAL
>> did
>> > > > help
>> > > > > > but the compression ratio was obviously not as high as plaintext
>> > > > > > compression.
>> > > > > >
>> > > > > >
>> > > > > > >
>> > > > > > > On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira <
>> g...@confluent.io
>> > >
>> > > > > wrote:
>> > > > > > >
>> > > > > > >> Regarding the change to the assignment field. It would be a
>> > > protocol
>> > > > > > bump,
>> > > > > > >> otherwise consumers will not know how to parse the bytes the
>> > > broker
>> > > > is
>> > > > > > >> returning, right?
>> > > > > > >> Or did I misunderstand the suggestion?
>> > > > > > >>
>> > > > > > >> On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang <
>> > > wangg...@gmail.com>
>> > > > > > >> wrote:
>> > > > > > >>
>> > > > > > >> > I think for just solving issue 1), Jun's suggestion is
>> > > sufficient
>> > > > > and
>> > > > > > >> > simple. So I'd prefer that approach.
>> > > > > > >> >
>> > > > > > >> > In addition, Jason's optimization on the assignment field
>> > would
>> > > be
>> > > > > > good
>> > > > > > >> for
>> > > > > > >> > 2) and 3) as well, and I like that optimization for its
>> > > simplicity
>> > > > > and
>> > > > > > >> no
>> > > > > > >> > format change as well. And in the future I'm in favor of
>> > > > considering
>> > > > > > to
>> > > > > > >> > change the in-memory cache format as Jiangjie suggested.
>> > > > > > >> >
>> > > > > > >> > Guozhang
>> > > > > > >> >
>> > > > > > >> >
>> > > > > > >> > On Tue, May 24, 2016 at 12:42 PM, Becket Qin <
>> > > > becket....@gmail.com>
>> > > > > > >> wrote:
>> > > > > > >> >
>> > > > > > >> > > Hi Jason,
>> > > > > > >> > >
>> > > > > > >> > > There are a few problems we want to solve here:
>> > > > > > >> > > 1. The group metadata is too big to be appended to the
>> log.
>> > > > > > >> > > 2. Reduce the memory footprint on the broker
>> > > > > > >> > > 3. Reduce the bytes transferred over the wire.
>> > > > > > >> > >
>> > > > > > >> > > To solve (1), I like your idea of having separate
>> messages
>> > per
>> > > > > > member.
>> > > > > > >> > The
>> > > > > > >> > > proposal (Onur's option 8) is to break metadata into
>> small
>> > > > records
>> > > > > > in
>> > > > > > >> the
>> > > > > > >> > > same uncompressed message set so each record is small. I
>> > agree
>> > > > it
>> > > > > > >> would
>> > > > > > >> > be
>> > > > > > >> > > ideal if we are able to store the metadata separately for
>> > each
>> > > > > > >> member. I
>> > > > > > >> > > was also thinking about storing the metadata into
>> multiple
>> > > > > messages,
>> > > > > > >> too.
>> > > > > > >> > > What concerns me was that having multiple messages seems
>> > > > breaking
>> > > > > > the
>> > > > > > >> > > atomicity. I am not sure how we are going to deal with
>> the
>> > > > > potential
>> > > > > > >> > > issues. For example, What if group metadata is replicated
>> > but
>> > > > the
>> > > > > > >> member
>> > > > > > >> > > metadata is not? It might be fine depending on the
>> > > > implementation
>> > > > > > >> though,
>> > > > > > >> > > but I am not sure.
>> > > > > > >> > >
>> > > > > > >> > > For (2) we want to store the metadata onto the disk,
>> which
>> > is
>> > > > what
>> > > > > > we
>> > > > > > >> > have
>> > > > > > >> > > to do anyway. The only question is in what format should
>> we
>> > > > store
>> > > > > > >> them.
>> > > > > > >> > >
>> > > > > > >> > > To address (3) we want to have the metadata to be
>> > compressed,
>> > > > > which
>> > > > > > is
>> > > > > > >> > > contradict to the the above solution of (1).
>> > > > > > >> > >
>> > > > > > >> > > I think Jun's suggestion is probably still the simplest.
>> To
>> > > > avoid
>> > > > > > >> > changing
>> > > > > > >> > > the behavior for consumers, maybe we can do that only for
>> > > > > > >> offset_topic,
>> > > > > > >> > > i.e, if the max fetch bytes of the fetch request is
>> smaller
>> > > than
>> > > > > the
>> > > > > > >> > > message size on the offset topic, we always return at
>> least
>> > > one
>> > > > > full
>> > > > > > >> > > message. This should avoid the unexpected problem on the
>> > > client
>> > > > > side
>> > > > > > >> > > because supposedly only tools and brokers will fetch from
>> > the
>> > > > the
>> > > > > > >> > internal
>> > > > > > >> > > topics,
>> > > > > > >> > >
>> > > > > > >> > > As a modification to what you suggested, one solution I
>> was
>> > > > > thinking
>> > > > > > >> was
>> > > > > > >> > to
>> > > > > > >> > > have multiple messages in a single compressed message.
>> That
>> > > > means
>> > > > > > for
>> > > > > > >> > > SyncGroupResponse we still need to read the entire
>> > compressed
>> > > > > > messages
>> > > > > > >> > and
>> > > > > > >> > > extract the inner messages, which seems not quite
>> different
>> > > from
>> > > > > > >> having a
>> > > > > > >> > > single message containing everything. But let me just
>> put it
>> > > > here
>> > > > > > and
>> > > > > > >> see
>> > > > > > >> > > if that makes sense.
>> > > > > > >> > >
>> > > > > > >> > > We can have a map of GroupMetadataKey ->
>> > > > GroupMetadataValueOffset.
>> > > > > > >> > >
>> > > > > > >> > > The GroupMetadataValue is stored in a compressed message.
>> > The
>> > > > > inner
>> > > > > > >> > > messages are the following:
>> > > > > > >> > >
>> > > > > > >> > > Inner Message 0: Version GroupId Generation
>> > > > > > >> > >
>> > > > > > >> > > Inner Message 1: MemberId MemberMetadata_1 (we can
>> compress
>> > > the
>> > > > > > bytes
>> > > > > > >> > here)
>> > > > > > >> > >
>> > > > > > >> > > Inner Message 2: MemberId MemberMetadata_2
>> > > > > > >> > > ....
>> > > > > > >> > > Inner Message N: MemberId MemberMetadata_N
>> > > > > > >> > >
>> > > > > > >> > > The MemberMetadata format is the following:
>> > > > > > >> > >   MemberMetadata => Version Generation ClientId Host
>> > > > Subscription
>> > > > > > >> > > Assignment
>> > > > > > >> > >
>> > > > > > >> > > So DescribeGroupResponse will just return the entire
>> > > compressed
>> > > > > > >> > > GroupMetadataMessage. SyncGroupResponse will return the
>> > > > > > corresponding
>> > > > > > >> > inner
>> > > > > > >> > > message.
>> > > > > > >> > >
>> > > > > > >> > > Thanks,
>> > > > > > >> > >
>> > > > > > >> > > Jiangjie (Becket) Qin
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > >
>> > > > > > >> > > On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson <
>> > > > > > ja...@confluent.io>
>> > > > > > >> > > wrote:
>> > > > > > >> > >
>> > > > > > >> > > > Hey Becket,
>> > > > > > >> > > >
>> > > > > > >> > > > I like your idea to store only the offset for the group
>> > > > metadata
>> > > > > > in
>> > > > > > >> > > memory.
>> > > > > > >> > > > I think it would be safe to keep it in memory for a
>> short
>> > > time
>> > > > > > after
>> > > > > > >> > the
>> > > > > > >> > > > rebalance completes, but after that, it's only real
>> > purpose
>> > > is
>> > > > > to
>> > > > > > >> > answer
>> > > > > > >> > > > DescribeGroup requests, so your proposal makes a lot of
>> > > sense
>> > > > to
>> > > > > > me.
>> > > > > > >> > > >
>> > > > > > >> > > > As for the specific problem with the size of the group
>> > > > metadata
>> > > > > > >> message
>> > > > > > >> > > for
>> > > > > > >> > > > the MM case, if we cannot succeed in reducing the size
>> of
>> > > the
>> > > > > > >> > > > subscription/assignment (which I think is still
>> probably
>> > the
>> > > > > best
>> > > > > > >> > > > alternative if it can work), then I think there are
>> some
>> > > > options
>> > > > > > for
>> > > > > > >> > > > changing the message format (option #8 in Onur's
>> initial
>> > > > > e-mail).
>> > > > > > >> > > > Currently, the key used for storing the group metadata
>> is
>> > > > this:
>> > > > > > >> > > >
>> > > > > > >> > > > GroupMetadataKey => Version GroupId
>> > > > > > >> > > >
>> > > > > > >> > > > And the value is something like this (some details
>> > elided):
>> > > > > > >> > > >
>> > > > > > >> > > > GroupMetadataValue => Version GroupId Generation
>> > > > > [MemberMetadata]
>> > > > > > >> > > >   MemberMetadata => ClientId Host Subscription
>> Assignment
>> > > > > > >> > > >
>> > > > > > >> > > > I don't think we can change the key without a lot of
>> pain,
>> > > but
>> > > > > it
>> > > > > > >> seems
>> > > > > > >> > > > like we can change the value format. Maybe we can take
>> the
>> > > > > > >> > > > subscription/assignment payloads out of the value and
>> > > > introduce
>> > > > > a
>> > > > > > >> new
>> > > > > > >> > > > "MemberMetadata" message for each member in the group.
>> For
>> > > > > > example:
>> > > > > > >> > > >
>> > > > > > >> > > > MemberMetadataKey => Version GroupId MemberId
>> > > > > > >> > > >
>> > > > > > >> > > > MemberMetadataValue => Version Generation ClientId Host
>> > > > > > Subscription
>> > > > > > >> > > > Assignment
>> > > > > > >> > > >
>> > > > > > >> > > > When a new generation is created, we would first write
>> the
>> > > > group
>> > > > > > >> > metadata
>> > > > > > >> > > > message which includes the generation and all of the
>> > > > memberIds,
>> > > > > > and
>> > > > > > >> > then
>> > > > > > >> > > > we'd write the member metadata messages. To answer the
>> > > > > > DescribeGroup
>> > > > > > >> > > > request, we'd read the group metadata at the cached
>> offset
>> > > > and,
>> > > > > > >> > depending
>> > > > > > >> > > > on the version, all of the following member metadata.
>> This
>> > > > would
>> > > > > > be
>> > > > > > >> > more
>> > > > > > >> > > > complex to maintain, but it seems doable if it comes to
>> > it.
>> > > > > > >> > > >
>> > > > > > >> > > > Thanks,
>> > > > > > >> > > > Jason
>> > > > > > >> > > >
>> > > > > > >> > > > On Mon, May 23, 2016 at 6:15 PM, Becket Qin <
>> > > > > becket....@gmail.com
>> > > > > > >
>> > > > > > >> > > wrote:
>> > > > > > >> > > >
>> > > > > > >> > > > > It might worth thinking a little further. We have
>> > > discussed
>> > > > > this
>> > > > > > >> > before
>> > > > > > >> > > > > that we want to avoid holding all the group metadata
>> in
>> > > > > memory.
>> > > > > > >> > > > >
>> > > > > > >> > > > > I am thinking about the following end state:
>> > > > > > >> > > > >
>> > > > > > >> > > > > 1. Enable compression on the offset topic.
>> > > > > > >> > > > > 2. Instead of holding the entire group metadata in
>> > memory
>> > > on
>> > > > > the
>> > > > > > >> > > brokers,
>> > > > > > >> > > > > each broker only keeps a [group -> Offset] map, the
>> > offset
>> > > > > > points
>> > > > > > >> to
>> > > > > > >> > > the
>> > > > > > >> > > > > message in the offset topic which holds the latest
>> > > metadata
>> > > > of
>> > > > > > the
>> > > > > > >> > > group.
>> > > > > > >> > > > > 3. DescribeGroupResponse will read from the offset
>> topic
>> > > > > > directly
>> > > > > > >> > like
>> > > > > > >> > > a
>> > > > > > >> > > > > normal consumption, except that only exactly one
>> message
>> > > > will
>> > > > > be
>> > > > > > >> > > > returned.
>> > > > > > >> > > > > 4. SyncGroupResponse will read the message, extract
>> the
>> > > > > > assignment
>> > > > > > >> > part
>> > > > > > >> > > > and
>> > > > > > >> > > > > send back the partition assignment. We can compress
>> the
>> > > > > > partition
>> > > > > > >> > > > > assignment before sends it out if we want.
>> > > > > > >> > > > >
>> > > > > > >> > > > > Jiangjie (Becket) Qin
>> > > > > > >> > > > >
>> > > > > > >> > > > > On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson <
>> > > > > > >> ja...@confluent.io
>> > > > > > >> > >
>> > > > > > >> > > > > wrote:
>> > > > > > >> > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > Jason, doesn't gzip (or other compression)
>> basically
>> > > do
>> > > > > > this?
>> > > > > > >> If
>> > > > > > >> > > the
>> > > > > > >> > > > > > topic
>> > > > > > >> > > > > > > is a string and the topic is repeated throughout,
>> > > won't
>> > > > > > >> > compression
>> > > > > > >> > > > > > > basically replace all repeated instances of it
>> with
>> > an
>> > > > > index
>> > > > > > >> > > > reference
>> > > > > > >> > > > > to
>> > > > > > >> > > > > > > the full string?
>> > > > > > >> > > > > >
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > Hey James, yeah, that's probably true, but keep in
>> > mind
>> > > > that
>> > > > > > the
>> > > > > > >> > > > > > compression happens on the broker side. It would be
>> > nice
>> > > > to
>> > > > > > >> have a
>> > > > > > >> > > more
>> > > > > > >> > > > > > compact representation so that get some benefit
>> over
>> > the
>> > > > > wire
>> > > > > > as
>> > > > > > >> > > well.
>> > > > > > >> > > > > This
>> > > > > > >> > > > > > seems to be less of a concern here, so the bigger
>> > gains
>> > > > are
>> > > > > > >> > probably
>> > > > > > >> > > > from
>> > > > > > >> > > > > > reducing the number of partitions that need to be
>> > listed
>> > > > > > >> > > individually.
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > -Jason
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > On Mon, May 23, 2016 at 4:23 PM, Onur Karaman <
>> > > > > > >> > > > > > onurkaraman.apa...@gmail.com>
>> > > > > > >> > > > > > wrote:
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > > When figuring out these optimizations, it's worth
>> > > > keeping
>> > > > > in
>> > > > > > >> mind
>> > > > > > >> > > the
>> > > > > > >> > > > > > > improvements when the message is uncompressed vs
>> > when
>> > > > it's
>> > > > > > >> > > > compressed.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > When uncompressed:
>> > > > > > >> > > > > > > Fixing the Assignment serialization to instead
>> be a
>> > > > topic
>> > > > > > >> index
>> > > > > > >> > > into
>> > > > > > >> > > > > the
>> > > > > > >> > > > > > > corresponding member's subscription list would
>> > usually
>> > > > be
>> > > > > a
>> > > > > > >> good
>> > > > > > >> > > > thing.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > I think the proposal is only worse when the topic
>> > > names
>> > > > > are
>> > > > > > >> > small.
>> > > > > > >> > > > The
>> > > > > > >> > > > > > > Type.STRING we use in our protocol for the
>> > > assignment's
>> > > > > > >> > > > TOPIC_KEY_NAME
>> > > > > > >> > > > > is
>> > > > > > >> > > > > > > limited in length to Short.MAX_VALUE, so our
>> strings
>> > > are
>> > > > > > first
>> > > > > > >> > > > > prepended
>> > > > > > >> > > > > > > with 2 bytes to indicate the string size.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > The new proposal does worse when:
>> > > > > > >> > > > > > > 2 + utf_encoded_string_payload_size <
>> > index_type_size
>> > > > > > >> > > > > > > in other words when:
>> > > > > > >> > > > > > > utf_encoded_string_payload_size <
>> index_type_size -
>> > 2
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > If the index type ends up being Type.INT32, then
>> the
>> > > > > > proposal
>> > > > > > >> is
>> > > > > > >> > > > worse
>> > > > > > >> > > > > > when
>> > > > > > >> > > > > > > the topic is length 1.
>> > > > > > >> > > > > > > If the index type ends up being Type.INT64, then
>> the
>> > > > > > proposal
>> > > > > > >> is
>> > > > > > >> > > > worse
>> > > > > > >> > > > > > when
>> > > > > > >> > > > > > > the topic is less than length 6.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > When compressed:
>> > > > > > >> > > > > > > As James Cheng brought up, I'm not sure how
>> things
>> > > > change
>> > > > > > when
>> > > > > > >> > > > > > compression
>> > > > > > >> > > > > > > comes into the picture. This would be worth
>> > > > investigating.
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > On Mon, May 23, 2016 at 4:05 PM, James Cheng <
>> > > > > > >> > wushuja...@gmail.com
>> > > > > > >> > > >
>> > > > > > >> > > > > > wrote:
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > > On May 23, 2016, at 10:59 AM, Jason
>> Gustafson <
>> > > > > > >> > > > ja...@confluent.io>
>> > > > > > >> > > > > > > > wrote:
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > 2. Maybe there's a better way to lay out the
>> > > > > assignment
>> > > > > > >> > without
>> > > > > > >> > > > > > needing
>> > > > > > >> > > > > > > > to
>> > > > > > >> > > > > > > > > explicitly repeat the topic? For example, the
>> > > leader
>> > > > > > could
>> > > > > > >> > sort
>> > > > > > >> > > > the
>> > > > > > >> > > > > > > > topics
>> > > > > > >> > > > > > > > > for each member and just use an integer to
>> > > represent
>> > > > > the
>> > > > > > >> > index
>> > > > > > >> > > of
>> > > > > > >> > > > > > each
>> > > > > > >> > > > > > > > > topic within the sorted list (note this
>> depends
>> > on
>> > > > the
>> > > > > > >> > > > subscription
>> > > > > > >> > > > > > > > > including the full topic list).
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > Assignment -> [TopicIndex [Partition]]
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > Jason, doesn't gzip (or other compression)
>> > basically
>> > > > do
>> > > > > > >> this?
>> > > > > > >> > If
>> > > > > > >> > > > the
>> > > > > > >> > > > > > > topic
>> > > > > > >> > > > > > > > is a string and the topic is repeated
>> throughout,
>> > > > won't
>> > > > > > >> > > compression
>> > > > > > >> > > > > > > > basically replace all repeated instances of it
>> > with
>> > > an
>> > > > > > index
>> > > > > > >> > > > > reference
>> > > > > > >> > > > > > to
>> > > > > > >> > > > > > > > the full string?
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > -James
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > > > You could even combine these two options so
>> that
>> > > you
>> > > > > > have
>> > > > > > >> > only
>> > > > > > >> > > 3
>> > > > > > >> > > > > > > integers
>> > > > > > >> > > > > > > > > for each topic assignment:
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > Assignment -> [TopicIndex MinPartition
>> > > MaxPartition]
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > There may even be better options with a
>> little
>> > > more
>> > > > > > >> thought.
>> > > > > > >> > > All
>> > > > > > >> > > > of
>> > > > > > >> > > > > > > this
>> > > > > > >> > > > > > > > is
>> > > > > > >> > > > > > > > > just part of the client-side protocol, so it
>> > > > wouldn't
>> > > > > > >> require
>> > > > > > >> > > any
>> > > > > > >> > > > > > > version
>> > > > > > >> > > > > > > > > bumps on the broker. What do you think?
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > Thanks,
>> > > > > > >> > > > > > > > > Jason
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang
>> Wang <
>> > > > > > >> > > > wangg...@gmail.com
>> > > > > > >> > > > > >
>> > > > > > >> > > > > > > > wrote:
>> > > > > > >> > > > > > > > >
>> > > > > > >> > > > > > > > >> The original concern is that regex may not
>> be
>> > > > > > efficiently
>> > > > > > >> > > > > supported
>> > > > > > >> > > > > > > > >> across-languages, but if there is a neat
>> > > > workaround I
>> > > > > > >> would
>> > > > > > >> > > love
>> > > > > > >> > > > > to
>> > > > > > >> > > > > > > > learn.
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >> Guozhang
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >> On Mon, May 23, 2016 at 5:31 AM, Ismael
>> Juma <
>> > > > > > >> > > ism...@juma.me.uk
>> > > > > > >> > > > >
>> > > > > > >> > > > > > > wrote:
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >>> +1 to Jun's suggestion.
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>> Having said that, as a general point, I
>> think
>> > we
>> > > > > > should
>> > > > > > >> > > > consider
>> > > > > > >> > > > > > > > >> supporting
>> > > > > > >> > > > > > > > >>> topic patterns in the wire protocol. It
>> > requires
>> > > > > some
>> > > > > > >> > > thinking
>> > > > > > >> > > > > for
>> > > > > > >> > > > > > > > >>> cross-language support, but it seems
>> > > surmountable
>> > > > > and
>> > > > > > it
>> > > > > > >> > > could
>> > > > > > >> > > > > make
>> > > > > > >> > > > > > > > >> certain
>> > > > > > >> > > > > > > > >>> operations a lot more efficient (the fact
>> > that a
>> > > > > basic
>> > > > > > >> > regex
>> > > > > > >> > > > > > > > subscription
>> > > > > > >> > > > > > > > >>> causes the consumer to request metadata for
>> > all
>> > > > > topics
>> > > > > > >> is
>> > > > > > >> > not
>> > > > > > >> > > > > > great).
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>> Ismael
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang
>> > Wang
>> > > <
>> > > > > > >> > > > > > wangg...@gmail.com>
>> > > > > > >> > > > > > > > >>> wrote:
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>>> I like Jun's suggestion in changing the
>> > > handling
>> > > > > > >> logics of
>> > > > > > >> > > > > single
>> > > > > > >> > > > > > > > large
>> > > > > > >> > > > > > > > >>>> message on the consumer side.
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>> As for the case of "a single group
>> > subscribing
>> > > to
>> > > > > > 3000
>> > > > > > >> > > > topics",
>> > > > > > >> > > > > > with
>> > > > > > >> > > > > > > > >> 100
>> > > > > > >> > > > > > > > >>>> consumers the 2.5Mb Gzip size is
>> reasonable
>> > to
>> > > me
>> > > > > > (when
>> > > > > > >> > > > storing
>> > > > > > >> > > > > in
>> > > > > > >> > > > > > > ZK,
>> > > > > > >> > > > > > > > >> we
>> > > > > > >> > > > > > > > >>>> also have the znode limit which is set to
>> 1Mb
>> > > by
>> > > > > > >> default,
>> > > > > > >> > > > though
>> > > > > > >> > > > > > > > >>> admittedly
>> > > > > > >> > > > > > > > >>>> it is only for one consumer). And if we do
>> > the
>> > > > > change
>> > > > > > >> as
>> > > > > > >> > Jun
>> > > > > > >> > > > > > > > suggested,
>> > > > > > >> > > > > > > > >>>> 2.5Mb on follower's memory pressure is OK
>> I
>> > > > think.
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>> Guozhang
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur
>> > Karaman
>> > > <
>> > > > > > >> > > > > > > > >>>> onurkaraman.apa...@gmail.com
>> > > > > > >> > > > > > > > >>>>> wrote:
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>>> Results without compression:
>> > > > > > >> > > > > > > > >>>>> 1 consumer 292383 bytes
>> > > > > > >> > > > > > > > >>>>> 5 consumers 1079579 bytes * the tipping
>> > point
>> > > > > > >> > > > > > > > >>>>> 10 consumers 1855018 bytes
>> > > > > > >> > > > > > > > >>>>> 20 consumers 2780220 bytes
>> > > > > > >> > > > > > > > >>>>> 30 consumers 3705422 bytes
>> > > > > > >> > > > > > > > >>>>> 40 consumers 4630624 bytes
>> > > > > > >> > > > > > > > >>>>> 50 consumers 5555826 bytes
>> > > > > > >> > > > > > > > >>>>> 60 consumers 6480788 bytes
>> > > > > > >> > > > > > > > >>>>> 70 consumers 7405750 bytes
>> > > > > > >> > > > > > > > >>>>> 80 consumers 8330712 bytes
>> > > > > > >> > > > > > > > >>>>> 90 consumers 9255674 bytes
>> > > > > > >> > > > > > > > >>>>> 100 consumers 10180636 bytes
>> > > > > > >> > > > > > > > >>>>>
>> > > > > > >> > > > > > > > >>>>> So it looks like gzip compression shrinks
>> > the
>> > > > > > message
>> > > > > > >> > size
>> > > > > > >> > > by
>> > > > > > >> > > > > 4x.
>> > > > > > >> > > > > > > > >>>>>
>> > > > > > >> > > > > > > > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao
>> <
>> > > > > > >> > j...@confluent.io
>> > > > > > >> > > >
>> > > > > > >> > > > > > wrote:
>> > > > > > >> > > > > > > > >>>>>
>> > > > > > >> > > > > > > > >>>>>> Onur,
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>> Thanks for the investigation.
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>> Another option is to just fix how we
>> deal
>> > > with
>> > > > > the
>> > > > > > >> case
>> > > > > > >> > > > when a
>> > > > > > >> > > > > > > > >>> message
>> > > > > > >> > > > > > > > >>>> is
>> > > > > > >> > > > > > > > >>>>>> larger than the fetch size. Today, if
>> the
>> > > fetch
>> > > > > > size
>> > > > > > >> is
>> > > > > > >> > > > > smaller
>> > > > > > >> > > > > > > > >> than
>> > > > > > >> > > > > > > > >>>> the
>> > > > > > >> > > > > > > > >>>>>> fetch size, the consumer will get stuck.
>> > > > Instead,
>> > > > > > we
>> > > > > > >> can
>> > > > > > >> > > > > simply
>> > > > > > >> > > > > > > > >>> return
>> > > > > > >> > > > > > > > >>>>> the
>> > > > > > >> > > > > > > > >>>>>> full message if it's larger than the
>> fetch
>> > > size
>> > > > > w/o
>> > > > > > >> > > > requiring
>> > > > > > >> > > > > > the
>> > > > > > >> > > > > > > > >>>>> consumer
>> > > > > > >> > > > > > > > >>>>>> to manually adjust the fetch size. On
>> the
>> > > > broker
>> > > > > > >> side,
>> > > > > > >> > to
>> > > > > > >> > > > > serve
>> > > > > > >> > > > > > a
>> > > > > > >> > > > > > > > >>> fetch
>> > > > > > >> > > > > > > > >>>>>> request, we already do an index lookup
>> and
>> > > then
>> > > > > > scan
>> > > > > > >> the
>> > > > > > >> > > > log a
>> > > > > > >> > > > > > bit
>> > > > > > >> > > > > > > > >> to
>> > > > > > >> > > > > > > > >>>>> find
>> > > > > > >> > > > > > > > >>>>>> the message with the requested offset.
>> We
>> > can
>> > > > > just
>> > > > > > >> check
>> > > > > > >> > > the
>> > > > > > >> > > > > > size
>> > > > > > >> > > > > > > > >> of
>> > > > > > >> > > > > > > > >>>> that
>> > > > > > >> > > > > > > > >>>>>> message and return the full message if
>> its
>> > > size
>> > > > > is
>> > > > > > >> > larger
>> > > > > > >> > > > than
>> > > > > > >> > > > > > the
>> > > > > > >> > > > > > > > >>>> fetch
>> > > > > > >> > > > > > > > >>>>>> size. This way, fetch size is really for
>> > > > > > performance
>> > > > > > >> > > > > > optimization,
>> > > > > > >> > > > > > > > >>> i.e.
>> > > > > > >> > > > > > > > >>>>> in
>> > > > > > >> > > > > > > > >>>>>> the common case, we will not return more
>> > > bytes
>> > > > > than
>> > > > > > >> > fetch
>> > > > > > >> > > > > size,
>> > > > > > >> > > > > > > but
>> > > > > > >> > > > > > > > >>> if
>> > > > > > >> > > > > > > > >>>>>> there is a large message, we will return
>> > more
>> > > > > bytes
>> > > > > > >> than
>> > > > > > >> > > the
>> > > > > > >> > > > > > > > >>> specified
>> > > > > > >> > > > > > > > >>>>>> fetch size. In practice, large messages
>> are
>> > > > rare.
>> > > > > > >> So, it
>> > > > > > >> > > > > > shouldn't
>> > > > > > >> > > > > > > > >>>>> increase
>> > > > > > >> > > > > > > > >>>>>> the memory consumption on the client too
>> > > much.
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>> Jun
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur
>> > > Karaman <
>> > > > > > >> > > > > > > > >>>>>> onurkaraman.apa...@gmail.com>
>> > > > > > >> > > > > > > > >>>>>> wrote:
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Hey everyone. So I started doing some
>> > tests
>> > > on
>> > > > > the
>> > > > > > >> new
>> > > > > > >> > > > > > > > >>>>>> consumer/coordinator
>> > > > > > >> > > > > > > > >>>>>>> to see if it could handle more
>> strenuous
>> > use
>> > > > > cases
>> > > > > > >> like
>> > > > > > >> > > > > > mirroring
>> > > > > > >> > > > > > > > >>>>>> clusters
>> > > > > > >> > > > > > > > >>>>>>> with thousands of topics and thought
>> I'd
>> > > share
>> > > > > > >> > whatever I
>> > > > > > >> > > > > have
>> > > > > > >> > > > > > so
>> > > > > > >> > > > > > > > >>>> far.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> The scalability limit: the amount of
>> group
>> > > > > > metadata
>> > > > > > >> we
>> > > > > > >> > > can
>> > > > > > >> > > > > fit
>> > > > > > >> > > > > > > > >> into
>> > > > > > >> > > > > > > > >>>> one
>> > > > > > >> > > > > > > > >>>>>>> message
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Some background:
>> > > > > > >> > > > > > > > >>>>>>> Client-side assignment is implemented
>> in
>> > two
>> > > > > > phases
>> > > > > > >> > > > > > > > >>>>>>> 1. a PreparingRebalance phase that
>> > > identifies
>> > > > > > >> members
>> > > > > > >> > of
>> > > > > > >> > > > the
>> > > > > > >> > > > > > > > >> group
>> > > > > > >> > > > > > > > >>>> and
>> > > > > > >> > > > > > > > >>>>>>> aggregates member subscriptions.
>> > > > > > >> > > > > > > > >>>>>>> 2. an AwaitingSync phase that waits for
>> > the
>> > > > > group
>> > > > > > >> > leader
>> > > > > > >> > > to
>> > > > > > >> > > > > > > > >> decide
>> > > > > > >> > > > > > > > >>>>> member
>> > > > > > >> > > > > > > > >>>>>>> assignments based on the member
>> > > subscriptions
>> > > > > > across
>> > > > > > >> > the
>> > > > > > >> > > > > group.
>> > > > > > >> > > > > > > > >>>>>>>  - The leader announces this decision
>> > with a
>> > > > > > >> > > > > SyncGroupRequest.
>> > > > > > >> > > > > > > > >> The
>> > > > > > >> > > > > > > > >>>>>>> GroupCoordinator handles
>> SyncGroupRequests
>> > > by
>> > > > > > >> appending
>> > > > > > >> > > all
>> > > > > > >> > > > > > group
>> > > > > > >> > > > > > > > >>>> state
>> > > > > > >> > > > > > > > >>>>>>> into a single message under the
>> > > > > __consumer_offsets
>> > > > > > >> > topic.
>> > > > > > >> > > > > This
>> > > > > > >> > > > > > > > >>>> message
>> > > > > > >> > > > > > > > >>>>> is
>> > > > > > >> > > > > > > > >>>>>>> keyed on the group id and contains each
>> > > member
>> > > > > > >> > > subscription
>> > > > > > >> > > > > as
>> > > > > > >> > > > > > > > >> well
>> > > > > > >> > > > > > > > >>>> as
>> > > > > > >> > > > > > > > >>>>>> the
>> > > > > > >> > > > > > > > >>>>>>> decided assignment for each member.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> The environment:
>> > > > > > >> > > > > > > > >>>>>>> - one broker
>> > > > > > >> > > > > > > > >>>>>>> - one __consumer_offsets partition
>> > > > > > >> > > > > > > > >>>>>>> - offsets.topic.compression.codec=1 //
>> > this
>> > > is
>> > > > > > gzip
>> > > > > > >> > > > > > > > >>>>>>> - broker has my pending KAFKA-3718
>> patch
>> > > that
>> > > > > > >> actually
>> > > > > > >> > > > makes
>> > > > > > >> > > > > > use
>> > > > > > >> > > > > > > > >> of
>> > > > > > >> > > > > > > > >>>>>>> offsets.topic.compression.codec:
>> > > > > > >> > > > > > > > >>>>>>
>> https://github.com/apache/kafka/pull/1394
>> > > > > > >> > > > > > > > >>>>>>> - around 3000 topics. This is an actual
>> > > subset
>> > > > > of
>> > > > > > >> > topics
>> > > > > > >> > > > from
>> > > > > > >> > > > > > one
>> > > > > > >> > > > > > > > >>> of
>> > > > > > >> > > > > > > > >>>>> our
>> > > > > > >> > > > > > > > >>>>>>> clusters.
>> > > > > > >> > > > > > > > >>>>>>> - topics have 8 partitions
>> > > > > > >> > > > > > > > >>>>>>> - topics are 25 characters long on
>> average
>> > > > > > >> > > > > > > > >>>>>>> - one group with a varying number of
>> > > consumers
>> > > > > > each
>> > > > > > >> > > > hardcoded
>> > > > > > >> > > > > > > > >> with
>> > > > > > >> > > > > > > > >>>> all
>> > > > > > >> > > > > > > > >>>>>> the
>> > > > > > >> > > > > > > > >>>>>>> topics just to make the tests more
>> > > consistent.
>> > > > > > >> > > wildcarding
>> > > > > > >> > > > > with
>> > > > > > >> > > > > > > > >> .*
>> > > > > > >> > > > > > > > >>>>> should
>> > > > > > >> > > > > > > > >>>>>>> have the same effect once the
>> subscription
>> > > > hits
>> > > > > > the
>> > > > > > >> > > > > coordinator
>> > > > > > >> > > > > > > > >> as
>> > > > > > >> > > > > > > > >>>> the
>> > > > > > >> > > > > > > > >>>>>>> subscription has already been fully
>> > expanded
>> > > > out
>> > > > > > to
>> > > > > > >> the
>> > > > > > >> > > > list
>> > > > > > >> > > > > of
>> > > > > > >> > > > > > > > >>>> topics
>> > > > > > >> > > > > > > > >>>>> by
>> > > > > > >> > > > > > > > >>>>>>> the consumers.
>> > > > > > >> > > > > > > > >>>>>>> - I added some log messages to
>> Log.scala
>> > to
>> > > > > print
>> > > > > > >> out
>> > > > > > >> > the
>> > > > > > >> > > > > > message
>> > > > > > >> > > > > > > > >>>> sizes
>> > > > > > >> > > > > > > > >>>>>>> after compression
>> > > > > > >> > > > > > > > >>>>>>> - there are no producers at all and
>> auto
>> > > > commits
>> > > > > > are
>> > > > > > >> > > > > disabled.
>> > > > > > >> > > > > > > > >> The
>> > > > > > >> > > > > > > > >>>> only
>> > > > > > >> > > > > > > > >>>>>>> topic with messages getting added is
>> the
>> > > > > > >> > > __consumer_offsets
>> > > > > > >> > > > > > topic
>> > > > > > >> > > > > > > > >>> and
>> > > > > > >> > > > > > > > >>>>>>> they're only from storing group
>> metadata
>> > > while
>> > > > > > >> > processing
>> > > > > > >> > > > > > > > >>>>>>> SyncGroupRequests.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Results:
>> > > > > > >> > > > > > > > >>>>>>> The results below show that we exceed
>> the
>> > > > > 1000012
>> > > > > > >> byte
>> > > > > > >> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes limit
>> > relatively
>> > > > > > quickly
>> > > > > > >> > > > (between
>> > > > > > >> > > > > > > > >> 30-40
>> > > > > > >> > > > > > > > >>>>>>> consumers):
>> > > > > > >> > > > > > > > >>>>>>> 1 consumer 54739 bytes
>> > > > > > >> > > > > > > > >>>>>>> 5 consumers 261524 bytes
>> > > > > > >> > > > > > > > >>>>>>> 10 consumers 459804 bytes
>> > > > > > >> > > > > > > > >>>>>>> 20 consumers 702499 bytes
>> > > > > > >> > > > > > > > >>>>>>> 30 consumers 930525 bytes
>> > > > > > >> > > > > > > > >>>>>>> 40 consumers 1115657 bytes * the
>> tipping
>> > > point
>> > > > > > >> > > > > > > > >>>>>>> 50 consumers 1363112 bytes
>> > > > > > >> > > > > > > > >>>>>>> 60 consumers 1598621 bytes
>> > > > > > >> > > > > > > > >>>>>>> 70 consumers 1837359 bytes
>> > > > > > >> > > > > > > > >>>>>>> 80 consumers 2066934 bytes
>> > > > > > >> > > > > > > > >>>>>>> 90 consumers 2310970 bytes
>> > > > > > >> > > > > > > > >>>>>>> 100 consumers 2542735 bytes
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Note that the growth itself is pretty
>> > > gradual.
>> > > > > > >> Plotting
>> > > > > > >> > > the
>> > > > > > >> > > > > > > > >> points
>> > > > > > >> > > > > > > > >>>>> makes
>> > > > > > >> > > > > > > > >>>>>> it
>> > > > > > >> > > > > > > > >>>>>>> look roughly linear w.r.t the number of
>> > > > > consumers:
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > >
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735)
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Also note that these numbers aren't
>> > averages
>> > > > or
>> > > > > > >> medians
>> > > > > > >> > > or
>> > > > > > >> > > > > > > > >> anything
>> > > > > > >> > > > > > > > >>>>> like
>> > > > > > >> > > > > > > > >>>>>>> that. It's just the byte size from a
>> given
>> > > > run.
>> > > > > I
>> > > > > > >> did
>> > > > > > >> > run
>> > > > > > >> > > > > them
>> > > > > > >> > > > > > a
>> > > > > > >> > > > > > > > >>> few
>> > > > > > >> > > > > > > > >>>>>> times
>> > > > > > >> > > > > > > > >>>>>>> and saw similar results.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Impact:
>> > > > > > >> > > > > > > > >>>>>>> Even after adding gzip to the
>> > > > __consumer_offsets
>> > > > > > >> topic
>> > > > > > >> > > with
>> > > > > > >> > > > > my
>> > > > > > >> > > > > > > > >>>> pending
>> > > > > > >> > > > > > > > >>>>>>> KAFKA-3718 patch, the AwaitingSync
>> phase
>> > of
>> > > > the
>> > > > > > >> group
>> > > > > > >> > > fails
>> > > > > > >> > > > > > with
>> > > > > > >> > > > > > > > >>>>>>> RecordTooLargeException. This means the
>> > > > combined
>> > > > > > >> size
>> > > > > > >> > of
>> > > > > > >> > > > each
>> > > > > > >> > > > > > > > >>>> member's
>> > > > > > >> > > > > > > > >>>>>>> subscriptions and assignments exceeded
>> the
>> > > > > > >> > > > > > > > >>>> KafkaConfig.messageMaxBytes
>> > > > > > >> > > > > > > > >>>>> of
>> > > > > > >> > > > > > > > >>>>>>> 1000012 bytes. The group ends up dying.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Options:
>> > > > > > >> > > > > > > > >>>>>>> 1. Config change: reduce the number of
>> > > > consumers
>> > > > > > in
>> > > > > > >> the
>> > > > > > >> > > > > group.
>> > > > > > >> > > > > > > > >> This
>> > > > > > >> > > > > > > > >>>>> isn't
>> > > > > > >> > > > > > > > >>>>>>> always a realistic answer in more
>> > strenuous
>> > > > use
>> > > > > > >> cases
>> > > > > > >> > > like
>> > > > > > >> > > > > > > > >>>> MirrorMaker
>> > > > > > >> > > > > > > > >>>>>>> clusters or for auditing.
>> > > > > > >> > > > > > > > >>>>>>> 2. Config change: split the group into
>> > > smaller
>> > > > > > >> groups
>> > > > > > >> > > which
>> > > > > > >> > > > > > > > >>> together
>> > > > > > >> > > > > > > > >>>>> will
>> > > > > > >> > > > > > > > >>>>>>> get full coverage of the topics. This
>> > gives
>> > > > each
>> > > > > > >> group
>> > > > > > >> > > > > member a
>> > > > > > >> > > > > > > > >>>> smaller
>> > > > > > >> > > > > > > > >>>>>>> subscription.(ex: g1 has topics
>> starting
>> > > with
>> > > > > a-m
>> > > > > > >> while
>> > > > > > >> > > g2
>> > > > > > >> > > > > has
>> > > > > > >> > > > > > > > >>> topics
>> > > > > > >> > > > > > > > >>>>>>> starting ith n-z). This would be
>> > > operationally
>> > > > > > >> painful
>> > > > > > >> > to
>> > > > > > >> > > > > > manage.
>> > > > > > >> > > > > > > > >>>>>>> 3. Config change: split the topics
>> among
>> > > > members
>> > > > > > of
>> > > > > > >> the
>> > > > > > >> > > > > group.
>> > > > > > >> > > > > > > > >>> Again
>> > > > > > >> > > > > > > > >>>>> this
>> > > > > > >> > > > > > > > >>>>>>> gives each group member a smaller
>> > > > subscription.
>> > > > > > This
>> > > > > > >> > > would
>> > > > > > >> > > > > also
>> > > > > > >> > > > > > > > >> be
>> > > > > > >> > > > > > > > >>>>>>> operationally painful to manage.
>> > > > > > >> > > > > > > > >>>>>>> 4. Config change: bump up
>> > > > > > >> KafkaConfig.messageMaxBytes
>> > > > > > >> > (a
>> > > > > > >> > > > > > > > >>> topic-level
>> > > > > > >> > > > > > > > >>>>>>> config) and
>> > KafkaConfig.replicaFetchMaxBytes
>> > > > (a
>> > > > > > >> > > > broker-level
>> > > > > > >> > > > > > > > >>> config).
>> > > > > > >> > > > > > > > >>>>>>> Applying messageMaxBytes to just the
>> > > > > > >> __consumer_offsets
>> > > > > > >> > > > topic
>> > > > > > >> > > > > > > > >> seems
>> > > > > > >> > > > > > > > >>>>>>> relatively harmless, but bumping up the
>> > > > > > broker-level
>> > > > > > >> > > > > > > > >>>>> replicaFetchMaxBytes
>> > > > > > >> > > > > > > > >>>>>>> would probably need more attention.
>> > > > > > >> > > > > > > > >>>>>>> 5. Config change: try different
>> > compression
>> > > > > > codecs.
>> > > > > > >> > Based
>> > > > > > >> > > > on
>> > > > > > >> > > > > 2
>> > > > > > >> > > > > > > > >>>> minutes
>> > > > > > >> > > > > > > > >>>>> of
>> > > > > > >> > > > > > > > >>>>>>> googling, it seems like lz4 and snappy
>> are
>> > > > > faster
>> > > > > > >> than
>> > > > > > >> > > gzip
>> > > > > > >> > > > > but
>> > > > > > >> > > > > > > > >>> have
>> > > > > > >> > > > > > > > >>>>>> worse
>> > > > > > >> > > > > > > > >>>>>>> compression, so this probably won't
>> help.
>> > > > > > >> > > > > > > > >>>>>>> 6. Implementation change: support
>> sending
>> > > the
>> > > > > > regex
>> > > > > > >> > over
>> > > > > > >> > > > the
>> > > > > > >> > > > > > wire
>> > > > > > >> > > > > > > > >>>>> instead
>> > > > > > >> > > > > > > > >>>>>>> of the fully expanded topic
>> > subscriptions. I
>> > > > > think
>> > > > > > >> > people
>> > > > > > >> > > > > said
>> > > > > > >> > > > > > in
>> > > > > > >> > > > > > > > >>> the
>> > > > > > >> > > > > > > > >>>>>> past
>> > > > > > >> > > > > > > > >>>>>>> that different languages have subtle
>> > > > differences
>> > > > > > in
>> > > > > > >> > > regex,
>> > > > > > >> > > > so
>> > > > > > >> > > > > > > > >> this
>> > > > > > >> > > > > > > > >>>>>> doesn't
>> > > > > > >> > > > > > > > >>>>>>> play nicely with cross-language groups.
>> > > > > > >> > > > > > > > >>>>>>> 7. Implementation change: maybe we can
>> > > reverse
>> > > > > the
>> > > > > > >> > > mapping?
>> > > > > > >> > > > > > > > >> Instead
>> > > > > > >> > > > > > > > >>>> of
>> > > > > > >> > > > > > > > >>>>>>> mapping from member to subscriptions,
>> we
>> > can
>> > > > > map a
>> > > > > > >> > > > > subscription
>> > > > > > >> > > > > > > > >> to
>> > > > > > >> > > > > > > > >>> a
>> > > > > > >> > > > > > > > >>>>> list
>> > > > > > >> > > > > > > > >>>>>>> of members.
>> > > > > > >> > > > > > > > >>>>>>> 8. Implementation change: maybe we can
>> try
>> > > to
>> > > > > > break
>> > > > > > >> > apart
>> > > > > > >> > > > the
>> > > > > > >> > > > > > > > >>>>>> subscription
>> > > > > > >> > > > > > > > >>>>>>> and assignments from the same
>> > > SyncGroupRequest
>> > > > > > into
>> > > > > > >> > > > multiple
>> > > > > > >> > > > > > > > >>> records?
>> > > > > > >> > > > > > > > >>>>>> They
>> > > > > > >> > > > > > > > >>>>>>> can still go to the same message set
>> and
>> > get
>> > > > > > >> appended
>> > > > > > >> > > > > together.
>> > > > > > >> > > > > > > > >>> This
>> > > > > > >> > > > > > > > >>>>> way
>> > > > > > >> > > > > > > > >>>>>>> the limit become the segment size,
>> which
>> > > > > shouldn't
>> > > > > > >> be a
>> > > > > > >> > > > > > problem.
>> > > > > > >> > > > > > > > >>> This
>> > > > > > >> > > > > > > > >>>>> can
>> > > > > > >> > > > > > > > >>>>>>> be tricky to get right because we're
>> > > currently
>> > > > > > >> keying
>> > > > > > >> > > these
>> > > > > > >> > > > > > > > >>> messages
>> > > > > > >> > > > > > > > >>>> on
>> > > > > > >> > > > > > > > >>>>>> the
>> > > > > > >> > > > > > > > >>>>>>> group, so I think records from the same
>> > > > > rebalance
>> > > > > > >> might
>> > > > > > >> > > > > > > > >>> accidentally
>> > > > > > >> > > > > > > > >>>>>>> compact one another, but my
>> understanding
>> > of
>> > > > > > >> compaction
>> > > > > > >> > > > isn't
>> > > > > > >> > > > > > > > >> that
>> > > > > > >> > > > > > > > >>>>> great.
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> Todo:
>> > > > > > >> > > > > > > > >>>>>>> It would be interesting to rerun the
>> tests
>> > > > with
>> > > > > no
>> > > > > > >> > > > > compression
>> > > > > > >> > > > > > > > >> just
>> > > > > > >> > > > > > > > >>>> to
>> > > > > > >> > > > > > > > >>>>>> see
>> > > > > > >> > > > > > > > >>>>>>> how much gzip is helping but it's
>> getting
>> > > > late.
>> > > > > > >> Maybe
>> > > > > > >> > > > > tomorrow?
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>> - Onur
>> > > > > > >> > > > > > > > >>>>>>>
>> > > > > > >> > > > > > > > >>>>>>
>> > > > > > >> > > > > > > > >>>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>> --
>> > > > > > >> > > > > > > > >>>> -- Guozhang
>> > > > > > >> > > > > > > > >>>>
>> > > > > > >> > > > > > > > >>>
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > > >> --
>> > > > > > >> > > > > > > > >> -- Guozhang
>> > > > > > >> > > > > > > > >>
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > > >
>> > > > > > >> > > > > > >
>> > > > > > >> > > > > >
>> > > > > > >> > > > >
>> > > > > > >> > > >
>> > > > > > >> > >
>> > > > > > >> >
>> > > > > > >> >
>> > > > > > >> >
>> > > > > > >> > --
>> > > > > > >> > -- Guozhang
>> > > > > > >> >
>> > > > > > >>
>> > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to