Updated Branches: refs/heads/cassandra-1.1 8ac731efa -> 42a0a46a8 refs/heads/trunk b651778c4 -> e30febf25
Merge branch 'cassandra-1.1' into trunk Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e30febf2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e30febf2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e30febf2 Branch: refs/heads/trunk Commit: e30febf258a2de3394aed6f78db412aa7a7c89b9 Parents: b651778 42a0a46 Author: Brandon Williams <[email protected]> Authored: Tue Mar 27 15:04:12 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Tue Mar 27 15:04:12 2012 -0500 ---------------------------------------------------------------------- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 7 +- .../org/apache/cassandra/dht/RangeStreamer.java | 7 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 6 +- .../apache/cassandra/service/StorageService.java | 17 ++- .../cassandra/streaming/AbstractStreamSession.java | 112 +++++++++++++++ .../cassandra/streaming/IStreamCallback.java | 36 +++++ .../org/apache/cassandra/streaming/StreamIn.java | 4 +- .../cassandra/streaming/StreamInSession.java | 68 +++++---- .../org/apache/cassandra/streaming/StreamOut.java | 2 +- .../cassandra/streaming/StreamOutSession.java | 88 ++---------- .../apache/cassandra/streaming/StreamReply.java | 1 + .../streaming/StreamReplyVerbHandler.java | 5 +- .../cassandra/streaming/StreamingRepairTask.java | 22 ++-- 14 files changed, 244 insertions(+), 133 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 1eb98f0,d3105a9..01bb7d0 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -1503,9 -1504,11 +1503,11 @@@ public class StorageService implements sendReplicationNotification(myAddress, notifyEndpoint); } } + + public void onFailure() {} }; - if (logger_.isDebugEnabled()) - logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); + if (logger.isDebugEnabled()) + logger.debug("Requesting from " + source + " ranges " + StringUtils.join(ranges, ", ")); StreamIn.requestRanges(source, table, ranges, callback, OperationType.RESTORE_REPLICA_COUNT); } } @@@ -2863,10 -2867,12 +2866,12 @@@ if (pending.isEmpty()) latch.countDown(); } + + public void onFailure() {} }; - if (logger_.isDebugEnabled()) - logger_.debug("Requesting from " + source + " ranges " + StringUtils.join(toFetch, ", ")); + if (logger.isDebugEnabled()) + logger.debug("Requesting from " + source + " ranges " + StringUtils.join(toFetch, ", ")); // sending actual request StreamIn.requestRanges(source, table, toFetch, callback, OperationType.BOOTSTRAP); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamIn.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamIn.java index b46f894,0621086..6b77714 --- a/src/java/org/apache/cassandra/streaming/StreamIn.java +++ b/src/java/org/apache/cassandra/streaming/StreamIn.java @@@ -41,10 -45,10 +41,10 @@@ import org.apache.cassandra.utils.FBUti */ public class StreamIn { - private static Logger logger = LoggerFactory.getLogger(StreamIn.class); + private static final Logger logger = LoggerFactory.getLogger(StreamIn.class); /** Request ranges for all column families in the given keyspace. */ - public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, Runnable callback, OperationType type) + public static void requestRanges(InetAddress source, String tableName, Collection<Range<Token>> ranges, IStreamCallback callback, OperationType type) { requestRanges(source, tableName, Table.open(tableName).getColumnFamilyStores(), ranges, callback, type); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamInSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamInSession.java index ee75fa6,e662a49..254a44b --- a/src/java/org/apache/cassandra/streaming/StreamInSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamInSession.java @@@ -43,12 -45,9 +44,9 @@@ public class StreamInSession extends Ab { private static final Logger logger = LoggerFactory.getLogger(StreamInSession.class); - private static ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>(); + private static final ConcurrentMap<Pair<InetAddress, Long>, StreamInSession> sessions = new NonBlockingHashMap<Pair<InetAddress, Long>, StreamInSession>(); private final Set<PendingFile> files = new NonBlockingHashSet<PendingFile>(); - private final Pair<InetAddress, Long> context; - private final Runnable callback; - private String table; private final List<SSTableReader> readers = new ArrayList<SSTableReader>(); private PendingFile current; private Socket socket; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamOut.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamOutSession.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamReply.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamReplyVerbHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e30febf2/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java ----------------------------------------------------------------------
