Author: slebresne
Date: Mon Sep 19 15:31:35 2011
New Revision: 1172663
URL: http://svn.apache.org/viewvc?rev=1172663&view=rev
Log:
Fix StreamOutSession keeping sstable references forever if the remote end dies
patch by slebresne; reviewed by jbellis for CASSANDRA-3216
Modified:
cassandra/branches/cassandra-1.0.0/CHANGES.txt
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Modified: cassandra/branches/cassandra-1.0.0/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/CHANGES.txt?rev=1172663&r1=1172662&r2=1172663&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/CHANGES.txt (original)
+++ cassandra/branches/cassandra-1.0.0/CHANGES.txt Mon Sep 19 15:31:35 2011
@@ -3,6 +3,9 @@
* Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
* remove unnecessary copy when adding to row cache (CASSANDRA-3223)
* Log message when a full repair operation completes (CASSANDRA-3207)
+ * Fix StreamOutSession keeping sstable references forever if the remote end
+ dies (CASSANDRA-3216)
+
1.0.0-beta1
* removed binarymemtable (CASSANDRA-2692)
Modified:
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java?rev=1172663&r1=1172662&r2=1172663&view=diff
==============================================================================
---
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
(original)
+++
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/streaming/StreamOutSession.java
Mon Sep 19 15:31:35 2011
@@ -22,11 +22,14 @@ import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.*;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.Pair;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
@@ -34,7 +37,7 @@ import org.cliffc.high_scale_lib.NonBloc
/**
* This class manages the streaming of multiple files one after the other.
*/
-public class StreamOutSession
+public class StreamOutSession implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(
StreamOutSession.class );
@@ -70,12 +73,15 @@ public class StreamOutSession
private final Pair<InetAddress, Long> context;
private final Runnable callback;
private volatile String currentFile;
+ // Flipped to true by the first call to close(boolean); any later call
+ // sees it already set and returns without releasing references again.
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
private StreamOutSession(String table, Pair<InetAddress, Long> context,
Runnable callback)
{
this.table = table;
this.context = context;
this.callback = callback;
+ // Subscribe to gossip and failure-detector events so the session can be
+ // failed (and its sstable references released) when the remote host is
+ // convicted, removed or restarted; close(boolean) unregisters both.
+ // NOTE(review): `this` is published to Gossiper/FailureDetector from inside
+ // the constructor, so the JLS final-field publication guarantee does not
+ // cover those listeners - confirm their registries publish it safely.
+ Gossiper.instance.register(this);
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
}
public InetAddress getHost()
@@ -123,11 +129,30 @@ public class StreamOutSession
public void close()
{
- // Release reference on last file
+ close(true);
+ }
+
+ /**
+ * Closes the session at most once: unregisters the gossip and failure
+ * detector listeners, releases the sstable references of the files still
+ * held in {@code files}, removes the session from {@code streams} and, on
+ * success only, runs the callback (if one was given).
+ *
+ * @param success false when the session is failed (see convict), in which
+ * case the callback is not run
+ */
+ private void close(boolean success)
+ {
+ // Though unlikely, close can be called more than once, for instance
+ // if the endpoint dies at exactly the wrong moment.
+ if (!isClosed.compareAndSet(false, true))
+ {
+ logger.debug("StreamOutSession {} already closed", getSessionId());
+ return;
+ }
+
+ Gossiper.instance.unregister(this);
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+
+ // Release reference on last file (or any uncompleted ones)
for (PendingFile file : files.values())
file.sstable.releaseReference();
streams.remove(context);
- if (callback != null)
+ // Instead of just not calling the callback on failure, we could have
+ // allowed registering a specific callback for failures, but we leave
+ // that to a future ticket (likely CASSANDRA-3112)
+ if (callback != null && success)
callback.run();
}
@@ -179,4 +204,32 @@ public class StreamOutSession
logger.debug("Files are {}", StringUtils.join(files.values(), ","));
MessagingService.instance().stream(header, getHost());
}
+
+ // No-ops: only convict(), onRemove() and onRestart() can fail the session.
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ // Removal or restart of the peer is treated as a certain failure:
+ // phi = Double.MAX_VALUE always passes the threshold check in convict().
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ /**
+ * Fails this session if {@code endpoint} is the host being streamed to and
+ * {@code phi} is at least twice the configured phi convict threshold.
+ * A failed session is closed without running the success callback.
+ *
+ * @param endpoint the endpoint being convicted
+ * @param phi the failure detector's suspicion level for that endpoint
+ */
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoint.equals(getHost()))
+ return;
+
+ // We want a higher confidence in the failure detection than usual
+ // because failing a streaming wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ // Two placeholders, two arguments: session id first, then the endpoint.
+ logger.error("StreamOutSession {} failed because {} died or was restarted/removed", getSessionId(), endpoint);
+ close(false);
+ }
}