Author: slebresne
Date: Mon Sep 19 15:32:49 2011
New Revision: 1172665
URL: http://svn.apache.org/viewvc?rev=1172665&view=rev
Log:
merge from 1.0.0
Modified:
cassandra/branches/cassandra-1.0/ (props changed)
cassandra/branches/cassandra-1.0/CHANGES.txt
cassandra/branches/cassandra-1.0/contrib/ (props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Propchange: cassandra/branches/cassandra-1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0:1125021-1130369
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/branches/cassandra-1.0:1167106,1167185
-/cassandra/branches/cassandra-1.0.0:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1:1102511-1125020
/cassandra/trunk:1167085-1167102,1169870
Modified: cassandra/branches/cassandra-1.0/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/CHANGES.txt?rev=1172665&r1=1172664&r2=1172665&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0/CHANGES.txt Mon Sep 19 15:32:49 2011
@@ -5,6 +5,9 @@
1.0.0-rc1
* Fix counting CFMetadata towards Memtable liveRatio (CASSANDRA-3023)
* Log message when a full repair operation completes (CASSANDRA-3207)
+ * Fix streamOutSession keeping sstables references forever if the remote end
+ dies (CASSANDRA-3216)
+
1.0.0-beta1
* removed binarymemtable (CASSANDRA-2692)
Propchange: cassandra/branches/cassandra-1.0/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1130369
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/branches/cassandra-1.0/contrib:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/contrib:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/contrib:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/contrib:1102511-1125020
/cassandra/trunk/contrib:1167085-1167102,1169870
Propchange:
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1167085-1167102,1169870
Propchange:
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1167085-1167102,1169870
Propchange:
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1167085-1167102,1169870
Propchange:
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1167085-1167102,1169870
Propchange:
cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Sep 19 15:32:49 2011
@@ -5,7 +5,7 @@
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1130369
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/branches/cassandra-1.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167106,1167185
-/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1171098,1172597
+/cassandra/branches/cassandra-1.0.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167104-1171098,1172597,1172663
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
/cassandra/tags/cassandra-0.8.0-rc1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1102511-1125020
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1167085-1167102,1169870
Modified:
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1172665&r1=1172664&r2=1172665&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++
cassandra/branches/cassandra-1.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Mon Sep 19 15:32:49 2011
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -34,7 +37,7 @@ import org.cliffc.high_scale_lib.NonBloc
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamOutSession
+public class StreamOutSession implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(
StreamOutSession.class );
@@ -70,12 +73,15 @@ public class StreamOutSession
private final Pair<InetAddress, Long> context;
private final Runnable callback;
private volatile String currentFile;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
private StreamOutSession(String table, Pair<InetAddress, Long> context,
Runnable callback)
{
this.table = table;
this.context = context;
this.callback = callback;
+ Gossiper.instance.register(this);
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public InetAddress getHost()
@@ -123,11 +129,30 @@ public class StreamOutSession
public void close()
{
- // Release reference on last file
+ close(true);
+ }
+
+ private void close(boolean success)
+ {
+ // Though unlikely, it is possible for close to be called multiple
+ // time, if the endpoint die at the exact wrong time for instance.
+ if (!isClosed.compareAndSet(false, true))
+ {
+ logger.debug("StreamOutSession {} already closed", getSessionId());
+ return;
+ }
+
+ Gossiper.instance.unregister(this);
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+ // Release reference on last file (or any uncompleted ones)
for (PendingFile file : files.values())
file.sstable.releaseReference();
streams.remove(context);
- if (callback != null)
+ // Instead of just not calling the callback on failure, we could have
+ // allow to register a specific callback for failures, but we leave
+ // that to a future ticket (likely CASSANDRA-3112)
+ if (callback != null && success)
callback.run();
}
@@ -179,4 +204,32 @@ public class StreamOutSession
logger.debug("Files are {}", StringUtils.join(files.values(), ","));
MessagingService.instance().stream(header, getHost());
}
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state,
VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoint.equals(getHost()))
+ return;
+
+ // We want a higher confidence in the failure detection than usual
because failing a streaming wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ logger.error("StreamOutSession {} failed because {} died or was
restarted/removed", endpoint);
+ close(false);
+ }
}