Author: gdusbabek
Date: Mon Jun  7 12:26:59 2010
New Revision: 952202

URL: http://svn.apache.org/viewvc?rev=952202&view=rev
Log:
detect partitioner changes and fail fast. patch by gdusbabek reviewed by 
jbellis. CASSANDRA-1169

Modified:
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
    cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
    cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Mon 
Jun  7 12:26:59 2010
@@ -32,9 +32,11 @@ import org.apache.avro.ipc.SocketServer;
 import org.apache.avro.ipc.HttpServer;
 import org.apache.avro.specific.SpecificResponder;
 import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CompactionManager;
 import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.migration.Migration;
@@ -83,6 +85,17 @@ public class CassandraDaemon {
             }
         });
         
+        // check the system table for mismatched partitioner.
+        try
+        {
+            SystemTable.checkHealth();
+        }
+        catch (ConfigurationException e)
+        {
+            logger.error("Fatal exception during initialization", e);
+            System.exit(100);
+        }
+        
         try
         {
             DatabaseDescriptor.loadSchemas();

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Mon Jun  
7 12:26:59 2010
@@ -18,11 +18,13 @@
 
 package org.apache.cassandra.db;
 
+import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.io.IOError;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
 import static org.apache.cassandra.utils.FBUtilities.UTF8;
 
@@ -42,6 +44,8 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 public class SystemTable
 {
@@ -54,6 +58,7 @@ public class SystemTable
     private static final byte[] TOKEN = "Token".getBytes(UTF8);
     private static final byte[] GENERATION = "Generation".getBytes(UTF8);
     private static final byte[] CLUSTERNAME = "ClusterName".getBytes(UTF8);
+    private static final byte[] PARTITIONER = "Partioner".getBytes(UTF8);
     private static StorageMetadata metadata;
 
     private static DecoratedKey decorate(byte[] key)
@@ -102,6 +107,67 @@ public class SystemTable
         }
         metadata.setToken(token);
     }
