Wait for all repair sessions to finish

patch by yukim; reviewed by krummas for CASSANDRA-8208
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b2808b1d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b2808b1d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b2808b1d Branch: refs/heads/trunk Commit: b2808b1dcea1158511421f947660f03d583e84b0 Parents: f4456a2 Author: Yuki Morishita <[email protected]> Authored: Mon Nov 3 14:17:37 2014 -0600 Committer: Yuki Morishita <[email protected]> Committed: Thu Nov 13 12:50:10 2014 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../repair/RepairMessageVerbHandler.java | 2 +- .../apache/cassandra/repair/RepairResult.java | 3 ++ .../apache/cassandra/repair/RepairSession.java | 6 +-- .../cassandra/repair/RepairSessionResult.java | 43 ++++++++++++++++++++ .../repair/messages/AnticompactionRequest.java | 26 ++++++++++-- .../cassandra/service/ActiveRepairService.java | 16 +++++--- .../cassandra/service/StorageService.java | 30 +++++++++----- 8 files changed, 104 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d656faf..82fbbc5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -30,7 +30,7 @@ * Use unsafe mutations for most unit tests (CASSANDRA-6969) * Fix race condition during calculation of pending ranges (CASSANDRA-7390) * Fail on very large batch sizes (CASSANDRA-8011) - * improve concurrency of repair (CASSANDRA-6455) + * improve concurrency of repair (CASSANDRA-6455, 8208) 2.1.3 * Support for frozen collections (CASSANDRA-7859) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 2e96ee3..1880e8e 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -124,7 +124,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> logger.debug("Got anticompaction request {}", anticompactionRequest); try { - List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); + List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); FBUtilities.waitOnFutures(futures); } catch (Exception e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java b/src/java/org/apache/cassandra/repair/RepairResult.java index 259d5f3..333b48a 100644 --- a/src/java/org/apache/cassandra/repair/RepairResult.java +++ b/src/java/org/apache/cassandra/repair/RepairResult.java @@ -19,6 +19,9 @@ package org.apache.cassandra.repair; import java.util.List; +/** + * RepairJob's result + */ public class RepairResult { public final RepairJobDesc desc; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index c273c4e..cc46dbe 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -75,7 +75,7 @@ import org.apache.cassandra.utils.Pair; * 
Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle * all of them in parallel otherwise. */ -public class RepairSession extends AbstractFuture<List<RepairResult>> implements IEndpointStateChangeSubscriber, +public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener { private static Logger logger = LoggerFactory.getLogger(RepairSession.class); @@ -223,7 +223,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements if (endpoints.isEmpty()) { logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range)); - set(Lists.<RepairResult>newArrayList()); + set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList())); return; } @@ -255,7 +255,7 @@ public class RepairSession extends AbstractFuture<List<RepairResult>> implements { // this repair session is completed logger.info(String.format("[repair #%s] session completed successfully", getId())); - set(results); + set(new RepairSessionResult(id, keyspace, range, results)); taskExecutor.shutdown(); // mark this session as terminated terminate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/RepairSessionResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java new file mode 100644 index 0000000..4551608 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.Collection; +import java.util.UUID; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Repair session result + */ +public class RepairSessionResult +{ + public final UUID sessionId; + public final String keyspace; + public final Range<Token> range; + public final Collection<RepairResult> repairJobResults; + + public RepairSessionResult(UUID sessionId, String keyspace, Range<Token> range, Collection<RepairResult> repairJobResults) + { + this.sessionId = sessionId; + this.keyspace = keyspace; + this.range = range; + this.repairJobResults = repairJobResults; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java index 1a13ad1..239ab0e 100644 --- a/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java +++ b/src/java/org/apache/cassandra/repair/messages/AnticompactionRequest.java @@ -19,8 +19,13 @@ package org.apache.cassandra.repair.messages; import java.io.DataInput; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; 
+import java.util.List; import java.util.UUID; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.UUIDSerializer; @@ -28,11 +33,16 @@ public class AnticompactionRequest extends RepairMessage { public static MessageSerializer serializer = new AnticompactionRequestSerializer(); public final UUID parentRepairSession; + /** + * Successfully repaired ranges. Does not contain null. + */ + public final Collection<Range<Token>> successfulRanges; - public AnticompactionRequest(UUID parentRepairSession) + public AnticompactionRequest(UUID parentRepairSession, Collection<Range<Token>> ranges) { super(Type.ANTICOMPACTION_REQUEST, null); this.parentRepairSession = parentRepairSession; + this.successfulRanges = ranges; } public static class AnticompactionRequestSerializer implements MessageSerializer<AnticompactionRequest> @@ -40,17 +50,27 @@ public class AnticompactionRequest extends RepairMessage public void serialize(AnticompactionRequest message, DataOutputPlus out, int version) throws IOException { UUIDSerializer.serializer.serialize(message.parentRepairSession, out, version); + out.writeInt(message.successfulRanges.size()); + for (Range r : message.successfulRanges) + Range.serializer.serialize(r, out, version); } public AnticompactionRequest deserialize(DataInput in, int version) throws IOException { UUID parentRepairSession = UUIDSerializer.serializer.deserialize(in, version); - return new AnticompactionRequest(parentRepairSession); + int rangeCount = in.readInt(); + List<Range<Token>> ranges = new ArrayList<>(rangeCount); + for (int i = 0; i < rangeCount; i++) + ranges.add((Range<Token>) Range.serializer.deserialize(in, version).toTokenBounds()); + return new AnticompactionRequest(parentRepairSession, ranges); } public long serializedSize(AnticompactionRequest message, int version) { - return 
UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); + long size = UUIDSerializer.serializer.serializedSize(message.parentRepairSession, version); + for (Range r : message.successfulRanges) + size += Range.serializer.serializedSize(r, version); + return size; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 763ecdf..3c1cc48 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -257,7 +257,7 @@ public class ActiveRepairService for (ColumnFamilyStore cfs : columnFamilyStores) cfIds.add(cfs.metadata.cfId); - for(InetAddress neighbour : endpoints) + for (InetAddress neighbour : endpoints) { PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental()); MessageOut<RepairMessage> msg = message.createMessage(); @@ -287,17 +287,17 @@ public class ActiveRepairService parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, isIncremental, System.currentTimeMillis())); } - public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) + public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) { try { for (InetAddress neighbor : neighbors) { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); + AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); MessageOut<RepairMessage> req = acr.createMessage(); MessagingService.instance().sendOneWay(req, neighbor); } - List<Future<?>> futures = doAntiCompaction(parentSession); + List<Future<?>> 
futures = doAntiCompaction(parentSession, successfulRanges); FBUtilities.waitOnFutures(futures); } finally @@ -316,12 +316,16 @@ public class ActiveRepairService return parentRepairSessions.remove(parentSessionId); } - public List<Future<?>> doAntiCompaction(UUID parentRepairSession) + public List<Future<?>> doAntiCompaction(UUID parentRepairSession, Collection<Range<Token>> successfulRanges) { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); + assert prs.ranges.containsAll(successfulRanges) : "Trying to perform anticompaction on unknown ranges"; List<Future<?>> futures = new ArrayList<>(); + // if we don't have successful repair ranges, then just skip anticompaction + if (successfulRanges.isEmpty()) + return futures; for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { @@ -338,7 +342,7 @@ public class ActiveRepairService success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables); } - futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); + futures.add(CompactionManager.instance.submitAntiCompaction(cfs, successfulRanges, sstables, prs.repairedAt)); } return futures; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b2808b1d/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 450bc5c..a0b7975 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -30,7 +30,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import javax.annotation.Nullable; import javax.management.JMX; import javax.management.MBeanServer; import 
javax.management.Notification; @@ -78,6 +77,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ResponseVerbHandler; import org.apache.cassandra.repair.RepairMessageVerbHandler; +import org.apache.cassandra.repair.RepairSessionResult; import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.repair.RepairResult; import org.apache.cassandra.repair.RepairSession; @@ -2679,7 +2679,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE new NamedThreadFactory("Repair#" + cmd), "internal")); - List<ListenableFuture<?>> futures = new ArrayList<>(options.getRanges().size()); + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); String[] cfnames = new String[columnFamilyStores.size()]; for (int i = 0; i < columnFamilyStores.size(); i++) { @@ -2698,9 +2698,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (session == null) continue; // After repair session completes, notify client its result - Futures.addCallback(session, new FutureCallback<List<RepairResult>>() + Futures.addCallback(session, new FutureCallback<RepairSessionResult>() { - public void onSuccess(List<RepairResult> results) + public void onSuccess(RepairSessionResult result) { String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString()); logger.info(message); @@ -2719,14 +2719,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // After all repair sessions completes(successful or not), // run anticompaction if necessary and send finish notice back to client - ListenableFuture<?> allSessions = Futures.allAsList(futures); - Futures.addCallback(allSessions, new FutureCallback<Object>() + final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); + 
Futures.addCallback(allSessions, new FutureCallback<List<RepairSessionResult>>() { - public void onSuccess(@Nullable Object result) + public void onSuccess(List<RepairSessionResult> result) { + // filter out null(=failed) results and get successful ranges + Collection<Range<Token>> successfulRanges = new ArrayList<>(); + for (RepairSessionResult sessionResult : result) + { + if (sessionResult != null) + { + successfulRanges.add(sessionResult.range); + } + } try { - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors); + ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); } catch (Exception e) { @@ -2742,14 +2751,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private void repairComplete() { - String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true); + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); String message = String.format("Repair command #%d finished in %s", cmd, duration); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); logger.info(message); executor.shutdownNow(); } - }, MoreExecutors.sameThreadExecutor()); + }); } }, null); }
