Repository: kafka Updated Branches: refs/heads/trunk d22987f01 -> 78ba492e3
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala deleted file mode 100755 index c0d7726..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -/** - * Represents a communication between the client and server - * - */ -@nonthreadsafe -private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging { - - private val sizeBuffer = ByteBuffer.allocate(4) - private var contentBuffer: ByteBuffer = null - - def this() = this(Int.MaxValue) - - var complete: Boolean = false - - /** - * Get the content buffer for this transmission - */ - def buffer: ByteBuffer = { - expectComplete() - contentBuffer - } - - /** - * Read the bytes in this response from the given channel - */ - def readFrom(channel: ReadableByteChannel): Int = { - expectIncomplete() - var read = 0 - // have we read the request size yet? - if(sizeBuffer.remaining > 0) - read += CoreUtils.read(channel, sizeBuffer) - // have we allocated the request buffer yet? - if(contentBuffer == null && !sizeBuffer.hasRemaining) { - sizeBuffer.rewind() - val size = sizeBuffer.getInt() - if(size <= 0) - throw new InvalidRequestException("%d is not a valid request size.".format(size)) - if(size > maxSize) - throw new InvalidRequestException("Request of length %d is not valid, it is larger than the maximum size of %d bytes.".format(size, maxSize)) - contentBuffer = byteBufferAllocate(size) - } - // if we have a buffer read some stuff into it - if(contentBuffer != null) { - read = CoreUtils.read(channel, contentBuffer) - // did we get everything? - if(!contentBuffer.hasRemaining) { - contentBuffer.rewind() - complete = true - } - } - read - } - - private def byteBufferAllocate(size: Int): ByteBuffer = { - var buffer: ByteBuffer = null - try { - buffer = ByteBuffer.allocate(size) - } catch { - case e: OutOfMemoryError => - error("OOME with size " + size, e) - throw e - case e2: Throwable => - throw e2 - } - buffer - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala deleted file mode 100644 index b95b73b..0000000 --- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ -import kafka.api.RequestOrResponse -import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader} - -@nonthreadsafe -private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send { - - private val sizeBuffer = ByteBuffer.allocate(4) - - // Avoid possibility of overflow for 2GB-4 byte buffer - if(buffer.remaining > Int.MaxValue - sizeBuffer.limit) - throw new IllegalStateException("Attempt to create a bounded buffer of " + buffer.remaining + " bytes, but the maximum " + - "allowable size for a bounded buffer is " + (Int.MaxValue - sizeBuffer.limit) + ".") - sizeBuffer.putInt(buffer.limit) - sizeBuffer.rewind() - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def this(request: RequestOrResponse) = { - this(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) - request.requestId match { - case Some(requestId) => - buffer.putShort(requestId) - case None => - } - - request.writeTo(buffer) - buffer.rewind() - } - - def this(header: ResponseHeader, body: AbstractRequestResponse) = { - this(header.sizeOf + body.sizeOf) - header.writeTo(buffer) - body.writeTo(buffer) - buffer.rewind - } - - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - val written = channel.write(Array(sizeBuffer, buffer)) - // if we are done, mark it off - if(!buffer.hasRemaining) - complete = true - written.asInstanceOf[Int] - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/ByteBufferSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/ByteBufferSend.scala b/core/src/main/scala/kafka/network/ByteBufferSend.scala deleted file mode 100644 index af30042..0000000 --- a/core/src/main/scala/kafka/network/ByteBufferSend.scala +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils._ - -@nonthreadsafe -private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send { - - var complete: Boolean = false - - def this(size: Int) = this(ByteBuffer.allocate(size)) - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - written += channel.write(buffer) - if(!buffer.hasRemaining) - complete = true - written - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/Handler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala index a030033..1a7d56e 100644 --- a/core/src/main/scala/kafka/network/Handler.scala +++ b/core/src/main/scala/kafka/network/Handler.scala @@ -17,17 +17,19 @@ package kafka.network +import org.apache.kafka.common.network.{NetworkReceive, Send} + private[kafka] object Handler { /** * A request handler is a function that turns an incoming * transmission into an outgoing transmission */ - type Handler = Receive => Option[Send] + type Handler = NetworkReceive => Option[Send] /** * A handler mapping finds the right Handler function for a given request */ - type HandlerMapping = (Short, Receive) => Handler + type HandlerMapping = (Short, NetworkReceive) => Handler } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 24edb61..357d8b9 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -17,22 +17,23 @@ package kafka.network +import java.nio.ByteBuffer import java.util.concurrent._ -import kafka.metrics.KafkaMetricsGroup + import com.yammer.metrics.core.Gauge -import java.nio.ByteBuffer import kafka.api._ import kafka.common.TopicAndPartition -import kafka.utils.{Logging, SystemTime} import kafka.message.ByteBufferMessageSet -import java.net._ +import kafka.metrics.KafkaMetricsGroup +import kafka.utils.{Logging, SystemTime} +import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader} import org.apache.log4j.Logger object RequestChannel extends Logging { - val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) + val AllDone = new Request(processor = 1, connectionId = "2", buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT) def getShutdownReceive() = { val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]()) @@ -43,7 +44,7 @@ object RequestChannel extends Logging { byteBuffer } - case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) { + case class Request(processor: Int, connectionId: String, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) { @volatile var requestDequeueTimeMs = -1L @volatile var apiLocalCompleteTimeMs = -1L @volatile var responseCompleteTimeMs = -1L @@ -71,7 +72,15 @@ object RequestChannel extends Logging { buffer = null private val requestLogger = Logger.getLogger("kafka.request.logger") - trace("Processor %d received request : %s".format(processor, if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString)) + + private def requestDesc: String = { + if (requestObj != null) + requestObj.describe(false) + else + header.toString + " -- " + body.toString + } + + trace("Processor %d received request : %s".format(processor, requestDesc)) def updateRequestMetrics() { val endTimeMs = SystemTime.milliseconds @@ -102,13 +111,13 @@ object RequestChannel extends Logging { m.responseSendTimeHist.update(responseSendTime) m.totalTimeHist.update(totalTime) } + if(requestLogger.isTraceEnabled) - requestLogger.trace("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(if (requestObj != null) requestObj.describe(true) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) - else if(requestLogger.isDebugEnabled) { - requestLogger.debug("Completed request:%s from client %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" - .format(if (requestObj != null) requestObj.describe(false) else header.toString + " : " + body.toString, remoteAddress, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) - } + requestLogger.trace("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) + else if(requestLogger.isDebugEnabled) + requestLogger.debug("Completed request:%s from connection %s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d" + .format(requestDesc, connectionId, totalTime, requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime)) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/RequestOrResponseSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala new file mode 100644 index 0000000..364f24b --- /dev/null +++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.nio.ByteBuffer +import java.nio.channels.GatheringByteChannel + +import kafka.api.RequestOrResponse +import kafka.utils.Logging +import org.apache.kafka.common.network.NetworkSend + +object RequestOrResponseSend { + def serialize(request: RequestOrResponse): ByteBuffer = { + val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0)) + request.requestId match { + case Some(requestId) => + buffer.putShort(requestId) + case None => + } + request.writeTo(buffer) + buffer.rewind() + buffer + } +} + +class RequestOrResponseSend(val dest: String, val buffer: ByteBuffer) extends NetworkSend(dest, buffer) with Logging { + + def this(dest: String, request: RequestOrResponse) { + this(dest, RequestOrResponseSend.serialize(request)) + } + + def writeCompletely(channel: GatheringByteChannel): Long = { + var totalWritten = 0L + while(!completed()) { + val written = writeTo(channel) + trace(written + " bytes written.") + totalWritten += written + } + totalWritten + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/SocketServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index edf6214..91319fa 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -17,23 +17,26 @@ package kafka.network +import java.io.IOException +import java.net._ +import java.nio.channels._ import java.util import java.util.concurrent._ import java.util.concurrent.atomic._ -import java.net._ -import java.io._ -import java.nio.channels._ +import com.yammer.metrics.core.Gauge import kafka.cluster.EndPoint -import org.apache.kafka.common.protocol.SecurityProtocol - -import scala.collection._ - import kafka.common.KafkaException import kafka.metrics.KafkaMetricsGroup import kafka.utils._ -import com.yammer.metrics.core.{Gauge, Meter} -import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.InvalidReceiveException +import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.protocol.types.SchemaException +import org.apache.kafka.common.utils.{SystemTime, Time, Utils} + +import scala.collection._ /** * An NIO socket server. The threading model is @@ -50,16 +53,20 @@ class SocketServer(val brokerId: Int, val maxRequestSize: Int = Int.MaxValue, val maxConnectionsPerIp: Int = Int.MaxValue, val connectionsMaxIdleMs: Long, - val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup { + val maxConnectionsPerIpOverrides: Map[String, Int], + val time: Time, + val metrics: Metrics) extends Logging with KafkaMetricsGroup { this.logIdent = "[Socket Server on Broker " + brokerId + "], " - private val time = SystemTime + private val processors = new Array[Processor](numProcessorThreads) private[network] var acceptors = mutable.Map[EndPoint,Acceptor]() val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests) - /* a meter to track the average free capacity of the network processors */ - private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) - + private val allMetricNames = (0 until numProcessorThreads).map { i => + val tags = new util.HashMap[String, String]() + tags.put("networkProcessor", i.toString) + new MetricName("io-wait-ratio", "socket-server-metrics", tags) + } /* I'm pushing the mapping of port-to-protocol to the processor level, so the processor can put the correct protocol in the request channel. @@ -75,26 +82,30 @@ class SocketServer(val brokerId: Int, def startup() { val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) + newGauge("NetworkProcessorAvgIdlePercent", + new Gauge[Double] { + def value = allMetricNames.map( metricName => + metrics.metrics().get(metricName).value()).sum / numProcessorThreads + } + ) + + this.synchronized { for (i <- 0 until numProcessorThreads) { processors(i) = new Processor(i, time, maxRequestSize, - aggregateIdleMeter, - newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)), numProcessorThreads, requestChannel, quotas, connectionsMaxIdleMs, - portToProtocol) + portToProtocol, + metrics + ) Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start() } } - newGauge("ResponsesBeingSent", new Gauge[Int] { - def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) } - }) - // register the processor threads for notification of responses requestChannel.addResponseListener((id:Int) => processors(id).wakeup()) @@ -140,18 +151,19 @@ class SocketServer(val brokerId: Int, */ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging { - protected val selector = Selector.open() private val startupLatch = new CountDownLatch(1) private val shutdownLatch = new CountDownLatch(1) private val alive = new AtomicBoolean(true) + def wakeup() + /** * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete */ def shutdown(): Unit = { alive.set(false) - selector.wakeup() - shutdownLatch.await + wakeup() + shutdownLatch.await() } /** @@ -163,13 +175,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ * Record that the thread startup is complete */ protected def startupComplete() = { - startupLatch.countDown + startupLatch.countDown() } /** * Record that the thread shutdown is complete */ - protected def shutdownComplete() = shutdownLatch.countDown + protected def shutdownComplete() = shutdownLatch.countDown() /** * Is the server still running? @@ -177,11 +189,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ protected def isRunning = alive.get /** - * Wakeup the thread for selection. - */ - def wakeup() = selector.wakeup() - - /** * Close the given key and associated socket */ def close(key: SelectionKey) { @@ -200,30 +207,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ swallowError(channel.close()) } } - - /** - * Close all open connections - */ - def closeAll() { - // removes cancelled keys from selector.keys set - this.selector.selectNow() - val iter = this.selector.keys().iterator() - while (iter.hasNext) { - val key = iter.next() - close(key) - } - } - - def countInterestOps(ops: Int): Int = { - var count = 0 - val it = this.selector.keys().iterator() - while (it.hasNext) { - if ((it.next().interestOps() & ops) != 0) { - count += 1 - } - } - count - } } /** @@ -237,6 +220,7 @@ private[kafka] class Acceptor(val host: String, connectionQuotas: ConnectionQuotas, protocol: SecurityProtocol, portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) { + val nioSelector = java.nio.channels.Selector.open() val serverChannel = openServerSocket(host, port) portToProtocol.put(serverChannel.socket().getLocalPort, protocol) @@ -244,13 +228,13 @@ private[kafka] class Acceptor(val host: String, * Accept loop that checks for new connection attempts */ def run() { - serverChannel.register(selector, SelectionKey.OP_ACCEPT) + serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT); startupComplete() var currentProcessor = 0 while(isRunning) { - val ready = selector.select(500) + val ready = nioSelector.select(500) if(ready > 0) { - val keys = selector.selectedKeys() + val keys = nioSelector.selectedKeys() val iter = keys.iterator() while(iter.hasNext && isRunning) { var key: SelectionKey = null @@ -258,7 +242,7 @@ private[kafka] class Acceptor(val host: String, key = iter.next iter.remove() if(key.isAcceptable) - accept(key, processors(currentProcessor)) + accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") @@ -272,7 +256,7 @@ private[kafka] class Acceptor(val host: String, } debug("Closing server socket and selector.") swallowError(serverChannel.close()) - swallowError(selector.close()) + swallowError(nioSelector.close()) shutdownComplete() } @@ -290,7 +274,7 @@ private[kafka] class Acceptor(val host: String, serverChannel.socket().setReceiveBufferSize(recvBufferSize) try { serverChannel.socket.bind(socketAddress) - info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, port)) + info("Awaiting socket connections on %s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort)) } catch { case e: SocketException => throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) @@ -324,6 +308,12 @@ private[kafka] class Acceptor(val host: String, } } + /** + * Wakeup the thread for selection. + */ + @Override + def wakeup = nioSelector.wakeup() + } /** @@ -333,19 +323,36 @@ private[kafka] class Acceptor(val host: String, private[kafka] class Processor(val id: Int, val time: Time, val maxRequestSize: Int, - val aggregateIdleMeter: Meter, - val idleMeter: Meter, val totalProcessorThreads: Int, val requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, val connectionsMaxIdleMs: Long, - val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) { + val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol], + val metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup { private val newConnections = new ConcurrentLinkedQueue[SocketChannel]() - private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000 - private var currentTimeNanos = SystemTime.nanoseconds - private val lruConnections = new util.LinkedHashMap[SelectionKey, Long] - private var nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos + private val inflightResponses = mutable.Map[String, RequestChannel.Response]() + + private val metricTags = new util.HashMap[String, String]() + metricTags.put("networkProcessor", id.toString) + + newGauge("IdlePercent", + new Gauge[Double] { + def value = { + metrics.metrics().get(new MetricName("io-wait-ratio", "socket-server-metrics", metricTags)).value() + } + }, + JavaConversions.mapAsScalaMap(metricTags) + ) + + private val selector = new org.apache.kafka.common.network.Selector( + maxRequestSize, + connectionsMaxIdleMs, + metrics, + time, + "socket-server", + metricTags, + false) override def run() { startupComplete() @@ -354,68 +361,51 @@ private[kafka] class Processor(val id: Int, configureNewConnections() // register any new responses for writing processNewResponses() - val startSelectTime = SystemTime.nanoseconds - val ready = selector.select(300) - currentTimeNanos = SystemTime.nanoseconds - val idleTime = currentTimeNanos - startSelectTime - idleMeter.mark(idleTime) - // We use a single meter for aggregate idle percentage for the thread pool. - // Since meter is calculated as total_recorded_value / time_window and - // time_window is independent of the number of threads, each recorded idle - // time should be discounted by # threads. - aggregateIdleMeter.mark(idleTime / totalProcessorThreads) - - trace("Processor id " + id + " selection time = " + idleTime + " ns") - if(ready > 0) { - val keys = selector.selectedKeys() - val iter = keys.iterator() - while(iter.hasNext && isRunning) { - var key: SelectionKey = null - try { - key = iter.next - iter.remove() - if(key.isReadable) - read(key) - else if(key.isWritable) - write(key) - else if(!key.isValid) - close(key) - else - throw new IllegalStateException("Unrecognized key state for processor thread.") - } catch { - case e: EOFException => { - debug("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress)) - close(key) - } case e: InvalidRequestException => { - info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage)) - close(key) - } case e: Throwable => { - error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e) - close(key) - } - } + + try { + selector.poll(300) + } catch { + case e @ (_: IllegalStateException | _: IOException) => { + error("Closing processor %s due to illegal state or IO exception".format(id)) + swallow(closeAll()) + shutdownComplete() + throw e } + case e: InvalidReceiveException => + // Log warning and continue since Selector already closed the connection + warn("Connection was closed due to invalid receive. Processor will continue handling other connections") } - maybeCloseOldestConnection + collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => { + try { + val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT) + requestChannel.sendRequest(req) + } catch { + case e @ (_: InvalidRequestException | _: SchemaException) => { + // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier + error("Closing socket for " + receive.source + " because of error", e) + selector.close(receive.source) + } + } + selector.mute(receive.source) + }) + + collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => { + val resp = inflightResponses.remove(send.destination()).get + resp.request.updateRequestMetrics() + selector.unmute(send.destination()) + }) } - debug("Closing selector.") + + + + debug("Closing selector - processor " + id) closeAll() - swallowError(selector.close()) shutdownComplete() } - /** - * Close the given key and associated socket - */ - override def close(key: SelectionKey): Unit = { - lruConnections.remove(key) - super.close(key) - } - private def processNewResponses() { var curr = requestChannel.receiveResponse(id) while(curr != null) { - val key = curr.request.requestKey.asInstanceOf[SelectionKey] try { curr.responseAction match { case RequestChannel.NoOpAction => { @@ -423,26 +413,21 @@ private[kafka] class Processor(val id: Int, // that are sitting in the server's socket buffer curr.request.updateRequestMetrics trace("Socket server received empty response to send, registering for read: " + curr) - key.interestOps(SelectionKey.OP_READ) - key.attach(null) + selector.unmute(curr.request.connectionId) } case RequestChannel.SendAction => { - trace("Socket server received response to send, registering for write: " + curr) - key.interestOps(SelectionKey.OP_WRITE) - key.attach(curr) + trace("Socket server received response to send, registering for write and sending data: " + curr) + selector.send(curr.responseSend) + inflightResponses += (curr.request.connectionId -> curr) } case RequestChannel.CloseConnectionAction => { curr.request.updateRequestMetrics trace("Closing socket connection actively according to the response code.") - close(key) + selector.close(curr.request.connectionId) } - case responseCode => throw new KafkaException("No mapping found for response code " + responseCode) - } - } catch { - case e: CancelledKeyException => { - debug("Ignoring response for closed socket.") - close(key) } + + } finally { curr = requestChannel.receiveResponse(id) } @@ -464,84 +449,27 @@ private[kafka] class Processor(val id: Int, while(!newConnections.isEmpty) { val channel = newConnections.poll() debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress) - channel.register(selector, SelectionKey.OP_READ) + val localHost = channel.socket().getLocalAddress.getHostAddress + val localPort = channel.socket().getLocalPort + val remoteHost = channel.socket().getInetAddress.getHostAddress + val remotePort = channel.socket().getPort + val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort + selector.register(connectionId, channel) } } - /* - * Process reads from ready sockets + /** + * Close all open connections */ - def read(key: SelectionKey) { - lruConnections.put(key, currentTimeNanos) - val socketChannel = channelFor(key) - var receive = key.attachment.asInstanceOf[Receive] - if(key.attachment == null) { - receive = new BoundedByteBufferReceive(maxRequestSize) - key.attach(receive) - } - val read = receive.readFrom(socketChannel) - val address = socketChannel.socket.getRemoteSocketAddress() - trace(read + " bytes read from " + address) - if(read < 0) { - close(key) - } else if(receive.complete) { - val port = socketChannel.socket().getLocalPort - val protocol = portToProtocol.get(port) - val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol) - requestChannel.sendRequest(req) - key.attach(null) - // explicitly reset interest ops to not READ, no need to wake up the selector just yet - key.interestOps(key.interestOps & (~SelectionKey.OP_READ)) - } else { - // more reading to be done - trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_READ) - wakeup() - } + def closeAll() { + selector.close() } - /* - * Process writes to ready sockets + /** + * Wakeup the thread for selection. */ - def write(key: SelectionKey) { - val socketChannel = channelFor(key) - val response = key.attachment().asInstanceOf[RequestChannel.Response] - val responseSend = response.responseSend - if(responseSend == null) - throw new IllegalStateException("Registered for write interest but no response attached to key.") - val written = responseSend.writeTo(socketChannel) - trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " using key " + key) - if(responseSend.complete) { - response.request.updateRequestMetrics() - key.attach(null) - trace("Finished writing, registering for read on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_READ) - } else { - trace("Did not finish writing, registering for write again on connection " + socketChannel.socket.getRemoteSocketAddress()) - key.interestOps(SelectionKey.OP_WRITE) - wakeup() - } - } - - private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel] - - private def maybeCloseOldestConnection { - if(currentTimeNanos > nextIdleCloseCheckTime) { - if(lruConnections.isEmpty) { - nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos - } else { - val oldestConnectionEntry = lruConnections.entrySet.iterator().next() - val connectionLastActiveTime = oldestConnectionEntry.getValue - nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos - if(currentTimeNanos > nextIdleCloseCheckTime) { - val key: SelectionKey = oldestConnectionEntry.getKey - trace("About to close the idle connection from " + key.channel.asInstanceOf[SocketChannel].socket.getRemoteSocketAddress - + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis") - close(key) - } - } - } - } + @Override + def wakeup = selector.wakeup() } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/Transmission.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala deleted file mode 100644 index 2827103..0000000 --- a/core/src/main/scala/kafka/network/Transmission.scala +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.network - -import java.nio._ -import java.nio.channels._ -import kafka.utils.Logging -import kafka.common.KafkaException - -/** - * Represents a stateful transfer of data to or from the network - */ -private[network] trait Transmission extends Logging { - - def complete: Boolean - - protected def expectIncomplete(): Unit = { - if(complete) - throw new KafkaException("This operation cannot be completed on a complete request.") - } - - protected def expectComplete(): Unit = { - if(!complete) - throw new KafkaException("This operation cannot be completed on an incomplete request.") - } - -} - -/** - * A transmission that is being received from a channel - */ -trait Receive extends Transmission { - - def buffer: ByteBuffer - - def readFrom(channel: ReadableByteChannel): Int - - def readCompletely(channel: ReadableByteChannel): Int = { - var totalRead = 0 - while(!complete) { - val read = readFrom(channel) - trace(read + " bytes read.") - totalRead += read - } - totalRead - } - -} - -/** - * A transmission that is being sent out to the channel - */ -trait Send extends Transmission { - - def writeTo(channel: GatheringByteChannel): Int - - def writeCompletely(channel: GatheringByteChannel): Int = { - var totalWritten = 0 - while(!complete) { - val written = writeTo(channel) - trace(written + " bytes written.") - totalWritten += written - } - totalWritten - } - -} - -/** - * A set of composite sends, sent one after another - */ -abstract class MultiSend[S <: Send](val sends: List[S]) extends Send { - val expectedBytesToWrite: Int - private var current = sends - var totalWritten = 0 - - /** - * This method continues to write to the socket buffer till an incomplete - * write happens. On an incomplete write, it returns to the caller to give it - * a chance to schedule other work till the buffered write completes. - */ - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete - var totalWrittenPerCall = 0 - var sendComplete: Boolean = false - do { - val written = current.head.writeTo(channel) - totalWritten += written - totalWrittenPerCall += written - sendComplete = current.head.complete - if(sendComplete) - current = current.tail - } while (!complete && sendComplete) - trace("Bytes written as part of multisend call : " + totalWrittenPerCall + "Total bytes written so far : " + totalWritten + "Expected bytes to write : " + expectedBytesToWrite) - totalWrittenPerCall - } - - def complete: Boolean = { - if (current == Nil) { - if (totalWritten != expectedBytesToWrite) - error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten) - true - } else { - false - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/producer/SyncProducer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala index 0f09951..dcee501 100644 --- a/core/src/main/scala/kafka/producer/SyncProducer.scala +++ b/core/src/main/scala/kafka/producer/SyncProducer.scala @@ -17,11 +17,12 @@ package kafka.producer -import kafka.api._ -import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive} -import kafka.utils._ import java.util.Random +import kafka.api._ +import kafka.network.{RequestOrResponseSend, BlockingChannel} +import kafka.utils._ +import org.apache.kafka.common.network.NetworkReceive import org.apache.kafka.common.utils.Utils._ object SyncProducer { @@ -50,7 +51,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level */ if (logger.isDebugEnabled) { - val buffer = new BoundedByteBufferSend(request).buffer + val buffer = new RequestOrResponseSend("", request).buffer trace("verifying sendbuffer of size " + buffer.limit) val requestTypeId = buffer.getShort() if(requestTypeId == RequestKeys.ProduceKey) { @@ -63,12 +64,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { /** * Common functionality for the public send methods */ - private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = { + private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = { lock synchronized { verifyRequest(request) getOrMakeConnection() - var response: Receive = null + var response: NetworkReceive = null try { blockingChannel.send(request) if(readResponse) @@ -95,7 +96,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize) producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize) - var response: Receive = null + var response: NetworkReceive = null val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer aggregateTimer.time { @@ -104,14 +105,14 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging { } } if(producerRequest.requiredAcks != 0) - ProducerResponse.readFrom(response.buffer) + ProducerResponse.readFrom(response.payload) else null } def send(request: TopicMetadataRequest): TopicMetadataResponse = { val response = doSend(request) - TopicMetadataResponse.readFrom(response.buffer) + TopicMetadataResponse.readFrom(response.payload) } def close() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 387e387..d63bc18 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -18,7 +18,6 @@ package kafka.server import org.apache.kafka.common.protocol.SecurityProtocol -import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader} import org.apache.kafka.common.TopicPartition import kafka.api._ import kafka.admin.AdminUtils @@ -28,10 +27,9 @@ import kafka.coordinator.ConsumerCoordinator import kafka.log._ import kafka.network._ import kafka.network.RequestChannel.Response +import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend} import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging} - import scala.collection._ - import org.I0Itec.zkclient.ZkClient /** @@ -54,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handle(request: RequestChannel.Request) { try{ - trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) + trace("Handling request: " + request.requestObj + " from connection: " + request.connectionId) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest(request) case RequestKeys.FetchKey => handleFetchRequest(request) @@ -84,7 +82,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (response == null) requestChannel.closeConnection(request.processor, request) else - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response))) + requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response))) } error("error when handling request %s".format(request.requestObj), e) } finally @@ -99,7 +97,7 @@ class KafkaApis(val requestChannel: RequestChannel, try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse))) } catch { case e: KafkaStorageException => fatal("Disk error during leadership change.", e) @@ -114,7 +112,7 @@ class KafkaApis(val requestChannel: RequestChannel, val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest] val (response, error) = replicaManager.stopReplicas(stopReplicaRequest) val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse))) replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads() } @@ -123,7 +121,7 @@ class KafkaApis(val requestChannel: RequestChannel, replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache) val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse))) } def handleControlledShutdownRequest(request: RequestChannel.Request) { @@ -134,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId) val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId, ErrorMapping.NoError, partitionsRemaining) - requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse))) + requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse))) } @@ -158,7 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } if (offsetCommitRequest.versionId == 0) { @@ -260,7 +258,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } } @@ -305,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) } // call the replica manager to fetch messages from the local replica @@ -363,7 +361,7 @@ class KafkaApis(val requestChannel: RequestChannel, } }) val response = OffsetResponse(offsetRequest.correlationId, responseMap) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = { @@ -466,7 +464,7 @@ class KafkaApis(val requestChannel: RequestChannel, val brokers = metadataCache.getAliveBrokers trace("Sending topic metadata %s and brokers %s for correlation id %d to client %s".format(topicMetadata.mkString(","), brokers.mkString(","), metadataRequest.correlationId, metadataRequest.clientId)) val response = new TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)), topicMetadata, metadataRequest.correlationId) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } /* @@ -514,8 +512,10 @@ class KafkaApis(val requestChannel: RequestChannel, } trace("Sending offset fetch response %s for correlation id %d to client %s." - .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId)) + + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) + } /* @@ -540,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending consumer metadata %s for correlation id %d to client %s." .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response))) + requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } def handleJoinGroupRequest(request: RequestChannel.Request) { @@ -554,8 +554,8 @@ class KafkaApis(val requestChannel: RequestChannel, val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList) trace("Sending join group response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) + .format(responseBody, request.header.correlationId, request.header.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody))) } // let the coordinator to handle join-group @@ -574,10 +574,10 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending a heartbeat response def sendResponseCallback(errorCode: Short) { - val responseBody = new HeartbeatResponse(errorCode) + val response = new HeartbeatResponse(errorCode) trace("Sending heartbeat response %s for correlation id %d to client %s." - .format(responseBody, request.header.correlationId, request.header.clientId)) - requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody))) + .format(response, request.header.correlationId, request.header.clientId)) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response))) } // let the coordinator to handle heartbeat http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6f25afd..2d75186 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -17,6 +17,7 @@ package kafka.server +import java.util import java.util.Properties import kafka.api.ApiVersion @@ -24,9 +25,11 @@ import kafka.cluster.EndPoint import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils +import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol -import scala.collection.{immutable, JavaConversions, Map} +import scala.collection.{mutable, immutable, JavaConversions, Map} object Defaults { /** ********* Zookeeper Configuration ***********/ @@ -130,6 +133,10 @@ object Defaults { val DeleteTopicEnable = false val CompressionType = "producer" + + val MetricNumSamples = 2 + val MetricSampleWindowMs = 1000 + val MetricReporterClasses = "" } object KafkaConfig { @@ -240,6 +247,10 @@ object KafkaConfig { val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" + val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG + val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG + val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG + /* Documentation */ /** ********* Zookeeper Configuration ***********/ @@ -374,6 +385,10 @@ object KafkaConfig { "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + "'producer' which means retain the original compression codec set by the producer." + val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC + val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC + val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC + private val configDef = { import ConfigDef.Range._ @@ -494,6 +509,9 @@ object KafkaConfig { .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) + .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc) + .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc) + .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc) } def configNames() = { @@ -618,7 +636,10 @@ object KafkaConfig { offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int], offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short], deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean], - compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String] + compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String], + metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int], + metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long], + _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]] ) } @@ -653,7 +674,7 @@ object KafkaConfig { } } -class KafkaConfig(/** ********* Zookeeper Configuration ***********/ +class KafkaConfig (/** ********* Zookeeper Configuration ***********/ val zkConnect: String, val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs, private val _zkConnectionTimeoutMs: Option[Int] = None, @@ -766,7 +787,11 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks, val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable, - val compressionType: String = Defaults.CompressionType + val compressionType: String = Defaults.CompressionType, + + val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs, + val metricNumSamples: Int = Defaults.MetricNumSamples, + private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses) ) { val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs) @@ -786,6 +811,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ val maxConnectionsPerIpOverrides: Map[String, Int] = getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)} + val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses) + private def getLogRetentionTimeMillis: Long = { val millisInMinute = 60L * 1000L val millisInHour = 60L * millisInMinute @@ -850,6 +877,24 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ } } + private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = { + + val reporterList = new util.ArrayList[MetricsReporter](); + val iterator = metricClasses.iterator() + + while (iterator.hasNext) { + val reporterName = iterator.next() + if (!reporterName.isEmpty) { + val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName) + reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]]) + reporterList.add(reporter) + } + } + + reporterList + + } + validateValues() @@ -992,6 +1037,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/ props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString) props.put(DeleteTopicEnableProp, deleteTopicEnable.toString) props.put(CompressionTypeProp, compressionType.toString) + props.put(MetricNumSamplesProp, metricNumSamples.toString) + props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString) + props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(",")) props } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index e66710d..b320ce9 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -21,18 +21,21 @@ import kafka.admin._ import kafka.log.LogConfig import kafka.log.CleanerConfig import kafka.log.LogManager -import kafka.utils._ import java.util.concurrent._ import atomic.{AtomicInteger, AtomicBoolean} import java.io.File -import collection.mutable +import kafka.utils._ +import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.network.NetworkReceive + +import scala.collection.{JavaConversions, mutable} import org.I0Itec.zkclient.ZkClient import kafka.controller.{ControllerStats, KafkaController} import kafka.cluster.{EndPoint, Broker} import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest} import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException} -import kafka.network.{Receive, BlockingChannel, SocketServer} +import kafka.network.{BlockingChannel, SocketServer} import kafka.metrics.KafkaMetricsGroup import com.yammer.metrics.core.Gauge import kafka.coordinator.ConsumerCoordinator @@ -48,6 +51,19 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private var shutdownLatch = new CountDownLatch(1) + private val metricConfig: MetricConfig = new MetricConfig() + .samples(config.metricNumSamples) + .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS) + private val jmxPrefix: String = "kafka.server" + private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses + reporters.add(new JmxReporter(jmxPrefix)) + + + + // This exists so SocketServer (which uses Client libraries) can use the client Time objects without having to convert all of Kafka to use them + // Once we get rid of kafka.utils.time, we can get rid of this too + private val socketServerTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime() + val brokerState: BrokerState = new BrokerState var apis: KafkaApis = null @@ -117,6 +133,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.brokerId = getBrokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " + val metrics = new Metrics(metricConfig, reporters, socketServerTime) + + socketServer = new SocketServer(config.brokerId, config.listeners, config.numNetworkThreads, @@ -126,7 +145,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.socketRequestMaxBytes, config.maxConnectionsPerIp, config.connectionsMaxIdleMs, - config.maxConnectionsPerIpOverrides) + config.maxConnectionsPerIpOverrides, + socketServerTime, + metrics) socketServer.startup() /* start replica manager */ @@ -262,14 +283,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg // 2. issue a controlled shutdown to the controller if (channel != null) { - var response: Receive = null + var response: NetworkReceive = null try { // send the controlled shutdown request val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId) channel.send(request) response = channel.receive() - val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer) + val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload()) if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null && shutdownResponse.partitionsRemaining.size == 0) { shutdownSucceeded = true http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/MessageSetSend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala b/core/src/main/scala/kafka/server/MessageSetSend.scala deleted file mode 100644 index 5667648..0000000 --- a/core/src/main/scala/kafka/server/MessageSetSend.scala +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import java.nio._ -import java.nio.channels._ -import kafka.network._ -import kafka.message._ -import kafka.utils._ -import kafka.common.ErrorMapping - -/** - * A zero-copy message response that writes the bytes needed directly from the file - * wholly in kernel space - */ -@nonthreadsafe -private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send { - - private var sent: Int = 0 - private val size: Int = messages.sizeInBytes - private val header = ByteBuffer.allocate(6) - header.putInt(size + 2) - header.putShort(errorCode) - header.rewind() - - var complete: Boolean = false - - def this(messages: MessageSet) = this(messages, ErrorMapping.NoError) - - def this() = this(MessageSet.Empty) - - def writeTo(channel: GatheringByteChannel): Int = { - expectIncomplete() - var written = 0 - if(header.hasRemaining) - written += channel.write(header) - if(!header.hasRemaining) { - val fileBytesSent = messages.writeTo(channel, sent, size - sent) - written += fileBytesSent - sent += fileBytesSent - } - - if(logger.isTraceEnabled) - if (channel.isInstanceOf[SocketChannel]) { - val socketChannel = channel.asInstanceOf[SocketChannel] - logger.trace(sent + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " expecting to send " + size + " bytes") - } - - if(sent >= size) - complete = true - written - } - - def sendSize: Int = size + header.capacity - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index ad64cee..3d52f62 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -162,7 +162,7 @@ object ConsumerOffsetChecker extends Logging { debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port)) channel.send(OffsetFetchRequest(group, topicPartitions)) - val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer) + val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().payload()) debug("Received offset fetch response %s.".format(offsetFetchResponse)) offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/other/kafka/TestOffsetManager.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala b/core/src/test/scala/other/kafka/TestOffsetManager.scala index 4e90534..8047da4 100644 --- a/core/src/test/scala/other/kafka/TestOffsetManager.scala +++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala @@ -72,7 +72,7 @@ object TestOffsetManager { offsetsChannel.send(commitRequest) numCommits.getAndIncrement commitTimer.time { - val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer) + val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload()) if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement } offset += 1 @@ -119,7 +119,7 @@ object TestOffsetManager { val group = "group-" + id try { metadataChannel.send(ConsumerMetadataRequest(group)) - val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1) + val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1) val channel = if (channels.contains(coordinatorId)) channels(coordinatorId) @@ -135,7 +135,7 @@ object TestOffsetManager { channel.send(fetchRequest) fetchTimer.time { - val response = OffsetFetchResponse.readFrom(channel.receive().buffer) + val response = OffsetFetchResponse.readFrom(channel.receive().payload()) if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) { numErrors.getAndIncrement } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/KafkaConfigTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala index bc4aef3..4cb92de 100644 --- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala @@ -20,6 +20,7 @@ import java.io.{FileOutputStream, File} import java.security.Permission import kafka.Kafka +import kafka.server.KafkaConfig import org.junit.{After, Before, Test} import junit.framework.Assert._ @@ -57,20 +58,20 @@ class KafkaTest { val propertiesFile = prepareDefaultConfig() // We should load configuration file without any arguments - val config1 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile)) + val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile))) assertEquals(1, config1.brokerId) // We should be able to override given property on command line - val config2 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=2")) + val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2"))) assertEquals(2, config2.brokerId) // We should be also able to set completely new property - val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")) + val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact"))) assertEquals(1, config3.brokerId) assertEquals("compact", config3.logCleanupPolicy) // We should be also able to set several properties - val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2")) + val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2"))) assertEquals(2, config4.brokerId) assertEquals("compact", config4.logCleanupPolicy) } @@ -78,25 +79,25 @@ class KafkaTest { @Test(expected = classOf[ExitCalled]) def testGetKafkaConfigFromArgsWrongSetValue(): Unit = { val propertiesFile = prepareDefaultConfig() - Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "a=b=c")) + KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "a=b=c"))) } @Test(expected = classOf[ExitCalled]) def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = { val propertiesFile = prepareDefaultConfig() - Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2")) + KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2"))) } @Test(expected = classOf[ExitCalled]) def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = { val propertiesFile = prepareDefaultConfig() - Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2")) + KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2"))) } @Test(expected = classOf[ExitCalled]) def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = { val propertiesFile = prepareDefaultConfig() - Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")) + KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))) } def prepareDefaultConfig(): String = { http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 95d5621..7dc2fad 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -20,7 +20,10 @@ package kafka.network; import java.net._ import java.io._ import kafka.cluster.EndPoint +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network.NetworkSend import org.apache.kafka.common.protocol.SecurityProtocol +import org.apache.kafka.common.utils.SystemTime import org.junit._ import org.scalatest.junit.JUnitSuite import java.util.Random @@ -46,7 +49,9 @@ class SocketServerTest extends JUnitSuite { maxRequestSize = 50, maxConnectionsPerIp = 5, connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = Map.empty[String,Int]) + maxConnectionsPerIpOverrides = Map.empty[String,Int], + new SystemTime(), + new Metrics()) server.startup() def sendRequest(socket: Socket, id: Short, request: Array[Byte]) { @@ -71,7 +76,7 @@ class SocketServerTest extends JUnitSuite { val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes) request.requestObj.writeTo(byteBuffer) byteBuffer.rewind() - val send = new BoundedByteBufferSend(byteBuffer) + val send = new NetworkSend(request.connectionId, byteBuffer) channel.sendResponse(new RequestChannel.Response(request.processor, request, send)) } @@ -112,33 +117,17 @@ class SocketServerTest extends JUnitSuite { assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq) } - @Test(expected = classOf[IOException]) + @Test def tooBigRequestIsRejected() { val tooManyBytes = new Array[Byte](server.maxRequestSize + 1) new Random().nextBytes(tooManyBytes) val socket = connect() sendRequest(socket, 0, tooManyBytes) - receiveResponse(socket) - } - - @Test - def testNullResponse() { - val socket = connect() - val bytes = new Array[Byte](40) - sendRequest(socket, 0, bytes) - - val request = server.requestChannel.receiveRequest - // Since the response is not sent yet, the selection key should not be readable. - TestUtils.waitUntilTrue( - () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) != SelectionKey.OP_READ }, - "Socket key shouldn't be available for read") - - server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null)) - - // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads. - TestUtils.waitUntilTrue( - () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ }, - "Socket key should be available for reads") + try { + receiveResponse(socket) + } catch { + case e: IOException => // thats fine + } } @Test @@ -198,7 +187,9 @@ class SocketServerTest extends JUnitSuite { maxRequestSize = 50, maxConnectionsPerIp = 5, connectionsMaxIdleMs = 60*1000, - maxConnectionsPerIpOverrides = overrides) + maxConnectionsPerIpOverrides = overrides, + new SystemTime(), + new Metrics()) overrideServer.startup() // make the maximum allowable number of connections and then leak them val conns = ((0 until overrideNum).map(i => connect(overrideServer))) http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala index 71f48c0..ace6321 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala @@ -245,6 +245,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { //BrokerCompressionCodec.isValid(compressionType) case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions)) + case KafkaConfig.MetricNumSamplesProp => expected.setProperty(name, "2") + case KafkaConfig.MetricSampleWindowMsProp => expected.setProperty(name, "1000") + case KafkaConfig.MetricReporterClassesProp => expected.setProperty(name, "") + case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString) } }) @@ -348,6 +352,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite { case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0") + case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0") + case KafkaConfig.MetricReporterClassesProp => // ignore string + case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1") } })