> On March 6, 2014, 1:47 a.m., Neha Narkhede wrote:
> > core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala, line 
> > 307
> > <https://reviews.apache.org/r/18022/diff/1-2/?file=483587#file483587line307>
> >
> >     ZookeeperConsumerConnector is getting really large. I think it might be 
> > worth moving this logic out of here, but I realize that will be some 
> > refactoring work. Can we file a JIRA to improve this in the future?
> 
> Joel Koshy wrote:
>     Yeah it's not my favorite file in our code-base. Do you think it is worth 
> filing a jira given that we are beginning a consumer rewrite?

After thinking about this I think it is worth doing anyway - will file a 
follow-up jira if I don't get to it before checking in (wanted to leave some 
time for others to review as well).

So we can add a ConsumerOffsetsChannel class that manages fetching and 
committing offsets. This will be useful in tools (and we can also get rid of 
the currently broken commitOffsets API in SimpleConsumer).


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/#review36305
-----------------------------------------------------------


On March 5, 2014, 11:53 p.m., Joel Koshy wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/18022/
> -----------------------------------------------------------
> 
> (Updated March 5, 2014, 11:53 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1012
>     https://issues.apache.org/jira/browse/KAFKA-1012
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> I picked up most of Tejas' patch and made various edits for review here as I
> would like this to be completed and closed.
> 
> Here is a link to the original implementation wiki:
> https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
> 
> A lot of it is the same in this revision of the patch, but there is a bunch
> of refactoring. This patch does not use an "embedded producer" in the
> consumer. i.e., the consumer issues offset commit/fetch requests directly to
> the broker. Also, I decided against doing any kind of request forwarding and
> added a "ConsumerMetadataRequest" that will be used to determine the offset
> coordinator (and subsequently group coordinator that may be useful for the
> client rewrite - see
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).
> Also, there were some questions on how to support multiple offset manager
> implementations cleanly. After thinking about it I think it makes the code
> simpler and clearer if we just have one good offset storage mechanism (i.e.,
> Kafka-based). Consumers that want to store offsets elsewhere can do so on
> their own. (However, if we do want to do this somewhat cleanly, see the
> discussion on separation of APIs below.)
> 
> Here is a quick recap of how offset management within Kafka works:
> - A special __offsets topic holds consumer offsets.
> - The consumer group serves as the partition key for offsets committed to
>   the __offsets topic. i.e., all offsets for all topics that a group
>   consumes will be in a single partition of the offsets topic.
> - The "group-topic-partition" is the actual (stored) key in each message of
>   the offsets topic.  This facilitates de-duplication (and thus removal) of
>   older offsets.
> - The offset manager also contains an in-memory cache of offsets so it can
>   serve offset fetch requests quickly.
> - Think of commits as a little more than a produce request. If and only if
>   the commit is appended to the __offsets log as a regular produce request
>   we update the offsets cache. So the semantics are identical to a produce
>   request.  Offset fetches return whatever is in the cache. If it is absent,
>   and offsets have not yet been loaded from the logs into the cache (on
>   becoming a leader), then we return an "OffsetsLoading" error code.
> 
> (Tejas' wiki has pretty good diagrams that describe the above.)
> 
> Some more details:
> 
> - Atomicity per-commit: One drawback of the Zookeeper-based offset commits
>   is that we when we commit multiple offsets (since we don't use
>   multi-write) we have to write offsets serially so it is not atomic.  In
>   this implementation I went with Jun's suggestion on using a compressed
>   message set. This ensures that we will disallow partial commits of a bulk
>   commit. I have hard-coded this to GZIP but maybe it is better to just
>   expose a config. Another option is to introduce an identity compression
>   codec.
> - The main corner cases to consider are when there is leader movement due to
>   broker failures and simultaneous offset commits/fetches. Offset fetches
>   would only occur if there are consumer-side rebalances or shutdowns. The
>   guarantees we want to provide are: (i) successfully acknowledged offset
>   commits should be returned on the next offset fetch - i.e., should not be
>   lost (ii) offset fetches should never return a stale offset.
>   - On becoming a follower of an offsets topic partition:
>     - Partition.makeFollower clears the offset cache of entries belonging to
>       this partition of __offsets.
>     - Any subsequent offset fetch request will find out that the partition
>       is no longer a leader and fail. There is one problem in the existing
>       patch which I will highlight in the RB along with a suggested fix.
>     - Likewise, any subsequent offset commit request will fail (since the
>       underlying producer request will fail). It is okay if the underlying
>       producer request succeeds and the broker becomes a follower for that
>       partition just before the offset cache is updated (since the broker
>       will not serve any OffsetFetchRequests for that partition until it
>       becomes a leader again).
>   - On becoming a leader of an offsets topic partition:
>     - Partition.makeLeader: will load the offsets from the log
>       (asynchronously). While this is in progress, the broker rejects offset
>       fetches to this partition. Offset commits may continue to arrive -
>       i.e., will be appended to the log and then written to the cache. The
>       load loop might actually overwrite it with an earlier offset from the
>       log but that is okay - since it will eventually reach the more recent
>       update in the log and load that into the cache.
> 
> Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
> - The broker config should set offsets.backup.enabled=true
> - Upgrade the brokers to the latest jar. (Consumers still commit
>   directly to ZooKeeper).
> - Start migrating the consumers over.
> - Consumers will now start sending offset commits to the broker. Since the
>   backup setting is enabled, offsets will also be committed to ZooKeeper.
>   This is necessary when migrating consumers.
> - After _all_ consumers have moved over you can turn off the backup.
> 
> I have made a number of preliminary comments as TODOs in the RB myself (i.e.,
> as a note to myself and others reviewing).
> 
> Questions/comments for discussion
> - Should we explicitly disallow changes to the number of offset topic 
> partitions?
>   This is necessary (or at least prompt with a warning) since changing the 
> number
>   of partitions would affect the partitioning strategy.
> - Should we remove per-partition error codes for offset commits and use just
>   a global error code for the entire request? I'm using compressed message
>   sets for commits.  i.e., the log append for a given commit will either
>   fail entirely or succeed entirely. The OffsetCommitResponse contains
>   per-partition error codes. So if the log append fails for any reason the
>   same error code would apply for all partitions. i.e., it is sufficient to
>   have a global error code. I think we currently have per-partition error
>   codes due to the fact that offset commit requests can include metadata for
>   each offset. The per-partition error code is set to MetadataTooLarge if
>   the metadata entry exceeds the MaxMetadataLength. However, in this case I
>   would prefer to just fail the entire request as opposed to doing partial
>   commits (as I am in the current patch). Anyone have thoughts on this?
> - Error codes: right now I'm using existing error codes (with the exception
>   of OffsetsLoading). It may be better to return more specific error codes
>   but I'm not sure if it matters - since the client-side implementation
>   needs to check for _any_ error and if any error exists (other than
>   MetadataTooLarge) just retry the offset commit/fetch until it succeeds.
>   i.e., the client should not really care about the actual error. If people
>   have any strong preference on this let me know.
> - Separation of APIs: Right now, the offset manager, replica manager are
>   intertwined which is less than ideal. It is okay if offset manager depends
>   on replica manager but not the other way around. Ideally, I would like to
>   have KafkaApis hand off offset commit/fetch requests to the offset manager
>   which then handles it. However, the inter-dependence comes about due to
>   the need to clear out the offset cache on becoming a follower and the need
>   to load offsets on becoming a leader. I think we can improve the
>   separation as follows:
>   - Don't optimistically load offsets/clear offsets on a leader/follower
>     transition. Instead, load offsets only when an offset fetch request
>     arrives for a partition that had not been loaded yet.
>   - The OffsetManager will need to maintain a Map[partition ->
>     lastKnownLeaderEpoch] to determine whether to load offsets or not.
>   - The above will eliminate the reference to OffsetManager from
>     ReplicaManager. KafkaApis still needs to reference the OffsetManager and
>     will need to create the offset commit message to append to the __offsets
>     log.
>   - We can actually avoid the need for KafkaApis to know about offset commit
>     messsages as well: in order to do that, we will need to create a
>     "DurableLog" layer on top of LogManager and move all the purgatory stuff
>     in there. The LogManager supports appends/reads from the local log, but
>     does not know anything about the replicas. Instead, we can have a
>     DurableLog layer that depends on ReplicaManager and LogManager and
>     contains the Producer/Fetch-Request purgatories. So OffsetManager will
>     need to depend on this DurableLog component. So KafkaApis can just hand
>     off ProducerRequests, FetchRequests to the DurableLog layer directly. It
>     will hand off OffsetCommit/OffsetFetch requests to the OffsetManager
>     which will then hand it off to the DurableLog layer.
>   - Is the above worth it? I'm not sure it is, especially if we are sticking
>     to only one offset management implementation.
> 
> 
> Diffs
> -----
> 
>   config/server.properties c9e923a 
>   core/src/main/scala/kafka/admin/TopicCommand.scala dc9b092 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c 
>   core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f 
>   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532 
>   core/src/main/scala/kafka/api/RequestKeys.scala c81214f 
>   core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b 
>   core/src/main/scala/kafka/cluster/Partition.scala 882b6da 
>   
> core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
>  PRE-CREATION 
>   core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce 
>   core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3 
>   core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/common/Topic.scala c1b9f65 
>   core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2 
>   core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6 
>   core/src/main/scala/kafka/consumer/ConsumerConnector.scala 13c3f77 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 098d6e4 
>   core/src/main/scala/kafka/consumer/TopicCount.scala e332633 
>   core/src/main/scala/kafka/consumer/TopicFilter.scala 4f20823 
>   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2 
>   core/src/main/scala/kafka/controller/KafkaController.scala 4deff9d 
>   core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a 
>   core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31 
>   core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java c45c803 
>   core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala 
> 58e83f6 
>   core/src/main/scala/kafka/producer/ProducerConfig.scala 7947b18 
>   core/src/main/scala/kafka/server/KafkaApis.scala 215ac36 
>   core/src/main/scala/kafka/server/KafkaConfig.scala b871843 
>   core/src/main/scala/kafka/server/KafkaServer.scala feb2093 
>   core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaManager.scala fb759d9 
>   core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c 
>   core/src/main/scala/kafka/tools/DumpLogSegments.scala 14f44d9 
>   core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 5e8c56d 
>   core/src/main/scala/kafka/tools/TestOffsetManager.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala eac9af2 
>   core/src/main/scala/kafka/utils/Utils.scala a89b046 
>   core/src/main/scala/kafka/utils/VerifiableProperties.scala b070bb4 
>   core/src/main/scala/kafka/utils/ZkUtils.scala a198628 
>   core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca 
>   core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala dbe078c 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> eb274d1 
>   core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala cf2724b 
>   
> core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala 
> 8fe7259 
>   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c 
>   system_test/mirror_maker/README da53c14 
>   system_test/mirror_maker/bin/expected.out 0a1bbaf 
>   system_test/mirror_maker/bin/run-test.sh e5e6c08 
>   system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015 
>   system_test/mirror_maker/config/mirror_producer.properties aa8be65 
>   system_test/mirror_maker/config/server_source_1_1.properties 2f070a7 
>   system_test/mirror_maker/config/server_source_1_2.properties f9353e8 
>   system_test/mirror_maker/config/server_source_2_1.properties daa01ad 
>   system_test/mirror_maker/config/server_source_2_2.properties be6fdfc 
>   system_test/mirror_maker/config/server_target_1_1.properties d37955a 
>   system_test/mirror_maker/config/server_target_1_2.properties aa7546c 
>   system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015 
>   system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b 
>   system_test/mirror_maker/config/zookeeper_source_1.properties f851796 
>   system_test/mirror_maker/config/zookeeper_source_2.properties d534d18 
>   system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1 
>   system_test/offset_management_testsuite/cluster_config.json PRE-CREATION 
>   system_test/offset_management_testsuite/config/console_consumer.properties 
> PRE-CREATION 
>   system_test/offset_management_testsuite/config/server.properties 
> PRE-CREATION 
>   system_test/offset_management_testsuite/config/zookeeper.properties 
> PRE-CREATION 
>   system_test/offset_management_testsuite/offset_management_test.py 
> PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
>  PRE-CREATION 
>   
> system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
>  PRE-CREATION 
>   system_test/utils/kafka_system_test_utils.py 5d2b7df 
>   system_test/utils/testcase_env.py bee8716 
> 
> Diff: https://reviews.apache.org/r/18022/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Joel Koshy
> 
>

Reply via email to