-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/18022/
-----------------------------------------------------------
(Updated March 13, 2014, 11:03 p.m.)
Review request for kafka.
Changes
-------
Key changes in this revision:
- I moved the dual-commit (offsets.backup) concept to the consumer. The
reason for this is that if a consumer that consumes several partitions
migrates to Kafka-based offset storage, the initial lookup in Kafka can
take a long time (since the broker would have to look up those partitions
serially from ZooKeeper). If several consumers do the same thing it could
cause a DoS on the socket server. So consumers will have to specify
offsets.storage=kafka and set dual.commit.enabled=true to start migrating.
During this phase, commits go to both ZooKeeper and Kafka. For offset
fetches, the consumer looks up the offset in both ZooKeeper and Kafka and
picks the greater of the two. After all instances in the group have moved
over, dual.commit.enabled can be set to false. (A sketch of the migration
config follows this list.)
- Added a Kafka/ZooKeeper commit meter to help operations determine whether
consumers have moved over to Kafka for offset storage.
- I also changed the offset load process to avoid an unnecessary copy for
faster load times.
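For reference, here is a minimal sketch of the consumer-side migration settings
described in the first item above. The offsets.storage and dual.commit.enabled
property names are from this patch; the zookeeper.connect and group.id values
are placeholders.

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    // Migration phase: commit offsets to Kafka, but keep dual-committing to
    // ZooKeeper until every instance in the group has moved over.
    val props = new Properties()
    props.put("zookeeper.connect", "localhost:2181")  // placeholder
    props.put("group.id", "my-group")                 // placeholder
    props.put("offsets.storage", "kafka")
    props.put("dual.commit.enabled", "true")          // flip to false once the whole group has migrated
    val config = new ConsumerConfig(props)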
Bugs: KAFKA-1012
https://issues.apache.org/jira/browse/KAFKA-1012
Repository: kafka
Description
-------
I picked up most of Tejas' patch and made various edits for review here as I
would like this to be completed and closed.
Here is a link to the original implementation wiki:
https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management
A lot of it is the same in this revision of the patch, but there is a bunch
of refactoring. This patch does not use an "embedded producer" in the
consumer; i.e., the consumer issues offset commit/fetch requests directly to
the broker. Also, I decided against doing any kind of request forwarding and
added a "ConsumerMetadataRequest" that will be used to determine the offset
coordinator (and subsequently the group coordinator, which may be useful for
the client rewrite - see
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).
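As a rough illustration of the intended flow (a sketch only - the exact
constructor arguments and response fields should be taken from the
request/response classes added in this patch), a client would first ask any
broker for the offset coordinator of its group and then direct offset
commits/fetches to that coordinator:

    import kafka.api.{ConsumerMetadataRequest, ConsumerMetadataResponse}
    import kafka.network.BlockingChannel

    // Sketch: query any broker for the group's offset coordinator.
    val channel = new BlockingChannel("broker-host", 9092,   // placeholder host/port
      BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, 10000)
    channel.connect()
    channel.send(new ConsumerMetadataRequest("my-group"))    // assumes defaults for version/correlation/client id
    val response = ConsumerMetadataResponse.readFrom(channel.receive().buffer)
    // On success the response identifies the coordinator broker to which
    // subsequent OffsetCommitRequests/OffsetFetchRequests should be sent.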
Also, there were some questions on how to support multiple offset manager
implementations cleanly. After thinking about it, I believe it makes the code
simpler and clearer if we just have one good offset storage mechanism (i.e.,
Kafka-based). Consumers that want to store offsets elsewhere can do so on
their own. (However, if we do want to do this somewhat cleanly, see the
discussion on separation of APIs below.)
Here is a quick recap of how offset management within Kafka works:
- A special __offsets topic holds consumer offsets.
- The consumer group serves as the partition key for offsets committed to
the __offsets topic; i.e., all offsets for all topics that a group
consumes will be in a single partition of the offsets topic. (A sketch of
this mapping follows this list.)
- The "group-topic-partition" is the actual (stored) key in each message of
the offsets topic. This facilitates de-duplication (and thus removal) of
older offsets.
- The offset manager also contains an in-memory cache of offsets so it can
serve offset fetch requests quickly.
- Think of a commit as little more than a produce request: the offsets cache
is updated if and only if the commit is successfully appended to the
__offsets log as a regular produce request, so the semantics are identical
to those of a produce request. Offset fetches return whatever is in the
cache. If the offset is absent, and offsets have not yet been loaded from
the logs into the cache (on becoming a leader), then we return an
"OffsetsLoading" error code.
(Tejas' wiki has pretty good diagrams that describe the above.)
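To make the partitioning point above concrete, here is an illustrative sketch
(the exact hash function is an implementation detail of the patch; the point
is that the group id alone determines the __offsets partition):

    // Illustrative only: every offset committed by a group lands in the same
    // partition of __offsets because the group id is the partition key.
    def partitionFor(group: String, offsetsTopicPartitionCount: Int): Int =
      ((group.hashCode % offsetsTopicPartitionCount) + offsetsTopicPartitionCount) %
        offsetsTopicPartitionCount

    // E.g., with 50 partitions, "my-group" always maps to the same partition,
    // so that partition's leader can cache and compact all of the group's offsets.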
Some more details:
- Atomicity per-commit: One drawback of the ZooKeeper-based offset commits
is that when we commit multiple offsets (since we don't use multi-write)
we have to write them serially, so the commit is not atomic. In this
implementation I went with Jun's suggestion of using a compressed message
set, which disallows partial commits of a bulk commit. I have hard-coded
the codec to GZIP but maybe it is better to just expose a config. Another
option is to introduce an identity compression codec. (A sketch of the
compressed commit follows this list.)
- The main corner cases to consider are leader movement due to broker
failures combined with simultaneous offset commits/fetches. Offset fetches
would only occur on consumer-side rebalances or shutdowns. The guarantees
we want to provide are: (i) successfully acknowledged offset commits should
be returned on the next offset fetch, i.e., they should not be lost; and
(ii) offset fetches should never return a stale offset.
- On becoming a follower of an offsets topic partition:
- Partition.makeFollower clears the offset cache of entries belonging to
this partition of __offsets.
- Any subsequent offset fetch request will find that this broker is no
longer the leader for the partition and fail. There is one problem in the
existing patch which I will highlight in the RB along with a suggested fix.
- Likewise, any subsequent offset commit request will fail (since the
underlying producer request will fail). It is okay if the underlying
producer request succeeds and the broker becomes a follower for that
partition just before the offset cache is updated (since the broker
will not serve any OffsetFetchRequests for that partition until it
becomes a leader again).
- On becoming a leader of an offsets topic partition:
- Partition.makeLeader will load the offsets from the log
(asynchronously). While this is in progress, the broker rejects offset
fetches to this partition. Offset commits may continue to arrive; i.e.,
they will be appended to the log and then written to the cache. The load
loop might overwrite a freshly committed offset in the cache with an
earlier offset from the log, but that is okay since it will eventually
reach the more recent update in the log and load that into the cache.
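Here is the sketch promised under "Atomicity per-commit" above. It only shows
the idea; the real key/value wire format of the offsets topic is elided behind
placeholder keyBytes/valueBytes helpers:

    import kafka.common.TopicAndPartition
    import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

    // Placeholder serializers - not the actual wire format of __offsets messages.
    def keyBytes(group: String, tp: TopicAndPartition): Array[Byte] =
      "%s-%s-%d".format(group, tp.topic, tp.partition).getBytes("UTF-8")
    def valueBytes(offset: Long): Array[Byte] =
      offset.toString.getBytes("UTF-8")

    // All offsets from a single commit are wrapped into one GZIP-compressed
    // message set, so the append to the __offsets log either succeeds or fails
    // as a whole - no partial commits.
    def commitMessageSet(group: String, offsets: Map[TopicAndPartition, Long]): ByteBufferMessageSet = {
      val messages = offsets.map { case (tp, offset) =>
        new Message(valueBytes(offset), keyBytes(group, tp))   // (value, key)
      }.toSeq
      new ByteBufferMessageSet(GZIPCompressionCodec, messages: _*)
    }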
Migrating from ZooKeeper-based offset storage to Kafka-based offset storage:
- The broker config should set offsets.backup.enabled=true
- Upgrade the brokers to the latest jar. (Consumers still commit
directly to ZooKeeper).
- Start migrating the consumers over.
- Consumers will now start sending offset commits to the broker. Since the
backup setting is enabled, offsets will also be committed to ZooKeeper.
This is necessary when migrating consumers.
- After _all_ consumers have moved over, you can turn off the backup.
I have made a number of preliminary comments as TODOs in the RB myself (i.e.,
as a note to myself and others reviewing).
Questions/comments for discussion
- Should we explicitly disallow changes to the number of offset topic
partitions? This is necessary (or we should at least prompt with a warning)
since changing the number of partitions would affect the partitioning
strategy, i.e., a group's offsets could map to a different partition of the
__offsets topic.
- Should we remove per-partition error codes for offset commits and use just
a global error code for the entire request? I'm using compressed message
sets for commits; i.e., the log append for a given commit will either
fail entirely or succeed entirely. The OffsetCommitResponse contains
per-partition error codes, so if the log append fails for any reason the
same error code would apply to all partitions; i.e., a global error code
is sufficient. I think we currently have per-partition error codes because
offset commit requests can include metadata for each offset: the
per-partition error code is set to MetadataTooLarge if the metadata entry
exceeds the MaxMetadataLength. However, in this case I would prefer to
just fail the entire request as opposed to doing partial commits (as I am
doing in the current patch). Anyone have thoughts on this?
- Error codes: right now I'm using existing error codes (with the exception
of OffsetsLoading). It may be better to return more specific error codes,
but I'm not sure it matters, since the client-side implementation needs to
check for _any_ error and, if any error exists (other than
MetadataTooLarge), just retry the offset commit/fetch until it succeeds;
i.e., the client should not really care about the actual error. If people
have any strong preference on this let me know.
- Separation of APIs: Right now, the offset manager and replica manager are
intertwined, which is less than ideal. It is okay if the offset manager
depends on the replica manager but not the other way around. Ideally, I
would like to have KafkaApis hand off offset commit/fetch requests to the
offset manager which then handles them. However, the inter-dependence comes
about due to the need to clear out the offset cache on becoming a follower
and the need to load offsets on becoming a leader. I think we can improve
the separation as follows:
- Don't optimistically load/clear offsets on a leader/follower
transition. Instead, load offsets only when an offset fetch request
arrives for a partition that has not been loaded yet. (A rough sketch
follows this list.)
- The OffsetManager will need to maintain a Map[partition ->
lastKnownLeaderEpoch] to determine whether to load offsets or not.
- The above will eliminate the reference to OffsetManager from
ReplicaManager. KafkaApis still needs to reference the OffsetManager and
will need to create the offset commit message to append to the __offsets
log.
- We can actually avoid the need for KafkaApis to know about offset commit
messages as well: in order to do that, we will need to create a
"DurableLog" layer on top of LogManager and move all the purgatory stuff
in there. The LogManager supports appends/reads from the local log, but
does not know anything about the replicas. Instead, we can have a
DurableLog layer that depends on ReplicaManager and LogManager and
contains the Producer/Fetch-Request purgatories. OffsetManager would then
depend on this DurableLog component, and KafkaApis could hand off
ProducerRequests and FetchRequests to the DurableLog layer directly. It
would hand off OffsetCommit/OffsetFetch requests to the OffsetManager,
which would then hand them off to the DurableLog layer.
- Is the above worth it? I'm not sure it is, especially if we are sticking
to only one offset management implementation.
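For the lazy-load alternative mentioned under "Separation of APIs" above, this
is roughly the shape I have in mind (a sketch with illustrative names, not code
from the patch):

    import scala.collection.mutable

    object LazyOffsetLoader {
      // offsets-topic partition -> leader epoch at which it was last loaded
      private val lastKnownLeaderEpoch = mutable.Map[Int, Int]()

      // Placeholder: in the patch this would read the partition's __offsets log
      // and populate the in-memory offsets cache.
      private def loadOffsetsFromLog(offsetsPartition: Int): Unit = ()

      // Called from the offset fetch path rather than from the leader transition:
      // load only if this partition has not been loaded for the current epoch.
      def maybeLoadOffsets(offsetsPartition: Int, currentLeaderEpoch: Int): Unit = {
        if (lastKnownLeaderEpoch.get(offsetsPartition) != Some(currentLeaderEpoch)) {
          loadOffsetsFromLog(offsetsPartition)
          lastKnownLeaderEpoch.put(offsetsPartition, currentLeaderEpoch)
        }
      }
    }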
Diffs (updated)
-----
core/src/main/scala/kafka/admin/TopicCommand.scala dc9b092
core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala PRE-CREATION
core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala PRE-CREATION
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 4d1fa5c
core/src/main/scala/kafka/api/OffsetCommitResponse.scala 9e1795f
core/src/main/scala/kafka/api/OffsetFetchRequest.scala 7036532
core/src/main/scala/kafka/api/RequestKeys.scala c81214f
core/src/main/scala/kafka/client/ClientUtils.scala 1d2f81b
core/src/main/scala/kafka/cluster/Partition.scala 882b6da
core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
PRE-CREATION
core/src/main/scala/kafka/common/ErrorMapping.scala b0b5dce
core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
PRE-CREATION
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 59608a3
core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
PRE-CREATION
core/src/main/scala/kafka/common/Topic.scala c1b9f65
core/src/main/scala/kafka/consumer/ConsoleConsumer.scala dc066c2
core/src/main/scala/kafka/consumer/ConsumerConfig.scala e6875d6
core/src/main/scala/kafka/consumer/ConsumerConnector.scala 13c3f77
core/src/main/scala/kafka/consumer/SimpleConsumer.scala fa7caa7
core/src/main/scala/kafka/consumer/TopicCount.scala e332633
core/src/main/scala/kafka/consumer/TopicFilter.scala 4f20823
core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 703b2e2
core/src/main/scala/kafka/controller/KafkaController.scala 4deff9d
core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala PRE-CREATION
core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala 57b9d2a
core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala 570bf31
core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java c45c803
core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
58e83f6
core/src/main/scala/kafka/log/FileMessageSet.scala e1f8b97
core/src/main/scala/kafka/producer/ProducerConfig.scala 7947b18
core/src/main/scala/kafka/server/KafkaApis.scala 215ac36
core/src/main/scala/kafka/server/KafkaConfig.scala b871843
core/src/main/scala/kafka/server/KafkaServer.scala feb2093
core/src/main/scala/kafka/server/OffsetManager.scala PRE-CREATION
core/src/main/scala/kafka/server/ReplicaManager.scala fb759d9
core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 33d7c2c
core/src/main/scala/kafka/tools/DumpLogSegments.scala 14f44d9
core/src/main/scala/kafka/tools/MirrorMaker.scala f0f871c
core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 5e8c56d
core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala eac9af2
core/src/main/scala/kafka/utils/VerifiableProperties.scala b070bb4
core/src/main/scala/kafka/utils/ZkUtils.scala a198628
core/src/test/scala/other/kafka/TestOffsetManager.scala PRE-CREATION
core/src/test/scala/other/kafka/TestZKConsumerOffsets.scala 31534ca
core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala dbe078c
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
eb274d1
core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala cf2724b
core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
8fe7259
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 6a96d80
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 1317b4c
system_test/migration_tool_testsuite/migration_tool_test.py 2fecd19
system_test/mirror_maker/README da53c14
system_test/mirror_maker/bin/expected.out 0a1bbaf
system_test/mirror_maker/bin/run-test.sh e5e6c08
system_test/mirror_maker/config/blacklisttest.consumer.properties ff12015
system_test/mirror_maker/config/mirror_producer.properties aa8be65
system_test/mirror_maker/config/server_source_1_1.properties 2f070a7
system_test/mirror_maker/config/server_source_1_2.properties f9353e8
system_test/mirror_maker/config/server_source_2_1.properties daa01ad
system_test/mirror_maker/config/server_source_2_2.properties be6fdfc
system_test/mirror_maker/config/server_target_1_1.properties d37955a
system_test/mirror_maker/config/server_target_1_2.properties aa7546c
system_test/mirror_maker/config/whitelisttest_1.consumer.properties ff12015
system_test/mirror_maker/config/whitelisttest_2.consumer.properties f1a902b
system_test/mirror_maker/config/zookeeper_source_1.properties f851796
system_test/mirror_maker/config/zookeeper_source_2.properties d534d18
system_test/mirror_maker/config/zookeeper_target.properties 55a7eb1
system_test/mirror_maker_testsuite/mirror_maker_test.py fd18088
system_test/offset_management_testsuite/cluster_config.json PRE-CREATION
system_test/offset_management_testsuite/config/console_consumer.properties
PRE-CREATION
system_test/offset_management_testsuite/config/server.properties PRE-CREATION
system_test/offset_management_testsuite/config/zookeeper.properties
PRE-CREATION
system_test/offset_management_testsuite/offset_management_test.py
PRE-CREATION
system_test/offset_management_testsuite/testcase_7001/testcase_7001_properties.json
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_1.properties
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_2.properties
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_3.properties
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/config/kafka_server_4.properties
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/config/zookeeper_0.properties
PRE-CREATION
system_test/offset_management_testsuite/testcase_7002/testcase_7002_properties.json
PRE-CREATION
system_test/replication_testsuite/replica_basic_test.py 5d3d93e
system_test/utils/kafka_system_test_utils.py 29ab2ba
system_test/utils/testcase_env.py bee8716
Diff: https://reviews.apache.org/r/18022/diff/
Testing
-------
Thanks,
Joel Koshy