stream to private IP when available

patch by yukim; reviewed by Josh McKenzie for CASSANDRA-8084


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6867c2c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6867c2c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6867c2c

Branch: refs/heads/trunk
Commit: c6867c2c25e1a220abef24e54a86eeb64dab28c5
Parents: 29a8b88
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Oct 20 09:25:42 2014 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Oct 20 09:25:42 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  8 ++++-
 .../org/apache/cassandra/dht/RangeStreamer.java |  4 ++-
 .../net/OutboundTcpConnectionPool.java          | 12 +++----
 .../cassandra/repair/StreamingRepairTask.java   |  9 +++--
 .../cassandra/service/StorageService.java       | 23 +++++++++----
 .../apache/cassandra/streaming/SessionInfo.java |  3 ++
 .../apache/cassandra/streaming/StreamPlan.java  | 36 ++++++++++++++------
 .../cassandra/streaming/StreamResultFuture.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 23 ++++++++++---
 .../management/SessionInfoCompositeData.java    | 26 ++++++++------
 .../org/apache/cassandra/tools/NodeCmd.java     |  8 ++++-
 .../cassandra/streaming/SessionInfoTest.java    |  2 +-
 .../streaming/StreamTransferTaskTest.java       |  4 ++-
 .../streaming/StreamingTransferTest.java        |  2 +-
 15 files changed, 115 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 544cf9a..4ed7bed 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -56,6 +56,7 @@
  * Fix possible infinite loop in creating repair range (CASSANDRA-7983)
  * Fix unit in nodetool for streaming throughput (CASSANDRA-7375)
  * Do not exit nodetool repair when receiving JMX NOTIF_LOST (CASSANDRA-7909)
+ * Stream to private IP when available (CASSANDRA-8084)
 Merged from 1.2:
  * Don't index tombstones (CASSANDRA-7828)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 5b77f63..30e6d47 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -497,13 +497,19 @@ public class SystemKeyspace
         return hostIdMap;
     }
 
