Repository: kafka
Updated Branches:
  refs/heads/trunk 5653249e0 -> 62dc1afb6


KAFKA-3718; propagate all KafkaConfig __consumer_offsets configs to 
OffsetConfig instantiation

Kafka has two configurable compression codecs: the one used by the client 
(the source codec) and the one ultimately used when storing messages into 
the log (the target codec). The target codec defaults to 
KafkaConfig.compressionType and can be dynamically reconfigured through 
ZooKeeper.
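
As a rough illustration, the target-codec rule can be sketched as follows 
(a simplified sketch, not the broker's actual code path; resolveTargetCodec 
is a hypothetical helper name):

    import kafka.message.{CompressionCodec, ProducerCompressionCodec}

    // Hypothetical helper: a compression.type of "producer" keeps whatever
    // codec the client used, while any other value names an explicit target
    // codec that the broker recompresses to on append.
    def resolveTargetCodec(compressionType: String, sourceCodec: CompressionCodec): CompressionCodec =
      if (compressionType == ProducerCompressionCodec.name) sourceCodec
      else CompressionCodec.getCompressionCodec(compressionType)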

The GroupCoordinator appends group membership information to the 
__consumer_offsets topic by:
1. building a message containing the group membership information
2. wrapping that single message in a MessageSet compressed with the source codec
3. calling log.append on the MessageSet (see the sketch after this list)
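
A minimal sketch of those steps, assuming the 0.10-era kafka.message API 
(the payload bytes here are placeholders):

    import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

    // Step 1: wrap the serialized group membership information in a Message.
    val groupMetadataMessage = new Message("serialized-group-metadata".getBytes)
    // Step 2: build a single-message set, compressed with the source codec.
    val messageSet = new ByteBufferMessageSet(GZIPCompressionCodec, groupMetadataMessage)
    // Step 3 would then be log.append(messageSet) on the __consumer_offsets
    // partition's log.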

Without this patch, KafkaConfig.offsetsTopicCompressionCodec is not 
propagated to the OffsetConfig instantiation, so GroupMetadataManager uses a 
source codec of NoCompressionCodec when building the MessageSet. Suppose 
there is enough group information that the resulting message exceeds 
KafkaConfig.messageMaxBytes before compression but would fall below the 
threshold after compression with the source codec. Then, even if 
__consumer_offsets has been dynamically configured with the desired 
compression codec, log.append throws RecordTooLargeException during 
analyzeAndValidateMessageSet, because the message arrives uncompressed 
instead of compressed with the source codec defined by 
KafkaConfig.offsetsTopicCompressionCodec.
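
The failure mode can be pictured with a simplified version of the size check 
(illustrative only; validateEntrySize is not the broker's actual method name):

    import org.apache.kafka.common.errors.RecordTooLargeException

    // The broker measures each entry at the size it arrives with, so an
    // uncompressed message is measured at its raw size and can exceed
    // message.max.bytes even though it would fit once compressed.
    def validateEntrySize(entrySizeBytes: Int, messageMaxBytes: Int): Unit =
      if (entrySizeBytes > messageMaxBytes)
        throw new RecordTooLargeException(
          s"Message size is $entrySizeBytes bytes, which exceeds the configured maximum of $messageMaxBytes.")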

Author: Onur Karaman <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>, Jason Gustafson 
<[email protected]>, Ismael Juma <[email protected]>

Closes #1394 from onurkaraman/KAFKA-3718


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62dc1afb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62dc1afb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62dc1afb

Branch: refs/heads/trunk
Commit: 62dc1afb69369c64207991ba59bcd203505d37ea
Parents: 5653249
Author: Onur Karaman <[email protected]>
Authored: Thu May 26 09:17:31 2016 +0100
Committer: Ismael Juma <[email protected]>
Committed: Thu May 26 09:17:31 2016 +0100

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    | 10 ++--
 .../main/scala/kafka/server/KafkaServer.scala   | 14 ++---
 .../kafka/api/AuthorizerIntegrationTest.scala   |  2 +-
 .../api/GroupCoordinatorIntegrationTest.scala   | 63 ++++++++++++++++++++
 .../kafka/api/IntegrationTestHarness.scala      |  2 +-
 5 files changed, 78 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index fb71254..f445764 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.common.{OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
 import kafka.log.LogConfig
-import kafka.message.UncompressedCodec
+import kafka.message.ProducerCompressionCodec
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -65,7 +65,7 @@ class GroupCoordinator(val brokerId: Int,
     val props = new Properties
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
     props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
+    props.put(LogConfig.CompressionTypeProp, ProducerCompressionCodec.name)
     props
   }
 
@@ -744,14 +744,16 @@ object GroupCoordinator {
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
       offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
       offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
     val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    val groupManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager, heartbeatPurgatory, joinPurgatory, time)
+    val groupMetadataManager = new GroupMetadataManager(config.brokerId, offsetConfig, replicaManager, zkUtils, time)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2832ebc..de3054a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -123,7 +123,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
   var dynamicConfigHandlers: Map[String, ConfigHandler] = null
   var dynamicConfigManager: DynamicConfigManager = null
 
-  var consumerCoordinator: GroupCoordinator = null
+  var groupCoordinator: GroupCoordinator = null
 
   var kafkaController: KafkaController = null
 
@@ -199,9 +199,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         kafkaController = new KafkaController(config, zkUtils, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
         kafkaController.startup()
 
-        /* start kafka coordinator */
-        consumerCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
-        consumerCoordinator.startup()
+        /* start group coordinator */
+        groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, kafkaMetricsTime)
+        groupCoordinator.startup()
 
         /* Get the authorizer and initialize it if one is specified.*/
         authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
@@ -211,7 +211,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
         }
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
+        apis = new KafkaApis(socketServer.requestChannel, replicaManager, groupCoordinator,
           kafkaController, zkUtils, config.brokerId, config, metadataCache, metrics, authorizer)
         requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
         brokerState.newState(RunningAsBroker)
@@ -555,8 +555,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
           CoreUtils.swallow(replicaManager.shutdown())
         if(logManager != null)
           CoreUtils.swallow(logManager.shutdown())
-        if(consumerCoordinator != null)
-          CoreUtils.swallow(consumerCoordinator.shutdown())
+        if(groupCoordinator != null)
+          CoreUtils.swallow(groupCoordinator.shutdown())
         if(kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown())
         if(zkUtils != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bc705f1..2d5900f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -148,7 +148,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       1,
       1,
       servers,
-      servers.head.consumerCoordinator.offsetsTopicConfigs)
+      servers.head.groupCoordinator.offsetsTopicConfigs)
     // create the test topic with all the brokers as replicas
     TestUtils.createTopic(zkUtils, topic, 1, 1, this.servers)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
new file mode 100644
index 0000000..9183d0f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package integration.kafka.api
+
+import kafka.common.TopicAndPartition
+import kafka.integration.KafkaServerTestHarness
+import kafka.log.Log
+import kafka.message.GZIPCompressionCodec
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.internals.TopicConstants
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Test
+import org.junit.Assert._
+
+import scala.collection.JavaConverters._
+import java.util.Properties
+
+class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness {
+  val offsetsTopicCompressionCodec = GZIPCompressionCodec
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  overridingProps.put(KafkaConfig.OffsetsTopicCompressionCodecProp, offsetsTopicCompressionCodec.codec.toString)
+
+  override def generateConfigs = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map {
+    KafkaConfig.fromProps(_, overridingProps)
+  }
+
+  @Test
+  def testGroupCoordinatorPropagatesOffsetsTopicCompressionCodec() {
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+                                               securityProtocol = SecurityProtocol.PLAINTEXT)
+    val offsetMap = Map(
+      new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "")
+    ).asJava
+    consumer.commitSync(offsetMap)
+    val logManager = servers.head.getLogManager
+
+    def getGroupMetadataLogOpt: Option[Log] =
+      logManager.getLog(TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, 0))
+
+    TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.nonEmpty)),
+                            "Commit message not appended in time")
+
+    val logSegments = getGroupMetadataLogOpt.get.logSegments
+    val incorrectCompressionCodecs = logSegments.flatMap(_.log.map(_.message.compressionCodec)).filter(_ != offsetsTopicCompressionCodec)
+    assertEquals("Incorrect compression codecs should be empty", Seq.empty, incorrectCompressionCodecs)
+
+    consumer.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/62dc1afb/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index de05c9c..6e76f90 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
-      servers(0).consumerCoordinator.offsetsTopicConfigs)
+      servers(0).groupCoordinator.offsetsTopicConfigs)
   }
 
   @After
