Author: gdusbabek
Date: Mon Jan 25 21:26:28 2010
New Revision: 902981

URL: http://svn.apache.org/viewvc?rev=902981&view=rev
Log:
Bind outgoing sockets to the locally specified Cassandra interface (avoids 
using the result of InetAddress.anyLocalAddress(), which may not be the right 
Cassandra interface). Patch by Gary Dusbabek, reviewed by Jonathan Ellis.

Modified:
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
    
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java 
Mon Jan 25 21:26:28 2010
@@ -25,6 +25,7 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.log4j.Logger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -51,7 +52,11 @@
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = SocketChannel.open(new InetSocketAddress(to, 
DatabaseDescriptor.getStoragePort()));
+        SocketChannel channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new 
InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        // obey the unwritten law that all nodes on a cluster must use the 
same storage port.
+        channel.connect(new InetSocketAddress(to, 
DatabaseDescriptor.getStoragePort()));
         try
         {
             stream(channel);

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java?rev=902981&r1=902980&r2=902981&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
 Mon Jan 25 21:26:28 2010
@@ -1,148 +1,150 @@
-package org.apache.cassandra.net;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.log4j.Logger;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-
-public class OutboundTcpConnection extends Thread
-{
-    private static final Logger logger = 
Logger.getLogger(OutboundTcpConnection.class);
-
-    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
-    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
-
-    private final OutboundTcpConnectionPool pool;
-    private final InetAddress endpoint;
-    private final BlockingQueue<ByteBuffer> queue = new 
LinkedBlockingQueue<ByteBuffer>();
-    private DataOutputStream output;
-    private Socket socket;
-
-    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final 
InetAddress remoteEp)
-    {
-        super("WRITE-" + remoteEp);
-        this.pool = pool;
-        this.endpoint = remoteEp;
-    }
-
-    public void write(ByteBuffer buffer)
-    {
-        try
-        {
-            queue.put(buffer);
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-    }
-
-    public void closeSocket()
-    {
-        queue.clear();
-        write(CLOSE_SENTINEL);
-    }
-
-    public void run()
-    {
-        while (true)
-        {
-            ByteBuffer bb = take();
-            if (bb == CLOSE_SENTINEL)
-            {
-                disconnect();
-                continue;
-            }
-            if (socket != null || connect())
-                writeConnected(bb);
-        }
-    }
-
-    private void writeConnected(ByteBuffer bb)
-    {
-        try
-        {
-            output.write(bb.array(), 0, bb.limit());
-            if (queue.peek() == null)
-            {
-                output.flush();
-            }
-        }
-        catch (IOException e)
-        {
-            logger.info("error writing to " + endpoint);
-            disconnect();
-        }
-    }
-
-    private void disconnect()
-    {
-        if (socket != null)
-        {
-            try
-            {
-                socket.close();
-            }
-            catch (IOException e)
-            {
-                if (logger.isDebugEnabled())
-                    logger.debug("exception closing connection to " + 
endpoint, e);
-            }
-            output = null;
-            socket = null;
-        }
-    }
-
-    private ByteBuffer take()
-    {
-        ByteBuffer bb;
-        try
-        {
-            bb = queue.take();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        return bb;
-    }
-
-    private boolean connect()
-    {
-        if (logger.isDebugEnabled())
-            logger.debug("attempting to connect to " + endpoint);
-        long start = System.currentTimeMillis();
-        while (System.currentTimeMillis() < start + 
DatabaseDescriptor.getRpcTimeout())
-        {
-            try
-            {
-                socket = new Socket(endpoint, 
DatabaseDescriptor.getStoragePort());
-                socket.setTcpNoDelay(true);
-                output = new DataOutputStream(socket.getOutputStream());
-                return true;
-            }
-            catch (IOException e)
-            {
-                socket = null;
-                if (logger.isTraceEnabled())
-                    logger.trace("unable to connect to " + endpoint, e);
-                try
-                {
-                    Thread.sleep(OPEN_RETRY_DELAY);
-                }
-                catch (InterruptedException e1)
-                {
-                    throw new AssertionError(e1);
-                }
-            }
-        }
-        return false;
-    }
-}
+package org.apache.cassandra.net;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+public class OutboundTcpConnection extends Thread
+{
+    private static final Logger logger = 
Logger.getLogger(OutboundTcpConnection.class);
+
+    private static final ByteBuffer CLOSE_SENTINEL = ByteBuffer.allocate(0);
+    private static final int OPEN_RETRY_DELAY = 100; // ms between retries
+
+    private final OutboundTcpConnectionPool pool;
+    private final InetAddress endpoint;
+    private final BlockingQueue<ByteBuffer> queue = new 
LinkedBlockingQueue<ByteBuffer>();
+    private DataOutputStream output;
+    private Socket socket;
+
+    public OutboundTcpConnection(final OutboundTcpConnectionPool pool, final 
InetAddress remoteEp)
+    {
+        super("WRITE-" + remoteEp);
+        this.pool = pool;
+        this.endpoint = remoteEp;
+    }
+
+    public void write(ByteBuffer buffer)
+    {
+        try
+        {
+            queue.put(buffer);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
+    public void closeSocket()
+    {
+        queue.clear();
+        write(CLOSE_SENTINEL);
+    }
+
+    public void run()
+    {
+        while (true)
+        {
+            ByteBuffer bb = take();
+            if (bb == CLOSE_SENTINEL)
+            {
+                disconnect();
+                continue;
+            }
+            if (socket != null || connect())
+                writeConnected(bb);
+        }
+    }
+
+    private void writeConnected(ByteBuffer bb)
+    {
+        try
+        {
+            output.write(bb.array(), 0, bb.limit());
+            if (queue.peek() == null)
+            {
+                output.flush();
+            }
+        }
+        catch (IOException e)
+        {
+            logger.info("error writing to " + endpoint);
+            disconnect();
+        }
+    }
+
+    private void disconnect()
+    {
+        if (socket != null)
+        {
+            try
+            {
+                socket.close();
+            }
+            catch (IOException e)
+            {
+                if (logger.isDebugEnabled())
+                    logger.debug("exception closing connection to " + 
endpoint, e);
+            }
+            output = null;
+            socket = null;
+        }
+    }
+
+    private ByteBuffer take()
+    {
+        ByteBuffer bb;
+        try
+        {
+            bb = queue.take();
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        return bb;
+    }
+
+    private boolean connect()
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("attempting to connect to " + endpoint);
+        long start = System.currentTimeMillis();
+        while (System.currentTimeMillis() < start + 
DatabaseDescriptor.getRpcTimeout())
+        {
+            try
+            {
+                // zero means 'bind on any available port.'
+                socket = new Socket(endpoint, 
DatabaseDescriptor.getStoragePort(), FBUtilities.getLocalAddress(), 0);
+                socket.setTcpNoDelay(true);
+                output = new DataOutputStream(socket.getOutputStream());
+                return true;
+            }
+            catch (IOException e)
+            {
+                socket = null;
+                if (logger.isTraceEnabled())
+                    logger.trace("unable to connect to " + endpoint, e);
+                try
+                {
+                    Thread.sleep(OPEN_RETRY_DELAY);
+                }
+                catch (InterruptedException e1)
+                {
+                    throw new AssertionError(e1);
+                }
+            }
+        }
+        return false;
+    }
+}


Reply via email to