[SPARK-3019] Pluggable block transfer interface (BlockTransferService)

This pull request creates a new BlockTransferService interface for block 
fetch/upload and refactors the existing ConnectionManager to implement 
BlockTransferService (NioBlockTransferService).

Most of the changes are simply moving code around. The main class to inspect is 
ShuffleBlockFetcherIterator.

Review guide:
- Most of the ConnectionManager code is now in network.cm package
- ManagedBuffer is a new buffer abstraction backed by several different 
implementations (file segment, nio ByteBuffer, Netty ByteBuf)
- BlockTransferService is the main internal interface introduced in this PR
- NioBlockTransferService implements BlockTransferService and replaces the old 
BlockManagerWorker
- ShuffleBlockFetcherIterator replaces the told BlockFetcherIterator to use the 
new interface

TODOs that should be separate PRs:
- Implement NettyBlockTransferService
- Finalize the API/semantics for ManagedBuffer.release()

Author: Reynold Xin <[email protected]>

Closes #2240 from rxin/blockTransferService and squashes the following commits:

64cd9d7 [Reynold Xin] Merge branch 'master' into blockTransferService
1dfd3d7 [Reynold Xin] Limit the length of the FileInputStream.
1332156 [Reynold Xin] Fixed style violation from refactoring.
2960c93 [Reynold Xin] Added ShuffleBlockFetcherIteratorSuite.
e29c721 [Reynold Xin] Updated comment for ShuffleBlockFetcherIterator.
8a1046e [Reynold Xin] Code review feedback:
2c6b1e1 [Reynold Xin] Removed println in test cases.
2a907e4 [Reynold Xin] Merge branch 'master' into blockTransferService-merge
07ccf0d [Reynold Xin] Added init check to CMBlockTransferService.
98c668a [Reynold Xin] Added failure handling and fixed unit tests.
ae05fcd [Reynold Xin] Updated tests, although DistributedSuite is hanging.
d8d595c [Reynold Xin] Merge branch 'master' of github.com:apache/spark into 
blockTransferService
9ef279c [Reynold Xin] Initial refactoring to move ConnectionManager to use the 
BlockTransferService.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08ce1888
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08ce1888
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08ce1888

Branch: refs/heads/master
Commit: 08ce18881e09c6e91db9c410d1d9ce1e5ae63a62
Parents: 939a322
Author: Reynold Xin <[email protected]>
Authored: Mon Sep 8 15:59:20 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Mon Sep 8 15:59:20 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkEnv.scala  |   15 +-
 .../apache/spark/network/BlockDataManager.scala |   36 +
 .../spark/network/BlockFetchingListener.scala   |   37 +
 .../spark/network/BlockTransferService.scala    |  131 +++
 .../apache/spark/network/BufferMessage.scala    |  113 --
 .../org/apache/spark/network/Connection.scala   |  587 ----------
 .../org/apache/spark/network/ConnectionId.scala |   34 -
 .../spark/network/ConnectionManager.scala       | 1047 ------------------
 .../spark/network/ConnectionManagerId.scala     |   37 -
 .../spark/network/ConnectionManagerTest.scala   |  103 --
 .../apache/spark/network/ManagedBuffer.scala    |  107 ++
 .../org/apache/spark/network/Message.scala      |   95 --
 .../org/apache/spark/network/MessageChunk.scala |   41 -
 .../spark/network/MessageChunkHeader.scala      |   82 --
 .../org/apache/spark/network/ReceiverTest.scala |   37 -
 .../apache/spark/network/SecurityMessage.scala  |  162 ---
 .../org/apache/spark/network/SenderTest.scala   |   76 --
 .../apache/spark/network/nio/BlockMessage.scala |  197 ++++
 .../spark/network/nio/BlockMessageArray.scala   |  160 +++
 .../spark/network/nio/BufferMessage.scala       |  114 ++
 .../apache/spark/network/nio/Connection.scala   |  587 ++++++++++
 .../apache/spark/network/nio/ConnectionId.scala |   34 +
 .../spark/network/nio/ConnectionManager.scala   | 1042 +++++++++++++++++
 .../spark/network/nio/ConnectionManagerId.scala |   37 +
 .../org/apache/spark/network/nio/Message.scala  |   96 ++
 .../apache/spark/network/nio/MessageChunk.scala |   41 +
 .../spark/network/nio/MessageChunkHeader.scala  |   81 ++
 .../network/nio/NioBlockTransferService.scala   |  205 ++++
 .../spark/network/nio/SecurityMessage.scala     |  160 +++
 .../spark/serializer/KryoSerializer.scala       |    2 +-
 .../spark/shuffle/FileShuffleBlockManager.scala |   35 +-
 .../shuffle/IndexShuffleBlockManager.scala      |   24 +-
 .../spark/shuffle/ShuffleBlockManager.scala     |    6 +-
 .../shuffle/hash/BlockStoreShuffleFetcher.scala |   14 +-
 .../spark/shuffle/hash/HashShuffleReader.scala  |    4 +-
 .../spark/storage/BlockFetcherIterator.scala    |  254 -----
 .../org/apache/spark/storage/BlockManager.scala |   98 +-
 .../apache/spark/storage/BlockManagerId.scala   |    4 +-
 .../spark/storage/BlockManagerWorker.scala      |  147 ---
 .../org/apache/spark/storage/BlockMessage.scala |  209 ----
 .../spark/storage/BlockMessageArray.scala       |  160 ---
 .../storage/ShuffleBlockFetcherIterator.scala   |  271 +++++
 .../apache/spark/storage/ThreadingTest.scala    |  120 --
 .../org/apache/spark/DistributedSuite.scala     |   15 +-
 .../spark/network/ConnectionManagerSuite.scala  |  301 -----
 .../network/nio/ConnectionManagerSuite.scala    |  296 +++++
 .../shuffle/hash/HashShuffleManagerSuite.scala  |   17 +-
 .../storage/BlockFetcherIteratorSuite.scala     |  237 ----
 .../spark/storage/BlockManagerSuite.scala       |  133 +--
 .../spark/storage/DiskBlockManagerSuite.scala   |    2 +-
 .../ShuffleBlockFetcherIteratorSuite.scala      |  183 +++
 51 files changed, 3941 insertions(+), 4085 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 20a7444..dd95e40 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -31,7 +31,8 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.network.ConnectionManager
