add a convenient way to reset a node's schema
patch by Yuki Morishita and Pavel Yaskevich; reviewed by Pavel Yaskevich for 
CASSANDRA-2963


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e376bc02
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e376bc02
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e376bc02

Branch: refs/heads/trunk
Commit: e376bc02eca6f75489d2aff41e0cc36105eb8567
Parents: 5030f78
Author: Pavel Yaskevich <[email protected]>
Authored: Wed Feb 22 21:54:43 2012 +0300
Committer: Pavel Yaskevich <[email protected]>
Committed: Wed Feb 22 21:59:31 2012 +0300

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 src/java/org/apache/cassandra/config/Schema.java   |   18 ++
 src/java/org/apache/cassandra/db/Table.java        |   25 +++-
 .../apache/cassandra/service/MigrationManager.java |  144 +++++++++++----
 .../apache/cassandra/service/StorageService.java   |    7 +-
 .../cassandra/service/StorageServiceMBean.java     |    4 +-
 src/java/org/apache/cassandra/tools/NodeCmd.java   |    3 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |    6 +-
 8 files changed, 162 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7811b1f..a41bdba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@
  * Fix BulkRecordWriter to not throw NPE if reducer gets no map data from 
Hadoop (CASSANDRA-3944)
  * Fix bug with counters in super columns (CASSANDRA-3821)
  * Remove deprecated merge_shard_chance (CASSANDRA-3940)
+ * add a convenient way to reset a node's schema (CASSANDRA-2963)
 Merged from 1.0:
  * remove the wait on hint future during write (CASSANDRA-3870)
  * (cqlsh) ignore missing CfDef opts (CASSANDRA-3933)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/config/Schema.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Schema.java 
b/src/java/org/apache/cassandra/config/Schema.java
index 89c5d1e..d27b347 100644
--- a/src/java/org/apache/cassandra/config/Schema.java
+++ b/src/java/org/apache/cassandra/config/Schema.java
@@ -479,6 +479,7 @@ public class Schema
             versionLock.writeLock().unlock();
         }
     }
+
     /*
      * Like updateVersion, but also announces via gossip
      */
@@ -487,4 +488,21 @@ public class Schema
         updateVersion();
         MigrationManager.passiveAnnounce(version);
     }
