Repository: kafka
Updated Branches:
  refs/heads/trunk dce06766d -> 30e78fa00
KAFKA-2832: Add a consumer config option to exclude internal topics

A new consumer config option 'exclude.internal.topics' was added to allow excluding internal topics when wildcards are used to specify the topics to consume. The new option takes a boolean value, with a default 'true' value (i.e. internal topics are excluded from wildcard subscriptions).

This patch is co-authored with rajinisivaram, edoardocomar and mimaison.

Author: edoardo <[email protected]>
Author: Vahid Hashemian <[email protected]>

Reviewers: Ismael Juma, Jun Rao, Gwen Shapira

Closes #1082 from edoardocomar/KAFKA-2832

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/30e78fa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/30e78fa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/30e78fa0

Branch: refs/heads/trunk
Commit: 30e78fa00650b258f3ab5ef6c9bdf5ca137289c0
Parents: dce0676
Author: edoardo <[email protected]>
Authored: Thu Mar 17 12:33:47 2016 -0700
Committer: Gwen Shapira <[email protected]>
Committed: Thu Mar 17 12:33:47 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  | 11 ++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  3 +-
 .../consumer/internals/ConsumerCoordinator.java |  9 +++--
 .../kafka/common/internals/TopicConstants.java  | 33 +++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java      | 37 ++++++++++++++++----
 .../main/scala/kafka/admin/TopicCommand.scala   |  7 ++--
 core/src/main/scala/kafka/common/Topic.scala    |  2 --
 .../main/scala/kafka/consumer/TopicFilter.scala |  5 +--
 .../kafka/coordinator/GroupCoordinator.scala    |  3 --
 .../coordinator/GroupMetadataManager.scala      | 30 ++++++++--------
 .../src/main/scala/kafka/server/KafkaApis.scala | 11 +++---
 .../scala/kafka/server/ReplicaManager.scala     |  5 ++-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  5 ++-
 .../kafka/api/BaseConsumerTest.scala            |  7 ++--
 .../kafka/api/IntegrationTestHarness.scala      |  4 +--
 .../kafka/api/ProducerFailureHandlingTest.scala |  4 +--
 .../unit/kafka/admin/TopicCommandTest.scala     |  9 ++---
 .../unit/kafka/consumer/TopicFilterTest.scala   |  9 ++---
 .../GroupCoordinatorResponseTest.scala          | 14 ++++----
 19 files changed, 135 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index bd9efc3..9101307 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -172,6 +172,12 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String MAX_POLL_RECORDS_CONFIG = "max.poll.records";
     private static final String MAX_POLL_RECORDS_DOC = "The maximum number of records returned in a single call to poll().";
 
+    /** <code>exclude.internal.topics</code> */
+    public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG = "exclude.internal.topics";
+    private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. " +
+            "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it.";
+    public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true;
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -316,6 +322,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(1),
                                         Importance.MEDIUM,
                                         MAX_POLL_RECORDS_DOC)
+                                .define(EXCLUDE_INTERNAL_TOPICS_CONFIG,
+                                        Type.BOOLEAN,
+                                        EXCLUDE_INTERNAL_TOPICS_DEFAULT,
+                                        Importance.MEDIUM,
+                                        EXCLUDE_INTERNAL_TOPICS_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
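For illustration, a minimal sketch of how a consumer could opt back in to internal topics with the new option and a pattern subscription. This snippet is not part of the commit; the broker address, group id, and deserializer choices are placeholder assumptions:

    import java.util.Collection;
    import java.util.Properties;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class WildcardConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "wildcard-group");          // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            // The default is true (internal topics are excluded from wildcard matching);
            // set it to false so the pattern below may also match __consumer_offsets.
            props.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Pattern.compile(".*"), new ConsumerRebalanceListener() {
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
                });
                ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                for (ConsumerRecord<byte[], byte[]> record : records)
                    System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
            }
        }
    }

With the default of 'true', the same ".*" pattern silently skips __consumer_offsets; per the config doc above, the only way to receive records from an internal topic is then to subscribe to it explicitly.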
" + + "If set to <code>true</code> the only way to receive records from an internal topic is subscribing to it."; + public static final boolean EXCLUDE_INTERNAL_TOPICS_DEFAULT = true; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -316,6 +322,11 @@ public class ConsumerConfig extends AbstractConfig { atLeast(1), Importance.MEDIUM, MAX_POLL_RECORDS_DOC) + .define(EXCLUDE_INTERNAL_TOPICS_CONFIG, + Type.BOOLEAN, + EXCLUDE_INTERNAL_TOPICS_DEFAULT, + Importance.MEDIUM, + EXCLUDE_INTERNAL_TOPICS_DOC) // security support .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d7c8e14..804a160 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -612,7 +612,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { new ConsumerCoordinator.DefaultOffsetCommitCallback(), config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors); + this.interceptors, + config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); if (keyDeserializer == null) { this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 2ae1437..cf93530 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.internals.TopicConstants; import org.apache.kafka.common.metrics.Measurable; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -69,6 +70,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private final boolean autoCommitEnabled; private final AutoCommitTask autoCommitTask; private final ConsumerInterceptors<?, ?> interceptors; + private final boolean excludeInternalTopics; /** * Initialize the coordination manager. 
@@ -87,7 +89,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, long autoCommitIntervalMs, - ConsumerInterceptors<?, ?> interceptors) { + ConsumerInterceptors<?, ?> interceptors, + boolean excludeInternalTopics) { super(client, groupId, sessionTimeoutMs, @@ -110,6 +113,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null; this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix); this.interceptors = interceptors; + this.excludeInternalTopics = excludeInternalTopics; } @Override @@ -140,7 +144,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { final List<String> topicsToSubscribe = new ArrayList<>(); for (String topic : cluster.topics()) - if (subscriptions.getSubscribedPattern().matcher(topic).matches()) + if (subscriptions.getSubscribedPattern().matcher(topic).matches() && + !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic))) topicsToSubscribe.add(topic); subscriptions.changeSubscription(topicsToSubscribe); http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java new file mode 100644 index 0000000..5d6b992 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/internals/TopicConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.internals; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; + +public final class TopicConstants { + + //avoid instantiation + private TopicConstants() { + } + + // TODO: we store both group metadata and offset data here despite the topic name being offsets only + public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets"; + public static final Collection<String> INTERNAL_TOPICS = Collections.unmodifiableSet(new HashSet<String>(Arrays.asList(GROUP_METADATA_TOPIC_NAME))); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 0b8a162..260ee7a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.consumer.CommitFailedException; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.consumer.OffsetCommitCallback; @@ -34,6 +35,7 @@ import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.DisconnectException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.OffsetMetadataTooLarge; +import org.apache.kafka.common.internals.TopicConstants; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; @@ -63,6 +65,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -107,7 +110,7 @@ public class ConsumerCoordinatorTest { this.partitionAssignor.clear(); client.setNode(node); - this.coordinator = buildCoordinator(metrics, assignors); + this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT); } @After @@ -263,7 +266,7 @@ public class ConsumerCoordinatorTest { } @Test(expected = ApiException.class) - public void testJoinGroupInvalidGroupId() { + public void testJoinGroupInvalidGroupId() { final String consumerId = "leader"; subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); @@ -509,7 +512,7 @@ public class ConsumerCoordinatorTest { } @Test - public void testMetadataChangeTriggersRebalance() { + public void testMetadataChangeTriggersRebalance() { final String consumerId = "consumer"; subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); @@ -533,6 +536,25 @@ public class ConsumerCoordinatorTest { } @Test + public void testExcludeInternalTopicsConfigOption() { + subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); + + 
metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); + + assertFalse(subscriptions.partitionAssignmentNeeded()); + } + + @Test + public void testIncludeInternalTopicsConfigOption() { + coordinator = buildCoordinator(new Metrics(), assignors, false); + subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); + + metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); + + assertTrue(subscriptions.partitionAssignmentNeeded()); + } + + @Test public void testRejoinGroup() { subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); subscriptions.needReassignment(); @@ -882,7 +904,7 @@ public class ConsumerCoordinatorTest { RangeAssignor range = new RangeAssignor(); try (Metrics metrics = new Metrics(time)) { - ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range)); + ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(roundRobin.name(), metadata.get(0).name()); @@ -890,7 +912,7 @@ public class ConsumerCoordinatorTest { } try (Metrics metrics = new Metrics(time)) { - ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin)); + ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT); List<ProtocolMetadata> metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(range.name(), metadata.get(0).name()); @@ -898,7 +920,7 @@ public class ConsumerCoordinatorTest { } } - private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors) { + private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics) { return new ConsumerCoordinator( consumerClient, groupId, @@ -914,7 +936,8 @@ public class ConsumerCoordinatorTest { defaultOffsetCommitCallback, autoCommitEnabled, autoCommitIntervalMs, - null); + null, + excludeInternalTopics); } private Struct consumerMetadataResponse(Node node, short error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index e89e09d..b3b0635 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -18,7 +18,6 @@ package kafka.admin import java.util.Properties - import joptsimple._ import kafka.common.{AdminCommandFailedException, Topic, TopicExistsException} import kafka.consumer.{ConsumerConfig, Whitelist} @@ -30,9 +29,9 @@ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils - import scala.collection.JavaConversions._ import scala.collection._ +import org.apache.kafka.common.internals.TopicConstants object TopicCommand extends Logging { @@ -138,7 +137,7 @@ object TopicCommand extends Logging { } if(opts.options.has(opts.partitionsOpt)) { - if (topic == 
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index 55d2bdb..930d0e4 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -25,8 +25,6 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName)
-
   def validate(topic: String) {
     if (topic.length <= 0)
       throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty")


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 5a13540..b89968e 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -21,6 +21,7 @@ package kafka.consumer
 import kafka.utils.Logging
 import java.util.regex.{PatternSyntaxException, Pattern}
 import kafka.common.Topic
+import org.apache.kafka.common.internals.TopicConstants
 
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
@@ -47,7 +48,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+    val allowed = topic.matches(regex) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
@@ -60,7 +61,7 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
   override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
-    val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
+    val allowed = (!topic.matches(regex)) && !(TopicConstants.INTERNAL_TOPICS.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 36d7bbb..30a3a78 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -725,9 +725,6 @@ object GroupCoordinator {
   val EmptyGroup = GroupSummary(NoState, NoProtocolType, NoProtocol, NoMembers)
   val DeadGroup = GroupSummary(Dead.toString, NoProtocolType, NoProtocol, NoMembers)
 
-  // TODO: we store both group metadata and offset data here despite the topic name being offsets only
-  val GroupMetadataTopicName = "__consumer_offsets"
-
   def apply(config: KafkaConfig,
             zkUtils: ZkUtils,
             replicaManager: ReplicaManager,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2c0236e..c6bc44e 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -18,7 +18,6 @@
 package kafka.coordinator
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import kafka.utils.CoreUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
@@ -40,14 +39,13 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.common.MessageFormatter
 import kafka.server.ReplicaManager
-
 import scala.collection._
 import java.io.PrintStream
 import java.nio.ByteBuffer
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.TimeUnit
-
 import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.internals.TopicConstants
 
 case class DelayedStore(messageSet: Map[TopicPartition, MessageSet],
                         callback: Map[TopicPartition, PartitionResponse] => Unit)
@@ -147,9 +145,9 @@ class GroupMetadataManager(val brokerId: Int,
     val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
       timestamp = timestamp, magicValue = magicValue)
 
-    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+    val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
     partitionOpt.foreach { partition =>
-      val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+      val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartition)
 
       trace("Marking group %s as deleted.".format(group.groupId))
 
@@ -177,7 +175,7 @@ class GroupMetadataManager(val brokerId: Int,
       timestamp = timestamp,
       magicValue = magicValue)
 
-    val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+    val groupMetadataPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
 
     val groupMetadataMessageSet = Map(groupMetadataPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
@@ -263,7 +261,7 @@ class GroupMetadataManager(val brokerId: Int,
       )
     }.toSeq
 
-    val offsetTopicPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+    val offsetTopicPartition = new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -351,7 +349,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def loadGroupsForPartition(offsetsPartition: Int,
                              onGroupLoaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
     def loadGroupsAndOffsets() {
@@ -470,7 +468,7 @@ class GroupMetadataManager(val brokerId: Int,
    */
   def removeGroupsForPartition(offsetsPartition: Int,
                                onGroupUnloaded: GroupMetadata => Unit) {
-    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets)
 
     def removeGroupsAndOffsets() {
@@ -507,10 +505,10 @@ class GroupMetadataManager(val brokerId: Int,
       }
 
       if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-        .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+        .format(numOffsetsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
 
       if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
-        .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+        .format(numGroupsRemoved, TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)))
     }
   }
 
@@ -566,9 +564,9 @@ class GroupMetadataManager(val brokerId: Int,
     // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
     // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
     tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-      val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+      val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
       partitionOpt.map { partition =>
-        val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+        val appendPartition = TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
         val messages = tombstones.map(_._2).toSeq
 
         trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -593,7 +591,7 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partitionId)
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -621,7 +619,7 @@ class GroupMetadataManager(val brokerId: Int,
   * If the topic does not exist, the configured partition count is returned.
   */
  private def getOffsetsTopicPartitionCount = {
-    val topic = GroupCoordinator.GroupMetadataTopicName
+    val topic = TopicConstants.GROUP_METADATA_TOPIC_NAME
    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
    if (topicData(topic).nonEmpty)
      topicData(topic).size
@@ -630,7 +628,7 @@ class GroupMetadataManager(val brokerId: Int,
  }
 
  private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = {
-    val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition)
+    val groupMetadataTopicAndPartition = new TopicAndPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, partition)
    val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse {
      throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found")
    }


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 452f721..0fb4d74 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -45,6 +45,7 @@ MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, Of
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
+import org.apache.kafka.common.internals.TopicConstants
 
 import scala.collection._
 import scala.collection.JavaConverters._
@@ -129,11 +130,11 @@ class KafkaApis(val requestChannel: RequestChannel,
           // this callback is invoked under the replica state change lock to ensure proper order of
           // leadership changes
           updatedLeaders.foreach { partition =>
-            if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+            if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
               coordinator.handleGroupImmigration(partition.partitionId)
           }
           updatedFollowers.foreach { partition =>
-            if (partition.topic == GroupCoordinator.GroupMetadataTopicName)
+            if (partition.topic == TopicConstants.GROUP_METADATA_TOPIC_NAME)
               coordinator.handleGroupEmigration(partition.partitionId)
           }
         }
@@ -643,12 +644,12 @@ class KafkaApis(val requestChannel: RequestChannel,
         Math.min(config.offsetsTopicReplicationFactor.toInt, aliveBrokers.length)
       else
         config.offsetsTopicReplicationFactor.toInt
-    createTopic(GroupCoordinator.GroupMetadataTopicName, config.offsetsTopicPartitions,
+    createTopic(TopicConstants.GROUP_METADATA_TOPIC_NAME, config.offsetsTopicPartitions,
       offsetsTopicReplicationFactor, coordinator.offsetsTopicConfigs)
   }
 
   private def getOrCreateGroupMetadataTopic(securityProtocol: SecurityProtocol): MetadataResponse.TopicMetadata = {
-    val topicMetadata = metadataCache.getTopicMetadata(Set(GroupCoordinator.GroupMetadataTopicName), securityProtocol)
+    val topicMetadata = metadataCache.getTopicMetadata(Set(TopicConstants.GROUP_METADATA_TOPIC_NAME), securityProtocol)
     topicMetadata.headOption.getOrElse(createGroupMetadataTopic())
   }
 
@@ -659,7 +660,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == GroupCoordinator.GroupMetadataTopicName) {
+        if (topic == TopicConstants.GROUP_METADATA_TOPIC_NAME) {
          createGroupMetadataTopic()
        } else if (config.autoCreateTopicsEnable) {
          createTopic(topic, config.numPartitions, config.defaultReplicationFactor)
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index de58e56..f050e27 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,7 +19,6 @@ package kafka.server
 import java.io.{File, IOException}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
@@ -38,9 +37,9 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{LeaderAndIsrRequest, StopReplicaRequest, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time => JTime}
-
 import scala.collection._
 import scala.collection.JavaConverters._
+import org.apache.kafka.common.internals.TopicConstants
 
 /*
  * Result metadata of a log append operation on the log
@@ -395,7 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
       BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
 
       // reject appending to internal topics if it is not allowed
-      if (Topic.InternalTopics.contains(topicPartition.topic) && !internalTopicsAllowed) {
+      if (TopicConstants.INTERNAL_TOPICS.contains(topicPartition.topic) && !internalTopicsAllowed) {
         (topicPartition, LogAppendResult(
           LogAppendInfo.UnknownLogAppendInfo,
           Some(new InvalidTopicException("Cannot append to internal topic %s".format(topicPartition.topic)))))
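The same shared constant also backs the broker-side produce guard above. A rough standalone sketch of that rule (the class and method names are hypothetical, and the real code wraps the error into the partition's produce response via LogAppendResult rather than throwing):

    import java.util.Collections;
    import java.util.Set;

    import org.apache.kafka.common.errors.InvalidTopicException;

    public class InternalTopicAppendGuard {
        // Mirrors TopicConstants.INTERNAL_TOPICS from this commit.
        private static final Set<String> INTERNAL_TOPICS =
                Collections.singleton("__consumer_offsets");

        // internalTopicsAllowed is true only for internal clients, such as the
        // group coordinator writing offsets; ordinary producers pass false.
        public static void checkAppendAllowed(String topic, boolean internalTopicsAllowed) {
            if (INTERNAL_TOPICS.contains(topic) && !internalTopicsAllowed)
                throw new InvalidTopicException("Cannot append to internal topic " + topic);
        }
    }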
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index b09c541..fad7657 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -17,7 +17,6 @@ import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.concurrent.ExecutionException
 import java.util.{ArrayList, Collections, Properties}
-
 import kafka.cluster.EndPoint
 import kafka.common.TopicAndPartition
 import kafka.coordinator.GroupCoordinator
@@ -34,10 +33,10 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{BrokerEndPoint, TopicPartition, requests}
 import org.junit.Assert._
 import org.junit.{After, Assert, Before, Test}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
 
 class AuthorizerIntegrationTest extends KafkaServerTestHarness {
   val topic = "topic"
@@ -143,7 +142,7 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
       consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
                           1,
                           1,
                           servers,


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 684b38f..f576be5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,23 +13,20 @@
 package kafka.api
 
 import java.util
-
 import kafka.coordinator.GroupCoordinator
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
-
 import kafka.utils.{TestUtils, Logging, ShutdownableThread}
 import kafka.server.KafkaConfig
-
 import java.util.ArrayList
 import org.junit.Assert._
 import org.junit.{Before, Test}
-
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Buffer
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * Integration tests for the new consumer that cover basic usage as well as server failures
@@ -196,7 +193,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     // get metadata for the topic
     var parts: Seq[PartitionInfo] = null
     while (parts == null)
-      parts = consumer0.partitionsFor(GroupCoordinator.GroupMetadataTopicName).asScala
+      parts = consumer0.partitionsFor(TopicConstants.GROUP_METADATA_TOPIC_NAME).asScala
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index b4f31c4..d0680b8 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -24,10 +24,10 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
-
 import org.junit.{After, Before}
 import scala.collection.mutable.Buffer
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -75,7 +75,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
     }
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkUtils, GroupCoordinator.GroupMetadataTopicName,
+    TestUtils.createTopic(zkUtils, TopicConstants.GROUP_METADATA_TOPIC_NAME,
       serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
      serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
      servers,
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 63a6b6f..2bb203d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -19,7 +19,6 @@ package kafka.api
 
 import java.util.concurrent.{ExecutionException, TimeUnit, TimeoutException}
 import java.util.{Properties, Random}
-
 import kafka.common.Topic
 import kafka.consumer.SimpleConsumer
 import kafka.integration.KafkaServerTestHarness
@@ -31,6 +30,7 @@ import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.errors.{InvalidTopicException, NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import org.apache.kafka.common.internals.TopicConstants
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -198,7 +198,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   @Test
   def testCannotSendToInternalTopic() {
     val thrown = intercept[ExecutionException] {
-      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](Topic.InternalTopics.head, "test".getBytes, "test".getBytes)).get
+      producer2.send(new ProducerRecord[Array[Byte],Array[Byte]](TopicConstants.INTERNAL_TOPICS.iterator.next, "test".getBytes, "test".getBytes)).get
     }
     assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException])
   }


http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index b42aaf4..e0107da 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -26,6 +26,7 @@ import kafka.server.ConfigType
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils._
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
@@ -86,12 +87,12 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", GroupCoordinator.GroupMetadataTopicName))
+      "--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
     TopicCommand.createTopic(zkUtils, createOffsetTopicOpts)
 
-    // try to delete the GroupCoordinator.GroupMetadataTopicName and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", GroupCoordinator.GroupMetadataTopicName))
-    val deleteOffsetTopicPath = getDeleteTopicPath(GroupCoordinator.GroupMetadataTopicName)
+    // try to delete the TopicConstants.GROUP_METADATA_TOPIC_NAME and make sure it doesn't
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", TopicConstants.GROUP_METADATA_TOPIC_NAME))
+    val deleteOffsetTopicPath = getDeleteTopicPath(TopicConstants.GROUP_METADATA_TOPIC_NAME)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
       TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts)
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 1e8d04e..0e0a06a 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,6 +22,7 @@ import org.junit.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.coordinator.GroupCoordinator
+import org.apache.kafka.common.internals.TopicConstants
 
 class TopicFilterTest extends JUnitSuite {
 
@@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = new Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(GroupCoordinator.GroupMetadataTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(TopicConstants.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false))
   }
 
   @Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/30e78fa0/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 50fa09e..acdb660 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -19,7 +19,6 @@ package kafka.coordinator
 
 import org.apache.kafka.common.record.Record
 import org.junit.Assert._
-
 import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 import kafka.message.{Message, MessageSet}
 import kafka.server.{ReplicaManager, KafkaConfig}
@@ -32,12 +31,11 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, IAnswer, EasyMock}
 import org.junit.{After, Before, Test}
 import org.scalatest.junit.JUnitSuite
-
 import java.util.concurrent.TimeUnit
-
 import scala.collection._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future, Promise}
+import org.apache.kafka.common.internals.TopicConstants
 
 /**
  * Test GroupCoordinator responses
@@ -81,12 +79,12 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     val ret = mutable.Map[String, Map[Int, Seq[Int]]]()
-    ret += (GroupCoordinator.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1)))
+    ret += (TopicConstants.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1)))
 
     replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
 
     zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(GroupCoordinator.GroupMetadataTopicName))).andReturn(ret)
+    EasyMock.expect(zkUtils.getPartitionAssignmentForTopics(Seq(TopicConstants.GROUP_METADATA_TOPIC_NAME))).andReturn(ret)
     EasyMock.replay(zkUtils)
 
     groupCoordinator = GroupCoordinator(KafkaConfig.fromProps(props), zkUtils, replicaManager, new SystemTime)
@@ -834,7 +832,7 @@
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
        )
      )})
@@ -911,7 +909,7 @@
       EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MessageSet]],
       EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
       override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId) ->
+        Map(new TopicPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
          new PartitionResponse(Errors.NONE.code, 0L, Record.NO_TIMESTAMP)
        )
      )})
@@ -925,7 +923,7 @@
   private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
     val (responseFuture, responseCallback) = setupHeartbeatCallback
 
-    EasyMock.expect(replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartitionId)).andReturn(None)
+    EasyMock.expect(replicaManager.getPartition(TopicConstants.GROUP_METADATA_TOPIC_NAME, groupPartitionId)).andReturn(None)
     EasyMock.expect(replicaManager.getMessageFormatVersion(EasyMock.anyObject())).andReturn(Some(Message.MagicValue_V1)).anyTimes()
     EasyMock.replay(replicaManager)
