Author: gdusbabek
Date: Wed Dec 30 23:12:53 2009
New Revision: 894732
URL: http://svn.apache.org/viewvc?rev=894732&view=rev
Log:
CASSANDRA-651 TcpConnectionManager was holding on to disconnected connections,
giving the false indication they were being used. Patch by Jonathan Ellis and
Gary Dusbabek, reviewed by same. Merged from trunk.
Modified:
incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/MessagingService.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnection.java
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnectionManager.java
Modified: incubator/cassandra/branches/cassandra-0.5/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/CHANGES.txt?rev=894732&r1=894731&r2=894732&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/CHANGES.txt (original)
+++ incubator/cassandra/branches/cassandra-0.5/CHANGES.txt Wed Dec 30 23:12:53 2009
@@ -3,6 +3,8 @@
(CASSANDRA-647, CASSANDRA-649)
* expose java.util.concurrent.TimeoutException in StorageProxy methods
(CASSANDRA-600)
+ * TcpConnectionManager was holding on to disconnected connections,
+ giving the false indication they were being used. (CASSANDRA-651)
0.5.0 RC1
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/MessagingService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/MessagingService.java?rev=894732&r1=894731&r2=894732&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/MessagingService.java Wed Dec 30 23:12:53 2009
@@ -20,6 +20,8 @@
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
import org.apache.cassandra.net.io.SerializerType;
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.utils.*;
@@ -27,7 +29,6 @@
import java.io.IOException;
import java.net.ServerSocket;
-import java.net.SocketException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -41,7 +42,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
-public class MessagingService
+public class MessagingService implements IFailureDetectionEventListener
{
private static int version_ = 1;
//TODO: make this parameter dynamic somehow. Not sure if config is appropriate.
@@ -184,7 +185,14 @@
}
return result;
}
-
+
+ /** called by failure detection code to notify that housekeeping should be performed on downed sockets. */
+ public void convict(InetAddress ep)
+ {
+ logger_.debug("Canceling pool for " + ep);
+ getConnectionPool(FBUtilities.getLocalAddress(), ep).reset();
+ }
+
/**
* Listen on the specified port.
* @param localEp InetAddress whose port to listen on.
@@ -200,7 +208,8 @@
SelectionKey key = SelectorManager.getSelectorManager().register(serverChannel, handler, SelectionKey.OP_ACCEPT);
endPoints_.add(localEp);
- listenSockets_.put(localEp, key);
+ listenSockets_.put(localEp, key);
+ FailureDetector.instance().registerFailureDetectionEventListener(this);
}
/**
@@ -411,12 +420,6 @@
connection = MessagingService.getConnection(processedMessage.getFrom(), to, message);
connection.write(message);
}
- catch (SocketException se)
- {
- // Shutting down the entire pool. May be too conservative an approach.
- MessagingService.getConnectionPool(message.getFrom(), to).shutdown();
- logger_.error("socket error writing to " + to, se);
- }
catch (IOException e)
{
if (connection != null)
@@ -493,6 +496,7 @@
logger_.info("Shutting down ...");
synchronized (MessagingService.class)
{
+ FailureDetector.instance().unregisterFailureDetectionEventListener(MessagingService.instance());
/* Stop listening on any TCP socket */
for (SelectionKey skey : listenSockets_.values())
{
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnection.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnection.java?rev=894732&r1=894731&r2=894732&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnection.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnection.java Wed Dec 30 23:12:53 2009
@@ -317,7 +317,7 @@
cancel(key_);
pendingWrites_.clear();
if (pool_ != null)
- pool_.destroy(this);
+ pool_.reset();
}
private void cancel(SelectionKey key)
Modified:
incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnectionManager.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnectionManager.java?rev=894732&r1=894731&r2=894732&view=diff
==============================================================================
--- incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnectionManager.java (original)
+++ incubator/cassandra/branches/cassandra-0.5/src/java/org/apache/cassandra/net/TcpConnectionManager.java Wed Dec 30 23:12:53 2009
@@ -64,24 +64,12 @@
}
}
- synchronized void shutdown()
+ synchronized void reset()
{
for (TcpConnection con : new TcpConnection[] { cmdCon, ackCon })
if (con != null)
con.closeSocket();
- }
-
- synchronized void destroy(TcpConnection con)
- {
- assert con != null;
- if (cmdCon == con)
- {
- cmdCon = null;
- }
- else
- {
- assert ackCon == con;
- ackCon = null;
- }
+ cmdCon = null;
+ ackCon = null;
}
}