Author: gdusbabek
Date: Tue Apr 6 16:01:57 2010
New Revision: 931200
URL: http://svn.apache.org/viewvc?rev=931200&view=rev
Log:
Gossip metadata version and request updates. Patch by Gary Dusbabek, reviewed
by Jonthan Ellis. CASSANDRA-827
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
cassandra/trunk/test/conf/storage-conf.xml
Modified:
cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/concurrent/StageManager.java
Tue Apr 6 16:01:57 2010
@@ -47,6 +47,7 @@ public class StageManager
public static final String RESPONSE_STAGE = "RESPONSE-STAGE";
public final static String AE_SERVICE_STAGE = "AE-SERVICE-STAGE";
private static final String LOADBALANCE_STAGE = "LOAD-BALANCER-STAGE";
+ public static final String MIGRATION_STAGE = "MIGRATION-STAGE";
static
{
@@ -58,6 +59,7 @@ public class StageManager
stages.put(GOSSIP_STAGE, new JMXEnabledThreadPoolExecutor("GMFD"));
stages.put(AE_SERVICE_STAGE, new
JMXEnabledThreadPoolExecutor(AE_SERVICE_STAGE));
stages.put(LOADBALANCE_STAGE, new
JMXEnabledThreadPoolExecutor(LOADBALANCE_STAGE));
+ stages.put(MIGRATION_STAGE, new
JMXEnabledThreadPoolExecutor(MIGRATION_STAGE));
}
private static ThreadPoolExecutor multiThreadedStage(String name, int
numThreads)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Tue Apr 6 16:01:57 2010
@@ -132,7 +132,8 @@ public class DatabaseDescriptor
private final static String STORAGE_CONF_FILE = "storage-conf.xml";
- private static UUID defsVersion = null;
+ private static final UUID INITIAL_VERSION = new UUID(4096, 0); // has type
nibble set to 1, everything else to zero.
+ private static UUID defsVersion = INITIAL_VERSION;
/**
* Try the storage-config system property, and then inspect the classpath.
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java?rev=931200&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsAnnounceVerbHandler.java
Tue Apr 6 16:01:57 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.MigrationManager;
+
+import java.util.UUID;
+
+public class DefinitionsAnnounceVerbHandler implements IVerbHandler
+{
+
+ /** someone is announcing their schema version. */
+ public void doVerb(Message message)
+ {
+ UUID theirVersion = UUID.fromString(new
String(message.getMessageBody()));
+ MigrationManager.rectify(theirVersion, message.getFrom());
+ }
+}
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java?rev=931200&view=auto
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
(added)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/DefinitionsUpdateResponseVerbHandler.java
Tue Apr 6 16:01:57 2010
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.migration.Migration;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.cassandra.utils.WrappedRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.UUID;
+
+public class DefinitionsUpdateResponseVerbHandler implements IVerbHandler
+{
+ private static final Logger logger =
LoggerFactory.getLogger(DefinitionsUpdateResponseVerbHandler.class);
+
+ /** someone sent me their data definitions */
+ public void doVerb(final Message message)
+ {
+ try
+ {
+ // these are the serialized row mutations that I must apply.
+ // check versions at every step along the way to make sure
migrations are not applied out of order.
+ Collection<Column> cols = MigrationManager.makeColumns(message);
+ for (Column col : cols)
+ {
+ final UUID version = UUIDGen.makeType1UUID(col.name());
+ if (version.timestamp() >
DatabaseDescriptor.getDefsVersion().timestamp())
+ {
+ final Migration m = Migration.deserialize(new
ByteArrayInputStream(col.value()));
+ assert m.getVersion().equals(version);
+
StageManager.getStage(StageManager.MIGRATION_STAGE).submit(new WrappedRunnable()
+ {
+ @Override
+ protected void runMayThrow() throws Exception
+ {
+ // check to make sure the current version is
before this one.
+ if
(DatabaseDescriptor.getDefsVersion().timestamp() >= version.timestamp())
+ logger.debug("Not applying " +
version.toString());
+ else
+ {
+ logger.debug("Applying {} from {}",
m.getClass().getSimpleName(), message.getFrom());
+ m.apply();
+ m.announce();
+ }
+ }
+ });
+ }
+ }
+ }
+ catch (IOException ex)
+ {
+ throw new IOError(ex);
+ }
+ }
+}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Tue Apr 6 16:01:57 2010
@@ -19,7 +19,6 @@
package org.apache.cassandra.service;
import java.io.IOException;
-import java.io.IOError;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.util.*;
@@ -40,9 +39,7 @@ import org.apache.cassandra.db.commitlog
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.io.DeletionService;
-import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.IndexSummary;
import org.apache.cassandra.locator.*;
import org.apache.cassandra.net.*;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
@@ -106,6 +103,8 @@ public class StorageService implements I
GOSSIP_DIGEST_SYN,
GOSSIP_DIGEST_ACK,
GOSSIP_DIGEST_ACK2,
+ DEFINITIONS_ANNOUNCE,
+ DEFINITIONS_UPDATE_RESPONSE,
;
// remember to add new verbs at the end, since we serialize by ordinal
}
@@ -151,6 +150,7 @@ public class StorageService implements I
private boolean isClientMode;
private boolean initialized;
private String operationMode;
+ private MigrationManager migrationManager = new MigrationManager();
public void addBootstrapSource(InetAddress s, String table)
{
@@ -226,14 +226,13 @@ public class StorageService implements I
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN,
new Gossiper.GossipDigestSynVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK,
new Gossiper.GossipDigestAckVerbHandler());
MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new
Gossiper.GossipDigestAck2VerbHandler());
+
+
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_ANNOUNCE, new
DefinitionsAnnounceVerbHandler());
+
MessagingService.instance.registerVerbHandlers(Verb.DEFINITIONS_UPDATE_RESPONSE,
new DefinitionsUpdateResponseVerbHandler());
replicationStrategies = new HashMap<String,
AbstractReplicationStrategy>();
for (String table : DatabaseDescriptor.getNonSystemTables())
- {
- AbstractReplicationStrategy strat =
getReplicationStrategy(tokenMetadata_, table);
- replicationStrategies.put(table, strat);
- }
- replicationStrategies =
Collections.unmodifiableMap(replicationStrategies);
+ initReplicationStrategy(table);
// spin up the streaming serivice so it is available for jmx tools.
if (StreamingService.instance == null)
@@ -281,6 +280,7 @@ public class StorageService implements I
public void stopClient()
{
+ Gossiper.instance.unregister(migrationManager);
Gossiper.instance.unregister(this);
Gossiper.instance.stop();
MessagingService.shutdown();
@@ -336,6 +336,7 @@ public class StorageService implements I
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a nodeId
to our state, below.)
Gossiper.instance.register(this);
+ Gossiper.instance.register(migrationManager);
Gossiper.instance.start(FBUtilities.getLocalAddress(),
storageMetadata_.getGeneration()); // needed for node-ring gathering.
if (DatabaseDescriptor.isAutoBootstrap()
@@ -357,7 +358,17 @@ public class StorageService implements I
}
setMode("Joining: getting bootstrap token", true);
Token token = BootStrapper.getBootstrapToken(tokenMetadata_,
StorageLoadBalancer.instance.getLoadInfo());
- startBootstrap(token);
+ // don't bootstrap if there are no tables defined.
+ if (DatabaseDescriptor.getNonSystemTables().size() > 0)
+ startBootstrap(token);
+ else
+ {
+ isBootstrapMode = false;
+ SystemTable.setBootstrapped(true);
+ tokenMetadata_.updateNormalToken(token,
FBUtilities.getLocalAddress());
+ Gossiper.instance.addLocalApplicationState(MOVE_STATE, new
ApplicationState(STATE_NORMAL + Delimiter +
partitioner_.getTokenFactory().toString(token)));
+ setMode("Normal", false);
+ }
// don't finish startup (enabling thrift) until after bootstrap is
done
while (isBootstrapMode)
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
Tue Apr 6 16:01:57 2010
@@ -530,14 +530,12 @@ public class CassandraServer implements
{
Map<String, Map<String, String>> columnFamiliesMap = new
HashMap<String, Map<String, String>>();
- Map<String, CFMetaData> tableMetaData =
DatabaseDescriptor.getTableMetaData(table);
- // table doesn't exist
- if (tableMetaData == null)
- {
+ KSMetaData ksm = DatabaseDescriptor.getTableDefinition(table);
+ if (ksm == null)
throw new NotFoundException();
- }
+
- for (Map.Entry<String, CFMetaData> stringCFMetaDataEntry :
tableMetaData.entrySet())
+ for (Map.Entry<String, CFMetaData> stringCFMetaDataEntry :
ksm.cfMetaData().entrySet())
{
CFMetaData columnFamilyMetaData = stringCFMetaDataEntry.getValue();
@@ -679,3 +677,4 @@ public class CassandraServer implements
// main method moved to CassandraDaemon
}
+
\ No newline at end of file
Modified: cassandra/trunk/test/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/conf/storage-conf.xml?rev=931200&r1=931199&r2=931200&view=diff
==============================================================================
--- cassandra/trunk/test/conf/storage-conf.xml (original)
+++ cassandra/trunk/test/conf/storage-conf.xml Tue Apr 6 16:01:57 2010
@@ -77,7 +77,7 @@
</Keyspace>
</Keyspaces>
<Seeds>
- <!-- Add names of hosts that are deemed contact points -->
- <Seed>127.0.0.1</Seed>
+ <!-- we don't want this node to think it is a seed. -->
+ <Seed>127.0.0.2</Seed>
</Seeds>
</Storage>