Author: jbellis
Date: Wed Dec 2 18:51:45 2009
New Revision: 886253
URL: http://svn.apache.org/viewvc?rev=886253&view=rev
Log:
add StorageService.initClient, which starts up Gossiper without setting a token
or any other application state.
patch by gdusbabek and jbellis for CASSANDRA-535
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Wed Dec 2 18:51:45 2009
@@ -390,22 +390,9 @@
columnIndexSizeInKB_ = Integer.parseInt(columnIndexSizeInKB);
}
- /* data file directory */
+ /* data file and commit log directories. they get created later,
when they're needed. */
dataFileDirectories_ =
xmlUtils.getNodeValues("/Storage/DataFileDirectories/DataFileDirectory");
- if (dataFileDirectories_.length == 0)
- {
- throw new ConfigurationException("At least one
DataFileDirectory must be specified");
- }
- for ( String dataFileDirectory : dataFileDirectories_ )
- FileUtils.createDirectory(dataFileDirectory);
-
- /* commit log directory */
logFileDirectory_ =
xmlUtils.getNodeValue("/Storage/CommitLogDirectory");
- if (logFileDirectory_ == null)
- {
- throw new ConfigurationException("CommitLogDirectory must be
specified");
- }
- FileUtils.createDirectory(logFileDirectory_);
/* threshold after which commit log should be rotated. */
String value =
xmlUtils.getNodeValue("/Storage/CommitLogRotationThresholdInMB");
@@ -547,9 +534,6 @@
tableToCFMetaDataMap_.put(Table.SYSTEM_TABLE, systemMetadata);
tableKeysCachedFractions_.put(Table.SYSTEM_TABLE, 0.0);
- /* make sure we have a directory for each table */
- createTableDirectories();
-
/* Load the seeds for node contact points */
String[] seeds = xmlUtils.getNodeValues("/Storage/Seeds/Seed");
if (seeds.length <= 0)
@@ -603,11 +587,31 @@
}
/**
- * Create the table directory in each data directory
+ * Creates all storage-related directories.
+ * @throws IOException when a disk problem is encountered.
*/
- public static void createTableDirectories() throws IOException
+ public static void createAllDirectories() throws IOException
{
- for (String dataFile : dataFileDirectories_)
+ try {
+ if (dataFileDirectories_.length == 0)
+ {
+ throw new ConfigurationException("At least one
DataFileDirectory must be specified");
+ }
+ for ( String dataFileDirectory : dataFileDirectories_ )
+ FileUtils.createDirectory(dataFileDirectory);
+ if (logFileDirectory_ == null)
+ {
+ throw new ConfigurationException("CommitLogDirectory must be
specified");
+ }
+ FileUtils.createDirectory(logFileDirectory_);
+ }
+ catch (ConfigurationException ex) {
+ logger_.error("Fatal error: " + ex.getMessage());
+ System.err.println("Bad configuration; unable to start server");
+ System.exit(1);
+ }
+ /* make sure we have a directory for each table */
+ for (String dataFile : dataFileDirectories_)
{
FileUtils.createDirectory(dataFile + File.separator +
Table.SYSTEM_TABLE);
for (String table : tables_)
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed
Dec 2 18:51:45 2009
@@ -29,14 +29,8 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.io.SSTableWriter;
import org.apache.cassandra.io.DataOutputBuffer;
import java.net.InetAddress;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.io.IStreamComplete;
-import org.apache.cassandra.net.io.StreamContextManager;
-import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.db.filter.*;
@@ -53,6 +47,21 @@
private static Timer flushTimer_ = new Timer("FLUSH-TIMER");
+ // This is a result of pushing down the point in time when storage
directories get created. It used to happen in
+ // CassandraDaemon, but it is possible to call Table.open without a
running daemon, so it made sense to ensure
+ // proper directories here.
+ static
+ {
+ try
+ {
+ DatabaseDescriptor.createAllDirectories();
+ }
+ catch (IOException ex)
+ {
+ throw new RuntimeException(ex);
+ }
+ }
+
/*
* This class represents the metadata of this Table. The metadata
* is basically the column family name and the ID associated with
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
Wed Dec 2 18:51:45 2009
@@ -23,6 +23,7 @@
import java.io.IOException;
import java.util.*;
import org.apache.cassandra.io.ICompactSerializer;
+import org.apache.cassandra.service.StorageService;
import org.apache.log4j.Logger;
@@ -82,7 +83,8 @@
}
void addApplicationState(String key, ApplicationState appState)
- {
+ {
+ assert !StorageService.instance().isClientMode();
applicationState_.put(key, appState);
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
Wed Dec 2 18:51:45 2009
@@ -66,7 +66,7 @@
LogUtil.init();
//LogUtil.setLogLevel("com.facebook", "DEBUG");
// Start the storage service
- storageService.start();
+ storageService.initServer();
}
protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand>
commands, int consistency_level)
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=886253&r1=886252&r2=886253&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Wed Dec 2 18:51:45 2009
@@ -32,6 +32,7 @@
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.*;
@@ -149,6 +150,8 @@
/* Are we starting this node in bootstrap mode? */
private boolean isBootstrapMode;
private Set<InetAddress> bootstrapSet;
+ /* when initialized as a client, we shouldn't write to the system table. */
+ private boolean isClientMode;
public synchronized void addBootstrapSource(InetAddress s)
{
@@ -185,8 +188,9 @@
}
/**
- * for bulk loading clients to be able to use
tokenmetadata/messagingservice
- * without fully starting storageservice / systemtable.
+ * Intended for operation in client-only (non-storage) mode. E.g.: for
bulk loading clients
+ * to be able to use tokenmetadata/messagingservice without fully starting
storageservice / systemtable,
+ * or java clients that wish to bypass Thrift entirely.
*/
public void updateForeignTokenUnsafe(Token token, InetAddress endpoint)
{
@@ -251,10 +255,34 @@
}
return replicationStrategy;
}
+
+ public void stopClient()
+ {
+ Gossiper.instance().unregister(this);
+ Gossiper.instance().stop();
+ MessagingService.shutdown();
+ }
+
+ public void initClient() throws IOException
+ {
+ isClientMode = true;
+ logger_.info("Starting up client gossip");
+ MessagingService.instance().listen(FBUtilities.getLocalAddress());
+ MessagingService.instance().listenUDP(FBUtilities.getLocalAddress());
+
+ SelectorManager.getSelectorManager().start();
+ SelectorManager.getUdpSelectorManager().start();
+
+ Gossiper.instance().register(this);
+ Gossiper.instance().start(FBUtilities.getLocalAddress(),
(int)(System.currentTimeMillis() / 1000)); // needed for node-ring gathering.
+ }
- public void start() throws IOException
+ public void initServer() throws IOException
{
+ isClientMode = false;
storageMetadata_ = SystemTable.initMetadata();
+ DatabaseDescriptor.createAllDirectories();
+ logger_.info("Starting up server gossip");
/* Listen for application messages */
MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -270,7 +298,7 @@
// for bootstrap to get the load info it needs.
// (we won't be part of the storage ring though until we add a nodeId
to our state, below.)
Gossiper.instance().register(this);
- Gossiper.instance().start(FBUtilities.getLocalAddress(),
storageMetadata_.getGeneration());
+ Gossiper.instance().start(FBUtilities.getLocalAddress(),
storageMetadata_.getGeneration()); // needed for node-ring gathering.
if (DatabaseDescriptor.isAutoBootstrap()
&&
!(DatabaseDescriptor.getSeeds().contains(FBUtilities.getLocalAddress()) ||
SystemTable.isBootstrapped()))
@@ -284,7 +312,7 @@
while (isBootstrapMode)
{
try
- {
+ {
Thread.sleep(100);
}
catch (InterruptedException e)
@@ -405,7 +433,10 @@
Token token =
getPartitioner().getTokenFactory().fromString(state.getValue());
if (logger_.isDebugEnabled())
logger_.debug(endpoint + " state normal, token " + token);
- updateForeignToken(token, endpoint);
+ if (isClientMode)
+ updateForeignTokenUnsafe(token, endpoint);
+ else
+ updateForeignToken(token, endpoint);
replicationStrategy_.removeObsoletePendingRanges();
}
else if (STATE_LEAVING.equals(stateName))
@@ -499,7 +530,8 @@
public void onAlive(InetAddress endpoint, EndPointState state)
{
- deliverHints(endpoint);
+ if (!isClientMode)
+ deliverHints(endpoint);
}
public void onDead(InetAddress endpoint, EndPointState state) {}
@@ -983,6 +1015,7 @@
public void run()
{
Gossiper.instance().stop();
+ MessagingService.shutdown();
logger_.info("DECOMMISSION FINISHED.");
// let op be responsible for killing the process
}
@@ -1111,4 +1144,9 @@
{
tokenMetadata_.clearPendingRanges();
}
+
+ public boolean isClientMode()
+ {
+ return isClientMode;
+ }
}