Author: gdusbabek
Date: Wed May 26 18:40:29 2010
New Revision: 948531
URL: http://svn.apache.org/viewvc?rev=948531&view=rev
Log:
make up to 8 attempts (with backoff) to open streaming connections. patch by stuhood, reviewed by
gdusbabek. CASSANDRA-1019
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=948531&r1=948530&r2=948531&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed May 26 18:40:29 2010
@@ -1,3 +1,6 @@
+0.6.3
+ * make up to 8 attempts (with backoff) to open streaming connections. (CASSANDRA-1019)
+
0.6.2
* fix contrib/word_count build. (CASSANDRA-992)
* split CommitLogExecutorService into BatchCommitLogExecutorService and
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=948531&r1=948530&r2=948531&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/net/FileStreamTask.java Wed May 26 18:40:29 2010
@@ -37,6 +37,8 @@ public class FileStreamTask extends Wrap
private static Logger logger = Logger.getLogger( FileStreamTask.class );
public static final int CHUNK_SIZE = 32*1024*1024;
+ // total connection attempts (not retries); connect() sleeps rpctimeout * 2^n after failure n = 1..7, i.e. 254 x rpctimeout overall (around 10 minutes at the default rpctimeout)
+ public static final int MAX_CONNECT_ATTEMPTS = 8;
private final String file;
private final long startPosition;
@@ -53,11 +55,10 @@ public class FileStreamTask extends Wrap
public void runMayThrow() throws IOException
{
- SocketChannel channel = SocketChannel.open();
- // force local binding on correctly specified interface.
- channel.socket().bind(new
InetSocketAddress(FBUtilities.getLocalAddress(), 0));
- // obey the unwritten law that all nodes on a cluster must use the
same storage port.
- channel.connect(new InetSocketAddress(to,
DatabaseDescriptor.getStoragePort()));
+ SocketChannel channel = connect();
+
+ // successfully connected: stream.
+ // (at this point, if we fail, it is the receiver's job to re-request)
try
{
stream(channel);
@@ -112,4 +113,56 @@ public class FileStreamTask extends Wrap
}
}
+    /**
+     * Connects to the destination, with exponential backoff between failed attempts.
+     * A fresh channel is opened (and bound) for every attempt: the JDK closes a
+     * blocking SocketChannel whose connect() fails, so a channel cannot be reused
+     * for a retry, and a failed channel is closed here so it is never leaked.
+     * TODO: all nodes on a cluster must currently use the same storage port
+     * @return a connected channel; the caller is responsible for closing it.
+     * @throws IOException the last failure, if all MAX_CONNECT_ATTEMPTS attempts fail.
+     */
+    private SocketChannel connect() throws IOException
+    {
+        int attempts = 0;
+        while (true)
+        {
+            SocketChannel channel = SocketChannel.open();
+            try
+            {
+                // force local binding on correctly specified interface.
+                channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+                channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+                // success
+                return channel;
+            }
+            catch (IOException e)
+            {
+                try
+                {
+                    channel.close();
+                }
+                catch (IOException ignored)
+                {
+                    // best effort: e is the failure worth reporting
+                }
+                if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                    throw e;
+
+                // exponential backoff: rpc timeout * 2^attempts
+                long waitms = DatabaseDescriptor.getRpcTimeout() * (1L << attempts);
+                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + file + ". Retrying in " + waitms + " ms. (" + e + ")");
+                try
+                {
+                    Thread.sleep(waitms);
+                }
+                catch (InterruptedException wtf)
+                {
+                    // preserve the interrupt status for whoever runs this task
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(wtf);
+                }
+            }
+        }
+    }
}