KAFKA-736 Add an option to the 0.8 producer to mimic 0.7 producer behavior; reviewed by Jun Rao and Sriram Subramanian
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4d8fb1ee Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4d8fb1ee Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4d8fb1ee Branch: refs/heads/trunk Commit: 4d8fb1eebc043fab11c58d3309e93cc83ef24a89 Parents: 828ce83 Author: Neha Narkhede <neha.narkh...@gmail.com> Authored: Fri Feb 22 17:48:10 2013 -0800 Committer: Neha Narkhede <neha.narkh...@gmail.com> Committed: Fri Feb 22 17:48:10 2013 -0800 ---------------------------------------------------------------------- config/log4j.properties | 1 + .../main/scala/kafka/api/ProducerResponse.scala | 2 - .../main/scala/kafka/network/RequestChannel.scala | 2 +- .../main/scala/kafka/network/SocketServer.scala | 30 +++++-- .../scala/kafka/producer/BrokerPartitionInfo.scala | 2 +- .../scala/kafka/producer/ConsoleProducer.scala | 10 ++- .../scala/kafka/producer/KafkaLog4jAppender.scala | 13 ++- core/src/main/scala/kafka/producer/Producer.scala | 2 +- .../main/scala/kafka/producer/SyncProducer.scala | 17 +++- .../kafka/producer/async/DefaultEventHandler.scala | 30 ++++--- core/src/main/scala/kafka/server/KafkaApis.scala | 9 ++- .../unit/kafka/integration/PrimitiveApiTest.scala | 48 ++++++++--- .../integration/ProducerConsumerTestHarness.scala | 14 +--- .../scala/unit/kafka/log/OffsetIndexTest.scala | 2 +- .../unit/kafka/log4j/KafkaLog4jAppenderTest.scala | 7 +- .../unit/kafka/network/SocketServerTest.scala | 36 +++++++- .../unit/kafka/producer/AsyncProducerTest.scala | 13 ++-- .../scala/unit/kafka/producer/ProducerTest.scala | 3 +- .../unit/kafka/producer/SyncProducerTest.scala | 69 +++++++-------- .../scala/unit/kafka/server/LogRecoveryTest.scala | 2 +- .../unit/kafka/server/ServerShutdownTest.scala | 2 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 25 ++++-- 22 files changed, 215 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/config/log4j.properties ---------------------------------------------------------------------- diff --git a/config/log4j.properties b/config/log4j.properties index e104751..5692da0 100644 --- a/config/log4j.properties +++ b/config/log4j.properties @@ -47,6 +47,7 @@ log4j.logger.kafka=INFO, kafkaAppender log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender log4j.additivity.kafka.network.RequestChannel$=false +#log4j.logger.kafka.network.Processor=TRACE, requestAppender #log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender #log4j.additivity.kafka.server.KafkaApis=false log4j.logger.kafka.request.logger=TRACE, requestAppender http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/api/ProducerResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/ProducerResponse.scala b/core/src/main/scala/kafka/api/ProducerResponse.scala index 743227d..5bff709 100644 --- a/core/src/main/scala/kafka/api/ProducerResponse.scala +++ b/core/src/main/scala/kafka/api/ProducerResponse.scala @@ -22,7 +22,6 @@ import scala.collection.Map import kafka.common.{TopicAndPartition, ErrorMapping} import kafka.api.ApiUtils._ - object ProducerResponse { def readFrom(buffer: ByteBuffer): ProducerResponse = { val correlationId = buffer.getInt @@ -44,7 +43,6 @@ object ProducerResponse { case class ProducerResponseStatus(error: Short, offset: Long) - case class ProducerResponse(correlationId: Int, status: Map[TopicAndPartition, ProducerResponseStatus]) extends RequestOrResponse { http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 7747ddd..931092d 100644 --- 
a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -116,7 +116,7 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe } /** Get the next request or block until there is one */ - def receiveRequest(): RequestChannel.Request = + def receiveRequest(): RequestChannel.Request = requestQueue.take() /** Get a response for the given processor if there is one */ http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index d5a24f6..648d936 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -35,7 +35,7 @@ import kafka.utils._ class SocketServer(val brokerId: Int, val host: String, val port: Int, - val numProcessorThreads: Int, + val numProcessorThreads: Int, val maxQueuedRequests: Int, val maxRequestSize: Int = Int.MaxValue) extends Logging { this.logIdent = "[Socket Server on Broker " + brokerId + "], " @@ -206,7 +206,7 @@ private[kafka] class Acceptor(val host: String, val port: Int, private val proce * each of which has its own selectors */ private[kafka] class Processor(val id: Int, - val time: Time, + val time: Time, val maxRequestSize: Int, val requestChannel: RequestChannel) extends AbstractServerThread { @@ -219,7 +219,9 @@ private[kafka] class Processor(val id: Int, configureNewConnections() // register any new responses for writing processNewResponses() + val startSelectTime = SystemTime.milliseconds val ready = selector.select(300) + trace("Processor id " + id + " selection time = " + (SystemTime.milliseconds - startSelectTime) + " ms") if(ready > 0) { val keys = selector.selectedKeys() val iter = keys.iterator() @@ -259,11 +261,21 @@ 
private[kafka] class Processor(val id: Int, private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { - trace("Socket server received response to send, registering for write: " + curr) val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + if(curr.responseSend == null) { + // a null response send object indicates that there is no response to send to the client. + // In this case, we just want to turn the interest ops to READ to be able to read more pipelined requests + // that are sitting in the server's socket buffer + trace("Socket server received empty response to send, registering for read: " + curr) + key.interestOps(SelectionKey.OP_READ) + key.attach(null) + curr.request.updateRequestMetrics + } else { + trace("Socket server received response to send, registering for write: " + curr) + key.interestOps(SelectionKey.OP_WRITE) + key.attach(curr) + } } catch { case e: CancelledKeyException => { debug("Ignoring response for closed socket.") @@ -298,7 +310,7 @@ private[kafka] class Processor(val id: Int, private def configureNewConnections() { while(newConnections.size() > 0) { val channel = newConnections.poll() - debug("Listening to new connection from " + channel.socket.getRemoteSocketAddress) + debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) channel.register(selector, SelectionKey.OP_READ) } } @@ -321,10 +333,12 @@ private[kafka] class Processor(val id: Int, } else if(receive.complete) { val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address) requestChannel.sendRequest(req) - trace("Received request, sending for processing by handler: " + req) key.attach(null) + // explicitly reset interest ops to not READ, no need to wake up the selector just yet + key.interestOps(key.interestOps & 
(~SelectionKey.OP_READ)) } else { // more reading to be done + trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) wakeup() } @@ -344,8 +358,10 @@ private[kafka] class Processor(val id: Int, if(responseSend.complete) { response.request.updateRequestMetrics() key.attach(null) + trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_READ) } else { + trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) key.interestOps(SelectionKey.OP_WRITE) wakeup() } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala index b209a97..a0e2b44 100644 --- a/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala +++ b/core/src/main/scala/kafka/producer/BrokerPartitionInfo.scala @@ -80,7 +80,7 @@ class BrokerPartitionInfo(producerConfig: ProducerConfig, if(tmd.errorCode == ErrorMapping.NoError){ topicPartitionInfo.put(tmd.topic, tmd) } else - warn("Metadata for topic [%s] is erronous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) + warn("Metadata for topic [%s] is erroneous: [%s]".format(tmd.topic, tmd), ErrorMapping.exceptionFor(tmd.errorCode)) tmd.partitionsMetadata.foreach(pmd =>{ if (pmd.errorCode != ErrorMapping.NoError){ debug("Metadata for topic partition [%s, %d] is errornous: [%s]".format(tmd.topic, pmd.partitionId, pmd), ErrorMapping.exceptionFor(pmd.errorCode)) http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/ConsoleProducer.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ConsoleProducer.scala b/core/src/main/scala/kafka/producer/ConsoleProducer.scala index 8b77465..eebfda6 100644 --- a/core/src/main/scala/kafka/producer/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/producer/ConsoleProducer.scala @@ -87,7 +87,12 @@ object ConsoleProducer { .describedAs("reader_class") .ofType(classOf[java.lang.String]) .defaultsTo(classOf[LineMessageReader].getName) - val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + + val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.") + .withRequiredArg + .describedAs("size") + .ofType(classOf[java.lang.Integer]) + .defaultsTo(1024*100) + val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + "This allows custom configuration for a user-defined message reader.") .withRequiredArg .describedAs("prop") @@ -116,6 +121,7 @@ object ConsoleProducer { val keyEncoderClass = options.valueOf(keyEncoderOpt) val valueEncoderClass = options.valueOf(valueEncoderOpt) val readerClass = options.valueOf(messageReaderOpt) + val socketBuffer = options.valueOf(socketBufferSizeOpt) val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt)) cmdLineProps.put("topic", topic) @@ -133,7 +139,7 @@ object ConsoleProducer { props.put("request.timeout.ms", requestTimeoutMs.toString) props.put("key.serializer.class", keyEncoderClass) props.put("serializer.class", valueEncoderClass) - + props.put("send.buffer.bytes", socketBuffer.toString) val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]] reader.init(System.in, cmdLineProps) http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala index af077e0..3d22e6d 100644 --- a/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala +++ b/core/src/main/scala/kafka/producer/KafkaLog4jAppender.scala @@ -32,6 +32,7 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { var compressionCodec:String = null var enqueueTimeout:String = null var queueSize:String = null + var requiredNumAcks: Int = Int.MaxValue private var producer: Producer[String, String] = null @@ -40,22 +41,25 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { def getBrokerList:String = brokerList def setBrokerList(brokerList: String) { this.brokerList = brokerList } - + def getSerializerClass:String = serializerClass def setSerializerClass(serializerClass:String) { this.serializerClass = serializerClass } def getProducerType:String = producerType def setProducerType(producerType:String) { this.producerType = producerType } - + def getCompressionCodec:String = compressionCodec def setCompressionCodec(compressionCodec:String) { this.compressionCodec = compressionCodec } - + def getEnqueueTimeout:String = enqueueTimeout def setEnqueueTimeout(enqueueTimeout:String) { this.enqueueTimeout = enqueueTimeout } def getQueueSize:String = queueSize def setQueueSize(queueSize:String) { this.queueSize = queueSize } + def getRequiredNumAcks:Int = requiredNumAcks + def setRequiredNumAcks(requiredNumAcks:Int) { this.requiredNumAcks = requiredNumAcks } + override def activateOptions() { // check for config parameter validity val props = new Properties() @@ -75,12 +79,13 @@ class KafkaLog4jAppender extends AppenderSkeleton with Logging { if(compressionCodec != null) props.put("compression.codec", compressionCodec) if(enqueueTimeout != null) props.put("queue.enqueue.timeout.ms", enqueueTimeout) if(queueSize != null) 
props.put("queue.buffering.max.messages", queueSize) + if(requiredNumAcks != Int.MaxValue) props.put("request.required.acks", requiredNumAcks.toString) val config : ProducerConfig = new ProducerConfig(props) producer = new Producer[String, String](config) LogLog.debug("Kafka producer connected to " + config.brokerList) LogLog.debug("Logging for topic: " + topic) } - + override def append(event: LoggingEvent) { val message : String = if( this.layout == null) { event.getRenderedMessage http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/Producer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/Producer.scala b/core/src/main/scala/kafka/producer/Producer.scala index 66638f2..3ded46e 100644 --- a/core/src/main/scala/kafka/producer/Producer.scala +++ b/core/src/main/scala/kafka/producer/Producer.scala @@ -26,7 +26,7 @@ import kafka.common.QueueFullException import kafka.metrics._ -class Producer[K,V](config: ProducerConfig, +class Producer[K,V](val config: ProducerConfig, private val eventHandler: EventHandler[K,V]) // only for unit testing extends Logging { http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/SyncProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0469a39..306f200 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -62,7 +62,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse): Receive = { + private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = { lock synchronized { verifyRequest(request) 
getOrMakeConnection() @@ -70,7 +70,10 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { var response: Receive = null try { blockingChannel.send(request) - response = blockingChannel.receive() + if(readResponse) + response = blockingChannel.receive() + else + trace("Skipping reading response") } catch { case e: java.io.IOException => // no way to tell if write succeeded. Disconnect and re-throw exception to let client handle retry @@ -83,7 +86,8 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } /** - * Send a message + * Send a message. If the producerRequest had request.required.acks=0, then the + * returned response object is null */ def send(producerRequest: ProducerRequest): ProducerResponse = { val requestSize = producerRequest.sizeInBytes @@ -95,10 +99,13 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { specificTimer.time { - response = doSend(producerRequest) + response = doSend(producerRequest, if(producerRequest.requiredAcks == 0) false else true) } } - ProducerResponse.readFrom(response.buffer) + if(producerRequest.requiredAcks != 0) + ProducerResponse.readFrom(response.buffer) + else + null } def send(request: TopicMetadataRequest): TopicMetadataResponse = { http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index 5569cc2..ebab1da 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -243,20 +243,22 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val response = 
syncProducer.send(producerRequest) debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d" .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port)) - if (response.status.size != producerRequest.data.size) - throw new KafkaException("Incomplete response (%s) for producer request (%s)" - .format(response, producerRequest)) - if (logger.isTraceEnabled) { - val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) - successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => - trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) - } - failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq - .map(partitionStatus => partitionStatus._1) - if(failedTopicPartitions.size > 0) - error("Produce request with correlation id %d failed due to response %s. List of failed topic partitions is %s" - .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) - failedTopicPartitions + if(response != null) { + if (response.status.size != producerRequest.data.size) + throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest)) + if (logger.isTraceEnabled) { + val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError) + successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message => + trace("Successfully sent message: %s".format(Utils.readString(message.message.payload))))) + } + failedTopicPartitions = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq + .map(partitionStatus => partitionStatus._1) + if(failedTopicPartitions.size > 0) + error("Produce request with correlation id %d failed due to response %s. 
List of failed topic partitions is %s" + .format(currentCorrelationId, response.toString, failedTopicPartitions.mkString(","))) + failedTopicPartitions + } else + Seq.empty[TopicAndPartition] } catch { case t: Throwable => warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s" http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6df077b..ece1b46 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -23,13 +23,13 @@ import kafka.message._ import kafka.network._ import org.apache.log4j.Logger import scala.collection._ -import kafka.network.RequestChannel.Response import java.util.concurrent.TimeUnit import java.util.concurrent.atomic._ import kafka.metrics.KafkaMetricsGroup import org.I0Itec.zkclient.ZkClient import kafka.common._ import kafka.utils.{ZkUtils, Pool, SystemTime, Logging} +import kafka.network.RequestChannel.Response /** @@ -127,8 +127,11 @@ class KafkaApis(val requestChannel: RequestChannel, val allPartitionHaveReplicationFactorOne = !produceRequest.data.keySet.exists( m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1) - if (produceRequest.requiredAcks == 0 || - produceRequest.requiredAcks == 1 || + if(produceRequest.requiredAcks == 0) { + // send a fake producer response if producer request.required.acks = 0. 
This mimics the behavior of a 0.7 producer + // and is tuned for very high throughput + requestChannel.sendResponse(new RequestChannel.Response(request.processor, request, null)) + } else if (produceRequest.requiredAcks == 1 || produceRequest.numPartitions <= 0 || allPartitionHaveReplicationFactorOne || numPartitionsInError == produceRequest.numPartitions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala index 402fced..007e85d 100644 --- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala +++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala @@ -21,11 +21,7 @@ import java.nio.ByteBuffer import junit.framework.Assert._ import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder} import kafka.server.{KafkaRequestHandler, KafkaConfig} -import java.util.Properties -import kafka.utils.Utils import kafka.producer.{KeyedMessage, Producer, ProducerConfig} -import kafka.serializer._ -import kafka.utils.TestUtils import org.apache.log4j.{Level, Logger} import org.I0Itec.zkclient.ZkClient import kafka.zk.ZooKeeperTestHarness @@ -33,6 +29,7 @@ import org.scalatest.junit.JUnit3Suite import scala.collection._ import kafka.admin.{AdminUtils, CreateTopicCommand} import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException} +import kafka.utils.{TestUtils, Utils} /** * End to end tests of the primitive apis against a local server @@ -83,9 +80,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testDefaultEncoderProducerAndFetch() { val topic = "test-topic" - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") - 
props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + val props = producer.config.props.props val config = new ProducerConfig(props) val stringProducer1 = new Producer[String, String](config) @@ -111,9 +106,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with def testDefaultEncoderProducerAndFetchWithCompression() { val topic = "test-topic" - val props = new Properties() - props.put("serializer.class", classOf[StringEncoder].getName.toString) - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + val props = producer.config.props.props props.put("compression", "true") val config = new ProducerConfig(props) @@ -272,7 +265,6 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with } producer.send(produceList: _*) - // wait a bit for produced message to be available val request = builder.build() val response = consumer.fetch(request) for( (topic, partition) <- topics) { @@ -315,6 +307,40 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext) } + def testPipelinedProduceRequests() { + createSimpleTopicsAndAwaitLeader(zkClient, List("test1", "test2", "test3", "test4"), config.brokerId) + val props = producer.config.props.props + props.put("request.required.acks", "0") + val pipelinedProducer: Producer[String, String] = new Producer(new ProducerConfig(props)) + + // send some messages + val topics = List(("test4", 0), ("test1", 0), ("test2", 0), ("test3", 0)); + val messages = new mutable.HashMap[String, Seq[String]] + val builder = new FetchRequestBuilder() + var produceList: List[KeyedMessage[String, String]] = Nil + for( (topic, partition) <- topics) { + val messageList = List("a_" + topic, "b_" + topic) + val producerData = messageList.map(new KeyedMessage[String, String](topic, topic, _)) + messages += topic -> messageList + pipelinedProducer.send(producerData:_*) + 
builder.addFetch(topic, partition, 0, 10000) + } + + // wait until the messages are published + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test1", 0).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test2", 0).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test3", 0).get.logEndOffset == 2 }, 1000) + TestUtils.waitUntilTrue(() => { servers.head.logManager.getLog("test4", 0).get.logEndOffset == 2 }, 1000) + + // test if the consumer received the messages in the correct order when producer has enabled request pipelining + val request = builder.build() + val response = consumer.fetch(request) + for( (topic, partition) <- topics) { + val fetched = response.messageSet(topic, partition) + assertEquals(messages(topic), fetched.map(messageAndOffset => Utils.readString(messageAndOffset.message.payload))) + } + } + /** * For testing purposes, just create these topics each with one partition and one replica for * which the provided broker should the leader for. Create and wait for broker to lead. Simple. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala index 0fde254..731ee59 100644 --- a/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/ProducerConsumerTestHarness.scala @@ -19,11 +19,8 @@ package kafka.integration import kafka.consumer.SimpleConsumer import org.scalatest.junit.JUnit3Suite -import java.util.Properties import kafka.producer.{ProducerConfig, Producer} import kafka.utils.TestUtils -import kafka.serializer._ - trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness { val port: Int val host = "localhost" @@ -32,16 +29,7 @@ trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarnes override def setUp() { super.setUp - val props = new Properties() - props.put("partitioner.class", "kafka.utils.StaticPartitioner") - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) - props.put("send.buffer.bytes", "65536") - props.put("connect.timeout.ms", "100000") - props.put("reconnect.interval", "10000") - props.put("retry.backoff.ms", "1000") - props.put("message.send.max.retries", "3") - props.put("request.required.acks", "-1") - props.put("serializer.class", classOf[StringEncoder].getName.toString) + val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner") producer = new Producer(new ProducerConfig(props)) consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "") } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala index 051ebe3..3b2c069 100644 --- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala @@ -24,7 +24,7 @@ import org.junit._ import org.scalatest.junit.JUnitSuite import scala.collection._ import scala.util.Random -import kafka.utils._ +import kafka.utils.TestUtils class OffsetIndexTest extends JUnitSuite { http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala index c25255f..67497dd 100644 --- a/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala +++ b/core/src/test/scala/unit/kafka/log4j/KafkaLog4jAppenderTest.scala @@ -36,7 +36,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with var logDirZk: File = null var config: KafkaConfig = null - var serverZk: KafkaServer = null + var server: KafkaServer = null var simpleConsumerZk: SimpleConsumer = null @@ -55,14 +55,14 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with val logDirZkPath = propsZk.getProperty("log.dir") logDirZk = new File(logDirZkPath) config = new KafkaConfig(propsZk) - serverZk = TestUtils.createServer(config); + server = TestUtils.createServer(config); simpleConsumerZk = new SimpleConsumer("localhost", portZk, 1000000, 64*1024, "") } @After override def tearDown() { simpleConsumerZk.close - serverZk.shutdown + server.shutdown Utils.rm(logDirZk) super.tearDown() } @@ -164,6 +164,7 @@ class KafkaLog4jAppenderTest extends JUnit3Suite with ZooKeeperTestHarness with 
props.put("log4j.appender.KAFKA.brokerList", TestUtils.getBrokerListStrFromConfigs(Seq(config))) props.put("log4j.appender.KAFKA.Topic", "test-topic") props.put("log4j.logger.kafka.log4j", "INFO,KAFKA") + props.put("log4j.appender.KAFKA.requiredNumAcks", "1") props } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 7395cbc..9322b2c 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -21,7 +21,6 @@ import java.net._ import java.io._ import org.junit._ import org.scalatest.junit.JUnitSuite -import kafka.utils.TestUtils import java.util.Random import junit.framework.Assert._ import kafka.producer.SyncProducerConfig @@ -29,13 +28,14 @@ import kafka.api.ProducerRequest import java.nio.ByteBuffer import kafka.common.TopicAndPartition import kafka.message.ByteBufferMessageSet +import java.nio.channels.SelectionKey class SocketServerTest extends JUnitSuite { val server: SocketServer = new SocketServer(0, host = null, - port = TestUtils.choosePort, + port = kafka.utils.TestUtils.choosePort, numProcessorThreads = 1, maxQueuedRequests = 50, maxRequestSize = 50) @@ -102,4 +102,36 @@ class SocketServerTest extends JUnitSuite { receiveResponse(socket) } + @Test + def testPipelinedRequestOrdering() { + val socket = connect() + val correlationId = -1 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack: Short = 0 + val emptyRequest = + new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + + val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes) + 
emptyRequest.writeTo(byteBuffer) + byteBuffer.rewind() + val serializedBytes = new Array[Byte](byteBuffer.remaining) + byteBuffer.get(serializedBytes) + + sendRequest(socket, 0, serializedBytes) + sendRequest(socket, 0, serializedBytes) + + // here the socket server should've read only the first request completely and since the response is not sent yet + // the selection key should not be readable + val request = server.requestChannel.receiveRequest + Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) + + server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) + + // if everything is working correctly, until you send a response for the first request, + // the 2nd request will not be read by the socket server + val request2 = server.requestChannel.receiveRequest + server.requestChannel.sendResponse(new RequestChannel.Response(0, request2, null)) + Assert.assertFalse((request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala index fb0666f..922a200 100644 --- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala @@ -356,11 +356,9 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testBrokerListAndAsync() { return - val props = new Properties() - props.put("serializer.class", "kafka.serializer.StringEncoder") + val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) props.put("producer.type", "async") props.put("batch.num.messages", "5") - props.put("broker.list", 
TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) @@ -394,9 +392,10 @@ class AsyncProducerTest extends JUnit3Suite { @Test def testFailedSendRetryLogic() { val props = new Properties() + props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) + props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props.put("key.serializer.class", classOf[NullEncoder[Int]].getName.toString) - props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(configs)) val config = new ProducerConfig(props) @@ -410,12 +409,12 @@ class AsyncProducerTest extends JUnit3Suite { // produce request for topic1 and partitions 0 and 1. Let the first request fail // entirely. The second request will succeed for partition 1 but fail for partition 0. // On the third try for partition 0, let it succeed. - val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 11) - val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 0, correlationId = 17) + val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 11) + val request2 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), messagesToSet(msgs), acks = 1, correlationId = 17) val response1 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)), (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) - val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), correlationId = 21) + val request3 = TestUtils.produceRequest(topic1, 0, messagesToSet(msgs), acks = 1, correlationId = 21) val response2 = ProducerResponse(0, Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L)))) val mockSyncProducer = 
EasyMock.createMock(classOf[SyncProducer]) http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/ProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala index 792919b..04acef5 100644 --- a/core/src/test/scala/unit/kafka/producer/ProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/ProducerTest.scala @@ -199,7 +199,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("serializer.class", "kafka.serializer.StringEncoder") props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", "2000") -// props.put("request.required.acks", "-1") + props.put("request.required.acks", "1") props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) // create topic @@ -258,6 +258,7 @@ class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness with Logging{ props.put("partitioner.class", "kafka.utils.StaticPartitioner") props.put("request.timeout.ms", String.valueOf(timeoutMs)) props.put("broker.list", TestUtils.getBrokerListStrFromConfigs(Seq(config1, config2))) + props.put("request.required.acks", "1") val config = new ProducerConfig(props) val producer = new Producer[String, String](config) http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index 89ba944..81b2736 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -18,7 +18,6 @@ package kafka.producer import 
java.net.SocketTimeoutException -import java.util.Properties import junit.framework.Assert import kafka.admin.CreateTopicCommand import kafka.integration.KafkaServerTestHarness @@ -38,16 +37,13 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testReachableServer() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "500") - props.put("reconnect.interval", "1000") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val producer = new SyncProducer(new SyncProducerConfig(props)) val firstStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -56,7 +52,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { Assert.assertTrue((firstEnd-firstStart) < 500) val secondStart = SystemTime.milliseconds try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. 
" + e.getMessage) @@ -64,7 +61,8 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val secondEnd = SystemTime.milliseconds Assert.assertTrue((secondEnd-secondStart) < 500) try { - val response = producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))) + val response = producer.send(TestUtils.produceRequest("test", 0, + new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)), acks = 1)) Assert.assertNotNull(response) } catch { case e: Exception => Assert.fail("Unexpected failure sending message to broker. " + e.getMessage) @@ -74,36 +72,31 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testEmptyProduceRequest() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val correlationId = 0 val clientId = SyncProducerConfig.DefaultClientId val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs - val ack = SyncProducerConfig.DefaultRequiredAcks + val ack: Short = 1 val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) - val producer = new SyncProducer(new SyncProducerConfig(props)) val response = producer.send(emptyRequest) + Assert.assertTrue(response != null) Assert.assertTrue(!response.hasError && response.status.size == 0) } @Test def testMessageSizeTooLarge() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val producer 
= new SyncProducer(new SyncProducerConfig(props)) CreateTopicCommand.createTopic(zkClient, "test", 1, 1) TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "test", 0, 500) val message1 = new Message(new Array[Byte](configs(0).messageMaxBytes + 1)) val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1) - val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1)) + val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1, acks = 1)) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error) @@ -112,7 +105,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val safeSize = configs(0).messageMaxBytes - Message.MessageOverhead - MessageSet.LogOverhead - 1 val message2 = new Message(new Array[Byte](safeSize)) val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2) - val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2)) + val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2, acks = 1)) Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError)) Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error) @@ -122,12 +115,7 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { @Test def testProduceCorrectlyReceivesResponse() { val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("connect.timeout.ms", "300") - props.put("reconnect.interval", "500") + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new 
ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) @@ -173,15 +161,11 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { val timeoutMs = 500 val server = servers.head - val props = new Properties() - props.put("host", "localhost") - props.put("port", server.socketServer.port.toString) - props.put("send.buffer.bytes", "102400") - props.put("request.timeout.ms", String.valueOf(timeoutMs)) + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) val producer = new SyncProducer(new SyncProducerConfig(props)) val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes)) - val request = TestUtils.produceRequest("topic1", 0, messages) + val request = TestUtils.produceRequest("topic1", 0, messages, acks = 1) // stop IO threads and request handling, but leave networking operational // any requests should be accepted and queue up, but not handled @@ -196,8 +180,21 @@ class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { case e => Assert.fail("Unexpected exception when expecting timeout: " + e) } val t2 = SystemTime.milliseconds - // make sure we don't wait fewer than timeoutMs for a response Assert.assertTrue((t2-t1) >= timeoutMs) } + + @Test + def testProduceRequestWithNoResponse() { + val server = servers.head + val props = TestUtils.getSyncProducerConfig(server.socketServer.port) + val correlationId = 0 + val clientId = SyncProducerConfig.DefaultClientId + val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs + val ack: Short = 0 + val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, ByteBufferMessageSet]()) + val producer = new SyncProducer(new SyncProducerConfig(props)) + val response = producer.send(emptyRequest) + Assert.assertTrue(response == null) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala index cd724a3..db46247 100644 --- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala +++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala @@ -48,7 +48,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness { var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0)) var servers: Seq[KafkaServer] = Seq.empty[KafkaServer] - val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000) + val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs)) producerProps.put("key.serializer.class", classOf[IntEncoder].getName.toString) producerProps.put("request.required.acks", "-1") http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala index 7afbe54..3728f8c 100644 --- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala @@ -44,7 +44,7 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness { def testCleanShutdown() { var server = new KafkaServer(config) server.startup() - val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000) + val producerConfig = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config))) producerConfig.put("key.serializer.class", classOf[IntEncoder].getName.toString) var producer = new Producer[Int, String](new ProducerConfig(producerConfig)) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/4d8fb1ee/core/src/test/scala/unit/kafka/utils/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 9400328..217ff7a 100644 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -301,16 +301,25 @@ object TestUtils extends Logging { new Producer[K, V](new ProducerConfig(props)) } - def getProducerConfig(brokerList: String, bufferSize: Int, connectTimeout: Int, - reconnectInterval: Int): Properties = { + def getProducerConfig(brokerList: String, partitioner: String = "kafka.producer.DefaultPartitioner"): Properties = { val props = new Properties() - props.put("producer.type", "sync") props.put("broker.list", brokerList) - props.put("partitioner.class", "kafka.utils.FixedValuePartitioner") - props.put("send.buffer.bytes", bufferSize.toString) - props.put("connect.timeout.ms", connectTimeout.toString) - props.put("reconnect.interval", reconnectInterval.toString) - props.put("request.timeout.ms", 30000.toString) + props.put("partitioner.class", partitioner) + props.put("message.send.max.retries", "3") + props.put("retry.backoff.ms", "1000") + props.put("request.timeout.ms", "500") + props.put("request.required.acks", "-1") + props.put("serializer.class", classOf[StringEncoder].getName.toString) + + props + } + + def getSyncProducerConfig(port: Int): Properties = { + val props = new Properties() + props.put("host", "localhost") + props.put("port", port.toString) + props.put("request.timeout.ms", "500") + props.put("request.required.acks", "1") props.put("serializer.class", classOf[StringEncoder].getName.toString) props }