+
+    /**
+     * Clear all KS/CF metadata and reset version.
+     */
+    public synchronized void clear()
+    {
+        for (String table : getNonSystemTables())
+        {
+            KSMetaData ksm = getTableDefinition(table);
+            for (CFMetaData cfm : ksm.cfMetaData().values())
+                purge(cfm);
+            clearTableDefinition(ksm);
+        }
+
+        updateVersionAndAnnounce();
+        fixCFMaxId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Table.java 
b/src/java/org/apache/cassandra/db/Table.java
index afe9a65..22285be 100644
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.io.util.MmappedSegmentedFile;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.NodeId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -334,9 +335,27 @@ public class Table
     /** adds a cf to internal structures, ends up creating disk files). */
     public void initCf(Integer cfId, String cfName)
     {
-        assert !columnFamilyStores.containsKey(cfId) : String.format("tried to 
init %s as %s, but already used by %s",
-                                                                     cfName, 
cfId, columnFamilyStores.get(cfId));
-        columnFamilyStores.put(cfId, 
ColumnFamilyStore.createColumnFamilyStore(this, cfName));
+        if (columnFamilyStores.containsKey(cfId))
+        {
+            // this is the case when you reset local schema
+            // just reload metadata
+            ColumnFamilyStore cfs = columnFamilyStores.get(cfId);
+            assert cfs.getColumnFamilyName().equals(cfName);
+
+            try
+            {
+                cfs.metadata.reload();
+                cfs.reload();
+            }
+            catch (IOException e)
+            {
+                throw FBUtilities.unchecked(e);
+            }
+        }
+        else
+        {
+            columnFamilyStores.put(cfId, 
ColumnFamilyStore.createColumnFamilyStore(this, cfName));
+        }
     }
 
     public Row getRow(QueryFilter filter) throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java 
b/src/java/org/apache/cassandra/service/MigrationManager.java
index d303d40..b37fc2b 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,9 +23,8 @@ import java.io.DataOutputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -37,8 +36,7 @@ import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.DefsTable;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.io.util.FastByteArrayInputStream;
 import org.apache.cassandra.io.util.FastByteArrayOutputStream;
@@ -96,41 +94,7 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
          * Do not de-ref the future because that causes distributed deadlock 
(CASSANDRA-3832) because we are
          * running in the gossip stage.
          */
-
-        StageManager.getStage(Stage.MIGRATION).submit(new WrappedRunnable()
-        {
-            public void runMayThrow() throws Exception
-            {
-                Message message = new 
Message(FBUtilities.getBroadcastAddress(),
-                                              
StorageService.Verb.MIGRATION_REQUEST,
-                                              ArrayUtils.EMPTY_BYTE_ARRAY,
-                                              
Gossiper.instance.getVersion(endpoint));
-
-                int retries = 0;
-                while (retries < MIGRATION_REQUEST_RETRIES)
-                {
-                    if (!FailureDetector.instance.isAlive(endpoint))
-                    {
-                        logger.error("Can't send migration request: node {} is 
down.", endpoint);
-                        return;
-                    }
-
-                    IAsyncResult iar = 
MessagingService.instance().sendRR(message, endpoint);
-
-                    try
-                    {
-                        byte[] reply = 
iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-
-                        DefsTable.mergeRemoteSchema(reply, 
message.getVersion());
-                        return;
-                    }
-                    catch(TimeoutException e)
-                    {
-                        retries++;
-                    }
-                }
-            }
-        });
+        StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(endpoint));
     }
 
     public static boolean isReadyForBootstrap()
@@ -244,4 +208,104 @@ public class MigrationManager implements 
IEndpointStateChangeSubscriber
 
         return schema;
     }
