Maybe another approach could be to add a new "offsets.replica.fetch.max.bytes" config on the brokers.
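For illustration, a rough sketch of how a follower might apply such an override when sizing its fetch for __consumer_offsets partitions. Only the config name comes from this thread; the class, fields, and the max() behavior (mirroring the "take the max of the two" logic in the PR described below) are assumptions, not the actual ReplicaFetcherThread code.

    // Hypothetical sketch: effective per-partition fetch size on a follower,
    // assuming a dedicated "offsets.replica.fetch.max.bytes" broker config.
    final class ReplicaFetchSizing {
        private static final String OFFSETS_TOPIC = "__consumer_offsets";

        private final int replicaFetchMaxBytes;        // existing replica.fetch.max.bytes
        private final int offsetsReplicaFetchMaxBytes; // proposed offsets.replica.fetch.max.bytes

        ReplicaFetchSizing(int replicaFetchMaxBytes, int offsetsReplicaFetchMaxBytes) {
            this.replicaFetchMaxBytes = replicaFetchMaxBytes;
            this.offsetsReplicaFetchMaxBytes = offsetsReplicaFetchMaxBytes;
        }

        // Effective fetch size used when building the follower's fetch request.
        int fetchSizeFor(String topic) {
            if (OFFSETS_TOPIC.equals(topic)) {
                // Let the offsets topic exceed the general limit so that large
                // group metadata messages can still be replicated.
                return Math.max(replicaFetchMaxBytes, offsetsReplicaFetchMaxBytes);
            }
            return replicaFetchMaxBytes;
        }
    }

This keeps "replica.fetch.max.bytes" as a hard limit everywhere except the internal offsets topic, which is the part of the earlier PR that felt unsatisfying.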
On Thu, Jun 9, 2016 at 3:03 AM, Onur Karaman <okara...@linkedin.com> wrote: > I made a PR with a tweak to Jun's/Becket's proposal: > https://github.com/apache/kafka/pull/1484 > > It just tweaks the fetch behavior specifically for replicas fetching from > the __consumer_offsets topic when the fetcher's "replica.fetch.max.bytes" > is less than the __consumer_offset leader's "message.max.bytes" to take the > max of the two. > > I'm honestly not that happy with this solution, as I'd rather not change > the "replica.fetch.max.bytes" config from being a limit to a > recommendation. I'd definitely be happy to hear other alternatives! > > On Sun, May 29, 2016 at 1:57 PM, Onur Karaman < > onurkaraman.apa...@gmail.com> wrote: > >> Sorry I know next to nothing about Kafka Connect. I didn't understand the >> Kafka Connect / MM idea you brought up. Can you go into more detail? >> >> Otherwise I think our remaining options are: >> - Jun's suggestion to bump up the KafkaConfig.messageMaxBytes for >> __consumer_offsets topic and change the fetch behavior when message size >> is >> larger than fetch size >> - option 6: support sending the regex over the wire instead of the fully >> expanded topic subscriptions. This should cut down the message size from >> the subscription side. Again this only helps when pattern-based >> subscriptions are done. >> >> minor correction to an earlier comment I made regarding the message size: >> message size ~ sum(s_i + a_i for i in range [1, |C|]) >> >> On Thu, May 26, 2016 at 3:35 PM, Jason Gustafson <ja...@confluent.io> >> wrote: >> >> > Hey Onur, >> > >> > Thanks for the investigation. It seems the conclusion is that the >> compact >> > format helps, but perhaps not enough to justify adding a new assignment >> > schema? I'm not sure there's much more room for savings unless we change >> > something more fundamental in the assignment approach. We spent some >> time >> > thinking before about whether we could let the consumers compute their >> > assignment locally from a smaller set of information, but the difficulty >> > (I'm sure you remember) is reaching consensus on topic metadata. Kafka >> > Connect has a similar problem where all the workers need to agree on >> > connector configurations. Since all configs are stored in a single topic >> > partition, the approach we take there is to propagate the offset in the >> > assignment protocol. Not sure if we could do something similar for MM... >> > Anyway, it seems like the best workaround at the moment is Jun's initial >> > suggestion. What do you think? >> > >> > -Jason >> > >> > On Wed, May 25, 2016 at 10:47 PM, Onur Karaman < >> > onurkaraman.apa...@gmail.com >> > > wrote: >> > >> > > I gave the topic index assignment trick a try against the same >> > environment. >> > > The implementation just changed the assignment serialization and >> > > deserialization logic. It didn't change SyncGroupResponse, meaning it >> > > continues to exclude the subscription from the SyncGroupResponse and >> > > assumes the member has kept track of its last subscription. 
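For reference, the kind of topic-index encoding being measured below could look roughly like the following. This is only a sketch: it assumes each member sorts its own subscription the same way the leader does (the "member has kept track of its last subscription" assumption above), and the class and method names are made up rather than taken from the PR.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    final class IndexedAssignment {
        // Encode one member's assignment as (topic index, partitions) pairs instead of
        // repeating each topic name as a length-prefixed string.
        static ByteBuffer encode(List<String> subscription, Map<String, List<Integer>> assignment) {
            List<String> sorted = new ArrayList<>(subscription);
            Collections.sort(sorted);
            int size = 4; // number of assigned topics
            for (List<Integer> parts : assignment.values())
                size += 4 + 4 + 4 * parts.size(); // topic index + partition count + partitions
            ByteBuffer buf = ByteBuffer.allocate(size);
            buf.putInt(assignment.size());
            for (Map.Entry<String, List<Integer>> e : assignment.entrySet()) {
                buf.putInt(Collections.binarySearch(sorted, e.getKey())); // INT32 index, not the name
                buf.putInt(e.getValue().size());
                for (int p : e.getValue())
                    buf.putInt(p);
            }
            buf.flip();
            return buf;
        }

        // Decoding only works because the member still has its full subscription locally.
        static Map<String, List<Integer>> decode(List<String> subscription, ByteBuffer buf) {
            List<String> sorted = new ArrayList<>(subscription);
            Collections.sort(sorted);
            Map<String, List<Integer>> assignment = new TreeMap<>();
            int topicCount = buf.getInt();
            for (int i = 0; i < topicCount; i++) {
                String topic = sorted.get(buf.getInt());
                int numPartitions = buf.getInt();
                List<Integer> partitions = new ArrayList<>(numPartitions);
                for (int j = 0; j < numPartitions; j++)
                    partitions.add(buf.getInt());
                assignment.put(topic, partitions);
            }
            return assignment;
        }
    }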
>> > > >> > > Assignment topic indexing with compression: >> > > 1 consumer 34346 bytes >> > > 5 consumers 177687 bytes >> > > 10 consumers 331897 bytes >> > > 20 consumers 572467 bytes >> > > 30 consumers 811269 bytes >> > > 40 consumers 1047188 bytes * the tipping point >> > > 50 consumers 1290092 bytes >> > > 60 consumers 1527806 bytes >> > > 70 consumers 1769259 bytes >> > > 80 consumers 2000118 bytes >> > > 90 consumers 2244392 bytes >> > > 100 consumers 2482415 bytes >> > > >> > > Assignment topic indexing without compression: >> > > 1 consumer 211904 bytes >> > > 5 consumers 677184 bytes >> > > 10 consumers 1211154 bytes * the tipping point >> > > 20 consumers 2136196 bytes >> > > 30 consumers 3061238 bytes >> > > 40 consumers 3986280 bytes >> > > 50 consumers 4911322 bytes >> > > 60 consumers 5836284 bytes >> > > 70 consumers 6761246 bytes >> > > 80 consumers 7686208 bytes >> > > 90 consumers 8611170 bytes >> > > 100 consumers 9536132 bytes >> > > >> > > Assignment topic indexing seems to reduce the size by 500KB without >> > > compression and 80KB with compression. So assignment topic indexing >> makes >> > > some difference in both with and without compression but in our case >> was >> > > not nearly enough. >> > > >> > > This can be explained by the fact that we aren't actually hitting the >> > worst >> > > case scenario of each consumer being assigned a partition from every >> > topic. >> > > The reason is simple: a topic can only fully span all the consumers >> if it >> > > has at least as many partitions as there are consumers. Given that >> there >> > > are 8 partitions per topic and we have 100 consumers, it makes sense >> that >> > > we aren't close to this worse case scenario where topic indexing would >> > make >> > > a bigger difference. >> > > >> > > I tweaked the group leader's assignment code to print out the >> assignments >> > > and found that each consumer was getting either 238 or 239 partitions. >> > Each >> > > of these partitions were from unique topics. So the consumers were >> really >> > > getting partitions from 239 topics instead of the full worst case >> > scenario >> > > of 3000 topics. >> > > >> > > On Wed, May 25, 2016 at 1:42 PM, Jason Gustafson <ja...@confluent.io> >> > > wrote: >> > > >> > > > Gwen, Joel: >> > > > >> > > > That's correct. The protocol does allow us to give an assignor its >> own >> > > > assignment schema, but I think this will require a couple internal >> > > changes >> > > > to the consumer to make use of the full generality. >> > > > >> > > > One thing I'm a little uncertain about is whether we should use a >> > > different >> > > > protocol type. For a little context, the group membership protocol >> > allows >> > > > the client to provide a "protocol type" when joining the group to >> > ensure >> > > > that all members have some basic semantic compatibility. For >> example, >> > the >> > > > consumer uses "consumer" and Kafka Connect uses "connect." Currently >> > all >> > > > assignors using the "consumer" protocol share a common schema for >> > > > representing subscriptions and assignment. This is convenient for >> tools >> > > > (like consumer-groups.sh) since they just need to know how to parse >> the >> > > > "consumer" protocol type without knowing anything about the >> assignors. >> > So >> > > > introducing another schema would break that assumption and we'd need >> > > those >> > > > tools to do assignor-specific parsing. Maybe this is OK? >> Alternatively, >> > > we >> > > > could use a separate protocol type (e.g. 
"compact-consumer"), but >> that >> > > > seems less than desirable. >> > > > >> > > > Thanks, >> > > > Jason >> > > > >> > > > On Wed, May 25, 2016 at 11:00 AM, Gwen Shapira <g...@confluent.io> >> > > wrote: >> > > > >> > > > > ah, right - we can add as many strategies as we want. >> > > > > >> > > > > On Wed, May 25, 2016 at 10:54 AM, Joel Koshy <jjkosh...@gmail.com >> > >> > > > wrote: >> > > > > >> > > > > > > Yes it would be a protocol bump. >> > > > > > > >> > > > > > >> > > > > > Sorry - I'm officially confused. I think it may not be required >> - >> > > since >> > > > > the >> > > > > > more compact format would be associated with a new assignment >> > > strategy >> > > > - >> > > > > > right? >> > > > > > >> > > > > > >> > > > > > > smaller than the plaintext PAL, but the post-compressed binary >> > PAL >> > > is >> > > > > > just >> > > > > > > 25% smaller than the post-compressed plaintext PAL. IOW using >> a >> > > > symbol >> > > > > > > table helps a lot but further compression on that already >> compact >> > > > > format >> > > > > > > would yield only marginal return. >> > > > > > > >> > > > > > >> > > > > > > So basically I feel we could get pretty far with a more >> compact >> > > field >> > > > > > > format for assignment and if we do that then we would >> potentially >> > > not >> > > > > > even >> > > > > > > want to do any compression. >> > > > > > > >> > > > > > >> > > > > > Also just wanted to add that this compression on the binary PAL >> did >> > > > help >> > > > > > but the compression ratio was obviously not as high as plaintext >> > > > > > compression. >> > > > > > >> > > > > > >> > > > > > > >> > > > > > > On Tue, May 24, 2016 at 4:19 PM, Gwen Shapira < >> g...@confluent.io >> > > >> > > > > wrote: >> > > > > > > >> > > > > > >> Regarding the change to the assignment field. It would be a >> > > protocol >> > > > > > bump, >> > > > > > >> otherwise consumers will not know how to parse the bytes the >> > > broker >> > > > is >> > > > > > >> returning, right? >> > > > > > >> Or did I misunderstand the suggestion? >> > > > > > >> >> > > > > > >> On Tue, May 24, 2016 at 2:52 PM, Guozhang Wang < >> > > wangg...@gmail.com> >> > > > > > >> wrote: >> > > > > > >> >> > > > > > >> > I think for just solving issue 1), Jun's suggestion is >> > > sufficient >> > > > > and >> > > > > > >> > simple. So I'd prefer that approach. >> > > > > > >> > >> > > > > > >> > In addition, Jason's optimization on the assignment field >> > would >> > > be >> > > > > > good >> > > > > > >> for >> > > > > > >> > 2) and 3) as well, and I like that optimization for its >> > > simplicity >> > > > > and >> > > > > > >> no >> > > > > > >> > format change as well. And in the future I'm in favor of >> > > > considering >> > > > > > to >> > > > > > >> > change the in-memory cache format as Jiangjie suggested. >> > > > > > >> > >> > > > > > >> > Guozhang >> > > > > > >> > >> > > > > > >> > >> > > > > > >> > On Tue, May 24, 2016 at 12:42 PM, Becket Qin < >> > > > becket....@gmail.com> >> > > > > > >> wrote: >> > > > > > >> > >> > > > > > >> > > Hi Jason, >> > > > > > >> > > >> > > > > > >> > > There are a few problems we want to solve here: >> > > > > > >> > > 1. The group metadata is too big to be appended to the >> log. >> > > > > > >> > > 2. Reduce the memory footprint on the broker >> > > > > > >> > > 3. Reduce the bytes transferred over the wire. >> > > > > > >> > > >> > > > > > >> > > To solve (1), I like your idea of having separate >> messages >> > per >> > > > > > member. 
>> > > > > > >> > The >> > > > > > >> > > proposal (Onur's option 8) is to break metadata into >> small >> > > > records >> > > > > > in >> > > > > > >> the >> > > > > > >> > > same uncompressed message set so each record is small. I >> > agree >> > > > it >> > > > > > >> would >> > > > > > >> > be >> > > > > > >> > > ideal if we are able to store the metadata separately for >> > each >> > > > > > >> member. I >> > > > > > >> > > was also thinking about storing the metadata into >> multiple >> > > > > messages, >> > > > > > >> too. >> > > > > > >> > > What concerns me was that having multiple messages seems >> > > > breaking >> > > > > > the >> > > > > > >> > > atomicity. I am not sure how we are going to deal with >> the >> > > > > potential >> > > > > > >> > > issues. For example, What if group metadata is replicated >> > but >> > > > the >> > > > > > >> member >> > > > > > >> > > metadata is not? It might be fine depending on the >> > > > implementation >> > > > > > >> though, >> > > > > > >> > > but I am not sure. >> > > > > > >> > > >> > > > > > >> > > For (2) we want to store the metadata onto the disk, >> which >> > is >> > > > what >> > > > > > we >> > > > > > >> > have >> > > > > > >> > > to do anyway. The only question is in what format should >> we >> > > > store >> > > > > > >> them. >> > > > > > >> > > >> > > > > > >> > > To address (3) we want to have the metadata to be >> > compressed, >> > > > > which >> > > > > > is >> > > > > > >> > > contradict to the the above solution of (1). >> > > > > > >> > > >> > > > > > >> > > I think Jun's suggestion is probably still the simplest. >> To >> > > > avoid >> > > > > > >> > changing >> > > > > > >> > > the behavior for consumers, maybe we can do that only for >> > > > > > >> offset_topic, >> > > > > > >> > > i.e, if the max fetch bytes of the fetch request is >> smaller >> > > than >> > > > > the >> > > > > > >> > > message size on the offset topic, we always return at >> least >> > > one >> > > > > full >> > > > > > >> > > message. This should avoid the unexpected problem on the >> > > client >> > > > > side >> > > > > > >> > > because supposedly only tools and brokers will fetch from >> > the >> > > > the >> > > > > > >> > internal >> > > > > > >> > > topics, >> > > > > > >> > > >> > > > > > >> > > As a modification to what you suggested, one solution I >> was >> > > > > thinking >> > > > > > >> was >> > > > > > >> > to >> > > > > > >> > > have multiple messages in a single compressed message. >> That >> > > > means >> > > > > > for >> > > > > > >> > > SyncGroupResponse we still need to read the entire >> > compressed >> > > > > > messages >> > > > > > >> > and >> > > > > > >> > > extract the inner messages, which seems not quite >> different >> > > from >> > > > > > >> having a >> > > > > > >> > > single message containing everything. But let me just >> put it >> > > > here >> > > > > > and >> > > > > > >> see >> > > > > > >> > > if that makes sense. >> > > > > > >> > > >> > > > > > >> > > We can have a map of GroupMetadataKey -> >> > > > GroupMetadataValueOffset. >> > > > > > >> > > >> > > > > > >> > > The GroupMetadataValue is stored in a compressed message. 
>> > The >> > > > > inner >> > > > > > >> > > messages are the following: >> > > > > > >> > > >> > > > > > >> > > Inner Message 0: Version GroupId Generation >> > > > > > >> > > >> > > > > > >> > > Inner Message 1: MemberId MemberMetadata_1 (we can >> compress >> > > the >> > > > > > bytes >> > > > > > >> > here) >> > > > > > >> > > >> > > > > > >> > > Inner Message 2: MemberId MemberMetadata_2 >> > > > > > >> > > .... >> > > > > > >> > > Inner Message N: MemberId MemberMetadata_N >> > > > > > >> > > >> > > > > > >> > > The MemberMetadata format is the following: >> > > > > > >> > > MemberMetadata => Version Generation ClientId Host >> > > > Subscription >> > > > > > >> > > Assignment >> > > > > > >> > > >> > > > > > >> > > So DescribeGroupResponse will just return the entire >> > > compressed >> > > > > > >> > > GroupMetadataMessage. SyncGroupResponse will return the >> > > > > > corresponding >> > > > > > >> > inner >> > > > > > >> > > message. >> > > > > > >> > > >> > > > > > >> > > Thanks, >> > > > > > >> > > >> > > > > > >> > > Jiangjie (Becket) Qin >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > >> > > > > > >> > > On Tue, May 24, 2016 at 9:14 AM, Jason Gustafson < >> > > > > > ja...@confluent.io> >> > > > > > >> > > wrote: >> > > > > > >> > > >> > > > > > >> > > > Hey Becket, >> > > > > > >> > > > >> > > > > > >> > > > I like your idea to store only the offset for the group >> > > > metadata >> > > > > > in >> > > > > > >> > > memory. >> > > > > > >> > > > I think it would be safe to keep it in memory for a >> short >> > > time >> > > > > > after >> > > > > > >> > the >> > > > > > >> > > > rebalance completes, but after that, it's only real >> > purpose >> > > is >> > > > > to >> > > > > > >> > answer >> > > > > > >> > > > DescribeGroup requests, so your proposal makes a lot of >> > > sense >> > > > to >> > > > > > me. >> > > > > > >> > > > >> > > > > > >> > > > As for the specific problem with the size of the group >> > > > metadata >> > > > > > >> message >> > > > > > >> > > for >> > > > > > >> > > > the MM case, if we cannot succeed in reducing the size >> of >> > > the >> > > > > > >> > > > subscription/assignment (which I think is still >> probably >> > the >> > > > > best >> > > > > > >> > > > alternative if it can work), then I think there are >> some >> > > > options >> > > > > > for >> > > > > > >> > > > changing the message format (option #8 in Onur's >> initial >> > > > > e-mail). >> > > > > > >> > > > Currently, the key used for storing the group metadata >> is >> > > > this: >> > > > > > >> > > > >> > > > > > >> > > > GroupMetadataKey => Version GroupId >> > > > > > >> > > > >> > > > > > >> > > > And the value is something like this (some details >> > elided): >> > > > > > >> > > > >> > > > > > >> > > > GroupMetadataValue => Version GroupId Generation >> > > > > [MemberMetadata] >> > > > > > >> > > > MemberMetadata => ClientId Host Subscription >> Assignment >> > > > > > >> > > > >> > > > > > >> > > > I don't think we can change the key without a lot of >> pain, >> > > but >> > > > > it >> > > > > > >> seems >> > > > > > >> > > > like we can change the value format. Maybe we can take >> the >> > > > > > >> > > > subscription/assignment payloads out of the value and >> > > > introduce >> > > > > a >> > > > > > >> new >> > > > > > >> > > > "MemberMetadata" message for each member in the group. 
>> For >> > > > > > example: >> > > > > > >> > > > >> > > > > > >> > > > MemberMetadataKey => Version GroupId MemberId >> > > > > > >> > > > >> > > > > > >> > > > MemberMetadataValue => Version Generation ClientId Host >> > > > > > Subscription >> > > > > > >> > > > Assignment >> > > > > > >> > > > >> > > > > > >> > > > When a new generation is created, we would first write >> the >> > > > group >> > > > > > >> > metadata >> > > > > > >> > > > message which includes the generation and all of the >> > > > memberIds, >> > > > > > and >> > > > > > >> > then >> > > > > > >> > > > we'd write the member metadata messages. To answer the >> > > > > > DescribeGroup >> > > > > > >> > > > request, we'd read the group metadata at the cached >> offset >> > > > and, >> > > > > > >> > depending >> > > > > > >> > > > on the version, all of the following member metadata. >> This >> > > > would >> > > > > > be >> > > > > > >> > more >> > > > > > >> > > > complex to maintain, but it seems doable if it comes to >> > it. >> > > > > > >> > > > >> > > > > > >> > > > Thanks, >> > > > > > >> > > > Jason >> > > > > > >> > > > >> > > > > > >> > > > On Mon, May 23, 2016 at 6:15 PM, Becket Qin < >> > > > > becket....@gmail.com >> > > > > > > >> > > > > > >> > > wrote: >> > > > > > >> > > > >> > > > > > >> > > > > It might worth thinking a little further. We have >> > > discussed >> > > > > this >> > > > > > >> > before >> > > > > > >> > > > > that we want to avoid holding all the group metadata >> in >> > > > > memory. >> > > > > > >> > > > > >> > > > > > >> > > > > I am thinking about the following end state: >> > > > > > >> > > > > >> > > > > > >> > > > > 1. Enable compression on the offset topic. >> > > > > > >> > > > > 2. Instead of holding the entire group metadata in >> > memory >> > > on >> > > > > the >> > > > > > >> > > brokers, >> > > > > > >> > > > > each broker only keeps a [group -> Offset] map, the >> > offset >> > > > > > points >> > > > > > >> to >> > > > > > >> > > the >> > > > > > >> > > > > message in the offset topic which holds the latest >> > > metadata >> > > > of >> > > > > > the >> > > > > > >> > > group. >> > > > > > >> > > > > 3. DescribeGroupResponse will read from the offset >> topic >> > > > > > directly >> > > > > > >> > like >> > > > > > >> > > a >> > > > > > >> > > > > normal consumption, except that only exactly one >> message >> > > > will >> > > > > be >> > > > > > >> > > > returned. >> > > > > > >> > > > > 4. SyncGroupResponse will read the message, extract >> the >> > > > > > assignment >> > > > > > >> > part >> > > > > > >> > > > and >> > > > > > >> > > > > send back the partition assignment. We can compress >> the >> > > > > > partition >> > > > > > >> > > > > assignment before sends it out if we want. >> > > > > > >> > > > > >> > > > > > >> > > > > Jiangjie (Becket) Qin >> > > > > > >> > > > > >> > > > > > >> > > > > On Mon, May 23, 2016 at 5:08 PM, Jason Gustafson < >> > > > > > >> ja...@confluent.io >> > > > > > >> > > >> > > > > > >> > > > > wrote: >> > > > > > >> > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > Jason, doesn't gzip (or other compression) >> basically >> > > do >> > > > > > this? 
>> > > > > > >> If >> > > > > > >> > > the >> > > > > > >> > > > > > topic >> > > > > > >> > > > > > > is a string and the topic is repeated throughout, >> > > won't >> > > > > > >> > compression >> > > > > > >> > > > > > > basically replace all repeated instances of it >> with >> > an >> > > > > index >> > > > > > >> > > > reference >> > > > > > >> > > > > to >> > > > > > >> > > > > > > the full string? >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > > Hey James, yeah, that's probably true, but keep in >> > mind >> > > > that >> > > > > > the >> > > > > > >> > > > > > compression happens on the broker side. It would be >> > nice >> > > > to >> > > > > > >> have a >> > > > > > >> > > more >> > > > > > >> > > > > > compact representation so that get some benefit >> over >> > the >> > > > > wire >> > > > > > as >> > > > > > >> > > well. >> > > > > > >> > > > > This >> > > > > > >> > > > > > seems to be less of a concern here, so the bigger >> > gains >> > > > are >> > > > > > >> > probably >> > > > > > >> > > > from >> > > > > > >> > > > > > reducing the number of partitions that need to be >> > listed >> > > > > > >> > > individually. >> > > > > > >> > > > > > >> > > > > > >> > > > > > -Jason >> > > > > > >> > > > > > >> > > > > > >> > > > > > On Mon, May 23, 2016 at 4:23 PM, Onur Karaman < >> > > > > > >> > > > > > onurkaraman.apa...@gmail.com> >> > > > > > >> > > > > > wrote: >> > > > > > >> > > > > > >> > > > > > >> > > > > > > When figuring out these optimizations, it's worth >> > > > keeping >> > > > > in >> > > > > > >> mind >> > > > > > >> > > the >> > > > > > >> > > > > > > improvements when the message is uncompressed vs >> > when >> > > > it's >> > > > > > >> > > > compressed. >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > When uncompressed: >> > > > > > >> > > > > > > Fixing the Assignment serialization to instead >> be a >> > > > topic >> > > > > > >> index >> > > > > > >> > > into >> > > > > > >> > > > > the >> > > > > > >> > > > > > > corresponding member's subscription list would >> > usually >> > > > be >> > > > > a >> > > > > > >> good >> > > > > > >> > > > thing. >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > I think the proposal is only worse when the topic >> > > names >> > > > > are >> > > > > > >> > small. >> > > > > > >> > > > The >> > > > > > >> > > > > > > Type.STRING we use in our protocol for the >> > > assignment's >> > > > > > >> > > > TOPIC_KEY_NAME >> > > > > > >> > > > > is >> > > > > > >> > > > > > > limited in length to Short.MAX_VALUE, so our >> strings >> > > are >> > > > > > first >> > > > > > >> > > > > prepended >> > > > > > >> > > > > > > with 2 bytes to indicate the string size. >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > The new proposal does worse when: >> > > > > > >> > > > > > > 2 + utf_encoded_string_payload_size < >> > index_type_size >> > > > > > >> > > > > > > in other words when: >> > > > > > >> > > > > > > utf_encoded_string_payload_size < >> index_type_size - >> > 2 >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > If the index type ends up being Type.INT32, then >> the >> > > > > > proposal >> > > > > > >> is >> > > > > > >> > > > worse >> > > > > > >> > > > > > when >> > > > > > >> > > > > > > the topic is length 1. >> > > > > > >> > > > > > > If the index type ends up being Type.INT64, then >> the >> > > > > > proposal >> > > > > > >> is >> > > > > > >> > > > worse >> > > > > > >> > > > > > when >> > > > > > >> > > > > > > the topic is less than length 6. 
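As a quick sense of scale for that break-even (the 25-character average topic name comes from the test environment described later in the thread; the rest is just arithmetic):

    // Per-topic-reference cost, uncompressed: length-prefixed string vs. INT32 index.
    public class AssignmentSizeCheck {
        public static void main(String[] args) {
            int avgTopicNameLen = 25;             // average topic name length in the tests
            int stringCost = 2 + avgTopicNameLen; // Type.STRING: 2-byte length prefix + UTF-8 bytes
            int indexCost = 4;                    // Type.INT32 index into the sorted subscription
            System.out.println("string=" + stringCost + "B, index=" + indexCost
                    + "B, saved per reference=" + (stringCost - indexCost) + "B");
        }
    }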
>> > > > > > >> > > > > > > >> > > > > > >> > > > > > > When compressed: >> > > > > > >> > > > > > > As James Cheng brought up, I'm not sure how >> things >> > > > change >> > > > > > when >> > > > > > >> > > > > > compression >> > > > > > >> > > > > > > comes into the picture. This would be worth >> > > > investigating. >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > On Mon, May 23, 2016 at 4:05 PM, James Cheng < >> > > > > > >> > wushuja...@gmail.com >> > > > > > >> > > > >> > > > > > >> > > > > > wrote: >> > > > > > >> > > > > > > >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > > > On May 23, 2016, at 10:59 AM, Jason >> Gustafson < >> > > > > > >> > > > ja...@confluent.io> >> > > > > > >> > > > > > > > wrote: >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > 2. Maybe there's a better way to lay out the >> > > > > assignment >> > > > > > >> > without >> > > > > > >> > > > > > needing >> > > > > > >> > > > > > > > to >> > > > > > >> > > > > > > > > explicitly repeat the topic? For example, the >> > > leader >> > > > > > could >> > > > > > >> > sort >> > > > > > >> > > > the >> > > > > > >> > > > > > > > topics >> > > > > > >> > > > > > > > > for each member and just use an integer to >> > > represent >> > > > > the >> > > > > > >> > index >> > > > > > >> > > of >> > > > > > >> > > > > > each >> > > > > > >> > > > > > > > > topic within the sorted list (note this >> depends >> > on >> > > > the >> > > > > > >> > > > subscription >> > > > > > >> > > > > > > > > including the full topic list). >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex [Partition]] >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > > Jason, doesn't gzip (or other compression) >> > basically >> > > > do >> > > > > > >> this? >> > > > > > >> > If >> > > > > > >> > > > the >> > > > > > >> > > > > > > topic >> > > > > > >> > > > > > > > is a string and the topic is repeated >> throughout, >> > > > won't >> > > > > > >> > > compression >> > > > > > >> > > > > > > > basically replace all repeated instances of it >> > with >> > > an >> > > > > > index >> > > > > > >> > > > > reference >> > > > > > >> > > > > > to >> > > > > > >> > > > > > > > the full string? >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > > -James >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > > > You could even combine these two options so >> that >> > > you >> > > > > > have >> > > > > > >> > only >> > > > > > >> > > 3 >> > > > > > >> > > > > > > integers >> > > > > > >> > > > > > > > > for each topic assignment: >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > Assignment -> [TopicIndex MinPartition >> > > MaxPartition] >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > There may even be better options with a >> little >> > > more >> > > > > > >> thought. >> > > > > > >> > > All >> > > > > > >> > > > of >> > > > > > >> > > > > > > this >> > > > > > >> > > > > > > > is >> > > > > > >> > > > > > > > > just part of the client-side protocol, so it >> > > > wouldn't >> > > > > > >> require >> > > > > > >> > > any >> > > > > > >> > > > > > > version >> > > > > > >> > > > > > > > > bumps on the broker. What do you think? 
>> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > Thanks, >> > > > > > >> > > > > > > > > Jason >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > > On Mon, May 23, 2016 at 9:17 AM, Guozhang >> Wang < >> > > > > > >> > > > wangg...@gmail.com >> > > > > > >> > > > > > >> > > > > > >> > > > > > > > wrote: >> > > > > > >> > > > > > > > > >> > > > > > >> > > > > > > > >> The original concern is that regex may not >> be >> > > > > > efficiently >> > > > > > >> > > > > supported >> > > > > > >> > > > > > > > >> across-languages, but if there is a neat >> > > > workaround I >> > > > > > >> would >> > > > > > >> > > love >> > > > > > >> > > > > to >> > > > > > >> > > > > > > > learn. >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> Guozhang >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> On Mon, May 23, 2016 at 5:31 AM, Ismael >> Juma < >> > > > > > >> > > ism...@juma.me.uk >> > > > > > >> > > > > >> > > > > > >> > > > > > > wrote: >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >>> +1 to Jun's suggestion. >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >>> Having said that, as a general point, I >> think >> > we >> > > > > > should >> > > > > > >> > > > consider >> > > > > > >> > > > > > > > >> supporting >> > > > > > >> > > > > > > > >>> topic patterns in the wire protocol. It >> > requires >> > > > > some >> > > > > > >> > > thinking >> > > > > > >> > > > > for >> > > > > > >> > > > > > > > >>> cross-language support, but it seems >> > > surmountable >> > > > > and >> > > > > > it >> > > > > > >> > > could >> > > > > > >> > > > > make >> > > > > > >> > > > > > > > >> certain >> > > > > > >> > > > > > > > >>> operations a lot more efficient (the fact >> > that a >> > > > > basic >> > > > > > >> > regex >> > > > > > >> > > > > > > > subscription >> > > > > > >> > > > > > > > >>> causes the consumer to request metadata for >> > all >> > > > > topics >> > > > > > >> is >> > > > > > >> > not >> > > > > > >> > > > > > great). >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >>> Ismael >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >>> On Sun, May 22, 2016 at 11:49 PM, Guozhang >> > Wang >> > > < >> > > > > > >> > > > > > wangg...@gmail.com> >> > > > > > >> > > > > > > > >>> wrote: >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >>>> I like Jun's suggestion in changing the >> > > handling >> > > > > > >> logics of >> > > > > > >> > > > > single >> > > > > > >> > > > > > > > large >> > > > > > >> > > > > > > > >>>> message on the consumer side. >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> As for the case of "a single group >> > subscribing >> > > to >> > > > > > 3000 >> > > > > > >> > > > topics", >> > > > > > >> > > > > > with >> > > > > > >> > > > > > > > >> 100 >> > > > > > >> > > > > > > > >>>> consumers the 2.5Mb Gzip size is >> reasonable >> > to >> > > me >> > > > > > (when >> > > > > > >> > > > storing >> > > > > > >> > > > > in >> > > > > > >> > > > > > > ZK, >> > > > > > >> > > > > > > > >> we >> > > > > > >> > > > > > > > >>>> also have the znode limit which is set to >> 1Mb >> > > by >> > > > > > >> default, >> > > > > > >> > > > though >> > > > > > >> > > > > > > > >>> admittedly >> > > > > > >> > > > > > > > >>>> it is only for one consumer). 
And if we do >> > the >> > > > > change >> > > > > > >> as >> > > > > > >> > Jun >> > > > > > >> > > > > > > > suggested, >> > > > > > >> > > > > > > > >>>> 2.5Mb on follower's memory pressure is OK >> I >> > > > think. >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> Guozhang >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> On Sat, May 21, 2016 at 12:51 PM, Onur >> > Karaman >> > > < >> > > > > > >> > > > > > > > >>>> onurkaraman.apa...@gmail.com >> > > > > > >> > > > > > > > >>>>> wrote: >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>>> Results without compression: >> > > > > > >> > > > > > > > >>>>> 1 consumer 292383 bytes >> > > > > > >> > > > > > > > >>>>> 5 consumers 1079579 bytes * the tipping >> > point >> > > > > > >> > > > > > > > >>>>> 10 consumers 1855018 bytes >> > > > > > >> > > > > > > > >>>>> 20 consumers 2780220 bytes >> > > > > > >> > > > > > > > >>>>> 30 consumers 3705422 bytes >> > > > > > >> > > > > > > > >>>>> 40 consumers 4630624 bytes >> > > > > > >> > > > > > > > >>>>> 50 consumers 5555826 bytes >> > > > > > >> > > > > > > > >>>>> 60 consumers 6480788 bytes >> > > > > > >> > > > > > > > >>>>> 70 consumers 7405750 bytes >> > > > > > >> > > > > > > > >>>>> 80 consumers 8330712 bytes >> > > > > > >> > > > > > > > >>>>> 90 consumers 9255674 bytes >> > > > > > >> > > > > > > > >>>>> 100 consumers 10180636 bytes >> > > > > > >> > > > > > > > >>>>> >> > > > > > >> > > > > > > > >>>>> So it looks like gzip compression shrinks >> > the >> > > > > > message >> > > > > > >> > size >> > > > > > >> > > by >> > > > > > >> > > > > 4x. >> > > > > > >> > > > > > > > >>>>> >> > > > > > >> > > > > > > > >>>>> On Sat, May 21, 2016 at 9:47 AM, Jun Rao >> < >> > > > > > >> > j...@confluent.io >> > > > > > >> > > > >> > > > > > >> > > > > > wrote: >> > > > > > >> > > > > > > > >>>>> >> > > > > > >> > > > > > > > >>>>>> Onur, >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>>> Thanks for the investigation. >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>>> Another option is to just fix how we >> deal >> > > with >> > > > > the >> > > > > > >> case >> > > > > > >> > > > when a >> > > > > > >> > > > > > > > >>> message >> > > > > > >> > > > > > > > >>>> is >> > > > > > >> > > > > > > > >>>>>> larger than the fetch size. Today, if >> the >> > > fetch >> > > > > > size >> > > > > > >> is >> > > > > > >> > > > > smaller >> > > > > > >> > > > > > > > >> than >> > > > > > >> > > > > > > > >>>> the >> > > > > > >> > > > > > > > >>>>>> fetch size, the consumer will get stuck. >> > > > Instead, >> > > > > > we >> > > > > > >> can >> > > > > > >> > > > > simply >> > > > > > >> > > > > > > > >>> return >> > > > > > >> > > > > > > > >>>>> the >> > > > > > >> > > > > > > > >>>>>> full message if it's larger than the >> fetch >> > > size >> > > > > w/o >> > > > > > >> > > > requiring >> > > > > > >> > > > > > the >> > > > > > >> > > > > > > > >>>>> consumer >> > > > > > >> > > > > > > > >>>>>> to manually adjust the fetch size. 
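A minimal sketch of the fetch behavior Jun describes, treating the fetch size as a soft per-partition limit; this is illustrative logic only, not the actual broker read path:

    final class FetchSlicer {
        // messageSizes: sizes in bytes of consecutive messages starting at the requested offset.
        // Returns how many bytes to hand back for this partition.
        static int bytesToReturn(int[] messageSizes, int fetchSize) {
            if (messageSizes.length == 0)
                return 0;
            int bytes = messageSizes[0]; // always return the first message in full,
                                         // even if it alone exceeds fetchSize
            for (int i = 1; i < messageSizes.length && bytes + messageSizes[i] <= fetchSize; i++)
                bytes += messageSizes[i]; // otherwise fill up to the soft limit as before
            return bytes;
        }
    }

In the common case nothing changes; only when the first message at the fetch offset is oversized does the response exceed the requested fetch size.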
On >> the >> > > > broker >> > > > > > >> side, >> > > > > > >> > to >> > > > > > >> > > > > serve >> > > > > > >> > > > > > a >> > > > > > >> > > > > > > > >>> fetch >> > > > > > >> > > > > > > > >>>>>> request, we already do an index lookup >> and >> > > then >> > > > > > scan >> > > > > > >> the >> > > > > > >> > > > log a >> > > > > > >> > > > > > bit >> > > > > > >> > > > > > > > >> to >> > > > > > >> > > > > > > > >>>>> find >> > > > > > >> > > > > > > > >>>>>> the message with the requested offset. >> We >> > can >> > > > > just >> > > > > > >> check >> > > > > > >> > > the >> > > > > > >> > > > > > size >> > > > > > >> > > > > > > > >> of >> > > > > > >> > > > > > > > >>>> that >> > > > > > >> > > > > > > > >>>>>> message and return the full message if >> its >> > > size >> > > > > is >> > > > > > >> > larger >> > > > > > >> > > > than >> > > > > > >> > > > > > the >> > > > > > >> > > > > > > > >>>> fetch >> > > > > > >> > > > > > > > >>>>>> size. This way, fetch size is really for >> > > > > > performance >> > > > > > >> > > > > > optimization, >> > > > > > >> > > > > > > > >>> i.e. >> > > > > > >> > > > > > > > >>>>> in >> > > > > > >> > > > > > > > >>>>>> the common case, we will not return more >> > > bytes >> > > > > than >> > > > > > >> > fetch >> > > > > > >> > > > > size, >> > > > > > >> > > > > > > but >> > > > > > >> > > > > > > > >>> if >> > > > > > >> > > > > > > > >>>>>> there is a large message, we will return >> > more >> > > > > bytes >> > > > > > >> than >> > > > > > >> > > the >> > > > > > >> > > > > > > > >>> specified >> > > > > > >> > > > > > > > >>>>>> fetch size. In practice, large messages >> are >> > > > rare. >> > > > > > >> So, it >> > > > > > >> > > > > > shouldn't >> > > > > > >> > > > > > > > >>>>> increase >> > > > > > >> > > > > > > > >>>>>> the memory consumption on the client too >> > > much. >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>>> Jun >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>>> On Sat, May 21, 2016 at 3:34 AM, Onur >> > > Karaman < >> > > > > > >> > > > > > > > >>>>>> onurkaraman.apa...@gmail.com> >> > > > > > >> > > > > > > > >>>>>> wrote: >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>>>> Hey everyone. So I started doing some >> > tests >> > > on >> > > > > the >> > > > > > >> new >> > > > > > >> > > > > > > > >>>>>> consumer/coordinator >> > > > > > >> > > > > > > > >>>>>>> to see if it could handle more >> strenuous >> > use >> > > > > cases >> > > > > > >> like >> > > > > > >> > > > > > mirroring >> > > > > > >> > > > > > > > >>>>>> clusters >> > > > > > >> > > > > > > > >>>>>>> with thousands of topics and thought >> I'd >> > > share >> > > > > > >> > whatever I >> > > > > > >> > > > > have >> > > > > > >> > > > > > so >> > > > > > >> > > > > > > > >>>> far. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> The scalability limit: the amount of >> group >> > > > > > metadata >> > > > > > >> we >> > > > > > >> > > can >> > > > > > >> > > > > fit >> > > > > > >> > > > > > > > >> into >> > > > > > >> > > > > > > > >>>> one >> > > > > > >> > > > > > > > >>>>>>> message >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Some background: >> > > > > > >> > > > > > > > >>>>>>> Client-side assignment is implemented >> in >> > two >> > > > > > phases >> > > > > > >> > > > > > > > >>>>>>> 1. 
a PreparingRebalance phase that >> > > identifies >> > > > > > >> members >> > > > > > >> > of >> > > > > > >> > > > the >> > > > > > >> > > > > > > > >> group >> > > > > > >> > > > > > > > >>>> and >> > > > > > >> > > > > > > > >>>>>>> aggregates member subscriptions. >> > > > > > >> > > > > > > > >>>>>>> 2. an AwaitingSync phase that waits for >> > the >> > > > > group >> > > > > > >> > leader >> > > > > > >> > > to >> > > > > > >> > > > > > > > >> decide >> > > > > > >> > > > > > > > >>>>> member >> > > > > > >> > > > > > > > >>>>>>> assignments based on the member >> > > subscriptions >> > > > > > across >> > > > > > >> > the >> > > > > > >> > > > > group. >> > > > > > >> > > > > > > > >>>>>>> - The leader announces this decision >> > with a >> > > > > > >> > > > > SyncGroupRequest. >> > > > > > >> > > > > > > > >> The >> > > > > > >> > > > > > > > >>>>>>> GroupCoordinator handles >> SyncGroupRequests >> > > by >> > > > > > >> appending >> > > > > > >> > > all >> > > > > > >> > > > > > group >> > > > > > >> > > > > > > > >>>> state >> > > > > > >> > > > > > > > >>>>>>> into a single message under the >> > > > > __consumer_offsets >> > > > > > >> > topic. >> > > > > > >> > > > > This >> > > > > > >> > > > > > > > >>>> message >> > > > > > >> > > > > > > > >>>>> is >> > > > > > >> > > > > > > > >>>>>>> keyed on the group id and contains each >> > > member >> > > > > > >> > > subscription >> > > > > > >> > > > > as >> > > > > > >> > > > > > > > >> well >> > > > > > >> > > > > > > > >>>> as >> > > > > > >> > > > > > > > >>>>>> the >> > > > > > >> > > > > > > > >>>>>>> decided assignment for each member. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> The environment: >> > > > > > >> > > > > > > > >>>>>>> - one broker >> > > > > > >> > > > > > > > >>>>>>> - one __consumer_offsets partition >> > > > > > >> > > > > > > > >>>>>>> - offsets.topic.compression.codec=1 // >> > this >> > > is >> > > > > > gzip >> > > > > > >> > > > > > > > >>>>>>> - broker has my pending KAFKA-3718 >> patch >> > > that >> > > > > > >> actually >> > > > > > >> > > > makes >> > > > > > >> > > > > > use >> > > > > > >> > > > > > > > >> of >> > > > > > >> > > > > > > > >>>>>>> offsets.topic.compression.codec: >> > > > > > >> > > > > > > > >>>>>> >> https://github.com/apache/kafka/pull/1394 >> > > > > > >> > > > > > > > >>>>>>> - around 3000 topics. This is an actual >> > > subset >> > > > > of >> > > > > > >> > topics >> > > > > > >> > > > from >> > > > > > >> > > > > > one >> > > > > > >> > > > > > > > >>> of >> > > > > > >> > > > > > > > >>>>> our >> > > > > > >> > > > > > > > >>>>>>> clusters. >> > > > > > >> > > > > > > > >>>>>>> - topics have 8 partitions >> > > > > > >> > > > > > > > >>>>>>> - topics are 25 characters long on >> average >> > > > > > >> > > > > > > > >>>>>>> - one group with a varying number of >> > > consumers >> > > > > > each >> > > > > > >> > > > hardcoded >> > > > > > >> > > > > > > > >> with >> > > > > > >> > > > > > > > >>>> all >> > > > > > >> > > > > > > > >>>>>> the >> > > > > > >> > > > > > > > >>>>>>> topics just to make the tests more >> > > consistent. 
>> > > > > > >> > > wildcarding >> > > > > > >> > > > > with >> > > > > > >> > > > > > > > >> .* >> > > > > > >> > > > > > > > >>>>> should >> > > > > > >> > > > > > > > >>>>>>> have the same effect once the >> subscription >> > > > hits >> > > > > > the >> > > > > > >> > > > > coordinator >> > > > > > >> > > > > > > > >> as >> > > > > > >> > > > > > > > >>>> the >> > > > > > >> > > > > > > > >>>>>>> subscription has already been fully >> > expanded >> > > > out >> > > > > > to >> > > > > > >> the >> > > > > > >> > > > list >> > > > > > >> > > > > of >> > > > > > >> > > > > > > > >>>> topics >> > > > > > >> > > > > > > > >>>>> by >> > > > > > >> > > > > > > > >>>>>>> the consumers. >> > > > > > >> > > > > > > > >>>>>>> - I added some log messages to >> Log.scala >> > to >> > > > > print >> > > > > > >> out >> > > > > > >> > the >> > > > > > >> > > > > > message >> > > > > > >> > > > > > > > >>>> sizes >> > > > > > >> > > > > > > > >>>>>>> after compression >> > > > > > >> > > > > > > > >>>>>>> - there are no producers at all and >> auto >> > > > commits >> > > > > > are >> > > > > > >> > > > > disabled. >> > > > > > >> > > > > > > > >> The >> > > > > > >> > > > > > > > >>>> only >> > > > > > >> > > > > > > > >>>>>>> topic with messages getting added is >> the >> > > > > > >> > > __consumer_offsets >> > > > > > >> > > > > > topic >> > > > > > >> > > > > > > > >>> and >> > > > > > >> > > > > > > > >>>>>>> they're only from storing group >> metadata >> > > while >> > > > > > >> > processing >> > > > > > >> > > > > > > > >>>>>>> SyncGroupRequests. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Results: >> > > > > > >> > > > > > > > >>>>>>> The results below show that we exceed >> the >> > > > > 1000012 >> > > > > > >> byte >> > > > > > >> > > > > > > > >>>>>>> KafkaConfig.messageMaxBytes limit >> > relatively >> > > > > > quickly >> > > > > > >> > > > (between >> > > > > > >> > > > > > > > >> 30-40 >> > > > > > >> > > > > > > > >>>>>>> consumers): >> > > > > > >> > > > > > > > >>>>>>> 1 consumer 54739 bytes >> > > > > > >> > > > > > > > >>>>>>> 5 consumers 261524 bytes >> > > > > > >> > > > > > > > >>>>>>> 10 consumers 459804 bytes >> > > > > > >> > > > > > > > >>>>>>> 20 consumers 702499 bytes >> > > > > > >> > > > > > > > >>>>>>> 30 consumers 930525 bytes >> > > > > > >> > > > > > > > >>>>>>> 40 consumers 1115657 bytes * the >> tipping >> > > point >> > > > > > >> > > > > > > > >>>>>>> 50 consumers 1363112 bytes >> > > > > > >> > > > > > > > >>>>>>> 60 consumers 1598621 bytes >> > > > > > >> > > > > > > > >>>>>>> 70 consumers 1837359 bytes >> > > > > > >> > > > > > > > >>>>>>> 80 consumers 2066934 bytes >> > > > > > >> > > > > > > > >>>>>>> 90 consumers 2310970 bytes >> > > > > > >> > > > > > > > >>>>>>> 100 consumers 2542735 bytes >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Note that the growth itself is pretty >> > > gradual. 
>> > > > > > >> Plotting >> > > > > > >> > > the >> > > > > > >> > > > > > > > >> points >> > > > > > >> > > > > > > > >>>>> makes >> > > > > > >> > > > > > > > >>>>>> it >> > > > > > >> > > > > > > > >>>>>>> look roughly linear w.r.t the number of >> > > > > consumers: >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > >> > > > > > >> > > > >> > > > > > >> > > >> > > > > > >> > >> > > > > > >> >> > > > > > >> > > > > >> > > > >> > > >> > >> https://www.wolframalpha.com/input/?i=(1,+54739),+(5,+261524),+(10,+459804),+(20,+702499),+(30,+930525),+(40,+1115657),+(50,+1363112),+(60,+1598621),+(70,+1837359),+(80,+2066934),+(90,+2310970),+(100,+2542735) >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Also note that these numbers aren't >> > averages >> > > > or >> > > > > > >> medians >> > > > > > >> > > or >> > > > > > >> > > > > > > > >> anything >> > > > > > >> > > > > > > > >>>>> like >> > > > > > >> > > > > > > > >>>>>>> that. It's just the byte size from a >> given >> > > > run. >> > > > > I >> > > > > > >> did >> > > > > > >> > run >> > > > > > >> > > > > them >> > > > > > >> > > > > > a >> > > > > > >> > > > > > > > >>> few >> > > > > > >> > > > > > > > >>>>>> times >> > > > > > >> > > > > > > > >>>>>>> and saw similar results. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Impact: >> > > > > > >> > > > > > > > >>>>>>> Even after adding gzip to the >> > > > __consumer_offsets >> > > > > > >> topic >> > > > > > >> > > with >> > > > > > >> > > > > my >> > > > > > >> > > > > > > > >>>> pending >> > > > > > >> > > > > > > > >>>>>>> KAFKA-3718 patch, the AwaitingSync >> phase >> > of >> > > > the >> > > > > > >> group >> > > > > > >> > > fails >> > > > > > >> > > > > > with >> > > > > > >> > > > > > > > >>>>>>> RecordTooLargeException. This means the >> > > > combined >> > > > > > >> size >> > > > > > >> > of >> > > > > > >> > > > each >> > > > > > >> > > > > > > > >>>> member's >> > > > > > >> > > > > > > > >>>>>>> subscriptions and assignments exceeded >> the >> > > > > > >> > > > > > > > >>>> KafkaConfig.messageMaxBytes >> > > > > > >> > > > > > > > >>>>> of >> > > > > > >> > > > > > > > >>>>>>> 1000012 bytes. The group ends up dying. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Options: >> > > > > > >> > > > > > > > >>>>>>> 1. Config change: reduce the number of >> > > > consumers >> > > > > > in >> > > > > > >> the >> > > > > > >> > > > > group. >> > > > > > >> > > > > > > > >> This >> > > > > > >> > > > > > > > >>>>> isn't >> > > > > > >> > > > > > > > >>>>>>> always a realistic answer in more >> > strenuous >> > > > use >> > > > > > >> cases >> > > > > > >> > > like >> > > > > > >> > > > > > > > >>>> MirrorMaker >> > > > > > >> > > > > > > > >>>>>>> clusters or for auditing. >> > > > > > >> > > > > > > > >>>>>>> 2. Config change: split the group into >> > > smaller >> > > > > > >> groups >> > > > > > >> > > which >> > > > > > >> > > > > > > > >>> together >> > > > > > >> > > > > > > > >>>>> will >> > > > > > >> > > > > > > > >>>>>>> get full coverage of the topics. 
This >> > gives >> > > > each >> > > > > > >> group >> > > > > > >> > > > > member a >> > > > > > >> > > > > > > > >>>> smaller >> > > > > > >> > > > > > > > >>>>>>> subscription.(ex: g1 has topics >> starting >> > > with >> > > > > a-m >> > > > > > >> while >> > > > > > >> > > g2 >> > > > > > >> > > > > has >> > > > > > >> > > > > > > > >>> topics >> > > > > > >> > > > > > > > >>>>>>> starting ith n-z). This would be >> > > operationally >> > > > > > >> painful >> > > > > > >> > to >> > > > > > >> > > > > > manage. >> > > > > > >> > > > > > > > >>>>>>> 3. Config change: split the topics >> among >> > > > members >> > > > > > of >> > > > > > >> the >> > > > > > >> > > > > group. >> > > > > > >> > > > > > > > >>> Again >> > > > > > >> > > > > > > > >>>>> this >> > > > > > >> > > > > > > > >>>>>>> gives each group member a smaller >> > > > subscription. >> > > > > > This >> > > > > > >> > > would >> > > > > > >> > > > > also >> > > > > > >> > > > > > > > >> be >> > > > > > >> > > > > > > > >>>>>>> operationally painful to manage. >> > > > > > >> > > > > > > > >>>>>>> 4. Config change: bump up >> > > > > > >> KafkaConfig.messageMaxBytes >> > > > > > >> > (a >> > > > > > >> > > > > > > > >>> topic-level >> > > > > > >> > > > > > > > >>>>>>> config) and >> > KafkaConfig.replicaFetchMaxBytes >> > > > (a >> > > > > > >> > > > broker-level >> > > > > > >> > > > > > > > >>> config). >> > > > > > >> > > > > > > > >>>>>>> Applying messageMaxBytes to just the >> > > > > > >> __consumer_offsets >> > > > > > >> > > > topic >> > > > > > >> > > > > > > > >> seems >> > > > > > >> > > > > > > > >>>>>>> relatively harmless, but bumping up the >> > > > > > broker-level >> > > > > > >> > > > > > > > >>>>> replicaFetchMaxBytes >> > > > > > >> > > > > > > > >>>>>>> would probably need more attention. >> > > > > > >> > > > > > > > >>>>>>> 5. Config change: try different >> > compression >> > > > > > codecs. >> > > > > > >> > Based >> > > > > > >> > > > on >> > > > > > >> > > > > 2 >> > > > > > >> > > > > > > > >>>> minutes >> > > > > > >> > > > > > > > >>>>> of >> > > > > > >> > > > > > > > >>>>>>> googling, it seems like lz4 and snappy >> are >> > > > > faster >> > > > > > >> than >> > > > > > >> > > gzip >> > > > > > >> > > > > but >> > > > > > >> > > > > > > > >>> have >> > > > > > >> > > > > > > > >>>>>> worse >> > > > > > >> > > > > > > > >>>>>>> compression, so this probably won't >> help. >> > > > > > >> > > > > > > > >>>>>>> 6. Implementation change: support >> sending >> > > the >> > > > > > regex >> > > > > > >> > over >> > > > > > >> > > > the >> > > > > > >> > > > > > wire >> > > > > > >> > > > > > > > >>>>> instead >> > > > > > >> > > > > > > > >>>>>>> of the fully expanded topic >> > subscriptions. I >> > > > > think >> > > > > > >> > people >> > > > > > >> > > > > said >> > > > > > >> > > > > > in >> > > > > > >> > > > > > > > >>> the >> > > > > > >> > > > > > > > >>>>>> past >> > > > > > >> > > > > > > > >>>>>>> that different languages have subtle >> > > > differences >> > > > > > in >> > > > > > >> > > regex, >> > > > > > >> > > > so >> > > > > > >> > > > > > > > >> this >> > > > > > >> > > > > > > > >>>>>> doesn't >> > > > > > >> > > > > > > > >>>>>>> play nicely with cross-language groups. >> > > > > > >> > > > > > > > >>>>>>> 7. Implementation change: maybe we can >> > > reverse >> > > > > the >> > > > > > >> > > mapping? 
>> > > > > > >> > > > > > > > >> Instead >> > > > > > >> > > > > > > > >>>> of >> > > > > > >> > > > > > > > >>>>>>> mapping from member to subscriptions, >> we >> > can >> > > > > map a >> > > > > > >> > > > > subscription >> > > > > > >> > > > > > > > >> to >> > > > > > >> > > > > > > > >>> a >> > > > > > >> > > > > > > > >>>>> list >> > > > > > >> > > > > > > > >>>>>>> of members. >> > > > > > >> > > > > > > > >>>>>>> 8. Implementation change: maybe we can >> try >> > > to >> > > > > > break >> > > > > > >> > apart >> > > > > > >> > > > the >> > > > > > >> > > > > > > > >>>>>> subscription >> > > > > > >> > > > > > > > >>>>>>> and assignments from the same >> > > SyncGroupRequest >> > > > > > into >> > > > > > >> > > > multiple >> > > > > > >> > > > > > > > >>> records? >> > > > > > >> > > > > > > > >>>>>> They >> > > > > > >> > > > > > > > >>>>>>> can still go to the same message set >> and >> > get >> > > > > > >> appended >> > > > > > >> > > > > together. >> > > > > > >> > > > > > > > >>> This >> > > > > > >> > > > > > > > >>>>> way >> > > > > > >> > > > > > > > >>>>>>> the limit become the segment size, >> which >> > > > > shouldn't >> > > > > > >> be a >> > > > > > >> > > > > > problem. >> > > > > > >> > > > > > > > >>> This >> > > > > > >> > > > > > > > >>>>> can >> > > > > > >> > > > > > > > >>>>>>> be tricky to get right because we're >> > > currently >> > > > > > >> keying >> > > > > > >> > > these >> > > > > > >> > > > > > > > >>> messages >> > > > > > >> > > > > > > > >>>> on >> > > > > > >> > > > > > > > >>>>>> the >> > > > > > >> > > > > > > > >>>>>>> group, so I think records from the same >> > > > > rebalance >> > > > > > >> might >> > > > > > >> > > > > > > > >>> accidentally >> > > > > > >> > > > > > > > >>>>>>> compact one another, but my >> understanding >> > of >> > > > > > >> compaction >> > > > > > >> > > > isn't >> > > > > > >> > > > > > > > >> that >> > > > > > >> > > > > > > > >>>>> great. >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> Todo: >> > > > > > >> > > > > > > > >>>>>>> It would be interesting to rerun the >> tests >> > > > with >> > > > > no >> > > > > > >> > > > > compression >> > > > > > >> > > > > > > > >> just >> > > > > > >> > > > > > > > >>>> to >> > > > > > >> > > > > > > > >>>>>> see >> > > > > > >> > > > > > > > >>>>>>> how much gzip is helping but it's >> getting >> > > > late. >> > > > > > >> Maybe >> > > > > > >> > > > > tomorrow? >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>>> - Onur >> > > > > > >> > > > > > > > >>>>>>> >> > > > > > >> > > > > > > > >>>>>> >> > > > > > >> > > > > > > > >>>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>>> -- >> > > > > > >> > > > > > > > >>>> -- Guozhang >> > > > > > >> > > > > > > > >>>> >> > > > > > >> > > > > > > > >>> >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> -- >> > > > > > >> > > > > > > > >> -- Guozhang >> > > > > > >> > > > > > > > >> >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > > >> > > > > > >> > > > > > > >> > > > > > >> > > > > > >> > > > > > >> > > > > >> > > > > > >> > > > >> > > > > > >> > > >> > > > > > >> > >> > > > > > >> > >> > > > > > >> > >> > > > > > >> > -- >> > > > > > >> > -- Guozhang >> > > > > > >> > >> > > > > > >> >> > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > >> > >