Repository: kafka
Updated Branches:
  refs/heads/trunk d22987f01 -> 78ba492e3


http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala b/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
deleted file mode 100755
index c0d7726..0000000
--- a/core/src/main/scala/kafka/network/BoundedByteBufferReceive.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import java.nio._
-import java.nio.channels._
-import kafka.utils._
-
-/**
- * Represents a communication between the client and server
- * 
- */
-@nonthreadsafe
-private[kafka] class BoundedByteBufferReceive(val maxSize: Int) extends Receive with Logging {
-  
-  private val sizeBuffer = ByteBuffer.allocate(4)
-  private var contentBuffer: ByteBuffer = null
-  
-  def this() = this(Int.MaxValue)
-  
-  var complete: Boolean = false
-  
-  /**
-   * Get the content buffer for this transmission
-   */
-  def buffer: ByteBuffer = {
-    expectComplete()
-    contentBuffer
-  }
-  
-  /**
-   * Read the bytes in this response from the given channel
-   */
-  def readFrom(channel: ReadableByteChannel): Int = {
-    expectIncomplete()
-    var read = 0
-    // have we read the request size yet?
-    if(sizeBuffer.remaining > 0)
-      read += CoreUtils.read(channel, sizeBuffer)
-    // have we allocated the request buffer yet?
-    if(contentBuffer == null && !sizeBuffer.hasRemaining) {
-      sizeBuffer.rewind()
-      val size = sizeBuffer.getInt()
-      if(size <= 0)
-        throw new InvalidRequestException("%d is not a valid request 
size.".format(size))
-      if(size > maxSize)
-        throw new InvalidRequestException("Request of length %d is not valid, 
it is larger than the maximum size of %d bytes.".format(size, maxSize))
-      contentBuffer = byteBufferAllocate(size)
-    }
-    // if we have a buffer read some stuff into it
-    if(contentBuffer != null) {
-      read = CoreUtils.read(channel, contentBuffer)
-      // did we get everything?
-      if(!contentBuffer.hasRemaining) {
-        contentBuffer.rewind()
-        complete = true
-      }
-    }
-    read
-  }
-
-  private def byteBufferAllocate(size: Int): ByteBuffer = {
-    var buffer: ByteBuffer = null
-    try {
-      buffer = ByteBuffer.allocate(size)
-    } catch {
-      case e: OutOfMemoryError =>
-        error("OOME with size " + size, e)
-        throw e
-      case e2: Throwable =>
-        throw e2
-    }
-    buffer
-  }
-}

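For reference, the wire framing this deleted class implemented is Kafka's size-prefixed format: a 4-byte big-endian length header, validated against a maximum, followed by the payload. A minimal standalone sketch of that framing (blocking-style for brevity; the deleted class instead accumulated partial reads across non-blocking readFrom() calls):

    import java.io.EOFException
    import java.nio.ByteBuffer
    import java.nio.channels.ReadableByteChannel

    object FramedReader {
      // Read one size-prefixed frame: 4-byte length header, then payload.
      def readFrame(channel: ReadableByteChannel, maxSize: Int): ByteBuffer = {
        val sizeBuffer = ByteBuffer.allocate(4)
        while (sizeBuffer.hasRemaining)
          if (channel.read(sizeBuffer) < 0) throw new EOFException()
        sizeBuffer.rewind()
        val size = sizeBuffer.getInt()
        require(size > 0 && size <= maxSize, s"$size is not a valid request size")
        val contentBuffer = ByteBuffer.allocate(size)
        while (contentBuffer.hasRemaining)
          if (channel.read(contentBuffer) < 0) throw new EOFException()
        contentBuffer.rewind()
        contentBuffer
      }
    }
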
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala b/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
deleted file mode 100644
index b95b73b..0000000
--- a/core/src/main/scala/kafka/network/BoundedByteBufferSend.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import java.nio._
-import java.nio.channels._
-import kafka.utils._
-import kafka.api.RequestOrResponse
-import org.apache.kafka.common.requests.{AbstractRequestResponse, ResponseHeader}
-
-@nonthreadsafe
-private[kafka] class BoundedByteBufferSend(val buffer: ByteBuffer) extends Send {
-  
-  private val sizeBuffer = ByteBuffer.allocate(4)
-
-  // Avoid possibility of overflow for 2GB-4 byte buffer
-  if(buffer.remaining > Int.MaxValue - sizeBuffer.limit)
-    throw new IllegalStateException("Attempt to create a bounded buffer of " + 
buffer.remaining + " bytes, but the maximum " +
-                                       "allowable size for a bounded buffer is 
" + (Int.MaxValue - sizeBuffer.limit) + ".")    
-  sizeBuffer.putInt(buffer.limit)
-  sizeBuffer.rewind()
-
-  var complete: Boolean = false
-
-  def this(size: Int) = this(ByteBuffer.allocate(size))
-  
-  def this(request: RequestOrResponse) = {
-    this(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
-    request.requestId match {
-      case Some(requestId) =>
-        buffer.putShort(requestId)
-      case None =>
-    }
-
-    request.writeTo(buffer)
-    buffer.rewind()
-  }
-
-  def this(header: ResponseHeader, body: AbstractRequestResponse) = {
-    this(header.sizeOf + body.sizeOf)
-    header.writeTo(buffer)
-    body.writeTo(buffer)
-    buffer.rewind
-  }
-
-  
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    val written = channel.write(Array(sizeBuffer, buffer))
-    // if we are done, mark it off
-    if(!buffer.hasRemaining)
-      complete = true    
-    written.asInstanceOf[Int]
-  }
-    
-}

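The deleted send side wrote the same framing outbound: a 4-byte length header plus the payload, handed to one gathering write. A minimal sketch of that idea; a real caller loops until both buffers drain, which is what the complete flag tracked:

    import java.nio.ByteBuffer
    import java.nio.channels.GatheringByteChannel

    object FramedWriter {
      // Prepend the length header and attempt one gathering write; the
      // return value may be less than 4 + payload.limit() (a partial write).
      def writeFrame(channel: GatheringByteChannel, payload: ByteBuffer): Long = {
        val sizeBuffer = ByteBuffer.allocate(4)
        sizeBuffer.putInt(payload.limit())
        sizeBuffer.rewind()
        channel.write(Array(sizeBuffer, payload))
      }
    }
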
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/ByteBufferSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/ByteBufferSend.scala b/core/src/main/scala/kafka/network/ByteBufferSend.scala
deleted file mode 100644
index af30042..0000000
--- a/core/src/main/scala/kafka/network/ByteBufferSend.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import java.nio._
-import java.nio.channels._
-import kafka.utils._
-
-@nonthreadsafe
-private[kafka] class ByteBufferSend(val buffer: ByteBuffer) extends Send {
-  
-  var complete: Boolean = false
-
-  def this(size: Int) = this(ByteBuffer.allocate(size))
-  
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    written += channel.write(buffer)
-    if(!buffer.hasRemaining)
-      complete = true
-    written
-  }
-    
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/Handler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/Handler.scala b/core/src/main/scala/kafka/network/Handler.scala
index a030033..1a7d56e 100644
--- a/core/src/main/scala/kafka/network/Handler.scala
+++ b/core/src/main/scala/kafka/network/Handler.scala
@@ -17,17 +17,19 @@
 
 package kafka.network
 
+import org.apache.kafka.common.network.{NetworkReceive, Send}
+
 private[kafka] object Handler {
   
   /**
    * A request handler is a function that turns an incoming 
    * transmission into an outgoing transmission
    */
-  type Handler = Receive => Option[Send]
+  type Handler = NetworkReceive => Option[Send]
   
   /**
    * A handler mapping finds the right Handler function for a given request
    */
-  type HandlerMapping = (Short, Receive) => Handler
+  type HandlerMapping = (Short, NetworkReceive) => Handler
 
 }

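The only change here is in the type aliases: a handler now consumes the common NetworkReceive instead of the removed Scala Receive trait. A trivial conforming handler, purely illustrative:

    import org.apache.kafka.common.network.{NetworkReceive, Send}

    object HandlerExample {
      // Matches the updated Handler alias: consume the receive, return no Send.
      val noOpHandler: NetworkReceive => Option[Send] = _ => None
    }
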
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 24edb61..357d8b9 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -17,22 +17,23 @@
 
 package kafka.network
 
+import java.nio.ByteBuffer
 import java.util.concurrent._
-import kafka.metrics.KafkaMetricsGroup
+
 import com.yammer.metrics.core.Gauge
-import java.nio.ByteBuffer
 import kafka.api._
 import kafka.common.TopicAndPartition
-import kafka.utils.{Logging, SystemTime}
 import kafka.message.ByteBufferMessageSet
-import java.net._
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.{Logging, SystemTime}
+import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader}
 import org.apache.log4j.Logger
 
 
 object RequestChannel extends Logging {
-  val AllDone = new Request(processor = 1, requestKey = 2, buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
+  val AllDone = new Request(processor = 1, connectionId = "2", buffer = getShutdownReceive(), startTimeMs = 0, securityProtocol = SecurityProtocol.PLAINTEXT)
 
   def getShutdownReceive() = {
    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]())
@@ -43,7 +44,7 @@ object RequestChannel extends Logging {
     byteBuffer
   }
 
-  case class Request(processor: Int, requestKey: Any, private var buffer: ByteBuffer, startTimeMs: Long, remoteAddress: SocketAddress = new InetSocketAddress(0), securityProtocol: SecurityProtocol) {
+  case class Request(processor: Int, connectionId: String, private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: SecurityProtocol) {
     @volatile var requestDequeueTimeMs = -1L
     @volatile var apiLocalCompleteTimeMs = -1L
     @volatile var responseCompleteTimeMs = -1L
@@ -71,7 +72,15 @@ object RequestChannel extends Logging {
 
     buffer = null
     private val requestLogger = Logger.getLogger("kafka.request.logger")
-    trace("Processor %d received request : %s".format(processor, if 
(requestObj != null) requestObj.describe(false) else header.toString + " : " + 
body.toString))
+
+    private def requestDesc: String = {
+      if (requestObj != null)
+        requestObj.describe(false)
+      else
+        header.toString + " -- " + body.toString
+    }
+
+    trace("Processor %d received request : %s".format(processor, requestDesc))
 
     def updateRequestMetrics() {
       val endTimeMs = SystemTime.milliseconds
@@ -102,13 +111,13 @@ object RequestChannel extends Logging {
              m.responseSendTimeHist.update(responseSendTime)
              m.totalTimeHist.update(totalTime)
       }
+
       if(requestLogger.isTraceEnabled)
-        requestLogger.trace("Completed request:%s from client 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(if (requestObj != null) requestObj.describe(true) else 
header.toString + " : " + body.toString, remoteAddress, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime))
-      else if(requestLogger.isDebugEnabled) {
-        requestLogger.debug("Completed request:%s from client 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
-          .format(if (requestObj != null) requestObj.describe(false) else 
header.toString + " : " + body.toString, remoteAddress, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime))
-      }
+        requestLogger.trace("Completed request:%s from connection 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
+                .format(requestDesc, connectionId, totalTime, 
requestQueueTime, apiLocalTime, apiRemoteTime, responseQueueTime, 
responseSendTime))
+      else if(requestLogger.isDebugEnabled)
+        requestLogger.debug("Completed request:%s from connection 
%s;totalTime:%d,requestQueueTime:%d,localTime:%d,remoteTime:%d,responseQueueTime:%d,sendTime:%d"
+          .format(requestDesc, connectionId, totalTime, requestQueueTime, 
apiLocalTime, apiRemoteTime, responseQueueTime, responseSendTime))
     }
   }
   

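The latency components logged above are deltas between the volatile lifecycle timestamps on the Request. A hedged reconstruction of the arithmetic (the computing code sits in context lines this diff does not show, and responseDequeueTimeMs is assumed from the surrounding class):

    object RequestTimings {
      // All values are milliseconds; the real code also clamps negatives for
      // stages that were skipped.
      def breakdown(startTimeMs: Long, requestDequeueTimeMs: Long, apiLocalCompleteTimeMs: Long,
                    responseCompleteTimeMs: Long, responseDequeueTimeMs: Long, endTimeMs: Long): Map[String, Long] =
        Map(
          "requestQueueTime"  -> (requestDequeueTimeMs - startTimeMs),              // waiting in the request queue
          "localTime"         -> (apiLocalCompleteTimeMs - requestDequeueTimeMs),   // local API handling
          "remoteTime"        -> (responseCompleteTimeMs - apiLocalCompleteTimeMs), // waiting on other brokers
          "responseQueueTime" -> (responseDequeueTimeMs - responseCompleteTimeMs),  // waiting in the response queue
          "sendTime"          -> (endTimeMs - responseDequeueTimeMs),               // writing the response out
          "totalTime"         -> (endTimeMs - startTimeMs)
        )
    }
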
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestOrResponseSend.scala b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
new file mode 100644
index 0000000..364f24b
--- /dev/null
+++ b/core/src/main/scala/kafka/network/RequestOrResponseSend.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.nio.ByteBuffer
+import java.nio.channels.GatheringByteChannel
+
+import kafka.api.RequestOrResponse
+import kafka.utils.Logging
+import org.apache.kafka.common.network.NetworkSend
+
+object RequestOrResponseSend {
+  def serialize(request: RequestOrResponse): ByteBuffer = {
+    val buffer = ByteBuffer.allocate(request.sizeInBytes + (if(request.requestId != None) 2 else 0))
+    request.requestId match {
+      case Some(requestId) =>
+        buffer.putShort(requestId)
+      case None =>
+    }
+    request.writeTo(buffer)
+    buffer.rewind()
+    buffer
+  }
+}
+
+class RequestOrResponseSend(val dest: String, val buffer: ByteBuffer) extends NetworkSend(dest, buffer) with Logging {
+
+  def this(dest: String, request: RequestOrResponse) {
+    this(dest, RequestOrResponseSend.serialize(request))
+  }
+
+  def writeCompletely(channel: GatheringByteChannel): Long = {
+    var totalWritten = 0L
+    while(!completed()) {
+      val written = writeTo(channel)
+      trace(written + " bytes written.")
+      totalWritten += written
+    }
+    totalWritten
+  }
+
+}

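A hypothetical call site for the new class (the destination id "broker-1" and the channel are stand-ins): serialize once, then drain the send. Note that writeCompletely() only terminates promptly on a blocking channel; on a non-blocking channel the loop would spin until the socket buffer drains:

    import java.nio.channels.GatheringByteChannel
    import kafka.api.RequestOrResponse
    import kafka.network.RequestOrResponseSend

    object SendExample {
      // Returns the total number of bytes written for the whole send.
      def sendBlocking(channel: GatheringByteChannel, request: RequestOrResponse): Long =
        new RequestOrResponseSend("broker-1", request).writeCompletely(channel)
    }
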
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index edf6214..91319fa 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -17,23 +17,26 @@
 
 package kafka.network
 
+import java.io.IOException
+import java.net._
+import java.nio.channels._
 import java.util
 import java.util.concurrent._
 import java.util.concurrent.atomic._
-import java.net._
-import java.io._
-import java.nio.channels._
 
+import com.yammer.metrics.core.Gauge
 import kafka.cluster.EndPoint
-import org.apache.kafka.common.protocol.SecurityProtocol
-
-import scala.collection._
-
 import kafka.common.KafkaException
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import com.yammer.metrics.core.{Gauge, Meter}
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.InvalidReceiveException
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.protocol.types.SchemaException
+import org.apache.kafka.common.utils.{SystemTime, Time, Utils}
+
+import scala.collection._
 
 /**
  * An NIO socket server. The threading model is
@@ -50,16 +53,20 @@ class SocketServer(val brokerId: Int,
                    val maxRequestSize: Int = Int.MaxValue,
                    val maxConnectionsPerIp: Int = Int.MaxValue,
                    val connectionsMaxIdleMs: Long,
-                   val maxConnectionsPerIpOverrides: Map[String, Int] ) extends Logging with KafkaMetricsGroup {
+                   val maxConnectionsPerIpOverrides: Map[String, Int],
+                   val time: Time,
+                   val metrics: Metrics) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Socket Server on Broker " + brokerId + "], "
-  private val time = SystemTime
+
   private val processors = new Array[Processor](numProcessorThreads)
   private[network] var acceptors =  mutable.Map[EndPoint,Acceptor]()
  val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
-  /* a meter to track the average free capacity of the network processors */
-  private val aggregateIdleMeter = newMeter("NetworkProcessorAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
-
+  private val allMetricNames = (0 until numProcessorThreads).map { i =>
+    val tags = new util.HashMap[String, String]()
+    tags.put("networkProcessor", i.toString)
+    new MetricName("io-wait-ratio", "socket-server-metrics", tags)
+  }
 
   /* I'm pushing the mapping of port-to-protocol to the processor level,
      so the processor can put the correct protocol in the request channel.
@@ -75,26 +82,30 @@ class SocketServer(val brokerId: Int,
   def startup() {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
 
+    newGauge("NetworkProcessorAvgIdlePercent",
+      new Gauge[Double] {
+        def value = allMetricNames.map( metricName =>
+          metrics.metrics().get(metricName).value()).sum / numProcessorThreads
+      }
+    )
+
+
     this.synchronized {
       for (i <- 0 until numProcessorThreads) {
         processors(i) = new Processor(i,
           time,
           maxRequestSize,
-          aggregateIdleMeter,
-          newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, 
Map("networkProcessor" -> i.toString)),
           numProcessorThreads,
           requestChannel,
           quotas,
           connectionsMaxIdleMs,
-          portToProtocol)
+          portToProtocol,
+          metrics
+          )
        Utils.newThread("kafka-network-thread-%d-%d".format(brokerId, i), processors(i), false).start()
       }
     }
 
-    newGauge("ResponsesBeingSent", new Gauge[Int] {
-      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
-    })
-
     // register the processor threads for notification of responses
     requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
    
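The gauge registered above replaces the old yammer meters: each processor's Selector publishes an io-wait-ratio metric tagged with its networkProcessor id, and the broker-level gauge averages those values. A sketch of that lookup, using only calls that appear in this hunk:

    import java.util
    import org.apache.kafka.common.MetricName
    import org.apache.kafka.common.metrics.Metrics

    object IdleGaugeSketch {
      // Build the per-processor metric name the new Selector registers.
      def ioWaitName(processorId: Int): MetricName = {
        val tags = new util.HashMap[String, String]()
        tags.put("networkProcessor", processorId.toString)
        new MetricName("io-wait-ratio", "socket-server-metrics", tags)
      }

      // Average the io-wait ratios across all processor threads.
      def avgIdlePercent(metrics: Metrics, numProcessors: Int): Double =
        (0 until numProcessors).map(i => metrics.metrics().get(ioWaitName(i)).value()).sum / numProcessors
    }
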
@@ -140,18 +151,19 @@ class SocketServer(val brokerId: Int,
  */
private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQuotas) extends Runnable with Logging {
 
-  protected val selector = Selector.open()
   private val startupLatch = new CountDownLatch(1)
   private val shutdownLatch = new CountDownLatch(1)
   private val alive = new AtomicBoolean(true)
 
+  def wakeup()
+
   /**
   * Initiates a graceful shutdown by signaling to stop and waiting for the shutdown to complete
    */
   def shutdown(): Unit = {
     alive.set(false)
-    selector.wakeup()
-    shutdownLatch.await
+    wakeup()
+    shutdownLatch.await()
   }
 
   /**
@@ -163,13 +175,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
    * Record that the thread startup is complete
    */
   protected def startupComplete() = {
-    startupLatch.countDown
+    startupLatch.countDown()
   }
 
   /**
    * Record that the thread shutdown is complete
    */
-  protected def shutdownComplete() = shutdownLatch.countDown
+  protected def shutdownComplete() = shutdownLatch.countDown()
 
   /**
    * Is the server still running?
@@ -177,11 +189,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
   protected def isRunning = alive.get
   
   /**
-   * Wakeup the thread for selection.
-   */
-  def wakeup() = selector.wakeup()
-  
-  /**
    * Close the given key and associated socket
    */
   def close(key: SelectionKey) {
@@ -200,30 +207,6 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
       swallowError(channel.close())
     }
   }
-  
-  /**
-   * Close all open connections
-   */
-  def closeAll() {
-    // removes cancelled keys from selector.keys set
-    this.selector.selectNow() 
-    val iter = this.selector.keys().iterator()
-    while (iter.hasNext) {
-      val key = iter.next()
-      close(key)
-    }
-  }
-
-  def countInterestOps(ops: Int): Int = {
-    var count = 0
-    val it = this.selector.keys().iterator()
-    while (it.hasNext) {
-      if ((it.next().interestOps() & ops) != 0) {
-        count += 1
-      }
-    }
-    count
-  }
 }
 
 /**
@@ -237,6 +220,7 @@ private[kafka] class Acceptor(val host: String,
                               connectionQuotas: ConnectionQuotas,
                               protocol: SecurityProtocol,
                              portToProtocol: ConcurrentHashMap[Int, SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
+  val nioSelector = java.nio.channels.Selector.open()
   val serverChannel = openServerSocket(host, port)
   portToProtocol.put(serverChannel.socket().getLocalPort, protocol)
 
@@ -244,13 +228,13 @@ private[kafka] class Acceptor(val host: String,
    * Accept loop that checks for new connection attempts
    */
   def run() {
-    serverChannel.register(selector, SelectionKey.OP_ACCEPT)
+    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT);
     startupComplete()
     var currentProcessor = 0
     while(isRunning) {
-      val ready = selector.select(500)
+      val ready = nioSelector.select(500)
       if(ready > 0) {
-        val keys = selector.selectedKeys()
+        val keys = nioSelector.selectedKeys()
         val iter = keys.iterator()
         while(iter.hasNext && isRunning) {
           var key: SelectionKey = null
@@ -258,7 +242,7 @@ private[kafka] class Acceptor(val host: String,
             key = iter.next
             iter.remove()
             if(key.isAcceptable)
-               accept(key, processors(currentProcessor))
+              accept(key, processors(currentProcessor))
             else
                throw new IllegalStateException("Unrecognized key state for 
acceptor thread.")
 
@@ -272,7 +256,7 @@ private[kafka] class Acceptor(val host: String,
     }
     debug("Closing server socket and selector.")
     swallowError(serverChannel.close())
-    swallowError(selector.close())
+    swallowError(nioSelector.close())
     shutdownComplete()
   }
   
@@ -290,7 +274,7 @@ private[kafka] class Acceptor(val host: String,
     serverChannel.socket().setReceiveBufferSize(recvBufferSize)
     try {
       serverChannel.socket.bind(socketAddress)
-      info("Awaiting socket connections on 
%s:%d.".format(socketAddress.getHostName, port))
+      info("Awaiting socket connections on 
%s:%d.".format(socketAddress.getHostName, serverChannel.socket.getLocalPort))
     } catch {
       case e: SocketException => 
         throw new KafkaException("Socket server failed to bind to %s:%d: 
%s.".format(socketAddress.getHostName, port, e.getMessage), e)
@@ -324,6 +308,12 @@ private[kafka] class Acceptor(val host: String,
     }
   }
 
+  /**
+   * Wakeup the thread for selection.
+   */
+  @Override
+  def wakeup = nioSelector.wakeup()
+
 }
 
 /**
@@ -333,19 +323,36 @@ private[kafka] class Acceptor(val host: String,
 private[kafka] class Processor(val id: Int,
                                val time: Time,
                                val maxRequestSize: Int,
-                               val aggregateIdleMeter: Meter,
-                               val idleMeter: Meter,
                                val totalProcessorThreads: Int,
                                val requestChannel: RequestChannel,
                                connectionQuotas: ConnectionQuotas,
                                val connectionsMaxIdleMs: Long,
-                               val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol]) extends AbstractServerThread(connectionQuotas) {
+                               val portToProtocol: ConcurrentHashMap[Int,SecurityProtocol],
+                               val metrics: Metrics) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
-  private val connectionsMaxIdleNanos = connectionsMaxIdleMs * 1000 * 1000
-  private var currentTimeNanos = SystemTime.nanoseconds
-  private val lruConnections = new util.LinkedHashMap[SelectionKey, Long]
-  private var nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos
+  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
+
+  private val metricTags = new util.HashMap[String, String]()
+  metricTags.put("networkProcessor", id.toString)
+
+  newGauge("IdlePercent",
+    new Gauge[Double] {
+      def value = {
+        metrics.metrics().get(new MetricName("io-wait-ratio", 
"socket-server-metrics", metricTags)).value()
+      }
+    },
+    JavaConversions.mapAsScalaMap(metricTags)
+  )
+
+  private val selector = new org.apache.kafka.common.network.Selector(
+    maxRequestSize,
+    connectionsMaxIdleMs,
+    metrics,
+    time,
+    "socket-server",
+    metricTags,
+    false)
 
   override def run() {
     startupComplete()
@@ -354,68 +361,51 @@ private[kafka] class Processor(val id: Int,
       configureNewConnections()
       // register any new responses for writing
       processNewResponses()
-      val startSelectTime = SystemTime.nanoseconds
-      val ready = selector.select(300)
-      currentTimeNanos = SystemTime.nanoseconds
-      val idleTime = currentTimeNanos - startSelectTime
-      idleMeter.mark(idleTime)
-      // We use a single meter for aggregate idle percentage for the thread pool.
-      // Since meter is calculated as total_recorded_value / time_window and
-      // time_window is independent of the number of threads, each recorded idle
-      // time should be discounted by # threads.
-      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)
-
-      trace("Processor id " + id + " selection time = " + idleTime + " ns")
-      if(ready > 0) {
-        val keys = selector.selectedKeys()
-        val iter = keys.iterator()
-        while(iter.hasNext && isRunning) {
-          var key: SelectionKey = null
-          try {
-            key = iter.next
-            iter.remove()
-            if(key.isReadable)
-              read(key)
-            else if(key.isWritable)
-              write(key)
-            else if(!key.isValid)
-              close(key)
-            else
-              throw new IllegalStateException("Unrecognized key state for 
processor thread.")
-          } catch {
-            case e: EOFException => {
-              debug("Closing socket connection to 
%s.".format(channelFor(key).socket.getInetAddress))
-              close(key)
-            } case e: InvalidRequestException => {
-              info("Closing socket connection to %s due to invalid request: 
%s".format(channelFor(key).socket.getInetAddress, e.getMessage))
-              close(key)
-            } case e: Throwable => {
-              error("Closing socket for " + 
channelFor(key).socket.getInetAddress + " because of error", e)
-              close(key)
-            }
-          }
+
+      try {
+        selector.poll(300)
+      } catch {
+        case e @ (_: IllegalStateException | _: IOException) => {
+          error("Closing processor %s due to illegal state or IO 
exception".format(id))
+          swallow(closeAll())
+          shutdownComplete()
+          throw e
         }
+        case e: InvalidReceiveException =>
+          // Log warning and continue since Selector already closed the connection
+          warn("Connection was closed due to invalid receive. Processor will continue handling other connections")
       }
-      maybeCloseOldestConnection
+      collection.JavaConversions.collectionAsScalaIterable(selector.completedReceives).foreach( receive => {
+        try {
+          val req = RequestChannel.Request(processor = id, connectionId = receive.source, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = SecurityProtocol.PLAINTEXT)
+          requestChannel.sendRequest(req)
+        } catch {
+          case e @ (_: InvalidRequestException | _: SchemaException) => {
+            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
+            error("Closing socket for " + receive.source + " because of error", e)
+            selector.close(receive.source)
+          }
+        }
+        selector.mute(receive.source)
+      })
+
+      collection.JavaConversions.iterableAsScalaIterable(selector.completedSends()).foreach( send => {
+        val resp = inflightResponses.remove(send.destination()).get
+        resp.request.updateRequestMetrics()
+        selector.unmute(send.destination())
+      })
     }
-    debug("Closing selector.")
+
+
+
+    debug("Closing selector - processor " + id)
     closeAll()
-    swallowError(selector.close())
     shutdownComplete()
   }
 
-  /**
-   * Close the given key and associated socket
-   */
-  override def close(key: SelectionKey): Unit = {
-    lruConnections.remove(key)
-    super.close(key)
-  }
-
   private def processNewResponses() {
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
-      val key = curr.request.requestKey.asInstanceOf[SelectionKey]
       try {
         curr.responseAction match {
           case RequestChannel.NoOpAction => {
@@ -423,26 +413,21 @@ private[kafka] class Processor(val id: Int,
             // that are sitting in the server's socket buffer
             curr.request.updateRequestMetrics
             trace("Socket server received empty response to send, registering 
for read: " + curr)
-            key.interestOps(SelectionKey.OP_READ)
-            key.attach(null)
+            selector.unmute(curr.request.connectionId)
           }
           case RequestChannel.SendAction => {
-            trace("Socket server received response to send, registering for 
write: " + curr)
-            key.interestOps(SelectionKey.OP_WRITE)
-            key.attach(curr)
+            trace("Socket server received response to send, registering for 
write and sending data: " + curr)
+            selector.send(curr.responseSend)
+            inflightResponses += (curr.request.connectionId -> curr)
           }
           case RequestChannel.CloseConnectionAction => {
             curr.request.updateRequestMetrics
             trace("Closing socket connection actively according to the 
response code.")
-            close(key)
+            selector.close(curr.request.connectionId)
           }
-          case responseCode => throw new KafkaException("No mapping found for response code " + responseCode)
-        }
-      } catch {
-        case e: CancelledKeyException => {
-          debug("Ignoring response for closed socket.")
-          close(key)
         }
+
+
       } finally {
         curr = requestChannel.receiveResponse(id)
       }
@@ -464,84 +449,27 @@ private[kafka] class Processor(val id: Int,
     while(!newConnections.isEmpty) {
       val channel = newConnections.poll()
       debug("Processor " + id + " listening to new connection from " + 
channel.socket.getRemoteSocketAddress)
-      channel.register(selector, SelectionKey.OP_READ)
+      val localHost = channel.socket().getLocalAddress.getHostAddress
+      val localPort = channel.socket().getLocalPort
+      val remoteHost = channel.socket().getInetAddress.getHostAddress
+      val remotePort = channel.socket().getPort
+      val connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
+      selector.register(connectionId, channel)
     }
   }
 
-  /*
-   * Process reads from ready sockets
+  /**
+   * Close all open connections
    */
-  def read(key: SelectionKey) {
-    lruConnections.put(key, currentTimeNanos)
-    val socketChannel = channelFor(key)
-    var receive = key.attachment.asInstanceOf[Receive]
-    if(key.attachment == null) {
-      receive = new BoundedByteBufferReceive(maxRequestSize)
-      key.attach(receive)
-    }
-    val read = receive.readFrom(socketChannel)
-    val address = socketChannel.socket.getRemoteSocketAddress()
-    trace(read + " bytes read from " + address)
-    if(read < 0) {
-      close(key)
-    } else if(receive.complete) {
-      val port = socketChannel.socket().getLocalPort
-      val protocol = portToProtocol.get(port)
-      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address, securityProtocol = protocol)
-      requestChannel.sendRequest(req)
-      key.attach(null)
-      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
-      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
-    } else {
-      // more reading to be done
-      trace("Did not finish reading, registering for read again on connection 
" + socketChannel.socket.getRemoteSocketAddress())
-      key.interestOps(SelectionKey.OP_READ)
-      wakeup()
-    }
+  def closeAll() {
+    selector.close()
   }
 
-  /*
-   * Process writes to ready sockets
+  /**
+   * Wakeup the thread for selection.
    */
-  def write(key: SelectionKey) {
-    val socketChannel = channelFor(key)
-    val response = key.attachment().asInstanceOf[RequestChannel.Response]
-    val responseSend = response.responseSend
-    if(responseSend == null)
-      throw new IllegalStateException("Registered for write interest but no 
response attached to key.")
-    val written = responseSend.writeTo(socketChannel)
-    trace(written + " bytes written to " + 
socketChannel.socket.getRemoteSocketAddress() + " using key " + key)
-    if(responseSend.complete) {
-      response.request.updateRequestMetrics()
-      key.attach(null)
-      trace("Finished writing, registering for read on connection " + 
socketChannel.socket.getRemoteSocketAddress())
-      key.interestOps(SelectionKey.OP_READ)
-    } else {
-      trace("Did not finish writing, registering for write again on connection 
" + socketChannel.socket.getRemoteSocketAddress())
-      key.interestOps(SelectionKey.OP_WRITE)
-      wakeup()
-    }
-  }
-
-  private def channelFor(key: SelectionKey) = key.channel().asInstanceOf[SocketChannel]
-
-  private def maybeCloseOldestConnection {
-    if(currentTimeNanos > nextIdleCloseCheckTime) {
-      if(lruConnections.isEmpty) {
-        nextIdleCloseCheckTime = currentTimeNanos + connectionsMaxIdleNanos
-      } else {
-        val oldestConnectionEntry = lruConnections.entrySet.iterator().next()
-        val connectionLastActiveTime = oldestConnectionEntry.getValue
-        nextIdleCloseCheckTime = connectionLastActiveTime + connectionsMaxIdleNanos
-        if(currentTimeNanos > nextIdleCloseCheckTime) {
-          val key: SelectionKey = oldestConnectionEntry.getKey
-          trace("About to close the idle connection from " + 
key.channel.asInstanceOf[SocketChannel].socket.getRemoteSocketAddress
-            + " due to being idle for " + (currentTimeNanos - 
connectionLastActiveTime) / 1000 / 1000 + " millis")
-          close(key)
-        }
-      }
-    }
-  }
+  @Override
+  def wakeup = selector.wakeup()
 
 }
 

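The connection id built in configureNewConnections() above encodes both endpoints: local address and port, a dash, then remote address and port, for example "10.1.1.5:9092-10.1.1.9:53720". Restated as a small helper:

    import java.nio.channels.SocketChannel

    object ConnectionId {
      def of(channel: SocketChannel): String = {
        val socket = channel.socket()
        socket.getLocalAddress.getHostAddress + ":" + socket.getLocalPort + "-" +
          socket.getInetAddress.getHostAddress + ":" + socket.getPort
      }
    }
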
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/network/Transmission.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
deleted file mode 100644
index 2827103..0000000
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.network
-
-import java.nio._
-import java.nio.channels._
-import kafka.utils.Logging
-import kafka.common.KafkaException
-
-/**
- * Represents a stateful transfer of data to or from the network
- */
-private[network] trait Transmission extends Logging {
-  
-  def complete: Boolean
-  
-  protected def expectIncomplete(): Unit = {
-    if(complete)
-      throw new KafkaException("This operation cannot be completed on a 
complete request.")
-  }
-  
-  protected def expectComplete(): Unit = {
-    if(!complete)
-      throw new KafkaException("This operation cannot be completed on an 
incomplete request.")
-  }
-  
-}
-
-/**
- * A transmission that is being received from a channel
- */
-trait Receive extends Transmission {
-  
-  def buffer: ByteBuffer
-  
-  def readFrom(channel: ReadableByteChannel): Int
-  
-  def readCompletely(channel: ReadableByteChannel): Int = {
-    var totalRead = 0
-    while(!complete) {
-      val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
-    }
-    totalRead
-  }
-  
-}
-
-/**
- * A transmission that is being sent out to the channel
- */
-trait Send extends Transmission {
-    
-  def writeTo(channel: GatheringByteChannel): Int
-
-  def writeCompletely(channel: GatheringByteChannel): Int = {
-    var totalWritten = 0
-    while(!complete) {
-      val written = writeTo(channel)
-      trace(written + " bytes written.")
-      totalWritten += written
-    }
-    totalWritten
-  }
-    
-}
-
-/**
- * A set of composite sends, sent one after another
- */
-abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
-  val expectedBytesToWrite: Int
-  private var current = sends
-  var totalWritten = 0
-
-  /**
-   *  This method continues to write to the socket buffer till an incomplete
-   *  write happens. On an incomplete write, it returns to the caller to give it
-   *  a chance to schedule other work till the buffered write completes.
-   */
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete
-    var totalWrittenPerCall = 0
-    var sendComplete: Boolean = false
-    do {
-      val written = current.head.writeTo(channel)
-      totalWritten += written
-      totalWrittenPerCall += written
-      sendComplete = current.head.complete
-      if(sendComplete)
-        current = current.tail
-    } while (!complete && sendComplete)
-    trace("Bytes written as part of multisend call : " + totalWrittenPerCall + 
 "Total bytes written so far : " + totalWritten + "Expected bytes to write : " 
+ expectedBytesToWrite)
-    totalWrittenPerCall
-  }
-  
-  def complete: Boolean = {
-    if (current == Nil) {
-      if (totalWritten != expectedBytesToWrite)
-        error("mismatch in sending bytes over socket; expected: " + 
expectedBytesToWrite + " actual: " + totalWritten)
-      true
-    } else {
-      false
-    }
-  }
-}

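The essence of the removed MultiSend, restated against the common Send interface this commit migrates toward (a sketch, not the replacement the commit actually ships): keep writing the head send until a write comes back incomplete, then yield so the caller can schedule other work:

    import java.nio.channels.GatheringByteChannel
    import org.apache.kafka.common.network.Send

    object MultiSendSketch {
      // Returns the bytes written in this call and the sends still pending.
      def writeSome(channel: GatheringByteChannel, sends: List[Send]): (Long, List[Send]) = {
        var remaining = sends
        var written = 0L
        var madeProgress = true
        while (remaining.nonEmpty && madeProgress) {
          written += remaining.head.writeTo(channel)
          if (remaining.head.completed()) remaining = remaining.tail
          else madeProgress = false
        }
        (written, remaining)
      }
    }
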
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/producer/SyncProducer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala b/core/src/main/scala/kafka/producer/SyncProducer.scala
index 0f09951..dcee501 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -17,11 +17,12 @@
 
 package kafka.producer
 
-import kafka.api._
-import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
-import kafka.utils._
 import java.util.Random
 
+import kafka.api._
+import kafka.network.{RequestOrResponseSend, BlockingChannel}
+import kafka.utils._
+import org.apache.kafka.common.network.NetworkReceive
 import org.apache.kafka.common.utils.Utils._
 
 object SyncProducer {
@@ -50,7 +51,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
     * data. So, leaving the rest of the logging at TRACE, while errors should be logged at ERROR level
      */
     if (logger.isDebugEnabled) {
-      val buffer = new BoundedByteBufferSend(request).buffer
+      val buffer = new RequestOrResponseSend("", request).buffer
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
       if(requestTypeId == RequestKeys.ProduceKey) {
@@ -63,12 +64,12 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
   /**
    * Common functionality for the public send methods
    */
-  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): Receive = {
+  private def doSend(request: RequestOrResponse, readResponse: Boolean = true): NetworkReceive = {
     lock synchronized {
       verifyRequest(request)
       getOrMakeConnection()
 
-      var response: Receive = null
+      var response: NetworkReceive = null
       try {
         blockingChannel.send(request)
         if(readResponse)
@@ -95,7 +96,7 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
    producerRequestStats.getProducerRequestStats(config.host, config.port).requestSizeHist.update(requestSize)
    producerRequestStats.getProducerRequestAllBrokersStats.requestSizeHist.update(requestSize)
 
-    var response: Receive = null
+    var response: NetworkReceive = null
    val specificTimer = producerRequestStats.getProducerRequestStats(config.host, config.port).requestTimer
    val aggregateTimer = producerRequestStats.getProducerRequestAllBrokersStats.requestTimer
     aggregateTimer.time {
@@ -104,14 +105,14 @@ class SyncProducer(val config: SyncProducerConfig) extends Logging {
       }
     }
     if(producerRequest.requiredAcks != 0)
-      ProducerResponse.readFrom(response.buffer)
+      ProducerResponse.readFrom(response.payload)
     else
       null
   }
 
   def send(request: TopicMetadataRequest): TopicMetadataResponse = {
     val response = doSend(request)
-    TopicMetadataResponse.readFrom(response.buffer)
+    TopicMetadataResponse.readFrom(response.payload)
   }
 
   def close() = {

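The call-site change in miniature: the blocking channel now hands back a NetworkReceive, and the response deserializers read from payload rather than buffer. A sketch, with receive standing in for the value returned by blockingChannel.receive():

    import kafka.api.ProducerResponse
    import org.apache.kafka.common.network.NetworkReceive

    object ReceiveExample {
      def parse(receive: NetworkReceive): ProducerResponse =
        ProducerResponse.readFrom(receive.payload)
    }
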
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 387e387..d63bc18 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.apache.kafka.common.requests.{JoinGroupResponse, JoinGroupRequest, HeartbeatRequest, HeartbeatResponse, ResponseHeader}
 import org.apache.kafka.common.TopicPartition
 import kafka.api._
 import kafka.admin.AdminUtils
@@ -28,10 +27,9 @@ import kafka.coordinator.ConsumerCoordinator
 import kafka.log._
 import kafka.network._
 import kafka.network.RequestChannel.Response
+import org.apache.kafka.common.requests.{JoinGroupRequest, JoinGroupResponse, HeartbeatRequest, HeartbeatResponse, ResponseHeader, ResponseSend}
 import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
-
 import scala.collection._
-
 import org.I0Itec.zkclient.ZkClient
 
 /**
@@ -54,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    */
   def handle(request: RequestChannel.Request) {
     try{
-      trace("Handling request: " + request.requestObj + " from client: " + 
request.remoteAddress)
+      trace("Handling request: " + request.requestObj + " from connection: " + 
request.connectionId)
       request.requestId match {
         case RequestKeys.ProduceKey => handleProducerRequest(request)
         case RequestKeys.FetchKey => handleFetchRequest(request)
@@ -84,7 +82,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           if (response == null)
             requestChannel.closeConnection(request.processor, request)
           else
-            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(respHeader, response)))
+            requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, respHeader, response)))
         }
         error("error when handling request %s".format(request.requestObj), e)
     } finally
@@ -99,7 +97,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     try {
      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
-      requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse)))
+      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
         fatal("Disk error during leadership change.", e)
@@ -114,7 +112,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
    val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, stopReplicaResponse)))
     replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
@@ -123,7 +121,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    replicaManager.maybeUpdateMetadataCache(updateMetadataRequest, metadataCache)
 
    val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, updateMetadataResponse)))
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {
@@ -134,7 +132,7 @@ class KafkaApis(val requestChannel: RequestChannel,
    val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
    val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
      ErrorMapping.NoError, partitionsRemaining)
-    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(controlledShutdownResponse)))
+    requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, controlledShutdownResponse)))
   }
 
 
@@ -158,7 +156,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
      val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
 
     if (offsetCommitRequest.versionId == 0) {
@@ -260,7 +258,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
       } else {
        val response = ProducerResponse(produceRequest.correlationId, responseStatus)
-        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+        requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
       }
     }
 
@@ -305,7 +303,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
      val response = FetchResponse(fetchRequest.correlationId, responsePartitionData)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response)))
     }
 
     // call the replica manager to fetch messages from the local replica
@@ -363,7 +361,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     })
     val response = OffsetResponse(offsetRequest.correlationId, responseMap)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
  def fetchOffsets(logManager: LogManager, topicAndPartition: TopicAndPartition, timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
@@ -466,7 +464,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val brokers = metadataCache.getAliveBrokers
     trace("Sending topic metadata %s and brokers %s for correlation id %d to 
client %s".format(topicMetadata.mkString(","), brokers.mkString(","), 
metadataRequest.correlationId, metadataRequest.clientId))
     val response = new 
TopicMetadataResponse(brokers.map(_.getBrokerEndPoint(request.securityProtocol)),
 topicMetadata, metadataRequest.correlationId)
-    requestChannel.sendResponse(new RequestChannel.Response(request, new 
BoundedByteBufferSend(response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new 
RequestOrResponseSend(request.connectionId, response)))
   }
 
   /*
@@ -514,8 +512,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     trace("Sending offset fetch response %s for correlation id %d to client 
%s."
-      .format(response, offsetFetchRequest.correlationId, 
offsetFetchRequest.clientId))
-    requestChannel.sendResponse(new RequestChannel.Response(request, new 
BoundedByteBufferSend(response)))
+          .format(response, offsetFetchRequest.correlationId, 
offsetFetchRequest.clientId))
+
+    requestChannel.sendResponse(new RequestChannel.Response(request, new 
RequestOrResponseSend(request.connectionId, response)))
+
   }
 
   /*
@@ -540,7 +540,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     trace("Sending consumer metadata %s for correlation id %d to client %s."
          .format(response, consumerMetadataRequest.correlationId, consumerMetadataRequest.clientId))
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
   }
 
   def handleJoinGroupRequest(request: RequestChannel.Request) {
@@ -554,8 +554,8 @@ class KafkaApis(val requestChannel: RequestChannel,
      val partitionList = partitions.map(tp => new TopicPartition(tp.topic, tp.partition)).toBuffer
      val responseBody = new JoinGroupResponse(errorCode, generationId, consumerId, partitionList)
      trace("Sending join group response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody)))
+              .format(responseBody, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, responseBody)))
     }
 
     // let the coordinator to handle join-group
@@ -574,10 +574,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // the callback for sending a heartbeat response
     def sendResponseCallback(errorCode: Short) {
-      val responseBody = new HeartbeatResponse(errorCode)
+      val response = new HeartbeatResponse(errorCode)
       trace("Sending heartbeat response %s for correlation id %d to client %s."
-        .format(responseBody, request.header.correlationId, request.header.clientId))
-      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(respHeader, responseBody)))
+              .format(response, request.header.correlationId, request.header.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response)))
     }
 
     // let the coordinator to handle heartbeat

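The recurring rewrite in this file, factored out for clarity (a sketch, not a helper the commit adds): every response is now wrapped in a Send keyed by the request's connection id, so the processor can route it back over the originating socket:

    import kafka.api.RequestOrResponse
    import kafka.network.{RequestChannel, RequestOrResponseSend}

    object RespondSketch {
      def respond(requestChannel: RequestChannel, request: RequestChannel.Request, response: RequestOrResponse): Unit =
        requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
    }
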
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6f25afd..2d75186 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.api.ApiVersion
@@ -24,9 +25,11 @@ import kafka.cluster.EndPoint
 import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
-import scala.collection.{immutable, JavaConversions, Map}
+import scala.collection.{mutable, immutable, JavaConversions, Map}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -130,6 +133,10 @@ object Defaults {
   val DeleteTopicEnable = false
 
   val CompressionType = "producer"
+
+  val MetricNumSamples = 2
+  val MetricSampleWindowMs = 1000
+  val MetricReporterClasses = ""
 }
 
 object KafkaConfig {
@@ -240,6 +247,10 @@ object KafkaConfig {
   val DeleteTopicEnableProp = "delete.topic.enable"
   val CompressionTypeProp = "compression.type"
 
+  val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
+  val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
+  val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
+
 
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
@@ -374,6 +385,10 @@ object KafkaConfig {
     "('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is 
equivalent to no compression; and " +
     "'producer' which means retain the original compression codec set by the 
producer."
 
+  val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC
+  val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC
+  val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC
+
 
   private val configDef = {
     import ConfigDef.Range._
@@ -494,6 +509,9 @@ object KafkaConfig {
       .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc)
       .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc)
       .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc)
+      .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc)
+      .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc)
+      .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc)
   }
 
   def configNames() = {
@@ -618,7 +636,10 @@ object KafkaConfig {
       offsetCommitTimeoutMs = parsed.get(OffsetCommitTimeoutMsProp).asInstanceOf[Int],
       offsetCommitRequiredAcks = parsed.get(OffsetCommitRequiredAcksProp).asInstanceOf[Short],
       deleteTopicEnable = parsed.get(DeleteTopicEnableProp).asInstanceOf[Boolean],
-      compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String]
+      compressionType = parsed.get(CompressionTypeProp).asInstanceOf[String],
+      metricNumSamples = parsed.get(MetricNumSamplesProp).asInstanceOf[Int],
+      metricSampleWindowMs = parsed.get(MetricSampleWindowMsProp).asInstanceOf[Long],
+      _metricReporterClasses = parsed.get(MetricReporterClassesProp).asInstanceOf[java.util.List[String]]
     )
   }
 
@@ -653,7 +674,7 @@ object KafkaConfig {
   }
 }
 
-class KafkaConfig(/** ********* Zookeeper Configuration ***********/
+class KafkaConfig (/** ********* Zookeeper Configuration ***********/
                   val zkConnect: String,
                   val zkSessionTimeoutMs: Int = Defaults.ZkSessionTimeoutMs,
                   private val _zkConnectionTimeoutMs: Option[Int] = None,
@@ -766,7 +787,11 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
                   val offsetCommitRequiredAcks: Short = Defaults.OffsetCommitRequiredAcks,
 
                   val deleteTopicEnable: Boolean = Defaults.DeleteTopicEnable,
-                  val compressionType: String = Defaults.CompressionType
+                  val compressionType: String = Defaults.CompressionType,
+
+                  val metricSampleWindowMs: Long = Defaults.MetricSampleWindowMs,
+                  val metricNumSamples: Int = Defaults.MetricNumSamples,
+                  private val _metricReporterClasses: java.util.List[String] = util.Arrays.asList(Defaults.MetricReporterClasses)
                    ) {
 
   val zkConnectionTimeoutMs: Int = _zkConnectionTimeoutMs.getOrElse(zkSessionTimeoutMs)
@@ -786,6 +811,8 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
   val maxConnectionsPerIpOverrides: Map[String, Int] =
     getMap(KafkaConfig.MaxConnectionsPerIpOverridesProp, _maxConnectionsPerIpOverrides).map { case (k, v) => (k, v.toInt)}
 
+  val metricReporterClasses: java.util.List[MetricsReporter] = getMetricClasses(_metricReporterClasses)
+
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
@@ -850,6 +877,24 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     }
   }
 
+  private def getMetricClasses(metricClasses: java.util.List[String]): java.util.List[MetricsReporter] = {
+
+    val reporterList = new util.ArrayList[MetricsReporter]()
+    val iterator = metricClasses.iterator()
+
+    while (iterator.hasNext) {
+      val reporterName = iterator.next()
+      if (!reporterName.isEmpty) {
+        val reporter: MetricsReporter = CoreUtils.createObject[MetricsReporter](reporterName)
+        reporter.configure(toProps.asInstanceOf[java.util.Map[String, _]])
+        reporterList.add(reporter)
+      }
+    }
+
+    reporterList
+
+  }
+
 
 
   validateValues()
@@ -992,6 +1037,9 @@ class KafkaConfig(/** ********* Zookeeper Configuration ***********/
     props.put(OffsetCommitRequiredAcksProp, offsetCommitRequiredAcks.toString)
     props.put(DeleteTopicEnableProp, deleteTopicEnable.toString)
     props.put(CompressionTypeProp, compressionType.toString)
+    props.put(MetricNumSamplesProp, metricNumSamples.toString)
+    props.put(MetricSampleWindowMsProp, metricSampleWindowMs.toString)
+    props.put(MetricReporterClassesProp, JavaConversions.collectionAsScalaIterable(_metricReporterClasses).mkString(","))
 
     props
   }
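
The three new broker properties reuse the client-side config keys. A small sketch of setting them follows; the literal key strings ("metrics.num.samples", "metrics.sample.window.ms", "metric.reporters") are the CommonClientConfigs values as assumed from this patch, and the reporter class name is hypothetical, so prefer the Prop constants in real code.

    import java.util.Properties

    object MetricsPropsSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "localhost:2181")
        // Both sampling knobs are defined with atLeast(1): zero or negatives fail validation.
        props.put("metrics.num.samples", "2")
        props.put("metrics.sample.window.ms", "1000")
        // LIST-typed: comma-separated fully qualified MetricsReporter implementations.
        props.put("metric.reporters", "com.example.MyReporter") // hypothetical class
        props.list(System.out)
      }
    }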

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index e66710d..b320ce9 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,18 +21,21 @@ import kafka.admin._
 import kafka.log.LogConfig
 import kafka.log.CleanerConfig
 import kafka.log.LogManager
-import kafka.utils._
 import java.util.concurrent._
 import atomic.{AtomicInteger, AtomicBoolean}
 import java.io.File
 
-import collection.mutable
+import kafka.utils._
+import org.apache.kafka.common.metrics._
+import org.apache.kafka.common.network.NetworkReceive
+
+import scala.collection.{JavaConversions, mutable}
 import org.I0Itec.zkclient.ZkClient
 import kafka.controller.{ControllerStats, KafkaController}
 import kafka.cluster.{EndPoint, Broker}
 import kafka.api.{ControlledShutdownResponse, ControlledShutdownRequest}
 import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
-import kafka.network.{Receive, BlockingChannel, SocketServer}
+import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.ConsumerCoordinator
@@ -48,6 +51,19 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   private var shutdownLatch = new CountDownLatch(1)
 
+  private val metricConfig: MetricConfig = new MetricConfig()
+          .samples(config.metricNumSamples)
+          .timeWindow(config.metricSampleWindowMs, TimeUnit.MILLISECONDS)
+  private val jmxPrefix: String = "kafka.server"
+  private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses
+  reporters.add(new JmxReporter(jmxPrefix))
+
+
+
+  // This exists so SocketServer (which uses Client libraries) can use the client Time objects without having to convert all of Kafka to use them
+  // Once we get rid of kafka.utils.time, we can get rid of this too
+  private val socketServerTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime()
+
   val brokerState: BrokerState = new BrokerState
 
   var apis: KafkaApis = null
@@ -117,6 +133,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         config.brokerId =  getBrokerId
         this.logIdent = "[Kafka Server " + config.brokerId + "], "
 
+        val metrics = new Metrics(metricConfig, reporters, socketServerTime)
+
+
         socketServer = new SocketServer(config.brokerId,
                                         config.listeners,
                                         config.numNetworkThreads,
@@ -126,7 +145,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                                         config.socketRequestMaxBytes,
                                         config.maxConnectionsPerIp,
                                         config.connectionsMaxIdleMs,
-                                        config.maxConnectionsPerIpOverrides)
+                                        config.maxConnectionsPerIpOverrides,
+                                        socketServerTime,
+                                        metrics)
           socketServer.startup()
 
           /* start replica manager */
@@ -262,14 +283,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
           // 2. issue a controlled shutdown to the controller
           if (channel != null) {
-            var response: Receive = null
+            var response: NetworkReceive = null
             try {
               // send the controlled shutdown request
               val request = new ControlledShutdownRequest(correlationId.getAndIncrement, config.brokerId)
               channel.send(request)
 
               response = channel.receive()
-              val shutdownResponse = ControlledShutdownResponse.readFrom(response.buffer)
+              val shutdownResponse = ControlledShutdownResponse.readFrom(response.payload())
               if (shutdownResponse.errorCode == ErrorMapping.NoError && shutdownResponse.partitionsRemaining != null &&
                   shutdownResponse.partitionsRemaining.size == 0) {
                 shutdownSucceeded = true
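
The metrics bootstrap in the hunks above is small enough to restate on its own. This sketch uses the same clients-library calls (MetricConfig, JmxReporter, Metrics) with made-up sample values; in the broker the values come from KafkaConfig.

    import java.util.concurrent.TimeUnit
    import org.apache.kafka.common.metrics.{JmxReporter, MetricConfig, Metrics, MetricsReporter}
    import org.apache.kafka.common.utils.SystemTime

    object BrokerMetricsSketch {
      def main(args: Array[String]): Unit = {
        // Window/sample values are illustrative only.
        val metricConfig = new MetricConfig()
          .samples(2)
          .timeWindow(1000, TimeUnit.MILLISECONDS)
        val reporters = new java.util.ArrayList[MetricsReporter]()
        reporters.add(new JmxReporter("kafka.server")) // exposes sensors as JMX MBeans
        val metrics = new Metrics(metricConfig, reporters, new SystemTime())
        // SocketServer registers its sensors against this instance; close() releases reporters.
        metrics.close()
      }
    }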

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/server/MessageSetSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/MessageSetSend.scala 
b/core/src/main/scala/kafka/server/MessageSetSend.scala
deleted file mode 100644
index 5667648..0000000
--- a/core/src/main/scala/kafka/server/MessageSetSend.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.nio._
-import java.nio.channels._
-import kafka.network._
-import kafka.message._
-import kafka.utils._
-import kafka.common.ErrorMapping
-
-/**
- * A zero-copy message response that writes the bytes needed directly from the file
- * wholly in kernel space
- */
-@nonthreadsafe
-private[server] class MessageSetSend(val messages: MessageSet, val errorCode: Short) extends Send {
-  
-  private var sent: Int = 0
-  private val size: Int = messages.sizeInBytes
-  private val header = ByteBuffer.allocate(6)
-  header.putInt(size + 2)
-  header.putShort(errorCode)
-  header.rewind()
-  
-  var complete: Boolean = false
-
-  def this(messages: MessageSet) = this(messages, ErrorMapping.NoError)
-
-  def this() = this(MessageSet.Empty)
-
-  def writeTo(channel: GatheringByteChannel): Int = {
-    expectIncomplete()
-    var written = 0
-    if(header.hasRemaining)
-      written += channel.write(header)
-    if(!header.hasRemaining) {
-      val fileBytesSent = messages.writeTo(channel, sent, size - sent)
-      written += fileBytesSent
-      sent += fileBytesSent
-    }
-
-    if(logger.isTraceEnabled)
-      if (channel.isInstanceOf[SocketChannel]) {
-        val socketChannel = channel.asInstanceOf[SocketChannel]
-        logger.trace(sent + " bytes written to " + socketChannel.socket.getRemoteSocketAddress() + " expecting to send " + size + " bytes")
-      }
-
-    if(sent >= size)
-      complete = true
-    written
-  }
-  
-  def sendSize: Int = size + header.capacity
-  
-}
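
The deleted MessageSetSend is the last of the old header-plus-payload sends: write a fixed header, then resume partial payload writes until complete, where the file-backed path could use FileChannel.transferTo for zero-copy. A minimal sketch of that resumable-write pattern, with a plain ByteBuffer standing in for MessageSet.writeTo:

    import java.nio.ByteBuffer
    import java.nio.channels.GatheringByteChannel

    // 4-byte size + 2-byte error code, then the payload; `complete` flips once
    // everything has been written, so callers can retry after short writes.
    final class HeaderedSend(payload: ByteBuffer, errorCode: Short) {
      private val header = ByteBuffer.allocate(6)
      header.putInt(payload.remaining + 2)
      header.putShort(errorCode)
      header.rewind()
      var complete = false

      def writeTo(channel: GatheringByteChannel): Int = {
        var written = 0
        if (header.hasRemaining) written += channel.write(header)
        if (!header.hasRemaining) written += channel.write(payload)
        if (!payload.hasRemaining) complete = true
        written
      }
    }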

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index ad64cee..3d52f62 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -162,7 +162,7 @@ object ConsumerOffsetChecker extends Logging {
 
       debug("Sending offset fetch request to coordinator 
%s:%d.".format(channel.host, channel.port))
       channel.send(OffsetFetchRequest(group, topicPartitions))
-      val offsetFetchResponse = 
OffsetFetchResponse.readFrom(channel.receive().buffer)
+      val offsetFetchResponse = 
OffsetFetchResponse.readFrom(channel.receive().payload())
       debug("Received offset fetch response %s.".format(offsetFetchResponse))
 
       offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, 
offsetAndMetadata) =>
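
The buffer-to-payload() renames here and in the next file come from swapping kafka.network.Receive for the clients' NetworkReceive, which reads a 4-byte length prefix and then exposes the body. A dependency-free sketch of that framing, using only the JDK:

    import java.nio.ByteBuffer

    object SizeDelimitedSketch {
      // Read the 4-byte length prefix, then hand back the body, roughly what
      // payload() returns once a receive is complete.
      def decode(frame: ByteBuffer): ByteBuffer = {
        val size = frame.getInt()
        val payload = frame.slice()
        require(payload.remaining == size, s"truncated frame, expected $size bytes")
        payload
      }

      def main(args: Array[String]): Unit = {
        val frame = ByteBuffer.allocate(6).putInt(2).putShort(0.toShort)
        frame.flip()
        println(decode(frame).remaining) // prints 2
      }
    }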

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/other/kafka/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/TestOffsetManager.scala 
b/core/src/test/scala/other/kafka/TestOffsetManager.scala
index 4e90534..8047da4 100644
--- a/core/src/test/scala/other/kafka/TestOffsetManager.scala
+++ b/core/src/test/scala/other/kafka/TestOffsetManager.scala
@@ -72,7 +72,7 @@ object TestOffsetManager {
         offsetsChannel.send(commitRequest)
         numCommits.getAndIncrement
         commitTimer.time {
-          val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+          val response = OffsetCommitResponse.readFrom(offsetsChannel.receive().payload())
           if (response.commitStatus.exists(_._2 != ErrorMapping.NoError)) numErrors.getAndIncrement
         }
         offset += 1
@@ -119,7 +119,7 @@ object TestOffsetManager {
       val group = "group-" + id
       try {
         metadataChannel.send(ConsumerMetadataRequest(group))
-        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().buffer).coordinatorOpt.map(_.id).getOrElse(-1)
+        val coordinatorId = ConsumerMetadataResponse.readFrom(metadataChannel.receive().payload()).coordinatorOpt.map(_.id).getOrElse(-1)
 
         val channel = if (channels.contains(coordinatorId))
           channels(coordinatorId)
@@ -135,7 +135,7 @@ object TestOffsetManager {
           channel.send(fetchRequest)
 
           fetchTimer.time {
-            val response = OffsetFetchResponse.readFrom(channel.receive().buffer)
+            val response = OffsetFetchResponse.readFrom(channel.receive().payload())
             if (response.requestInfo.exists(_._2.error != ErrorMapping.NoError)) {
               numErrors.getAndIncrement
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
index bc4aef3..4cb92de 100644
--- a/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/KafkaConfigTest.scala
@@ -20,6 +20,7 @@ import java.io.{FileOutputStream, File}
 import java.security.Permission
 
 import kafka.Kafka
+import kafka.server.KafkaConfig
 import org.junit.{After, Before, Test}
 import junit.framework.Assert._
 
@@ -57,20 +58,20 @@ class KafkaTest {
     val propertiesFile = prepareDefaultConfig()
 
     // We should load configuration file without any arguments
-    val config1 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile))
+    val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
     assertEquals(1, config1.brokerId)
 
     // We should be able to override given property on command line
-    val config2 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=2"))
+    val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
     assertEquals(2, config2.brokerId)
 
     // We should be also able to set completely new property
-    val config3 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact"))
+    val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
     assertEquals(1, config3.brokerId)
     assertEquals("compact", config3.logCleanupPolicy)
 
     // We should be also able to set several properties
-    val config4 = Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2"))
+    val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact", "--override", "broker.id=2")))
     assertEquals(2, config4.brokerId)
     assertEquals("compact", config4.logCleanupPolicy)
   }
@@ -78,25 +79,25 @@ class KafkaTest {
   @Test(expected = classOf[ExitCalled])
   def testGetKafkaConfigFromArgsWrongSetValue(): Unit = {
     val propertiesFile = prepareDefaultConfig()
-    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "a=b=c"))
+    KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "a=b=c")))
   }
 
   @Test(expected = classOf[ExitCalled])
   def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = {
     val propertiesFile = prepareDefaultConfig()
-    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2"))
+    KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2")))
   }
 
   @Test(expected = classOf[ExitCalled])
   def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = {
     val propertiesFile = prepareDefaultConfig()
-    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2"))
+    KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2")))
   }
 
   @Test(expected = classOf[ExitCalled])
   def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = {
     val propertiesFile = prepareDefaultConfig()
-    Kafka.getKafkaConfigFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2"))
+    KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")))
   }
 
   def prepareDefaultConfig(): String = {

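The test rewrite above reflects that config loading is now two explicit steps: Kafka.getPropsFromArgs parses the properties file plus any --override flags into a Properties object, and KafkaConfig.fromProps builds the typed config from it. A stand-in for just the override-parsing half (the real method also loads the file first):

    import java.util.Properties

    object OverrideParseSketch {
      // Stand-in parser: collects the k=v pairs that follow "--override" flags.
      def overridesFromArgs(args: Array[String]): Properties = {
        val props = new Properties()
        args.sliding(2, 2).collect { case Array("--override", kv) => kv }
          .map(_.split("=", 2)).foreach { case Array(k, v) => props.put(k, v) }
        props
      }

      def main(args: Array[String]): Unit =
        println(overridesFromArgs(Array("--override", "broker.id=2", "--override", "log.cleanup.policy=compact")))
    }
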
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 95d5621..7dc2fad 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -20,7 +20,10 @@ package kafka.network;
 import java.net._
 import java.io._
 import kafka.cluster.EndPoint
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.NetworkSend
 import org.apache.kafka.common.protocol.SecurityProtocol
+import org.apache.kafka.common.utils.SystemTime
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 import java.util.Random
@@ -46,7 +49,9 @@ class SocketServerTest extends JUnitSuite {
                                               maxRequestSize = 50,
                                               maxConnectionsPerIp = 5,
                                               connectionsMaxIdleMs = 60*1000,
-                                              maxConnectionsPerIpOverrides = Map.empty[String,Int])
+                                              maxConnectionsPerIpOverrides = Map.empty[String,Int],
+                                              new SystemTime(),
+                                              new Metrics())
   server.startup()
 
   def sendRequest(socket: Socket, id: Short, request: Array[Byte]) {
@@ -71,7 +76,7 @@ class SocketServerTest extends JUnitSuite {
     val byteBuffer = ByteBuffer.allocate(request.requestObj.sizeInBytes)
     request.requestObj.writeTo(byteBuffer)
     byteBuffer.rewind()
-    val send = new BoundedByteBufferSend(byteBuffer)
+    val send = new NetworkSend(request.connectionId, byteBuffer)
     channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
@@ -112,33 +117,17 @@ class SocketServerTest extends JUnitSuite {
     assertEquals(serializedBytes.toSeq, receiveResponse(traceSocket).toSeq)
   }
 
-  @Test(expected = classOf[IOException])
+  @Test
   def tooBigRequestIsRejected() {
     val tooManyBytes = new Array[Byte](server.maxRequestSize + 1)
     new Random().nextBytes(tooManyBytes)
     val socket = connect()
     sendRequest(socket, 0, tooManyBytes)
-    receiveResponse(socket)
-  }
-
-  @Test
-  def testNullResponse() {
-    val socket = connect()
-    val bytes = new Array[Byte](40)
-    sendRequest(socket, 0, bytes)
-
-    val request = server.requestChannel.receiveRequest
-    // Since the response is not sent yet, the selection key should not be readable.
-    TestUtils.waitUntilTrue(
-      () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) != SelectionKey.OP_READ },
-      "Socket key shouldn't be available for read")
-
-    server.requestChannel.sendResponse(new RequestChannel.Response(0, request, null))
-
-    // After the response is sent to the client (which is async and may take a bit of time), the socket key should be available for reads.
-    TestUtils.waitUntilTrue(
-      () => { (request.requestKey.asInstanceOf[SelectionKey].interestOps & SelectionKey.OP_READ) == SelectionKey.OP_READ },
-      "Socket key should be available for reads")
+    try {
+      receiveResponse(socket)
+    } catch {
+      case e: IOException => // that's fine
+    }
   }
 
   @Test
@@ -198,7 +187,9 @@ class SocketServerTest extends JUnitSuite {
                                                 maxRequestSize = 50,
                                                 maxConnectionsPerIp = 5,
                                                 connectionsMaxIdleMs = 60*1000,
-                                                maxConnectionsPerIpOverrides = overrides)
+                                                maxConnectionsPerIpOverrides = overrides,
+                                                new SystemTime(),
+                                                new Metrics())
     overrideServer.startup()
     // make the maximum allowable number of connections and then leak them
     val conns = ((0 until overrideNum).map(i => connect(overrideServer)))

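SocketServer's two new constructor arguments appear in both test fixtures above; injecting the clock is what makes idle-connection and metrics behavior testable. A local stand-in for a deterministic clock follows (the project's own test utility may differ):

    import org.apache.kafka.common.utils.Time

    // Minimal deterministic Time: sleep() just advances the internal clock.
    class FakeTime(private var ms: Long = 0L) extends Time {
      override def milliseconds(): Long = ms
      override def nanoseconds(): Long = ms * 1000000L
      override def sleep(sleepMs: Long): Unit = ms += sleepMs
    }
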
http://git-wip-us.apache.org/repos/asf/kafka/blob/78ba492e/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
index 71f48c0..ace6321 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala
@@ -245,6 +245,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
         //BrokerCompressionCodec.isValid(compressionType)
         case KafkaConfig.CompressionTypeProp => expected.setProperty(name, randFrom(BrokerCompressionCodec.brokerCompressionOptions))
 
+        case KafkaConfig.MetricNumSamplesProp => expected.setProperty(name, "2")
+        case KafkaConfig.MetricSampleWindowMsProp => expected.setProperty(name, "1000")
+        case KafkaConfig.MetricReporterClassesProp => expected.setProperty(name, "")
+
         case nonNegativeIntProperty => expected.setProperty(name, nextInt(Int.MaxValue).toString)
       }
     })
@@ -348,6 +352,10 @@ class KafkaConfigConfigDefTest extends JUnit3Suite {
 
         case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")
 
+        case KafkaConfig.MetricNumSamplesProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
+        case KafkaConfig.MetricSampleWindowMsProp => assertPropertyInvalid(getBaseProperties, name, "not_a_number", "-1", "0")
+        case KafkaConfig.MetricReporterClassesProp => // ignore string
+
         case nonNegativeIntProperty => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-1")
       }
     })
