Check keyspace existence on RepairMessageVerbHandler. Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11065.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37680ee4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37680ee4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37680ee4 Branch: refs/heads/trunk Commit: 37680ee4e2a1b129900ff3c58b153e5a7661b757 Parents: c116207 Author: Yuki Morishita <yu...@apache.org> Authored: Thu Feb 18 11:03:31 2016 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Thu Feb 18 11:11:19 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 37 +++++++++++++++++ .../repair/RepairMessageVerbHandler.java | 42 +++++++++++++++----- 3 files changed, 69 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c85fc45..53fc168 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.6 + * Protect from keyspace dropped during repair (CASSANDRA-11065) * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146) * Better error message for cleanup (CASSANDRA-10991) * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123) http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index cf5d7c7..da4a84a 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -3074,4 +3074,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return 
sstables; } }; + + /** + * Returns a ColumnFamilyStore by cfId if it exists, null otherwise + * Differently from others, this method does not throw exception if the table does not exist. + */ + public static ColumnFamilyStore getIfExists(UUID cfId) + { + Pair<String, String> kscf = Schema.instance.getCF(cfId); + if (kscf == null) + return null; + + Keyspace keyspace = Keyspace.open(kscf.left); + if (keyspace == null) + return null; + + return keyspace.getColumnFamilyStore(cfId); + } + + /** + * Returns a ColumnFamilyStore by ksname and cfname if it exists, null otherwise + * Differently from others, this method does not throw exception if the keyspace or table does not exist. + */ + public static ColumnFamilyStore getIfExists(String ksName, String cfName) + { + if (ksName == null || cfName == null) + return null; + + Keyspace keyspace = Keyspace.open(ksName); + if (keyspace == null) + return null; + + UUID id = Schema.instance.getId(ksName, cfName); + if (id == null) + return null; + + return keyspace.getColumnFamilyStore(id); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 41d79aa..b8f8b65 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.repair; +import java.net.InetAddress; import java.util.*; import com.google.common.base.Predicate; @@ -26,9 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; import 
org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Bounds; @@ -43,7 +42,6 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.CassandraVersion; -import org.apache.cassandra.utils.Pair; /** * Handles all repair related message. @@ -68,8 +66,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size()); for (UUID cfId : prepareMessage.cfIds) { - Pair<String, String> kscf = Schema.instance.getCF(cfId); - ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right); + ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(cfId); + if (columnFamilyStore == null) + { + logErrorAndSendFailureResponse(String.format("Table with id %s was dropped during prepare phase of repair", + cfId.toString()), message.from, id); + return; + } columnFamilyStores.add(columnFamilyStore); } CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from); @@ -88,7 +91,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case SNAPSHOT: logger.debug("Snapshotting {}", desc); - final ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); + final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); + if (cfs == null) + { + logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair", + desc.keyspace, desc.columnFamily), message.from, id); + return; + } final Range<Token> repairingRange = desc.range; Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>() { @@ -105,10 +114,7 @@ public class 
RepairMessageVerbHandler implements IVerbHandler<RepairMessage> { // clear snapshot that we just created cfs.clearSnapshot(desc.sessionId.toString()); - logger.error("Cannot start multiple repair sessions over the same sstables"); - MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) - .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); - MessagingService.instance().sendReply(reply, id, message.from); + logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id); return; } ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables); @@ -120,7 +126,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> ValidationRequest validationRequest = (ValidationRequest) message.payload; logger.debug("Validating {}", validationRequest); // trigger read-only compaction - ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily); + ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily); + if (store == null) + { + logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily); + MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), message.from); + return; + } Validator validator = new Validator(desc, message.from, validationRequest.gcBefore); CompactionManager.instance.submitValidation(store, validator); @@ -172,4 +184,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> throw new RuntimeException(e); } } + + private void logErrorAndSendFailureResponse(String errorMessage, InetAddress to, int id) + { + logger.error(errorMessage); + MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE) + .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE); + 
MessagingService.instance().sendReply(reply, id, to); + } }