Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/repair/StreamingRepairTask.java
src/java/org/apache/cassandra/streaming/StreamCoordinator.java
src/java/org/apache/cassandra/streaming/StreamSession.java
test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c6de8829
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c6de8829
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c6de8829
Branch: refs/heads/trunk
Commit: c6de882968bc3b4d77a59fd9b1df0e99a6a07636
Parents: b6b08f2 6cca24f
Author: Yuki Morishita <[email protected]>
Authored: Mon Oct 20 10:45:40 2014 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Mon Oct 20 10:45:40 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 8 ++++-
.../org/apache/cassandra/dht/RangeStreamer.java | 4 ++-
.../net/OutboundTcpConnectionPool.java | 12 ++++----
.../apache/cassandra/repair/LocalSyncTask.java | 6 ++--
.../cassandra/repair/StreamingRepairTask.java | 9 ++++--
.../cassandra/service/StorageService.java | 17 +++++++----
.../apache/cassandra/streaming/SessionInfo.java | 3 ++
.../cassandra/streaming/StreamCoordinator.java | 20 ++++++-------
.../apache/cassandra/streaming/StreamPlan.java | 31 ++++++++++++++------
.../cassandra/streaming/StreamResultFuture.java | 2 +-
.../cassandra/streaming/StreamSession.java | 24 +++++++++++----
.../management/SessionInfoCompositeData.java | 30 +++++++++++--------
.../org/apache/cassandra/tools/NodeTool.java | 8 ++++-
.../cassandra/streaming/SessionInfoTest.java | 2 +-
.../streaming/StreamTransferTaskTest.java | 4 ++-
.../streaming/StreamingTransferTest.java | 2 +-
17 files changed, 123 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/LocalSyncTask.java
index b34c508,0000000..a43d326
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@@ -1,81 -1,0 +1,83 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
++import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.StreamEvent;
+import org.apache.cassandra.streaming.StreamEventHandler;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * LocalSyncTask performs streaming between local(coordinator) node and remote replica.
+ */
+public class LocalSyncTask extends SyncTask implements StreamEventHandler
+{
+ private static final Logger logger =
LoggerFactory.getLogger(LocalSyncTask.class);
+
+ private final long repairedAt;
+
+ public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse
r2, long repairedAt)
+ {
+ super(desc, r1, r2);
+ this.repairedAt = repairedAt;
+ }
+
+ /**
+ * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+ * that will be called out of band once the streams complete.
+ */
+ protected void startSync(List<Range<Token>> differences)
+ {
+ InetAddress local = FBUtilities.getBroadcastAddress();
+ // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding
+ InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint :
r2.endpoint;
++ InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
+
+ logger.info(String.format("[repair #%s] Performing streaming repair
of %d ranges with %s", desc.sessionId, differences.size(), dst));
+ new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+ .flushBeforeTransfer(true)
+ // request ranges from the remote node
- .requestRanges(dst,
desc.keyspace, differences, desc.columnFamily)
++ .requestRanges(dst, preferred,
desc.keyspace, differences, desc.columnFamily)
+ // send ranges to the remote node
- .transferRanges(dst,
desc.keyspace, differences, desc.columnFamily)
++ .transferRanges(dst, preferred,
desc.keyspace, differences, desc.columnFamily)
+ .execute();
+ }
+
+ public void handleStreamEvent(StreamEvent event) { /* noop */ }
+
+ public void onSuccess(StreamState result)
+ {
+ logger.info(String.format("[repair #%s] Sync complete between %s and
%s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily));
+ set(stat);
+ }
+
+ public void onFailure(Throwable t)
+ {
+ setException(t);
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/StreamingRepairTask.java
index 1472720,25ec698..cbf0580
--- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
+++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java
@@@ -49,14 -52,34 +52,16 @@@ public class StreamingRepairTask implem
public void run()
{
- if (request.src.equals(FBUtilities.getBroadcastAddress()))
- initiateStreaming();
- else
- forwardToSource();
- }
-
- private void initiateStreaming()
- {
- long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE;
+ InetAddress dest = request.dst;
+ InetAddress preferred = SystemKeyspace.getPreferredIP(dest);
- if (desc.parentSessionId != null &&
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) !=
null)
- repairedAt =
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt;
logger.info(String.format("[streaming task #%s] Performing streaming
repair of %d ranges with %s", desc.sessionId, request.ranges.size(),
request.dst));
- StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1)
- .flushBeforeTransfer(true)
- // request ranges from the remote node
- .requestRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
- // send ranges to the remote node
- .transferRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
- .execute();
- op.addEventListener(this);
- }
-
- private void forwardToSource()
- {
- logger.info(String.format("[repair #%s] Forwarding streaming repair
of %d ranges to %s (to be streamed with %s)", desc.sessionId,
request.ranges.size(), request.src, request.dst));
- MessagingService.instance().sendOneWay(request.createMessage(),
request.src);
+ new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
+ .flushBeforeTransfer(true)
+ // request ranges from the remote node
- .requestRanges(request.dst,
desc.keyspace, request.ranges, desc.columnFamily)
++ .requestRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
+ // send ranges to the remote node
- .transferRanges(request.dst,
desc.keyspace, request.ranges, desc.columnFamily)
++ .transferRanges(dest, preferred,
desc.keyspace, request.ranges, desc.columnFamily)
+ .execute();
}
public void handleStreamEvent(StreamEvent event)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamCoordinator.java
index 130bd45,a0c99fe..8d0cdce
--- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
+++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java
@@@ -235,7 -233,7 +235,7 @@@ public class StreamCoordinato
// create
if (streamSessions.size() < connectionsPerHost)
{
- StreamSession session = new StreamSession(peer, factory,
streamSessions.size(), keepSSTableLevel);
- StreamSession session = new StreamSession(peer, connecting,
factory, streamSessions.size());
++ StreamSession session = new StreamSession(peer, connecting,
factory, streamSessions.size(), keepSSTableLevel);
streamSessions.put(++lastReturned, session);
return session;
}
@@@ -267,7 -265,7 +267,7 @@@
StreamSession session = streamSessions.get(id);
if (session == null)
{
- session = new StreamSession(peer, factory, id,
keepSSTableLevel);
- session = new StreamSession(peer, connecting, factory, id);
++ session = new StreamSession(peer, connecting, factory, id,
keepSSTableLevel);
streamSessions.put(id, session);
}
return session;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamResultFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 560a9fa,3ba296c..48b88c4
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -152,16 -159,17 +160,18 @@@ public class StreamSession implements I
* Create new streaming session with the peer.
*
* @param peer Address of streaming peer
+ * @param connecting Actual connecting address
* @param factory is used for establishing connection
*/
- public StreamSession(InetAddress peer, StreamConnectionFactory factory,
int index, boolean keepSSTableLevel)
- public StreamSession(InetAddress peer, InetAddress connecting,
StreamConnectionFactory factory, int index)
++ public StreamSession(InetAddress peer, InetAddress connecting,
StreamConnectionFactory factory, int index, boolean keepSSTableLevel)
{
this.peer = peer;
+ this.connecting = connecting;
this.index = index;
this.factory = factory;
this.handler = new ConnectionHandler(this);
- this.metrics = StreamingMetrics.get(peer);
+ this.metrics = StreamingMetrics.get(connecting);
+ this.keepSSTableLevel = keepSSTableLevel;
}
public UUID planId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index d84f9b7,3e73b24..5e4e1c5
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -17,10 -17,9 +17,11 @@@
*/
package org.apache.cassandra.streaming;
+ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@@ -60,10 -41,11 +61,11 @@@ public class StreamTransferTaskTes
@Test
public void testScheduleTimeout() throws Exception
{
- String ks = "Keyspace1";
+ String ks = KEYSPACE1;
String cf = "Standard1";
- StreamSession session = new
StreamSession(FBUtilities.getBroadcastAddress(), null, 0, true);
+ InetAddress peer = FBUtilities.getBroadcastAddress();
- StreamSession session = new StreamSession(peer, peer, null, 0);
++ StreamSession session = new StreamSession(peer, peer, null, 0, true);
ColumnFamilyStore cfs = Keyspace.open(ks).getColumnFamilyStore(cf);
// create two sstables
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c6de8829/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
index d7e5f45,2b5d029..b193d9d
--- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
@@@ -137,7 -105,7 +137,7 @@@ public class StreamingTransferTes
ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")),
p.getMinimumToken()));
StreamResultFuture futureResult = new
StreamPlan("StreamingTransferTest")
- .requestRanges(LOCAL,
KEYSPACE2, ranges)
- .requestRanges(LOCAL,
LOCAL, "Keyspace2", ranges)
++ .requestRanges(LOCAL,
LOCAL, KEYSPACE2, ranges)
.execute();
UUID planId = futureResult.planId;