Check keyspace existence on RepairMessageVerbHandler

Patch by Paulo Motta; reviewed by Yuki Morishita for CASSANDRA-11065


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/37680ee4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/37680ee4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/37680ee4

Branch: refs/heads/trunk
Commit: 37680ee4e2a1b129900ff3c58b153e5a7661b757
Parents: c116207
Author: Yuki Morishita <yu...@apache.org>
Authored: Thu Feb 18 11:03:31 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Feb 18 11:11:19 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 37 +++++++++++++++++
 .../repair/RepairMessageVerbHandler.java        | 42 +++++++++++++++-----
 3 files changed, 69 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c85fc45..53fc168 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.6
+ * Protect from keyspace dropped during repair (CASSANDRA-11065)
  * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146)
  * Better error message for cleanup (CASSANDRA-10991)
  * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cf5d7c7..da4a84a 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -3074,4 +3074,41 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             return sstables;
         }
     };
+
+    /**
+     * Returns a ColumnFamilyStore by cfId if it exists, null otherwise
+     * Differently from others, this method does not throw exception if the table does not exist.
+     */
+    public static ColumnFamilyStore getIfExists(UUID cfId)
+    {
+        Pair<String, String> kscf = Schema.instance.getCF(cfId);
+        if (kscf == null)
+            return null;
+
+        Keyspace keyspace = Keyspace.open(kscf.left);
+        if (keyspace == null)
+            return null;
+
+        return keyspace.getColumnFamilyStore(cfId);
+    }
+
+    /**
+     * Returns a ColumnFamilyStore by ksname and cfname if it exists, null otherwise
+     * Differently from others, this method does not throw exception if the keyspace or table does not exist.
+     */
+    public static ColumnFamilyStore getIfExists(String ksName, String cfName)
+    {
+        if (ksName == null || cfName == null)
+            return null;
+
+        Keyspace keyspace = Keyspace.open(ksName);
+        if (keyspace == null)
+            return null;
+
+        UUID id = Schema.instance.getId(ksName, cfName);
+        if (id == null)
+            return null;
+
+        return keyspace.getColumnFamilyStore(id);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/37680ee4/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 41d79aa..b8f8b65 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.repair;
 
+import java.net.InetAddress;
 import java.util.*;
 
 import com.google.common.base.Predicate;
@@ -26,9 +27,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Bounds;
@@ -43,7 +42,6 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.*;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CassandraVersion;
-import org.apache.cassandra.utils.Pair;
 
 /**
  * Handles all repair related message.
@@ -68,8 +66,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
                     for (UUID cfId : prepareMessage.cfIds)
                     {
-                        Pair<String, String> kscf = Schema.instance.getCF(cfId);
-                        ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                        ColumnFamilyStore columnFamilyStore = ColumnFamilyStore.getIfExists(cfId);
+                        if (columnFamilyStore == null)
+                        {
+                            logErrorAndSendFailureResponse(String.format("Table with id %s was dropped during prepare phase of repair",
+                                                                         cfId.toString()), message.from, id);
+                            return;
+                        }
                         columnFamilyStores.add(columnFamilyStore);
                     }
                     CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(message.from);
@@ -88,7 +91,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
 
                 case SNAPSHOT:
                     logger.debug("Snapshotting {}", desc);
-                    final ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                    final ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
+                    if (cfs == null)
+                    {
+                        logErrorAndSendFailureResponse(String.format("Table %s.%s was dropped during snapshot phase of repair",
+                                                                     desc.keyspace, desc.columnFamily), message.from, id);
+                        return;
+                    }
                     final Range<Token> repairingRange = desc.range;
                     Set<SSTableReader> snapshottedSSSTables = cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
                     {
@@ -105,10 +114,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     {
                         // clear snapshot that we just created
                         cfs.clearSnapshot(desc.sessionId.toString());
-                        logger.error("Cannot start multiple repair sessions over the same sstables");
-                        MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
-                                               .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
-                        MessagingService.instance().sendReply(reply, id, message.from);
+                        logErrorAndSendFailureResponse("Cannot start multiple repair sessions over the same sstables", message.from, id);
                         return;
                     }
                     ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).addSSTables(cfs.metadata.cfId, snapshottedSSSTables);
@@ -120,7 +126,13 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                     ValidationRequest validationRequest = (ValidationRequest) message.payload;
                     logger.debug("Validating {}", validationRequest);
                     // trigger read-only compaction
-                    ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                    ColumnFamilyStore store = ColumnFamilyStore.getIfExists(desc.keyspace, desc.columnFamily);
+                    if (store == null)
+                    {
+                        logger.error("Table {}.{} was dropped during snapshot phase of repair", desc.keyspace, desc.columnFamily);
+                        MessagingService.instance().sendOneWay(new ValidationComplete(desc).createMessage(), message.from);
+                        return;
+                    }
 
                     Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
                     CompactionManager.instance.submitValidation(store, validator);
@@ -172,4 +184,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
             throw new RuntimeException(e);
         }
     }
+
+    private void logErrorAndSendFailureResponse(String errorMessage, InetAddress to, int id)
+    {
+        logger.error(errorMessage);
+        MessageOut reply = new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE)
+                               .withParameter(MessagingService.FAILURE_RESPONSE_PARAM, MessagingService.ONE_BYTE);
+        MessagingService.instance().sendReply(reply, id, to);
+    }
 }

Reply via email to