Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/dht/RangeStreamer.java
src/java/org/apache/cassandra/repair/StreamingRepairTask.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/streaming/SessionInfo.java
src/java/org/apache/cassandra/streaming/StreamPlan.java
src/java/org/apache/cassandra/streaming/StreamResultFuture.java
src/java/org/apache/cassandra/streaming/StreamSession.java
src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
src/java/org/apache/cassandra/tools/NodeCmd.java
test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6cca24f4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6cca24f4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6cca24f4
Branch: refs/heads/cassandra-2.1
Commit: 6cca24f442a17d2abedfffd4a5322cb61caeef74
Parents: 42f8590 c6867c2
Author: Yuki Morishita <[email protected]>
Authored: Mon Oct 20 10:21:55 2014 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Mon Oct 20 10:21:55 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 8 ++++-
.../org/apache/cassandra/dht/RangeStreamer.java | 4 ++-
.../net/OutboundTcpConnectionPool.java | 12 ++++----
.../cassandra/repair/StreamingRepairTask.java | 10 +++++--
.../cassandra/service/StorageService.java | 17 +++++++----
.../apache/cassandra/streaming/SessionInfo.java | 3 ++
.../cassandra/streaming/StreamCoordinator.java | 20 ++++++-------
.../apache/cassandra/streaming/StreamPlan.java | 31 ++++++++++++++------
.../cassandra/streaming/StreamResultFuture.java | 2 +-
.../cassandra/streaming/StreamSession.java | 24 +++++++++++----
.../management/SessionInfoCompositeData.java | 30 +++++++++++--------
.../org/apache/cassandra/tools/NodeTool.java | 8 ++++-
.../cassandra/streaming/SessionInfoTest.java | 2 +-
.../streaming/StreamTransferTaskTest.java | 4 ++-
.../streaming/StreamingTransferTest.java | 2 +-
16 files changed, 119 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 7cd5154,4ed7bed..4e5cd24
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,89 -1,4 +1,90 @@@
-2.0.11:
+2.1.1
+ * Fix IllegalArgumentException when a list of IN values containing tuples
+ is passed as a single arg to a prepared statement with the v1 or v2
+ protocol (CASSANDRA-8062)
+ * Fix ClassCastException in DISTINCT query on static columns with
+ query paging (CASSANDRA-8108)
+ * Fix NPE on null nested UDT inside a set (CASSANDRA-8105)
+ * Fix exception when querying secondary index on set items or map keys
+ when some clustering columns are specified (CASSANDRA-8073)
+ * Send proper error response when there is an error during native
+ protocol message decode (CASSANDRA-8118)
+ * Gossip should ignore generation numbers too far in the future
(CASSANDRA-8113)
+ * Fix NPE when creating a table with frozen sets, lists (CASSANDRA-8104)
+ * Fix high memory use due to tracking reads on incrementally opened sstable
+ readers (CASSANDRA-8066)
+ * Fix EXECUTE request with skipMetadata=false returning no metadata
+ (CASSANDRA-8054)
+ * Allow concurrent use of CQLBulkOutputFormat (CASSANDRA-7776)
+ * Shutdown JVM on OOM (CASSANDRA-7507)
+ * Upgrade netty version and enable epoll event loop (CASSANDRA-7761)
+ * Don't duplicate sstables smaller than split size when using
+ the sstablesplitter tool (CASSANDRA-7616)
+ * Avoid re-parsing already prepared statements (CASSANDRA-7923)
+ * Fix some Thrift slice deletions and updates of COMPACT STORAGE
+ tables with some clustering columns omitted (CASSANDRA-7990)
+ * Fix filtering for CONTAINS on sets (CASSANDRA-8033)
+ * Properly track added size (CASSANDRA-7239)
+ * Allow compilation in java 8 (CASSANDRA-7208)
+ * Fix Assertion error on RangeTombstoneList diff (CASSANDRA-8013)
+ * Release references to overlapping sstables during compaction
(CASSANDRA-7819)
+ * Send notification when opening compaction results early (CASSANDRA-8034)
+ * Make native server start block until properly bound (CASSANDRA-7885)
+ * (cqlsh) Fix IPv6 support (CASSANDRA-7988)
+ * Ignore fat clients when checking for endpoint collision (CASSANDRA-7939)
+ * Make sstablerepairedset take a list of files (CASSANDRA-7995)
+ * (cqlsh) Tab completeion for indexes on map keys (CASSANDRA-7972)
+ * (cqlsh) Fix UDT field selection in select clause (CASSANDRA-7891)
+ * Fix resource leak in event of corrupt sstable
+ * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
+ * Provide visibility into prepared statements churn (CASSANDRA-7921,
CASSANDRA-7930)
+ * Invalidate prepared statements when their keyspace or table is
+ dropped (CASSANDRA-7566)
+ * cassandra-stress: fix support for NetworkTopologyStrategy (CASSANDRA-7945)
+ * Fix saving caches when a table is dropped (CASSANDRA-7784)
+ * Add better error checking of new stress profile (CASSANDRA-7716)
+ * Use ThreadLocalRandom and remove FBUtilities.threadLocalRandom
(CASSANDRA-7934)
+ * Prevent operator mistakes due to simultaneous bootstrap (CASSANDRA-7069)
+ * cassandra-stress supports whitelist mode for node config (CASSANDRA-7658)
+ * GCInspector more closely tracks GC; cassandra-stress and nodetool report
it (CASSANDRA-7916)
+ * nodetool won't output bogus ownership info without a keyspace
(CASSANDRA-7173)
+ * Add human readable option to nodetool commands (CASSANDRA-5433)
+ * Don't try to set repairedAt on old sstables (CASSANDRA-7913)
+ * Add metrics for tracking PreparedStatement use (CASSANDRA-7719)
+ * (cqlsh) tab-completion for triggers (CASSANDRA-7824)
+ * (cqlsh) Support for query paging (CASSANDRA-7514)
+ * (cqlsh) Show progress of COPY operations (CASSANDRA-7789)
+ * Add syntax to remove multiple elements from a map (CASSANDRA-6599)
+ * Support non-equals conditions in lightweight transactions (CASSANDRA-6839)
+ * Add IF [NOT] EXISTS to create/drop triggers (CASSANDRA-7606)
+ * (cqlsh) Display the current logged-in user (CASSANDRA-7785)
+ * (cqlsh) Don't ignore CTRL-C during COPY FROM execution (CASSANDRA-7815)
+ * (cqlsh) Order UDTs according to cross-type dependencies in DESCRIBE
+ output (CASSANDRA-7659)
+ * (cqlsh) Fix handling of CAS statement results (CASSANDRA-7671)
+ * (cqlsh) COPY TO/FROM improvements (CASSANDRA-7405)
+ * Support list index operations with conditions (CASSANDRA-7499)
+ * Add max live/tombstoned cells to nodetool cfstats output (CASSANDRA-7731)
+ * Validate IPv6 wildcard addresses properly (CASSANDRA-7680)
+ * (cqlsh) Error when tracing query (CASSANDRA-7613)
+ * Avoid IOOBE when building SyntaxError message snippet (CASSANDRA-7569)
+ * SSTableExport uses correct validator to create string representation of
partition
+ keys (CASSANDRA-7498)
+ * Avoid NPEs when receiving type changes for an unknown keyspace
(CASSANDRA-7689)
+ * Add support for custom 2i validation (CASSANDRA-7575)
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
+ * Add listen_interface and rpc_interface options (CASSANDRA-7417)
+ * Improve schema merge performance (CASSANDRA-7444)
+ * Adjust MT depth based on # of partition validating (CASSANDRA-5263)
+ * Optimise NativeCell comparisons (CASSANDRA-6755)
+ * Configurable client timeout for cqlsh (CASSANDRA-7516)
+ * Include snippet of CQL query near syntax error in messages (CASSANDRA-7111)
+ * Make repair -pr work with -local (CASSANDRA-7450)
+ * Fix error in sstableloader with -cph > 1 (CASSANDRA-8007)
+ * Fix snapshot repair error on indexed tables (CASSANDRA-8020)
+ * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909)
++ * Stream to private IP when available (CASSANDRA-8084)
+Merged from 2.0:
* Reject conditions on DELETE unless full PK is given (CASSANDRA-6430)
* Properly reject the token function DELETE (CASSANDRA-7747)
* Force batchlog replay before decommissioning a node (CASSANDRA-7446)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 3c647b6,30e6d47..8b16564
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -508,13 -497,19 +508,19 @@@ public class SystemKeyspac
return hostIdMap;
}
+ /**
+ * Get preferred IP for given endpoint if it is known. Otherwise this
returns given endpoint itself.
+ *
+ * @param ep endpoint address to check
+ * @return Preferred IP for given endpoint if present, otherwise returns
given ep
+ */
public static InetAddress getPreferredIP(InetAddress ep)
{
- String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
- UntypedResultSet result = processInternal(String.format(req,
PEERS_CF, ep.getHostAddress()));
+ String req = "SELECT preferred_ip FROM system.%s WHERE peer=?";
+ UntypedResultSet result = executeInternal(String.format(req,
PEERS_CF), ep);
if (!result.isEmpty() && result.one().has("preferred_ip"))
return result.one().getInetAddress("preferred_ip");
- return null;
+ return ep;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java
index d84a951,4e925d3..11d82d6
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@@ -31,8 -29,8 +31,9 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.gms.IFailureDetector;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
@@@ -308,8 -226,8 +310,8 @@@ public class RangeStreame
Collection<Range<Token>> ranges = entry.getValue().getValue();
/* Send messages to respective folks to stream data over to me */
if (logger.isDebugEnabled())
- logger.debug("" + description + "ing from " + source + "
ranges " + StringUtils.join(ranges, ", "));
+ logger.debug("{}ing from {} ranges {}", description, source,
StringUtils.join(ranges, ", "));
- streamPlan.requestRanges(source, keyspace, ranges);
+ streamPlan.requestRanges(source, preferred, keyspace, ranges);
}
return streamPlan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 9af949d,4226184..25ec698
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@@ -57,17 -59,15 +60,18 @@@ public class StreamingRepairTask implem
private void initiateStreaming()
{
+ long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+ InetAddress dest = request.dst;
+ InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
+ if (desc.parentSessionId != null &&
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) !=
null)
+ repairedAt =
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
-
logger.info(String.format("[streaming task #%s] Performing streaming
repair of %d ranges with %s", desc.sessionId, request.ranges.size(),
request.dst));
- StreamResultFuture op = new StreamPlan("Repair")
+ StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
.flushBeforeTransfer(true)
// request ranges from the remote node
- .requestRanges(request.dst,
desc.keyspace, request.ranges, desc.columnFamily)
+ .requestRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
// send ranges to the remote node
- .transferRanges(request.dst,
desc.keyspace, request.ranges, desc.columnFamily)
+ .transferRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
.execute();
op.addEventListener(this);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 4fb0435,4973e40..849bc2c
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -1939,10 -1891,11 +1939,11 @@@ public class StorageService extends Not
for (Map.Entry<InetAddress, Collection<Range<Token>>> entry :
rangesToFetch.get(keyspaceName))
{
InetAddress source = entry.getKey();
+ InetAddress preferred = SystemKeyspace.getPreferredIP(source);
Collection<Range<Token>> ranges = entry.getValue();
if (logger.isDebugEnabled())
- logger.debug("Requesting from " + source + " ranges " +
StringUtils.join(ranges, ", "));
+ logger.debug("Requesting from {} ranges {}", source,
StringUtils.join(ranges, ", "));
- stream.requestRanges(source, keyspaceName, ranges);
+ stream.requestRanges(source, preferred, keyspaceName, ranges);
}
}
StreamResultFuture future = stream.execute();
@@@ -3173,9 -3027,10 +3175,10 @@@
// stream all hints -- range list will be a singleton of "the
entire ring"
Token token = StorageService.getPartitioner().getMinimumToken();
- List<Range<Token>> ranges = Collections.singletonList(new
Range<Token>(token, token));
+ List<Range<Token>> ranges = Collections.singletonList(new
Range<>(token, token));
return new
StreamPlan("Hints").transferRanges(hintsDestinationHost,
+ preferred,
Keyspace.SYSTEM_KS,
ranges,
SystemKeyspace.HINTS_CF)
@@@ -3375,20 -3186,19 +3378,21 @@@
// stream ranges
for (InetAddress address : endpointRanges.keySet())
{
+ logger.debug("Will stream range {} of keyspace {} to
endpoint {}", endpointRanges.get(address), keyspace, address);
- streamPlan.transferRanges(address, keyspace,
endpointRanges.get(address));
+ InetAddress preferred =
SystemKeyspace.getPreferredIP(address);
+ streamPlan.transferRanges(address, preferred,
keyspace, endpointRanges.get(address));
}
// stream requests
Multimap<InetAddress, Range<Token>> workMap =
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
for (InetAddress address : workMap.keySet())
{
+ logger.debug("Will request range {} of keyspace {}
from endpoint {}", workMap.get(address), keyspace, address);
- streamPlan.requestRanges(address, keyspace,
workMap.get(address));
+ InetAddress preferred =
SystemKeyspace.getPreferredIP(address);
+ streamPlan.requestRanges(address, preferred,
keyspace, workMap.get(address));
}
- if (logger.isDebugEnabled())
- logger.debug("Keyspace {}: work map {}.", keyspace,
workMap);
+ logger.debug("Keyspace {}: work map {}.", keyspace,
workMap);
}
}
}
@@@ -3863,16 -3649,17 +3867,17 @@@
StreamPlan streamPlan = new StreamPlan("Unbootstrap");
for (Map.Entry<String, Map<InetAddress, List<Range<Token>>>> entry :
sessionsToStreamByKeyspace.entrySet())
{
- final String keyspaceName = entry.getKey();
- final Map<InetAddress, List<Range<Token>>> rangesPerEndpoint =
entry.getValue();
+ String keyspaceName = entry.getKey();
+ Map<InetAddress, List<Range<Token>>> rangesPerEndpoint =
entry.getValue();
- for (final Map.Entry<InetAddress, List<Range<Token>>> rangesEntry
: rangesPerEndpoint.entrySet())
+ for (Map.Entry<InetAddress, List<Range<Token>>> rangesEntry :
rangesPerEndpoint.entrySet())
{
- final List<Range<Token>> ranges = rangesEntry.getValue();
- final InetAddress newEndpoint = rangesEntry.getKey();
- final InetAddress preferred =
SystemKeyspace.getPreferredIP(newEndpoint);
+ List<Range<Token>> ranges = rangesEntry.getValue();
+ InetAddress newEndpoint = rangesEntry.getKey();
++ InetAddress preferred =
SystemKeyspace.getPreferredIP(newEndpoint);
// TODO each call to transferRanges re-flushes, this is
potentially a lot of waste
- streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
+ streamPlan.transferRanges(newEndpoint, preferred,
keyspaceName, ranges);
}
}
return streamPlan.execute();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/SessionInfo.java
index 98e945b,4f80461..3bcb20c
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@@ -33,7 -33,7 +33,8 @@@ import com.google.common.collect.Iterab
public final class SessionInfo implements Serializable
{
public final InetAddress peer;
+ public final int sessionIndex;
+ public final InetAddress connecting;
/** Immutable collection of receiving summaries */
public final Collection<StreamSummary> receivingSummaries;
/** Immutable collection of sending summaries*/
@@@ -45,13 -45,13 +46,15 @@@
private final Map<String, ProgressInfo> sendingFiles;
public SessionInfo(InetAddress peer,
+ int sessionIndex,
+ InetAddress connecting,
Collection<StreamSummary> receivingSummaries,
Collection<StreamSummary> sendingSummaries,
StreamSession.State state)
{
this.peer = peer;
+ this.sessionIndex = sessionIndex;
+ this.connecting = connecting;
this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
this.receivingFiles = new ConcurrentHashMap<>();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 48192b4,0000000..a0c99fe
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@@ -1,289 -1,0 +1,289 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.streaming;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * {@link StreamCoordinator} is a helper class that abstracts away
maintaining multiple
+ * StreamSession and ProgressInfo instances per peer.
+ *
+ * This class coordinates multiple SessionStreams per peer in both the
outgoing StreamPlan context and on the
+ * inbound StreamResultFuture context.
+ */
+public class StreamCoordinator
+{
+ private static final Logger logger =
LoggerFactory.getLogger(StreamCoordinator.class);
+
+ // Executor strictly for establishing the initial connections. Once we're
connected to the other end the rest of the
+ // streaming is handled directly by the ConnectionHandler's incoming and
outgoing threads.
+ private static final DebuggableThreadPoolExecutor streamExecutor =
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
+
FBUtilities.getAvailableProcessors());
+
+ private Map<InetAddress, HostStreamingData> peerSessions = new
HashMap<>();
+ private final int connectionsPerHost;
+ private StreamConnectionFactory factory;
+
+ public StreamCoordinator(int connectionsPerHost, StreamConnectionFactory
factory)
+ {
+ this.connectionsPerHost = connectionsPerHost;
+ this.factory = factory;
+ }
+
+ public void setConnectionFactory(StreamConnectionFactory factory)
+ {
+ this.factory = factory;
+ }
+
+ /**
+ * @return true if any stream session is active
+ */
+ public synchronized boolean hasActiveSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ {
+ if (data.hasActiveSessions())
+ return true;
+ }
+ return false;
+ }
+
+ public synchronized Collection<StreamSession> getAllStreamSessions()
+ {
+ Collection<StreamSession> results = new ArrayList<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ results.addAll(data.getAllStreamSessions());
+ }
+ return results;
+ }
+
+ public boolean isReceiving()
+ {
+ return connectionsPerHost == 0;
+ }
+
+ public void connectAllStreamSessions()
+ {
+ for (HostStreamingData data : peerSessions.values())
+ data.connectAllStreamSessions();
+ }
+
+ public synchronized Set<InetAddress> getPeers()
+ {
+ return new HashSet<>(peerSessions.keySet());
+ }
+
- public synchronized StreamSession getOrCreateNextSession(InetAddress peer)
++ public synchronized StreamSession getOrCreateNextSession(InetAddress
peer, InetAddress connecting)
+ {
- return getOrCreateHostData(peer).getOrCreateNextSession(peer);
++ return getOrCreateHostData(peer).getOrCreateNextSession(peer,
connecting);
+ }
+
- public synchronized StreamSession getOrCreateSessionById(InetAddress
peer, int id)
++ public synchronized StreamSession getOrCreateSessionById(InetAddress
peer, int id, InetAddress connecting)
+ {
- return getOrCreateHostData(peer).getOrCreateSessionById(peer, id);
++ return getOrCreateHostData(peer).getOrCreateSessionById(peer, id,
connecting);
+ }
+
+ public synchronized void updateProgress(ProgressInfo info)
+ {
+ getHostData(info.peer).updateProgress(info);
+ }
+
+ public synchronized void addSessionInfo(SessionInfo session)
+ {
+ HostStreamingData data = getOrCreateHostData(session.peer);
+ data.addSessionInfo(session);
+ }
+
+ public synchronized Set<SessionInfo> getAllSessionInfo()
+ {
+ Set<SessionInfo> result = new HashSet<>();
+ for (HostStreamingData data : peerSessions.values())
+ {
+ result.addAll(data.getAllSessionInfo());
+ }
+ return result;
+ }
+
+ public synchronized void transferFiles(InetAddress to,
Collection<StreamSession.SSTableStreamingSections> sstableDetails)
+ {
+ HostStreamingData sessionList = getOrCreateHostData(to);
+
+ if (connectionsPerHost > 1)
+ {
+ List<List<StreamSession.SSTableStreamingSections>> buckets =
sliceSSTableDetails(sstableDetails);
+
+ for (List<StreamSession.SSTableStreamingSections> subList :
buckets)
+ {
- StreamSession session =
sessionList.getOrCreateNextSession(to);
++ StreamSession session =
sessionList.getOrCreateNextSession(to, to);
+ session.addTransferFiles(subList);
+ }
+ }
+ else
+ {
- StreamSession session = sessionList.getOrCreateNextSession(to);
++ StreamSession session = sessionList.getOrCreateNextSession(to,
to);
+ session.addTransferFiles(sstableDetails);
+ }
+ }
+
+ private List<List<StreamSession.SSTableStreamingSections>>
sliceSSTableDetails(Collection<StreamSession.SSTableStreamingSections>
sstableDetails)
+ {
+ // There's no point in divvying things up into more buckets than we
have sstableDetails
+ int targetSlices = Math.min(sstableDetails.size(),
connectionsPerHost);
+ int step = Math.round((float) sstableDetails.size() / (float)
targetSlices);
+ int index = 0;
+
+ List<List<StreamSession.SSTableStreamingSections>> result = new
ArrayList<>();
+ List<StreamSession.SSTableStreamingSections> slice = null;
+ Iterator<StreamSession.SSTableStreamingSections> iter =
sstableDetails.iterator();
+ while (iter.hasNext())
+ {
+ StreamSession.SSTableStreamingSections streamSession =
iter.next();
+
+ if (index % step == 0)
+ {
+ slice = new ArrayList<>();
+ result.add(slice);
+ }
+ slice.add(streamSession);
+ ++index;
+ iter.remove();
+ }
+
+ return result;
+ }
+
+ private HostStreamingData getHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ throw new IllegalArgumentException("Unknown peer requested: " +
peer.toString());
+ return data;
+ }
+
+ private HostStreamingData getOrCreateHostData(InetAddress peer)
+ {
+ HostStreamingData data = peerSessions.get(peer);
+ if (data == null)
+ {
+ data = new HostStreamingData();
+ peerSessions.put(peer, data);
+ }
+ return data;
+ }
+
+ private static class StreamSessionConnector implements Runnable
+ {
+ private final StreamSession session;
+ public StreamSessionConnector(StreamSession session)
+ {
+ this.session = session;
+ }
+
+ @Override
+ public void run()
+ {
+ session.start();
+ logger.info("[Stream #{}, ID#{}] Beginning stream session with
{}", session.planId(), session.sessionIndex(), session.peer);
+ }
+ }
+
+ private class HostStreamingData
+ {
+ private Map<Integer, StreamSession> streamSessions = new HashMap<>();
+ private Map<Integer, SessionInfo> sessionInfos = new HashMap<>();
+
+ private int lastReturned = -1;
+
+ public boolean hasActiveSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ StreamSession.State state = session.state();
+ if (state != StreamSession.State.COMPLETE && state !=
StreamSession.State.FAILED)
+ return true;
+ }
+ return false;
+ }
+
- public StreamSession getOrCreateNextSession(InetAddress peer)
++ public StreamSession getOrCreateNextSession(InetAddress peer,
InetAddress connecting)
+ {
+ // create
+ if (streamSessions.size() < connectionsPerHost)
+ {
- StreamSession session = new StreamSession(peer, factory,
streamSessions.size());
++ StreamSession session = new StreamSession(peer, connecting,
factory, streamSessions.size());
+ streamSessions.put(++lastReturned, session);
+ return session;
+ }
+ // get
+ else
+ {
+ if (lastReturned >= streamSessions.size() - 1)
+ lastReturned = 0;
+
+ return streamSessions.get(lastReturned++);
+ }
+ }
+
+ public void connectAllStreamSessions()
+ {
+ for (StreamSession session : streamSessions.values())
+ {
+ streamExecutor.execute(new StreamSessionConnector(session));
+ }
+ }
+
+ public Collection<StreamSession> getAllStreamSessions()
+ {
+ return
Collections.unmodifiableCollection(streamSessions.values());
+ }
+
- public StreamSession getOrCreateSessionById(InetAddress peer, int id)
++ public StreamSession getOrCreateSessionById(InetAddress peer, int id,
InetAddress connecting)
+ {
+ StreamSession session = streamSessions.get(id);
+ if (session == null)
+ {
- session = new StreamSession(peer, factory, id);
++ session = new StreamSession(peer, connecting, factory, id);
+ streamSessions.put(id, session);
+ }
+ return session;
+ }
+
+ public void updateProgress(ProgressInfo info)
+ {
+ sessionInfos.get(info.sessionIndex).updateProgress(info);
+ }
+
+ public void addSessionInfo(SessionInfo info)
+ {
+ sessionInfos.put(info.sessionIndex, info);
+ }
+
+ public Collection<SessionInfo> getAllSessionInfo()
+ {
+ return sessionInfos.values();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamPlan.java
index f7b6203,326bf48..1bb0ce5
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@@ -81,10 -76,10 +83,10 @@@ public class StreamPla
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan requestRanges(InetAddress from, String keyspace,
Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan requestRanges(InetAddress from, InetAddress connecting,
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = coordinator.getOrCreateNextSession(from);
- StreamSession session = getOrCreateSession(from, connecting);
- session.addStreamRequest(keyspace, ranges,
Arrays.asList(columnFamilies));
++ StreamSession session = coordinator.getOrCreateNextSession(from,
connecting);
+ session.addStreamRequest(keyspace, ranges,
Arrays.asList(columnFamilies), repairedAt);
return this;
}
@@@ -110,10 -117,10 +124,10 @@@
* @param columnFamilies specific column families
* @return this object for chaining
*/
- public StreamPlan transferRanges(InetAddress to, String keyspace,
Collection<Range<Token>> ranges, String... columnFamilies)
+ public StreamPlan transferRanges(InetAddress to, InetAddress connecting,
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
{
- StreamSession session = coordinator.getOrCreateNextSession(to);
- StreamSession session = getOrCreateSession(to, connecting);
- session.addTransferRanges(keyspace, ranges,
Arrays.asList(columnFamilies), flushBeforeTransfer);
++ StreamSession session = coordinator.getOrCreateNextSession(to,
connecting);
+ session.addTransferRanges(keyspace, ranges,
Arrays.asList(columnFamilies), flushBeforeTransfer, repairedAt);
return this;
}
@@@ -182,5 -189,16 +196,4 @@@
this.flushBeforeTransfer = flushBeforeTransfer;
return this;
}
--
- private StreamSession getOrCreateSession(InetAddress peer, InetAddress
preferred)
- {
- StreamSession session = sessions.get(peer);
- if (session == null)
- {
- session = new StreamSession(peer, preferred, connectionFactory);
- sessions.put(peer, session);
- }
- return session;
- }
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index f28a937,bde5934..6a6f2b9
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@@ -124,10 -130,11 +124,10 @@@ public final class StreamResultFuture e
return future;
}
- public void attachSocket(InetAddress from, Socket socket, boolean
isForOutgoing, int version) throws IOException
+ private void attachSocket(InetAddress from, int sessionIndex, Socket
socket, boolean isForOutgoing, int version) throws IOException
{
- StreamSession session = coordinator.getOrCreateSessionById(from,
sessionIndex);
- StreamSession session = ongoingSessions.get(from);
- if (session == null)
- throw new RuntimeException(String.format("Got connection from %s
for stream session %s but no such session locally", from, planId));
++ StreamSession session = coordinator.getOrCreateSessionById(from,
sessionIndex, socket.getInetAddress());
+ session.init(this);
session.handler.initiateOnReceivingSide(socket, isForOutgoing,
version);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 2efa00d,db0c484..3ba296c
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -109,11 -109,23 +109,19 @@@ import org.apache.cassandra.utils.Pair
* session is done is is closed (closeSession()). Otherwise, the node
switch to the WAIT_COMPLETE state and
* send a CompleteMessage to the other side.
*/
-public class StreamSession implements IEndpointStateChangeSubscriber,
IFailureDetectionEventListener
+public class StreamSession implements IEndpointStateChangeSubscriber
{
private static final Logger logger =
LoggerFactory.getLogger(StreamSession.class);
+
- // Executor that establish the streaming connection. Once we're connected
to the other end, the rest of the streaming
- // is directly handled by the ConnectionHandler incoming and outgoing
threads.
- private static final DebuggableThreadPoolExecutor streamExecutor =
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
-
FBUtilities.getAvailableProcessors());
-
+ /**
+ * Streaming endpoint.
+ *
+ * Each {@code StreamSession} is identified by this InetAddress which is
broadcast address of the node streaming.
+ */
public final InetAddress peer;
+ private final int index;
+ /** Actual connecting address. Can be the same as {@linkplain #peer}. */
+ public final InetAddress connecting;
// should not be null when session is started
private StreamResultFuture streamResult;
@@@ -151,15 -163,16 +159,17 @@@
* Create new streaming session with the peer.
*
* @param peer Address of streaming peer
+ * @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer, StreamConnectionFactory factory,
int index)
- public StreamSession(InetAddress peer, InetAddress connecting,
StreamConnectionFactory factory)
++ public StreamSession(InetAddress peer, InetAddress connecting,
StreamConnectionFactory factory, int index)
{
this.peer = peer;
+ this.connecting = connecting;
+ this.index = index;
this.factory = factory;
this.handler = new ConnectionHandler(this);
- this.metrics = StreamingMetrics.get(peer);
+ this.metrics = StreamingMetrics.get(connecting);
}
public UUID planId()
@@@ -197,16 -209,24 +207,18 @@@
return;
}
- streamExecutor.execute(new Runnable()
+ try
{
- logger.info("[Stream #{}, ID#{}] Beginning stream session with
{}", planId(), sessionIndex(), peer);
- public void run()
- {
- try
- {
- logger.info("[Stream #{}] Starting streaming to {}{}",
planId(),
-
peer,
-
peer.equals(connecting) ? "" : " through " + connecting);
- handler.initiate();
- onInitializationComplete();
- }
- catch (IOException e)
- {
- onError(e);
- }
- }
- });
++ logger.info("[Stream #{}] Starting streaming to {}{}", planId(),
++ peer,
++
peer.equals(connecting) ? "" : " through " + connecting);
+ handler.initiate();
+ onInitializationComplete();
+ }
+ catch (Exception e)
+ {
+ onError(e);
+ }
}
public Socket createConnection() throws IOException
@@@ -595,7 -604,7 +607,7 @@@
List<StreamSummary> transferSummaries = Lists.newArrayList();
for (StreamTask transfer : transfers.values())
transferSummaries.add(transfer.getSummary());
- return new SessionInfo(peer, index, receivingSummaries,
transferSummaries, state);
- return new SessionInfo(peer, connecting, receivingSummaries,
transferSummaries, state);
++ return new SessionInfo(peer, index, connecting, receivingSummaries,
transferSummaries, state);
}
public synchronized void taskCompleted(StreamReceiveTask completedTask)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6cca24f4/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --cc
src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index bef6682,809bc0d..63e4ab7
---
a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++
b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@@ -40,10 -41,10 +41,11 @@@ public class SessionInfoCompositeDat
"sendingSummaries",
"state",
"receivingFiles",
- "sendingFiles"};
+ "sendingFiles",
+ "sessionIndex"};
private static final String[] ITEM_DESCS = new String[]{"Plan ID",
"Session peer",
+ "Connecting
address",
"Summaries of
receiving data",
"Summaries of
sending data",
"Current session
state",
@@@ -98,9 -99,8 +102,9 @@@
return ProgressInfoCompositeData.toCompositeData(planId,
input);
}
};
- valueMap.put(ITEM_NAMES[5],
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
- valueMap.put(ITEM_NAMES[6],
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
- valueMap.put(ITEM_NAMES[7], sessionInfo.sessionIndex);
+ valueMap.put(ITEM_NAMES[6],
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+ valueMap.put(ITEM_NAMES[7],
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
++ valueMap.put(ITEM_NAMES[8], sessionInfo.sessionIndex);
try
{
return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@@ -133,10 -134,10 +138,11 @@@
}
};
SessionInfo info = new SessionInfo(peer,
- (int)values[7],
-
fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
++ (int)values[8],
+ connecting,
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
-
StreamSession.State.valueOf((String) values[4]));
+
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
+
StreamSession.State.valueOf((String) values[5]));
Function<CompositeData, ProgressInfo> toProgressInfo = new
Function<CompositeData, ProgressInfo>()
{
public ProgressInfo apply(CompositeData input)