+    /**
+     * Get preferred IP for given endpoint if it is known. Otherwise this returns given endpoint itself.
+     *
+     * @param ep endpoint address to check
+     * @return Preferred IP for given endpoint if present, otherwise returns given ep
+     */
     public static InetAddress getPreferredIP(InetAddress ep)
     {
         String req = "SELECT preferred_ip FROM system.%s WHERE peer='%s'";
         UntypedResultSet result = processInternal(String.format(req, PEERS_CF, ep.getHostAddress()));
         if (!result.isEmpty() && result.one().has("preferred_ip"))
             return result.one().getInetAddress("preferred_ip");
-        return null;
+        return ep;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index 1e6d9b8..4e925d3 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.IFailureDetector;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
@@ -221,11 +222,12 @@ public class RangeStreamer
         {
             String keyspace = entry.getKey();
             InetAddress source = entry.getValue().getKey();
+            InetAddress preferred = SystemKeyspace.getPreferredIP(source);
             Collection<Range<Token>> ranges = entry.getValue().getValue();
             /* Send messages to respective folks to stream data over to me */
             if (logger.isDebugEnabled())
                 logger.debug("" + description + "ing from " + source + " 
ranges " + StringUtils.join(ranges, ", "));
-            streamPlan.requestRanges(source, keyspace, ranges);
+            streamPlan.requestRanges(source, preferred, keyspace, ranges);
         }
 
         return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
index c45fc53..66a0362 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnectionPool.java
@@ -41,14 +41,14 @@ public class OutboundTcpConnectionPool
     private final CountDownLatch started;
     public final OutboundTcpConnection cmdCon;
     public final OutboundTcpConnection ackCon;
-    // pointer to the reseted Address.
-    private InetAddress resetedEndpoint;
+    // pointer to the reset Address.
+    private InetAddress resetEndpoint;
     private ConnectionMetrics metrics;
 
     OutboundTcpConnectionPool(InetAddress remoteEp)
     {
         id = remoteEp;
-        resetedEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
+        resetEndpoint = SystemKeyspace.getPreferredIP(remoteEp);
         started = new CountDownLatch(1);
 
         cmdCon = new OutboundTcpConnection(this);
@@ -90,13 +90,13 @@ public class OutboundTcpConnectionPool
     public void reset(InetAddress remoteEP)
     {
         SystemKeyspace.updatePreferredIP(id, remoteEP);
-        resetedEndpoint = remoteEP;
+        resetEndpoint = remoteEP;
         for (OutboundTcpConnection conn : new OutboundTcpConnection[] { 
cmdCon, ackCon })
             conn.softCloseSocket();
 
         // release previous metrics and create new one with reset address
         metrics.release();
-        metrics = new ConnectionMetrics(resetedEndpoint, this);
+        metrics = new ConnectionMetrics(resetEndpoint, this);
     }
 
     public long getTimeouts()
@@ -142,7 +142,7 @@ public class OutboundTcpConnectionPool
     {
         if (id.equals(FBUtilities.getBroadcastAddress()))
             return FBUtilities.getLocalAddress();
-        return resetedEndpoint == null ? id : resetedEndpoint;
+        return resetEndpoint;
     }
 
     public static boolean isEncryptedChannel(InetAddress address)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index f7203a4..4226184 100644
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@ -17,9 +17,12 @@
  */
 package org.apache.cassandra.repair;
 
+import java.net.InetAddress;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.SyncComplete;
 import org.apache.cassandra.repair.messages.SyncRequest;
@@ -56,13 +59,15 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler
 
     private void initiateStreaming()
     {
+        InetAddress dest = request.dst;
+        InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
         logger.info(String.format("[streaming task #%s] Performing streaming 
repair of %d ranges with %s", desc.sessionId, request.ranges.size(), 
request.dst));
         StreamResultFuture op = new StreamPlan("Repair")
                                     .flushBeforeTransfer(true)
                                     // request ranges from the remote node
-                                    .requestRanges(request.dst, desc.keyspace, 
request.ranges, desc.columnFamily)
+                                    .requestRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
                                     // send ranges to the remote node
-                                    .transferRanges(request.dst, 
desc.keyspace, request.ranges, desc.columnFamily)
+                                    .transferRanges(dest, preferred, 
desc.keyspace, request.ranges, desc.columnFamily)
                                     .execute();
         op.addEventListener(this);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 56056ab..4973e40 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1890,11 +1890,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         {
             for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : 
rangesToFetch.get(keyspaceName))
             {
-                final InetAddress source = entry.getKey();
+                InetAddress source = entry.getKey();
+                InetAddress preferred = SystemKeyspace.getPreferredIP(source);
                 Collection<Range<Token>> ranges = entry.getValue();
                 if (logger.isDebugEnabled())
                     logger.debug("Requesting from " + source + " ranges " + 
StringUtils.join(ranges, ", "));
-                stream.requestRanges(source, keyspaceName, ranges);
+                stream.requestRanges(source, preferred, keyspaceName, ranges);
             }
         }
         StreamResultFuture future = stream.execute();
@@ -3022,12 +3023,14 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             // stream to the closest peer as chosen by the snitch
             
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(),
 candidates);
             InetAddress hintsDestinationHost = candidates.get(0);
+            InetAddress preferred = 
SystemKeyspace.getPreferredIP(hintsDestinationHost);
 
             // stream all hints -- range list will be a singleton of "the 
entire ring"
             Token token = StorageService.getPartitioner().getMinimumToken();
             List<Range<Token>> ranges = Collections.singletonList(new 
Range<Token>(token, token));
 
             return new StreamPlan("Hints").transferRanges(hintsDestinationHost,
+                                                          preferred,
                                                                       
Keyspace.SYSTEM_KS,
                                                                       ranges,
                                                                       
SystemKeyspace.HINTS_CF)
@@ -3182,15 +3185,20 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
                     // stream ranges
                     for (InetAddress address : endpointRanges.keySet())
-                        streamPlan.transferRanges(address, keyspace, 
endpointRanges.get(address));
+                    {
+                        InetAddress preferred = 
SystemKeyspace.getPreferredIP(address);
+                        streamPlan.transferRanges(address, preferred, 
keyspace, endpointRanges.get(address));
+                    }
 
                     // stream requests
                     Multimap<InetAddress, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints);
                     for (InetAddress address : workMap.keySet())
-                        streamPlan.requestRanges(address, keyspace, 
workMap.get(address));
+                    {
+                        InetAddress preferred = 
SystemKeyspace.getPreferredIP(address);
+                        streamPlan.requestRanges(address, preferred, keyspace, 
workMap.get(address));
+                    }
 
-                    if (logger.isDebugEnabled())
-                        logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
+                    logger.debug("Keyspace {}: work map {}.", keyspace, 
workMap);
                 }
             }
         }
