Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/repair/StreamingRepairTask.java
        src/java/org/apache/cassandra/streaming/StreamCoordinator.java
        src/java/org/apache/cassandra/streaming/StreamSession.java
        test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
        test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6de8829
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6de8829
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6de8829

Branch: refs/heads/trunk
Commit: c6de882968bc3b4d77a59fd9b1df0e99a6a07636
Parents: b6b08f2 6cca24f
Author: Yuki Morishita <[email protected]>
Authored: Mon Oct 20 10:45:40 2014 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Mon Oct 20 10:45:40 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java |  8 ++++-
 .../org/apache/cassandra/dht/RangeStreamer.java |  4 ++-
 .../net/OutboundTcpConnectionPool.java          | 12 ++++----
 .../apache/cassandra/repair/LocalSyncTask.java  |  6 ++--
 .../cassandra/repair/StreamingRepairTask.java   |  9 ++++--
 .../cassandra/service/StorageService.java       | 17 +++++++----
 .../apache/cassandra/streaming/SessionInfo.java |  3 ++
 .../cassandra/streaming/StreamCoordinator.java  | 20 ++++++-------
 .../apache/cassandra/streaming/StreamPlan.java  | 31 ++++++++++++++------
 .../cassandra/streaming/StreamResultFuture.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 24 +++++++++++----
 .../management/SessionInfoCompositeData.java    | 30 +++++++++++--------
 .../org/apache/cassandra/tools/NodeTool.java    |  8 ++++-
 .../cassandra/streaming/SessionInfoTest.java    |  2 +-
 .../streaming/StreamTransferTaskTest.java       |  4 ++-
 .../streaming/StreamingTransferTest.java        |  2 +-
 17 files changed, 123 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java
index b34c508,0000000..a43d326
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@@ -1,81 -1,0 +1,83 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.util.List;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.streaming.StreamEvent;
 +import org.apache.cassandra.streaming.StreamEventHandler;
 +import org.apache.cassandra.streaming.StreamPlan;
 +import org.apache.cassandra.streaming.StreamState;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
 + */
 +public class LocalSyncTask extends SyncTask implements StreamEventHandler
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
 +
 +    private final long repairedAt;
 +
 +    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt)
 +    {
 +        super(desc, r1, r2);
 +        this.repairedAt = repairedAt;
 +    }
 +
 +    /**
 +     * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
 +     * that will be called out of band once the streams complete.
 +     */
 +    protected void startSync(List<Range<Token>> differences)
 +    {
 +        InetAddress local = FBUtilities.getBroadcastAddress();
 +        // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
 +        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
++        InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
 +
 +        logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst));
 +        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
 +                                            .flushBeforeTransfer(true)
 +                                            // request ranges from the remote node
-                                             .requestRanges(dst, desc.keyspace, differences, desc.columnFamily)
++                                            .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
 +                                            // send ranges to the remote node
-                                             .transferRanges(dst, desc.keyspace, differences, desc.columnFamily)
++                                            .transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
 +                                            .execute();
 +    }
 +
 +    public void handleStreamEvent(StreamEvent event) { /* noop */ }
 +
 +    public void onSuccess(StreamState result)
 +    {
 +        logger.info(String.format("[repair #%s] Sync complete between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily));
 +        set(stat);
 +    }
 +
 +    public void onFailure(Throwable t)
 +    {
 +        setException(t);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 1472720,25ec698..cbf0580
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@@ -49,14 -52,34 +52,16 @@@ public class StreamingRepairTask implem
  
      public void run()
      {
 -        if (request.src.equals(FBUtilities.getBroadcastAddress()))
 -            initiateStreaming();
 -        else
 -            forwardToSource();
 -    }
 -
 -    private void initiateStreaming()
 -    {
 -        long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+         InetAddress dest = request.dst;
+         InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
 -        if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null)
 -            repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
          logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst));
 -        StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
 -                                    .flushBeforeTransfer(true)
 -                                    // request ranges from the remote node
 -                                    .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
 -                                    // send ranges to the remote node
 -                                    .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
 -                                    .execute();
 -        op.addEventListener(this);
 -    }
 -
 -    private void forwardToSource()
 -    {
 -        logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst));
 -        MessagingService.instance().sendOneWay(request.createMessage(), request.src);
 +        new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
 +                                            .flushBeforeTransfer(true)
 +                                            // request ranges from the remote node
