> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > Thanks for the patch. Some comments.
> >
> > 1. KafkaApis: The way that we handle OffsetCommit request pollutes
> > handleProducerRequest() and DelayedProduce. It would be great if we can
> > isolate the logic better. I was thinking that we can do the following.
> > First, only support ack=-1 when writing to the offset topic. This way,
> > there is only one place where the response for a produce request to the
> > offset topic will be sent--DelayedProduce. Second, introduce an
> > OffsetCommitDelayedProduce that extends DelayedProduce and overrides the
> > respond() function. Then, when handling an OffsetCommit request, we first
> > create a ProduceRequest and pass it to handleProducerRequest() with a flag
> > to indicate whether we should use DelayedProduce or
> > OffsetCommitDelayedProduce.
> >
> > 2. ConsumerOffsetChecker: Do we need to provide an option to read the
> > consumed offset from ZK so that for those consumers that haven't migrated
> > off ZK, we can still check their lag?
> >
> > 3. OffsetMetadataAndError.scala should be renamed to OffsetAndMetadata.scala.
> >
> > 4. TestOffsetManager: Could you add some comments on what this test does?
> > Also, the file is missing the license header.
> >
> > 5. offset_management_testsuite: The new suite seems to just run a few
> > consumers, bounce them, and validate that the consumed data matches the
> > produced data. Instead of introducing a new test suite, could we just reuse
> > the existing mirror maker test suite and maybe add a new test case there?
> > If we do need this test suite, could we add some comments on what it does?
> >
> > 6. OffsetFetchRequest: On the server side, if no partition is specified in
> > the request, we fetch the offsets for all partitions in this consumer group.
> > It's not clear where we use this feature on the client side. If this
> > feature is indeed needed, could we document it in the comment?
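A minimal Scala sketch of the OffsetCommitDelayedProduce idea in comment (1), before the replies below. The types here are placeholders, not Kafka's actual DelayedProduce API; only the shape of the override is illustrated.

```scala
// Placeholder types; the real DelayedProduce has a different constructor and
// purgatory wiring. The point is only that the subclass overrides respond().
abstract class DelayedRequestSketch {
  def respond(): Unit
}

class DelayedProduceSketch extends DelayedRequestSketch {
  // Normal produce path: reply with a ProducerResponse once acks are satisfied.
  override def respond(): Unit = sendProducerResponse()
  protected def sendProducerResponse(): Unit = ()   // stub
}

class OffsetCommitDelayedProduceSketch extends DelayedProduceSketch {
  // Offset commit path: same satisfaction logic, but reply with an
  // OffsetCommitResponse built from the produce result.
  override def respond(): Unit = sendOffsetCommitResponse()
  private def sendOffsetCommitResponse(): Unit = () // stub
}
```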
For (1) - yeah, I think we can mandate ack = -1. The main reason I thought we might want to leave it configurable is if a user wants a stronger guarantee, i.e., acks > 1. WRT the suggestion on OffsetCommitDelayedProduce - that was actually how I first implemented it, but I ended up removing it (https://gist.github.com/anonymous/9082577). I just felt that the current code is a bit clearer than what I had previously. However, I can revisit this.

(2) Yes, that is a good point.

(3) OK - although it does have an OffsetMetadataAndError class in it as well. OffsetAndMetadata is used for commits and OffsetMetadataAndError is used for responses to offset fetches. I felt it was weird to have an error code in the commit request even if we provide a default that the user does not need to specify.

(4) I'm not planning to check this in. I had it in there to do some stress testing.

(5) We could - I just wanted to isolate this test for now. I need to do simultaneous bounces of the brokers and the consumers. I also wanted to have more consumer instances than we have in the mirror maker suite. Finally, I didn't really need a target cluster - i.e., this test would run faster than an equivalent mirror maker suite test.

(6) Yes, I had originally intended this for use by tools (e.g., export offsets), but I think it is useful to have in general.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala, lines 41-42
> > <https://reviews.apache.org/r/18022/diff/1/?file=483574#file483574line41>
> >
> > Do we really need two coordinators?

I kept these separate even though we are planning to use the offset coordinator as the consumer coordinator, because I feel it is very unintuitive that the consumer coordinator decision is driven off the offsets topic. That said, I will make this expose only a single coordinator, and if we ever want to use a separate consumer coordinator then we can revisit.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/client/ClientUtils.scala, lines 142-146
> > <https://reviews.apache.org/r/18022/diff/1/?file=483579#file483579line142>
> >
> > Stream.continually seems to be an un-intuitive way to implement a while loop.

As I wrote - I actually think it is intuitive and convenient once you use it a couple of times. It also reduces the use of vars and forces you to validate all code paths that assign a value to that val. However, I'm not sure about its overheads, so I'm planning to remove it.
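To illustrate the pattern being debated, here is a standalone sketch (not the ClientUtils code itself) contrasting a var/while retry loop with Stream.continually, which lets the result be assigned to a single val.

```scala
import scala.util.Random

// Pretend lookup that only sometimes succeeds (e.g., coordinator discovery).
def tryFindCoordinator(): Option[String] =
  if (Random.nextInt(4) == 0) Some("broker-1:9092") else None

// var/while version: the result is a var that every path must remember to assign.
var found: Option[String] = None
while (found.isEmpty) {
  found = tryFindCoordinator()
  if (found.isEmpty) Thread.sleep(200)   // backoff between retries
}

// Stream.continually version: retries lazily until a value is produced,
// and the result is assigned to a val exactly once.
val coordinator: String =
  Stream.continually { Thread.sleep(200); tryFindCoordinator() }
    .find(_.isDefined)
    .flatten
    .get
```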
> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/client/ClientUtils.scala, lines 200-203
> > <https://reviews.apache.org/r/18022/diff/1/?file=483579#file483579line200>
> >
> > The check on channel.isConnected in line 201 is already done inside
> > BlockingChannel.disconnect().


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/ConsumerConfig.scala, lines 44-45
> > <https://reviews.apache.org/r/18022/diff/1/?file=483585#file483585line44>
> >
> > We can probably use a larger backoff and socket timeout. The intuition
> > is that if the server is somehow slow in processing the request, resending
> > the requests too quickly is just going to make the server worse. How about
> > a 20-second timeout and a 500ms backoff?

Makes sense.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, lines 279-280
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line279>
> >
> > Shouldn't we specify the offset commit timestamp here?

We could. However, I'm setting the timestamp on receiving the request at the broker. Part of the reason for this was that an earlier version of this patch depended on a correct timestamp while loading offsets (which I don't depend on anymore). With client-side timestamps we are exposed to client-side errors and clock skew, so I preferred having the broker set the same timestamp on all entries in a given OffsetCommitRequest.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, lines 286-287
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line286>
> >
> > We can probably rename connectOffsetManager() to sth like
> > ensureOffsetManagerConnected() and get rid of the if test here.

Makes sense.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 255-256
> > <https://reviews.apache.org/r/18022/diff/1/?file=483591#file483591line255>
> >
> > Could we add a comment to explain how offsetsBackupDisabledGroups will
> > be used? Do we expect to upgrade all consumers in those consumer groups
> > first and then add those groups to this list?

Yes - and it is intended mainly for heavy hitters such as mirror makers. Will add a comment.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 266-267
> > <https://reviews.apache.org/r/18022/diff/1/?file=483591#file483591line266>
> >
> > Do we really need to make this configurable? It seems that we should
> > always use ack=-1.

Had a comment on this above in the overall summary.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 79-85
> > <https://reviews.apache.org/r/18022/diff/1/?file=483593#file483593line79>
> >
> > The registered mbean already includes the class name. Do we need to add
> > the "OffsetManager" prefix in the sensor name?

Will remove.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 352-353
> > <https://reviews.apache.org/r/18022/diff/1/?file=483593#file483593line352>
> >
> > Would it be better to call this topic "__consumer_offsets"?

Yes - I think that is better.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 188-189
> > <https://reviews.apache.org/r/18022/diff/1/?file=483580#file483580line188>
> >
> > These probably need to be done before we set leaderReplicaIdOpt.
> > Otherwise, the OffsetFetchRequest may not realize that the cached offset
> > is stale.

Yes, it should be moved earlier. However, I don't think it will give a stale offset (since we clear out offsets on becoming a follower). It will not find an entry, though, which is wrong.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, lines 297-298
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line297>
> >
> > What about other types of errors? It seems that if the error code is
> > not NoError, we should return false.

Yes, this is incomplete - I have a TODO for this.
> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 226-229
> > <https://reviews.apache.org/r/18022/diff/1/?file=483590#file483590line226>
> >
> > Why can't the caller convert an OffsetCommitRequest to a
> > ProduceRequest? Then we don't have to change the api for
> > handleProducerRequest().

I need to pass in the original offset commit request since I need access to things such as the group id (which is passed on to the OffsetManager). I also need the original offset commit request map since I may need to set an OffsetMetadataTooLarge error code on some partitions. (I.e., how to handle large metadata comes into play here - whether to reject the entire request, allow a partial commit, or just truncate the metadata to the max.) My preference would actually be to fail the entire commit, and I think that would simplify some of this code.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 723-727
> > <https://reviews.apache.org/r/18022/diff/1/?file=483590#file483590line723>
> >
> > It would be better to use replicaManager.getLeaderReplicaIfLocal() to
> > see if the leader is local, since it's the source of truth.

We could do that, although it is redundant (since that check already happens in handleProducerOrCommitRequest). That said, I think I can in fact just delete this code and have all the work done by handleProducerOrCommitRequest.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 756-757
> > <https://reviews.apache.org/r/18022/diff/1/?file=483590#file483590line756>
> >
> > Such a check is already in offsetManager.fetchOffsets(). Do we need the
> > check here?

We don't - and in fact I think the returned error code is inaccurate. It should be NotLeaderForPartitionCode, i.e., OffsetMetadataAndError.OffsetManagerNotLocal.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaConfig.scala, lines 254-261
> > <https://reviews.apache.org/r/18022/diff/1/?file=483591#file483591line254>
> >
> > Could we add a comment for each new property?

Will do - actually I put the comments in OffsetManagerConfig.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 172-174
> > <https://reviews.apache.org/r/18022/diff/1/?file=483593#file483593line172>
> >
> > Is this correct? It could also be that the offset was never
> > checkpointed.

If it was never checkpointed then I currently return this. It is odd, but that's a TODO for offset-management-specific error codes. Ideally it should be a new "NoOffset" error code.


> On Feb. 17, 2014, 7:16 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/OffsetManager.scala, lines 401-402
> > <https://reviews.apache.org/r/18022/diff/1/?file=483593#file483593line401>
> >
> > Perhaps we should throw an exception if the version is not expected?

I'll need to think through this a little more. We definitely want to do some versioning of the messages - or we will be stuck with the current offset storage format indefinitely.


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/#review34549
-----------------------------------------------------------
On Feb. 12, 2014, 7:50 p.m., Joel Koshy wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18022/
> -----------------------------------------------------------
>
> (Updated Feb. 12, 2014, 7:50 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1012
>     https://issues.apache.org/jira/browse/KAFKA-1012
>
>
> Repository: kafka
>
>
> Description
> -------
>
> I picked up most of Tejas' patch and made various edits for review here, as I
> would like this to be completed and closed.
>
> Here is a link to the original implementation wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>
> A lot of it is the same in this revision of the patch, but there is a bunch
> of refactoring. This patch does not use an "embedded producer" in the
> consumer. I.e., the consumer issues offset commit/fetch requests directly to
> the broker. Also, I decided against doing any kind of request forwarding and
> added a "ConsumerMetadataRequest" that will be used to determine the offset
> coordinator (and subsequently the group coordinator, which may be useful for
> the client rewrite - see
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).
> Also, there were some questions on how to support multiple offset manager
> implementations cleanly. After thinking about it, I think it makes the code
> simpler and clearer if we just have one good offset storage mechanism (i.e.,
> Kafka-based). Consumers that want to store offsets elsewhere can do so on
> their own. (However, if we do want to do this somewhat cleanly, see the
> discussion on separation of APIs below.)
>
> Here is a quick recap of how offset management within Kafka works:
> - A special __offsets topic holds consumer offsets.
> - The consumer group serves as the partition key for offsets committed to
>   the __offsets topic. I.e., all offsets for all topics that a group
>   consumes will be in a single partition of the offsets topic.
> - The "group-topic-partition" is the actual (stored) key in each message of
>   the offsets topic. This facilitates de-duplication (and thus removal) of
>   older offsets.
> - The offset manager also contains an in-memory cache of offsets so it can
>   serve offset fetch requests quickly.
> - Think of commits as little more than a produce request. If and only if
>   the commit is appended to the __offsets log as a regular produce request
>   do we update the offsets cache. So the semantics are identical to those of
>   a produce request. Offset fetches return whatever is in the cache. If it is
>   absent, and offsets have not yet been loaded from the logs into the cache
>   (on becoming a leader), then we return an "OffsetsLoading" error code.
>
> (Tejas' wiki has pretty good diagrams that describe the above.)
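A compact sketch of the layout described in the recap above: group-keyed partitioning into the offsets topic and a cache keyed by (group, topic, partition). The class names and the partition count are illustrative assumptions, not the patch's exact definitions.

```scala
import java.util.concurrent.ConcurrentHashMap

case class GroupTopicPartition(group: String, topic: String, partition: Int)
case class CachedOffset(offset: Long, metadata: String = "", timestamp: Long = -1L)

object OffsetLayoutSketch {
  val offsetsTopicPartitionCount = 50   // assumed value; configurable in practice

  // All offsets for one group land in a single partition of the offsets topic.
  def partitionFor(group: String): Int =
    (group.hashCode & 0x7fffffff) % offsetsTopicPartitionCount

  // In-memory cache that serves OffsetFetchRequests without touching the log.
  private val offsetsCache = new ConcurrentHashMap[GroupTopicPartition, CachedOffset]()

  def putOffset(key: GroupTopicPartition, value: CachedOffset): Unit =
    offsetsCache.put(key, value)

  def getOffset(key: GroupTopicPartition): Option[CachedOffset] =
    Option(offsetsCache.get(key))
}
```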
> Some more details:
>
> - Atomicity per commit: One drawback of the ZooKeeper-based offset commits
>   is that when we commit multiple offsets (since we don't use multi-write)
>   we have to write the offsets serially, so the commit is not atomic. In
>   this implementation I went with Jun's suggestion of using a compressed
>   message set. This ensures that we will disallow partial commits of a bulk
>   commit. I have hard-coded this to GZIP, but maybe it is better to just
>   expose a config. Another option is to introduce an identity compression
>   codec.
> - The main corner cases to consider are when there is leader movement due to
>   broker failures and simultaneous offset commits/fetches. Offset fetches
>   would only occur if there are consumer-side rebalances or shutdowns. The
>   guarantees we want to provide are: (i) successfully acknowledged offset
>   commits should be returned on the next offset fetch - i.e., should not be
>   lost; (ii) offset fetches should never return a stale offset.
> - On becoming a follower of an offsets topic partition:
>   - Partition.makeFollower clears the offset cache of entries belonging to
>     this partition of __offsets.
>   - Any subsequent offset fetch request will find out that the partition
>     is no longer a leader and fail. There is one problem in the existing
>     patch which I will highlight in the RB along with a suggested fix.
>   - Likewise, any subsequent offset commit request will fail (since the
>     underlying producer request will fail). It is okay if the underlying
>     producer request succeeds and the broker becomes a follower for that
>     partition just before the offset cache is updated (since the broker
>     will not serve any OffsetFetchRequests for that partition until it
>     becomes a leader again).
> - On becoming a leader of an offsets topic partition:
>   - Partition.makeLeader will load the offsets from the log
>     (asynchronously). While this is in progress, the broker rejects offset
>     fetches to this partition. Offset commits may continue to arrive -
>     i.e., they will be appended to the log and then written to the cache.
>     The load loop might actually overwrite a cache entry with an earlier
>     offset from the log, but that is okay - since it will eventually reach
>     the more recent update in the log and load that into the cache.
>
> Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
> - The broker config should set offsets.backup.enabled=true.
> - Upgrade the brokers to the latest jar. (Consumers still commit
>   directly to ZooKeeper.)
> - Start migrating the consumers over.
> - Consumers will now start sending offset commits to the broker. Since the
>   backup setting is enabled, offsets will also be committed to ZooKeeper.
>   This is necessary while migrating consumers.
> - After _all_ consumers have moved over you can turn off the backup.
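A sketch of the dual-commit path behind the backup flag described in the migration steps above. The class, the method names, and the ZooKeeper path here are illustrative, not the patch's actual code.

```scala
// Illustrative only: when the backup flag is on, a commit is appended to the
// Kafka offsets topic and also written to the legacy ZooKeeper offset path,
// so either store can serve fetches during the migration window.
case class CommittedOffset(offset: Long, metadata: String = "")

class DualCommitSketch(backupToZookeeperEnabled: Boolean) {

  def commit(group: String, offsets: Map[(String, Int), CommittedOffset]): Unit = {
    appendToOffsetsTopic(group, offsets)              // Kafka-based storage
    if (backupToZookeeperEnabled)
      offsets.foreach { case ((topic, partition), committed) =>
        writeToZookeeper("/consumers/" + group + "/offsets/" + topic + "/" + partition,
                         committed.offset.toString)
      }
  }

  // Stubs standing in for the real offsets-topic append and ZK write paths.
  private def appendToOffsetsTopic(group: String,
                                   offsets: Map[(String, Int), CommittedOffset]): Unit = ()
  private def writeToZookeeper(path: String, value: String): Unit = ()
}
```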
> I have made a number of preliminary comments as TODOs in the RB myself (i.e.,
> as a note to myself and others reviewing).
>
> Questions/comments for discussion
> - Should we explicitly disallow changes to the number of offset topic
>   partitions? This is necessary (or at least warrants a warning) since
>   changing the number of partitions would affect the partitioning strategy.
> - Should we remove per-partition error codes for offset commits and use just
>   a global error code for the entire request? I'm using compressed message
>   sets for commits. I.e., the log append for a given commit will either
>   fail entirely or succeed entirely. The OffsetCommitResponse contains
>   per-partition error codes. So if the log append fails for any reason, the
>   same error code would apply to all partitions; i.e., it is sufficient to
>   have a global error code. I think we currently have per-partition error
>   codes due to the fact that offset commit requests can include metadata for
>   each offset. The per-partition error code is set to MetadataTooLarge if
>   the metadata entry exceeds the MaxMetadataLength. However, in this case I
>   would prefer to just fail the entire request as opposed to doing partial
>   commits (as I am in the current patch). Anyone have thoughts on this?
> - Error codes: right now I'm using existing error codes (with the exception
>   of OffsetsLoading). It may be better to return more specific error codes,
>   but I'm not sure it matters - since the client-side implementation
>   needs to check for _any_ error, and if any error exists (other than
>   MetadataTooLarge) just retry the offset commit/fetch until it succeeds.
>   I.e., the client should not really care about the actual error. If people
>   have a strong preference on this, let me know.
> - Separation of APIs: Right now, the offset manager and replica manager are
>   intertwined, which is less than ideal. It is okay if the offset manager
>   depends on the replica manager but not the other way around. Ideally, I
>   would like to have KafkaApis hand off offset commit/fetch requests to the
>   offset manager, which then handles them. However, the inter-dependence
>   comes about due to the need to clear out the offset cache on becoming a
>   follower and the need to load offsets on becoming a leader. I think we can
>   improve the separation as follows:
>   - Don't optimistically load offsets/clear offsets on a leader/follower
>     transition. Instead, load offsets only when an offset fetch request
>     arrives for a partition that has not been loaded yet.
>   - The OffsetManager will need to maintain a Map[partition ->
>     lastKnownLeaderEpoch] to determine whether to load offsets or not.
>   - The above will eliminate the reference to OffsetManager from
>     ReplicaManager. KafkaApis still needs to reference the OffsetManager and
>     will need to create the offset commit message to append to the __offsets
>     log.
>   - We can actually avoid the need for KafkaApis to know about offset commit
>     messages as well: in order to do that, we will need to create a
>     "DurableLog" layer on top of LogManager and move all the purgatory stuff
>     in there. The LogManager supports appends/reads from the local log, but
>     does not know anything about the replicas. Instead, we can have a
>     DurableLog layer that depends on ReplicaManager and LogManager and
>     contains the Producer/Fetch-Request purgatories. So OffsetManager will
>     need to depend on this DurableLog component, and KafkaApis can just hand
>     off ProduceRequests and FetchRequests to the DurableLog layer directly.
>     It will hand off OffsetCommit/OffsetFetch requests to the OffsetManager,
>     which will then hand them off to the DurableLog layer.
>   - Is the above worth it? I'm not sure it is, especially if we are sticking
>     to only one offset management implementation.
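One way to picture the layering proposed in the last bullet above, expressed as hypothetical interfaces only (none of these traits exist in the patch): OffsetManager depends on a DurableLog facade, which in turn hides the replication and purgatory plumbing.

```scala
// Hypothetical interfaces sketching the dependency direction only:
// OffsetManager -> DurableLog -> (ReplicaManager, LogManager); KafkaApis
// never touches offset commit messages directly.
trait DurableLogSketch {
  def append(topic: String, partition: Int, payload: Array[Byte]): Unit
  def read(topic: String, partition: Int, offset: Long, maxBytes: Int): Array[Byte]
}

class OffsetManagerSketch(durableLog: DurableLogSketch) {
  def commitOffsets(group: String, offsets: Map[(String, Int), Long]): Unit =
    // Serialize the commit and append it to the group's __offsets partition.
    durableLog.append("__offsets", partitionFor(group), serialize(offsets))

  private def partitionFor(group: String): Int = (group.hashCode & 0x7fffffff) % 50
  private def serialize(offsets: Map[(String, Int), Long]): Array[Byte] = Array[Byte]()
}
```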
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c
>   core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532
>   core/src/main/scala/kafka/api/RequestKeys.scala c81214f
>   core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b
>   core/src/main/scala/kafka/cluster/Partition.scala 1087a2e
>   core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce
>   core/src/main/scala/kafka/common/OffsetLoadInProgressException.scala PRE-CREATION
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3
>   core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 6dae149
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2
>   core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a
>   core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31
>   core/src/main/scala/kafka/server/KafkaApis.scala ae2df20
>   core/src/main/scala/kafka/server/KafkaConfig.scala 3c3aafc
>   core/src/main/scala/kafka/server/KafkaServer.scala 5e34f95
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION
>   core/src/main/scala/kafka/server/ReplicaManager.scala 21bba48
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c
>   core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/Utils.scala a89b046
>   core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala eb274d1
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c
>   sbt 944ebf8
>   system_test/mirror_maker/README da53c14
>   system_test/mirror_maker/bin/expected.out 0a1bbaf
>   system_test/mirror_maker/bin/run-test.sh e5e6c08
>   system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015
>   system_test/mirror_maker/config/mirror_producer.properties aa8be65
>   system_test/mirror_maker/config/server_source_1_1.properties 2f070a7
>   system_test/mirror_maker/config/server_source_1_2.properties f9353e8
>   system_test/mirror_maker/config/server_source_2_1.properties daa01ad
>   system_test/mirror_maker/config/server_source_2_2.properties be6fdfc
>   system_test/mirror_maker/config/server_target_1_1.properties d37955a
>   system_test/mirror_maker/config/server_target_1_2.properties aa7546c
>   system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015
>   system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b
>   system_test/mirror_maker/config/zookeeper_source_1.properties f851796
>   system_test/mirror_maker/config/zookeeper_source_2.properties d534d18
>   system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1
>   system_test/offset_management_testsuite/cluster_config.json PRE-CREATION
>   system_test/offset_management_testsuite/config/console_consumer.properties PRE-CREATION
>   system_test/offset_management_testsuite/config/server.properties PRE-CREATION
>   system_test/offset_management_testsuite/config/zookeeper.properties PRE-CREATION
>   system_test/offset_management_testsuite/offset_management_test.py PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json PRE-CREATION
>   system_test/testcase_to_run.json 8252860
>   system_test/utils/kafka_system_test_utils.py fb4a9c0
>   system_test/utils/testcase_env.py bee8716
>
> Diff: https://reviews.apache.org/r/18022/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Joel Koshy
>