This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push: new b07d67c MINOR: Use exceptions in o.a.k.common if possible and deprecate ZkUtils (#5255) b07d67c is described below commit b07d67ccb899de20f23fd17c81bcecd208087ba2 Author: Ismael Juma <ism...@juma.me.uk> AuthorDate: Wed Jun 20 05:05:50 2018 -0700 MINOR: Use exceptions in o.a.k.common if possible and deprecate ZkUtils (#5255) Also: - Remove exceptions in `kafka.common` that are no longer used. - Keep `kafka.common.KafkaException` as it's still used by `ZkUtils`, `kafka.admin.AdminClient` and `kafka.security.auth` classes and we would like to maintain compatibility for now. - Add deprecated annotation to `kafka.admin.AdminClient`. The scaladoc stated that the class is deprecated, but the annotation was missing. Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Manikumar Reddy <manikumar.re...@gmail.com> --- core/src/main/scala/kafka/admin/AdminClient.scala | 7 ++++-- .../src/main/scala/kafka/admin/ConfigCommand.scala | 10 ++++----- core/src/main/scala/kafka/api/ApiUtils.scala | 5 +++-- .../main/scala/kafka/cluster/BrokerEndPoint.scala | 2 +- core/src/main/scala/kafka/cluster/EndPoint.scala | 2 +- core/src/main/scala/kafka/cluster/Replica.scala | 3 +-- core/src/main/scala/kafka/common/Config.scala | 5 +++-- .../common/IndexOffsetOverflowException.scala | 2 +- .../kafka/common/InvalidConfigException.scala | 25 --------------------- .../kafka/common/InvalidOffsetException.scala | 22 ------------------ .../main/scala/kafka/common/KafkaException.scala | 5 ++++- .../kafka/common/LeaderNotAvailableException.scala | 26 ---------------------- .../common/LogSegmentOffsetOverflowException.scala | 2 +- .../controller/ControllerChannelManager.scala | 3 +-- .../scala/kafka/controller/KafkaController.scala | 2 +- .../coordinator/group/GroupMetadataManager.scala | 4 ++-- .../transaction/ProducerIdManager.scala | 2 +- .../coordinator/transaction/TransactionLog.scala | 4 ++-- .../transaction/TransactionStateManager.scala | 3 +-- core/src/main/scala/kafka/log/Log.scala | 6 ++--- core/src/main/scala/kafka/log/LogCleaner.scala | 2 +- core/src/main/scala/kafka/log/LogManager.scala | 3 +-- core/src/main/scala/kafka/log/OffsetIndex.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 3 +-- core/src/main/scala/kafka/log/TimeIndex.scala | 2 +- .../scala/kafka/message/CompressionCodec.scala | 4 +++- .../main/scala/kafka/network/SocketServer.scala | 3 +-- .../scala/kafka/server/AbstractFetcherThread.scala | 6 ++--- .../main/scala/kafka/tools/ConsoleProducer.scala | 1 + core/src/main/scala/kafka/utils/Pool.scala | 3 ++- core/src/main/scala/kafka/utils/ZkUtils.scala | 4 ++++ core/src/main/scala/kafka/zk/KafkaZkClient.scala | 3 +-- core/src/main/scala/kafka/zk/ZkData.scala | 4 ++-- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 4 ++-- .../test/scala/unit/kafka/api/ApiUtilsTest.scala | 4 +++- .../scala/unit/kafka/cluster/PartitionTest.scala | 3 +-- .../transaction/ProducerIdManagerTest.scala | 2 +- .../kafka/integration/KafkaServerTestHarness.scala | 2 +- .../test/scala/unit/kafka/log/LogManagerTest.scala | 3 +-- core/src/test/scala/unit/kafka/log/LogTest.scala | 4 ++-- .../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +- .../test/scala/unit/kafka/log/TimeIndexTest.scala | 6 ++--- .../unit/kafka/server/ServerStartupTest.scala | 2 +- .../scala/unit/kafka/utils/CoreUtilsTest.scala | 2 +- 44 files changed, 75 insertions(+), 139 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index d847881..1009bc5 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -42,9 +42,10 @@ import scala.util.{Failure, Success, Try} /** * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers, - * and configurations. This client is deprecated, and will be replaced by KafkaAdminClient. - * @see KafkaAdminClient + * and configurations. This client is deprecated, and will be replaced by org.apache.kafka.clients.admin.AdminClient. */ +@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " + + "a future release.", since = "0.11.0") class AdminClient(val time: Time, val requestTimeoutMs: Int, val retryBackoffMs: Long, @@ -364,6 +365,8 @@ class CompositeFuture[T](time: Time, } } +@deprecated("This class is deprecated in favour of org.apache.kafka.clients.admin.AdminClient and it will be removed in " + + "a future release.", since = "0.11.0") object AdminClient { val DefaultConnectionMaxIdleMs = 9 * 60 * 1000 val DefaultRequestTimeoutMs = 5000 diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 6ac0a01..d8dade0 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -22,7 +22,6 @@ import java.util.{Collections, Properties} import joptsimple._ import kafka.common.Config -import kafka.common.InvalidConfigException import kafka.log.LogConfig import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.utils.{CommandLineUtils, Exit, PasswordEncoder} @@ -32,6 +31,7 @@ import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.{AlterConfigsOptions, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient, Config => JConfig} import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.types.Password +import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} @@ -83,7 +83,7 @@ object ConfigCommand extends Config { processBrokerConfig(opts) } } catch { - case e @ (_: IllegalArgumentException | _: InvalidConfigException | _: OptionException) => + case e @ (_: IllegalArgumentException | _: InvalidConfigurationException | _: OptionException) => logger.debug(s"Failed config command with args '${args.mkString(" ")}'", e) System.err.println(e.getMessage) Exit.exit(1) @@ -145,7 +145,7 @@ object ConfigCommand extends Config { // fail the command if any of the configs to be deleted does not exist val invalidConfigs = configsToBeDeleted.filterNot(configs.containsKey(_)) if (invalidConfigs.nonEmpty) - throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") + throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") configs ++= configsToBeAdded configsToBeDeleted.foreach(configs.remove(_)) @@ -307,12 +307,12 @@ object ConfigCommand extends Config { // fail the command if any of the configs to be deleted does not exist val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) if (invalidConfigs.nonEmpty) - throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") + throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted val sensitiveEntries = newEntries.filter(_._2.value == null) if (sensitiveEntries.nonEmpty) - throw new InvalidConfigException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") + throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") val newConfig = new JConfig(newEntries.asJava.values) val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) diff --git a/core/src/main/scala/kafka/api/ApiUtils.scala b/core/src/main/scala/kafka/api/ApiUtils.scala index 63fece7..4a0c8b0 100644 --- a/core/src/main/scala/kafka/api/ApiUtils.scala +++ b/core/src/main/scala/kafka/api/ApiUtils.scala @@ -16,8 +16,9 @@ */ package kafka.api -import java.nio._ -import kafka.common._ +import java.nio.ByteBuffer + +import org.apache.kafka.common.KafkaException /** * Helper functions specific to parsing or serializing requests and responses diff --git a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala index 847e959..986d352 100644 --- a/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala +++ b/core/src/main/scala/kafka/cluster/BrokerEndPoint.scala @@ -19,7 +19,7 @@ package kafka.cluster import java.nio.ByteBuffer import kafka.api.ApiUtils._ -import kafka.common.KafkaException +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.utils.Utils._ object BrokerEndPoint { diff --git a/core/src/main/scala/kafka/cluster/EndPoint.scala b/core/src/main/scala/kafka/cluster/EndPoint.scala index 57ef0da..2bca5c8 100644 --- a/core/src/main/scala/kafka/cluster/EndPoint.scala +++ b/core/src/main/scala/kafka/cluster/EndPoint.scala @@ -17,7 +17,7 @@ package kafka.cluster -import kafka.common.KafkaException +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Utils diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 4b65e43..962aaff 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -20,8 +20,7 @@ package kafka.cluster import kafka.log.Log import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} -import kafka.common.KafkaException -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.apache.kafka.common.utils.Time diff --git a/core/src/main/scala/kafka/common/Config.scala b/core/src/main/scala/kafka/common/Config.scala index d24fb0d..4110ba7 100644 --- a/core/src/main/scala/kafka/common/Config.scala +++ b/core/src/main/scala/kafka/common/Config.scala @@ -19,6 +19,7 @@ package kafka.common import util.matching.Regex import kafka.utils.Logging +import org.apache.kafka.common.errors.InvalidConfigurationException trait Config extends Logging { @@ -29,8 +30,8 @@ trait Config extends Logging { rgx.findFirstIn(value) match { case Some(t) => if (!t.equals(value)) - throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") - case None => throw new InvalidConfigException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") + throw new InvalidConfigurationException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") + case None => throw new InvalidConfigurationException(prop + " " + value + " is illegal, contains a character other than ASCII alphanumerics, '.', '_' and '-'") } } } diff --git a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala b/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala index 7f3ea11..5dd9b43 100644 --- a/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala +++ b/core/src/main/scala/kafka/common/IndexOffsetOverflowException.scala @@ -20,6 +20,6 @@ package kafka.common /** * Indicates that an attempt was made to append a message whose offset could cause the index offset to overflow. */ -class IndexOffsetOverflowException(message: String, cause: Throwable) extends KafkaException(message, cause) { +class IndexOffsetOverflowException(message: String, cause: Throwable) extends org.apache.kafka.common.KafkaException(message, cause) { def this(message: String) = this(message, null) } diff --git a/core/src/main/scala/kafka/common/InvalidConfigException.scala b/core/src/main/scala/kafka/common/InvalidConfigException.scala deleted file mode 100644 index 6437846..0000000 --- a/core/src/main/scala/kafka/common/InvalidConfigException.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package kafka.common - -/** - * Indicates that the given config parameter has invalid value - */ -class InvalidConfigException(message: String) extends RuntimeException(message) { - def this() = this(null) -} diff --git a/core/src/main/scala/kafka/common/InvalidOffsetException.scala b/core/src/main/scala/kafka/common/InvalidOffsetException.scala deleted file mode 100644 index c6811d7..0000000 --- a/core/src/main/scala/kafka/common/InvalidOffsetException.scala +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -class InvalidOffsetException(message: String) extends RuntimeException(message) { - def this() = this(null) -} \ No newline at end of file diff --git a/core/src/main/scala/kafka/common/KafkaException.scala b/core/src/main/scala/kafka/common/KafkaException.scala index e72d151..61b3ba3 100644 --- a/core/src/main/scala/kafka/common/KafkaException.scala +++ b/core/src/main/scala/kafka/common/KafkaException.scala @@ -17,7 +17,10 @@ package kafka.common /** - * Generic Kafka exception + * Usage of this class is discouraged. Use org.apache.kafka.common.KafkaException instead. + * + * This class will be removed once ZkUtils and the kafka.security.auth classes are removed. + * The former is internal, but widely used, so we are leaving it in the codebase for now. */ class KafkaException(message: String, t: Throwable) extends RuntimeException(message, t) { def this(message: String) = this(message, null) diff --git a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala b/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala deleted file mode 100644 index 972728e..0000000 --- a/core/src/main/scala/kafka/common/LeaderNotAvailableException.scala +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.common - -/** - * Thrown when a request is made for partition, but no leader exists for that partition - */ -class LeaderNotAvailableException(message: String, cause: Throwable) extends RuntimeException(message, cause) { - def this(message: String) = this(message, null) - def this() = this(null, null) -} diff --git a/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala b/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala index 9a24efe..2de5906 100644 --- a/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala +++ b/core/src/main/scala/kafka/common/LogSegmentOffsetOverflowException.scala @@ -26,5 +26,5 @@ import kafka.log.LogSegment * do not have any segments with offset overflow. */ class LogSegmentOffsetOverflowException(val segment: LogSegment, val offset: Long) - extends KafkaException(s"Detected offset overflow at offset $offset in segment $segment") { + extends org.apache.kafka.common.KafkaException(s"Detected offset overflow at offset $offset in segment $segment") { } diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index addd88d..096b2b4 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -22,7 +22,6 @@ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} import com.yammer.metrics.core.{Gauge, Timer} import kafka.api._ import kafka.cluster.Broker -import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.server.KafkaConfig import kafka.utils._ @@ -35,7 +34,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.{LogContext, Time} -import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.{KafkaException, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 9c33874..11d22fd 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -28,7 +28,7 @@ import kafka.utils._ import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult import kafka.zk._ import kafka.zookeeper.{StateChangeHandler, ZNodeChangeHandler, ZNodeChildChangeHandler} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 233a76e..02ba13a 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -26,14 +26,14 @@ import java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0} -import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata} +import kafka.common.{MessageFormatter, OffsetAndMetadata} import kafka.metrics.KafkaMetricsGroup import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock import kafka.utils._ import kafka.zk.KafkaZkClient import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.protocol.types.Type._ diff --git a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala index c3c9f7c..5c22c8e 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/ProducerIdManager.scala @@ -18,9 +18,9 @@ package kafka.coordinator.transaction import java.nio.charset.StandardCharsets -import kafka.common.KafkaException import kafka.utils.{Json, Logging} import kafka.zk.{KafkaZkClient, ProducerIdBlockZNode} +import org.apache.kafka.common.KafkaException import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala index 2c7178e..2dc6e38 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala @@ -16,9 +16,9 @@ */ package kafka.coordinator.transaction -import kafka.common.{KafkaException, MessageFormatter} +import kafka.common.MessageFormatter import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.protocol.types.Type._ import org.apache.kafka.common.protocol.types._ import java.io.PrintStream diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index e3b0321..a358515 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.common.KafkaException import kafka.log.LogConfig import kafka.message.UncompressedCodec import kafka.server.Defaults @@ -30,7 +29,7 @@ import kafka.server.ReplicaManager import kafka.utils.CoreUtils.{inReadLock, inWriteLock} import kafka.utils.{Logging, Pool, Scheduler} import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord} diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 64048fb..3036018 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -28,15 +28,15 @@ import java.util.regex.Pattern import com.yammer.metrics.core.Gauge import kafka.api.KAFKA_0_10_0_IV0 -import kafka.common.{InvalidOffsetException, KafkaException, LogSegmentOffsetOverflowException, LongRef, UnexpectedAppendOffsetException, OffsetsOutOfOrderException} +import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata} import kafka.utils._ -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} +import org.apache.kafka.common.{KafkaException, TopicPartition} +import org.apache.kafka.common.errors.{CorruptRecordException, InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import org.apache.kafka.common.requests.{IsolationLevel, ListOffsetRequest} diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 08bfa4f..91ddbf0 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -27,7 +27,7 @@ import kafka.common._ import kafka.metrics.KafkaMetricsGroup import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel} import kafka.utils._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException} import org.apache.kafka.common.record.MemoryRecords.RecordFilter diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 3bb5ee6..32203ac 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -22,13 +22,12 @@ import java.nio.file.Files import java.util.concurrent._ import com.yammer.metrics.core.Gauge -import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _} import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.utils.Time import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException} diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala index d185631..2babd00 100755 --- a/core/src/main/scala/kafka/log/OffsetIndex.scala +++ b/core/src/main/scala/kafka/log/OffsetIndex.scala @@ -21,7 +21,7 @@ import java.io.File import java.nio.ByteBuffer import kafka.utils.CoreUtils.inLock -import kafka.common.{IndexOffsetOverflowException, InvalidOffsetException} +import org.apache.kafka.common.errors.InvalidOffsetException /** * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse: diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala index abeac6e..caca9a8 100644 --- a/core/src/main/scala/kafka/log/ProducerStateManager.scala +++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala @@ -20,11 +20,10 @@ import java.io._ import java.nio.ByteBuffer import java.nio.file.Files -import kafka.common.KafkaException import kafka.log.Log.offsetFromFile import kafka.server.LogOffsetMetadata import kafka.utils.{Logging, nonthreadsafe, threadsafe} -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.protocol.types._ diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala index 7fae130..1661cba 100644 --- a/core/src/main/scala/kafka/log/TimeIndex.scala +++ b/core/src/main/scala/kafka/log/TimeIndex.scala @@ -20,9 +20,9 @@ package kafka.log import java.io.File import java.nio.ByteBuffer -import kafka.common.InvalidOffsetException import kafka.utils.CoreUtils._ import kafka.utils.Logging +import org.apache.kafka.common.errors.InvalidOffsetException import org.apache.kafka.common.record.RecordBatch /** diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala index a485271..64e0aaa 100644 --- a/core/src/main/scala/kafka/message/CompressionCodec.scala +++ b/core/src/main/scala/kafka/message/CompressionCodec.scala @@ -19,6 +19,8 @@ package kafka.message import java.util.Locale +import kafka.common.UnknownCodecException + object CompressionCodec { def getCompressionCodec(codec: Int): CompressionCodec = { codec match { @@ -26,7 +28,7 @@ object CompressionCodec { case GZIPCompressionCodec.codec => GZIPCompressionCodec case SnappyCompressionCodec.codec => SnappyCompressionCodec case LZ4CompressionCodec.codec => LZ4CompressionCodec - case _ => throw new kafka.common.UnknownCodecException("%d is an unknown compression codec".format(codec)) + case _ => throw new UnknownCodecException("%d is an unknown compression codec".format(codec)) } } def getCompressionCodec(name: String): CompressionCodec = { diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 06da8df..62fc7a5 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -26,13 +26,12 @@ import java.util.concurrent.atomic._ import com.yammer.metrics.core.Gauge import kafka.cluster.{BrokerEndPoint, EndPoint} -import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse} import kafka.security.CredentialProvider import kafka.server.KafkaConfig import kafka.utils._ -import org.apache.kafka.common.Reconfigurable +import org.apache.kafka.common.{KafkaException, Reconfigurable} import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool} import org.apache.kafka.common.metrics._ import org.apache.kafka.common.metrics.stats.Meter diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index f27dbfe..e056ad6 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -19,11 +19,11 @@ package kafka.server import java.util.concurrent.locks.ReentrantLock -import kafka.cluster.{Replica, BrokerEndPoint} +import kafka.cluster.{BrokerEndPoint, Replica} import kafka.utils.{DelayedItem, Pool, ShutdownableThread} import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException} import org.apache.kafka.common.requests.EpochEndOffset._ -import kafka.common.{ClientIdAndBroker, KafkaException} +import kafka.common.ClientIdAndBroker import kafka.metrics.KafkaMetricsGroup import kafka.utils.CoreUtils.inLock import org.apache.kafka.common.protocol.Errors @@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.internals.{FatalExitError, PartitionStates} import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.EpochEndOffset diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 3e64b93..8d8c42d 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -28,6 +28,7 @@ import java.nio.charset.StandardCharsets import joptsimple._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.utils.Utils import scala.collection.JavaConverters._ diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 4ddf557..742d3dc 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -19,9 +19,10 @@ package kafka.utils import java.util.concurrent._ +import org.apache.kafka.common.KafkaException + import collection.mutable import collection.JavaConverters._ -import kafka.common.KafkaException class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 004ab3d..d47af0d 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -36,6 +36,8 @@ import scala.collection._ import scala.collection.JavaConverters._ import org.apache.kafka.common.TopicPartition +@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " + + "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0") object ZkUtils { private val UseDefaultAcls = new java.util.ArrayList[ACL] @@ -183,6 +185,8 @@ object ZkUtils { /** * Legacy class for interacting with ZooKeeper. Whenever possible, ``KafkaZkClient`` should be used instead. */ +@deprecated("This is an internal class that is no longer used by Kafka and will be removed in a future release. Please " + + "use org.apache.kafka.clients.admin.AdminClient instead.", since = "2.0.0") class ZkUtils(val zkClient: ZkClient, val zkConnection: ZkConnection, val isSecure: Boolean) extends Logging { diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index d5beae8..bb34294 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -21,7 +21,6 @@ import java.util.Properties import com.yammer.metrics.core.MetricName import kafka.api.LeaderAndIsr import kafka.cluster.Broker -import kafka.common.KafkaException import kafka.controller.LeaderIsrAndControllerEpoch import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup @@ -30,7 +29,7 @@ import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.ConfigType import kafka.utils.Logging import kafka.zookeeper._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.resource.PatternType import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation} import org.apache.kafka.common.utils.{Time, Utils} diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala index d782ae0..d2b2333 100644 --- a/core/src/main/scala/kafka/zk/ZkData.scala +++ b/core/src/main/scala/kafka/zk/ZkData.scala @@ -23,14 +23,14 @@ import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.core.JsonProcessingException import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr} import kafka.cluster.{Broker, EndPoint} -import kafka.common.{KafkaException, NotificationHandler, ZkNodeChangeNotificationListener} +import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener} import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch} import kafka.security.auth.Resource.Separator import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls import kafka.security.auth.{Acl, Resource, ResourceType} import kafka.server.{ConfigType, DelegationTokenManager} import kafka.utils.Json -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.UnsupportedVersionException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.resource.PatternType diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 2644dcc..2e8179c 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -22,7 +22,6 @@ import java.util.Properties import kafka.admin.ConfigCommand.ConfigCommandOptions import kafka.api.ApiVersion import kafka.cluster.{Broker, EndPoint} -import kafka.common.InvalidConfigException import kafka.server.{ConfigEntityName, KafkaConfig} import kafka.utils.{Exit, Logging} import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient, ZooKeeperTestHarness} @@ -30,6 +29,7 @@ import org.apache.kafka.clients.admin._ import org.apache.kafka.common.config.{ConfigException, ConfigResource} import org.apache.kafka.common.internals.KafkaFutureImpl import org.apache.kafka.common.Node +import org.apache.kafka.common.errors.InvalidConfigurationException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils @@ -425,7 +425,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) } - @Test (expected = classOf[InvalidConfigException]) + @Test (expected = classOf[InvalidConfigurationException]) def shouldNotUpdateBrokerConfigIfNonExistingConfigIsDeleted(): Unit = { val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, "--entity-name", "my-topic", diff --git a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala index b71b00b..9fe4cbf 100644 --- a/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/api/ApiUtilsTest.scala @@ -20,10 +20,12 @@ package kafka.api import org.junit._ import org.scalatest.junit.JUnitSuite import org.junit.Assert._ + import scala.util.Random import java.nio.ByteBuffer -import kafka.common.KafkaException + import kafka.utils.TestUtils +import org.apache.kafka.common.KafkaException object ApiUtilsTest { val rnd: Random = new Random() diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala index fe5d578..fe9038a 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala @@ -22,10 +22,9 @@ import java.util.Properties import java.util.concurrent.atomic.AtomicBoolean import kafka.common.UnexpectedAppendOffsetException -import kafka.log.{Log, LogConfig, LogManager, CleanerConfig} +import kafka.log.{LogConfig, LogManager, CleanerConfig} import kafka.server._ import kafka.utils.{MockTime, TestUtils, MockScheduler} -import kafka.utils.timer.MockTimer import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.ReplicaNotAvailableException import org.apache.kafka.common.metrics.Metrics diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala index 88aebd3..660e623 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala @@ -16,8 +16,8 @@ */ package kafka.coordinator.transaction -import kafka.common.KafkaException import kafka.zk.KafkaZkClient +import org.apache.kafka.common.KafkaException import org.easymock.{Capture, EasyMock, IAnswer} import org.junit.{After, Test} import org.junit.Assert._ diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 2c4a988..0c97357 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -20,7 +20,6 @@ package kafka.integration import java.io.File import java.util.Arrays -import kafka.common.KafkaException import kafka.server._ import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness @@ -30,6 +29,7 @@ import org.junit.{After, Before} import scala.collection.mutable.{ArrayBuffer, Buffer} import java.util.Properties +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.utils.Time diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 3fc6c1c..38d6f71 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -20,10 +20,9 @@ package kafka.log import java.io._ import java.util.Properties -import kafka.common._ import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.OffsetOutOfRangeException import org.apache.kafka.common.utils.Utils import org.junit.Assert._ diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index f3b4e95..3b5b2fa 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -22,12 +22,12 @@ import java.nio.ByteBuffer import java.nio.file.{Files, Paths} import java.util.Properties -import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException, KafkaException} +import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.Log.DeleteDirSuffix import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} import kafka.utils._ -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors._ import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 1e4e892..1529597 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -29,7 +29,7 @@ import org.scalatest.junit.JUnitSuite import scala.collection._ import scala.util.Random import kafka.utils.TestUtils -import kafka.common.InvalidOffsetException +import org.apache.kafka.common.errors.InvalidOffsetException class OffsetIndexTest extends JUnitSuite { diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala index 8520f89..b9478cd 100644 --- a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala @@ -19,10 +19,10 @@ package kafka.log import java.io.File -import kafka.common.InvalidOffsetException import kafka.utils.TestUtils -import org.junit.{Test, After, Before} -import org.junit.Assert.{assertEquals} +import org.apache.kafka.common.errors.InvalidOffsetException +import org.junit.{After, Before, Test} +import org.junit.Assert.assertEquals import org.scalatest.junit.JUnitSuite /** diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala index 64647de..67d083c 100755 --- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala @@ -17,9 +17,9 @@ package kafka.server -import kafka.common.KafkaException import kafka.utils.TestUtils import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.KafkaException import org.apache.zookeeper.KeeperException.NodeExistsException import org.easymock.EasyMock import org.junit.Assert._ diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala index 7c416a2..93578c6 100755 --- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala +++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala @@ -26,8 +26,8 @@ import java.util.regex.Pattern import org.scalatest.junit.JUnitSuite import org.junit.Assert._ -import kafka.common.KafkaException import kafka.utils.CoreUtils.inLock +import org.apache.kafka.common.KafkaException import org.junit.Test import org.apache.kafka.common.utils.Utils import org.slf4j.event.Level