Repository: spark Updated Branches: refs/heads/master e056320cc -> 4d8ae709f
Cleanup on Connection and ConnectionManager Simple cleanup on Connection and ConnectionManager to make IDE happy while working of issue: 1. Replace var with var 2. Add parentheses to Queue#dequeu to be consistent with side-effects. 3. Remove return on final line of a method. Author: Henry Saputra <[email protected]> Closes #1060 from hsaputra/cleanup_connection_classes and squashes the following commits: 245fd09 [Henry Saputra] Cleanup on Connection and ConnectionManager to make IDE happy while working of issue: 1. Replace var with var 2. Add parentheses to Queue#dequeu to be consistent with side-effects. 3. Remove return on final line of a method. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d8ae709 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d8ae709 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d8ae709 Branch: refs/heads/master Commit: 4d8ae709fb8d986634c97d21036391ed4685db1a Parents: e056320 Author: Henry Saputra <[email protected]> Authored: Wed Jun 11 23:17:51 2014 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed Jun 11 23:17:51 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/network/Connection.scala | 4 ++-- .../apache/spark/network/ConnectionManager.scala | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4d8ae709/core/src/main/scala/org/apache/spark/network/Connection.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/Connection.scala b/core/src/main/scala/org/apache/spark/network/Connection.scala index 3ffaaab..3b6298a 100644 --- a/core/src/main/scala/org/apache/spark/network/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/Connection.scala @@ -210,7 +210,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, var nextMessageToBeUsed = 0 def addMessage(message: Message) { - messages.synchronized{ + messages.synchronized { /* messages += message */ messages.enqueue(message) logDebug("Added [" + message + "] to outbox for sending to " + @@ -223,7 +223,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector, while (!messages.isEmpty) { /* nextMessageToBeUsed = nextMessageToBeUsed % messages.size */ /* val message = messages(nextMessageToBeUsed) */ - val message = messages.dequeue + val message = messages.dequeue() val chunk = message.getChunkForSending(defaultChunkSize) if (chunk.isDefined) { messages.enqueue(message) http://git-wip-us.apache.org/repos/asf/spark/blob/4d8ae709/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 5dd5fd0..cf1c985 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -250,14 +250,14 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, try { while(!selectorThread.isInterrupted) { while (! registerRequests.isEmpty) { - val conn: SendingConnection = registerRequests.dequeue + val conn: SendingConnection = registerRequests.dequeue() addListeners(conn) conn.connect() addConnection(conn) } while(!keyInterestChangeRequests.isEmpty) { - val (key, ops) = keyInterestChangeRequests.dequeue + val (key, ops) = keyInterestChangeRequests.dequeue() try { if (key.isValid) { @@ -532,9 +532,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, } return } - var securityMsgResp = SecurityMessage.fromResponse(replyToken, + val securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId.toString()) - var message = securityMsgResp.toBufferMessage + val message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security message") sendSecurityMessage(waitingConn.getRemoteConnectionManagerId(), message) } catch { @@ -568,9 +568,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, logDebug("Server sasl not completed: " + connection.connectionId) } if (replyToken != null) { - var securityMsgResp = SecurityMessage.fromResponse(replyToken, + val securityMsgResp = SecurityMessage.fromResponse(replyToken, securityMsg.getConnectionId) - var message = securityMsgResp.toBufferMessage + val message = securityMsgResp.toBufferMessage if (message == null) throw new Exception("Error creating security Message") sendSecurityMessage(connection.getRemoteConnectionManagerId(), message) } @@ -618,7 +618,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, return true } } - return false + false } private def handleMessage( @@ -694,9 +694,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, var firstResponse: Array[Byte] = null try { firstResponse = conn.sparkSaslClient.firstToken() - var securityMsg = SecurityMessage.fromResponse(firstResponse, + val securityMsg = SecurityMessage.fromResponse(firstResponse, conn.connectionId.toString()) - var message = securityMsg.toBufferMessage + val message = securityMsg.toBufferMessage if (message == null) throw new Exception("Error creating security message") connectionsAwaitingSasl += ((conn.connectionId, conn)) sendSecurityMessage(connManagerId, message)
