> On Feb. 18, 2014, 3:48 a.m., Tejas Patil wrote:
> > My views (which hardly matter !!) about the points in the 'Questions/comments for discussion' section:
> >
> > - Change in partition assignment would need a sync of offsets across brokers and a subsequent bootstrap. Would be better to address in a separate jira.
> > - +1 for a single error code for all offsets. There must be versioning for OffsetCommitResponse as in the future there might be a need for separate error codes.
> > - It would be good to have separate error codes so that clients are aware of what went wrong.
> > - Lazy offset loading:
> >   - "clear out the offset cache on becoming a follower": The cleanup thread would eventually clean this stuff up after the retention period. As per on-paper math, having the extra offsets won't eat up much space in the offsets cache. There must be a mechanism (Map[partition -> lastKnownLeaderEpoch]??) to figure out that those offsets in the cache are stale and a bootstrap is needed for them. As a downside, the old offsets would sit for a long time in the old-generation space of the heap, eating up a few MBs worth of space.
> >   - How would the offset manager populate Map[partition -> lastKnownLeaderEpoch]? It's clear that it can't do that all by itself, thus implying some coupling with another module.
> > - DurableLog: I liked the concept. This might touch a lot of places in the codebase and would be better to address in a separate jira.

Tejas:
- Change in partition assignment would need sync of offsets across brokers and subsequent bootstrap. Would be better to address in a separate jira.

Can you elaborate on the above?

"clear out the offset cache" - yes, that is not strictly needed, but it is cheap. However, we do need to load offsets on a leader change (at least in the current approach, which I prefer over doing a lazy load on offset fetch).

How would the offset manager populate the Map[partition -> epoch]? The offset manager would still need access to the replica manager. However, the approach I described removes the dependency of ReplicaManager on OffsetManager (which is an undesirable dependency).
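To make the lazy-load alternative concrete, here is a rough, self-contained sketch (all class and method names are hypothetical, not from this patch) of an offset cache that only bootstraps a __offsets partition from the log when a fetch arrives and the partition's last known leader epoch has changed:

    // Sketch only: lazy offset loading keyed off the last known leader epoch
    // per __offsets partition. All names are made up for illustration.
    class LazyOffsetCacheSketch {
      private val offsets = collection.mutable.Map[(String, String, Int), Long]()  // (group, topic, partition) -> offset
      private val lastKnownLeaderEpoch = collection.mutable.Map[Int, Int]()        // __offsets partition -> leader epoch

      // Called on an offset fetch for a group that maps to offsetsPartition.
      // loadFromLog replays the __offsets log for that partition.
      def fetch(key: (String, String, Int), offsetsPartition: Int, currentLeaderEpoch: Int)
               (loadFromLog: Int => Iterator[((String, String, Int), Long)]): Option[Long] = synchronized {
        if (lastKnownLeaderEpoch.get(offsetsPartition) != Some(currentLeaderEpoch)) {
          // Stale (or never loaded under this leader epoch): bootstrap before serving.
          loadFromLog(offsetsPartition).foreach { case (k, v) => offsets(k) = v }
          lastKnownLeaderEpoch(offsetsPartition) = currentLeaderEpoch
        }
        offsets.get(key)
      }
    }

As noted above, populating the epoch still requires asking the replica manager, which is part of why the eager load on leader change may end up simpler in practice.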
> On Feb. 18, 2014, 3:48 a.m., Tejas Patil wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 92
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line92>
> >
> > 1. This could be made a 'val' instead of 'var'
> > 2. Are entries ever cleared off this pool? I see that it will keep growing after rebalances.

It could be a val. Yes, you are right that the entries are not cleared. If a consumer regains ownership of a partition it could potentially avoid recommitting offsets that have not moved since it last owned it. The memory requirements are probably not a big concern. The main reason we had this was to reduce the write load on ZooKeeper. Since we are moving to a new implementation we can even remove this altogether, although it is still a useful optimization.

> On Feb. 18, 2014, 3:48 a.m., Tejas Patil wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 168
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line168>
> >
> > There are two different names used, possibly for the same thing: offset coordinator and offset manager. Might be confusing to someone new to this feature.

We could collapse the two - "coordinator" crept in because there is some overlap with the consumer rewrite (and its use of the term consumer coordinator). My thinking was that "coordinator" means the broker, and the offset manager is a component inside the broker that actually manages offsets.

> On Feb. 18, 2014, 3:48 a.m., Tejas Patil wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 273
> > <https://reviews.apache.org/r/18022/diff/1/?file=483587#file483587line273>
> >
> > I am wondering why an infinite loop is needed inside offset commit. A finite retry counter might help for getting a few reattempts.

Yes, I'm a bit undecided on whether we should retry until a successful commit or use a retry limit.
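For comparison, a finite-retry variant of the commit loop could look roughly like the sketch below (commitOffsetsOnce and the backoff parameter are hypothetical placeholders, not the connector's actual API):

    // Sketch of a bounded retry loop for offset commits (hypothetical names).
    def commitWithRetries(maxRetries: Int, backoffMs: Long)(commitOffsetsOnce: () => Boolean): Boolean = {
      var attempts = 0
      var committed = false
      while (!committed && attempts < maxRetries) {
        committed = commitOffsetsOnce()   // true iff the broker acknowledged the commit
        attempts += 1
        if (!committed && attempts < maxRetries) Thread.sleep(backoffMs)
      }
      committed
    }

With a bound, the caller has to decide what to do when the commit still fails after maxRetries attempts, which is part of what makes the choice above non-trivial.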
- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/#review34677
-----------------------------------------------------------


On Feb. 12, 2014, 7:50 p.m., Joel Koshy wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18022/
> -----------------------------------------------------------
>
> (Updated Feb. 12, 2014, 7:50 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1012
>     https://issues.apache.org/jira/browse/KAFKA-1012
>
>
> Repository: kafka
>
>
> Description
> -------
>
> I picked up most of Tejas' patch and made various edits for review here as I would like this to be completed and closed.
>
> Here is a link to the original implementation wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
>
> A lot of it is the same in this revision of the patch, but there is a bunch of refactoring. This patch does not use an "embedded producer" in the consumer; i.e., the consumer issues offset commit/fetch requests directly to the broker. Also, I decided against doing any kind of request forwarding and added a "ConsumerMetadataRequest" that will be used to determine the offset coordinator (and subsequently the group coordinator, which may be useful for the client rewrite - see https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design). Also, there were some questions on how to support multiple offset manager implementations cleanly. After thinking about it I think it makes the code simpler and clearer if we just have one good offset storage mechanism (i.e., Kafka-based). Consumers that want to store offsets elsewhere can do so on their own. (However, if we do want to do this somewhat cleanly, see the discussion on separation of APIs below.)
>
> Here is a quick recap of how offset management within Kafka works:
> - A special __offsets topic holds consumer offsets.
> - The consumer group serves as the partition key for offsets committed to the __offsets topic; i.e., all offsets for all topics that a group consumes will be in a single partition of the offsets topic.
> - The "group-topic-partition" is the actual (stored) key in each message of the offsets topic. This facilitates de-duplication (and thus removal) of older offsets.
> - The offset manager also contains an in-memory cache of offsets so it can serve offset fetch requests quickly.
> - Think of commits as a little more than a produce request. If and only if the commit is appended to the __offsets log as a regular produce request do we update the offsets cache. So the semantics are identical to a produce request. Offset fetches return whatever is in the cache. If it is absent, and offsets have not yet been loaded from the logs into the cache (on becoming a leader), then we return an "OffsetsLoading" error code.
>
> (Tejas' wiki has pretty good diagrams that describe the above.)
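To illustrate the recap above, here is a minimal sketch of how offsets could be keyed and partitioned (the case classes and the hashing below are placeholders for illustration only, not the actual message format in OffsetManager.scala):

    // Sketch only: the group is the partition key, the (group, topic, partition)
    // triple is the stored message key, and the offset plus metadata is the value.
    case class OffsetKey(group: String, topic: String, partition: Int)
    case class OffsetValue(offset: Long, metadata: String)

    // All offsets for a group land in a single partition of __offsets.
    def offsetsPartitionFor(group: String, offsetsTopicPartitionCount: Int): Int =
      (group.hashCode & 0x7fffffff) % offsetsTopicPartitionCount  // mask keeps the hash non-negative

Because the stored key is the full group-topic-partition triple, older commits for the same key can be de-duplicated away, which is what the recap refers to.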
>
> Some more details:
>
> - Atomicity per-commit: One drawback of the ZooKeeper-based offset commits is that when we commit multiple offsets (since we don't use multi-write) we have to write the offsets serially, so the commit is not atomic. In this implementation I went with Jun's suggestion of using a compressed message set. This ensures that we will disallow partial commits of a bulk commit. I have hard-coded this to GZIP, but maybe it is better to just expose a config. Another option is to introduce an identity compression codec.
> - The main corner cases to consider are when there is leader movement due to broker failures and simultaneous offset commits/fetches. Offset fetches would only occur if there are consumer-side rebalances or shutdowns. The guarantees we want to provide are: (i) successfully acknowledged offset commits should be returned on the next offset fetch - i.e., should not be lost; (ii) offset fetches should never return a stale offset.
> - On becoming a follower of an offsets topic partition:
>   - Partition.makeFollower clears the offset cache of entries belonging to this partition of __offsets.
>   - Any subsequent offset fetch request will find out that the partition is no longer a leader and fail. There is one problem in the existing patch which I will highlight in the RB along with a suggested fix.
>   - Likewise, any subsequent offset commit request will fail (since the underlying producer request will fail). It is okay if the underlying producer request succeeds and the broker becomes a follower for that partition just before the offset cache is updated (since the broker will not serve any OffsetFetchRequests for that partition until it becomes a leader again).
> - On becoming a leader of an offsets topic partition:
>   - Partition.makeLeader will load the offsets from the log (asynchronously). While this is in progress, the broker rejects offset fetches to this partition. Offset commits may continue to arrive - i.e., they will be appended to the log and then written to the cache. The load loop might actually overwrite a cached entry with an earlier offset from the log, but that is okay - since it will eventually reach the more recent update in the log and load that into the cache.
>
> Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
> - The broker config should set offsets.backup.enabled=true
> - Upgrade the brokers to the latest jar. (Consumers still commit directly to ZooKeeper.)
> - Start migrating the consumers over.
> - Consumers will now start sending offset commits to the broker. Since the backup setting is enabled, offsets will also be committed to ZooKeeper. This is necessary when migrating consumers.
> - After _all_ consumers have moved over you can turn off the backup.
>
> I have made a number of preliminary comments as TODOs in the RB myself (i.e., as a note to myself and others reviewing).
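As an aside, the eager-load-on-leader-change behaviour described above could be sketched roughly as follows (names and structure are hypothetical, not the actual OffsetManager code; log appends, purgatory interaction and error handling are omitted):

    // Sketch only: on becoming leader, replay the __offsets log asynchronously;
    // reject fetches with "OffsetsLoading" until the replay finishes, while
    // commits keep updating the cache in parallel.
    class LeaderLoadSketch {
      private val lock = new Object
      private val cache = collection.mutable.Map[(String, String, Int), Long]()
      private val loadingPartitions = collection.mutable.Set[Int]()

      // Mark the partition as loading and replay its log on a background thread.
      def becomeLeader(offsetsPartition: Int, readLog: Int => Iterator[((String, String, Int), Long)]): Unit = {
        lock.synchronized { loadingPartitions += offsetsPartition }
        new Thread(new Runnable {
          def run(): Unit = {
            // A parallel commit may be overwritten here by an older log entry, but the
            // replay eventually reaches the newer entry that was appended for that commit.
            readLog(offsetsPartition).foreach { case (k, v) => lock.synchronized { cache(k) = v } }
            lock.synchronized { loadingPartitions -= offsetsPartition }
          }
        }).start()
      }

      // Called only after the commit was appended to the __offsets log as a produce request.
      def commit(key: (String, String, Int), offset: Long): Unit =
        lock.synchronized { cache(key) = offset }

      // Fetches are rejected while the partition is still loading.
      def fetch(key: (String, String, Int), offsetsPartition: Int): Either[String, Option[Long]] =
        lock.synchronized {
          if (loadingPartitions.contains(offsetsPartition)) Left("OffsetsLoading")
          else Right(cache.get(key))
        }
    }

The real code also has to coordinate with the follower transition that clears these entries, which is where the OffsetManager/ReplicaManager coupling discussed below comes from.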
>
> Questions/comments for discussion
> - Should we explicitly disallow changes to the number of offset topic partitions? This is necessary (or we should at least prompt with a warning) since changing the number of partitions would affect the partitioning strategy.
> - Should we remove per-partition error codes for offset commits and use just a global error code for the entire request? I'm using compressed message sets for commits; i.e., the log append for a given commit will either fail entirely or succeed entirely. The OffsetCommitResponse contains per-partition error codes. So if the log append fails for any reason the same error code would apply to all partitions; i.e., it is sufficient to have a global error code. I think we currently have per-partition error codes due to the fact that offset commit requests can include metadata for each offset. The per-partition error code is set to MetadataTooLarge if the metadata entry exceeds the MaxMetadataLength. However, in this case I would prefer to just fail the entire request as opposed to doing partial commits (as I am in the current patch). Anyone have thoughts on this?
> - Error codes: right now I'm using existing error codes (with the exception of OffsetsLoading). It may be better to return more specific error codes, but I'm not sure if it matters - since the client-side implementation needs to check for _any_ error, and if any error exists (other than MetadataTooLarge) just retry the offset commit/fetch until it succeeds; i.e., the client should not really care about the actual error. If people have any strong preference on this let me know.
> - Separation of APIs: Right now, the offset manager and replica manager are intertwined, which is less than ideal. It is okay if the offset manager depends on the replica manager but not the other way around. Ideally, I would like to have KafkaApis hand off offset commit/fetch requests to the offset manager, which then handles them. However, the inter-dependence comes about due to the need to clear out the offset cache on becoming a follower and the need to load offsets on becoming a leader. I think we can improve the separation as follows:
>   - Don't optimistically load offsets/clear offsets on a leader/follower transition. Instead, load offsets only when an offset fetch request arrives for a partition that had not been loaded yet.
>   - The OffsetManager will need to maintain a Map[partition -> lastKnownLeaderEpoch] to determine whether to load offsets or not.
>   - The above will eliminate the reference to OffsetManager from ReplicaManager. KafkaApis still needs to reference the OffsetManager and will need to create the offset commit message to append to the __offsets log.
>   - We can actually avoid the need for KafkaApis to know about offset commit messages as well: in order to do that, we will need to create a "DurableLog" layer on top of LogManager and move all the purgatory stuff in there. The LogManager supports appends/reads from the local log, but does not know anything about the replicas. Instead, we can have a DurableLog layer that depends on ReplicaManager and LogManager and contains the Producer/Fetch-Request purgatories. OffsetManager will then need to depend on this DurableLog component. KafkaApis can just hand off ProducerRequests and FetchRequests to the DurableLog layer directly; it will hand off OffsetCommit/OffsetFetch requests to the OffsetManager, which will then hand them off to the DurableLog layer.
>   - Is the above worth it? I'm not sure it is, especially if we are sticking to only one offset management implementation.
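To visualize the layering proposed in the "Separation of APIs" bullet above, here is a trait-level sketch (signatures are placeholders for illustration; only the component names OffsetManager, ReplicaManager, LogManager, KafkaApis and DurableLog come from the discussion):

    // Sketch of the proposed dependency direction: ReplicaManager never references
    // the offset manager; the offset manager sits on top of a DurableLog layer.
    trait LogManagerLike     { def appendToLocalLog(topic: String, partition: Int, bytes: Array[Byte]): Unit }
    trait ReplicaManagerLike { def isLeader(topic: String, partition: Int): Boolean }

    // The DurableLog layer wraps LogManager + ReplicaManager and would own the
    // Producer/Fetch-Request purgatories.
    class DurableLogSketch(logs: LogManagerLike, replicas: ReplicaManagerLike) {
      def durableAppend(topic: String, partition: Int, bytes: Array[Byte]): Unit = {
        require(replicas.isLeader(topic, partition), "not the leader for this partition")
        logs.appendToLocalLog(topic, partition, bytes)   // replication/ack handling omitted
      }
    }

    // OffsetManager depends only on the DurableLog layer.
    class OffsetManagerSketch(durableLog: DurableLogSketch, offsetsTopicPartitionCount: Int) {
      def commitOffset(group: String, offsetCommitMessage: Array[Byte]): Unit =
        durableLog.durableAppend("__offsets",
          (group.hashCode & 0x7fffffff) % offsetsTopicPartitionCount, offsetCommitMessage)
    }

    // KafkaApis would hand ProducerRequests/FetchRequests to the DurableLog layer
    // directly, and OffsetCommit/OffsetFetch requests to the OffsetManager.

Whether this extra layer pays for itself is exactly the open question raised in the last bullet above.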
>
>
> Diffs
> -----
>
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c
>   core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532
>   core/src/main/scala/kafka/api/RequestKeys.scala c81214f
>   core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b
>   core/src/main/scala/kafka/cluster/Partition.scala 1087a2e
>   core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce
>   core/src/main/scala/kafka/common/OffsetLoadInProgressException.scala PRE-CREATION
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3
>   core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 6dae149
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2
>   core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a
>   core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31
>   core/src/main/scala/kafka/server/KafkaApis.scala ae2df20
>   core/src/main/scala/kafka/server/KafkaConfig.scala 3c3aafc
>   core/src/main/scala/kafka/server/KafkaServer.scala 5e34f95
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION
>   core/src/main/scala/kafka/server/ReplicaManager.scala 21bba48
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c
>   core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION
>   core/src/main/scala/kafka/utils/Utils.scala a89b046
>   core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala eb274d1
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c
>   sbt 944ebf8
>   system_test/mirror_maker/README da53c14
>   system_test/mirror_maker/bin/expected.out 0a1bbaf
>   system_test/mirror_maker/bin/run-test.sh e5e6c08
>   system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015
>   system_test/mirror_maker/config/mirror_producer.properties aa8be65
>   system_test/mirror_maker/config/server_source_1_1.properties 2f070a7
>   system_test/mirror_maker/config/server_source_1_2.properties f9353e8
>   system_test/mirror_maker/config/server_source_2_1.properties daa01ad
>   system_test/mirror_maker/config/server_source_2_2.properties be6fdfc
>   system_test/mirror_maker/config/server_target_1_1.properties d37955a
>   system_test/mirror_maker/config/server_target_1_2.properties aa7546c
>   system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015
>   system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b
>   system_test/mirror_maker/config/zookeeper_source_1.properties f851796
>   system_test/mirror_maker/config/zookeeper_source_2.properties d534d18
>   system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1
>   system_test/offset_management_testsuite/cluster_config.json PRE-CREATION
>   system_test/offset_management_testsuite/config/console_consumer.properties PRE-CREATION
>   system_test/offset_management_testsuite/config/server.properties PRE-CREATION
>   system_test/offset_management_testsuite/config/zookeeper.properties PRE-CREATION
>   system_test/offset_management_testsuite/offset_management_test.py PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties PRE-CREATION
>   system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json PRE-CREATION
>   system_test/testcase_to_run.json 8252860
>   system_test/utils/kafka_system_test_utils.py fb4a9c0
>   system_test/utils/testcase_env.py bee8716
>
> Diff: https://reviews.apache.org/r/18022/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Joel Koshy
>
>