+
+    /**
+     * One of three things will happen if you try to read the system table:
+     * 1. files are present and you can read them: great
+     * 2. no files are there: great (new node is assumed)
+     * 3. files are present but you can't read them: bad (suspect that the partitioner was changed).
+     * @throws ConfigurationException
+     */
+    public static void checkHealth() throws ConfigurationException
+    {
+        Table table = null;
+        try
+        {
+            table = Table.open(Table.SYSTEM_TABLE);
+        }
+        catch (AssertionError err)
+        {
+            // this happens when a user switches from OPP to RP.
+            ConfigurationException ex = new ConfigurationException("Could not 
read system table. Did you change partitioners?");
+            ex.initCause(err);
+            throw ex;
+        }
+        
+        SortedSet<byte[]> cols = new TreeSet<byte[]>(BytesType.instance);
+        cols.add(TOKEN);
+        cols.add(GENERATION);
+        cols.add(PARTITIONER);
+        QueryFilter filter = 
QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF), 
cols);
+        ColumnFamily cf = 
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
+        
+        if (cf == null)
+        {
+            // this is either a brand new node (there will be no files), or the partitioner was changed from RP to OPP.
+            for (String path : 
DatabaseDescriptor.getAllDataFileLocationsForTable("system"))
+            {
+                File[] dbContents = new File(path).listFiles(new 
FilenameFilter()
+                {
+                    @Override
+                    public boolean accept(File dir, String name)
+                    {
+                        return name.endsWith(".db");
+                    }
+                }); 
+                if (dbContents.length > 0)
+                    throw new ConfigurationException("Found system table 
files, but they couldn't be loaded. Did you change the partitioner?");
+            }   
+            // no system files. data is either in the commit log or this is a new node.
+            return;
+        }
+        
+        
+        // token and generation should *always* be there. If either is missing, we can assume that the partitioner has
+        // been switched.
+        if (cf.getColumnCount() > 0 && (cf.getColumn(GENERATION) == null || 
cf.getColumn(TOKEN) == null))
+            throw new ConfigurationException("Couldn't read system generation 
or token. Did you change the partitioner?");
+        IColumn partitionerCol = cf.getColumn(PARTITIONER);
+        if (partitionerCol != null && 
!DatabaseDescriptor.getPartitioner().getClass().getName().equals(new 
String(partitionerCol.value(), UTF8)))
+            throw new ConfigurationException("Detected partitioner mismatch! 
Did you change the partitioner?");
+        if (partitionerCol == null)
+            logger.info("Did not see a partitioner in system storage.");
+    }
     
     /*
      * This method reads the system table and retrieves the metadata
@@ -125,6 +191,7 @@ public class SystemTable
         columns.add(CLUSTERNAME);
         QueryFilter filter = 
QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF), 
columns);
         ColumnFamily cf = 
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
+        String partitioner = 
DatabaseDescriptor.getPartitioner().getClass().getName();
 
         if (cf == null)
         {
@@ -148,8 +215,21 @@ public class SystemTable
             cf.addColumn(new Column(TOKEN, 
p.getTokenFactory().toByteArray(token), TimestampClock.ZERO_VALUE));
             cf.addColumn(new Column(GENERATION, 
FBUtilities.toByteArray(generation), TimestampClock.ZERO_VALUE));
             cf.addColumn(new Column(CLUSTERNAME, 
DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE));
+            cf.addColumn(new Column(PARTITIONER, partitioner.getBytes(UTF8), 
TimestampClock.ZERO_VALUE));
             rm.add(cf);
             rm.apply();
+            try
+            {
+                
table.getColumnFamilyStore(SystemTable.STATUS_CF).forceBlockingFlush();
+            }
+            catch (ExecutionException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
             metadata = new StorageMetadata(token, generation, 
DatabaseDescriptor.getClusterName().getBytes());
             return metadata;
         }
@@ -167,6 +247,7 @@ public class SystemTable
         int gen = Math.max(FBUtilities.byteArrayToInt(generation.value()) + 1, 
(int) (System.currentTimeMillis() / 1000));
 
         IColumn cluster = cf.getColumn(CLUSTERNAME);
+        IColumn partitionerColumn = cf.getColumn(PARTITIONER);
 
         RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
         cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
@@ -186,8 +267,29 @@ public class SystemTable
             cname = DatabaseDescriptor.getClusterName().getBytes();
             logger.info("Saved ClusterName not found. Using " + 
DatabaseDescriptor.getClusterName());
         }
+        
+        if (partitionerColumn == null)
+        {
+            Column c = new Column(PARTITIONER, partitioner.getBytes(UTF8), 
TimestampClock.ZERO_VALUE);
+            cf.addColumn(c);
+            logger.info("Saved partitioner not found. Using " + partitioner);
+        }
+        
         rm.add(cf);
         rm.apply();
+        try
+        {
+            
table.getColumnFamilyStore(SystemTable.STATUS_CF).forceBlockingFlush();
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+
         metadata = new StorageMetadata(token, gen, cname);
         return metadata;
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java 
Mon Jun  7 12:26:59 2010
@@ -28,7 +28,9 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.cassandra.db.SystemTable;
 import org.apache.thrift.server.TServer;
+import org.apache.cassandra.config.ConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,6 +88,17 @@ public class CassandraDaemon
                 }
             }
         });
+        
+        // check the system table for mismatched partitioner.
+        try
+        {
+            SystemTable.checkHealth();
+        }
+        catch (ConfigurationException e)
+        {
+            logger.error("Fatal exception during initialization", e);
+            System.exit(100);
+        }
 
         try
         {

Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java 
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Mon 
Jun  7 12:26:59 2010
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
 
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.commons.lang.StringUtils;
 import org.junit.Test;
@@ -43,7 +44,7 @@ import org.apache.cassandra.locator.Rack
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.gms.ApplicationState;
 
-public class MoveTest
+public class MoveTest extends CleanupHelper
 {
     // handy way of creating a mapping of strategies to use in StorageService.
     private static Map<String, AbstractReplicationStrategy> 
createReplacements(AbstractReplicationStrategy strat)


Reply via email to