Add max_streaming_retries instead of relying solely on the failure detector (FD). Patch by brandonwilliams and yukim, reviewed by brandonwilliams for CASSANDRA-4051
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3ee8682e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3ee8682e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3ee8682e Branch: refs/heads/trunk Commit: 3ee8682eb8e32f04979f44984911cf3547ead206 Parents: e3c4c09 Author: Brandon Williams <[email protected]> Authored: Tue Mar 27 15:00:41 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Tue Mar 27 15:00:41 2012 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 7 +- .../org/apache/cassandra/dht/RangeStreamer.java | 7 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 6 +- .../apache/cassandra/service/StorageService.java | 17 ++- .../cassandra/streaming/AbstractStreamSession.java | 112 +++++++++++++++ .../cassandra/streaming/IStreamCallback.java | 36 +++++ .../org/apache/cassandra/streaming/StreamIn.java | 4 +- .../cassandra/streaming/StreamInSession.java | 68 +++++---- .../org/apache/cassandra/streaming/StreamOut.java | 2 +- .../cassandra/streaming/StreamOutSession.java | 88 ++---------- .../apache/cassandra/streaming/StreamReply.java | 1 + .../streaming/StreamReplyVerbHandler.java | 5 +- .../cassandra/streaming/StreamingRepairTask.java | 22 ++-- 14 files changed, 244 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d875584..1131721 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -88,6 +88,8 @@ 
public class Config public Integer compaction_throughput_mb_per_sec = 16; public Boolean multithreaded_compaction = false; + public Integer max_streaming_retries = 3; + public Integer stream_throughput_outbound_megabits_per_sec; public String[] data_file_directories; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 439c958..60b4724 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -648,11 +648,16 @@ public class DatabaseDescriptor return System.getProperty("cassandra.replace_token", null); } - public static String getClusterName() + public static String getClusterName() { return conf.cluster_name; } + public static int getMaxStreamingRetries() + { + return conf.max_streaming_retries; + } + public static String getJobJarLocation() { return conf.job_jar_file_location; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 47931f8..dac05cf 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import org.apache.cassandra.streaming.IStreamCallback; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -219,15 +220,17 @@ 
public class RangeStreamer final InetAddress source = entry.getValue().getKey(); Collection<Range<Token>> ranges = entry.getValue().getValue(); /* Send messages to respective folks to stream data over to me */ - Runnable callback = new Runnable() + IStreamCallback callback = new IStreamCallback() { - public void run() + public void onSuccess() { latch.countDown(); if (logger.isDebugEnabled()) logger.debug(String.format("Removed %s/%s as a %s source; remaining is %s", source, table, opType, latch.getCount())); } + + public void onFailure() {} }; if (logger.isDebugEnabled()) logger.debug("" + opType + "ing from " + source + " ranges " + StringUtils.join(ranges, ", ")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 1ee7a2f..85b5146 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -206,7 +206,7 @@ public class SSTableLoader return builder.toString(); } - private class CountDownCallback implements Runnable + private class CountDownCallback implements IStreamCallback { private final InetAddress endpoint; private final CountDownLatch latch; @@ -217,7 +217,7 @@ public class SSTableLoader this.endpoint = endpoint; } - public void run() + public void onSuccess() { latch.countDown(); outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount())); @@ -226,6 +226,8 @@ public class SSTableLoader if (latch.getCount() == 0) client.stop(); } + + public void onFailure() {} } public interface OutputHandler http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/service/StorageService.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 986fd81..84c0096 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1490,9 +1490,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { final InetAddress source = entry.getKey(); Collection<Range<Token>> ranges = entry.getValue(); - final Runnable callback = new Runnable() + final IStreamCallback callback = new IStreamCallback() { - public void run() + public void onSuccess() { synchronized (fetchSources) { @@ -1501,6 +1501,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe sendReplicationNotification(myAddress, notifyEndpoint); } } + + public void onFailure() {} }; if (logger_.isDebugEnabled()) logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); @@ -2799,9 +2801,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe final Range<Token> range = endPointEntry.getKey(); final InetAddress newEndpoint = endPointEntry.getValue(); - final Runnable callback = new Runnable() + final IStreamCallback callback = new IStreamCallback() { - public void run() + public void onSuccess() { synchronized (pending) { @@ -2811,6 +2813,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe latch.countDown(); } } + public void onFailure() {} }; StageManager.getStage(Stage.STREAM).execute(new Runnable() @@ -2852,15 +2855,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe { Collection<Range<Token>> toFetch = endpointWithRanges.get(source); - final Runnable callback = new Runnable() + final IStreamCallback callback = new IStreamCallback() { - public void run() + public void onSuccess() { 
pending.remove(source); if (pending.isEmpty()) latch.countDown(); } + + public void onFailure() {} }; if (logger_.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java new file mode 100644 index 0000000..1938e3d --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/AbstractStreamSession.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.streaming; + +import java.net.InetAddress; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.gms.*; +import org.apache.cassandra.utils.Pair; + +public abstract class AbstractStreamSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractStreamSession.class); + + protected String table; + protected Pair<InetAddress, Long> context; + protected final IStreamCallback callback; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + + protected AbstractStreamSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback) + { + this.table = table; + this.context = context; + this.callback = callback; + Gossiper.instance.register(this); + FailureDetector.instance.registerFailureDetectionEventListener(this); + } + + public long getSessionId() + { + return context.right; + } + + public InetAddress getHost() + { + return context.left; + } + + public void close(boolean success) + { + if (!isClosed.compareAndSet(false, true)) + { + logger.debug("Stream session {} already closed", getSessionId()); + return; + } + + closeInternal(success); + + Gossiper.instance.unregister(this); + FailureDetector.instance.unregisterFailureDetectionEventListener(this); + + logger.debug("closing with status " + success); + if (callback != null) + { + if (success) + callback.onSuccess(); + else + callback.onFailure(); + } + } + + protected abstract void closeInternal(boolean success); + + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public 
void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) + { + if (!endpoint.equals(getHost())) + return; + + // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost. + if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + logger.error("Stream failed because {} died or was restarted/removed (streams may still be active " + + "in background, but further streams won't be started)", endpoint); + close(false); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/IStreamCallback.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/IStreamCallback.java b/src/java/org/apache/cassandra/streaming/IStreamCallback.java new file mode 100644 index 0000000..f0d7754 --- /dev/null +++ b/src/java/org/apache/cassandra/streaming/IStreamCallback.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.streaming; + +/** + * Callback interface for streaming session success/failure. + */ +public interface IStreamCallback +{ + /** + * called when stream session is finished successfully. + */ + public void onSuccess(); + + /** + * called when streaming somehow got in trouble. + */ + public void onFailure(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamIn.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamIn.java b/src/java/org/apache/cassandra/streaming/StreamIn.java index ce6f7ef..0621086 100644 --- a/src/java/org/apache/cassandra/streaming/StreamIn.java +++ b/src/java/org/apache/cassandra/streaming/StreamIn.java @@ -48,7 +48,7 @@ public class StreamIn private static Logger logger = LoggerFactory.getLogger(StreamIn.class); /** Request ranges for all column families in the given keyspace. */ - public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, Runnable callback, OperationType type) + public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) { requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type); } @@ -56,7 +56,7 @@ public class StreamIn /** * Request ranges to be transferred from specific CFs */ - public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, Runnable callback, OperationType type) + public static void requestRanges(InetAddress source, String tableName, Collection<ColumnFamilyStore> columnFamilies, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) { assert ranges.size() > 0; 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamInSession.java b/src/java/org/apache/cassandra/streaming/StreamInSession.java index 2e25436..e662a49 100644 --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@ -25,42 +25,40 @@ import java.net.Socket; import java.util.*; import java.util.concurrent.ConcurrentMap; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.net.MessagingService; +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.cliffc.high_scale_lib.NonBlockingHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Table; -import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.*; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.OutboundTcpConnection; import org.apache.cassandra.utils.Pair; -import org.cliffc.high_scale_lib.NonBlockingHashMap; -import org.cliffc.high_scale_lib.NonBlockingHashSet; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** each context gets its own StreamInSession. 
So there may be >1 Session per host */ -public class StreamInSession +public class StreamInSession extends AbstractStreamSession { private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class); private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>(); private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>(); - private final Pair<InetAddress, Long> context; - private final Runnable callback; - private String table; private final List<SSTableReader> readers = new ArrayList<SSTableReader>(); private PendingFile current; private Socket socket; + private volatile int retries; - private StreamInSession(Pair<InetAddress, Long> context, Runnable callback) + private StreamInSession(Pair<InetAddress, Long> context, IStreamCallback callback) { - this.context = context; - this.callback = callback; + super(null, context, callback); } - public static StreamInSession create(InetAddress host, Runnable callback) + public static StreamInSession create(InetAddress host, IStreamCallback callback) { Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, System.nanoTime()); StreamInSession session = new StreamInSession(context, callback); @@ -76,9 +74,7 @@ public class StreamInSession { StreamInSession possibleNew = new StreamInSession(context, null); if ((session = sessions.putIfAbsent(context, possibleNew)) == null) - { session = possibleNew; - } } return session; } @@ -126,8 +122,16 @@ public class StreamInSession public void retry(PendingFile remoteFile) throws IOException { + retries++; + if (retries > DatabaseDescriptor.getMaxStreamingRetries()) + { + logger.error(String.format("Failed streaming session %d from %s while receiving %s", getSessionId(), getHost().toString(), current), + new IllegalStateException("Too many retries for " + remoteFile)); + closeInternal(false); + return; + } StreamReply reply = new 
StreamReply(remoteFile.getFilename(), getSessionId(), StreamReply.Status.FILE_RETRY); - logger.info("Streaming of file {} from {} failed: requesting a retry.", remoteFile, this); + logger.info("Streaming of file {} for {} failed: requesting a retry.", remoteFile, this); sendMessage(reply.getMessage(Gossiper.instance.getVersion(getHost()))); } @@ -136,7 +140,6 @@ public class StreamInSession OutboundTcpConnection.write(message, String.valueOf(getSessionId()), new DataOutputStream(socket.getOutputStream())); } - public void closeIfFinished() throws IOException { if (files.isEmpty()) @@ -151,7 +154,7 @@ public class StreamInSession // Acquire the reference (for secondary index building) before submitting the index build, // so it can't get compacted out of existence in between if (!sstable.acquireReference()) - throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transfered"); + throw new AssertionError("We shouldn't fail acquiring a reference on a sstable that has just been transferred"); ColumnFamilyStore cfs = Table.open(sstable.getTableName()).getColumnFamilyStore(sstable.getColumnFamilyName()); cfs.addSSTable(sstable); @@ -189,20 +192,25 @@ public class StreamInSession socket.close(); } - if (callback != null) - callback.run(); - sessions.remove(context); + close(true); } } - public long getSessionId() + protected void closeInternal(boolean success) { - return context.right; - } - - public InetAddress getHost() - { - return context.left; + sessions.remove(context); + if (!success && FailureDetector.instance.isAlive(getHost())) + { + try + { + StreamReply reply = new StreamReply("", getSessionId(), StreamReply.Status.SESSION_FAILURE); + MessagingService.instance().sendOneWay(reply.getMessage(Gossiper.instance.getVersion(getHost())), getHost()); + } + catch (IOException ex) + { + logger.error("Error sending streaming session failure notification to " + getHost(), ex); + } + } } /** query method to determine which hosts 
are streaming to this node. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamOut.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOut.java b/src/java/org/apache/cassandra/streaming/StreamOut.java index 710c9ed..d3f37b5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOut.java +++ b/src/java/org/apache/cassandra/streaming/StreamOut.java @@ -81,7 +81,7 @@ public class StreamOut /** * Stream the given ranges to the target endpoint from each CF in the given keyspace. */ - public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, Runnable callback, OperationType type) + public static void transferRanges(InetAddress target, Table table, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) { StreamOutSession session = StreamOutSession.create(table.name, target, callback); transferRanges(session, table.getColumnFamilyStores(), ranges, type); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamOutSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamOutSession.java b/src/java/org/apache/cassandra/streaming/StreamOutSession.java index 80629b8..7a53c7c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamOutSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamOutSession.java @@ -22,29 +22,27 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang.StringUtils; +import org.cliffc.high_scale_lib.NonBlockingHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import 
org.apache.cassandra.gms.*; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.utils.Pair; -import org.cliffc.high_scale_lib.NonBlockingHashMap; /** * This class manages the streaming of multiple files one after the other. -*/ -public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener + */ +public class StreamOutSession extends AbstractStreamSession { - private static final Logger logger = LoggerFactory.getLogger( StreamOutSession.class ); + private static final Logger logger = LoggerFactory.getLogger(StreamOutSession.class); // one host may have multiple stream sessions. private static final ConcurrentMap<Pair<InetAddress, Long>, StreamOutSession> streams = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamOutSession>(); - public static StreamOutSession create(String table, InetAddress host, Runnable callback) + public static StreamOutSession create(String table, InetAddress host, IStreamCallback callback) { return create(table, host, System.nanoTime(), callback); } @@ -54,7 +52,7 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur return create(table, host, sessionId, null); } - public static StreamOutSession create(String table, InetAddress host, long sessionId, Runnable callback) + public static StreamOutSession create(String table, InetAddress host, long sessionId, IStreamCallback callback) { Pair<InetAddress, Long> context = new Pair<InetAddress, Long>(host, sessionId); StreamOutSession session = new StreamOutSession(table, context, callback); @@ -69,29 +67,11 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur private final Map<String, PendingFile> files = new NonBlockingHashMap<String, PendingFile>(); - public final String table; - private final Pair<InetAddress, Long> context; - private final Runnable callback; private volatile String currentFile; - private final AtomicBoolean isClosed = new AtomicBoolean(false); - - 
private StreamOutSession(String table, Pair<InetAddress, Long> context, Runnable callback) - { - this.table = table; - this.context = context; - this.callback = callback; - Gossiper.instance.register(this); - FailureDetector.instance.registerFailureDetectionEventListener(this); - } - - public InetAddress getHost() - { - return context.left; - } - public long getSessionId() + private StreamOutSession(String table, Pair<InetAddress, Long> context, IStreamCallback callback) { - return context.right; + super(table, context, callback); } public void addFilesToStream(List<PendingFile> pendingFiles) @@ -127,33 +107,12 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur streamFile(iter.next()); } - public void close() + protected void closeInternal(boolean success) { - close(true); - } - - private void close(boolean success) - { - // Though unlikely, it is possible for close to be called multiple - // time, if the endpoint die at the exact wrong time for instance. 
- if (!isClosed.compareAndSet(false, true)) - { - logger.debug("StreamOutSession {} already closed", getSessionId()); - return; - } - - Gossiper.instance.unregister(this); - FailureDetector.instance.unregisterFailureDetectionEventListener(this); - // Release reference on last file (or any uncompleted ones) for (PendingFile file : files.values()) file.sstable.releaseReference(); streams.remove(context); - // Instead of just not calling the callback on failure, we could have - // allow to register a specific callback for failures, but we leave - // that to a future ticket (likely CASSANDRA-3112) - if (callback != null && success) - callback.run(); } /** convenience method for use when testing */ @@ -204,33 +163,4 @@ public class StreamOutSession implements IEndpointStateChangeSubscriber, IFailur logger.debug("Files are {}", StringUtils.join(files.values(), ",")); MessagingService.instance().stream(header, getHost()); } - - public void onJoin(InetAddress endpoint, EndpointState epState) {} - public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} - public void onAlive(InetAddress endpoint, EndpointState state) {} - public void onDead(InetAddress endpoint, EndpointState state) {} - - public void onRemove(InetAddress endpoint) - { - convict(endpoint, Double.MAX_VALUE); - } - - public void onRestart(InetAddress endpoint, EndpointState epState) - { - convict(endpoint, Double.MAX_VALUE); - } - - public void convict(InetAddress endpoint, double phi) - { - if (!endpoint.equals(getHost())) - return; - - // We want a higher confidence in the failure detection than usual because failing a streaming wrongly has a high cost. 
- if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) - return; - - logger.error("StreamOutSession {} failed because {} died or was restarted/removed (streams may still be active " - + "in background, but further streams won't be started)", endpoint); - close(false); - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamReply.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReply.java b/src/java/org/apache/cassandra/streaming/StreamReply.java index f97cfee..c9b82a7 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReply.java +++ b/src/java/org/apache/cassandra/streaming/StreamReply.java @@ -37,6 +37,7 @@ class StreamReply implements MessageProducer FILE_FINISHED, FILE_RETRY, SESSION_FINISHED, + SESSION_FAILURE, } public static final IVersionedSerializer<StreamReply> serializer = new FileStatusSerializer(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java index ddc0690..e839fd5 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java +++ b/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java @@ -65,7 +65,10 @@ public class StreamReplyVerbHandler implements IVerbHandler session.retry(); break; case SESSION_FINISHED: - session.close(); + session.close(true); + break; + case SESSION_FAILURE: + session.close(false); break; default: throw new RuntimeException("Cannot handle FileStatus.Action: " + reply.action); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3ee8682e/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java index ab341ed..af9a059 100644 --- a/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java @@ -63,9 +63,9 @@ public class StreamingRepairTask implements Runnable private final String tableName; private final String cfName; private final Collection<Range<Token>> ranges; - private final Runnable callback; + private final IStreamCallback callback; - private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, Runnable callback) + private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range<Token>> ranges, IStreamCallback callback) { this.id = id; this.owner = owner; @@ -143,14 +143,14 @@ public class StreamingRepairTask implements Runnable } } - private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId) + private static IStreamCallback makeReplyingCallback(final InetAddress taskOwner, final UUID taskId) { - return new Runnable() + return new IStreamCallback() { // we expect one callback for the receive, and one for the send private final AtomicInteger outstanding = new AtomicInteger(2); - public void run() + public void onSuccess() { if (outstanding.decrementAndGet() > 0) // waiting on more calls @@ -165,18 +165,20 @@ public class StreamingRepairTask implements Runnable throw new IOError(e); } } + + public void onFailure() {} }; } // wrap a given callback so as to unregister the streaming repair task on completion - private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final boolean isLocalTask) + private static IStreamCallback wrapCallback(final 
Runnable callback, final UUID taskid, final boolean isLocalTask) { - return new Runnable() + return new IStreamCallback() { // we expect one callback for the receive, and one for the send private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1); - public void run() + public void onSuccess() { if (outstanding.decrementAndGet() > 0) // waiting on more calls @@ -186,6 +188,8 @@ public class StreamingRepairTask implements Runnable if (callback != null) callback.run(); } + + public void onFailure() {} }; } @@ -253,7 +257,7 @@ public class StreamingRepairTask implements Runnable logger.info(String.format("[streaming task #%s] task succeeded", task.id)); if (task.callback != null) - task.callback.run(); + task.callback.onSuccess(); } private static void reply(InetAddress remote, UUID taskid) throws IOException
