Repository: kafka Updated Branches: refs/heads/trunk 5653249e0 -> 62dc1afb6
KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to OffsetConfig instantiation

Kafka has two configurable compression codecs: the one used by the client (source codec) and the one finally used when storing into the log (target codec). The target codec defaults to KafkaConfig.compressionType and can be dynamically configured through ZooKeeper.

The GroupCoordinator appends group membership information into the __consumer_offsets topic by:
1. making a message with group membership information
2. making a MessageSet with the single message compressed with the source codec
3. doing a log.append on the MessageSet

Without this patch, KafkaConfig.offsetsTopicCompressionCodec doesn't get propagated to OffsetConfig instantiation, so GroupMetadataManager uses a source codec of NoCompressionCodec when making the MessageSet. Let's say we have enough group information such that the message formed exceeds KafkaConfig.messageMaxBytes before compression but would fall below the threshold after compression using our source codec. Even if we had dynamically configured __consumer_offsets with our favorite compression codec, the log.append will throw RecordTooLargeException during analyzeAndValidateMessageSet since the message was unexpectedly uncompressed instead of having been compressed with the source codec defined by KafkaConfig.offsetsTopicCompressionCodec.

Author: Onur Karaman <[email protected]> Reviewers: Manikumar Reddy <[email protected]>, Jason Gustafson <[email protected]>, Ismael Juma <[email protected]> Closes #1394 from onurkaraman/KAFKA-3718 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62dc1afb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62dc1afb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62dc1afb Branch: refs/heads/trunk Commit: 62dc1afb69369c64207991ba59bcd203505d37ea Parents: 5653249 Author: Onur Karaman <[email protected]> Authored: Thu May 26 09:17:31 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu May 26 09:17:31 2016 +0100 ---------------------------------------------------------------------- .../kafka/coordinator/GroupCoordinator.scala | 10 ++-- .../main/scala/kafka/server/KafkaServer.scala | 14 ++--- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../api/GroupCoordinatorIntegrationTest.scala | 63 ++++++++++++++++++++ .../kafka/api/IntegrationTestHarness.scala | 2 +- 5 files changed, 78 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index fb71254..f445764 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition} import kafka.log.LogConfig -import kafka.message.UncompressedCodec +import kafka.message.ProducerCompressionCodec import kafka.server._ import kafka.utils._ 
import org.apache.kafka.common.TopicPartition @@ -65,7 +65,7 @@ class GroupCoordinator(val brokerId: Int, val props = new Properties props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString) - props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name) + props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name) props } @@ -744,14 +744,16 @@ object GroupCoordinator { offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, offsetsTopicNumPartitions = config.offsetsTopicPartitions, + offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes, offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, + offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) - val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time) - new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time) + val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time) + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 2832ebc..de3054a 100755 --- 
a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -123,7 +123,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr var dynamicConfigHandlers: Map[String, ConfigHandler] = null var dynamicConfigManager: DynamicConfigManager = null - var consumerCoordinator: GroupCoordinator = null + var groupCoordinator: GroupCoordinator = null var kafkaController: KafkaController = null @@ -199,9 +199,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix) kafkaController.startup() - /* start kafka coordinator */ - consumerCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime) - consumerCoordinator.startup() + /* start group coordinator */ + groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime) + groupCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName => @@ -211,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr } /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, + apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator, kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -555,8 +555,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr CoreUtils.swallow(replicaManager.shutdown()) if(logManager != null) CoreUtils.swallow(logManager.shutdown()) - if(consumerCoordinator != null) - 
CoreUtils.swallow(consumerCoordinator.shutdown()) + if(groupCoordinator != null) + CoreUtils.swallow(groupCoordinator.shutdown()) if(kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) if(zkUtils != null) http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index bc705f1..2d5900f 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -148,7 +148,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness { 1, 1, servers, - servers.head.consumerCoordinator.offsetsTopicConfigs) + servers.head.groupCoordinator.offsetsTopicConfigs) // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers) } http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala new file mode 100644 index 0000000..9183d0f --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. 
The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package integration.kafka.api + +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.log.Log +import kafka.message.GZIPCompressionCodec +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.TopicConstants +import org.apache.kafka.common.protocol.SecurityProtocol +import org.junit.Test +import org.junit.Assert._ + +import scala.collection.JavaConverters._ +import java.util.Properties + +class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { + val offsetsTopicCompressionCodec = GZIPCompressionCodec + val overridingProps = new Properties() + overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1") + overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString) + + override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { + KafkaConfig.fromProps(_, overridingProps) + } + + @Test + def testGroupCoordinatorPropagatesOfffsetsTopicCompressionCodec() { + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), + securityProtocol = SecurityProtocol.PLAINTEXT) + val offsetMap = Map( + new 
TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") + ).asJava + consumer.commitSync(offsetMap) + val logManager = servers.head.getLogManager + + def getGroupMetadataLogOpt: Option[Log] = + logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0)) + + TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)), + "Commit message not appended in time") + + val logSegments = getGroupMetadataLogOpt.get.logSegments + val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec) + assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs) + + consumer.close() + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index de05c9c..6e76f90 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt, serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt, servers, - servers(0).consumerCoordinator.offsetsTopicConfigs) + servers(0).groupCoordinator.offsetsTopicConfigs) } @After
