Allow transient node to serve as a repair coordinator. Patch by Alex Petrov and Blake Eggleston; reviewed by Ariel Weisberg, Blake Eggleston, Marcus Eriksson for CASSANDRA-14693
Co-authored-by: Blake Eggleston <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0841353e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0841353e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0841353e Branch: refs/heads/trunk Commit: 0841353e90f1cc94dc47b435af87e4d5876478ea Parents: 2886cac Author: Alex Petrov <[email protected]> Authored: Tue Sep 4 19:38:27 2018 +0200 Committer: Alex Petrov <[email protected]> Committed: Tue Sep 11 22:58:01 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/DiskBoundaryManager.java | 21 +- .../cassandra/repair/AbstractSyncTask.java | 31 --- .../repair/AsymmetricLocalSyncTask.java | 105 -------- .../repair/AsymmetricRemoteSyncTask.java | 19 +- .../cassandra/repair/AsymmetricSyncTask.java | 81 ------ .../apache/cassandra/repair/CommonRange.java | 8 +- .../apache/cassandra/repair/LocalSyncTask.java | 158 ++++++++++++ .../org/apache/cassandra/repair/NodePair.java | 91 ------- .../org/apache/cassandra/repair/RepairJob.java | 174 +++++++------ .../apache/cassandra/repair/RepairRunnable.java | 27 +- .../apache/cassandra/repair/RepairSession.java | 16 +- .../repair/SymmetricLocalSyncTask.java | 142 ----------- .../repair/SymmetricRemoteSyncTask.java | 22 +- .../cassandra/repair/SymmetricSyncTask.java | 94 ------- .../apache/cassandra/repair/SyncNodePair.java | 91 +++++++ .../org/apache/cassandra/repair/SyncStat.java | 6 +- .../org/apache/cassandra/repair/SyncTask.java | 96 +++++++ .../cassandra/repair/messages/SyncComplete.java | 14 +- .../cassandra/service/ActiveRepairService.java | 3 - .../service/reads/AbstractReadExecutor.java | 2 +- .../cassandra/repair/LocalSyncTaskTest.java | 249 +++++++++++++++++++ .../repair/SymmetricLocalSyncTaskTest.java | 232 ----------------- .../repair/SymmetricRemoteSyncTaskTest.java | 6 +- 
.../RepairMessageSerializationsTest.java | 6 +- .../cassandra/service/SerializationsTest.java | 4 +- 26 files changed, 766 insertions(+), 933 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index b7bc775..ef285e0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Allow transient node to serve as a repair coordinator (CASSANDRA-14693) * DecayingEstimatedHistogramReservoir.EstimatedHistogramReservoirSnapshot returns wrong value for size() and incorrectly calculates count (CASSANDRA-14696) * AbstractReplicaCollection equals and hash code should throw due to conflict between order sensitive/insensitive uses (CASSANDRA-14700) * Detect inconsistencies in repaired data on the read path (CASSANDRA-14145) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/db/DiskBoundaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index acfe71a..0961a42 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -109,17 +109,7 @@ public class DiskBoundaryManager if (localRanges == null || localRanges.isEmpty()) return new DiskBoundaries(dirs, null, ringVersion, directoriesVersion); - // note that Range.sort unwraps any wraparound ranges, so we need to sort them here - List<Range<Token>> fullLocalRanges = Range.sort(localRanges.stream() - .filter(Replica::isFull) - .map(Replica::range) - .collect(Collectors.toList())); - List<Range<Token>> transientLocalRanges = Range.sort(localRanges.stream() - .filter(Replica::isTransient) - .map(Replica::range) - 
.collect(Collectors.toList())); - - List<PartitionPosition> positions = getDiskBoundaries(fullLocalRanges, transientLocalRanges, cfs.getPartitioner(), dirs); + List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs); return new DiskBoundaries(dirs, positions, ringVersion, directoriesVersion); } @@ -133,18 +123,19 @@ public class DiskBoundaryManager * * The final entry in the returned list will always be the partitioner maximum tokens upper key bound */ - private static List<PartitionPosition> getDiskBoundaries(List<Range<Token>> fullRanges, List<Range<Token>> transientRanges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) + private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint ranges, IPartitioner partitioner, Directories.DataDirectory[] dataDirectories) { assert partitioner.splitter().isPresent(); Splitter splitter = partitioner.splitter().get(); boolean dontSplitRanges = DatabaseDescriptor.getNumTokens() > 1; - List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(fullRanges.size() + transientRanges.size()); - for (Range<Token> r : fullRanges) + List<Splitter.WeightedRange> weightedRanges = new ArrayList<>(ranges.size()); + // note that Range.sort unwraps any wraparound ranges, so we need to sort them here + for (Range<Token> r : Range.sort(ranges.fullRanges())) weightedRanges.add(new Splitter.WeightedRange(1.0, r)); - for (Range<Token> r : transientRanges) + for (Range<Token> r : Range.sort(ranges.transientRanges())) weightedRanges.add(new Splitter.WeightedRange(0.1, r)); weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AbstractSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java b/src/java/org/apache/cassandra/repair/AbstractSyncTask.java deleted 
file mode 100644 index 124baa1..0000000 --- a/src/java/org/apache/cassandra/repair/AbstractSyncTask.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.List; - -import com.google.common.util.concurrent.AbstractFuture; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; - -public abstract class AbstractSyncTask extends AbstractFuture<SyncStat> implements Runnable -{ - protected abstract void startSync(List<Range<Token>> rangesToStream); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java deleted file mode 100644 index eaf890a..0000000 --- a/src/java/org/apache/cassandra/repair/AsymmetricLocalSyncTask.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.repair; - -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamEvent; -import org.apache.cassandra.streaming.StreamEventHandler; -import org.apache.cassandra.streaming.StreamOperation; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -public class AsymmetricLocalSyncTask extends AsymmetricSyncTask implements StreamEventHandler -{ - private final UUID pendingRepair; - private final TraceState state = Tracing.instance.get(); - - public AsymmetricLocalSyncTask(RepairJobDesc desc, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, UUID pendingRepair, PreviewKind previewKind) - { - super(desc, FBUtilities.getBroadcastAddressAndPort(), fetchFrom, rangesToFetch, previewKind); - 
this.pendingRepair = pendingRepair; - } - - public void startSync(List<Range<Token>> rangesToFetch) - { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, - 1, false, - pendingRepair, - previewKind) - .listeners(this) - .flushBeforeTransfer(pendingRepair == null) - // request ranges from the remote node, see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - .requestRanges(fetchFrom, desc.keyspace, RangesAtEndpoint.toDummyList(rangesToFetch), - RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); - plan.execute(); - - } - - public void handleStreamEvent(StreamEvent event) - { - if (state == null) - return; - switch (event.eventType) - { - case STREAM_PREPARED: - StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; - state.trace("Streaming session with {} prepared", spe.session.peer); - break; - case STREAM_COMPLETE: - StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; - state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); - break; - case FILE_PROGRESS: - ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; - state.trace("{}/{} ({}%) {} idx:{}{}", - new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), - FBUtilities.prettyPrintMemory(pi.totalBytes), - pi.currentBytes * 100 / pi.totalBytes, - pi.direction == ProgressInfo.Direction.OUT ? 
"sent to" : "received from", - pi.sessionIndex, - pi.peer }); - } - } - - public void onSuccess(StreamState result) - { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, fetchingNode, fetchFrom, desc.columnFamily); - Tracing.traceRepair(message); - set(stat); - finished(); - } - - public void onFailure(Throwable t) - { - setException(t); - finished(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java index 2b171c9..9ba33dd 100644 --- a/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/AsymmetricRemoteSyncTask.java @@ -26,27 +26,36 @@ import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.AsymmetricSyncRequest; +import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.MerkleTrees; -public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements CompletableRemoteSyncTask +/** + * AsymmetricRemoteSyncTask sends {@link AsymmetricSyncRequest} to target node to repair(stream) + * data with other target replica. + * + * When AsymmetricRemoteSyncTask receives SyncComplete from the target, task completes. 
+ */ +public class AsymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask { - public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) + public AsymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort fetchNode, InetAddressAndPort fetchFrom, + List<Range<Token>> rangesToFetch, PreviewKind previewKind) { super(desc, fetchNode, fetchFrom, rangesToFetch, previewKind); } + public AsymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse to, TreeResponse from, PreviewKind previewKind) { this(desc, to.endpoint, from.endpoint, MerkleTrees.difference(to.trees, from.trees), previewKind); } - public void startSync(List<Range<Token>> rangesToFetch) + public void startSync() { InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, fetchingNode, fetchFrom, rangesToFetch, previewKind); + AsymmetricSyncRequest request = new AsymmetricSyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.fetchingNode, request.fetchFrom); Tracing.traceRepair(message); MessagingService.instance().sendOneWay(request.createMessage(), request.fetchingNode); @@ -60,7 +69,7 @@ public class AsymmetricRemoteSyncTask extends AsymmetricSyncTask implements Comp } else { - setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", fetchingNode, fetchFrom))); + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer))); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java deleted file mode 100644 index 35474af..0000000 --- a/src/java/org/apache/cassandra/repair/AsymmetricSyncTask.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.repair; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.tracing.Tracing; - -public abstract class AsymmetricSyncTask extends AbstractSyncTask -{ - private static Logger logger = LoggerFactory.getLogger(AsymmetricSyncTask.class); - protected final RepairJobDesc desc; - protected final InetAddressAndPort fetchFrom; - protected final List<Range<Token>> rangesToFetch; - protected final InetAddressAndPort fetchingNode; - protected final PreviewKind previewKind; - private long startTime = Long.MIN_VALUE; - protected volatile SyncStat stat; - - public AsymmetricSyncTask(RepairJobDesc desc, InetAddressAndPort fetchingNode, InetAddressAndPort fetchFrom, List<Range<Token>> rangesToFetch, PreviewKind previewKind) - { - assert !fetchFrom.equals(fetchingNode) : "Fetching from self " + fetchFrom; - this.desc = desc; - this.fetchFrom = fetchFrom; - this.fetchingNode = fetchingNode; - this.rangesToFetch = rangesToFetch; - // todo: make an AsymmetricSyncStat? 
- stat = new SyncStat(new NodePair(fetchingNode, fetchFrom), rangesToFetch.size()); - this.previewKind = previewKind; - } - - public void run() - { - startTime = System.currentTimeMillis(); - // choose a repair method based on the significance of the difference - String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), fetchingNode, fetchFrom, desc.columnFamily); - if (rangesToFetch.isEmpty()) - { - logger.info(String.format(format, "are consistent")); - Tracing.traceRepair("Endpoint {} is consistent with {} for {}", fetchingNode, fetchFrom, desc.columnFamily); - set(stat); - return; - } - - // non-0 difference: perform streaming repair - logger.info(String.format(format, "have " + rangesToFetch.size() + " range(s) out of sync")); - Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", fetchingNode, rangesToFetch.size(), fetchFrom, desc.columnFamily); - startSync(rangesToFetch); - } - - protected void finished() - { - if (startTime != Long.MIN_VALUE) - Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/CommonRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java index 928e570..6b55dc7 100644 --- a/src/java/org/apache/cassandra/repair/CommonRange.java +++ b/src/java/org/apache/cassandra/repair/CommonRange.java @@ -48,7 +48,13 @@ public class CommonRange this.endpoints = ImmutableSet.copyOf(endpoints); this.transEndpoints = ImmutableSet.copyOf(transEndpoints); - this.ranges = new ArrayList(ranges); + this.ranges = new ArrayList<>(ranges); + } + + public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> 
transEndpoints) + { + // Use strict equality here, as worst thing that can happen is we generate one more stream + return this.endpoints.equals(endpoints) && this.transEndpoints.equals(transEndpoints); } public boolean equals(Object o) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java new file mode 100644 index 0000000..1923fbe --- /dev/null +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; + +/** + * LocalSyncTask performs streaming between local(coordinator) node and remote replica. 
+ */ +public class LocalSyncTask extends SyncTask implements StreamEventHandler +{ + private final TraceState state = Tracing.instance.get(); + + private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); + + private final UUID pendingRepair; + private final boolean requestRanges; + private final boolean transferRanges; + + public LocalSyncTask(RepairJobDesc desc, TreeResponse local, TreeResponse remote, UUID pendingRepair, + boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + { + this(desc, local.endpoint, remote.endpoint, MerkleTrees.difference(local.trees, remote.trees), + pendingRepair, requestRanges, transferRanges, previewKind); + } + + public LocalSyncTask(RepairJobDesc desc, InetAddressAndPort local, InetAddressAndPort remote, + List<Range<Token>> diff, UUID pendingRepair, + boolean requestRanges, boolean transferRanges, PreviewKind previewKind) + { + super(desc, local, remote, diff, previewKind); + Preconditions.checkArgument(requestRanges || transferRanges, "Nothing to do in a sync job"); + Preconditions.checkArgument(local.equals(FBUtilities.getBroadcastAddressAndPort())); + + this.pendingRepair = pendingRepair; + this.requestRanges = requestRanges; + this.transferRanges = transferRanges; + } + + @VisibleForTesting + StreamPlan createStreamPlan(InetAddressAndPort remote, List<Range<Token>> differences) + { + StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) + .listeners(this) + .flushBeforeTransfer(pendingRepair == null); + + if (requestRanges) + { + // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here + plan.requestRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), + RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); + } + + if (transferRanges) + { + // send ranges to the remote node if we are not performing a pull repair + // see comment on RangesAtEndpoint.toDummyList for why we 
synthesize replicas here + plan.transferRanges(remote, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); + } + + return plan; + } + + /** + * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback + * that will be called out of band once the streams complete. + */ + @Override + protected void startSync() + { + InetAddressAndPort remote = nodePair.peer; + + String message = String.format("Performing streaming repair of %d ranges with %s", rangesToSync.size(), remote); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + Tracing.traceRepair(message); + + createStreamPlan(remote, rangesToSync).execute(); + } + + public void handleStreamEvent(StreamEvent event) + { + if (state == null) + return; + switch (event.eventType) + { + case STREAM_PREPARED: + StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; + state.trace("Streaming session with {} prepared", spe.session.peer); + break; + case STREAM_COMPLETE: + StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; + state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); + break; + case FILE_PROGRESS: + ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; + state.trace("{}/{} ({}%) {} idx:{}{}", + new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), + FBUtilities.prettyPrintMemory(pi.totalBytes), + pi.currentBytes * 100 / pi.totalBytes, + pi.direction == ProgressInfo.Direction.OUT ? 
"sent to" : "received from", + pi.sessionIndex, + pi.peer }); + } + } + + public void onSuccess(StreamState result) + { + String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, nodePair.coordinator, nodePair.peer, desc.columnFamily); + logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); + Tracing.traceRepair(message); + set(stat.withSummaries(result.createSummaries())); + finished(); + } + + public void onFailure(Throwable t) + { + setException(t); + finished(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/NodePair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/NodePair.java b/src/java/org/apache/cassandra/repair/NodePair.java deleted file mode 100644 index bfb237e..0000000 --- a/src/java/org/apache/cassandra/repair/NodePair.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.io.IOException; - -import com.google.common.base.Objects; - -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.net.CompactEndpointSerializationHelper; - -/** - * NodePair is used for repair message body to indicate the pair of nodes. - * - * @since 2.0 - */ -public class NodePair -{ - public static IVersionedSerializer<NodePair> serializer = new NodePairSerializer(); - - public final InetAddressAndPort endpoint1; - public final InetAddressAndPort endpoint2; - - public NodePair(InetAddressAndPort endpoint1, InetAddressAndPort endpoint2) - { - this.endpoint1 = endpoint1; - this.endpoint2 = endpoint2; - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - NodePair nodePair = (NodePair) o; - return endpoint1.equals(nodePair.endpoint1) && endpoint2.equals(nodePair.endpoint2); - } - - @Override - public String toString() - { - return endpoint1.toString() + " - " + endpoint2.toString(); - } - - @Override - public int hashCode() - { - return Objects.hashCode(endpoint1, endpoint2); - } - - public static class NodePairSerializer implements IVersionedSerializer<NodePair> - { - public void serialize(NodePair nodePair, DataOutputPlus out, int version) throws IOException - { - CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint1, out, version); - CompactEndpointSerializationHelper.instance.serialize(nodePair.endpoint2, out, version); - } - - public NodePair deserialize(DataInputPlus in, int version) throws IOException - { - InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version); - InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version); - 
return new NodePair(ep1, ep2); - } - - public long serializedSize(NodePair nodePair, int version) - { - return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint1, version) - + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.endpoint2, version); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index d38435b..c96e7fb 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -20,6 +20,7 @@ package org.apache.cassandra.repair; import java.util.*; import java.util.stream.Collectors; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -45,7 +46,7 @@ import org.apache.cassandra.utils.Pair; */ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { - private static Logger logger = LoggerFactory.getLogger(RepairJob.class); + private static final Logger logger = LoggerFactory.getLogger(RepairJob.class); private final RepairSession session; private final RepairJobDesc desc; @@ -128,7 +129,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable } // When all validations complete, submit sync tasks - ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, optimiseStreams && !session.pullRepair ? optimisedSyncing() : standardSyncing(), taskExecutor); + ListenableFuture<List<SyncStat>> syncResults = Futures.transformAsync(validations, + optimiseStreams && !session.pullRepair ? 
this::optimisedSyncing : this::standardSyncing, + taskExecutor); // When all sync complete, set the final result Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>() @@ -165,107 +168,116 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable return session.commonRange.transEndpoints.contains(ep); } - private AsyncFunction<List<TreeResponse>, List<SyncStat>> standardSyncing() + private ListenableFuture<List<SyncStat>> standardSyncing(List<TreeResponse> trees) { - return trees -> - { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List<AbstractSyncTask> syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - for (int i = 0; i < trees.size() - 1; ++i) + List<SyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) { - TreeResponse r1 = trees.get(i); - for (int j = i + 1; j < trees.size(); ++j) + TreeResponse r2 = trees.get(j); + + // Avoid streaming between two transient replicas + if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) + continue; + + SyncTask task; + if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) { - TreeResponse r2 = trees.get(j); + TreeResponse self = r1.endpoint.equals(local) ? r1 : r2; + TreeResponse remote = r2.endpoint.equals(local) ?
r1 : r2; - if (isTransient(r1.endpoint) && isTransient(r2.endpoint)) + // pull only if local is full + boolean requestRanges = !isTransient(self.endpoint); + // push only if remote is full; additionally check for pull repair + boolean transferRanges = !isTransient(remote.endpoint) && !session.pullRepair; + + // Nothing to do + if (!requestRanges && !transferRanges) continue; - AbstractSyncTask task; - if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) - { - InetAddressAndPort remote = r1.endpoint.equals(local) ? r2.endpoint : r1.endpoint; - task = new SymmetricLocalSyncTask(desc, r1, r2, isTransient(remote), isIncremental ? desc.parentSessionId : null, session.pullRepair, session.previewKind); - } - else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) - { - TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; - TreeResponse streamTo = isTransient(r1.endpoint) ? r2: r1; - task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); - session.waitForSync(Pair.create(desc, new NodePair(streamTo.endpoint, streamFrom.endpoint)), (AsymmetricRemoteSyncTask) task); - } - else - { - task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); - // SymmetricRemoteSyncTask expects SyncComplete message sent back. - // Register task to RepairSession to receive response. - session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (SymmetricRemoteSyncTask) task); - } - syncTasks.add(task); - taskExecutor.submit(task); + task = new LocalSyncTask(desc, self, remote, isIncremental ? desc.parentSessionId : null, + requestRanges, transferRanges, session.previewKind); + } + else if (isTransient(r1.endpoint) || isTransient(r2.endpoint)) + { + // Stream only from transient replica + TreeResponse streamFrom = isTransient(r1.endpoint) ? r1 : r2; + TreeResponse streamTo = isTransient(r1.endpoint) ? 
r2 : r1; + task = new AsymmetricRemoteSyncTask(desc, streamTo, streamFrom, previewKind); + session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); + } + else + { + task = new SymmetricRemoteSyncTask(desc, r1, r2, session.previewKind); + // SymmetricRemoteSyncTask expects SyncComplete message sent back. + // Register task to RepairSession to receive response. + session.waitForSync(Pair.create(desc, task.nodePair()), (SymmetricRemoteSyncTask) task); } + syncTasks.add(task); + taskExecutor.submit(task); } - return Futures.allAsList(syncTasks); - }; + } + return Futures.allAsList(syncTasks); } - private AsyncFunction<List<TreeResponse>, List<SyncStat>> optimisedSyncing() + private ListenableFuture<List<SyncStat>> optimisedSyncing(List<TreeResponse> trees) { - return trees -> - { - InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); + InetAddressAndPort local = FBUtilities.getLocalAddressAndPort(); - List<AbstractSyncTask> syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - DifferenceHolder diffHolder = new DifferenceHolder(trees); + List<SyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + DifferenceHolder diffHolder = new DifferenceHolder(trees); - logger.debug("diffs = {}", diffHolder); - PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> - candidates.stream() - .filter(node -> getDC(streaming) - .equals(getDC(node))) - .collect(Collectors.toSet()); - ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); + logger.debug("diffs = {}", diffHolder); + PreferedNodeFilter preferSameDCFilter = (streaming, candidates) -> + candidates.stream() + .filter(node -> getDC(streaming) + .equals(getDC(node))) + .collect(Collectors.toSet()); + ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences = ReduceHelper.reduce(diffHolder, preferSameDCFilter); - for 
(int i = 0; i < trees.size(); i++) - { - InetAddressAndPort address = trees.get(i).endpoint; + for (int i = 0; i < trees.size(); i++) + { + InetAddressAndPort address = trees.get(i).endpoint; - // we don't stream to transient replicas - if (isTransient(address)) - continue; + // we don't stream to transient replicas + if (isTransient(address)) + continue; - HostDifferences streamsFor = reducedDifferences.get(address); - if (streamsFor != null) + HostDifferences streamsFor = reducedDifferences.get(address); + if (streamsFor != null) + { + Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not fetch ranges from ourselves"); + for (InetAddressAndPort fetchFrom : streamsFor.hosts()) { - assert streamsFor.get(address).isEmpty() : "We should not fetch ranges from ourselves"; - for (InetAddressAndPort fetchFrom : streamsFor.hosts()) + List<Range<Token>> toFetch = streamsFor.get(fetchFrom); + logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); + SyncTask task; + if (address.equals(local)) { - List<Range<Token>> toFetch = streamsFor.get(fetchFrom); - logger.debug("{} is about to fetch {} from {}", address, toFetch, fetchFrom); - AsymmetricSyncTask task; - if (address.equals(local)) - { - task = new AsymmetricLocalSyncTask(desc, fetchFrom, toFetch, isIncremental ? desc.parentSessionId : null, previewKind); - } - else - { - task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); - session.waitForSync(Pair.create(desc, new NodePair(address, fetchFrom)),(AsymmetricRemoteSyncTask)task); - } - syncTasks.add(task); - taskExecutor.submit(task); + task = new LocalSyncTask(desc, address, fetchFrom, toFetch, isIncremental ? 
desc.parentSessionId : null, + true, false, session.previewKind); } - } - else - { - logger.debug("Node {} has nothing to stream", address); + else + { + task = new AsymmetricRemoteSyncTask(desc, address, fetchFrom, toFetch, previewKind); + session.waitForSync(Pair.create(desc, task.nodePair()), (AsymmetricRemoteSyncTask) task); + } + syncTasks.add(task); + taskExecutor.submit(task); } } - return Futures.allAsList(syncTasks); - }; + else + { + logger.debug("Node {} has nothing to stream", address); + } + } + return Futures.allAsList(syncTasks); } private String getDC(InetAddressAndPort address) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 8d3cd54..fa0c2a9 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -196,21 +196,17 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti Set<InetAddressAndPort> allNeighbors = new HashSet<>(); List<CommonRange> commonRanges = new ArrayList<>(); - //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent - //calculation multiple times - // we arbitrarily limit ourselves to only full replicas, in lieu of ensuring it is safe to coordinate from a transient replica - Iterable<Range<Token>> keyspaceLocalRanges = storageService - .getLocalReplicas(keyspace) - .filter(Replica::isFull) - .ranges(); - try { + //pre-calculate output of getLocalReplicas and pass it to getNeighbors to increase performance and prevent + //calculation multiple times + Iterable<Range<Token>> keyspaceLocalRanges = storageService.getLocalReplicas(keyspace).ranges(); + for (Range<Token> range : options.getRanges()) { 
EndpointsForRange neighbors = ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges, range, - options.getDataCenters(), - options.getHosts()); + options.getDataCenters(), + options.getHosts()); addRangeToNeighbors(commonRanges, range, neighbors); allNeighbors.addAll(neighbors.endpoints()); @@ -647,17 +643,16 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti ImmutableList.of(failureMessage, completionMessage)); } - private void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) + private static void addRangeToNeighbors(List<CommonRange> neighborRangeList, Range<Token> range, EndpointsForRange neighbors) { Set<InetAddressAndPort> endpoints = neighbors.endpoints(); Set<InetAddressAndPort> transEndpoints = neighbors.filter(Replica::isTransient).endpoints(); - for (int i = 0; i < neighborRangeList.size(); i++) - { - CommonRange cr = neighborRangeList.get(i); - if (cr.endpoints.containsAll(endpoints) && cr.transEndpoints.containsAll(transEndpoints)) + for (CommonRange commonRange : neighborRangeList) + { + if (commonRange.matchesEndpoints(endpoints, transEndpoints)) { - cr.ranges.add(range); + commonRange.ranges.add(range); return; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 2ff60ec..4dc563a 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -54,9 +54,8 @@ import org.apache.cassandra.utils.Pair; * ({@link org.apache.cassandra.repair.ValidationTask}) and waits until all trees are received (in * validationComplete()). 
* </li> - * <li>Synchronization phase: once all trees are received, the job compares each tree with - * all the other using a so-called {@link SymmetricSyncTask}. If there is difference between 2 trees, the - * concerned SymmetricSyncTask will start a streaming of the difference between the 2 endpoint concerned. + * <li>Synchronization phase: once all trees are received, the job compares each tree with all the others. If there is a + * difference between 2 trees, the differences between the 2 endpoints will be streamed with a {@link SyncTask}. * </li> * </ol> * The job is done once all its SyncTasks are done (i.e. have either computed no differences @@ -103,7 +102,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, ValidationTask> validating = new ConcurrentHashMap<>(); // Remote syncing jobs wait response in syncingTasks map - private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); + private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) 
are run on taskExecutor public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); @@ -195,12 +194,11 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement validating.put(key, task); } - public void waitForSync(Pair<RepairJobDesc, NodePair> key, CompletableRemoteSyncTask task) + public void waitForSync(Pair<RepairJobDesc, SyncNodePair> key, CompletableRemoteSyncTask task) { syncingTasks.put(key, task); } - /** * Receive merkle tree response or failed response from {@code endpoint} for current repair job. * @@ -224,13 +222,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } /** - * Notify this session that sync completed/failed with given {@code NodePair}. + * Notify this session that sync completed/failed with given {@code SyncNodePair}. * * @param desc synced repair job * @param nodes nodes that completed sync * @param success true if sync succeeded */ - public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries) + public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries) { CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes)); if (task == null) @@ -240,7 +238,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement } if (logger.isDebugEnabled()) - logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.endpoint1, nodes.endpoint2, desc.columnFamily); + logger.debug("{} Repair completed between {} and {} on {}", previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, desc.columnFamily); task.syncComplete(success, summaries); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java deleted file mode 100644 index 7eedab7..0000000 --- a/src/java/org/apache/cassandra/repair/SymmetricLocalSyncTask.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.util.Collections; -import java.util.List; -import java.util.UUID; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamEvent; -import org.apache.cassandra.streaming.StreamEventHandler; -import org.apache.cassandra.streaming.StreamOperation; -import org.apache.cassandra.streaming.StreamPlan; -import org.apache.cassandra.streaming.StreamState; -import org.apache.cassandra.tracing.TraceState; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.FBUtilities; - -/** - * SymmetricLocalSyncTask performs streaming between local(coordinator) node and remote replica. 
- */ -public class SymmetricLocalSyncTask extends SymmetricSyncTask implements StreamEventHandler -{ - private final TraceState state = Tracing.instance.get(); - - private static final Logger logger = LoggerFactory.getLogger(SymmetricLocalSyncTask.class); - - private final boolean remoteIsTransient; - private final UUID pendingRepair; - private final boolean pullRepair; - - public SymmetricLocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, boolean remoteIsTransient, UUID pendingRepair, boolean pullRepair, PreviewKind previewKind) - { - super(desc, r1, r2, previewKind); - this.remoteIsTransient = remoteIsTransient; - this.pendingRepair = pendingRepair; - this.pullRepair = pullRepair; - } - - @VisibleForTesting - StreamPlan createStreamPlan(InetAddressAndPort dst, List<Range<Token>> differences) - { - StreamPlan plan = new StreamPlan(StreamOperation.REPAIR, 1, false, pendingRepair, previewKind) - .listeners(this) - .flushBeforeTransfer(pendingRepair == null) - // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - .requestRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), - RangesAtEndpoint.toDummyList(Collections.emptyList()), desc.columnFamily); // request ranges from the remote node - - if (!pullRepair && !remoteIsTransient) - { - // send ranges to the remote node if we are not performing a pull repair - // see comment on RangesAtEndpoint.toDummyList for why we synthesize replicas here - plan.transferRanges(dst, desc.keyspace, RangesAtEndpoint.toDummyList(differences), desc.columnFamily); - } - - return plan; - } - - /** - * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback - * that will be called out of band once the streams complete. 
- */ - @Override - protected void startSync(List<Range<Token>> differences) - { - InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddressAndPort dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; - - String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); - Tracing.traceRepair(message); - - createStreamPlan(dst, differences).execute(); - } - - public void handleStreamEvent(StreamEvent event) - { - if (state == null) - return; - switch (event.eventType) - { - case STREAM_PREPARED: - StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event; - state.trace("Streaming session with {} prepared", spe.session.peer); - break; - case STREAM_COMPLETE: - StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event; - state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed"); - break; - case FILE_PROGRESS: - ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress; - state.trace("{}/{} ({}%) {} idx:{}{}", - new Object[] { FBUtilities.prettyPrintMemory(pi.currentBytes), - FBUtilities.prettyPrintMemory(pi.totalBytes), - pi.currentBytes * 100 / pi.totalBytes, - pi.direction == ProgressInfo.Direction.OUT ? 
"sent to" : "received from", - pi.sessionIndex, - pi.peer }); - } - } - - public void onSuccess(StreamState result) - { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); - logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); - Tracing.traceRepair(message); - set(stat.withSummaries(result.createSummaries())); - finished(); - } - - public void onFailure(Throwable t) - { - setException(t); - finished(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java index 1f2740f..4e44c15 100644 --- a/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/SymmetricRemoteSyncTask.java @@ -18,8 +18,8 @@ package org.apache.cassandra.repair; import java.util.List; -import java.util.function.Predicate; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,13 +29,13 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.AsymmetricSyncRequest; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncRequest; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.SessionSummary; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; /** * 
SymmetricRemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node @@ -43,13 +43,20 @@ import org.apache.cassandra.utils.FBUtilities; * * When SymmetricRemoteSyncTask receives SyncComplete from remote node, task completes. */ -public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements CompletableRemoteSyncTask +public class SymmetricRemoteSyncTask extends SyncTask implements CompletableRemoteSyncTask { private static final Logger logger = LoggerFactory.getLogger(SymmetricRemoteSyncTask.class); public SymmetricRemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) { - super(desc, r1, r2, previewKind); + super(desc, r1.endpoint, r2.endpoint, MerkleTrees.difference(r1.trees, r2.trees), previewKind); + } + + @VisibleForTesting + SymmetricRemoteSyncTask(RepairJobDesc desc, InetAddressAndPort e1, InetAddressAndPort e2, + List<Range<Token>> differences, PreviewKind previewKind) + { + super(desc, e1, e2, differences, previewKind); } void sendRequest(RepairMessage request, InetAddressAndPort to) @@ -58,11 +65,12 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple } @Override - protected void startSync(List<Range<Token>> differences) + protected void startSync() { InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); - SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences, previewKind); + SyncRequest request = new SyncRequest(desc, local, nodePair.coordinator, nodePair.peer, rangesToSync, previewKind); + Preconditions.checkArgument(nodePair.coordinator.equals(request.src)); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); logger.info("{} {}", previewKind.logPrefix(desc.sessionId), message); Tracing.traceRepair(message); @@ -77,7 +85,7 @@ public class SymmetricRemoteSyncTask extends SymmetricSyncTask implements Comple } 
else { - setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + setException(new RepairException(desc, previewKind, String.format("Sync failed between %s and %s", nodePair.coordinator, nodePair.peer))); } finished(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java b/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java deleted file mode 100644 index 3da2293..0000000 --- a/src/java/org/apache/cassandra/repair/SymmetricSyncTask.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.streaming.PreviewKind; -import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.MerkleTrees; - -/** - * SymmetricSyncTask will calculate the difference of MerkleTree between two nodes - * and perform necessary operation to repair replica. - */ -public abstract class SymmetricSyncTask extends AbstractSyncTask -{ - private static Logger logger = LoggerFactory.getLogger(SymmetricSyncTask.class); - - protected final RepairJobDesc desc; - protected final TreeResponse r1; - protected final TreeResponse r2; - protected final PreviewKind previewKind; - - protected volatile SyncStat stat; - protected long startTime = Long.MIN_VALUE; - - public SymmetricSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, PreviewKind previewKind) - { - this.desc = desc; - this.r1 = r1; - this.r2 = r2; - this.previewKind = previewKind; - } - - /** - * Compares trees, and triggers repairs for any ranges that mismatch. 
- */ - public void run() - { - startTime = System.currentTimeMillis(); - // compare trees, and collect differences - List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); - - stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); - - // choose a repair method based on the significance of the difference - String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), r1.endpoint, r2.endpoint, desc.columnFamily); - if (differences.isEmpty()) - { - logger.info(String.format(format, "are consistent")); - Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); - set(stat); - return; - } - - // non-0 difference: perform streaming repair - logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); - Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); - startSync(differences); - } - - public SyncStat getCurrentStat() - { - return stat; - } - - protected void finished() - { - if (startTime != Long.MIN_VALUE) - Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncNodePair.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncNodePair.java b/src/java/org/apache/cassandra/repair/SyncNodePair.java new file mode 100644 index 0000000..b353eb3 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SyncNodePair.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.io.IOException; + +import com.google.common.base.Objects; + +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.CompactEndpointSerializationHelper; + +/** + * SyncNodePair is used for repair message body to indicate the pair of nodes. 
+ * + * @since 2.0 + */ +public class SyncNodePair +{ + public static IVersionedSerializer<SyncNodePair> serializer = new NodePairSerializer(); + + public final InetAddressAndPort coordinator; + public final InetAddressAndPort peer; + + public SyncNodePair(InetAddressAndPort coordinator, InetAddressAndPort peer) + { + this.coordinator = coordinator; + this.peer = peer; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + SyncNodePair nodePair = (SyncNodePair) o; + return coordinator.equals(nodePair.coordinator) && peer.equals(nodePair.peer); + } + + @Override + public String toString() + { + return coordinator.toString() + " - " + peer.toString(); + } + + @Override + public int hashCode() + { + return Objects.hashCode(coordinator, peer); + } + + public static class NodePairSerializer implements IVersionedSerializer<SyncNodePair> + { + public void serialize(SyncNodePair nodePair, DataOutputPlus out, int version) throws IOException + { + CompactEndpointSerializationHelper.instance.serialize(nodePair.coordinator, out, version); + CompactEndpointSerializationHelper.instance.serialize(nodePair.peer, out, version); + } + + public SyncNodePair deserialize(DataInputPlus in, int version) throws IOException + { + InetAddressAndPort ep1 = CompactEndpointSerializationHelper.instance.deserialize(in, version); + InetAddressAndPort ep2 = CompactEndpointSerializationHelper.instance.deserialize(in, version); + return new SyncNodePair(ep1, ep2); + } + + public long serializedSize(SyncNodePair nodePair, int version) + { + return CompactEndpointSerializationHelper.instance.serializedSize(nodePair.coordinator, version) + + CompactEndpointSerializationHelper.instance.serializedSize(nodePair.peer, version); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncStat.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java index dab5659..7bb503f 100644 --- a/src/java/org/apache/cassandra/repair/SyncStat.java +++ b/src/java/org/apache/cassandra/repair/SyncStat.java @@ -26,16 +26,16 @@ import org.apache.cassandra.streaming.SessionSummary; */ public class SyncStat { - public final NodePair nodes; + public final SyncNodePair nodes; public final long numberOfDifferences; // TODO: revert to Range<Token> public final List<SessionSummary> summaries; - public SyncStat(NodePair nodes, long numberOfDifferences) + public SyncStat(SyncNodePair nodes, long numberOfDifferences) { this(nodes, numberOfDifferences, null); } - public SyncStat(NodePair nodes, long numberOfDifferences, List<SessionSummary> summaries) + public SyncStat(SyncNodePair nodes, long numberOfDifferences, List<SessionSummary> summaries) { this.nodes = nodes; this.numberOfDifferences = numberOfDifferences; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java new file mode 100644 index 0000000..ccbd26c --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.streaming.PreviewKind; +import org.apache.cassandra.tracing.Tracing; + +public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable +{ + private static Logger logger = LoggerFactory.getLogger(SyncTask.class); + + protected final RepairJobDesc desc; + protected final List<Range<Token>> rangesToSync; + protected final PreviewKind previewKind; + protected final SyncNodePair nodePair; + + protected volatile long startTime = Long.MIN_VALUE; + protected final SyncStat stat; + + protected SyncTask(RepairJobDesc desc, InetAddressAndPort primaryEndpoint, InetAddressAndPort peer, List<Range<Token>> rangesToSync, PreviewKind previewKind) + { + Preconditions.checkArgument(!peer.equals(primaryEndpoint), "Sending and receiving node are the same: %s", peer); + this.desc = desc; + this.rangesToSync = rangesToSync; + this.nodePair = new SyncNodePair(primaryEndpoint, peer); + this.previewKind = previewKind; + this.stat = new SyncStat(nodePair, rangesToSync.size()); + } + + protected abstract void startSync(); + + public SyncNodePair nodePair() + { + return nodePair; + 
} + + /** + * Compares trees, and triggers repairs for any ranges that mismatch. + */ + public final void run() + { + startTime = System.currentTimeMillis(); + + + // choose a repair method based on the significance of the difference + String format = String.format("%s Endpoints %s and %s %%s for %s", previewKind.logPrefix(desc.sessionId), nodePair.coordinator, nodePair.peer, desc.columnFamily); + if (rangesToSync.isEmpty()) + { + logger.info(String.format(format, "are consistent")); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", nodePair.coordinator, nodePair.peer, desc.columnFamily); + set(stat); + return; + } + + // non-0 difference: perform streaming repair + logger.info(String.format(format, "have " + rangesToSync.size() + " range(s) out of sync")); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", nodePair.coordinator, rangesToSync.size(), nodePair.peer, desc.columnFamily); + startSync(); + } + + + protected void finished() + { + if (startTime != Long.MIN_VALUE) + Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).metric.syncTime.update(System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/repair/messages/SyncComplete.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java index 1f1344d..c51d1fd 100644 --- a/src/java/org/apache/cassandra/repair/messages/SyncComplete.java +++ b/src/java/org/apache/cassandra/repair/messages/SyncComplete.java @@ -26,7 +26,7 @@ import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.repair.NodePair; +import 
org.apache.cassandra.repair.SyncNodePair; import org.apache.cassandra.repair.RepairJobDesc; import org.apache.cassandra.streaming.SessionSummary; @@ -39,13 +39,13 @@ public class SyncComplete extends RepairMessage public static final MessageSerializer serializer = new SyncCompleteSerializer(); /** nodes that involved in this sync */ - public final NodePair nodes; + public final SyncNodePair nodes; /** true if sync success, false otherwise */ public final boolean success; public final List<SessionSummary> summaries; - public SyncComplete(RepairJobDesc desc, NodePair nodes, boolean success, List<SessionSummary> summaries) + public SyncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean success, List<SessionSummary> summaries) { super(Type.SYNC_COMPLETE, desc); this.nodes = nodes; @@ -57,7 +57,7 @@ public class SyncComplete extends RepairMessage { super(Type.SYNC_COMPLETE, desc); this.summaries = summaries; - this.nodes = new NodePair(endpoint1, endpoint2); + this.nodes = new SyncNodePair(endpoint1, endpoint2); this.success = success; } @@ -85,7 +85,7 @@ public class SyncComplete extends RepairMessage public void serialize(SyncComplete message, DataOutputPlus out, int version) throws IOException { RepairJobDesc.serializer.serialize(message.desc, out, version); - NodePair.serializer.serialize(message.nodes, out, version); + SyncNodePair.serializer.serialize(message.nodes, out, version); out.writeBoolean(message.success); out.writeInt(message.summaries.size()); @@ -98,7 +98,7 @@ public class SyncComplete extends RepairMessage public SyncComplete deserialize(DataInputPlus in, int version) throws IOException { RepairJobDesc desc = RepairJobDesc.serializer.deserialize(in, version); - NodePair nodes = NodePair.serializer.deserialize(in, version); + SyncNodePair nodes = SyncNodePair.serializer.deserialize(in, version); boolean success = in.readBoolean(); int numSummaries = in.readInt(); @@ -114,7 +114,7 @@ public class SyncComplete extends RepairMessage public long 
serializedSize(SyncComplete message, int version) { long size = RepairJobDesc.serializer.serializedSize(message.desc, version); - size += NodePair.serializer.serializedSize(message.nodes, version); + size += SyncNodePair.serializer.serializedSize(message.nodes, version); size += TypeSizes.sizeof(message.success); size += TypeSizes.sizeof(message.summaries.size()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 9f37095..8ffca6a 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -33,7 +33,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListeningExecutorService; @@ -62,14 +61,12 @@ import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.CommonRange; -import org.apache.cassandra.repair.RepairRunnable; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.repair.RepairJobDesc; import 
org.apache.cassandra.repair.RepairParallelism; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0841353e/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 031326e..5543fcc 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -113,7 +113,7 @@ public abstract class AbstractReadExecutor protected void makeFullDataRequests(ReplicaCollection<?> replicas) { assert all(replicas, Replica::isFull); - makeRequests(command, replicas.filter(Replica::isFull)); + makeRequests(command, replicas); } protected void makeTransientDataRequests(ReplicaCollection<?> replicas) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