@@ -3648,9 +3656,10 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             {
                 final List<Range<Token>> ranges = rangesEntry.getValue();
                 final InetAddress newEndpoint = rangesEntry.getKey();
+                final InetAddress preferred = 
SystemKeyspace.getPreferredIP(newEndpoint);
 
                 // TODO each call to transferRanges re-flushes, this is 
potentially a lot of waste
-                streamPlan.transferRanges(newEndpoint, keyspaceName, ranges);
+                streamPlan.transferRanges(newEndpoint, preferred, 
keyspaceName, ranges);
             }
         }
         return streamPlan.execute();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/SessionInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/SessionInfo.java b/src/java/org/apache/cassandra/streaming/SessionInfo.java
index b722ecf..4f80461 100644
--- a/src/java/org/apache/cassandra/streaming/SessionInfo.java
+++ b/src/java/org/apache/cassandra/streaming/SessionInfo.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Iterables;
 public final class SessionInfo implements Serializable
 {
     public final InetAddress peer;
+    public final InetAddress connecting;
     /** Immutable collection of receiving summaries */
     public final Collection<StreamSummary> receivingSummaries;
     /** Immutable collection of sending summaries*/
@@ -44,11 +45,13 @@ public final class SessionInfo implements Serializable
     private final Map<String, ProgressInfo> sendingFiles;
 
     public SessionInfo(InetAddress peer,
+                       InetAddress connecting,
                        Collection<StreamSummary> receivingSummaries,
                        Collection<StreamSummary> sendingSummaries,
                        StreamSession.State state)
     {
         this.peer = peer;
+        this.connecting = connecting;
         this.receivingSummaries = ImmutableSet.copyOf(receivingSummaries);
         this.sendingSummaries = ImmutableSet.copyOf(sendingSummaries);
         this.receivingFiles = new ConcurrentHashMap<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index e582c79..326bf48 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -56,56 +56,70 @@ public class StreamPlan
      * Request data in {@code keyspace} and {@code ranges} from specific node.
      *
      * @param from endpoint address to fetch data from.
+     * @param connecting Actual connecting address for the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to fetch
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddress from, String keyspace, 
Collection<Range<Token>> ranges)
+    public StreamPlan requestRanges(InetAddress from, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges)
     {
-        return requestRanges(from, keyspace, ranges, new String[0]);
+        return requestRanges(from, connecting, keyspace, ranges, new 
String[0]);
     }
 
     /**
      * Request data in {@code columnFamilies} under {@code keyspace} and 
{@code ranges} from specific node.
      *
      * @param from endpoint address to fetch data from.
+     * @param connecting Actual connecting address for the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to fetch
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan requestRanges(InetAddress from, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan requestRanges(InetAddress from, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(from);
+        StreamSession session = getOrCreateSession(from, connecting);
         session.addStreamRequest(keyspace, ranges, 
Arrays.asList(columnFamilies));
         return this;
     }
 
     /**
+     * Add transfer task to send data of specific {@code columnFamilies} under 
{@code keyspace} and {@code ranges}.
+     *
+     * @see #transferRanges(java.net.InetAddress, java.net.InetAddress, 
String, java.util.Collection, String...)
+     */
+    public StreamPlan transferRanges(InetAddress to, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
+    {
+        return transferRanges(to, to, keyspace, ranges, columnFamilies);
+    }
+
+    /**
      * Add transfer task to send data of specific keyspace and ranges.
      *
      * @param to endpoint address of receiver
+     * @param connecting Actual connecting address of the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to send
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddress to, String keyspace, 
Collection<Range<Token>> ranges)
+    public StreamPlan transferRanges(InetAddress to, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges)
     {
-        return transferRanges(to, keyspace, ranges, new String[0]);
+        return transferRanges(to, connecting, keyspace, ranges, new String[0]);
     }
 
     /**
      * Add transfer task to send data of specific {@code columnFamilies} under 
{@code keyspace} and {@code ranges}.
      *
      * @param to endpoint address of receiver
+     * @param connecting Actual connecting address of the endpoint
      * @param keyspace name of keyspace
      * @param ranges ranges to send
      * @param columnFamilies specific column families
      * @return this object for chaining
      */
-    public StreamPlan transferRanges(InetAddress to, String keyspace, 
Collection<Range<Token>> ranges, String... columnFamilies)
+    public StreamPlan transferRanges(InetAddress to, InetAddress connecting, 
String keyspace, Collection<Range<Token>> ranges, String... columnFamilies)
     {
-        StreamSession session = getOrCreateSession(to);
+        StreamSession session = getOrCreateSession(to, connecting);
         session.addTransferRanges(keyspace, ranges, 
Arrays.asList(columnFamilies), flushBeforeTransfer);
         return this;
     }
@@ -120,7 +134,7 @@ public class StreamPlan
      */
     public StreamPlan transferFiles(InetAddress to, 
Collection<StreamSession.SSTableStreamingSections> sstableDetails)
     {
-        StreamSession session = getOrCreateSession(to);
+        StreamSession session = getOrCreateSession(to, to);
         session.addTransferFiles(sstableDetails);
         return this;
     }
@@ -176,12 +190,12 @@ public class StreamPlan
         return this;
     }
 
-    private StreamSession getOrCreateSession(InetAddress peer)
+    private StreamSession getOrCreateSession(InetAddress peer, InetAddress 
preferred)
     {
         StreamSession session = sessions.get(peer);
         if (session == null)
         {
-            session = new StreamSession(peer, connectionFactory);
+            session = new StreamSession(peer, preferred, connectionFactory);
             sessions.put(peer, session);
         }
         return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
index add14f7..bde5934 100644
--- a/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
+++ b/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
@@ -106,7 +106,7 @@ public final class StreamResultFuture extends 
AbstractFuture<StreamState>
         StreamResultFuture future = 
StreamManager.instance.getReceivingStream(planId);
         if (future == null)
         {
-            final StreamSession session = new StreamSession(from, null);
+            final StreamSession session = new StreamSession(from, 
socket.getInetAddress(), null);
 
             // The main reason we create a StreamResultFuture on the receiving 
side is for JMX exposure.
             future = new StreamResultFuture(planId, description, 
Collections.singleton(session));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index 4fcbe36..db0c484 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -70,7 +70,7 @@ import org.apache.cassandra.utils.Pair;
  *
  *   (a) This phase is started when the initiator onInitializationComplete() 
method is called. This method sends a
  *       PrepareMessage that includes what files/sections this node will 
stream to the follower
- *       (stored in a StreamTranferTask, each column family has it's own 
transfer task) and what
+ *       (stored in a StreamTransferTask, each column family has it's own 
transfer task) and what
  *       the follower needs to stream back (StreamReceiveTask, same as above). 
If the initiator has
  *       nothing to receive from the follower, it goes directly to its 
Streaming phase. Otherwise,
  *       it waits for the follower PrepareMessage.
@@ -117,7 +117,15 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
     // is directly handled by the ConnectionHandler incoming and outgoing 
threads.
     private static final DebuggableThreadPoolExecutor streamExecutor = 
DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher",
                                                                                
                                             
FBUtilities.getAvailableProcessors());
+
+    /**
+     * Streaming endpoint.
+     *
+     * Each {@code StreamSession} is identified by this InetAddress which is 
broadcast address of the node streaming.
+     */
     public final InetAddress peer;
+    /** Actual connecting address. Can be the same as {@linkplain #peer}. */
+    public final InetAddress connecting;
 
     // should not be null when session is started
     private StreamResultFuture streamResult;
@@ -155,14 +163,16 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
      * Create new streaming session with the peer.
      *
      * @param peer Address of streaming peer
+     * @param connecting Actual connecting address
      * @param factory is used for establishing connection
      */
-    public StreamSession(InetAddress peer, StreamConnectionFactory factory)
+    public StreamSession(InetAddress peer, InetAddress connecting, 
StreamConnectionFactory factory)
     {
         this.peer = peer;
+        this.connecting = connecting;
         this.factory = factory;
         this.handler = new ConnectionHandler(this);
-        this.metrics = StreamingMetrics.get(peer);
+        this.metrics = StreamingMetrics.get(connecting);
     }
 
     public UUID planId()
@@ -205,6 +215,9 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
             {
                 try
                 {
+                    logger.info("[Stream #{}] Starting streaming to {}{}", 
planId(),
+                                                                           
peer,
+                                                                           
peer.equals(connecting) ? "" : " through " + connecting);
                     handler.initiate();
                     onInitializationComplete();
                 }
@@ -219,7 +232,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
     public Socket createConnection() throws IOException
     {
         assert factory != null;
-        return factory.createConnection(peer);
+        return factory.createConnection(connecting);
     }
 
     /**
@@ -591,7 +604,7 @@ public class StreamSession implements 
IEndpointStateChangeSubscriber, IFailureDe
         List<StreamSummary> transferSummaries = Lists.newArrayList();
         for (StreamTask transfer : transfers.values())
             transferSummaries.add(transfer.getSummary());
-        return new SessionInfo(peer, receivingSummaries, transferSummaries, 
state);
+        return new SessionInfo(peer, connecting, receivingSummaries, 
transferSummaries, state);
     }
 
     public synchronized void taskCompleted(StreamReceiveTask completedTask)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
index 658facf..809bc0d 100644
--- a/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
+++ b/src/java/org/apache/cassandra/streaming/management/SessionInfoCompositeData.java
@@ -36,6 +36,7 @@ public class SessionInfoCompositeData
 {
     private static final String[] ITEM_NAMES = new String[]{"planId",
                                                             "peer",
+                                                            "connecting",
                                                             
"receivingSummaries",
                                                             "sendingSummaries",
                                                             "state",
@@ -43,6 +44,7 @@ public class SessionInfoCompositeData
                                                             "sendingFiles"};
     private static final String[] ITEM_DESCS = new String[]{"Plan ID",
                                                             "Session peer",
+                                                            "Connecting 
address",
                                                             "Summaries of 
receiving data",
                                                             "Summaries of 
sending data",
                                                             "Current session 
state",
@@ -56,6 +58,7 @@ public class SessionInfoCompositeData
         {
             ITEM_TYPES = new OpenType[]{SimpleType.STRING,
                                         SimpleType.STRING,
+                                        SimpleType.STRING,
                                         
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
                                         
ArrayType.getArrayType(StreamSummaryCompositeData.COMPOSITE_TYPE),
                                         SimpleType.STRING,
@@ -78,6 +81,7 @@ public class SessionInfoCompositeData
         Map<String, Object> valueMap = new HashMap<>();
         valueMap.put(ITEM_NAMES[0], planId.toString());
         valueMap.put(ITEM_NAMES[1], sessionInfo.peer.getHostAddress());
+        valueMap.put(ITEM_NAMES[2], sessionInfo.connecting.getHostAddress());
         Function<StreamSummary, CompositeData> fromStreamSummary = new 
Function<StreamSummary, CompositeData>()
         {
             public CompositeData apply(StreamSummary input)
@@ -85,9 +89,9 @@ public class SessionInfoCompositeData
                 return StreamSummaryCompositeData.toCompositeData(input);
             }
         };
-        valueMap.put(ITEM_NAMES[2], 
toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
-        valueMap.put(ITEM_NAMES[3], 
toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
-        valueMap.put(ITEM_NAMES[4], sessionInfo.state.name());
+        valueMap.put(ITEM_NAMES[3], 
toArrayOfCompositeData(sessionInfo.receivingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[4], 
toArrayOfCompositeData(sessionInfo.sendingSummaries, fromStreamSummary));
+        valueMap.put(ITEM_NAMES[5], sessionInfo.state.name());
         Function<ProgressInfo, CompositeData> fromProgressInfo = new 
Function<ProgressInfo, CompositeData>()
         {
             public CompositeData apply(ProgressInfo input)
@@ -95,8 +99,8 @@ public class SessionInfoCompositeData
                 return ProgressInfoCompositeData.toCompositeData(planId, 
input);
             }
         };
-        valueMap.put(ITEM_NAMES[5], 
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
-        valueMap.put(ITEM_NAMES[6], 
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[6], 
toArrayOfCompositeData(sessionInfo.getReceivingFiles(), fromProgressInfo));
+        valueMap.put(ITEM_NAMES[7], 
toArrayOfCompositeData(sessionInfo.getSendingFiles(), fromProgressInfo));
         try
         {
             return new CompositeDataSupport(COMPOSITE_TYPE, valueMap);
@@ -112,10 +116,11 @@ public class SessionInfoCompositeData
         assert cd.getCompositeType().equals(COMPOSITE_TYPE);
 
         Object[] values = cd.getAll(ITEM_NAMES);
-        InetAddress peer;
+        InetAddress peer, connecting;
         try
         {
             peer = InetAddress.getByName((String) values[1]);
+            connecting = InetAddress.getByName((String) values[2]);
         }
         catch (UnknownHostException e)
         {
@@ -129,9 +134,10 @@ public class SessionInfoCompositeData
             }
         };
         SessionInfo info = new SessionInfo(peer,
-                                           
fromArrayOfCompositeData((CompositeData[]) values[2], toStreamSummary),
+                                           connecting,
                                            
fromArrayOfCompositeData((CompositeData[]) values[3], toStreamSummary),
-                                           
StreamSession.State.valueOf((String) values[4]));
+                                           
fromArrayOfCompositeData((CompositeData[]) values[4], toStreamSummary),
+                                           
StreamSession.State.valueOf((String) values[5]));
         Function<CompositeData, ProgressInfo> toProgressInfo = new 
Function<CompositeData, ProgressInfo>()
         {
             public ProgressInfo apply(CompositeData input)
@@ -139,11 +145,11 @@ public class SessionInfoCompositeData
                 return ProgressInfoCompositeData.fromCompositeData(input);
             }
         };
-        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[5], toProgressInfo))
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
         {
             info.updateProgress(progress);
         }
-        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[6], toProgressInfo))
+        for (ProgressInfo progress : fromArrayOfCompositeData((CompositeData[]) values[7], toProgressInfo))
         {
             info.updateProgress(progress);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 27b50a7..d9f3607 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -706,7 +706,13 @@ public class NodeCmd
             outs.printf("%s %s%n", status.description, status.planId.toString());
             for (SessionInfo info : status.sessions)
             {
-                outs.printf("    %s%n", info.peer.toString());
+                outs.printf("    %s", info.peer.toString());
+                // print private IP when it is used
+                if (!info.peer.equals(info.connecting))
+                {
+                    outs.printf(" (using %s)", info.connecting.toString());
+                }
+                outs.printf("%n");
                 if (!info.receivingSummaries.isEmpty())
                 {
                     outs.printf("        Receiving %d files, %d bytes total%n", info.getTotalFilesToReceive(), info.getTotalSizeToReceive());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
index 60fbf40..c8b6254 100644
--- a/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
+++ b/test/unit/org/apache/cassandra/streaming/SessionInfoTest.java
@@ -46,7 +46,7 @@ public class SessionInfoTest
         }
 
         StreamSummary sending = new StreamSummary(cfId, 10, 100);
-        SessionInfo info = new SessionInfo(local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
+        SessionInfo info = new SessionInfo(local, local, summaries, Collections.singleton(sending), StreamSession.State.PREPARING);
 
         assert info.getTotalFilesToReceive() == 45;
         assert info.getTotalFilesToSend() == 10;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index ce0f9d0..b51f75b 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.streaming;
 
+import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ScheduledFuture;
@@ -43,7 +44,8 @@ public class StreamTransferTaskTest extends SchemaLoader
         String ks = "Keyspace1";
         String cf = "Standard1";
 
-        StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null);
+        InetAddress peer = FBUtilities.getBroadcastAddress();
+        StreamSession session = new StreamSession(peer, peer, null);
         ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
 
         // create two sstables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6867c2c/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index 4cd578d..d2047fc 100644
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@ -107,7 +107,7 @@ public class StreamingTransferTest extends SchemaLoader
         ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
 
         StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest")
-                                                  .requestRanges(LOCAL, "Keyspace2", ranges)
+                                                  .requestRanges(LOCAL, LOCAL, "Keyspace2", ranges)
                                                   .execute();
 
         UUID planId = futureResult.planId;

Reply via email to