Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 def6b5fa1 -> 93478ab46 refs/heads/trunk 40a7e8606 -> a583f70ee
Wait for anticompaction to finish patch by yukim; reviewed by marcuse for CASSANDRA-9097 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/93478ab4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/93478ab4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/93478ab4 Branch: refs/heads/cassandra-2.1 Commit: 93478ab46c88d6fd198db67ed3ba25251cc30c8c Parents: def6b5f Author: Yuki Morishita <[email protected]> Authored: Fri May 8 13:10:06 2015 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri May 8 17:25:44 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++++ .../cassandra/repair/AnticompactionTask.java | 93 ++++++++++++++++++++ .../repair/RepairMessageVerbHandler.java | 14 ++- .../cassandra/service/ActiveRepairService.java | 14 +-- .../cassandra/service/StorageService.java | 20 +++-- .../apache/cassandra/utils/SemanticVersion.java | 2 +- 7 files changed, 161 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 4a9b34a..5b7843a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -16,6 +16,7 @@ is modified (CASSANDRA-9148, CASSANDRA-9192) * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261) * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151) + * Repair waits for anticompaction to finish (CASSANDRA-9097) Merged from 2.0: * Include keyspace and table name in error log for collections over the size limit (CASSANDRA-9286) http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 20d5387..5beb709 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -546,6 +546,37 @@ public class SystemKeyspace } /** + * Get release version for given endpoint. + * If release version is unknown, then this returns null. + * + * @param ep endpoint address to check + * @return Release version or null if version is unknown. + */ + public static SemanticVersion getReleaseVersion(InetAddress ep) + { + try + { + if (FBUtilities.getBroadcastAddress().equals(ep)) + { + return new SemanticVersion(FBUtilities.getReleaseVersionString()); + } + String req = "SELECT release_version FROM system.%s WHERE peer=?"; + UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep); + if (result != null && result.one().has("release_version")) + { + return new SemanticVersion(result.one().getString("release_version")); + } + // version is unknown + return null; + } + catch (IllegalArgumentException e) + { + // version string cannot be parsed + return null; + } + } + + /** * One of three things will happen if you try to read the system keyspace: * 1. files are present and you can read them: great * 2. no files are there: great (new node is assumed) http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java new file mode 100644 index 0000000..e505d91 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.AbstractFuture; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.AnticompactionRequest; +import org.apache.cassandra.utils.SemanticVersion; + +public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable +{ + /* + * Version that anticompaction response is not supported up to. + * If Cassandra version is more than this, we need to wait for anticompaction response. + */ + private static final SemanticVersion VERSION_CHECKER = new SemanticVersion("2.1.5"); + + private final UUID parentSession; + private final InetAddress neighbor; + + public AnticompactionTask(UUID parentSession, InetAddress neighbor) + { + this.parentSession = parentSession; + this.neighbor = neighbor; + } + + public void run() + { + AnticompactionRequest acr = new AnticompactionRequest(parentSession); + SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); + if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) + { + MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + } + else + { + MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); + // immediately return after sending request + set(neighbor); + } + } + + /** + * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage. + */ + public static class AnticompactionCallback implements IAsyncCallbackWithFailure + { + final AnticompactionTask task; + + public AnticompactionCallback(AnticompactionTask task) + { + this.task = task; + } + + public void response(MessageIn msg) + { + task.set(msg.from); + } + + public boolean isLatencyForSnitch() + { + return false; + } + + public void onFailure(InetAddress from) + { + task.setException(new RuntimeException("Anticompaction failed or timed out in " + from)); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 5b25afa..60b2243 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -24,6 +24,8 @@ import java.util.UUID; import java.util.concurrent.Future; import com.google.common.base.Predicate; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +55,7 @@ import org.apache.cassandra.utils.Pair; public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { private static final Logger logger = LoggerFactory.getLogger(RepairMessageVerbHandler.class); - public void doVerb(MessageIn<RepairMessage> message, int id) + public void doVerb(final MessageIn<RepairMessage> message, final int id) { // TODO add cancel/interrupt message RepairJobDesc desc = message.payload.desc; @@ -112,7 +114,15 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case ANTICOMPACTION_REQUEST: logger.debug("Got anticompaction request"); AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; - ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); + ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); + compactionDone.addListener(new Runnable() + { + @Override + public void run() + { + MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); + } + }, MoreExecutors.sameThreadExecutor()); break; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index 5cc26ed..8d3563c 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -50,7 +50,6 @@ import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.*; -import org.apache.cassandra.repair.messages.AnticompactionRequest; import org.apache.cassandra.repair.messages.PrepareMessage; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.SyncComplete; @@ -330,21 +329,24 @@ public class ActiveRepairService * @throws InterruptedException * @throws ExecutionException */ - public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException + public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException { if (doAntiCompaction) { + List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1); for (InetAddress neighbor : neighbors) { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); - MessageOut<RepairMessage> req = acr.createMessage(); - MessagingService.instance().sendOneWay(req, neighbor); + AnticompactionTask task = new AnticompactionTask(parentSession, neighbor); + tasks.add(task); + task.run(); // 'run' is just sending message } - doAntiCompaction(parentSession).get(); + tasks.add(doAntiCompaction(parentSession)); + return Futures.successfulAsList(tasks); } else { removeParentRepairSession(parentSession); + return Futures.immediateFuture(null); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 7f9259c..b6c6ecf 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -142,6 +142,8 @@ import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.OutputHandler; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.WrappedRunnable; + +import com.google.common.util.concurrent.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,9 +165,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.Uninterruptibles; /** * This abstraction contains the token/identifier of this node @@ -2949,9 +2948,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } if (!fullRepair) { - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful); + ListenableFuture future = ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful); + future.addListener(new Runnable() + { + @Override + public void run() + { + sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); + } + }, MoreExecutors.sameThreadExecutor()); + } + else + { + sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); } - sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); } }, null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/93478ab4/src/java/org/apache/cassandra/utils/SemanticVersion.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/SemanticVersion.java b/src/java/org/apache/cassandra/utils/SemanticVersion.java index 694f09f..858029d 100644 --- a/src/java/org/apache/cassandra/utils/SemanticVersion.java +++ b/src/java/org/apache/cassandra/utils/SemanticVersion.java @@ -32,7 +32,7 @@ import com.google.common.base.Objects; */ public class SemanticVersion implements Comparable<SemanticVersion> { - private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)\\.(\\d+)(\\-[.\\w]+)?(\\+[.\\w]+)?"; + private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)\\.(\\d+)(\\-[.\\w]+)?([.+][.\\w]+)?"; private static final Pattern pattern = Pattern.compile(VERSION_REGEXP); public final int major;