+import org.apache.spark.network.BlockTransferService
+import org.apache.spark.network.nio.NioBlockTransferService
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{ShuffleMemoryManager, ShuffleManager}
@@ -59,8 +60,8 @@ class SparkEnv (
     val mapOutputTracker: MapOutputTracker,
     val shuffleManager: ShuffleManager,
     val broadcastManager: BroadcastManager,
+    val blockTransferService: BlockTransferService,
     val blockManager: BlockManager,
-    val connectionManager: ConnectionManager,
     val securityManager: SecurityManager,
     val httpFileServer: HttpFileServer,
     val sparkFilesDir: String,
@@ -88,6 +89,8 @@ class SparkEnv (
     // down, but let's call it anyway in case it gets fixed in a later release
     // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we 
can't call it.
     // actorSystem.awaitTermination()
+
+    // Note that blockTransferService is stopped by BlockManager since it is 
started by it.
   }
 
   private[spark]
@@ -223,14 +226,14 @@ object SparkEnv extends Logging {
 
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
+    val blockTransferService = new NioBlockTransferService(conf, 
securityManager)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
     val blockManager = new BlockManager(executorId, actorSystem, 
blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
-
-    val connectionManager = blockManager.connectionManager
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService)
 
     val broadcastManager = new BroadcastManager(isDriver, conf, 
securityManager)
 
@@ -278,8 +281,8 @@ object SparkEnv extends Logging {
       mapOutputTracker,
       shuffleManager,
       broadcastManager,
+      blockTransferService,
       blockManager,
-      connectionManager,
       securityManager,
       httpFileServer,
       sparkFilesDir,

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala 
b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
new file mode 100644
index 0000000..e0e9172
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import org.apache.spark.storage.StorageLevel
+
+
+trait BlockDataManager {
+
+  /**
+   * Interface to get local block data.
+   *
+   * @return Some(buffer) if the block exists locally, and None if it doesn't.
+   */
+  def getBlockData(blockId: String): Option[ManagedBuffer]
+
+  /**
+   * Put the block locally, using the given storage level.
+   */
+  def putBlockData(blockId: String, data: ManagedBuffer, level: StorageLevel): 
Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala 
b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
new file mode 100644
index 0000000..34acaa5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import java.util.EventListener
+
+
+/**
+ * Listener callback interface for [[BlockTransferService.fetchBlocks]].
+ */
+trait BlockFetchingListener extends EventListener {
+
+  /**
+   * Called once per successfully fetched block.
+   */
+  def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit
+
+  /**
+   * Called upon failures. For each failure, this is called only once (i.e. 
not once per block).
+   */
+  def onBlockFetchFailure(exception: Throwable): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala 
b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
new file mode 100644
index 0000000..84d991f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.storage.StorageLevel
+
+
+abstract class BlockTransferService {
+
+  /**
+   * Initialize the transfer service by giving it the BlockDataManager that 
can be used to fetch
+   * local blocks or put local blocks.
+   */
+  def init(blockDataManager: BlockDataManager)
+
+  /**
+   * Tear down the transfer service.
+   */
+  def stop(): Unit
+
+  /**
+   * Port number the service is listening on, available only after [[init]] is 
invoked.
+   */
+  def port: Int
+
+  /**
+   * Host name the service is listening on, available only after [[init]] is 
invoked.
+   */
+  def hostName: String
+
+  /**
+   * Fetch a sequence of blocks from a remote node asynchronously,
+   * available only after [[init]] is invoked.
+   *
+   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once 
per block,
+   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per 
failure (not per block).
+   *
+   * Note that this API takes a sequence so the implementation can batch 
requests, and does not
+   * return a future so the underlying implementation can invoke 
onBlockFetchSuccess as soon as
+   * the data of a block is fetched, rather than waiting for all blocks to be 
fetched.
+   */
+  def fetchBlocks(
+      hostName: String,
+      port: Int,
+      blockIds: Seq[String],
+      listener: BlockFetchingListener): Unit
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is 
invoked.
+   */
+  def uploadBlock(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Future[Unit]
+
+  /**
+   * A special case of [[fetchBlocks]], as it fetches only one block and is 
blocking.
+   *
+   * It is also only available after [[init]] is invoked.
+   */
+  def fetchBlockSync(hostName: String, port: Int, blockId: String): 
ManagedBuffer = {
+    // A monitor for the thread to wait on.
+    val lock = new Object
+    @volatile var result: Either[ManagedBuffer, Throwable] = null
+    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
+      override def onBlockFetchFailure(exception: Throwable): Unit = {
+        lock.synchronized {
+          result = Right(exception)
+          lock.notify()
+        }
+      }
+      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): 
Unit = {
+        lock.synchronized {
+          result = Left(data)
+          lock.notify()
+        }
+      }
+    })
+
+    // Sleep until result is no longer null
+    lock.synchronized {
+      while (result == null) {
+        try {
+          lock.wait()
+        } catch {
+          case e: InterruptedException =>
+        }
+      }
+    }
+
+    result match {
+      case Left(data) => data
+      case Right(e) => throw e
+    }
+  }
+
+  /**
+   * Upload a single block to a remote node, available only after [[init]] is 
invoked.
+   *
+   * This method is similar to [[uploadBlock]], except this one blocks the 
thread
+   * until the upload finishes.
+   */
+  def uploadBlockSync(
+      hostname: String,
+      port: Int,
+      blockId: String,
+      blockData: ManagedBuffer,
+      level: StorageLevel): Unit = {
+    Await.result(uploadBlock(hostname, port, blockId, blockData, level), 
Duration.Inf)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala 
b/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
deleted file mode 100644
index af35f1f..0000000
--- a/core/src/main/scala/org/apache/spark/network/BufferMessage.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.nio.ByteBuffer
-
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.spark.storage.BlockManager
-
-private[spark]
-class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var 
ackId: Int)
-  extends Message(Message.BUFFER_MESSAGE, id_) {
-
-  val initialSize = currentSize()
-  var gotChunkForSendingOnce = false
-
-  def size = initialSize
-
-  def currentSize() = {
-    if (buffers == null || buffers.isEmpty) {
-      0
-    } else {
-      buffers.map(_.remaining).reduceLeft(_ + _)
-    }
-  }
-
-  def getChunkForSending(maxChunkSize: Int): Option[MessageChunk] = {
-    if (maxChunkSize <= 0) {
-      throw new Exception("Max chunk size is " + maxChunkSize)
-    }
-
-    val security = if (isSecurityNeg) 1 else 0
-    if (size == 0 && !gotChunkForSendingOnce) {
-      val newChunk = new MessageChunk(
-        new MessageChunkHeader(typ, id, 0, 0, ackId, hasError, security, 
senderAddress), null)
-      gotChunkForSendingOnce = true
-      return Some(newChunk)
-    }
-
-    while(!buffers.isEmpty) {
-      val buffer = buffers(0)
-      if (buffer.remaining == 0) {
-        BlockManager.dispose(buffer)
-        buffers -= buffer
-      } else {
-        val newBuffer = if (buffer.remaining <= maxChunkSize) {
-          buffer.duplicate()
-        } else {
-          buffer.slice().limit(maxChunkSize).asInstanceOf[ByteBuffer]
-        }
-        buffer.position(buffer.position + newBuffer.remaining)
-        val newChunk = new MessageChunk(new MessageChunkHeader(
-          typ, id, size, newBuffer.remaining, ackId,
-          hasError, security, senderAddress), newBuffer)
-        gotChunkForSendingOnce = true
-        return Some(newChunk)
-      }
-    }
-    None
-  }
-
-  def getChunkForReceiving(chunkSize: Int): Option[MessageChunk] = {
-    // STRONG ASSUMPTION: BufferMessage created when receiving data has ONLY 
ONE data buffer
-    if (buffers.size > 1) {
-      throw new Exception("Attempting to get chunk from message with multiple 
data buffers")
-    }
-    val buffer = buffers(0)
-    val security = if (isSecurityNeg) 1 else 0
-    if (buffer.remaining > 0) {
-      if (buffer.remaining < chunkSize) {
-        throw new Exception("Not enough space in data buffer for receiving 
chunk")
-      }
-      val newBuffer = buffer.slice().limit(chunkSize).asInstanceOf[ByteBuffer]
-      buffer.position(buffer.position + newBuffer.remaining)
-      val newChunk = new MessageChunk(new MessageChunkHeader(
-          typ, id, size, newBuffer.remaining, ackId, hasError, security, 
senderAddress), newBuffer)
-      return Some(newChunk)
-    }
-    None
-  }
-
-  def flip() {
-    buffers.foreach(_.flip)
-  }
-
-  def hasAckId() = (ackId != 0)
-
-  def isCompletelyReceived() = !buffers(0).hasRemaining
-
-  override def toString = {
-    if (hasAckId) {
-      "BufferAckMessage(aid = " + ackId + ", id = " + id + ", size = " + size 
+ ")"
-    } else {
-      "BufferMessage(id = " + id + ", size = " + size + ")"
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/Connection.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala 
b/core/src/main/scala/org/apache/spark/network/Connection.scala
deleted file mode 100644
index 5285ec8..0000000
--- a/core/src/main/scala/org/apache/spark/network/Connection.scala
+++ /dev/null
@@ -1,587 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.net._
-import java.nio._
-import java.nio.channels._
-
-import scala.collection.mutable.{ArrayBuffer, HashMap, Queue}
-
-import org.apache.spark._
-
-private[spark]
-abstract class Connection(val channel: SocketChannel, val selector: Selector,
-    val socketRemoteConnectionManagerId: ConnectionManagerId, val 
connectionId: ConnectionId)
-  extends Logging {
-
-  var sparkSaslServer: SparkSaslServer = null
-  var sparkSaslClient: SparkSaslClient = null
-
-  def this(channel_ : SocketChannel, selector_ : Selector, id_ : ConnectionId) 
= {
-    this(channel_, selector_,
-      ConnectionManagerId.fromSocketAddress(
-        
channel_.socket.getRemoteSocketAddress.asInstanceOf[InetSocketAddress]), id_)
-  }
-
-  channel.configureBlocking(false)
-  channel.socket.setTcpNoDelay(true)
-  channel.socket.setReuseAddress(true)
-  channel.socket.setKeepAlive(true)
-  /* channel.socket.setReceiveBufferSize(32768) */
-
-  @volatile private var closed = false
-  var onCloseCallback: Connection => Unit = null
-  var onExceptionCallback: (Connection, Exception) => Unit = null
-  var onKeyInterestChangeCallback: (Connection, Int) => Unit = null
-
-  val remoteAddress = getRemoteAddress()
-
-  /**
-   * Used to synchronize client requests: client's work-related requests must
-   * wait until SASL authentication completes.
-   */
-  private val authenticated = new Object()
-
-  def getAuthenticated(): Object = authenticated
-
-  def isSaslComplete(): Boolean
-
-  def resetForceReregister(): Boolean
-
-  // Read channels typically do not register for write and write does not for 
read
-  // Now, we do have write registering for read too (temporarily), but this is 
to detect
-  // channel close NOT to actually read/consume data on it !
-  // How does this work if/when we move to SSL ?
-
-  // What is the interest to register with selector for when we want this 
connection to be selected
-  def registerInterest()
-
-  // What is the interest to register with selector for when we want this 
connection to
-  // be de-selected
-  // Traditionally, 0 - but in our case, for example, for close-detection on 
SendingConnection hack,
-  // it will be SelectionKey.OP_READ (until we fix it properly)
-  def unregisterInterest()
-
-  // On receiving a read event, should we change the interest for this channel 
or not ?
-  // Will be true for ReceivingConnection, false for SendingConnection.
-  def changeInterestForRead(): Boolean
-
-  private def disposeSasl() {
-    if (sparkSaslServer != null) {
-      sparkSaslServer.dispose()
-    }
-
-    if (sparkSaslClient != null) {
-      sparkSaslClient.dispose()
-    }
-  }
-
-  // On receiving a write event, should we change the interest for this 
channel or not ?
-  // Will be false for ReceivingConnection, true for SendingConnection.
-  // Actually, for now, should not get triggered for ReceivingConnection
-  def changeInterestForWrite(): Boolean
-
-  def getRemoteConnectionManagerId(): ConnectionManagerId = {
-    socketRemoteConnectionManagerId
-  }
-
-  def key() = channel.keyFor(selector)
-
-  def getRemoteAddress() = 
channel.socket.getRemoteSocketAddress().asInstanceOf[InetSocketAddress]
-
-  // Returns whether we have to register for further reads or not.
-  def read(): Boolean = {
-    throw new UnsupportedOperationException(
-      "Cannot read on connection of type " + this.getClass.toString)
-  }
-
-  // Returns whether we have to register for further writes or not.
-  def write(): Boolean = {
-    throw new UnsupportedOperationException(
-      "Cannot write on connection of type " + this.getClass.toString)
-  }
-
-  def close() {
-    closed = true
-    val k = key()
-    if (k != null) {
-      k.cancel()
-    }
-    channel.close()
-    disposeSasl()
-    callOnCloseCallback()
-  }
-
-  protected def isClosed: Boolean = closed
-
-  def onClose(callback: Connection => Unit) {
-    onCloseCallback = callback
-  }
-
-  def onException(callback: (Connection, Exception) => Unit) {
-    onExceptionCallback = callback
-  }
-
-  def onKeyInterestChange(callback: (Connection, Int) => Unit) {
-    onKeyInterestChangeCallback = callback
-  }
-
-  def callOnExceptionCallback(e: Exception) {
-    if (onExceptionCallback != null) {
-      onExceptionCallback(this, e)
-    } else {
-      logError("Error in connection to " + getRemoteConnectionManagerId() +
-        " and OnExceptionCallback not registered", e)
-    }
-  }
-
-  def callOnCloseCallback() {
-    if (onCloseCallback != null) {
-      onCloseCallback(this)
-    } else {
-      logWarning("Connection to " + getRemoteConnectionManagerId() +
-        " closed and OnExceptionCallback not registered")
-    }
-
-  }
-
-  def changeConnectionKeyInterest(ops: Int) {
-    if (onKeyInterestChangeCallback != null) {
-      onKeyInterestChangeCallback(this, ops)
-    } else {
-      throw new Exception("OnKeyInterestChangeCallback not registered")
-    }
-  }
-
-  def printRemainingBuffer(buffer: ByteBuffer) {
-    val bytes = new Array[Byte](buffer.remaining)
-    val curPosition = buffer.position
-    buffer.get(bytes)
-    bytes.foreach(x => print(x + " "))
-    buffer.position(curPosition)
-    print(" (" + bytes.size + ")")
-  }
-
-  def printBuffer(buffer: ByteBuffer, position: Int, length: Int) {
-    val bytes = new Array[Byte](length)
-    val curPosition = buffer.position
-    buffer.position(position)
-    buffer.get(bytes)
-    bytes.foreach(x => print(x + " "))
-    print(" (" + position + ", " + length + ")")
-    buffer.position(curPosition)
-  }
-}
-
-
-private[spark]
-class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
-    remoteId_ : ConnectionManagerId, id_ : ConnectionId)
-  extends Connection(SocketChannel.open, selector_, remoteId_, id_) {
-
-  def isSaslComplete(): Boolean = {
-    if (sparkSaslClient != null) sparkSaslClient.isComplete() else false
-  }
-
-  private class Outbox {
-    val messages = new Queue[Message]()
-    val defaultChunkSize = 65536
-    var nextMessageToBeUsed = 0
-
-    def addMessage(message: Message) {
-      messages.synchronized {
-        /* messages += message */
-        messages.enqueue(message)
-        logDebug("Added [" + message + "] to outbox for sending to " +
-          "[" + getRemoteConnectionManagerId() + "]")
-      }
-    }
-
-    def getChunk(): Option[MessageChunk] = {
-      messages.synchronized {
-        while (!messages.isEmpty) {
-          /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */
-          /* val message = messages(nextMessageToBeUsed) */
-          val message = messages.dequeue()
-          val chunk = message.getChunkForSending(defaultChunkSize)
-          if (chunk.isDefined) {
-            messages.enqueue(message)
-            nextMessageToBeUsed = nextMessageToBeUsed + 1
-            if (!message.started) {
-              logDebug(
-                "Starting to send [" + message + "] to [" + 
getRemoteConnectionManagerId() + "]")
-              message.started = true
-              message.startTime = System.currentTimeMillis
-            }
-            logTrace(
-              "Sending chunk from [" + message + "] to [" + 
getRemoteConnectionManagerId() + "]")
-            return chunk
-          } else {
-            message.finishTime = System.currentTimeMillis
-            logDebug("Finished sending [" + message + "] to [" + 
getRemoteConnectionManagerId() +
-              "] in "  + message.timeTaken )
-          }
-        }
-      }
-      None
-    }
-  }
-
-  // outbox is used as a lock - ensure that it is always used as a leaf (since 
methods which
-  // lock it are invoked in context of other locks)
-  private val outbox = new Outbox()
-  /*
-    This is orthogonal to whether we have pending bytes to write or not - and 
satisfies a slightly
-    different purpose. This flag is to see if we need to force reregister for 
write even when we
-    do not have any pending bytes to write to socket.
-    This can happen due to a race between adding pending buffers, and checking 
for existing of
-    data as detailed in https://github.com/mesos/spark/pull/791
-   */
-  private var needForceReregister = false
-
-  val currentBuffers = new ArrayBuffer[ByteBuffer]()
-
-  /* channel.socket.setSendBufferSize(256 * 1024) */
-
-  override def getRemoteAddress() = address
-
-  val DEFAULT_INTEREST = SelectionKey.OP_READ
-
-  override def registerInterest() {
-    // Registering read too - does not really help in most cases, but for some
-    // it does - so let us keep it for now.
-    changeConnectionKeyInterest(SelectionKey.OP_WRITE | DEFAULT_INTEREST)
-  }
-
-  override def unregisterInterest() {
-    changeConnectionKeyInterest(DEFAULT_INTEREST)
-  }
-
-  def send(message: Message) {
-    outbox.synchronized {
-      outbox.addMessage(message)
-      needForceReregister = true
-    }
-    if (channel.isConnected) {
-      registerInterest()
-    }
-  }
-
-  // return previous value after resetting it.
-  def resetForceReregister(): Boolean = {
-    outbox.synchronized {
-      val result = needForceReregister
-      needForceReregister = false
-      result
-    }
-  }
-
-  // MUST be called within the selector loop
-  def connect() {
-    try{
-      channel.register(selector, SelectionKey.OP_CONNECT)
-      channel.connect(address)
-      logInfo("Initiating connection to [" + address + "]")
-    } catch {
-      case e: Exception => {
-        logError("Error connecting to " + address, e)
-        callOnExceptionCallback(e)
-      }
-    }
-  }
-
-  def finishConnect(force: Boolean): Boolean = {
-    try {
-      // Typically, this should finish immediately since it was triggered by a 
connect
-      // selection - though need not necessarily always complete successfully.
-      val connected = channel.finishConnect
-      if (!force && !connected) {
-        logInfo(
-          "finish connect failed [" + address + "], " + outbox.messages.size + 
" messages pending")
-        return false
-      }
-
-      // Fallback to previous behavior - assume finishConnect completed
-      // This will happen only when finishConnect failed for some repeated 
number of times
-      // (10 or so)
-      // Is highly unlikely unless there was an unclean close of socket, etc
-      registerInterest()
-      logInfo("Connected to [" + address + "], " + outbox.messages.size + " 
messages pending")
-    } catch {
-      case e: Exception => {
-        logWarning("Error finishing connection to " + address, e)
-        callOnExceptionCallback(e)
-      }
-    }
-    true
-  }
-
-  override def write(): Boolean = {
-    try {
-      while (true) {
-        if (currentBuffers.size == 0) {
-          outbox.synchronized {
-            outbox.getChunk() match {
-              case Some(chunk) => {
-                val buffers = chunk.buffers
-                // If we have 'seen' pending messages, then reset flag - since 
we handle that as
-                // normal registering of event (below)
-                if (needForceReregister && buffers.exists(_.remaining() > 0)) 
resetForceReregister()
-
-                currentBuffers ++= buffers
-              }
-              case None => {
-                // changeConnectionKeyInterest(0)
-                /* key.interestOps(0) */
-                return false
-              }
-            }
-          }
-        }
-
-        if (currentBuffers.size > 0) {
-          val buffer = currentBuffers(0)
-          val remainingBytes = buffer.remaining
-          val writtenBytes = channel.write(buffer)
-          if (buffer.remaining == 0) {
-            currentBuffers -= buffer
-          }
-          if (writtenBytes < remainingBytes) {
-            // re-register for write.
-            return true
-          }
-        }
-      }
-    } catch {
-      case e: Exception => {
-        logWarning("Error writing in connection to " + 
getRemoteConnectionManagerId(), e)
-        callOnExceptionCallback(e)
-        close()
-        return false
-      }
-    }
-    // should not happen - to keep scala compiler happy
-    true
-  }
-
-  // This is a hack to determine if remote socket was closed or not.
-  // SendingConnection DOES NOT expect to receive any data - if it does, it is 
an error
-  // For a bunch of cases, read will return -1 in case remote socket is closed 
: hence we
-  // register for reads to determine that.
-  override def read(): Boolean = {
-    // We don't expect the other side to send anything; so, we just read to 
detect an error or EOF.
-    try {
-      val length = channel.read(ByteBuffer.allocate(1))
-      if (length == -1) { // EOF
-        close()
-      } else if (length > 0) {
-        logWarning(
-          "Unexpected data read from SendingConnection to " + 
getRemoteConnectionManagerId())
-      }
-    } catch {
-      case e: Exception =>
-        logError("Exception while reading SendingConnection to " + 
getRemoteConnectionManagerId(),
-          e)
-        callOnExceptionCallback(e)
-        close()
-    }
-
-    false
-  }
-
-  override def changeInterestForRead(): Boolean = false
-
-  override def changeInterestForWrite(): Boolean = ! isClosed
-}
-
-
-// Must be created within selector loop - else deadlock
-private[spark] class ReceivingConnection(
-    channel_ : SocketChannel,
-    selector_ : Selector,
-    id_ : ConnectionId)
-    extends Connection(channel_, selector_, id_) {
-
-  def isSaslComplete(): Boolean = {
-    if (sparkSaslServer != null) sparkSaslServer.isComplete() else false
-  }
-
-  class Inbox() {
-    val messages = new HashMap[Int, BufferMessage]()
-
-    def getChunk(header: MessageChunkHeader): Option[MessageChunk] = {
-
-      def createNewMessage: BufferMessage = {
-        val newMessage = Message.create(header).asInstanceOf[BufferMessage]
-        newMessage.started = true
-        newMessage.startTime = System.currentTimeMillis
-        newMessage.isSecurityNeg = header.securityNeg == 1
-        logDebug(
-          "Starting to receive [" + newMessage + "] from [" + 
getRemoteConnectionManagerId() + "]")
-        messages += ((newMessage.id, newMessage))
-        newMessage
-      }
-
-      val message = messages.getOrElseUpdate(header.id, createNewMessage)
-      logTrace(
-        "Receiving chunk of [" + message + "] from [" + 
getRemoteConnectionManagerId() + "]")
-      message.getChunkForReceiving(header.chunkSize)
-    }
-
-    def getMessageForChunk(chunk: MessageChunk): Option[BufferMessage] = {
-      messages.get(chunk.header.id)
-    }
-
-    def removeMessage(message: Message) {
-      messages -= message.id
-    }
-  }
-
-  @volatile private var inferredRemoteManagerId: ConnectionManagerId = null
-
-  override def getRemoteConnectionManagerId(): ConnectionManagerId = {
-    val currId = inferredRemoteManagerId
-    if (currId != null) currId else super.getRemoteConnectionManagerId()
-  }
-
-  // The reciever's remote address is the local socket on remote side : which 
is NOT
-  // the connection manager id of the receiver.
-  // We infer that from the messages we receive on the receiver socket.
-  private def processConnectionManagerId(header: MessageChunkHeader) {
-    val currId = inferredRemoteManagerId
-    if (header.address == null || currId != null) return
-
-    val managerId = ConnectionManagerId.fromSocketAddress(header.address)
-
-    if (managerId != null) {
-      inferredRemoteManagerId = managerId
-    }
-  }
-
-
-  val inbox = new Inbox()
-  val headerBuffer: ByteBuffer = 
ByteBuffer.allocate(MessageChunkHeader.HEADER_SIZE)
-  var onReceiveCallback: (Connection, Message) => Unit = null
-  var currentChunk: MessageChunk = null
-
-  channel.register(selector, SelectionKey.OP_READ)
-
-  override def read(): Boolean = {
-    try {
-      while (true) {
-        if (currentChunk == null) {
-          val headerBytesRead = channel.read(headerBuffer)
-          if (headerBytesRead == -1) {
-            close()
-            return false
-          }
-          if (headerBuffer.remaining > 0) {
-            // re-register for read event ...
-            return true
-          }
-          headerBuffer.flip
-          if (headerBuffer.remaining != MessageChunkHeader.HEADER_SIZE) {
-            throw new Exception(
-              "Unexpected number of bytes (" + headerBuffer.remaining + ") in 
the header")
-          }
-          val header = MessageChunkHeader.create(headerBuffer)
-          headerBuffer.clear()
-
-          processConnectionManagerId(header)
-
-          header.typ match {
-            case Message.BUFFER_MESSAGE => {
-              if (header.totalSize == 0) {
-                if (onReceiveCallback != null) {
-                  onReceiveCallback(this, Message.create(header))
-                }
-                currentChunk = null
-                // re-register for read event ...
-                return true
-              } else {
-                currentChunk = inbox.getChunk(header).orNull
-              }
-            }
-            case _ => throw new Exception("Message of unknown type received")
-          }
-        }
-
-        if (currentChunk == null) throw new Exception("No message chunk to 
receive data")
-
-        val bytesRead = channel.read(currentChunk.buffer)
-        if (bytesRead == 0) {
-          // re-register for read event ...
-          return true
-        } else if (bytesRead == -1) {
-          close()
-          return false
-        }
-
-        /* logDebug("Read " + bytesRead + " bytes for the buffer") */
-
-        if (currentChunk.buffer.remaining == 0) {
-          /* println("Filled buffer at " + System.currentTimeMillis) */
-          val bufferMessage = inbox.getMessageForChunk(currentChunk).get
-          if (bufferMessage.isCompletelyReceived) {
-            bufferMessage.flip()
-            bufferMessage.finishTime = System.currentTimeMillis
-            logDebug("Finished receiving [" + bufferMessage + "] from " +
-              "[" + getRemoteConnectionManagerId() + "] in " + 
bufferMessage.timeTaken)
-            if (onReceiveCallback != null) {
-              onReceiveCallback(this, bufferMessage)
-            }
-            inbox.removeMessage(bufferMessage)
-          }
-          currentChunk = null
-        }
-      }
-    } catch {
-      case e: Exception => {
-        logWarning("Error reading from connection to " + 
getRemoteConnectionManagerId(), e)
-        callOnExceptionCallback(e)
-        close()
-        return false
-      }
-    }
-    // should not happen - to keep scala compiler happy
-    true
-  }
-
-  def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = 
callback}
-
-  // override def changeInterestForRead(): Boolean = ! isClosed
-  override def changeInterestForRead(): Boolean = true
-
-  override def changeInterestForWrite(): Boolean = {
-    throw new IllegalStateException("Unexpected invocation right now")
-  }
-
-  override def registerInterest() {
-    // Registering read too - does not really help in most cases, but for some
-    // it does - so let us keep it for now.
-    changeConnectionKeyInterest(SelectionKey.OP_READ)
-  }
-
-  override def unregisterInterest() {
-    changeConnectionKeyInterest(0)
-  }
-
-  // For read conn, always false.
-  override def resetForceReregister(): Boolean = false
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
deleted file mode 100644
index d579c16..0000000
--- a/core/src/main/scala/org/apache/spark/network/ConnectionId.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-private[spark] case class ConnectionId(connectionManagerId: 
ConnectionManagerId, uniqId: Int) {
-  override def toString = connectionManagerId.host + "_" + 
connectionManagerId.port + "_" + uniqId
-}
-
-private[spark] object ConnectionId {
-
-  def createConnectionIdFromString(connectionIdString: String): ConnectionId = 
{
-    val res = connectionIdString.split("_").map(_.trim())
-    if (res.size != 3) {
-      throw new Exception("Error converting ConnectionId string: " + 
connectionIdString +
-        " to a ConnectionId Object")
-    }
-    new ConnectionId(new ConnectionManagerId(res(0), res(1).toInt), 
res(2).toInt)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
deleted file mode 100644
index 578d806..0000000
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ /dev/null
@@ -1,1047 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.io.IOException
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-import java.util.{Timer, TimerTask}
-import java.util.concurrent.atomic.AtomicInteger
-
-import java.util.concurrent.{LinkedBlockingDeque, TimeUnit, ThreadPoolExecutor}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.SynchronizedMap
-import scala.collection.mutable.SynchronizedQueue
-
-import scala.concurrent.{Await, ExecutionContext, Future, Promise}
-import scala.concurrent.duration._
-import scala.language.postfixOps
-
-import org.apache.spark._
-import org.apache.spark.util.{SystemClock, Utils}
-
-private[spark] class ConnectionManager(
-    port: Int,
-    conf: SparkConf,
-    securityManager: SecurityManager,
-    name: String = "Connection manager")
-  extends Logging {
-
-  /**
-   * Used by sendMessageReliably to track messages being sent.
-   * @param message the message that was sent
-   * @param connectionManagerId the connection manager that sent this message
-   * @param completionHandler callback that's invoked when the send has 
completed or failed
-   */
-  class MessageStatus(
-      val message: Message,
-      val connectionManagerId: ConnectionManagerId,
-      completionHandler: MessageStatus => Unit) {
-
-    /** This is non-None if message has been ack'd */
-    var ackMessage: Option[Message] = None
-
-    def markDone(ackMessage: Option[Message]) {
-      this.ackMessage = ackMessage
-      completionHandler(this)
-    }
-  }
-
-  private val selector = SelectorProvider.provider.openSelector()
-  private val ackTimeoutMonitor = new Timer("AckTimeoutMonitor", true)
-
-  // default to 30 second timeout waiting for authentication
-  private val authTimeout = 
conf.getInt("spark.core.connection.auth.wait.timeout", 30)
-  private val ackTimeout = 
conf.getInt("spark.core.connection.ack.wait.timeout", 60)
-
-  private val handleMessageExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.handler.threads.min", 20),
-    conf.getInt("spark.core.connection.handler.threads.max", 60),
-    conf.getInt("spark.core.connection.handler.threads.keepalive", 60), 
TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-message-executor"))
-
-  private val handleReadWriteExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.io.threads.min", 4),
-    conf.getInt("spark.core.connection.io.threads.max", 32),
-    conf.getInt("spark.core.connection.io.threads.keepalive", 60), 
TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-read-write-executor"))
-
-  // Use a different, yet smaller, thread pool - infrequently used with very 
short lived tasks :
-  // which should be executed asap
-  private val handleConnectExecutor = new ThreadPoolExecutor(
-    conf.getInt("spark.core.connection.connect.threads.min", 1),
-    conf.getInt("spark.core.connection.connect.threads.max", 8),
-    conf.getInt("spark.core.connection.connect.threads.keepalive", 60), 
TimeUnit.SECONDS,
-    new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-connect-executor"))
-
-  private val serverChannel = ServerSocketChannel.open()
-  // used to track the SendingConnections waiting to do SASL negotiation
-  private val connectionsAwaitingSasl = new HashMap[ConnectionId, 
SendingConnection]
-    with SynchronizedMap[ConnectionId, SendingConnection]
-  private val connectionsByKey =
-    new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, 
Connection]
-  private val connectionsById = new HashMap[ConnectionManagerId, 
SendingConnection]
-    with SynchronizedMap[ConnectionManagerId, SendingConnection]
-  private val messageStatuses = new HashMap[Int, MessageStatus]
-  private val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, 
Int)]
-  private val registerRequests = new SynchronizedQueue[SendingConnection]
-
-  implicit val futureExecContext = ExecutionContext.fromExecutor(
-    Utils.newDaemonCachedThreadPool("Connection manager future execution 
context"))
-
-  @volatile
-  private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => 
Option[Message] = null
-
-  private val authEnabled = securityManager.isAuthenticationEnabled()
-
-  serverChannel.configureBlocking(false)
-  serverChannel.socket.setReuseAddress(true)
-  serverChannel.socket.setReceiveBufferSize(256 * 1024)
-
-  private def startService(port: Int): (ServerSocketChannel, Int) = {
-    serverChannel.socket.bind(new InetSocketAddress(port))
-    (serverChannel, serverChannel.socket.getLocalPort)
-  }
-  Utils.startServiceOnPort[ServerSocketChannel](port, startService, name)
-  serverChannel.register(selector, SelectionKey.OP_ACCEPT)
-
-  val id = new ConnectionManagerId(Utils.localHostName, 
serverChannel.socket.getLocalPort)
-  logInfo("Bound socket to port " + serverChannel.socket.getLocalPort() + " 
with id = " + id)
-
-  // used in combination with the ConnectionManagerId to create unique 
Connection ids
-  // to be able to track asynchronous messages
-  private val idCount: AtomicInteger = new AtomicInteger(1)
-
-  private val selectorThread = new Thread("connection-manager-thread") {
-    override def run() = ConnectionManager.this.run()
-  }
-  selectorThread.setDaemon(true)
-  selectorThread.start()
-
-  private val writeRunnableStarted: HashSet[SelectionKey] = new 
HashSet[SelectionKey]()
-
-  private def triggerWrite(key: SelectionKey) {
-    val conn = connectionsByKey.getOrElse(key, null)
-    if (conn == null) return
-
-    writeRunnableStarted.synchronized {
-      // So that we do not trigger more write events while processing this one.
-      // The write method will re-register when done.
-      if (conn.changeInterestForWrite()) conn.unregisterInterest()
-      if (writeRunnableStarted.contains(key)) {
-        // key.interestOps(key.interestOps() & ~ SelectionKey.OP_WRITE)
-        return
-      }
-
-      writeRunnableStarted += key
-    }
-    handleReadWriteExecutor.execute(new Runnable {
-      override def run() {
-        var register: Boolean = false
-        try {
-          register = conn.write()
-        } finally {
-          writeRunnableStarted.synchronized {
-            writeRunnableStarted -= key
-            val needReregister = register || conn.resetForceReregister()
-            if (needReregister && conn.changeInterestForWrite()) {
-              conn.registerInterest()
-            }
-          }
-        }
-      }
-    } )
-  }
-
-  private val readRunnableStarted: HashSet[SelectionKey] = new 
HashSet[SelectionKey]()
-
-  private def triggerRead(key: SelectionKey) {
-    val conn = connectionsByKey.getOrElse(key, null)
-    if (conn == null) return
-
-    readRunnableStarted.synchronized {
-      // So that we do not trigger more read events while processing this one.
-      // The read method will re-register when done.
-      if (conn.changeInterestForRead())conn.unregisterInterest()
-      if (readRunnableStarted.contains(key)) {
-        return
-      }
-
-      readRunnableStarted += key
-    }
-    handleReadWriteExecutor.execute(new Runnable {
-      override def run() {
-        var register: Boolean = false
-        try {
-          register = conn.read()
-        } finally {
-          readRunnableStarted.synchronized {
-            readRunnableStarted -= key
-            if (register && conn.changeInterestForRead()) {
-              conn.registerInterest()
-            }
-          }
-        }
-      }
-    } )
-  }
-
-  private def triggerConnect(key: SelectionKey) {
-    val conn = connectionsByKey.getOrElse(key, 
null).asInstanceOf[SendingConnection]
-    if (conn == null) return
-
-    // prevent other events from being triggered
-    // Since we are still trying to connect, we do not need to do the 
additional steps in
-    // triggerWrite
-    conn.changeConnectionKeyInterest(0)
-
-    handleConnectExecutor.execute(new Runnable {
-      override def run() {
-
-        var tries: Int = 10
-        while (tries >= 0) {
-          if (conn.finishConnect(false)) return
-          // Sleep ?
-          Thread.sleep(1)
-          tries -= 1
-        }
-
-        // fallback to previous behavior : we should not really come here 
since this method was
-        // triggered since channel became connectable : but at times, the 
first finishConnect need
-        // not succeed : hence the loop to retry a few 'times'.
-        conn.finishConnect(true)
-      }
-    } )
-  }
-
-  // MUST be called within selector loop - else deadlock.
-  private def triggerForceCloseByException(key: SelectionKey, e: Exception) {
-    try {
-      key.interestOps(0)
-    } catch {
-      // ignore exceptions
-      case e: Exception => logDebug("Ignoring exception", e)
-    }
-
-    val conn = connectionsByKey.getOrElse(key, null)
-    if (conn == null) return
-
-    // Pushing to connect threadpool
-    handleConnectExecutor.execute(new Runnable {
-      override def run() {
-        try {
-          conn.callOnExceptionCallback(e)
-        } catch {
-          // ignore exceptions
-          case e: Exception => logDebug("Ignoring exception", e)
-        }
-        try {
-          conn.close()
-        } catch {
-          // ignore exceptions
-          case e: Exception => logDebug("Ignoring exception", e)
-        }
-      }
-    })
-  }
-
-
-  def run() {
-    try {
-      while(!selectorThread.isInterrupted) {
-        while (!registerRequests.isEmpty) {
-          val conn: SendingConnection = registerRequests.dequeue()
-          addListeners(conn)
-          conn.connect()
-          addConnection(conn)
-        }
-
-        while(!keyInterestChangeRequests.isEmpty) {
-          val (key, ops) = keyInterestChangeRequests.dequeue()
-
-          try {
-            if (key.isValid) {
-              val connection = connectionsByKey.getOrElse(key, null)
-              if (connection != null) {
-                val lastOps = key.interestOps()
-                key.interestOps(ops)
-
-                // hot loop - prevent materialization of string if trace not 
enabled.
-                if (isTraceEnabled()) {
-                  def intToOpStr(op: Int): String = {
-                    val opStrs = ArrayBuffer[String]()
-                    if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
-                    if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
-                    if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += 
"CONNECT"
-                    if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
-                    if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else 
" "
-                  }
-
-                  logTrace("Changed key for connection to [" +
-                    connection.getRemoteConnectionManagerId()  + "] changed 
from [" +
-                      intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
-                }
-              }
-            } else {
-              logInfo("Key not valid ? " + key)
-              throw new CancelledKeyException()
-            }
-          } catch {
-            case e: CancelledKeyException => {
-              logInfo("key already cancelled ? " + key, e)
-              triggerForceCloseByException(key, e)
-            }
-            case e: Exception => {
-              logError("Exception processing key " + key, e)
-              triggerForceCloseByException(key, e)
-            }
-          }
-        }
-
-        val selectedKeysCount =
-          try {
-            selector.select()
-          } catch {
-            // Explicitly only dealing with CancelledKeyException here since 
other exceptions
-            // should be dealt with differently.
-            case e: CancelledKeyException => {
-              // Some keys within the selectors list are invalid/closed. clear 
them.
-              val allKeys = selector.keys().iterator()
-
-              while (allKeys.hasNext) {
-                val key = allKeys.next()
-                try {
-                  if (! key.isValid) {
-                    logInfo("Key not valid ? " + key)
-                    throw new CancelledKeyException()
-                  }
-                } catch {
-                  case e: CancelledKeyException => {
-                    logInfo("key already cancelled ? " + key, e)
-                    triggerForceCloseByException(key, e)
-                  }
-                  case e: Exception => {
-                    logError("Exception processing key " + key, e)
-                    triggerForceCloseByException(key, e)
-                  }
-                }
-              }
-            }
-            0
-          }
-
-        if (selectedKeysCount == 0) {
-          logDebug("Selector selected " + selectedKeysCount + " of " + 
selector.keys.size +
-            " keys")
-        }
-        if (selectorThread.isInterrupted) {
-          logInfo("Selector thread was interrupted!")
-          return
-        }
-
-        if (0 != selectedKeysCount) {
-          val selectedKeys = selector.selectedKeys().iterator()
-          while (selectedKeys.hasNext) {
-            val key = selectedKeys.next
-            selectedKeys.remove()
-            try {
-              if (key.isValid) {
-                if (key.isAcceptable) {
-                  acceptConnection(key)
-                } else
-                if (key.isConnectable) {
-                  triggerConnect(key)
-                } else
-                if (key.isReadable) {
-                  triggerRead(key)
-                } else
-                if (key.isWritable) {
-                  triggerWrite(key)
-                }
-              } else {
-                logInfo("Key not valid ? " + key)
-                throw new CancelledKeyException()
-              }
-            } catch {
-              // weird, but we saw this happening - even though key.isValid 
was true,
-              // key.isAcceptable would throw CancelledKeyException.
-              case e: CancelledKeyException => {
-                logInfo("key already cancelled ? " + key, e)
-                triggerForceCloseByException(key, e)
-              }
-              case e: Exception => {
-                logError("Exception processing key " + key, e)
-                triggerForceCloseByException(key, e)
-              }
-            }
-          }
-        }
-      }
-    } catch {
-      case e: Exception => logError("Error in select loop", e)
-    }
-  }
-
-  def acceptConnection(key: SelectionKey) {
-    val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
-
-    var newChannel = serverChannel.accept()
-
-    // accept them all in a tight loop. non blocking accept with no 
processing, should be fine
-    while (newChannel != null) {
-      try {
-        val newConnectionId = new ConnectionId(id, 
idCount.getAndIncrement.intValue)
-        val newConnection = new ReceivingConnection(newChannel, selector, 
newConnectionId)
-        newConnection.onReceive(receiveMessage)
-        addListeners(newConnection)
-        addConnection(newConnection)
-        logInfo("Accepted connection from [" + newConnection.remoteAddress + 
"]")
-      } catch {
-        // might happen in case of issues with registering with selector
-        case e: Exception => logError("Error in accept loop", e)
-      }
-
-      newChannel = serverChannel.accept()
-    }
-  }
-
-  private def addListeners(connection: Connection) {
-    connection.onKeyInterestChange(changeConnectionKeyInterest)
-    connection.onException(handleConnectionError)
-    connection.onClose(removeConnection)
-  }
-
-  def addConnection(connection: Connection) {
-    connectionsByKey += ((connection.key, connection))
-  }
-
-  def removeConnection(connection: Connection) {
-    connectionsByKey -= connection.key
-
-    try {
-      connection match {
-        case sendingConnection: SendingConnection =>
-          val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
-          logInfo("Removing SendingConnection to " + 
sendingConnectionManagerId)
-
-          connectionsById -= sendingConnectionManagerId
-          connectionsAwaitingSasl -= connection.connectionId
-
-          messageStatuses.synchronized {
-            messageStatuses.values.filter(_.connectionManagerId == 
sendingConnectionManagerId)
-              .foreach(status => {
-                logInfo("Notifying " + status)
-                status.markDone(None)
-              })
-
-            messageStatuses.retain((i, status) => {
-              status.connectionManagerId != sendingConnectionManagerId
-            })
-          }
-        case receivingConnection: ReceivingConnection =>
-          val remoteConnectionManagerId = 
receivingConnection.getRemoteConnectionManagerId()
-          logInfo("Removing ReceivingConnection to " + 
remoteConnectionManagerId)
-
-          val sendingConnectionOpt = 
connectionsById.get(remoteConnectionManagerId)
-          if (!sendingConnectionOpt.isDefined) {
-            logError(s"Corresponding SendingConnection to 
${remoteConnectionManagerId} not found")
-            return
-          }
-
-          val sendingConnection = sendingConnectionOpt.get
-          connectionsById -= remoteConnectionManagerId
-          sendingConnection.close()
-
-          val sendingConnectionManagerId = 
sendingConnection.getRemoteConnectionManagerId()
-
-          assert(sendingConnectionManagerId == remoteConnectionManagerId)
-
-          messageStatuses.synchronized {
-            for (s <- messageStatuses.values
-                 if s.connectionManagerId == sendingConnectionManagerId) {
-              logInfo("Notifying " + s)
-              s.markDone(None)
-            }
-
-            messageStatuses.retain((i, status) => {
-              status.connectionManagerId != sendingConnectionManagerId
-            })
-          }
-        case _ => logError("Unsupported type of connection.")
-      }
-    } finally {
-      // So that the selection keys can be removed.
-      wakeupSelector()
-    }
-  }
-
-  def handleConnectionError(connection: Connection, e: Exception) {
-    logInfo("Handling connection error on connection to " +
-      connection.getRemoteConnectionManagerId())
-    removeConnection(connection)
-  }
-
-  def changeConnectionKeyInterest(connection: Connection, ops: Int) {
-    keyInterestChangeRequests += ((connection.key, ops))
-    // so that registerations happen !
-    wakeupSelector()
-  }
-
-  def receiveMessage(connection: Connection, message: Message) {
-    val connectionManagerId = 
ConnectionManagerId.fromSocketAddress(message.senderAddress)
-    logDebug("Received [" + message + "] from [" + connectionManagerId + "]")
-    val runnable = new Runnable() {
-      val creationTime = System.currentTimeMillis
-      def run() {
-        logDebug("Handler thread delay is " + (System.currentTimeMillis - 
creationTime) + " ms")
-        handleMessage(connectionManagerId, message, connection)
-        logDebug("Handling delay is " + (System.currentTimeMillis - 
creationTime) + " ms")
-      }
-    }
-    handleMessageExecutor.execute(runnable)
-    /* handleMessage(connection, message) */
-  }
-
-  private def handleClientAuthentication(
-      waitingConn: SendingConnection,
-      securityMsg: SecurityMessage,
-      connectionId : ConnectionId) {
-    if (waitingConn.isSaslComplete()) {
-      logDebug("Client sasl completed for id: "  + waitingConn.connectionId)
-      connectionsAwaitingSasl -= waitingConn.connectionId
-      waitingConn.getAuthenticated().synchronized {
-        waitingConn.getAuthenticated().notifyAll()
-      }
-      return
-    } else {
-      var replyToken : Array[Byte] = null
-      try {
-        replyToken = 
waitingConn.sparkSaslClient.saslResponse(securityMsg.getToken)
-        if (waitingConn.isSaslComplete()) {
-          logDebug("Client sasl completed after evaluate for id: " + 
waitingConn.connectionId)
-          connectionsAwaitingSasl -= waitingConn.connectionId
-          waitingConn.getAuthenticated().synchronized {
-            waitingConn.getAuthenticated().notifyAll()
-          }
-          return
-        }
-        val securityMsgResp = SecurityMessage.fromResponse(replyToken,
-          securityMsg.getConnectionId.toString)
-        val message = securityMsgResp.toBufferMessage
-        if (message == null) throw new IOException("Error creating security 
message")
-        sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), 
message)
-      } catch  {
-        case e: Exception => {
-          logError("Error handling sasl client authentication", e)
-          waitingConn.close()
-          throw new IOException("Error evaluating sasl response: ", e)
-        }
-      }
-    }
-  }
-
-  private def handleServerAuthentication(
-      connection: Connection,
-      securityMsg: SecurityMessage,
-      connectionId: ConnectionId) {
-    if (!connection.isSaslComplete()) {
-      logDebug("saslContext not established")
-      var replyToken : Array[Byte] = null
-      try {
-        connection.synchronized {
-          if (connection.sparkSaslServer == null) {
-            logDebug("Creating sasl Server")
-            connection.sparkSaslServer = new SparkSaslServer(securityManager)
-          }
-        }
-        replyToken = connection.sparkSaslServer.response(securityMsg.getToken)
-        if (connection.isSaslComplete()) {
-          logDebug("Server sasl completed: " + connection.connectionId)
-        } else {
-          logDebug("Server sasl not completed: " + connection.connectionId)
-        }
-        if (replyToken != null) {
-          val securityMsgResp = SecurityMessage.fromResponse(replyToken,
-            securityMsg.getConnectionId)
-          val message = securityMsgResp.toBufferMessage
-          if (message == null) throw new Exception("Error creating security 
Message")
-          sendSecurityMessage(connection.getRemoteConnectionManagerId(), 
message)
-        }
-      } catch {
-        case e: Exception => {
-          logError("Error in server auth negotiation: " + e)
-          // It would probably be better to send an error message telling 
other side auth failed
-          // but for now just close
-          connection.close()
-        }
-      }
-    } else {
-      logDebug("connection already established for this connection id: " + 
connection.connectionId)
-    }
-  }
-
-
-  private def handleAuthentication(conn: Connection, bufferMessage: 
BufferMessage): Boolean = {
-    if (bufferMessage.isSecurityNeg) {
-      logDebug("This is security neg message")
-
-      // parse as SecurityMessage
-      val securityMsg = SecurityMessage.fromBufferMessage(bufferMessage)
-      val connectionId = 
ConnectionId.createConnectionIdFromString(securityMsg.getConnectionId)
-
-      connectionsAwaitingSasl.get(connectionId) match {
-        case Some(waitingConn) => {
-          // Client - this must be in response to us doing Send
-          logDebug("Client handleAuth for id: " +  waitingConn.connectionId)
-          handleClientAuthentication(waitingConn, securityMsg, connectionId)
-        }
-        case None => {
-          // Server - someone sent us something and we haven't authenticated 
yet
-          logDebug("Server handleAuth for id: " + connectionId)
-          handleServerAuthentication(conn, securityMsg, connectionId)
-        }
-      }
-      return true
-    } else {
-      if (!conn.isSaslComplete()) {
-        // We could handle this better and tell the client we need to do 
authentication
-        // negotiation, but for now just ignore them.
-        logError("message sent that is not security negotiation message on 
connection " +
-                 "not authenticated yet, ignoring it!!")
-        return true
-      }
-    }
-    false
-  }
-
-  private def handleMessage(
-      connectionManagerId: ConnectionManagerId,
-      message: Message,
-      connection: Connection) {
-    logDebug("Handling [" + message + "] from [" + connectionManagerId + "]")
-    message match {
-      case bufferMessage: BufferMessage => {
-        if (authEnabled) {
-          val res = handleAuthentication(connection, bufferMessage)
-          if (res) {
-            // message was security negotiation so skip the rest
-            logDebug("After handleAuth result was true, returning")
-            return
-          }
-        }
-        if (bufferMessage.hasAckId()) {
-          messageStatuses.synchronized {
-            messageStatuses.get(bufferMessage.ackId) match {
-              case Some(status) => {
-                messageStatuses -= bufferMessage.ackId
-                status.markDone(Some(message))
-              }
-              case None => {
-                /**
-                 * We can fall down on this code because of following 2 cases
-                 *
-                 * (1) Invalid ack sent due to buggy code.
-                 *
-                 * (2) Late-arriving ack for a SendMessageStatus
-                 *     To avoid unwilling late-arriving ack
-                 *     caused by long pause like GC, you can set
-                 *     larger value than default to 
spark.core.connection.ack.wait.timeout
-                 */
-                logWarning(s"Could not find reference for received ack Message 
${message.id}")
-              }
-            }
-          }
-        } else {
-          var ackMessage : Option[Message] = None
-          try {
-            ackMessage = if (onReceiveCallback != null) {
-              logDebug("Calling back")
-              onReceiveCallback(bufferMessage, connectionManagerId)
-            } else {
-              logDebug("Not calling back as callback is null")
-              None
-            }
-
-            if (ackMessage.isDefined) {
-              if (!ackMessage.get.isInstanceOf[BufferMessage]) {
-                logDebug("Response to " + bufferMessage + " is not a buffer 
message, it is of type "
-                  + ackMessage.get.getClass)
-              } else if (!ackMessage.get.asInstanceOf[BufferMessage].hasAckId) 
{
-                logDebug("Response to " + bufferMessage + " does not have ack 
id set")
-                ackMessage.get.asInstanceOf[BufferMessage].ackId = 
bufferMessage.id
-              }
-            }
-          } catch {
-            case e: Exception => {
-              logError(s"Exception was thrown while processing message", e)
-              val m = Message.createBufferMessage(bufferMessage.id)
-              m.hasError = true
-              ackMessage = Some(m)
-            }
-          } finally {
-            sendMessage(connectionManagerId, ackMessage.getOrElse {
-              Message.createBufferMessage(bufferMessage.id)
-            })
-          }
-        }
-      }
-      case _ => throw new Exception("Unknown type message received")
-    }
-  }
-
-  private def checkSendAuthFirst(connManagerId: ConnectionManagerId, conn: 
SendingConnection) {
-    // see if we need to do sasl before writing
-    // this should only be the first negotiation as the Client!!!
-    if (!conn.isSaslComplete()) {
-      conn.synchronized {
-        if (conn.sparkSaslClient == null) {
-          conn.sparkSaslClient = new SparkSaslClient(securityManager)
-          var firstResponse: Array[Byte] = null
-          try {
-            firstResponse = conn.sparkSaslClient.firstToken()
-            val securityMsg = SecurityMessage.fromResponse(firstResponse,
-              conn.connectionId.toString())
-            val message = securityMsg.toBufferMessage
-            if (message == null) throw new Exception("Error creating security 
message")
-            connectionsAwaitingSasl += ((conn.connectionId, conn))
-            sendSecurityMessage(connManagerId, message)
-            logDebug("adding connectionsAwaitingSasl id: " + conn.connectionId)
-          } catch {
-            case e: Exception => {
-              logError("Error getting first response from the SaslClient.", e)
-              conn.close()
-              throw new Exception("Error getting first response from the 
SaslClient")
-            }
-          }
-        }
-      }
-    } else {
-      logDebug("Sasl already established ")
-    }
-  }
-
-  // allow us to add messages to the inbox for doing sasl negotiating
-  private def sendSecurityMessage(connManagerId: ConnectionManagerId, message: 
Message) {
-    def startNewConnection(): SendingConnection = {
-      val inetSocketAddress = new InetSocketAddress(connManagerId.host, 
connManagerId.port)
-      val newConnectionId = new ConnectionId(id, 
idCount.getAndIncrement.intValue)
-      val newConnection = new SendingConnection(inetSocketAddress, selector, 
connManagerId,
-        newConnectionId)
-      logInfo("creating new sending connection for security! " + 
newConnectionId )
-      registerRequests.enqueue(newConnection)
-
-      newConnection
-    }
-    // I removed the lookupKey stuff as part of merge ... should I re-add it ?
-    // We did not find it useful in our test-env ...
-    // If we do re-add it, we should consistently use it everywhere I guess ?
-    message.senderAddress = id.toSocketAddress()
-    logTrace("Sending Security [" + message + "] to [" + connManagerId + "]")
-    val connection = connectionsById.getOrElseUpdate(connManagerId, 
startNewConnection())
-
-    // send security message until going connection has been authenticated
-    connection.send(message)
-
-    wakeupSelector()
-  }
-
-  private def sendMessage(connectionManagerId: ConnectionManagerId, message: 
Message) {
-    def startNewConnection(): SendingConnection = {
-      val inetSocketAddress = new InetSocketAddress(connectionManagerId.host,
-        connectionManagerId.port)
-      val newConnectionId = new ConnectionId(id, 
idCount.getAndIncrement.intValue)
-      val newConnection = new SendingConnection(inetSocketAddress, selector, 
connectionManagerId,
-        newConnectionId)
-      logTrace("creating new sending connection: " + newConnectionId)
-      registerRequests.enqueue(newConnection)
-
-      newConnection
-    }
-    val connection = connectionsById.getOrElseUpdate(connectionManagerId, 
startNewConnection())
-    if (authEnabled) {
-      checkSendAuthFirst(connectionManagerId, connection)
-    }
-    message.senderAddress = id.toSocketAddress()
-    logDebug("Before Sending [" + message + "] to [" + connectionManagerId + 
"]" + " " +
-      "connectionid: "  + connection.connectionId)
-
-    if (authEnabled) {
-      // if we aren't authenticated yet lets block the senders until 
authentication completes
-      try {
-        connection.getAuthenticated().synchronized {
-          val clock = SystemClock
-          val startTime = clock.getTime()
-
-          while (!connection.isSaslComplete()) {
-            logDebug("getAuthenticated wait connectionid: " + 
connection.connectionId)
-            // have timeout in case remote side never responds
-            connection.getAuthenticated().wait(500)
-            if (((clock.getTime() - startTime) >= (authTimeout * 1000))
-              && (!connection.isSaslComplete())) {
-              // took to long to authenticate the connection, something 
probably went wrong
-              throw new Exception("Took to long for authentication to " + 
connectionManagerId +
-                ", waited " + authTimeout + "seconds, failing.")
-            }
-          }
-        }
-      } catch {
-        case e: Exception => logError("Exception while waiting for 
authentication.", e)
-
-        // need to tell sender it failed
-        messageStatuses.synchronized {
-          val s = messageStatuses.get(message.id)
-          s match {
-            case Some(msgStatus) => {
-              messageStatuses -= message.id
-              logInfo("Notifying " + msgStatus.connectionManagerId)
-              msgStatus.markDone(None)
-            }
-            case None => {
-              logError("no messageStatus for failed message id: " + message.id)
-            }
-          }
-        }
-      }
-    }
-    logDebug("Sending [" + message + "] to [" + connectionManagerId + "]")
-    connection.send(message)
-
-    wakeupSelector()
-  }
-
-  private def wakeupSelector() {
-    selector.wakeup()
-  }
-
-  /**
-   * Send a message and block until an acknowldgment is received or an error 
occurs.
-   * @param connectionManagerId the message's destination
-   * @param message the message being sent
-   * @return a Future that either returns the acknowledgment message or 
captures an exception.
-   */
-  def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: 
Message)
-      : Future[Message] = {
-    val promise = Promise[Message]()
-
-    val timeoutTask = new TimerTask {
-      override def run(): Unit = {
-        messageStatuses.synchronized {
-          messageStatuses.remove(message.id).foreach ( s => {
-            promise.failure(
-              new IOException("sendMessageReliably failed because ack " +
-                s"was not received within $ackTimeout sec"))
-          })
-        }
-      }
-    }
-
-    val status = new MessageStatus(message, connectionManagerId, s => {
-      timeoutTask.cancel()
-      s.ackMessage match {
-        case None => // Indicates a failure where we either never sent or 
never got ACK'd
-          promise.failure(new IOException("sendMessageReliably failed without 
being ACK'd"))
-        case Some(ackMessage) =>
-          if (ackMessage.hasError) {
-            promise.failure(
-              new IOException("sendMessageReliably failed with ACK that 
signalled a remote error"))
-          } else {
-            promise.success(ackMessage)
-          }
-      }
-    })
-    messageStatuses.synchronized {
-      messageStatuses += ((message.id, status))
-    }
-
-    ackTimeoutMonitor.schedule(timeoutTask, ackTimeout * 1000)
-    sendMessage(connectionManagerId, message)
-    promise.future
-  }
-
-  def onReceiveMessage(callback: (Message, ConnectionManagerId) => 
Option[Message]) {
-    onReceiveCallback = callback
-  }
-
-  def stop() {
-    ackTimeoutMonitor.cancel()
-    selectorThread.interrupt()
-    selectorThread.join()
-    selector.close()
-    val connections = connectionsByKey.values
-    connections.foreach(_.close())
-    if (connectionsByKey.size != 0) {
-      logWarning("All connections not cleaned up")
-    }
-    handleMessageExecutor.shutdown()
-    handleReadWriteExecutor.shutdown()
-    handleConnectExecutor.shutdown()
-    logInfo("ConnectionManager stopped")
-  }
-}
-
-
-private[spark] object ConnectionManager {
-  import ExecutionContext.Implicits.global
-
-  def main(args: Array[String]) {
-    val conf = new SparkConf
-    val manager = new ConnectionManager(9999, conf, new SecurityManager(conf))
-    manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
-      println("Received [" + msg + "] from [" + id + "]")
-      None
-    })
-
-    /* testSequentialSending(manager) */
-    /* System.gc() */
-
-    /* testParallelSending(manager) */
-    /* System.gc() */
-
-    /* testParallelDecreasingSending(manager) */
-    /* System.gc() */
-
-    testContinuousSending(manager)
-    System.gc()
-  }
-
-  def testSequentialSending(manager: ConnectionManager) {
-    println("--------------------------")
-    println("Sequential Sending")
-    println("--------------------------")
-    val size = 10 * 1024 * 1024
-    val count = 10
-
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
-    buffer.flip
-
-    (0 until count).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      Await.result(manager.sendMessageReliably(manager.id, bufferMessage), 
Duration.Inf)
-    })
-    println("--------------------------")
-    println()
-  }
-
-  def testParallelSending(manager: ConnectionManager) {
-    println("--------------------------")
-    println("Parallel Sending")
-    println("--------------------------")
-    val size = 10 * 1024 * 1024
-    val count = 10
-
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
-    buffer.flip
-
-    val startTime = System.currentTimeMillis
-    (0 until count).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-      manager.sendMessageReliably(manager.id, bufferMessage)
-    }).foreach(f => {
-      f.onFailure {
-        case e => println("Failed due to " + e)
-      }
-      Await.ready(f, 1 second)
-    })
-    val finishTime = System.currentTimeMillis
-
-    val mb = size * count / 1024.0 / 1024.0
-    val ms = finishTime - startTime
-    val tput = mb * 1000.0 / ms
-    println("--------------------------")
-    println("Started at " + startTime + ", finished at " + finishTime)
-    println("Sent " + count + " messages of size " + size + " in " + ms + " ms 
" +
-      "(" + tput + " MB/s)")
-    println("--------------------------")
-    println()
-  }
-
-  def testParallelDecreasingSending(manager: ConnectionManager) {
-    println("--------------------------")
-    println("Parallel Decreasing Sending")
-    println("--------------------------")
-    val size = 10 * 1024 * 1024
-    val count = 10
-    val buffers = Array.tabulate(count) { i =>
-      val bufferLen = size * (i + 1)
-      val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
-      ByteBuffer.allocate(bufferLen).put(bufferContent)
-    }
-    buffers.foreach(_.flip)
-    val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0
-
-    val startTime = System.currentTimeMillis
-    (0 until count).map(i => {
-      val bufferMessage = Message.createBufferMessage(buffers(count - 1 - 
i).duplicate)
-      manager.sendMessageReliably(manager.id, bufferMessage)
-    }).foreach(f => {
-      f.onFailure {
-        case e => println("Failed due to " + e)
-      }
-      Await.ready(f, 1 second)
-    })
-    val finishTime = System.currentTimeMillis
-
-    val ms = finishTime - startTime
-    val tput = mb * 1000.0 / ms
-    println("--------------------------")
-    /* println("Started at " + startTime + ", finished at " + finishTime) */
-    println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
-    println("--------------------------")
-    println()
-  }
-
-  def testContinuousSending(manager: ConnectionManager) {
-    println("--------------------------")
-    println("Continuous Sending")
-    println("--------------------------")
-    val size = 10 * 1024 * 1024
-    val count = 10
-
-    val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => 
x.toByte))
-    buffer.flip
-
-    val startTime = System.currentTimeMillis
-    while(true) {
-      (0 until count).map(i => {
-          val bufferMessage = Message.createBufferMessage(buffer.duplicate)
-          manager.sendMessageReliably(manager.id, bufferMessage)
-        }).foreach(f => {
-          f.onFailure {
-            case e => println("Failed due to " + e)
-          }
-          Await.ready(f, 1 second)
-        })
-      val finishTime = System.currentTimeMillis
-      Thread.sleep(1000)
-      val mb = size * count / 1024.0 / 1024.0
-      val ms = finishTime - startTime
-      val tput = mb * 1000.0 / ms
-      println("Sent " + mb + " MB in " + ms + " ms (" + tput + " MB/s)")
-      println("--------------------------")
-      println()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/08ce1888/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala 
b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
deleted file mode 100644
index 57f7586..0000000
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.network
-
-import java.net.InetSocketAddress
-
-import org.apache.spark.util.Utils
-
-private[spark] case class ConnectionManagerId(host: String, port: Int) {
-  // DEBUG code
-  Utils.checkHost(host)
-  assert (port > 0)
-
-  def toSocketAddress() = new InetSocketAddress(host, port)
-}
-
-
-private[spark] object ConnectionManagerId {
-  def fromSocketAddress(socketAddress: InetSocketAddress): ConnectionManagerId 
= {
-    new ConnectionManagerId(socketAddress.getHostName, socketAddress.getPort)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to