MINOR: Consolidate Topic classes During the 0.11.0.0 cycle, a Java version of the class was introduced so that Streams could use it. Given that it includes the bulk of the functionality of the Scala version of the class, it makes sense to consolidate them.
While doing this, I noticed that one of the tests for the Java class (`shouldThrowOnInvalidTopicNames`) was broken as it only checked if the first topic name in the list was invalid. Author: Ismael Juma <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #3046 from ijuma/consolidate-topic-classes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bc55f852 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bc55f852 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bc55f852 Branch: refs/heads/trunk Commit: bc55f85237cb46e73c6774298cf308060a4a739c Parents: 885643c Author: Ismael Juma <[email protected]> Authored: Mon May 15 09:10:09 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Mon May 15 09:10:09 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/common/internals/Topic.java | 66 ++++++++---- .../kafka/common/internals/TopicTest.java | 89 +++++++++------- .../src/main/scala/kafka/admin/AdminUtils.scala | 3 +- .../kafka/admin/ConsumerGroupCommand.scala | 9 +- .../main/scala/kafka/admin/TopicCommand.scala | 6 +- core/src/main/scala/kafka/common/Topic.scala | 74 -------------- .../main/scala/kafka/consumer/TopicFilter.scala | 5 +- .../coordinator/group/GroupCoordinator.scala | 5 +- .../group/GroupMetadataManager.scala | 16 +-- .../transaction/TransactionStateManager.scala | 11 +- core/src/main/scala/kafka/log/Log.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 22 ++-- .../main/scala/kafka/server/MetadataCache.scala | 3 +- .../scala/kafka/server/ReplicaManager.scala | 3 +- .../kafka/tools/StateChangeLogMerger.scala | 5 +- .../kafka/api/AuthorizerIntegrationTest.scala | 18 ++-- .../kafka/api/BaseConsumerTest.scala | 4 +- .../api/GroupCoordinatorIntegrationTest.scala | 6 +- .../kafka/api/ProducerFailureHandlingTest.scala | 4 +- ...tenersWithSameSecurityProtocolBaseTest.scala | 4 +- .../unit/kafka/admin/TopicCommandTest.scala | 12 +-- .../scala/unit/kafka/common/TopicTest.scala | 102 ------------------- .../unit/kafka/consumer/TopicFilterTest.scala | 10 +- .../group/GroupCoordinatorResponseTest.scala | 31 +++--- .../group/GroupMetadataManagerTest.scala | 42 ++++---- .../TransactionCoordinatorIntegrationTest.scala | 4 +- .../TransactionStateManagerTest.scala | 9 +- .../unit/kafka/server/MetadataRequestTest.scala | 7 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 5 +- 29 files changed, 230 insertions(+), 349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/clients/src/main/java/org/apache/kafka/common/internals/Topic.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java index b9971d0..dd6dbcf 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java @@ -16,45 +16,67 @@ */ package org.apache.kafka.common.internals; -import java.util.regex.Matcher; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.utils.Utils; + +import java.util.Collections; +import java.util.Set; import java.util.regex.Pattern; public class Topic { - private static final String INVALID_CHARS = "[^a-zA-Z0-9._\\-]"; + public static final String GROUP_METADATA_TOPIC_NAME = "__consumer_offsets"; + public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state"; + public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]"; + + private static final Set<String> INTERNAL_TOPICS = Collections.unmodifiableSet( + Utils.mkSet(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME)); + private static final int MAX_NAME_LENGTH = 249; - private static final Pattern INVALID_CHARS_PATTERN = Pattern.compile(INVALID_CHARS); - private static final Pattern ONLY_PERIODS_PATTERN = Pattern.compile("^[.]+$"); + private static final Pattern LEGAL_CHARS_PATTERN = Pattern.compile(LEGAL_CHARS + "+"); public static void validate(String topic) { - if (isEmpty(topic)) - throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be empty"); - else if (containsOnlyPeriods(topic)) - throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name cannot be \".\" or \"..\""); - else if (exceedsMaxLength(topic)) - throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name is illegal, can't be longer than " + MAX_NAME_LENGTH + " characters"); - else if (containsInvalidCharacters(topic)) throw new org.apache.kafka.common.errors.InvalidTopicException("Topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'"); + if (topic.isEmpty()) + throw new InvalidTopicException("Topic name is illegal, it can't be empty"); + if (topic.equals(".") || topic.equals("..")) + throw new InvalidTopicException("Topic name cannot be \".\" or \"..\""); + if (topic.length() > MAX_NAME_LENGTH) + throw new InvalidTopicException("Topic name is illegal, it can't be longer than " + MAX_NAME_LENGTH + + " characters, topic name: " + topic); + if (!containsValidPattern(topic)) + throw new InvalidTopicException("Topic name \"" + topic + "\" is illegal, it contains a character other than " + + "ASCII alphanumerics, '.', '_' and '-'"); } - static boolean isEmpty(String topic) { - return topic.isEmpty(); + public static boolean isInternal(String topic) { + return INTERNAL_TOPICS.contains(topic); } - static boolean containsOnlyPeriods(String topic) { - Matcher matcher = ONLY_PERIODS_PATTERN.matcher(topic); - return matcher.find(); + /** + * Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. + * + * @param topic The topic to check for colliding character + * @return true if the topic has collision characters + */ + public static boolean hasCollisionChars(String topic) { + return topic.contains("_") || topic.contains("."); } - static boolean exceedsMaxLength(String topic) { - return topic.length() > MAX_NAME_LENGTH; + /** + * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position. + * + * @param topicA A topic to check for collision + * @param topicB A topic to check for collision + * @return true if the topics collide + */ + public static boolean hasCollision(String topicA, String topicB) { + return topicA.replace('.', '_').equals(topicB.replace('.', '_')); } /** * Valid characters for Kafka topics are the ASCII alphanumerics, '.', '_', and '-' */ - static boolean containsInvalidCharacters(String topic) { - Matcher matcher = INVALID_CHARS_PATTERN.matcher(topic); - return matcher.find(); + static boolean containsValidPattern(String topic) { + return LEGAL_CHARS_PATTERN.matcher(topic).matches(); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java index 36ff521..f7475c4 100644 --- a/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java +++ b/clients/src/test/java/org/apache/kafka/common/internals/TopicTest.java @@ -17,20 +17,23 @@ package org.apache.kafka.common.internals; import org.apache.kafka.common.errors.InvalidTopicException; -import org.junit.Rule; +import org.apache.kafka.test.TestUtils; import org.junit.Test; -import org.junit.rules.ExpectedException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TopicTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - @Test - public void shouldRecognizeValidTopicNames() { - String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_."}; + public void shouldAcceptValidTopicNames() { + String maxLengthString = TestUtils.randomString(249); + String[] validTopicNames = {"valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.", "...", maxLengthString}; for (String topicName : validTopicNames) { Topic.validate(topicName); @@ -39,46 +42,64 @@ public class TopicTest { @Test public void shouldThrowOnInvalidTopicNames() { - String[] invalidTopicNames = {"", "foo bar", "..", "foo:bar", "foo=bar"}; + char[] longString = new char[250]; + Arrays.fill(longString, 'a'); + String[] invalidTopicNames = {"", "foo bar", "..", "foo:bar", "foo=bar", ".", new String(longString)}; for (String topicName : invalidTopicNames) { - thrown.expect(InvalidTopicException.class); - Topic.validate(topicName); + try { + Topic.validate(topicName); + fail("No exception was thrown for topic with invalid name: " + topicName); + } catch (InvalidTopicException e) { + // Good + } } } @Test - public void shouldRecognizeEmptyTopicNames() { - assertTrue(Topic.isEmpty("")); - } - - @Test - public void shouldRecognizeTopicNamesThatExceedMaxLength() { - String longName = "ATCG"; + public void shouldRecognizeInvalidCharactersInTopicNames() { + char[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='}; - for (int i = 0; i < 6; i++) { - longName += longName; + for (char c : invalidChars) { + String topicName = "Is " + c + "illegal"; + assertFalse(Topic.containsValidPattern(topicName)); } - - assertTrue(Topic.exceedsMaxLength(longName)); } @Test - public void shouldRecognizeInvalidCharactersInTopicNames() { - Character[] invalidChars = {'/', '\\', ',', '\u0000', ':', '"', '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '='}; - - for (Character c : invalidChars) { - String topicName = "Is " + c + "illegal"; - assertTrue(Topic.containsInvalidCharacters(topicName)); - } + public void testTopicHasCollisionChars() { + List<String> falseTopics = Arrays.asList("start", "end", "middle", "many"); + List<String> trueTopics = Arrays.asList( + ".start", "end.", "mid.dle", ".ma.ny.", + "_start", "end_", "mid_dle", "_ma_ny." + ); + + for (String topic : falseTopics) + assertFalse(Topic.hasCollisionChars(topic)); + + for (String topic : trueTopics) + assertTrue(Topic.hasCollisionChars(topic)); } @Test - public void shouldRecognizeTopicNamesThatContainOnlyPeriods() { - String[] invalidTopicNames = {".", "..", "...."}; + public void testTopicHasCollision() { + List<String> periodFirstMiddleLastNone = Arrays.asList(".topic", "to.pic", "topic.", "topic"); + List<String> underscoreFirstMiddleLastNone = Arrays.asList("_topic", "to_pic", "topic_", "topic"); - for (String topicName : invalidTopicNames) { - assertTrue(Topic.containsOnlyPeriods(topicName)); - } + // Self + for (String topic : periodFirstMiddleLastNone) + assertTrue(Topic.hasCollision(topic, topic)); + + for (String topic : underscoreFirstMiddleLastNone) + assertTrue(Topic.hasCollision(topic, topic)); + + // Same Position + for (int i = 0; i < periodFirstMiddleLastNone.size(); ++i) + assertTrue(Topic.hasCollision(periodFirstMiddleLastNone.get(i), underscoreFirstMiddleLastNone.get(i))); + + // Different Position + Collections.reverse(underscoreFirstMiddleLastNone); + for (int i = 0; i < periodFirstMiddleLastNone.size(); ++i) + assertFalse(Topic.hasCollision(periodFirstMiddleLastNone.get(i), underscoreFirstMiddleLastNone.get(i))); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index ca81af3..49d249b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -17,7 +17,6 @@ package kafka.admin -import kafka.common._ import kafka.cluster.Broker import kafka.log.LogConfig import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} @@ -26,6 +25,7 @@ import kafka.utils.ZkUtils._ import java.util.Random import java.util.Properties +import kafka.common.TopicAlreadyMarkedForDeletionException import org.apache.kafka.common.Node import org.apache.kafka.common.errors.{InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, LeaderNotAvailableException, ReplicaNotAvailableException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.network.ListenerName @@ -39,6 +39,7 @@ import scala.collection.mutable import collection.Map import collection.Set import org.I0Itec.zkclient.exception.ZkNodeExistsException +import org.apache.kafka.common.internals.Topic trait AdminUtilities { def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index caad62a..dd7a477 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -20,18 +20,17 @@ package kafka.admin import java.util.Properties import joptsimple.{OptionParser, OptionSpec} - import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} import kafka.client.ClientUtils -import kafka.common.{TopicAndPartition, _} +import kafka.common.{OffsetMetadataAndError, TopicAndPartition} import kafka.consumer.SimpleConsumer import kafka.utils._ - import org.I0Itec.zkclient.exception.ZkNoNodeException import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} -import org.apache.kafka.common.Node -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.BrokerNotAvailableException +import org.apache.kafka.common.{KafkaException, Node, TopicPartition} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.security.JaasUtils http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 57dfc5a..942d70e 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -18,8 +18,9 @@ package kafka.admin import java.util.Properties + import joptsimple._ -import kafka.common.{AdminCommandFailedException, Topic} +import kafka.common.AdminCommandFailedException import kafka.consumer.Whitelist import kafka.log.LogConfig import kafka.server.ConfigType @@ -27,6 +28,7 @@ import kafka.utils.ZkUtils._ import kafka.utils._ import org.I0Itec.zkclient.exception.ZkNodeExistsException import org.apache.kafka.common.errors.TopicExistsException +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils @@ -135,7 +137,7 @@ object TopicCommand extends Logging { } if(opts.options.has(opts.partitionsOpt)) { - if (topic == Topic.GroupMetadataTopicName) { + if (topic == Topic.GROUP_METADATA_TOPIC_NAME) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/common/Topic.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala deleted file mode 100644 index 6ca7175..0000000 --- a/core/src/main/scala/kafka/common/Topic.scala +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -import util.matching.Regex - -import scala.collection.immutable - -object Topic { - - val GroupMetadataTopicName = "__consumer_offsets" - val TransactionStateTopicName = "__transaction_state" - val InternalTopics = immutable.Set(GroupMetadataTopicName, TransactionStateTopicName) - - val legalChars = "[a-zA-Z0-9\\._\\-]" - private val maxNameLength = 249 - private val rgx = new Regex(legalChars + "+") - - def validate(topic: String) { - if (topic.length <= 0) - throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be empty") - else if (topic.equals(".") || topic.equals("..")) - throw new org.apache.kafka.common.errors.InvalidTopicException("topic name cannot be \".\" or \"..\"") - else if (topic.length > maxNameLength) - throw new org.apache.kafka.common.errors.InvalidTopicException("topic name is illegal, can't be longer than " + maxNameLength + " characters") - - rgx.findFirstIn(topic) match { - case Some(t) => - if (!t.equals(topic)) - throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") - case None => throw new org.apache.kafka.common.errors.InvalidTopicException("topic name " + topic + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") - } - } - - /** - * Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. - * - * @param topic The topic to check for colliding character - * @return true if the topic has collision characters - */ - def hasCollisionChars(topic: String): Boolean = { - topic.contains("_") || topic.contains(".") - } - - /** - * Returns true if the topicNames collide due to a period ('.') or underscore ('_') in the same position. - * - * @param topicA A topic to check for collision - * @param topicB A topic to check for collision - * @return true if the topics collide - */ - def hasCollision(topicA: String, topicB: String): Boolean = { - topicA.replace('.', '_') == topicB.replace('.', '_') - } - - def isInternal(topic: String): Boolean = - InternalTopics.contains(topic) - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/consumer/TopicFilter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala index 914e9b9..69d7455 100644 --- a/core/src/main/scala/kafka/consumer/TopicFilter.scala +++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala @@ -18,8 +18,9 @@ package kafka.consumer import kafka.utils.Logging -import java.util.regex.{PatternSyntaxException, Pattern} -import kafka.common.Topic +import java.util.regex.{Pattern, PatternSyntaxException} + +import org.apache.kafka.common.internals.Topic sealed abstract class TopicFilter(rawRegex: String) extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index a57b6be..031a9c1 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -19,12 +19,13 @@ package kafka.coordinator.group import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean -import kafka.common.{OffsetAndMetadata, Topic} +import kafka.common.OffsetAndMetadata import kafka.log.LogConfig import kafka.message.ProducerCompressionCodec import kafka.server._ import kafka.utils._ import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetFetchResponse, TransactionResult} @@ -428,7 +429,7 @@ class GroupCoordinator(val brokerId: Int, def handleTxnCompletion(producerId: Long, topicPartitions: Seq[TopicPartition], transactionResult: TransactionResult) { - val offsetPartitions = topicPartitions.filter(_.topic() == Topic.GroupMetadataTopicName).map(_.partition).toSet + val offsetPartitions = topicPartitions.filter(_.topic == Topic.GROUP_METADATA_TOPIC_NAME).map(_.partition).toSet groupManager.handleTxnCompletion(producerId, offsetPartitions, transactionResult == TransactionResult.COMMIT) } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 222bcdc..74a3f7b 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -27,13 +27,13 @@ import java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0} import kafka.common.{MessageFormatter, _} -import kafka.coordinator.group._ import kafka.metrics.KafkaMetricsGroup import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils._ import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct} @@ -171,7 +171,7 @@ class GroupMetadataManager(brokerId: Int, builder.build() } - val groupMetadataPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId)) + val groupMetadataPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) val groupMetadataRecords = Map(groupMetadataPartition -> records) val generationId = group.generationId @@ -284,7 +284,7 @@ class GroupMetadataManager(brokerId: Int, val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata) new SimpleRecord(timestamp, key, value) } - val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, partitionFor(group.groupId)) + val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType, records.asJava)) if (isTxnOffsetCommit && magicValue < RecordBatch.MAGIC_VALUE_V2) @@ -431,7 +431,7 @@ class GroupMetadataManager(brokerId: Int, * Asynchronously read the partition from the offsets topic and populate the cache */ def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) { - val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) + val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) def doLoadGroupsAndOffsets() { info(s"Loading offsets and group metadata from $topicPartition") @@ -632,7 +632,7 @@ class GroupMetadataManager(brokerId: Int, */ def removeGroupsForPartition(offsetsPartition: Int, onGroupUnloaded: GroupMetadata => Unit) { - val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) + val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) scheduler.schedule(topicPartition.toString, removeGroupsAndOffsets _) def removeGroupsAndOffsets() { @@ -687,7 +687,7 @@ class GroupMetadataManager(brokerId: Int, } val offsetsPartition = partitionFor(groupId) - val appendPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition) + val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition) getMagic(offsetsPartition) match { case Some(magicValue) => // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically. @@ -802,7 +802,7 @@ class GroupMetadataManager(brokerId: Int, * If the topic does not exist, the configured partition count is returned. */ private def getGroupMetadataTopicPartitionCount: Int = { - zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions) + zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(config.offsetsTopicNumPartitions) } /** @@ -812,7 +812,7 @@ class GroupMetadataManager(brokerId: Int, * @return Some(MessageFormatVersion) if replica is local, None otherwise */ private def getMagic(partition: Int): Option[Byte] = - replicaManager.getMagic(new TopicPartition(Topic.GroupMetadataTopicName, partition)) + replicaManager.getMagic(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partition)) /** * Add the partition into the owned list http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 7a03fc3..1106e7c 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -22,13 +22,14 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantLock -import kafka.common.{KafkaException, Topic} +import kafka.common.KafkaException import kafka.log.LogConfig import kafka.message.UncompressedCodec import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils.{Logging, Pool, Scheduler, ZkUtils} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord} import org.apache.kafka.common.requests.IsolationLevel @@ -182,7 +183,7 @@ class TransactionStateManager(brokerId: Int, * If the topic does not exist, the default partition count is returned. */ private def getTransactionTopicPartitionCount: Int = { - zkUtils.getTopicPartitionCount(Topic.TransactionStateTopicName).getOrElse(config.transactionLogNumPartitions) + zkUtils.getTopicPartitionCount(Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(config.transactionLogNumPartitions) } private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] = { @@ -274,7 +275,7 @@ class TransactionStateManager(brokerId: Int, def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback) { validateTransactionTopicPartitionCountIsStable() - val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionId) + val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) inLock(stateLock) { loadingPartitions.add(partitionId) @@ -320,7 +321,7 @@ class TransactionStateManager(brokerId: Int, def removeTransactionsForTxnTopicPartition(partitionId: Int) { validateTransactionTopicPartitionCountIsStable() - val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionId) + val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionId) def removeTransactions() { inLock(stateLock) { @@ -359,7 +360,7 @@ class TransactionStateManager(brokerId: Int, val records = MemoryRecords.withRecords(TransactionLog.EnforcedCompressionType, new SimpleRecord(timestamp, keyBytes, valueBytes)) - val topicPartition = new TopicPartition(Topic.TransactionStateTopicName, partitionFor(transactionalId)) + val topicPartition = new TopicPartition(Topic.TRANSACTION_STATE_TOPIC_NAME, partitionFor(transactionalId)) val recordsPerPartition = Map(topicPartition -> records) // set the callback function to update transaction status in cache after log append completed http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/log/Log.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index ed4eff2..7203033 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -45,6 +45,8 @@ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import java.util.Map.{Entry => JEntry} import java.lang.{Long => JLong} +import org.apache.kafka.common.internals.Topic + object LogAppendInfo { val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false) @@ -778,7 +780,7 @@ class Log(@volatile var dir: File, loadingFromLog: Boolean): Unit = { val pid = batch.producerId val appendInfo = producers.getOrElseUpdate(pid, new ProducerAppendInfo(pid, lastEntry, loadingFromLog)) - val shouldValidateSequenceNumbers = topicPartition.topic() != Topic.GroupMetadataTopicName + val shouldValidateSequenceNumbers = topicPartition.topic != Topic.GROUP_METADATA_TOPIC_NAME val maybeCompletedTxn = appendInfo.append(batch, shouldValidateSequenceNumbers) maybeCompletedTxn.foreach(completedTxns += _) } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1f8bea5..5f1a2d5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -28,7 +28,6 @@ import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} import kafka.cluster.Partition import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition} -import kafka.common.Topic.{GroupMetadataTopicName, TransactionStateTopicName, isInternal} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} import kafka.controller.KafkaController import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult} @@ -40,6 +39,7 @@ import kafka.security.auth.{Authorizer, ClusterAction, Create, Delete, Describe, import kafka.utils.{Exit, Logging, ZKGroupTopicDirs, ZkUtils} import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidRequestException, NotLeaderForPartitionException, TopicExistsException, UnknownTopicOrPartitionException, UnsupportedForMessageFormatException} import org.apache.kafka.common.internals.FatalExitError +import org.apache.kafka.common.internals.Topic.{isInternal, GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} @@ -139,16 +139,16 @@ class KafkaApis(val requestChannel: RequestChannel, // this callback is invoked under the replica state change lock to ensure proper order of // leadership changes updatedLeaders.foreach { partition => - if (partition.topic == GroupMetadataTopicName) + if (partition.topic == GROUP_METADATA_TOPIC_NAME) groupCoordinator.handleGroupImmigration(partition.partitionId) - else if (partition.topic == TransactionStateTopicName) + else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) txnCoordinator.handleTxnImmigration(partition.partitionId, partition.getLeaderEpoch) } updatedFollowers.foreach { partition => - if (partition.topic == GroupMetadataTopicName) + if (partition.topic == GROUP_METADATA_TOPIC_NAME) groupCoordinator.handleGroupEmigration(partition.partitionId) - else if (partition.topic == TransactionStateTopicName) + else if (partition.topic == TRANSACTION_STATE_TOPIC_NAME) txnCoordinator.handleTxnEmigration(partition.partitionId) } } @@ -185,7 +185,7 @@ class KafkaApis(val requestChannel: RequestChannel, // request to become a follower due to which cache for groups that belong to an offsets topic partition for which broker 1 was the leader, // is not cleared. result.foreach { case (topicPartition, error) => - if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GroupMetadataTopicName) { + if (error == Errors.NONE && stopReplicaRequest.deletePartitions() && topicPartition.topic == GROUP_METADATA_TOPIC_NAME) { groupCoordinator.handleGroupEmigration(topicPartition.partition) } } @@ -816,7 +816,7 @@ class KafkaApis(val requestChannel: RequestChannel, val aliveBrokers = metadataCache.getAliveBrokers topic match { - case GroupMetadataTopicName => + case GROUP_METADATA_TOPIC_NAME => if (aliveBrokers.size < config.offsetsTopicReplicationFactor) { error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + s"'${config.offsetsTopicReplicationFactor}' for the offsets topic (configured via " + @@ -827,7 +827,7 @@ class KafkaApis(val requestChannel: RequestChannel, createTopic(topic, config.offsetsTopicPartitions, config.offsetsTopicReplicationFactor.toInt, groupCoordinator.offsetsTopicConfigs) } - case TransactionStateTopicName => + case TRANSACTION_STATE_TOPIC_NAME => if (aliveBrokers.size < config.transactionTopicReplicationFactor) { error(s"Number of alive brokers '${aliveBrokers.size}' does not meet the required replication factor " + s"'${config.transactionTopicReplicationFactor}' for the transactions state topic (configured via " + @@ -1037,12 +1037,12 @@ class KafkaApis(val requestChannel: RequestChannel, val (partition, topicMetadata) = findCoordinatorRequest.coordinatorType match { case FindCoordinatorRequest.CoordinatorType.GROUP => val partition = groupCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) - val metadata = getOrCreateInternalTopic(GroupMetadataTopicName, request.listenerName) + val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.listenerName) (partition, metadata) case FindCoordinatorRequest.CoordinatorType.TRANSACTION => val partition = txnCoordinator.partitionFor(findCoordinatorRequest.coordinatorKey) - val metadata = getOrCreateInternalTopic(TransactionStateTopicName, request.listenerName) + val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.listenerName) (partition, metadata) case _ => @@ -1504,7 +1504,7 @@ class KafkaApis(val requestChannel: RequestChannel, val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest] val transactionalId = addOffsetsToTxnRequest.transactionalId val groupId = addOffsetsToTxnRequest.consumerGroupId - val offsetTopicPartition = new TopicPartition(GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)) + val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) // Send response callback def sendResponseCallback(error: Errors): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/MetadataCache.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 2e4c19a..4e1cd37 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -23,10 +23,11 @@ import scala.collection.{Seq, Set, mutable} import scala.collection.JavaConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ -import kafka.common.{BrokerEndPointNotAvailableException, Topic, TopicAndPartition} +import kafka.common.{BrokerEndPointNotAvailableException, TopicAndPartition} import kafka.controller.{KafkaController, LeaderIsrAndControllerEpoch} import kafka.utils.CoreUtils._ import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 9cd92f7..b3d1d32 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -46,7 +46,8 @@ import scala.collection._ import scala.collection.JavaConverters._ import java.util.{Map => JMap} -import kafka.common.{KafkaStorageException, Topic} +import kafka.common.KafkaStorageException +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.requests.EpochEndOffset._ http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index 0bd749a..b4b3722 100755 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -25,10 +25,11 @@ import java.util.Date import java.text.SimpleDateFormat import kafka.utils.{CommandLineUtils, CoreUtils, Exit, Logging} -import kafka.common.Topic import java.io.{BufferedOutputStream, OutputStream} import java.nio.charset.StandardCharsets +import org.apache.kafka.common.internals.Topic + /** * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). * @@ -47,7 +48,7 @@ import java.nio.charset.StandardCharsets object StateChangeLogMerger extends Logging { val dateFormatString = "yyyy-MM-dd HH:mm:ss,SSS" - val topicPartitionRegex = new Regex("\\[(" + Topic.legalChars + "+),( )*([0-9]+)\\]") + val topicPartitionRegex = new Regex("\\[(" + Topic.LEGAL_CHARS + "+),( )*([0-9]+)\\]") val dateRegex = new Regex("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}") val dateFormat = new SimpleDateFormat(dateFormatString) var files: List[String] = List() http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4277d26..52a90d8 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -18,7 +18,6 @@ import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} -import kafka.common import kafka.common.TopicAndPartition import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} @@ -27,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.errors._ +import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ import CreateTopicsRequest.TopicDetails @@ -162,7 +162,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumers += TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT) // create the consumer offset topic - TestUtils.createTopic(zkUtils, common.Topic.GroupMetadataTopicName, + TestUtils.createTopic(zkUtils, GROUP_METADATA_TOPIC_NAME, 1, 1, servers, @@ -563,8 +563,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { // set the subscription pattern to an internal topic that the consumer has read permission to. Since // internal topics are not included, we should not be assigned any partitions from this topic - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName)) - consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), new Resource(Topic, + GROUP_METADATA_TOPIC_NAME)) + consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener) consumer.poll(0) assertTrue(consumer.subscription().isEmpty) assertTrue(consumer.assignment().isEmpty) @@ -590,10 +591,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(Set(topic).asJava, consumer.subscription) // now authorize the user for the internal topic and verify that we can subscribe - addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, kafka.common.Topic.GroupMetadataTopicName)) - consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), Resource(Topic, + GROUP_METADATA_TOPIC_NAME)) + consumer.subscribe(Pattern.compile(GROUP_METADATA_TOPIC_NAME), new NoOpConsumerRebalanceListener) consumer.poll(0) - assertEquals(Set(kafka.common.Topic.GroupMetadataTopicName), consumer.subscription.asScala) + assertEquals(Set(GROUP_METADATA_TOPIC_NAME), consumer.subscription.asScala) } finally consumer.close() } @@ -605,7 +607,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) - val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName) + val internalTopicResource = new Resource(Topic, GROUP_METADATA_TOPIC_NAME) addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource) val consumerConfig = new Properties http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 8d394c6..992e74a 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -20,7 +20,6 @@ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.apache.kafka.common.{PartitionInfo, TopicPartition} import kafka.utils.{Logging, ShutdownableThread, TestUtils} -import kafka.common.Topic import kafka.server.KafkaConfig import org.junit.Assert._ import org.junit.{Before, Test} @@ -29,6 +28,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, Buffer} import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.internals.Topic /** * Integration tests for the new consumer that cover basic usage as well as server failures @@ -103,7 +103,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness { // get metadata for the topic var parts: Seq[PartitionInfo] = null while (parts == null) - parts = consumer0.partitionsFor(Topic.GroupMetadataTopicName).asScala + parts = consumer0.partitionsFor(Topic.GROUP_METADATA_TOPIC_NAME).asScala assertEquals(1, parts.size) assertNotNull(parts.head.leader()) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala index 09b715d..fd588de 100644 --- a/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala @@ -12,7 +12,6 @@ */ package kafka.api -import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.log.Log import kafka.server.KafkaConfig @@ -26,6 +25,7 @@ import org.junit.Assert._ import scala.collection.JavaConverters._ import java.util.Properties +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record.CompressionType class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { @@ -43,13 +43,13 @@ class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), securityProtocol = SecurityProtocol.PLAINTEXT) val offsetMap = Map( - new TopicPartition(Topic.GroupMetadataTopicName, 0) -> new OffsetAndMetadata(10, "") + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0) -> new OffsetAndMetadata(10, "") ).asJava consumer.commitSync(offsetMap) val logManager = servers.head.getLogManager def getGroupMetadataLogOpt: Option[Log] = - logManager.getLog(new TopicPartition(Topic.GroupMetadataTopicName, 0)) + logManager.getLog(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)) TestUtils.waitUntilTrue(() => getGroupMetadataLogOpt.exists(_.logSegments.exists(_.log.batches.asScala.nonEmpty)), "Commit message not appended in time") http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 7372ac4..0c44ca9 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -20,7 +20,6 @@ package kafka.api import java.util.concurrent.{ExecutionException, TimeoutException} import java.util.Properties -import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.log.LogConfig import kafka.server.KafkaConfig @@ -28,6 +27,7 @@ import kafka.utils.TestUtils import org.apache.kafka.clients.producer._ import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors._ +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch} import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -226,7 +226,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { def testCannotSendToInternalTopic() { TestUtils.createOffsetsTopic(zkUtils, servers) val thrown = intercept[ExecutionException] { - producer2.send(new ProducerRecord(Topic.GroupMetadataTopicName, "test".getBytes, "test".getBytes)).get + producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, "test".getBytes, "test".getBytes)).get } assertTrue("Unexpected exception while sending to an invalid topic " + thrown.getCause, thrown.getCause.isInstanceOf[InvalidTopicException]) } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala index 7db9d7c..4d879d0 100644 --- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala +++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala @@ -23,7 +23,6 @@ import java.util.{Collections, Properties} import java.util.concurrent.TimeUnit import kafka.api.SaslSetup -import kafka.common.Topic import kafka.coordinator.group.OffsetConfig import kafka.utils.JaasTestUtils.JaasSection import kafka.utils.TestUtils @@ -31,6 +30,7 @@ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.config.SslConfigs +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.network.{ListenerName, Mode} import org.junit.Assert.assertEquals import org.junit.{After, Before, Test} @@ -103,7 +103,7 @@ abstract class MultipleListenersWithSameSecurityProtocolBaseTest extends ZooKeep Internal, config.interBrokerListenerName.value) } - TestUtils.createTopic(zkUtils, Topic.GroupMetadataTopicName, OffsetConfig.DefaultOffsetsTopicNumPartitions, + TestUtils.createTopic(zkUtils, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions, replicationFactor = 2, servers, servers.head.groupCoordinator.offsetsTopicConfigs) servers.head.config.listeners.foreach { endPoint => http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala index 5215867..ad6cfa5 100644 --- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala @@ -18,7 +18,6 @@ package kafka.admin import org.junit.Assert._ import org.junit.Test -import kafka.common.Topic import kafka.utils.Logging import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness @@ -26,6 +25,7 @@ import kafka.server.ConfigType import kafka.admin.TopicCommand.TopicCommandOptions import kafka.utils.ZkUtils._ import org.apache.kafka.common.errors.TopicExistsException +import org.apache.kafka.common.internals.Topic class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareTest { @@ -86,15 +86,15 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT // create the offset topic val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString, "--replication-factor", "1", - "--topic", Topic.GroupMetadataTopicName)) + "--topic", Topic.GROUP_METADATA_TOPIC_NAME)) TopicCommand.createTopic(zkUtils, createOffsetTopicOpts) - // try to delete the Topic.GroupMetadataTopicName and make sure it doesn't - val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GroupMetadataTopicName)) - val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GroupMetadataTopicName) + // try to delete the Topic.GROUP_METADATA_TOPIC_NAME and make sure it doesn't + val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", Topic.GROUP_METADATA_TOPIC_NAME)) + val deleteOffsetTopicPath = getDeleteTopicPath(Topic.GROUP_METADATA_TOPIC_NAME) assertFalse("Delete path for topic shouldn't exist before deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath)) intercept[AdminOperationException] { - TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts) + TopicCommand.deleteTopic(zkUtils, deleteOffsetTopicOpts) } assertFalse("Delete path for topic shouldn't exist after deletion.", zkUtils.zkClient.exists(deleteOffsetTopicPath)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/common/TopicTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/common/TopicTest.scala b/core/src/test/scala/unit/kafka/common/TopicTest.scala deleted file mode 100644 index caab674..0000000 --- a/core/src/test/scala/unit/kafka/common/TopicTest.scala +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -import org.junit.Assert._ -import collection.mutable.ArrayBuffer -import org.junit.Test - -class TopicTest { - - @Test - def testInvalidTopicNames() { - val invalidTopicNames = new ArrayBuffer[String]() - invalidTopicNames += ("", ".", "..") - var longName = "ATCG" - for (_ <- 1 to 6) - longName += longName - invalidTopicNames += longName - invalidTopicNames += longName.drop(6) - val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=') - for (weirdChar <- badChars) { - invalidTopicNames += "Is" + weirdChar + "illegal" - } - - for (i <- invalidTopicNames.indices) { - try { - Topic.validate(invalidTopicNames(i)) - fail("Should throw InvalidTopicException.") - } - catch { - case _: org.apache.kafka.common.errors.InvalidTopicException => // This is good. - } - } - - val validTopicNames = new ArrayBuffer[String]() - validTopicNames += ("valid", "TOPIC", "nAmEs", "ar6", "VaL1d", "_0-9_.", longName.drop(7)) - for (i <- validTopicNames.indices) { - try { - Topic.validate(validTopicNames(i)) - } - catch { - case _: Exception => fail("Should not throw exception.") - } - } - } - - @Test - def testTopicHasCollisionChars() = { - val falseTopics = List("start", "end", "middle", "many") - val trueTopics = List( - ".start", "end.", "mid.dle", ".ma.ny.", - "_start", "end_", "mid_dle", "_ma_ny." - ) - - falseTopics.foreach( t => - assertFalse(Topic.hasCollisionChars(t)) - ) - - trueTopics.foreach( t => - assertTrue(Topic.hasCollisionChars(t)) - ) - } - - @Test - def testTopicHasCollision() = { - val periodFirstMiddleLastNone = List(".topic", "to.pic", "topic.", "topic") - val underscoreFirstMiddleLastNone = List("_topic", "to_pic", "topic_", "topic") - - // Self - periodFirstMiddleLastNone.foreach { t => - assertTrue(Topic.hasCollision(t, t)) - } - underscoreFirstMiddleLastNone.foreach { t => - assertTrue(Topic.hasCollision(t, t)) - } - - // Same Position - periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone).foreach { case (t1, t2) => - assertTrue(Topic.hasCollision(t1, t2)) - } - - // Different Position - periodFirstMiddleLastNone.zip(underscoreFirstMiddleLastNone.reverse).foreach { case (t1, t2) => - assertFalse(Topic.hasCollision(t1, t2)) - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala index b1e1274..8de4a89 100644 --- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala @@ -18,10 +18,10 @@ package kafka.consumer +import org.apache.kafka.common.internals.Topic import org.junit.Assert._ import org.scalatest.junit.JUnitSuite import org.junit.Test -import kafka.common.Topic class TopicFilterTest extends JUnitSuite { @@ -36,8 +36,8 @@ class TopicFilterTest extends JUnitSuite { val topicFilter2 = Whitelist(".+") assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true)) - assertFalse(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter2.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter2.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true)) + assertTrue(topicFilter2.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false)) val topicFilter3 = Whitelist("white_listed-topic.+") assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true)) @@ -56,8 +56,8 @@ class TopicFilterTest extends JUnitSuite { assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true)) assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false)) - assertFalse(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = true)) - assertTrue(topicFilter1.isTopicAllowed(Topic.GroupMetadataTopicName, excludeInternalTopics = false)) + assertFalse(topicFilter1.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = true)) + assertTrue(topicFilter1.isTopicAllowed(Topic.GROUP_METADATA_TOPIC_NAME, excludeInternalTopics = false)) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala index bfc9be2..0ace2e7 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorResponseTest.scala @@ -18,18 +18,19 @@ package kafka.coordinator.group -import kafka.common.{OffsetAndMetadata, Topic} +import kafka.common.OffsetAndMetadata import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager} import kafka.utils._ import kafka.utils.timer.MockTimer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.{RecordBatch, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, TimestampType} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult} import org.easymock.{Capture, EasyMock, IAnswer} import java.util.concurrent.TimeUnit +import org.apache.kafka.common.internals.Topic import org.junit.Assert._ import org.junit.{After, Assert, Before, Test} import org.scalatest.junit.JUnitSuite @@ -83,13 +84,13 @@ class GroupCoordinatorResponseTest extends JUnitSuite { props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator val ret = mutable.Map[String, Map[Int, Seq[Int]]]() - ret += (Topic.GroupMetadataTopicName -> Map(0 -> Seq(1), 1 -> Seq(1))) + ret += (Topic.GROUP_METADATA_TOPIC_NAME -> Map(0 -> Seq(1), 1 -> Seq(1))) replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) // make two partitions of the group topic to make sure some partitions are not owned by the coordinator - EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2)) + EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) EasyMock.replay(zkUtils) timer = new MockTimer @@ -310,7 +311,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE, syncGroupError) EasyMock.reset(replicaManager) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) @@ -783,7 +784,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE, error) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) - val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) // Send commit marker. groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT) @@ -808,7 +809,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE, error) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) - val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) // Validate that the pending commit is discarded. groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT) @@ -832,7 +833,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { assertEquals(Errors.NONE, error) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset)) - val offsetsTopic = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT) val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) @@ -859,8 +860,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val producerEpoch: Short = 3 val groupIds = List(groupId, otherGroupId) - val offsetTopicPartitions = List(new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)), - new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(otherGroupId))) + val offsetTopicPartitions = List(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)), + new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(otherGroupId))) groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition) val errors = mutable.ArrayBuffer[Errors]() @@ -937,7 +938,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { val producerIds = List(1000L, 1005L) val producerEpochs: Seq[Short] = List(3, 4) - val offsetTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)) + val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) val errors = mutable.ArrayBuffer[Errors]() val partitionData = mutable.ArrayBuffer[Map[TopicPartition, OffsetFetchResponse.PartitionData]]() @@ -1346,7 +1347,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( - Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> + Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP) ) )}) @@ -1428,7 +1429,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( - Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> + Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP) ) )}) @@ -1454,7 +1455,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( - Map(new TopicPartition(Topic.GroupMetadataTopicName, groupCoordinator.partitionFor(groupId)) -> + Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP) ) )}) @@ -1470,7 +1471,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite { private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = { val (responseFuture, responseCallback) = setupHeartbeatCallback - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andReturn(None) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes() EasyMock.replay(replicaManager) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index ce3b997..f76eb7b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -19,12 +19,11 @@ package kafka.coordinator.group import kafka.api.ApiVersion import kafka.cluster.Partition -import kafka.common.{OffsetAndMetadata, Topic} +import kafka.common.OffsetAndMetadata import kafka.log.{Log, LogAppendInfo} import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager} import kafka.utils.TestUtils.fail import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils} - import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ @@ -33,9 +32,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.junit.{Before, Test} - import java.nio.ByteBuffer +import org.apache.kafka.common.internals.Topic + import scala.collection.JavaConverters._ import scala.collection._ @@ -71,7 +71,7 @@ class GroupMetadataManagerTest { // make two partitions of the group topic to make sure some partitions are not owned by the coordinator zkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) - EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2)) + EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2)) EasyMock.replay(zkUtils) time = new MockTime @@ -82,7 +82,7 @@ class GroupMetadataManagerTest { @Test def testLoadOffsetsWithoutGroup() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val startOffset = 15L val committedOffsets = Map( @@ -110,7 +110,7 @@ class GroupMetadataManagerTest { @Test def testLoadTransactionalOffsetsWithoutGroup() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val producerId = 1000L val producerEpoch: Short = 2 @@ -144,7 +144,7 @@ class GroupMetadataManagerTest { @Test def testDoNotLoadAbortedTransactionalOffsetCommits() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val producerId = 1000L val producerEpoch: Short = 2 @@ -174,7 +174,7 @@ class GroupMetadataManagerTest { @Test def testGroupLoadedWithPendingCommits(): Unit = { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val producerId = 1000L val producerEpoch: Short = 2 @@ -209,7 +209,7 @@ class GroupMetadataManagerTest { @Test def testLoadWithCommittedAndAbortedTransactionalOffsetCommits() { // A test which loads a log with a mix of committed and aborted transactional offset committed messages. - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val producerId = 1000L val producerEpoch: Short = 2 @@ -254,7 +254,7 @@ class GroupMetadataManagerTest { @Test def testLoadWithCommittedAndAbortedAndPendingTransactionalOffsetCommits() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val producerId = 1000L val producerEpoch: Short = 2 @@ -316,7 +316,7 @@ class GroupMetadataManagerTest { @Test def testLoadTransactionalOffsetCommitsFromMultipleProducers(): Unit = { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val firstProducerId = 1000L val firstProducerEpoch: Short = 2 val secondProducerId = 1001L @@ -386,7 +386,7 @@ class GroupMetadataManagerTest { @Test def testLoadOffsetsWithTombstones() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val startOffset = 15L val tombstonePartition = new TopicPartition("foo", 1) @@ -421,7 +421,7 @@ class GroupMetadataManagerTest { @Test def testLoadOffsetsAndGroup() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val startOffset = 15L val committedOffsets = Map( new TopicPartition("foo", 0) -> 23L, @@ -454,7 +454,7 @@ class GroupMetadataManagerTest { @Test def testLoadGroupWithTombstone() { - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val startOffset = 15L val memberId = "98098230493" @@ -478,7 +478,7 @@ class GroupMetadataManagerTest { // 1. the group exists at some point in time, but is later removed (because all members left) // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets - val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) + val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) val startOffset = 15L val committedOffsets = Map( @@ -765,7 +765,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) @@ -817,7 +817,7 @@ class GroupMetadataManagerTest { val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) @@ -865,7 +865,7 @@ class GroupMetadataManagerTest { val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture() EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition)) EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt())) .andReturn(LogAppendInfo.UnknownLogAppendInfo) @@ -920,7 +920,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) @@ -997,7 +997,7 @@ class GroupMetadataManagerTest { topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1), topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3)) - EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition)) + EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andStubReturn(Some(partition)) expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) @@ -1044,7 +1044,7 @@ class GroupMetadataManagerTest { EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] { override def answer = capturedArgument.getValue.apply( - Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) -> + Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP) ) )}) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala index 83cba71..77577cf 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorIntegrationTest.scala @@ -18,11 +18,11 @@ package kafka.coordinator.transaction import java.util.Properties -import kafka.common.Topic import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.CompressionType import org.apache.kafka.common.utils.Utils @@ -41,7 +41,7 @@ class TransactionCoordinatorIntegrationTest extends KafkaServerTestHarness { @Test def shouldCommitTransaction(): Unit = { - TestUtils.createTopic(zkUtils, Topic.TransactionStateTopicName, 1, 1, servers, servers.head.groupCoordinator.offsetsTopicConfigs) + TestUtils.createTopic(zkUtils, Topic.TRANSACTION_STATE_TOPIC_NAME, 1, 1, servers, servers.head.groupCoordinator.offsetsTopicConfigs) val topic = "foo" TestUtils.createTopic(this.zkUtils, topic, 1, 1, servers) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 0250f60..0d3263a 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -18,13 +18,12 @@ package kafka.coordinator.transaction import java.nio.ByteBuffer -import kafka.common.Topic -import kafka.common.Topic.TransactionStateTopicName import kafka.log.Log import kafka.server.{FetchDataInfo, LogOffsetMetadata, ReplicaManager} import kafka.utils.{MockScheduler, Pool, ZkUtils} import kafka.utils.TestUtils.fail import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.IsolationLevel @@ -44,7 +43,7 @@ class TransactionStateManagerTest { val partitionId = 0 val numPartitions = 2 val transactionTimeoutMs: Int = 1000 - val topicPartition = new TopicPartition(TransactionStateTopicName, partitionId) + val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) val coordinatorEpoch = 10 val txnRecords: mutable.ArrayBuffer[SimpleRecord] = mutable.ArrayBuffer[SimpleRecord]() @@ -54,7 +53,7 @@ class TransactionStateManagerTest { val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils]) val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager]) - EasyMock.expect(zkUtils.getTopicPartitionCount(TransactionStateTopicName)) + EasyMock.expect(zkUtils.getTopicPartitionCount(TRANSACTION_STATE_TOPIC_NAME)) .andReturn(Some(numPartitions)) .anyTimes() @@ -403,7 +402,7 @@ class TransactionStateManagerTest { EasyMock.capture(capturedArgument))) .andAnswer(new IAnswer[Unit] { override def answer(): Unit = capturedArgument.getValue.apply( - Map(new TopicPartition(Topic.TransactionStateTopicName, partitionId) -> + Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP) ) ) http://git-wip-us.apache.org/repos/asf/kafka/blob/bc55f852/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index ed0e805..fdc9a95 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -17,15 +17,16 @@ package kafka.server -import java.util.{Properties} +import java.util.Properties -import kafka.common.Topic import kafka.utils.TestUtils +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} import org.junit.Assert._ import org.junit.Test import org.apache.kafka.test.TestUtils.isValidClusterId + import scala.collection.JavaConverters._ class MetadataRequestTest extends BaseRequestTest { @@ -80,7 +81,7 @@ class MetadataRequestTest extends BaseRequestTest { @Test def testIsInternal() { - val internalTopic = Topic.GroupMetadataTopicName + val internalTopic = Topic.GROUP_METADATA_TOPIC_NAME val notInternalTopic = "notInternal" // create the topics TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers)
