Author: gdusbabek
Date: Wed May 26 18:46:56 2010
New Revision: 948533

URL: http://svn.apache.org/viewvc?rev=948533&view=rev
Log:
merge from 0.6

Modified:
    cassandra/trunk/   (props changed)
    cassandra/trunk/CHANGES.txt
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
   (props changed)
    
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
   (props changed)
    cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java

Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-947886
+/cassandra/branches/cassandra-0.6:922689-948531
 /incubator/cassandra/branches/cassandra-0.3:774578-796573
 /incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
 /incubator/cassandra/branches/cassandra-0.5:888872-915439

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=948533&r1=948532&r2=948533&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Wed May 26 18:46:56 2010
@@ -25,6 +25,10 @@ dev
  * http mini-interface using mx4j (CASSANDRA-1068)
 
 
+0.6.3
+ * retry to make streaming connections up to 8 times. (CASSANDRA-1019)
+
+
 0.6.2
  * fix contrib/word_count build. (CASSANDRA-992)
  * split CommitLogExecutorService into BatchCommitLogExecutorService and 
@@ -53,6 +57,7 @@ dev
  * make repair of RF==1 a no-op (CASSANDRA-1090)
  * improve default JVM GC options (CASSANDRA-1014)
  * fix SlicePredicate serialization inside Hadoop jobs (CASSANDRA-1049)
+ * close Thrift sockets in Hadoop ColumnFamilyRecordReader (CASSANDRA-1081)
 
 
 0.6.1

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-947886
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-948531
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Cassandra.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-947886
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-948531
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/Column.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-947886
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-948531
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-947886
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-948531
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:888872-903502

Propchange: 
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed May 26 18:46:56 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-947886
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-948531
 
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
 
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
 
/incubator/cassandra/branches/cassandra-0.5/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:888872-903502

Modified: cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java?rev=948533&r1=948532&r2=948533&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/net/FileStreamTask.java Wed May 26 18:46:56 2010
@@ -38,6 +38,8 @@ public class FileStreamTask extends Wrap
     private static Logger logger = LoggerFactory.getLogger( FileStreamTask.class );
     
     public static final int CHUNK_SIZE = 32*1024*1024;
+    // around 10 minutes at the default rpctimeout
+    public static final int MAX_CONNECT_ATTEMPTS = 8;
 
     private final String file;
     private final long startPosition;
@@ -54,11 +56,10 @@ public class FileStreamTask extends Wrap
     
     public void runMayThrow() throws IOException
     {
-        SocketChannel channel = SocketChannel.open();
-        // force local binding on correctly specified interface.
-        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
-        // obey the unwritten law that all nodes on a cluster must use the same storage port.
-        channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+        SocketChannel channel = connect();
+
+        // successfully connected: stream.
+        // (at this point, if we fail, it is the receiver's job to re-request)
         try
         {
             stream(channel);
@@ -113,4 +114,41 @@ public class FileStreamTask extends Wrap
         }
     }
 
+    /**
+     * Connects to the destination, with backoff for failed attempts.
+     * TODO: all nodes on a cluster must currently use the same storage port
+     * @throws IOException If all attempts fail.
+     */
+    private SocketChannel connect() throws IOException
+    {
+        SocketChannel channel = SocketChannel.open();
+        // force local binding on correctly specified interface.
+        channel.socket().bind(new InetSocketAddress(FBUtilities.getLocalAddress(), 0));
+        int attempts = 0;
+        while (true)
+        {
+            try
+            {
+                channel.connect(new InetSocketAddress(to, DatabaseDescriptor.getStoragePort()));
+                // success
+                return channel;
+            }
+            catch (IOException e)
+            {
+                if (++attempts >= MAX_CONNECT_ATTEMPTS)
+                    throw e;
+
+                long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts);
+                logger.warn("Failed attempt " + attempts + " to connect to " + to + " to stream " + file + ". Retrying in " + waitms + " ms. (" + e + ")");
+                try
+                {
+                    Thread.sleep(waitms);
+                }
+                catch (InterruptedException wtf)
+                {
+                    throw new RuntimeException(wtf);
+                }
+            }
+        }
+    }
 }


Reply via email to