Author: gdusbabek
Date: Fri May 21 20:29:54 2010
New Revision: 947163
URL: http://svn.apache.org/viewvc?rev=947163&view=rev
Log:
recover when a migration crashes before system table is flushed. patch by
gdusbabek, reviewed by stuhood. CASSANDRA-987
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraDaemon.java Fri
May 21 20:29:54 2010
@@ -22,18 +22,28 @@ import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
import org.apache.avro.ipc.SocketServer;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.specific.SpecificResponder;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.CompactionManager;
+import org.apache.cassandra.db.DefsTable;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Mx4jTool;
+import org.apache.cassandra.utils.WrappedRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,6 +104,16 @@ public class CassandraDaemon {
// replay the log if necessary and check for compaction candidates
CommitLog.recover();
CompactionManager.instance.checkAllColumnFamilies();
+
+ // check to see if CL.recovery modified the lastMigrationId. if it did, we need to re-apply migrations. this isn't
+ // the same as merely reloading the schema (which wouldn't perform file deletion after a DROP). The solution
+ // is to read those migrations from disk and apply them.
+ UUID currentMigration = DatabaseDescriptor.getDefsVersion();
+ UUID lastMigration = Migration.getLastMigrationId();
+ if (lastMigration.timestamp() > currentMigration.timestamp())
+ {
+ MigrationManager.applyMigrations(currentMigration, lastMigration);
+ }
// start server internals
StorageService.instance.initServer();
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/migration/Migration.java
Fri May 21 20:29:54 2010
@@ -18,6 +18,7 @@
package org.apache.cassandra.db.migration;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.KSMetaData;
import org.apache.cassandra.db.ColumnFamily;
@@ -98,11 +99,11 @@ public abstract class Migration
public void beforeApplyModels() {}
/** apply changes */
- public final void apply() throws IOException
+ public final void apply() throws IOException, ConfigurationException
{
// ensure migration is serial. don't apply unless the previous version
matches.
if (!DatabaseDescriptor.getDefsVersion().equals(lastVersion))
- throw new IOException("Previous version mismatch. cannot apply.");
+ throw new ConfigurationException("Previous version mismatch.
cannot apply.");
// write to schema
assert rm != null;
if (!clientMode)
@@ -124,6 +125,10 @@ public abstract class Migration
migration = new RowMutation(Table.SYSTEM_TABLE,
LAST_MIGRATION_KEY);
migration.add(new QueryPath(SCHEMA_CF, null, LAST_MIGRATION_KEY),
UUIDGen.decompose(newVersion), now);
migration.apply();
+
+ // if we fail here, there will be schema changes in the CL that will get replayed *AFTER* the schema is loaded.
+ // CassandraDaemon checks for this condition (the stored version will be greater than the loaded version)
+ // and calls MigrationManager.applyMigrations(loaded version, stored version).
// flush changes out of memtables so we don't need to rely on the
commit log.
ColumnFamilyStore[] schemaStores = new ColumnFamilyStore[] {
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java?rev=947163&r1=947162&r2=947163&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/MigrationManager.java
Fri May 21 20:29:54 2010
@@ -19,6 +19,7 @@
package org.apache.cassandra.service;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.IColumn;
@@ -42,8 +43,11 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
public class MigrationManager implements IEndpointStateChangeSubscriber
{
@@ -99,6 +103,59 @@ public class MigrationManager implements
for (InetAddress host : hosts)
MessagingService.instance.sendOneWay(msg, host);
}
+
+ /**
+ * gets called during startup if we notice a mismatch between the current
migration version and the one saved. This
+ * can only happen as a result of the commit log recovering schema
updates, which overwrites lastVersionId.
+ *
+ * This method silently eats IOExceptions thrown by Migration.apply() as a
result of applying a migration out of
+ * order.
+ */
+ public static void applyMigrations(UUID from, UUID to) throws IOException
+ {
+ List<Future> updates = new ArrayList<Future>();
+ Collection<IColumn> migrations = Migration.getLocalMigrations(from,
to);
+ for (IColumn col : migrations)
+ {
+ final Migration migration = Migration.deserialize(new
ByteArrayInputStream(col.value()));
+ Future update =
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ migration.apply();
+ }
+ catch (ConfigurationException ex)
+ {
+ // this happens if we try to apply something that's
already been applied. ignore and proceed.
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ updates.add(update);
+ }
+
+ // wait on all the updates before proceeding.
+ for (Future f : updates)
+ {
+ try
+ {
+ f.get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new IOException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new IOException(e);
+ }
+ }
+ }
/** pushes migrations from this host to another host */
public static void pushMigrations(UUID from, UUID to, InetAddress host)