Author: gdusbabek
Date: Mon Jun 7 12:26:59 2010
New Revision: 952202
URL: http://svn.apache.org/viewvc?rev=952202&view=rev
Log:
detect partitioner changes and fail fast. patch by gdusbabek reviewed by
jbellis. CASSANDRA-1169
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Mon
Jun 7 12:26:59 2010
@@ -32,9 +32,11 @@ import org.apache.avro.ipc.SocketServer;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
import org.apache.cassandra.db.DefsTable;
+import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.migration.Migration;
@@ -83,6 +85,17 @@ public class CassandraDaemon {
}
});
+ // check the system table for mismatched partitioner.
+ try
+ {
+ SystemTable.checkHealth();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
+
try
{
DatabaseDescriptor.loadSchemas();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Mon Jun
7 12:26:59 2010
@@ -18,11 +18,13 @@
package org.apache.cassandra.db;
+import java.io.File;
+import java.io.FilenameFilter;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.io.IOError;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.utils.FBUtilities.UTF8;
@@ -42,6 +44,8 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
public class SystemTable
{
@@ -54,6 +58,7 @@ public class SystemTable
private static final byte[] TOKEN = "Token".getBytes(UTF8);
private static final byte[] GENERATION = "Generation".getBytes(UTF8);
private static final byte[] CLUSTERNAME = "ClusterName".getBytes(UTF8);
+ private static final byte[] PARTITIONER = "Partioner".getBytes(UTF8);
private static StorageMetadata metadata;
private static DecoratedKey decorate(byte[] key)
@@ -102,6 +107,67 @@ public class SystemTable
}
metadata.setToken(token);
}
+
+ /**
+ * One of three things will happen if you try to read the system table:
+ * 1. files are present and you can read them: great
+ * 2. no files are there: great (new node is assumed)
+ * 3. files are present but you can't read them: bad (suspect that the
partitioner was changed).
+ * @throws ConfigurationException
+ */
+ public static void checkHealth() throws ConfigurationException
+ {
+ Table table = null;
+ try
+ {
+ table = Table.open(Table.SYSTEM_TABLE);
+ }
+ catch (AssertionError err)
+ {
+ // this happens when a user switches from OPP to RP.
+ ConfigurationException ex = new ConfigurationException("Could not
read system table. Did you change partitioners?");
+ ex.initCause(err);
+ throw ex;
+ }
+
+ SortedSet<byte[]> cols = new TreeSet<byte[]>(BytesType.instance);
+ cols.add(TOKEN);
+ cols.add(GENERATION);
+ cols.add(PARTITIONER);
+ QueryFilter filter =
QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF),
cols);
+ ColumnFamily cf =
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
+
+ if (cf == null)
+ {
+ // this is either a brand new node (there will be no files), or
the partitioner was changed from RP to OPP.
+ for (String path :
DatabaseDescriptor.getAllDataFileLocationsForTable("system"))
+ {
+ File[] dbContents = new File(path).listFiles(new
FilenameFilter()
+ {
+ @Override
+ public boolean accept(File dir, String name)
+ {
+ return name.endsWith(".db");
+ }
+ });
+ if (dbContents.length > 0)
+ throw new ConfigurationException("Found system table
files, but they couldn't be loaded. Did you change the partitioner?");
+ }
+ // no system files. data is either in the commit log or this is a
new node.
+ return;
+ }
+
+
+ // token and generation should *always* be there. If either are
missing, we can assume that the partitioner has
+ // been switched.
+ if (cf.getColumnCount() > 0 && (cf.getColumn(GENERATION) == null ||
cf.getColumn(TOKEN) == null))
+ throw new ConfigurationException("Couldn't read system generation
or token. Did you change the partitioner?");
+ IColumn partitionerCol = cf.getColumn(PARTITIONER);
+ if (partitionerCol != null &&
!DatabaseDescriptor.getPartitioner().getClass().getName().equals(new
String(partitionerCol.value(), UTF8)))
+ throw new ConfigurationException("Detected partitioner mismatch!
Did you change the partitioner?");
+ if (partitionerCol == null)
+ logger.info("Did not see a partitioner in system storage.");
+ }
/*
* This method reads the system table and retrieves the metadata
@@ -125,6 +191,7 @@ public class SystemTable
columns.add(CLUSTERNAME);
QueryFilter filter =
QueryFilter.getNamesFilter(decorate(LOCATION_KEY), new QueryPath(STATUS_CF),
columns);
ColumnFamily cf =
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(filter);
+ String partitioner =
DatabaseDescriptor.getPartitioner().getClass().getName();
if (cf == null)
{
@@ -148,8 +215,21 @@ public class SystemTable
cf.addColumn(new Column(TOKEN,
p.getTokenFactory().toByteArray(token), TimestampClock.ZERO_VALUE));
cf.addColumn(new Column(GENERATION,
FBUtilities.toByteArray(generation), TimestampClock.ZERO_VALUE));
cf.addColumn(new Column(CLUSTERNAME,
DatabaseDescriptor.getClusterName().getBytes(), TimestampClock.ZERO_VALUE));
+ cf.addColumn(new Column(PARTITIONER, partitioner.getBytes(UTF8),
TimestampClock.ZERO_VALUE));
rm.add(cf);
rm.apply();
+ try
+ {
+
table.getColumnFamilyStore(SystemTable.STATUS_CF).forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
metadata = new StorageMetadata(token, generation,
DatabaseDescriptor.getClusterName().getBytes());
return metadata;
}
@@ -167,6 +247,7 @@ public class SystemTable
int gen = Math.max(FBUtilities.byteArrayToInt(generation.value()) + 1,
(int) (System.currentTimeMillis() / 1000));
IColumn cluster = cf.getColumn(CLUSTERNAME);
+ IColumn partitionerColumn = cf.getColumn(PARTITIONER);
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, LOCATION_KEY);
cf = ColumnFamily.create(Table.SYSTEM_TABLE, SystemTable.STATUS_CF);
@@ -186,8 +267,29 @@ public class SystemTable
cname = DatabaseDescriptor.getClusterName().getBytes();
logger.info("Saved ClusterName not found. Using " +
DatabaseDescriptor.getClusterName());
}
+
+ if (partitionerColumn == null)
+ {
+ Column c = new Column(PARTITIONER, partitioner.getBytes(UTF8),
TimestampClock.ZERO_VALUE);
+ cf.addColumn(c);
+ logger.info("Saved partitioner not found. Using " + partitioner);
+ }
+
rm.add(cf);
rm.apply();
+ try
+ {
+
table.getColumnFamilyStore(SystemTable.STATUS_CF).forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+
metadata = new StorageMetadata(token, gen, cname);
return metadata;
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraDaemon.java
Mon Jun 7 12:26:59 2010
@@ -28,7 +28,9 @@ import java.util.concurrent.SynchronousQ
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.cassandra.db.SystemTable;
import org.apache.thrift.server.TServer;
+import org.apache.cassandra.config.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +88,17 @@ public class CassandraDaemon
}
}
});
+
+ // check the system table for mismatched partitioner.
+ try
+ {
+ SystemTable.checkHealth();
+ }
+ catch (ConfigurationException e)
+ {
+ logger.error("Fatal exception during initialization", e);
+ System.exit(100);
+ }
try
{
Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java?rev=952202&r1=952201&r2=952202&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/service/MoveTest.java Mon
Jun 7 12:26:59 2010
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.commons.lang.StringUtils;
import org.junit.Test;
@@ -43,7 +44,7 @@ import org.apache.cassandra.locator.Rack
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.gms.ApplicationState;
-public class MoveTest
+public class MoveTest extends CleanupHelper
{
// handy way of creating a mapping of strategies to use in StorageService.
private static Map<String, AbstractReplicationStrategy>
createReplacements(AbstractReplicationStrategy strat)