+
+    /**
+     * Clear all locally stored schema information and reset schema to initial 
state.
+     * Called by user (via JMX) who wants to get rid of schema disagreement.
+     *
+     * @throws IOException if schema tables truncation fails
+     */
+    public static void resetLocalSchema() throws IOException
+    {
+        logger.info("Starting local schema reset...");
+
+        try
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Truncating schema tables...");
+
+            // truncate schema tables
+            FBUtilities.waitOnFutures(new ArrayList<Future<?>>()
+            {{
+                
SystemTable.schemaCFS(SystemTable.SCHEMA_KEYSPACES_CF).truncate();
+                
SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNFAMILIES_CF).truncate();
+                
SystemTable.schemaCFS(SystemTable.SCHEMA_COLUMNS_CF).truncate();
+            }});
+
+            if (logger.isDebugEnabled())
+                logger.debug("Clearing local schema keyspace definitions...");
+
+            Schema.instance.clear();
+
+            Set<InetAddress> liveEndpoints = 
Gossiper.instance.getLiveMembers();
+            liveEndpoints.remove(FBUtilities.getBroadcastAddress());
+
+            // force migration if there are nodes around, first of all
+            // check if there are nodes with versions >= 1.1 to request 
migrations from,
+            // because migration format of the nodes with versions < 1.1 is 
incompatible with newer versions
+            for (InetAddress node : liveEndpoints)
+            {
+                if (Gossiper.instance.getVersion(node) >= 
MessagingService.VERSION_11)
+                {
+                    if (logger.isDebugEnabled())
+                        logger.debug("Requesting schema from " + node);
+
+                    
FBUtilities.waitOnFuture(StageManager.getStage(Stage.MIGRATION).submit(new 
MigrationTask(node)));
+                    break;
+                }
+            }
+
+            logger.info("Local schema reset is complete.");
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static class MigrationTask extends WrappedRunnable
+    {
+        private final InetAddress endpoint;
+
+        MigrationTask(InetAddress endpoint)
+        {
+            this.endpoint = endpoint;
+        }
+
+        public void runMayThrow() throws Exception
+        {
+            Message message = new Message(FBUtilities.getBroadcastAddress(),
+                                          
StorageService.Verb.MIGRATION_REQUEST,
+                                          ArrayUtils.EMPTY_BYTE_ARRAY,
+                                          
Gossiper.instance.getVersion(endpoint));
+
+            int retries = 0;
+            while (retries < MIGRATION_REQUEST_RETRIES)
+            {
+                if (!FailureDetector.instance.isAlive(endpoint))
+                {
+                    logger.error("Can't send migration request: node {} is 
down.", endpoint);
+                    return;
+                }
+
+                IAsyncResult iar = MessagingService.instance().sendRR(message, 
endpoint);
+
+                try
+                {
+                    byte[] reply = iar.get(DatabaseDescriptor.getRpcTimeout(), 
TimeUnit.MILLISECONDS);
+
+                    DefsTable.mergeRemoteSchema(reply, message.getVersion());
+                    return;
+                }
+                catch(TimeoutException e)
+                {
+                    retries++;
+                }
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 7a7bc7c..a0e99ee 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2955,7 +2955,7 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
     {
         ColumnFamilyStore.loadNewSSTables(ksName, cfName);
     }
-    
+
     /**
      * #{@inheritDoc}
      */
@@ -2973,4 +2973,9 @@ public class StorageService implements 
IEndpointStateChangeSubscriber, StorageSe
     {
         ColumnFamilyStore.rebuildSecondaryIndex(ksName, cfName, idxNames);
     }
+
+    public void resetLocalSchema() throws IOException
+    {
+        MigrationManager.resetLocalSchema();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 4f589d7..a3cbc7b 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -374,7 +374,7 @@ public interface StorageServiceMBean
      * @param cfName The ColumnFamily name where SSTables belong
      */
     public void loadNewSSTables(String ksName, String cfName);
-    
+
     /**
      * Return a List of Tokens representing a sample of keys
      * across all ColumnFamilyStores
@@ -387,4 +387,6 @@ public interface StorageServiceMBean
      * rebuild the specified indexes
      */
     public void rebuildSecondaryIndex(String ksName, String cfName, String... 
idxNames);
+
+    public void resetLocalSchema() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java 
b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 198e659..04324f2 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -121,6 +121,7 @@ public class NodeCmd
         DESCRIBERING,
         RANGEKEYSAMPLE,
         REBUILD_INDEX,
+        RESETLOCALSCHEMA
     }
 
     
@@ -150,6 +151,7 @@ public class NodeCmd
         addCmdHelp(header, "gossipinfo", "Shows the gossip information for the 
cluster");
         addCmdHelp(header, "invalidatekeycache", "Invalidate the key cache");
         addCmdHelp(header, "invalidaterowcache", "Invalidate the row cache");
+        addCmdHelp(header, "resetlocalschema", "Reset node's local schema and 
resync");
 
         // One arg
         addCmdHelp(header, "netstats [host]", "Print network information on 
provided host (connecting node by default)");
@@ -682,6 +684,7 @@ public class NodeCmd
                 case DISABLETHRIFT   : probe.stopThriftServer(); break;
                 case ENABLETHRIFT    : probe.startThriftServer(); break;
                 case STATUSTHRIFT    : 
nodeCmd.printIsThriftServerRunning(System.out); break;
+                case RESETLOCALSCHEMA: probe.resetLocalSchema(); break;
     
                 case DRAIN :
                     try { probe.drain(); }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e376bc02/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 8b0d668..fc0c21c 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -653,12 +653,16 @@ public class NodeProbe
     {
         ssProxy.rebuild(sourceDc);
     }
-    
+
     public List<String> getRangeKeySample()
     {
         return ssProxy.getRangeKeySample();
     }
 
+    public void resetLocalSchema() throws IOException
+    {
+        ssProxy.resetLocalSchema();
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>

Reply via email to