-                                             .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
++                                            .requestRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
 +                                            // send ranges to the remote node
-                                             .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily)
++                                            .transferRanges(dest, preferred, desc.keyspace, request.ranges, desc.columnFamily)
 +                                            .execute();
      }
  
      public void handleStreamEvent(StreamEvent event)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 130bd45,a0c99fe..8d0cdce
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@@ -235,7 -233,7 +235,7 @@@ public class StreamCoordinato
              // create
              if (streamSessions.size() < connectionsPerHost)
              {
-                 StreamSession session = new StreamSession(peer, factory, streamSessions.size(), keepSSTableLevel);
 -                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size());
++                StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel);
                  streamSessions.put(++lastReturned, session);
                  return session;
              }
@@@ -267,7 -265,7 +267,7 @@@
              StreamSession session = streamSessions.get(id);
              if (session == null)
              {
-                 session = new StreamSession(peer, factory, id, keepSSTableLevel);
 -                session = new StreamSession(peer, connecting, factory, id);
++                session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel);
                  streamSessions.put(id, session);
              }
              return session;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 560a9fa,3ba296c..48b88c4
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -152,16 -159,17 +160,18 @@@ public class StreamSession implements I
       * Create new streaming session with the peer.
       *
       * @param peer Address of streaming peer
+      * @param connecting Actual connecting address
       * @param factory is used for establishing connection
       */
-     public StreamSession(InetAddress peer, StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
 -    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index)
++    public StreamSession(InetAddress peer, InetAddress connecting, StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
      {
          this.peer = peer;
+         this.connecting = connecting;
          this.index = index;
          this.factory = factory;
          this.handler = new ConnectionHandler(this);
-         this.metrics = StreamingMetrics.get(peer);
+         this.metrics = StreamingMetrics.get(connecting);
 +        this.keepSSTableLevel = keepSSTableLevel;
      }
  
      public UUID planId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index d84f9b7,3e73b24..5e4e1c5
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -17,10 -17,9 +17,11 @@@
   */
  package org.apache.cassandra.streaming;
  
+ import java.net.InetAddress;
  import java.util.ArrayList;
  import java.util.List;
 +import java.util.concurrent.CancellationException;
 +import java.util.concurrent.Future;
  import java.util.concurrent.ScheduledFuture;
  import java.util.concurrent.TimeUnit;
  
@@@ -60,10 -41,11 +61,11 @@@ public class StreamTransferTaskTes
      @Test
      public void testScheduleTimeout() throws Exception
      {
 -        String ks = "Keyspace1";
 +        String ks = KEYSPACE1;
          String cf = "Standard1";
  
-         StreamSession session = new StreamSession(FBUtilities.getBroadcastAddress(), null, 0, true);
+         InetAddress peer = FBUtilities.getBroadcastAddress();
 -        StreamSession session = new StreamSession(peer, peer, null, 0);
++        StreamSession session = new StreamSession(peer, peer, null, 0, true);
          ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
  
          // create two sstables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index d7e5f45,2b5d029..b193d9d
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -137,7 -105,7 +137,7 @@@ public class StreamingTransferTes
          ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken()));
  
          StreamResultFuture futureResult = new StreamPlan("StreamingTransferTest")
-                                                   .requestRanges(LOCAL, KEYSPACE2, ranges)
 -                                                  .requestRanges(LOCAL, LOCAL, "Keyspace2", ranges)
++                                                  .requestRanges(LOCAL, LOCAL, KEYSPACE2, ranges)
                                                    .execute();
  
          UUID planId = futureResult.planId;

Reply via email to