Repository: kafka Updated Branches: refs/heads/trunk dea0719e9 -> e89a9ce1a
KAFKA-2982; Mark the old Scala producer and related classes as deprecated Also update server tests to always use new producer. Author: Ismael Juma <[email protected]> Reviewers: Gwen Shapira Closes #1092 from ijuma/kafka-2982-deprecate-old-producers Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e89a9ce1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e89a9ce1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e89a9ce1 Branch: refs/heads/trunk Commit: e89a9ce1a4383af32435c7f4ee04361b1b65797d Parents: dea0719 Author: Ismael Juma <[email protected]> Authored: Thu Mar 17 18:12:40 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Thu Mar 17 18:12:40 2016 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/client/ClientUtils.scala | 1 + .../scala/kafka/javaapi/producer/Producer.scala | 2 + .../scala/kafka/metrics/KafkaMetricsGroup.scala | 1 + .../scala/kafka/producer/BaseProducer.scala | 6 ++ .../kafka/producer/BrokerPartitionInfo.scala | 3 +- .../kafka/producer/ByteArrayPartitioner.scala | 2 + .../kafka/producer/DefaultPartitioner.scala | 2 + .../scala/kafka/producer/KeyedMessage.scala | 2 + .../main/scala/kafka/producer/Partitioner.scala | 2 + .../main/scala/kafka/producer/Producer.scala | 3 +- .../producer/ProducerClosedException.scala | 1 + .../scala/kafka/producer/ProducerConfig.scala | 4 + .../scala/kafka/producer/ProducerPool.scala | 3 +- .../kafka/producer/ProducerRequestStats.scala | 3 + .../scala/kafka/producer/ProducerStats.scala | 2 + .../kafka/producer/ProducerTopicStats.scala | 4 +- .../scala/kafka/producer/SyncProducer.scala | 4 + .../kafka/producer/SyncProducerConfig.scala | 6 ++ .../producer/async/AsyncProducerConfig.scala | 2 + .../producer/async/DefaultEventHandler.scala | 1 + .../kafka/producer/async/EventHandler.scala | 1 + .../async/IllegalQueueStateException.scala | 1 + .../producer/async/MissingConfigException.scala | 1 + .../producer/async/ProducerSendThread.scala | 1 + .../scala/kafka/tools/KafkaMigrationTool.java | 2 +- .../kafka/api/BaseProducerSendTest.scala | 22 +---- .../kafka/api/PlaintextProducerSendTest.scala | 24 ++++- .../kafka/api/ProducerFailureHandlingTest.scala | 9 +- .../scala/kafka/tools/ConsoleProducerTest.scala | 3 +- .../scala/unit/kafka/common/ConfigTest.scala | 1 + .../ZookeeperConsumerConnectorTest.scala | 1 + .../kafka/integration/AutoOffsetResetTest.scala | 1 + .../unit/kafka/integration/FetcherTest.scala | 4 +- .../kafka/integration/PrimitiveApiTest.scala | 1 + .../ProducerConsumerTestHarness.scala | 1 + .../integration/UncleanLeaderElectionTest.scala | 23 +++-- .../ZookeeperConsumerConnectorTest.scala | 2 +- .../scala/unit/kafka/metrics/MetricsTest.scala | 6 +- .../unit/kafka/network/SocketServerTest.scala | 14 ++- .../unit/kafka/producer/AsyncProducerTest.scala | 21 +++-- .../unit/kafka/producer/ProducerTest.scala | 1 + .../unit/kafka/producer/SyncProducerTest.scala | 34 ++++--- .../kafka/server/BaseReplicaFetchTest.scala | 22 +++-- .../unit/kafka/server/LogRecoveryTest.scala | 25 ++--- .../unit/kafka/server/ServerShutdownTest.scala | 36 +++---- .../test/scala/unit/kafka/utils/TestUtils.scala | 99 ++++++++++++-------- 46 files changed, 263 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/client/ClientUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala index 2093749..fd1fc26 100755 --- a/core/src/main/scala/kafka/client/ClientUtils.scala +++ b/core/src/main/scala/kafka/client/ClientUtils.scala @@ -42,6 +42,7 @@ object ClientUtils extends Logging{ * @param producerConfig The producer's config * @return topic metadata response */ + @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0") def fetchTopicMetadata(topics: Set[String], brokers: Seq[BrokerEndPoint], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = { var fetchMetaDataSucceeded: Boolean = false var i: Int = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/javaapi/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/javaapi/producer/Producer.scala b/core/src/main/scala/kafka/javaapi/producer/Producer.scala index c465da5..44f9245 100644 --- a/core/src/main/scala/kafka/javaapi/producer/Producer.scala +++ b/core/src/main/scala/kafka/javaapi/producer/Producer.scala @@ -21,6 +21,8 @@ import kafka.producer.ProducerConfig import kafka.producer.KeyedMessage import scala.collection.mutable +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class Producer[K,V](private val underlying: kafka.producer.Producer[K,V]) // for testing only { def this(config: ProducerConfig) = this(new kafka.producer.Producer[K,V](config)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala index 72ecae1..12dfeb1 100644 --- a/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala +++ b/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala @@ -192,6 +192,7 @@ object KafkaMetricsGroup extends KafkaMetricsGroup with Logging { removeAllMetricsInList(KafkaMetricsGroup.consumerMetricNameList, clientId) } + @deprecated("This method has been deprecated and will be removed in a future release.", "0.10.0.0") def removeAllProducerMetrics(clientId: String) { ProducerRequestStatsRegistry.removeProducerRequestStats(clientId) ProducerTopicStatsRegistry.removeProducerTopicStats(clientId) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BaseProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/BaseProducer.scala b/core/src/main/scala/kafka/producer/BaseProducer.scala index 9d0976f..83d9aa7 100644 --- a/core/src/main/scala/kafka/producer/BaseProducer.scala +++ b/core/src/main/scala/kafka/producer/BaseProducer.scala @@ -21,11 +21,15 @@ import java.util.Properties // A base producer used whenever we need to have options for both old and new producers; // this class will be removed once we fully rolled out 0.9 +@deprecated("This trait has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") trait BaseProducer { def send(topic: String, key: Array[Byte], value: Array[Byte]) def close() } +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class NewShinyProducer(producerProps: Properties) extends BaseProducer { import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback @@ -50,6 +54,8 @@ class NewShinyProducer(producerProps: Properties) extends BaseProducer { } } +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class OldProducer(producerProps: Properties) extends BaseProducer { // default to byte array partitioner http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index 6fa00dd..4616c7e 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -24,7 +24,7 @@ import kafka.common.KafkaException import kafka.utils.Logging import kafka.client.ClientUtils - +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class BrokerPartitionInfo(producerConfig: ProducerConfig, producerPool: ProducerPool, topicPartitionInfo: HashMap[String, TopicMetadata]) @@ -101,4 +101,5 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, } +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") case class PartitionAndLeader(topic: String, partitionId: Int, leaderBrokerIdOpt: Option[Int]) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala index e6b100e..7848456 100755 --- a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -21,6 +21,8 @@ package kafka.producer import kafka.utils._ import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release. " + + "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0") class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(key: Any, numPartitions: Int): Int = { Utils.abs(java.util.Arrays.hashCode(key.asInstanceOf[Array[Byte]])) % numPartitions http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/DefaultPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala index 1141ed1..6b10e51 100755 --- a/core/src/main/scala/kafka/producer/DefaultPartitioner.scala +++ b/core/src/main/scala/kafka/producer/DefaultPartitioner.scala @@ -21,6 +21,8 @@ package kafka.producer import kafka.utils._ import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release. " + + "It has been replaced by org.apache.kafka.clients.producer.internals.DefaultPartitioner.", "0.10.0.0") class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner { private val random = new java.util.Random http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/KeyedMessage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/KeyedMessage.scala b/core/src/main/scala/kafka/producer/KeyedMessage.scala index dbcf295..84ea232 100644 --- a/core/src/main/scala/kafka/producer/KeyedMessage.scala +++ b/core/src/main/scala/kafka/producer/KeyedMessage.scala @@ -21,6 +21,8 @@ package kafka.producer * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */ +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerRecord instead.", "0.10.0.0") case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) { if(topic == null) throw new IllegalArgumentException("Topic cannot be null.") http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Partitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Partitioner.scala b/core/src/main/scala/kafka/producer/Partitioner.scala index efe6d6d..5d24692 100644 --- a/core/src/main/scala/kafka/producer/Partitioner.scala +++ b/core/src/main/scala/kafka/producer/Partitioner.scala @@ -23,6 +23,8 @@ package kafka.producer * Implementations will be constructed via reflection and are required to have a constructor that takes a single * VerifiableProperties instance--this allows passing configuration properties into the partitioner implementation. */ +@deprecated("This trait has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.Partitioner instead.", "0.10.0.0") trait Partitioner { /** * Uses the key to calculate a partition bucket id for routing http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 4be06c8..c11ad21 100755 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -25,7 +25,8 @@ import kafka.producer.async.{DefaultEventHandler, EventHandler, ProducerSendThre import kafka.serializer.Encoder import kafka.utils._ - +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerClosedException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerClosedException.scala b/core/src/main/scala/kafka/producer/ProducerClosedException.scala index 27a5293..4f2f731 100644 --- a/core/src/main/scala/kafka/producer/ProducerClosedException.scala +++ b/core/src/main/scala/kafka/producer/ProducerClosedException.scala @@ -17,5 +17,6 @@ package kafka.producer +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerClosedException() extends RuntimeException("producer already closed") { } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerConfig.scala b/core/src/main/scala/kafka/producer/ProducerConfig.scala index 08a4e51..c2715d0 100755 --- a/core/src/main/scala/kafka/producer/ProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/ProducerConfig.scala @@ -23,6 +23,8 @@ import kafka.utils.{CoreUtils, VerifiableProperties} import kafka.message.NoCompressionCodec import kafka.common.{InvalidConfigException, Config} +@deprecated("This object has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") object ProducerConfig extends Config { def validate(config: ProducerConfig) { validateClientId(config.clientId) @@ -48,6 +50,8 @@ object ProducerConfig extends Config { } } +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") class ProducerConfig private (val props: VerifiableProperties) extends AsyncProducerConfig with SyncProducerConfigShared { import ProducerConfig._ http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerPool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerPool.scala b/core/src/main/scala/kafka/producer/ProducerPool.scala index 5ad6812..60cef63 100644 --- a/core/src/main/scala/kafka/producer/ProducerPool.scala +++ b/core/src/main/scala/kafka/producer/ProducerPool.scala @@ -26,7 +26,7 @@ import kafka.utils.Logging import scala.collection.mutable.HashMap - +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerPool { /** * Used in ProducerPool to initiate a SyncProducer connection with a broker. @@ -40,6 +40,7 @@ object ProducerPool { } } +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerPool(val config: ProducerConfig) extends Logging { private val syncProducers = new HashMap[Int, SyncProducer] private val lock = new Object() http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerRequestStats.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala index b453f63..8ab948a 100644 --- a/core/src/main/scala/kafka/producer/ProducerRequestStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerRequestStats.scala @@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit import kafka.utils.Pool import kafka.common.{ClientIdAllBrokers, ClientIdBroker, ClientIdAndBroker} +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup { val tags = metricId match { case ClientIdAndBroker(clientId, brokerHost, brokerPort) => Map("clientId" -> clientId, "brokerHost" -> brokerHost, "brokerPort" -> brokerPort.toString) @@ -36,6 +37,7 @@ class ProducerRequestMetrics(metricId: ClientIdBroker) extends KafkaMetricsGroup * Tracks metrics of requests made by a given producer client to all brokers. * @param clientId ClientId of the given producer */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerRequestStats(clientId: String) { private val valueFactory = (k: ClientIdBroker) => new ProducerRequestMetrics(k) private val stats = new Pool[ClientIdBroker, ProducerRequestMetrics](Some(valueFactory)) @@ -51,6 +53,7 @@ class ProducerRequestStats(clientId: String) { /** * Stores the request stats information of each producer client in a (clientId -> ProducerRequestStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerRequestStatsRegistry { private val valueFactory = (k: String) => new ProducerRequestStats(k) private val globalStats = new Pool[String, ProducerRequestStats](Some(valueFactory)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerStats.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerStats.scala b/core/src/main/scala/kafka/producer/ProducerStats.scala index 1d0fa88..9466f26 100644 --- a/core/src/main/scala/kafka/producer/ProducerStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerStats.scala @@ -20,6 +20,7 @@ import kafka.metrics.KafkaMetricsGroup import java.util.concurrent.TimeUnit import kafka.utils.Pool +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerStats(clientId: String) extends KafkaMetricsGroup { val tags: Map[String, String] = Map("clientId" -> clientId) val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS, tags) @@ -30,6 +31,7 @@ class ProducerStats(clientId: String) extends KafkaMetricsGroup { /** * Stores metrics of serialization and message sending activity of each producer client in a (clientId -> ProducerStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerStatsRegistry { private val valueFactory = (k: String) => new ProducerStats(k) private val statsRegistry = new Pool[String, ProducerStats](Some(valueFactory)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/ProducerTopicStats.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala index 97594c8..7bb9610 100644 --- a/core/src/main/scala/kafka/producer/ProducerTopicStats.scala +++ b/core/src/main/scala/kafka/producer/ProducerTopicStats.scala @@ -21,7 +21,7 @@ import kafka.common.{ClientIdTopic, ClientIdAllTopics, ClientIdAndTopic} import kafka.utils.{Pool, threadsafe} import java.util.concurrent.TimeUnit - +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") @threadsafe class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { val tags = metricId match { @@ -38,6 +38,7 @@ class ProducerTopicMetrics(metricId: ClientIdTopic) extends KafkaMetricsGroup { * Tracks metrics for each topic the given producer client has produced data to. * @param clientId The clientId of the given producer client. */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerTopicStats(clientId: String) { private val valueFactory = (k: ClientIdTopic) => new ProducerTopicMetrics(k) private val stats = new Pool[ClientIdTopic, ProducerTopicMetrics](Some(valueFactory)) @@ -53,6 +54,7 @@ class ProducerTopicStats(clientId: String) { /** * Stores the topic stats information of each producer client in a (clientId -> ProducerTopicStats) map. */ +@deprecated("This object has been deprecated and will be removed in a future release.", "0.10.0.0") object ProducerTopicStatsRegistry { private val valueFactory = (k: String) => new ProducerTopicStats(k) private val globalStats = new Pool[String, ProducerTopicStats](Some(valueFactory)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index ec3c4ab..de4f4ad 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -27,6 +27,8 @@ import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.utils.Utils._ +@deprecated("This object has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") object SyncProducer { val RequestKey: Short = 0 val randomGenerator = new Random @@ -36,6 +38,8 @@ object SyncProducer { * Send a message set. */ @threadsafe +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.KafkaProducer instead.", "0.10.0.0") class SyncProducer(val config: SyncProducerConfig) extends Logging { private val lock = new Object() http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/SyncProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala index a08ce00..207779c 100644 --- a/core/src/main/scala/kafka/producer/SyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/SyncProducerConfig.scala @@ -20,6 +20,8 @@ package kafka.producer import java.util.Properties import kafka.utils.VerifiableProperties +@deprecated("This class has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") class SyncProducerConfig private (val props: VerifiableProperties) extends SyncProducerConfigShared { def this(originalProps: Properties) { this(new VerifiableProperties(originalProps)) @@ -33,6 +35,8 @@ class SyncProducerConfig private (val props: VerifiableProperties) extends SyncP val port = props.getInt("port") } +@deprecated("This trait has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") trait SyncProducerConfigShared { val props: VerifiableProperties @@ -59,6 +63,8 @@ trait SyncProducerConfigShared { (1, Integer.MAX_VALUE)) } +@deprecated("This object has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") object SyncProducerConfig { val DefaultClientId = "" val DefaultRequiredAcks : Short = 0 http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala index dd39de5..cc3a79d 100644 --- a/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala +++ b/core/src/main/scala/kafka/producer/async/AsyncProducerConfig.scala @@ -18,6 +18,8 @@ package kafka.producer.async import kafka.utils.VerifiableProperties +@deprecated("This trait has been deprecated and will be removed in a future release. " + + "Please use org.apache.kafka.clients.producer.ProducerConfig instead.", "0.10.0.0") trait AsyncProducerConfig { val props: VerifiableProperties http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 7abe48a..b79e64b 100755 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -31,6 +31,7 @@ import java.util.concurrent.atomic._ import kafka.api.{TopicMetadata, ProducerRequest} import org.apache.kafka.common.utils.Utils +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class DefaultEventHandler[K,V](config: ProducerConfig, private val partitioner: Partitioner, private val encoder: Encoder[V], http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/EventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/EventHandler.scala b/core/src/main/scala/kafka/producer/async/EventHandler.scala index e724000..3a17bfb 100644 --- a/core/src/main/scala/kafka/producer/async/EventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/EventHandler.scala @@ -21,6 +21,7 @@ import kafka.producer.KeyedMessage /** * Handler that dispatches the batched data from the queue. */ +@deprecated("This trait has been deprecated and will be removed in a future release.", "0.10.0.0") trait EventHandler[K,V] { /** http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala index 9ecdf76..7779715 100644 --- a/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala +++ b/core/src/main/scala/kafka/producer/async/IllegalQueueStateException.scala @@ -20,6 +20,7 @@ package kafka.producer.async /** * Indicates that the given config parameter has invalid value */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class IllegalQueueStateException(message: String) extends RuntimeException(message) { def this() = this(null) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/MissingConfigException.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala index 304e0b2..a42678b 100644 --- a/core/src/main/scala/kafka/producer/async/MissingConfigException.scala +++ b/core/src/main/scala/kafka/producer/async/MissingConfigException.scala @@ -18,6 +18,7 @@ package kafka.producer.async /* Indicates any missing configuration parameter */ +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class MissingConfigException(message: String) extends RuntimeException(message) { def this() = this(null) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala index 8a903f3..d423757 100644 --- a/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala +++ b/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala @@ -24,6 +24,7 @@ import kafka.producer.KeyedMessage import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge +@deprecated("This class has been deprecated and will be removed in a future release.", "0.10.0.0") class ProducerSendThread[K,V](val threadName: String, val queue: BlockingQueue[KeyedMessage[K,V]], val handler: EventHandler[K,V], http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/main/scala/kafka/tools/KafkaMigrationTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java index b1ab649..0b94902 100755 --- a/core/src/main/scala/kafka/tools/KafkaMigrationTool.java +++ b/core/src/main/scala/kafka/tools/KafkaMigrationTool.java @@ -60,7 +60,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * The user need to provide the configuration file for 0.7 consumer and 0.8 producer. For 0.8 producer, * the "serializer.class" config is set to "kafka.serializer.DefaultEncoder" by the code. */ -@SuppressWarnings({"unchecked", "rawtypes"}) +@SuppressWarnings({"unchecked", "rawtypes", "deprecation"}) public class KafkaMigrationTool { private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaMigrationTool.class.getName()); private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "kafka.consumer.Consumer"; http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 2d89bf8..49ce748 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -48,7 +48,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { private var consumer2: SimpleConsumer = null private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - private val topic = "topic" + protected val topic = "topic" private val numRecords = 100 @Before @@ -227,26 +227,6 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { } } - @Test - def testWrongSerializer() { - // send a record with a wrong type should receive a serialization exception - try { - val producer = createProducerWithWrongSerializer(brokerList) - val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) - producer.send(record5) - fail("Should have gotten a SerializationException") - } catch { - case se: SerializationException => // this is ok - } - } - - private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = { - val producerProps = new Properties() - producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") - createProducer(brokerList, props = Some(producerProps)) - } - /** * testClose checks the closing behavior * http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index d017d13..111bc15 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -19,8 +19,9 @@ package kafka.api import java.util.Properties -import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.config.ConfigException +import org.apache.kafka.common.errors.SerializationException import org.apache.kafka.common.serialization.ByteArraySerializer import org.junit.Test @@ -51,4 +52,25 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer) } + @Test + def testWrongSerializer() { + // send a record with a wrong type should receive a serialization exception + try { + val producer = createProducerWithWrongSerializer(brokerList) + val record5 = new ProducerRecord[Array[Byte], Array[Byte]](topic, new Integer(0), "key".getBytes, "value".getBytes) + producer.send(record5) + fail("Should have gotten a SerializationException") + } catch { + case se: SerializationException => // this is ok + } + } + + private def createProducerWithWrongSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = { + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + return new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala index 2bb203d..7a22c73 100644 --- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala +++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala @@ -63,9 +63,12 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness { override def setUp() { super.setUp() - producer1 = TestUtils.createNewProducer(brokerList, acks = 0, maxBlockMs = 10000L, bufferSize = producerBufferSize) - producer2 = TestUtils.createNewProducer(brokerList, acks = 1, maxBlockMs = 10000L, bufferSize = producerBufferSize) - producer3 = TestUtils.createNewProducer(brokerList, acks = -1, maxBlockMs = 10000L, bufferSize = producerBufferSize) + producer1 = TestUtils.createNewProducer(brokerList, acks = 0, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + bufferSize = producerBufferSize) + producer2 = TestUtils.createNewProducer(brokerList, acks = 1, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + bufferSize = producerBufferSize) + producer3 = TestUtils.createNewProducer(brokerList, acks = -1, requestTimeoutMs = 30000L, maxBlockMs = 10000L, + bufferSize = producerBufferSize) } @After http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala index 7e211b7..4ddc7fe 100644 --- a/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala +++ b/core/src/test/scala/kafka/tools/ConsoleProducerTest.scala @@ -51,9 +51,10 @@ class ConsoleProducerTest { } @Test + @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") def testValidConfigsOldProducer() { val config = new ConsoleProducer.ProducerConfig(validArgs) - new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config)); + new producer.ProducerConfig(ConsoleProducer.getOldProducerProps(config)) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/common/ConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/common/ConfigTest.scala b/core/src/test/scala/unit/kafka/common/ConfigTest.scala index a42836c..26154f2 100644 --- a/core/src/test/scala/unit/kafka/common/ConfigTest.scala +++ b/core/src/test/scala/unit/kafka/common/ConfigTest.scala @@ -26,6 +26,7 @@ import kafka.consumer.ConsumerConfig class ConfigTest { @Test + @deprecated("This test is deprecated and it will be removed in a future release.", "0.10.0.0") def testInvalidClientIds() { val invalidClientIds = new ArrayBuffer[String]() val badChars = Array('/', '\\', ',', '\u0000', ':', "\"", '\'', ';', '*', '?', ' ', '\t', '\r', '\n', '=') http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala index 28b1dd5..a69fba1 100644 --- a/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala @@ -34,6 +34,7 @@ import org.junit.{Test, After, Before} import scala.collection._ +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with Logging { val RebalanceBackoffMs = 5000 http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala index a71ddf1..4515b94 100644 --- a/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala +++ b/core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala @@ -28,6 +28,7 @@ import org.junit.{After, Before, Test} import org.apache.log4j.{Level, Logger} import org.junit.Assert._ +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class AutoOffsetResetTest extends KafkaServerTestHarness with Logging { def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 5af5d1a..3dd0454 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -68,11 +68,11 @@ class FetcherTest extends KafkaServerTestHarness { @Test def testFetcher() { val perNode = 2 - var count = TestUtils.sendMessages(servers, topic, perNode).size + var count = TestUtils.produceMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() - count = TestUtils.sendMessages(servers, topic, perNode).size + count = TestUtils.produceMessages(servers, topic, perNode).size fetch(count) assertQueueEmpty() } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index df752db..beb5d0e 100755 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -34,6 +34,7 @@ import java.util.Properties /** * End to end tests of the primitive apis against a local server */ +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class PrimitiveApiTest extends ProducerConsumerTestHarness with ZooKeeperTestHarness { val requestHandlerLogger = Logger.getLogger(classOf[KafkaRequestHandler]) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index cc5954d..2fdfc48 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -23,6 +23,7 @@ import kafka.producer.Producer import kafka.utils.{StaticPartitioner, TestUtils} import kafka.serializer.StringEncoder +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") trait ProducerConsumerTestHarness extends KafkaServerTestHarness { val host = "localhost" var producer: Producer[String, String] = null http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala index 8e72ad3..b725d8b 100755 --- a/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/integration/UncleanLeaderElectionTest.scala @@ -23,6 +23,8 @@ import org.junit.{Test, After, Before} import scala.util.Random import org.apache.log4j.{Level, Logger} import java.util.Properties +import java.util.concurrent.ExecutionException + import kafka.admin.AdminUtils import kafka.common.FailedToSendMessageException import kafka.consumer.{Consumer, ConsumerConfig} @@ -31,6 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.CoreUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness +import org.apache.kafka.common.errors.TimeoutException import org.junit.Assert._ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { @@ -180,14 +183,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - sendMessage(servers, topic, "first") + produceMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - sendMessage(servers, topic, "second") + produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -197,7 +200,7 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { // wait until new leader is (uncleanly) elected waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(followerId)) - sendMessage(servers, topic, "third") + produceMessage(servers, topic, "third") // second message was lost due to unclean election assertEquals(List("first", "third"), consumeAllMessages(topic)) @@ -215,14 +218,14 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { val followerId = if (leaderId == brokerId1) brokerId2 else brokerId1 debug("Follower for " + topic + " is: %s".format(followerId)) - sendMessage(servers, topic, "first") + produceMessage(servers, topic, "first") waitUntilMetadataIsPropagated(servers, topic, partitionId) assertEquals(List("first"), consumeAllMessages(topic)) // shutdown follower server servers.filter(server => server.config.brokerId == followerId).map(server => shutdownServer(server)) - sendMessage(servers, topic, "second") + produceMessage(servers, topic, "second") assertEquals(List("first", "second"), consumeAllMessages(topic)) // shutdown leader and then restart follower @@ -233,16 +236,20 @@ class UncleanLeaderElectionTest extends ZooKeeperTestHarness { waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(-1)) // message production and consumption should both fail while leader is down - intercept[FailedToSendMessageException] { - sendMessage(servers, topic, "third") + try { + produceMessage(servers, topic, "third") + fail("Message produced while leader is down should fail, but it succeeded") + } catch { + case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => // expected } + assertEquals(List.empty[String], consumeAllMessages(topic)) // restart leader temporarily to send a successfully replicated message servers.filter(server => server.config.brokerId == leaderId).map(server => server.startup()) waitUntilLeaderIsElectedOrChanged(zkUtils, topic, partitionId, newLeaderOpt = Some(leaderId)) - sendMessage(servers, topic, "third") + produceMessage(servers, topic, "third") waitUntilMetadataIsPropagated(servers, topic, partitionId) servers.filter(server => server.config.brokerId == leaderId).map(server => shutdownServer(server)) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala index 88d95e8..e4c4697 100644 --- a/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala +++ b/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala @@ -36,7 +36,7 @@ import scala.collection.JavaConversions import org.apache.log4j.{Level, Logger} import org.junit.Assert._ - +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class ZookeeperConsumerConnectorTest extends KafkaServerTestHarness with ZooKeeperTestHarness with Logging { val numNodes = 2 val numParts = 2 http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala index ee41fd7..3707deb 100644 --- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala +++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala @@ -52,6 +52,7 @@ class MetricsTest extends KafkaServerTestHarness with Logging { } @Test + @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") def testMetricsLeak() { // create topic topic1 with 1 partition on broker 0 createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) @@ -78,13 +79,14 @@ class MetricsTest extends KafkaServerTestHarness with Logging { assertFalse("Topic metrics exists after deleteTopic", checkTopicMetricsExists(topic)) } + @deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") def createAndShutdownStep(group: String, consumerId: String, producerId: String): Unit = { - val sentMessages1 = sendMessages(servers, topic, nMessages) + sendMessages(servers, topic, nMessages) // create a consumer val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumerId)) val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true) val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Map(topic -> 1), new StringDecoder(), new StringDecoder()) - val receivedMessages1 = getMessages(topicMessageStreams1, nMessages) + getMessages(topicMessageStreams1, nMessages) zkConsumerConnector1.shutdown() } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index d94c314..5d28894 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -17,7 +17,6 @@ package kafka.network; - import java.net._ import javax.net.ssl._ import java.io._ @@ -33,7 +32,6 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader} import org.apache.kafka.common.utils.SystemTime -import kafka.producer.SyncProducerConfig import kafka.server.KafkaConfig import kafka.utils.TestUtils @@ -103,9 +101,9 @@ class SocketServerTest extends JUnitSuite { private def producerRequestBytes: Array[Byte] = { val apiKey: Short = 0 val correlationId = -1 - val clientId = SyncProducerConfig.DefaultClientId - val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val clientId = "" + val ackTimeoutMs = 10000 + val ack = 0: Short val emptyHeader = new RequestHeader(apiKey, clientId, correlationId) val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]()) @@ -249,9 +247,9 @@ class SocketServerTest extends JUnitSuite { val apiKey = ApiKeys.PRODUCE.id val correlationId = -1 - val clientId = SyncProducerConfig.DefaultClientId - val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val clientId = "" + val ackTimeoutMs = 10000 + val ack = 0: Short val emptyHeader = new RequestHeader(apiKey, clientId, correlationId) val emptyRequest = new ProduceRequest(ack, ackTimeoutMs, new HashMap[TopicPartition, ByteBuffer]()) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index f711ca4..3088199 100755 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -35,7 +35,13 @@ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import kafka.utils._ +@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") class AsyncProducerTest { + + class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner { + def partition(data: Any, numPartitions: Int): Int = -1 + } + // One of the few cases we can just set a fixed port because the producer is mocked out here since this uses mocks val props = Seq(createBrokerConfig(1, "127.0.0.1:1", port=65534)) val configs = props.map(KafkaConfig.fromProps) @@ -373,15 +379,20 @@ class AsyncProducerTest { val msgs = TestUtils.getMsgStrings(2) + import SyncProducerConfig.{DefaultAckTimeoutMs, DefaultClientId} + // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11) - val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, + correlationId = 11, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, + correlationId = 17, timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NOT_LEADER_FOR_PARTITION.code, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(Errors.NONE.code, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21, + timeout = DefaultAckTimeoutMs, clientId = DefaultClientId) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(Errors.NONE.code, 0L)))) val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) @@ -480,7 +491,3 @@ class AsyncProducerTest { messages.map(m => new Message(key = key, bytes = m, timestamp = 0L, magicValue = Message.MagicValue_V1)): _*) } } - -class NegativePartitioner(props: VerifiableProperties = null) extends Partitioner { - def partition(data: Any, numPartitions: Int): Int = -1 -} http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index de19f6f..4a1ad5a 100755 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -37,6 +37,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import org.scalatest.exceptions.TestFailedException +@deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0") class ProducerTest extends ZooKeeperTestHarness with Logging{ private val brokerId1 = 0 private val brokerId2 = 1 http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index c1034fe..8e234d2 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -21,7 +21,7 @@ import java.net.SocketTimeoutException import java.util.Properties import kafka.admin.AdminUtils -import kafka.api.ProducerResponseStatus +import kafka.api.{ProducerRequest, ProducerResponseStatus} import kafka.common.TopicAndPartition import kafka.integration.KafkaServerTestHarness import kafka.message._ @@ -31,11 +31,22 @@ import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.junit.Test import org.junit.Assert._ +@deprecated("This test has been deprecated and it will be removed in a future release", "0.10.0.0") class SyncProducerTest extends KafkaServerTestHarness { private val messageBytes = new Array[Byte](2) // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool. def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, zkConnect, false).head)) + private def produceRequest(topic: String, + partition: Int, + message: ByteBufferMessageSet, + acks: Int, + timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, + correlationId: Int = 0, + clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { + TestUtils.produceRequest(topic, partition, message, acks, timeout, correlationId, clientId) + } + @Test def testReachableServer() { val server = servers.head @@ -46,7 +57,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, + val response = producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) assertNotNull(response) } catch { @@ -56,7 +67,7 @@ class SyncProducerTest extends KafkaServerTestHarness { assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, + val response = producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) assertNotNull(response) } catch { @@ -65,7 +76,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val secondEnd = SystemTime.milliseconds assertTrue((secondEnd-secondStart) < 500) try { - val response = producer.send(TestUtils.produceRequest("test", 0, + val response = producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) assertNotNull(response) } catch { @@ -101,7 +112,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) - val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1)) + val response1 = producer.send(produceRequest("test", 0, messageSet1, acks = 1)) assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code)) assertEquals(Errors.MESSAGE_TOO_LARGE.code, response1.status(TopicAndPartition("test", 0)).error) @@ -110,7 +121,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val safeSize = configs(0).messageMaxBytes - Message.MinMessageOverhead - Message.TimestampLength - MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) - val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1)) + val response2 = producer.send(produceRequest("test", 0, messageSet2, acks = 1)) assertEquals(1, response1.status.count(_._2.error != Errors.NONE.code)) assertEquals(Errors.NONE.code, response2.status(TopicAndPartition("test", 0)).error) @@ -130,14 +141,14 @@ class SyncProducerTest extends KafkaServerTestHarness { TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, "test", 0) // This message will be dropped silently since message size too large. - producer.send(TestUtils.produceRequest("test", 0, + producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) // Send another message whose size is large enough to exceed the buffer size so // the socket buffer will be flushed immediately; // this send should fail since the socket has been closed try { - producer.send(TestUtils.produceRequest("test", 0, + producer.send(produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(new Array[Byte](configs(0).messageMaxBytes + 1))), acks = 0)) } catch { case e : java.io.IOException => // success @@ -154,7 +165,8 @@ class SyncProducerTest extends KafkaServerTestHarness { val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) // #1 - test that we get an error when partition does not belong to broker in response - val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1) + val request = TestUtils.produceRequestWithAcks(Array("topic1", "topic2", "topic3"), Array(0), messages, 1, + timeout = SyncProducerConfig.DefaultAckTimeoutMs, clientId = SyncProducerConfig.DefaultClientId) val response = producer.send(request) assertNotNull(response) @@ -199,7 +211,7 @@ class SyncProducerTest extends KafkaServerTestHarness { val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1) + val request = produceRequest("topic1", 0, messages, acks = 1) // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled @@ -248,7 +260,7 @@ class SyncProducerTest extends KafkaServerTestHarness { AdminUtils.createTopic(zkUtils, topicName, 1, 1,topicProps) TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, topicName, 0) - val response = producer.send(TestUtils.produceRequest(topicName, 0, + val response = producer.send(produceRequest(topicName, 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)),-1)) assertEquals(Errors.NOT_ENOUGH_REPLICAS.code, response.status(TopicAndPartition(topicName, 0)).error) http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala index 5ecc2c0..c5b61de 100644 --- a/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseReplicaFetchTest.scala @@ -20,13 +20,13 @@ package kafka.server import java.io.File import org.apache.kafka.common.protocol.SecurityProtocol -import org.junit.{Test, After, Before} +import org.junit.{After, Before, Test} import kafka.zk.ZooKeeperTestHarness -import kafka.utils.TestUtils._ -import kafka.producer.KeyedMessage -import kafka.serializer.StringEncoder -import kafka.utils.{TestUtils} +import kafka.utils.TestUtils +import TestUtils._ import kafka.common._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.StringSerializer abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness { var brokers: Seq[KafkaServer] = null @@ -63,11 +63,13 @@ abstract class BaseReplicaFetchTest extends ZooKeeperTestHarness { } // send test messages to leader - val producer = TestUtils.createProducer[String, String](TestUtils.getBrokerListStrFromServers(brokers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[StringEncoder].getName) - val messages = testMessageList1.map(m => new KeyedMessage(topic1, m, m)) ++ testMessageList2.map(m => new KeyedMessage(topic2, m, m)) - producer.send(messages:_*) + val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(brokers), + retries = 5, + keySerializer = new StringSerializer, + valueSerializer = new StringSerializer) + val records = testMessageList1.map(m => new ProducerRecord(topic1, m, m)) ++ + testMessageList2.map(m => new ProducerRecord(topic2, m, m)) + records.map(producer.send).foreach(_.get) producer.close() def logsMatch(): Boolean = { http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index d11c40f..e13bfd9 100755 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -19,15 +19,14 @@ package kafka.server import java.util.Properties import kafka.utils.TestUtils._ -import kafka.utils.{IntEncoder, CoreUtils, TestUtils} +import kafka.utils.{CoreUtils, TestUtils} import kafka.zk.ZooKeeperTestHarness import kafka.common._ -import kafka.producer.{KeyedMessage, Producer} -import kafka.serializer.StringEncoder - import java.io.File -import org.junit.{Test, After, Before} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} +import org.junit.{After, Before, Test} import org.junit.Assert._ class LogRecoveryTest extends ZooKeeperTestHarness { @@ -54,7 +53,7 @@ class LogRecoveryTest extends ZooKeeperTestHarness { val message = "hello" - var producer: Producer[Int, String] = null + var producer: KafkaProducer[Integer, String] = null def hwFile1 = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename)) def hwFile2 = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename)) var servers = Seq.empty[KafkaServer] @@ -64,16 +63,19 @@ class LogRecoveryTest extends ZooKeeperTestHarness { def updateProducer() = { if (producer != null) producer.close() - producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) + producer = TestUtils.createNewProducer( + TestUtils.getBrokerListStrFromServers(servers), + retries = 5, + keySerializer = new IntegerSerializer, + valueSerializer = new StringSerializer + ) } @Before override def setUp() { super.setUp() - configs = TestUtils.createBrokerConfigs(2, zkConnect, false).map(KafkaConfig.fromProps(_, overridingProps)) + configs = TestUtils.createBrokerConfigs(2, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) // start both servers server1 = TestUtils.createServer(configProps1) @@ -230,7 +232,6 @@ class LogRecoveryTest extends ZooKeeperTestHarness { } private def sendMessages(n: Int = 1) { - for(i <- 0 until n) - producer.send(new KeyedMessage[Int, String](topic, 0, message)) + (0 until n).map(_ => producer.send(new ProducerRecord(topic, 0, message))).foreach(_.get) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 8f081b9..67f62d9 100755 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -18,15 +18,14 @@ package kafka.server import kafka.zk.ZooKeeperTestHarness import kafka.consumer.SimpleConsumer -import kafka.producer._ -import kafka.utils.{IntEncoder, TestUtils, CoreUtils} +import kafka.utils.{CoreUtils, TestUtils} import kafka.utils.TestUtils._ import kafka.api.FetchRequestBuilder import kafka.message.ByteBufferMessageSet -import kafka.serializer.StringEncoder - import java.io.File +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.{IntegerSerializer, StringSerializer} import org.junit.{Before, Test} import org.junit.Assert._ @@ -46,27 +45,34 @@ class ServerShutdownTest extends ZooKeeperTestHarness { @Test def testCleanShutdown() { + + def createProducer(server: KafkaServer): KafkaProducer[Integer, String] = + TestUtils.createNewProducer( + TestUtils.getBrokerListStrFromServers(Seq(server)), + retries = 5, + keySerializer = new IntegerSerializer, + valueSerializer = new StringSerializer + ) + var server = new KafkaServer(config, threadNamePrefix = Option(this.getClass.getName)) server.startup() - var producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) + var producer = createProducer(server) // create topic createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = Seq(server)) // send some messages - producer.send(sent1.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) + sent1.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get) // do a clean shutdown and check that offset checkpoint file exists server.shutdown() - for(logDir <- config.logDirs) { + for (logDir <- config.logDirs) { val OffsetCheckpointFile = new File(logDir, server.logManager.RecoveryPointCheckpointFile) assertTrue(OffsetCheckpointFile.exists) assertTrue(OffsetCheckpointFile.length() > 0) } producer.close() - + /* now restart the server and check that the written data is still readable and everything still works */ server = new KafkaServer(config) server.startup() @@ -74,13 +80,11 @@ class ServerShutdownTest extends ZooKeeperTestHarness { // wait for the broker to receive the update metadata request after startup TestUtils.waitUntilMetadataIsPropagated(Seq(server), topic, 0) - producer = TestUtils.createProducer[Int, String](TestUtils.getBrokerListStrFromServers(Seq(server)), - encoder = classOf[StringEncoder].getName, - keyEncoder = classOf[IntEncoder].getName) + producer = createProducer(server) val consumer = new SimpleConsumer(host, server.boundPort(), 1000000, 64*1024, "") var fetchedMessage: ByteBufferMessageSet = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + while (fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build()) fetchedMessage = fetched.messageSet(topic, 0) } @@ -88,10 +92,10 @@ class ServerShutdownTest extends ZooKeeperTestHarness { val newOffset = fetchedMessage.last.nextOffset // send some more messages - producer.send(sent2.map(m => new KeyedMessage[Int, String](topic, 0, m)):_*) + sent2.map(value => producer.send(new ProducerRecord(topic, 0, value))).foreach(_.get) fetchedMessage = null - while(fetchedMessage == null || fetchedMessage.validBytes == 0) { + while (fetchedMessage == null || fetchedMessage.validBytes == 0) { val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build()) fetchedMessage = fetched.messageSet(topic, 0) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e89a9ce1/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 49fb85f..7b3e955 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -42,14 +42,15 @@ import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream} import kafka.serializer.{DefaultEncoder, Encoder, StringEncoder} import kafka.common.TopicAndPartition import kafka.admin.AdminUtils -import kafka.producer.ProducerConfig import kafka.log._ import kafka.utils.ZkUtils._ import org.junit.Assert._ -import org.apache.kafka.clients.producer.KafkaProducer -import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} +import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, RangeAssignor} import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.network.Mode +import org.apache.kafka.common.record.CompressionType +import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer} import scala.collection.Map import scala.collection.JavaConversions._ @@ -342,7 +343,7 @@ object TestUtils extends Logging { // check if the actual iterator was longer if (actual.hasNext) { - var length2 = length; + var length2 = length while (actual.hasNext) { actual.next length2 += 1 @@ -419,6 +420,7 @@ object TestUtils extends Logging { * Create a producer with a few pre-configured properties. * If certain properties need to be overridden, they can be provided in producerProps. */ + @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0") def createProducer[K, V](brokerList: String, encoder: String = classOf[DefaultEncoder].getName, keyEncoder: String = classOf[DefaultEncoder].getName, @@ -433,7 +435,7 @@ object TestUtils extends Logging { props.put("serializer.class", encoder) props.put("key.serializer.class", keyEncoder) props.put("partitioner.class", partitioner) - new Producer[K, V](new ProducerConfig(props)) + new Producer[K, V](new kafka.producer.ProducerConfig(props)) } private def securityConfigs(mode: Mode, @@ -453,16 +455,18 @@ object TestUtils extends Logging { /** * Create a (new) producer with a few pre-configured properties. */ - def createNewProducer(brokerList: String, + def createNewProducer[K, V](brokerList: String, acks: Int = -1, maxBlockMs: Long = 60 * 1000L, bufferSize: Long = 1024L * 1024L, retries: Int = 0, lingerMs: Long = 0, + requestTimeoutMs: Long = 10 * 1024L, securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, trustStoreFile: Option[File] = None, - props: Option[Properties] = None): KafkaProducer[Array[Byte], Array[Byte]] = { - import org.apache.kafka.clients.producer.ProducerConfig + keySerializer: Serializer[K] = new ByteArraySerializer, + valueSerializer: Serializer[V] = new ByteArraySerializer, + props: Option[Properties] = None): KafkaProducer[K, V] = { val producerProps = props.getOrElse(new Properties) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) @@ -470,15 +474,15 @@ object TestUtils extends Logging { producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) + producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) /* Only use these if not already set */ val defaultProps = Map( ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> "100", ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200", - ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString, - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer", - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer" + ProducerConfig.LINGER_MS_CONFIG -> lingerMs.toString ) + defaultProps.foreach { case (key, value) => if (!producerProps.containsKey(key)) producerProps.put(key, value) } @@ -489,10 +493,10 @@ object TestUtils extends Logging { * invoke it before this call in IntegrationTestHarness, otherwise the * SSL client auth fails. */ - if(!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) + if (!producerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)) producerProps.putAll(producerSecurityConfigs(securityProtocol, trustStoreFile)) - new KafkaProducer[Array[Byte],Array[Byte]](producerProps) + new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer) } private def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = securityProtocol match { @@ -558,8 +562,6 @@ object TestUtils extends Logging { props.put("request.timeout.ms", "2000") props.put("request.required.acks", "-1") props.put("send.buffer.bytes", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") props } @@ -620,23 +622,25 @@ object TestUtils extends Logging { /** * Create a wired format request based on simple basic information */ + @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0") def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet, - acks: Int = SyncProducerConfig.DefaultRequiredAcks, - timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, + acks: Int, + timeout: Int, correlationId: Int = 0, - clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { + clientId: String): ProducerRequest = { produceRequestWithAcks(Seq(topic), Seq(partition), message, acks, timeout, correlationId, clientId) } + @deprecated("This method has been deprecated and it will be removed in a future release", "0.10.0.0") def produceRequestWithAcks(topics: Seq[String], partitions: Seq[Int], message: ByteBufferMessageSet, - acks: Int = SyncProducerConfig.DefaultRequiredAcks, - timeout: Int = SyncProducerConfig.DefaultAckTimeoutMs, + acks: Int, + timeout: Int, correlationId: Int = 0, - clientId: String = SyncProducerConfig.DefaultClientId): ProducerRequest = { + clientId: String): ProducerRequest = { val data = topics.flatMap(topic => partitions.map(partition => (TopicAndPartition(topic, partition), message)) ) @@ -889,6 +893,8 @@ object TestUtils extends Logging { time = time, brokerState = new BrokerState()) } + + @deprecated("This method has been deprecated and it will be removed in a future release.", "0.10.0.0") def sendMessages(servers: Seq[KafkaServer], topic: String, numMessages: Int, @@ -908,7 +914,7 @@ object TestUtils extends Logging { partitioner = classOf[FixedValuePartitioner].getName, producerProps = props) - producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)):_*) + producer.send(ms.map(m => new KeyedMessage[Int, String](topic, partition, m)): _*) debug("Sent %d messages for partition [%s,%d]".format(ms.size, topic, partition)) producer.close() ms.toList @@ -920,24 +926,43 @@ object TestUtils extends Logging { keyEncoder = classOf[StringEncoder].getName, partitioner = classOf[DefaultPartitioner].getName, producerProps = props) - producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)):_*) + producer.send(ms.map(m => new KeyedMessage[String, String](topic, topic, m)): _*) producer.close() debug("Sent %d messages for topic [%s]".format(ms.size, topic)) ms.toList } - } - def sendMessage(servers: Seq[KafkaServer], - topic: String, - message: String) = { + def produceMessages(servers: Seq[KafkaServer], + topic: String, + numMessages: Int): Seq[String] = { + + val producer = createNewProducer( + TestUtils.getBrokerListStrFromServers(servers), + retries = 5, + requestTimeoutMs = 2000 + ) + + val values = (0 until numMessages).map(x => s"test-$x") + + val futures = values.map { value => + producer.send(new ProducerRecord(topic, null, null, value.getBytes)) + } + futures.foreach(_.get) + producer.close() + + debug(s"Sent ${values.size} messages for topic [$topic]") - val producer: Producer[String, String] = - createProducer(TestUtils.getBrokerListStrFromServers(servers), - encoder = classOf[StringEncoder].getName(), - keyEncoder = classOf[StringEncoder].getName()) + values + } - producer.send(new KeyedMessage[String, String](topic, topic, message)) + def produceMessage(servers: Seq[KafkaServer], topic: String, message: String) { + val producer = createNewProducer( + TestUtils.getBrokerListStrFromServers(servers), + retries = 5, + requestTimeoutMs = 2000 + ) + producer.send(new ProducerRecord(topic, topic.getBytes, message.getBytes)).get producer.close() } @@ -1056,18 +1081,14 @@ class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] { override def toBytes(n: Int) = n.toString.getBytes } -class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner{ +@deprecated("This class is deprecated and it will be removed in a future release.", "0.10.0.0") +class StaticPartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(data: Any, numPartitions: Int): Int = { (data.asInstanceOf[String].length % numPartitions) } } -class HashPartitioner(props: VerifiableProperties = null) extends Partitioner { - def partition(data: Any, numPartitions: Int): Int = { - (data.hashCode % numPartitions) - } -} - +@deprecated("This class has been deprecated and it will be removed in a future release.", "0.10.0.0") class FixedValuePartitioner(props: VerifiableProperties = null) extends Partitioner { def partition(data: Any, numPartitions: Int): Int = data.asInstanceOf[Int] }
