Updated Branches:
  refs/heads/trunk 3d93a3c90 -> 2575275a0

Allow custom configuration loader

patch by slebresne; reviewed by jasobrown for CASSANDRA-5045


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2575275a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2575275a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2575275a

Branch: refs/heads/trunk
Commit: 2575275a04938e361a874ba685e50ac0e88df246
Parents: 3d93a3c
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Jan 4 13:21:11 2013 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Wed Mar 6 08:56:42 2013 +0100

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +-
 src/java/org/apache/cassandra/config/Config.java   |   10 +-
 .../cassandra/config/ConfigurationLoader.java      |   31 +
 .../cassandra/config/DatabaseDescriptor.java       |  656 +++++++--------
 .../cassandra/config/YamlConfigurationLoader.java  |   94 ++
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |    2 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |    2 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |   11 +-
 .../cassandra/config/DatabaseDescriptorTest.java   |   27 +
 9 files changed, 477 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6da8885..0c1c1ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,4 @@
-1.3
+2.0
  * Move sstable level information into the Stats component, removing the
    need for a separate Manifest file (CASSANDRA-4872)
  * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
@@ -11,6 +11,7 @@
  * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
  * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
  * Add tool to reset SSTable level (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
 
 
 1.2.3

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index be7e74c..2bc1b0e 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -169,7 +169,7 @@ public class Config
 
     public boolean inter_dc_tcp_nodelay = false;
 
-    private static boolean loadYaml = true;
+    private static boolean isClientMode = false;
     private static boolean outboundBindAny = false;
 
     public static boolean getOutboundBindAny()
@@ -182,14 +182,14 @@ public class Config
         outboundBindAny = value;
     }
 
-    public static boolean getLoadYaml()
+    public static boolean isClientMode()
     {
-       return loadYaml;
+       return isClientMode;
     }
 
-    public static void setLoadYaml(boolean value)
+    public static void setClientMode(boolean clientMode)
     {
-        loadYaml = value;
+        isClientMode = clientMode;
     }
 
     public static enum CommitLogSync

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/ConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/ConfigurationLoader.java 
b/src/java/org/apache/cassandra/config/ConfigurationLoader.java
new file mode 100644
index 0000000..bb4b7af
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/ConfigurationLoader.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+public interface ConfigurationLoader
+{
+    /**
+     * Loads a {@link Config} object to use to configure a node.
+     *
+     * @return the {@link Config} to use.
+     * @throws ConfigurationException if the configuration cannot be properly 
loaded.
+     */
+    Config loadConfig() throws ConfigurationException;
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 2a91237..682e45f 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -20,12 +20,11 @@ package org.apache.cassandra.config;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.InetAddress;
-import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Longs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,10 +52,6 @@ import org.apache.cassandra.scheduler.NoScheduler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.MigrationManager;
 import org.apache.cassandra.utils.FBUtilities;
-import org.yaml.snakeyaml.Loader;
-import org.yaml.snakeyaml.TypeDescription;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.error.YAMLException;
 
 public class DatabaseDescriptor
 {
@@ -79,8 +74,6 @@ public class DatabaseDescriptor
     private static IAuthenticator authenticator = new AllowAllAuthenticator();
     private static IAuthorizer authorizer = new AllowAllAuthorizer();
 
-    private final static String DEFAULT_CONFIGURATION = "cassandra.yaml";
-
     private static IRequestScheduler requestScheduler;
     private static RequestSchedulerId requestSchedulerId;
     private static RequestSchedulerOptions requestSchedulerOptions;
@@ -92,429 +85,404 @@ public class DatabaseDescriptor
     private static String localDC;
     private static Comparator<InetAddress> localComparator;
 
-    /**
-     * Inspect the classpath to find storage configuration file
-     */
-    static URL getStorageConfigURL() throws ConfigurationException
+    static
     {
-        String configUrl = System.getProperty("cassandra.config");
-        if (configUrl == null)
-            configUrl = DEFAULT_CONFIGURATION;
-
-        URL url;
-        try
-        {
-            url = new URL(configUrl);
-            url.openStream().close(); // catches well-formed but bogus URLs
-        }
-        catch (Exception e)
+        // In client mode, we use a default configuration. Note that the 
fields of this class will be
+        // left unconfigured however (the partitioner or localDC will be null 
for instance) so this
+        // should be used with care.
+        if (Config.isClientMode())
         {
-            ClassLoader loader = DatabaseDescriptor.class.getClassLoader();
-            url = loader.getResource(configUrl);
-            if (url == null)
-                throw new ConfigurationException("Cannot locate " + configUrl);
+            conf = new Config();
         }
-
-        return url;
-    }
-
-    static
-    {
-        if (Config.getLoadYaml())
-            loadYaml();
         else
-            conf = new Config();
-    }
-    static void loadYaml()
-    {
-        try
         {
-            URL url = getStorageConfigURL();
-            logger.info("Loading settings from " + url);
-            InputStream input;
             try
             {
-                input = url.openStream();
+                applyConfig(loadConfig());
             }
-            catch (IOException e)
+            catch (ConfigurationException e)
             {
-                // getStorageConfigURL should have ruled this out
-                throw new AssertionError(e);
+                logger.error("Fatal configuration error", e);
+                System.err.println(e.getMessage() + "\nFatal configuration 
error; unable to start server. See log for stacktrace.");
+                System.exit(1);
             }
-            org.yaml.snakeyaml.constructor.Constructor constructor = new 
org.yaml.snakeyaml.constructor.Constructor(Config.class);
-            TypeDescription seedDesc = new 
TypeDescription(SeedProviderDef.class);
-            seedDesc.putMapPropertyType("parameters", String.class, 
String.class);
-            constructor.addTypeDescription(seedDesc);
-            Yaml yaml = new Yaml(new Loader(constructor));
-            conf = (Config)yaml.load(input);
-
-            if (!System.getProperty("os.arch").contains("64"))
-                logger.info("32bit JVM detected.  It is recommended to run 
Cassandra on a 64bit JVM for better performance.");
-
-            if (conf.commitlog_sync == null)
+            catch (Exception e)
             {
-                throw new ConfigurationException("Missing required directive 
CommitLogSync");
+                logger.error("Fatal error during configuration loading", e);
+                System.err.println(e.getMessage() + "\nFatal error during 
configuration loading; unable to start server. See log for stacktrace.");
+                System.exit(1);
             }
+        }
+    }
 
-            if (conf.commitlog_sync == Config.CommitLogSync.batch)
-            {
-                if (conf.commitlog_sync_batch_window_in_ms == null)
-                {
-                    throw new ConfigurationException("Missing value for 
commitlog_sync_batch_window_in_ms: Double expected.");
-                }
-                else if (conf.commitlog_sync_period_in_ms != null)
-                {
-                    throw new ConfigurationException("Batch sync specified, 
but commitlog_sync_period_in_ms found. Only specify 
commitlog_sync_batch_window_in_ms when using batch sync");
-                }
-                logger.debug("Syncing log with a batch window of " + 
conf.commitlog_sync_batch_window_in_ms);
-            }
-            else
-            {
-                if (conf.commitlog_sync_period_in_ms == null)
-                {
-                    throw new ConfigurationException("Missing value for 
commitlog_sync_period_in_ms: Integer expected");
-                }
-                else if (conf.commitlog_sync_batch_window_in_ms != null)
-                {
-                    throw new 
ConfigurationException("commitlog_sync_period_in_ms specified, but 
commitlog_sync_batch_window_in_ms found.  Only specify 
commitlog_sync_period_in_ms when using periodic sync.");
-                }
-                logger.debug("Syncing log with a period of " + 
conf.commitlog_sync_period_in_ms);
-            }
+    @VisibleForTesting
+    static Config loadConfig() throws ConfigurationException
+    {
+        String loaderClass = System.getProperty("cassandra.config.loader");
+        ConfigurationLoader loader = loaderClass == null
+                                   ? new YamlConfigurationLoader()
+                                   : 
FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration 
loading");
+        return loader.loadConfig();
+    }
 
-            if (conf.commitlog_total_space_in_mb == null)
-                conf.commitlog_total_space_in_mb = 
System.getProperty("os.arch").contains("64") ? 1024 : 32;
+    private static void applyConfig(Config config) throws 
ConfigurationException
+    {
+        conf = config;
+
+        if (!System.getProperty("os.arch").contains("64"))
+            logger.info("32bit JVM detected.  It is recommended to run 
Cassandra on a 64bit JVM for better performance.");
 
-            /* evaluate the DiskAccessMode Config directive, which also 
affects indexAccessMode selection */
-            if (conf.disk_access_mode == Config.DiskAccessMode.auto)
+        if (conf.commitlog_sync == null)
+        {
+            throw new ConfigurationException("Missing required directive 
CommitLogSync");
+        }
+
+        if (conf.commitlog_sync == Config.CommitLogSync.batch)
+        {
+            if (conf.commitlog_sync_batch_window_in_ms == null)
             {
-                conf.disk_access_mode = 
System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : 
Config.DiskAccessMode.standard;
-                indexAccessMode = conf.disk_access_mode;
-                logger.info("DiskAccessMode 'auto' determined to be " + 
conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
+                throw new ConfigurationException("Missing value for 
commitlog_sync_batch_window_in_ms: Double expected.");
             }
-            else if (conf.disk_access_mode == 
Config.DiskAccessMode.mmap_index_only)
+            else if (conf.commitlog_sync_period_in_ms != null)
             {
-                conf.disk_access_mode = Config.DiskAccessMode.standard;
-                indexAccessMode = Config.DiskAccessMode.mmap;
-                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", 
indexAccessMode is " + indexAccessMode );
+                throw new ConfigurationException("Batch sync specified, but 
commitlog_sync_period_in_ms found. Only specify 
commitlog_sync_batch_window_in_ms when using batch sync");
             }
-            else
+            logger.debug("Syncing log with a batch window of " + 
conf.commitlog_sync_batch_window_in_ms);
+        }
+        else
+        {
+            if (conf.commitlog_sync_period_in_ms == null)
             {
-                indexAccessMode = conf.disk_access_mode;
-                logger.info("DiskAccessMode is " + conf.disk_access_mode + ", 
indexAccessMode is " + indexAccessMode );
+                throw new ConfigurationException("Missing value for 
commitlog_sync_period_in_ms: Integer expected");
             }
+            else if (conf.commitlog_sync_batch_window_in_ms != null)
+            {
+                throw new ConfigurationException("commitlog_sync_period_in_ms 
specified, but commitlog_sync_batch_window_in_ms found.  Only specify 
commitlog_sync_period_in_ms when using periodic sync.");
+            }
+            logger.debug("Syncing log with a period of " + 
conf.commitlog_sync_period_in_ms);
+        }
+
+        if (conf.commitlog_total_space_in_mb == null)
+            conf.commitlog_total_space_in_mb = 
System.getProperty("os.arch").contains("64") ? 1024 : 32;
 
-            logger.info("disk_failure_policy is " + conf.disk_failure_policy);
+        /* evaluate the DiskAccessMode Config directive, which also affects 
indexAccessMode selection */
+        if (conf.disk_access_mode == Config.DiskAccessMode.auto)
+        {
+            conf.disk_access_mode = 
System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : 
Config.DiskAccessMode.standard;
+            indexAccessMode = conf.disk_access_mode;
+            logger.info("DiskAccessMode 'auto' determined to be " + 
conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode );
+        }
+        else if (conf.disk_access_mode == 
Config.DiskAccessMode.mmap_index_only)
+        {
+            conf.disk_access_mode = Config.DiskAccessMode.standard;
+            indexAccessMode = Config.DiskAccessMode.mmap;
+            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", 
indexAccessMode is " + indexAccessMode );
+        }
+        else
+        {
+            indexAccessMode = conf.disk_access_mode;
+            logger.info("DiskAccessMode is " + conf.disk_access_mode + ", 
indexAccessMode is " + indexAccessMode );
+        }
 
-            /* Authentication and authorization backend, implementing 
IAuthenticator and IAuthorizer */
-            if (conf.authenticator != null)
-                authenticator = FBUtilities.construct(conf.authenticator, 
"authenticator");
+        logger.info("disk_failure_policy is " + conf.disk_failure_policy);
 
-            if (conf.authority != null)
-            {
-                logger.warn("Please rename 'authority' to 'authorizer' in 
cassandra.yaml");
-                if 
(!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority"))
-                    throw new ConfigurationException("IAuthority interface has 
been deprecated,"
-                                                     + " please implement 
IAuthorizer instead.");
-            }
+        /* Authentication and authorization backend, implementing 
IAuthenticator and IAuthorizer */
+        if (conf.authenticator != null)
+            authenticator = FBUtilities.construct(conf.authenticator, 
"authenticator");
 
-            if (conf.authorizer != null)
-                authorizer = FBUtilities.construct(conf.authorizer, 
"authorizer");
+        if (conf.authority != null)
+        {
+            logger.warn("Please rename 'authority' to 'authorizer' in 
cassandra.yaml");
+            if 
(!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority"))
+                throw new ConfigurationException("IAuthority interface has 
been deprecated,"
+                        + " please implement IAuthorizer instead.");
+        }
 
-            authenticator.validateConfiguration();
-            authorizer.validateConfiguration();
+        if (conf.authorizer != null)
+            authorizer = FBUtilities.construct(conf.authorizer, "authorizer");
 
-            /* Hashing strategy */
-            if (conf.partitioner == null)
-            {
-                throw new ConfigurationException("Missing directive: 
partitioner");
-            }
-            try
-            {
-                partitioner = 
FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", 
conf.partitioner));
-            }
-            catch (Exception e)
-            {
-                throw new ConfigurationException("Invalid partitioner class " 
+ conf.partitioner);
-            }
-            paritionerName = partitioner.getClass().getCanonicalName();
+        authenticator.validateConfiguration();
+        authorizer.validateConfiguration();
 
-            /* phi convict threshold for FailureDetector */
-            if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 
16)
-            {
-                throw new ConfigurationException("phi_convict_threshold must 
be between 5 and 16");
-            }
+        /* Hashing strategy */
+        if (conf.partitioner == null)
+        {
+            throw new ConfigurationException("Missing directive: partitioner");
+        }
+        try
+        {
+            partitioner = 
FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", 
conf.partitioner));
+        }
+        catch (Exception e)
+        {
+            throw new ConfigurationException("Invalid partitioner class " + 
conf.partitioner);
+        }
+        paritionerName = partitioner.getClass().getCanonicalName();
 
-            /* Thread per pool */
-            if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
-            {
-                throw new ConfigurationException("concurrent_reads must be at 
least 2");
-            }
+        /* phi convict threshold for FailureDetector */
+        if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16)
+        {
+            throw new ConfigurationException("phi_convict_threshold must be 
between 5 and 16");
+        }
 
-            if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
-            {
-                throw new ConfigurationException("concurrent_writes must be at 
least 2");
-            }
+        /* Thread per pool */
+        if (conf.concurrent_reads != null && conf.concurrent_reads < 2)
+        {
+            throw new ConfigurationException("concurrent_reads must be at 
least 2");
+        }
 
-            if (conf.concurrent_replicates != null && 
conf.concurrent_replicates < 2)
-            {
-                throw new ConfigurationException("concurrent_replicates must 
be at least 2");
-            }
+        if (conf.concurrent_writes != null && conf.concurrent_writes < 2)
+        {
+            throw new ConfigurationException("concurrent_writes must be at 
least 2");
+        }
+
+        if (conf.concurrent_replicates != null && conf.concurrent_replicates < 
2)
+        {
+            throw new ConfigurationException("concurrent_replicates must be at 
least 2");
+        }
 
-            if (conf.memtable_total_space_in_mb == null)
-                conf.memtable_total_space_in_mb = (int) 
(Runtime.getRuntime().maxMemory() / (3 * 1048576));
-            if (conf.memtable_total_space_in_mb <= 0)
-                throw new ConfigurationException("memtable_total_space_in_mb 
must be positive");
-            logger.info("Global memtable threshold is enabled at {}MB", 
conf.memtable_total_space_in_mb);
+        if (conf.memtable_total_space_in_mb == null)
+            conf.memtable_total_space_in_mb = (int) 
(Runtime.getRuntime().maxMemory() / (3 * 1048576));
+        if (conf.memtable_total_space_in_mb <= 0)
+            throw new ConfigurationException("memtable_total_space_in_mb must 
be positive");
+        logger.info("Global memtable threshold is enabled at {}MB", 
conf.memtable_total_space_in_mb);
+
+        /* Memtable flush writer threads */
+        if (conf.memtable_flush_writers != null && conf.memtable_flush_writers 
< 1)
+        {
+            throw new ConfigurationException("memtable_flush_writers must be 
at least 1");
+        }
+        else if (conf.memtable_flush_writers == null)
+        {
+            conf.memtable_flush_writers = conf.data_file_directories.length;
+        }
 
-            /* Memtable flush writer threads */
-            if (conf.memtable_flush_writers != null && 
conf.memtable_flush_writers < 1)
+        /* Local IP or hostname to bind services to */
+        if (conf.listen_address != null)
+        {
+            try
             {
-                throw new ConfigurationException("memtable_flush_writers must 
be at least 1");
+                listenAddress = InetAddress.getByName(conf.listen_address);
             }
-            else if (conf.memtable_flush_writers == null)
+            catch (UnknownHostException e)
             {
-                conf.memtable_flush_writers = 
conf.data_file_directories.length;
+                throw new ConfigurationException("Unknown listen_address '" + 
conf.listen_address + "'");
             }
+        }
 
-            /* Local IP or hostname to bind services to */
-            if (conf.listen_address != null)
+        /* Gossip Address to broadcast */
+        if (conf.broadcast_address != null)
+        {
+            if (conf.broadcast_address.equals("0.0.0.0"))
             {
-                try
-                {
-                    listenAddress = InetAddress.getByName(conf.listen_address);
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new ConfigurationException("Unknown listen_address 
'" + conf.listen_address + "'");
-                }
+                throw new ConfigurationException("broadcast_address cannot be 
0.0.0.0!");
             }
 
-            /* Gossip Address to broadcast */
-            if (conf.broadcast_address != null)
+            try
             {
-                if (conf.broadcast_address.equals("0.0.0.0"))
-                {
-                    throw new ConfigurationException("broadcast_address cannot 
be 0.0.0.0!");
-                }
-
-                try
-                {
-                    broadcastAddress = 
InetAddress.getByName(conf.broadcast_address);
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new ConfigurationException("Unknown 
broadcast_address '" + conf.broadcast_address + "'");
-                }
+                broadcastAddress = 
InetAddress.getByName(conf.broadcast_address);
+            }
+            catch (UnknownHostException e)
+            {
+                throw new ConfigurationException("Unknown broadcast_address '" 
+ conf.broadcast_address + "'");
             }
+        }
 
-            /* Local IP or hostname to bind RPC server to */
-            if (conf.rpc_address != null)
+        /* Local IP or hostname to bind RPC server to */
+        if (conf.rpc_address != null)
+        {
+            try
             {
-                try
-                {
-                    rpcAddress = InetAddress.getByName(conf.rpc_address);
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new ConfigurationException("Unknown host in 
rpc_address " + conf.rpc_address);
-                }
+                rpcAddress = InetAddress.getByName(conf.rpc_address);
             }
-            else
+            catch (UnknownHostException e)
             {
-                rpcAddress = FBUtilities.getLocalAddress();
+                throw new ConfigurationException("Unknown host in rpc_address 
" + conf.rpc_address);
             }
+        }
+        else
+        {
+            rpcAddress = FBUtilities.getLocalAddress();
+        }
 
-            if (conf.thrift_framed_transport_size_in_mb <= 0)
-                throw new 
ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
+        if (conf.thrift_framed_transport_size_in_mb <= 0)
+            throw new 
ConfigurationException("thrift_framed_transport_size_in_mb must be positive");
 
-            if (conf.thrift_max_message_length_in_mb < 
conf.thrift_framed_transport_size_in_mb)
-                throw new 
ConfigurationException("thrift_max_message_length_in_mb must be greater than 
thrift_framed_transport_size_in_mb");
+        if (conf.thrift_max_message_length_in_mb < 
conf.thrift_framed_transport_size_in_mb)
+            throw new ConfigurationException("thrift_max_message_length_in_mb 
must be greater than thrift_framed_transport_size_in_mb");
 
-            /* end point snitch */
-            if (conf.endpoint_snitch == null)
-            {
-                throw new ConfigurationException("Missing endpoint_snitch 
directive");
-            }
-            snitch = createEndpointSnitch(conf.endpoint_snitch);
-            EndpointSnitchInfo.create();
+        /* end point snitch */
+        if (conf.endpoint_snitch == null)
+        {
+            throw new ConfigurationException("Missing endpoint_snitch 
directive");
+        }
+        snitch = createEndpointSnitch(conf.endpoint_snitch);
+        EndpointSnitchInfo.create();
 
-            localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-            localComparator = new Comparator<InetAddress>()
+        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
+        localComparator = new Comparator<InetAddress>()
+        {
+            public int compare(InetAddress endpoint1, InetAddress endpoint2)
             {
-                public int compare(InetAddress endpoint1, InetAddress 
endpoint2)
-                {
-                    boolean local1 = 
localDC.equals(snitch.getDatacenter(endpoint1));
-                    boolean local2 = 
localDC.equals(snitch.getDatacenter(endpoint2));
-                    if (local1 && !local2)
-                        return -1;
-                    if (local2 && !local1)
-                        return 1;
-                    return 0;
-                }
-            };
+                boolean local1 = 
localDC.equals(snitch.getDatacenter(endpoint1));
+                boolean local2 = 
localDC.equals(snitch.getDatacenter(endpoint2));
+                if (local1 && !local2)
+                    return -1;
+                if (local2 && !local1)
+                    return 1;
+                return 0;
+            }
+        };
 
-            /* Request Scheduler setup */
-            requestSchedulerOptions = conf.request_scheduler_options;
-            if (conf.request_scheduler != null)
+        /* Request Scheduler setup */
+        requestSchedulerOptions = conf.request_scheduler_options;
+        if (conf.request_scheduler != null)
+        {
+            try
             {
-                try
-                {
-                    if (requestSchedulerOptions == null)
-                    {
-                        requestSchedulerOptions = new 
RequestSchedulerOptions();
-                    }
-                    Class<?> cls = Class.forName(conf.request_scheduler);
-                    requestScheduler = (IRequestScheduler) 
cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
-                }
-                catch (ClassNotFoundException e)
-                {
-                    throw new ConfigurationException("Invalid Request 
Scheduler class " + conf.request_scheduler);
-                }
-                catch (Exception e)
+                if (requestSchedulerOptions == null)
                 {
-                    throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
+                    requestSchedulerOptions = new RequestSchedulerOptions();
                 }
+                Class<?> cls = Class.forName(conf.request_scheduler);
+                requestScheduler = (IRequestScheduler) 
cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
             }
-            else
-            {
-                requestScheduler = new NoScheduler();
-            }
-
-            if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+            catch (ClassNotFoundException e)
             {
-                requestSchedulerId = conf.request_scheduler_id;
+                throw new ConfigurationException("Invalid Request Scheduler 
class " + conf.request_scheduler);
             }
-            else
+            catch (Exception e)
             {
-                // Default to Keyspace
-                requestSchedulerId = RequestSchedulerId.keyspace;
+                throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
             }
+        }
+        else
+        {
+            requestScheduler = new NoScheduler();
+        }
 
-            if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
-            {
-                logger.debug("setting auto_bootstrap to " + 
conf.auto_bootstrap);
-            }
+        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
+        {
+            requestSchedulerId = conf.request_scheduler_id;
+        }
+        else
+        {
+            // Default to Keyspace
+            requestSchedulerId = RequestSchedulerId.keyspace;
+        }
 
-           if (conf.in_memory_compaction_limit_in_mb != null && 
conf.in_memory_compaction_limit_in_mb <= 0)
-            {
-                throw new 
ConfigurationException("in_memory_compaction_limit_in_mb must be a positive 
integer");
-            }
+        if (logger.isDebugEnabled() && conf.auto_bootstrap != null)
+        {
+            logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap);
+        }
 
-            if (conf.concurrent_compactors == null)
-                conf.concurrent_compactors = 
FBUtilities.getAvailableProcessors();
+        if (conf.in_memory_compaction_limit_in_mb != null && 
conf.in_memory_compaction_limit_in_mb <= 0)
+        {
+            throw new ConfigurationException("in_memory_compaction_limit_in_mb 
must be a positive integer");
+        }
 
-            if (conf.concurrent_compactors <= 0)
-                throw new ConfigurationException("concurrent_compactors should 
be strictly greater than 0");
+        if (conf.concurrent_compactors == null)
+            conf.concurrent_compactors = FBUtilities.getAvailableProcessors();
 
-            if (conf.compaction_throughput_mb_per_sec == null)
-                conf.compaction_throughput_mb_per_sec = 16;
+        if (conf.concurrent_compactors <= 0)
+            throw new ConfigurationException("concurrent_compactors should be 
strictly greater than 0");
 
-            if (conf.stream_throughput_outbound_megabits_per_sec == null)
-                conf.stream_throughput_outbound_megabits_per_sec = 400;
+        if (conf.compaction_throughput_mb_per_sec == null)
+            conf.compaction_throughput_mb_per_sec = 16;
 
-            if (conf.rpc_min_threads == null)
-                conf.rpc_min_threads = 16;
+        if (conf.stream_throughput_outbound_megabits_per_sec == null)
+            conf.stream_throughput_outbound_megabits_per_sec = 400;
 
-            if (conf.rpc_max_threads == null)
-                conf.rpc_max_threads = Integer.MAX_VALUE;
+        if (conf.rpc_min_threads == null)
+            conf.rpc_min_threads = 16;
 
-            /* data file and commit log directories. they get created later, 
when they're needed. */
-            if (conf.commitlog_directory != null && conf.data_file_directories 
!= null && conf.saved_caches_directory != null)
-            {
-                for (String datadir : conf.data_file_directories)
-                {
-                    if (datadir.equals(conf.commitlog_directory))
-                        throw new ConfigurationException("commitlog_directory 
must not be the same as any data_file_directories");
-                    if (datadir.equals(conf.saved_caches_directory))
-                        throw new 
ConfigurationException("saved_caches_directory must not be the same as any 
data_file_directories");
-                }
+        if (conf.rpc_max_threads == null)
+            conf.rpc_max_threads = Integer.MAX_VALUE;
 
-                if 
(conf.commitlog_directory.equals(conf.saved_caches_directory))
-                    throw new ConfigurationException("saved_caches_directory 
must not be the same as the commitlog_directory");
-            }
-            else
+        /* data file and commit log directories. they get created later, when 
they're needed. */
+        if (conf.commitlog_directory != null && conf.data_file_directories != 
null && conf.saved_caches_directory != null)
+        {
+            for (String datadir : conf.data_file_directories)
             {
-                if (conf.commitlog_directory == null)
-                    throw new ConfigurationException("commitlog_directory 
missing");
-                if (conf.data_file_directories == null)
-                    throw new ConfigurationException("data_file_directories 
missing; at least one data directory must be specified");
-                if (conf.saved_caches_directory == null)
-                    throw new ConfigurationException("saved_caches_directory 
missing");
+                if (datadir.equals(conf.commitlog_directory))
+                    throw new ConfigurationException("commitlog_directory must 
not be the same as any data_file_directories");
+                if (datadir.equals(conf.saved_caches_directory))
+                    throw new ConfigurationException("saved_caches_directory 
must not be the same as any data_file_directories");
             }
 
-            if (conf.initial_token != null)
-                for (String token : tokensFromString(conf.initial_token))
-                    partitioner.getTokenFactory().validate(token);
+            if (conf.commitlog_directory.equals(conf.saved_caches_directory))
+                throw new ConfigurationException("saved_caches_directory must 
not be the same as the commitlog_directory");
+        }
+        else
+        {
+            if (conf.commitlog_directory == null)
+                throw new ConfigurationException("commitlog_directory 
missing");
+            if (conf.data_file_directories == null)
+                throw new ConfigurationException("data_file_directories 
missing; at least one data directory must be specified");
+            if (conf.saved_caches_directory == null)
+                throw new ConfigurationException("saved_caches_directory 
missing");
+        }
+
+        if (conf.initial_token != null)
+            for (String token : tokensFromString(conf.initial_token))
+                partitioner.getTokenFactory().validate(token);
 
-            try
-            {
-                // if key_cache_size_in_mb option was set to "auto" then size 
of the cache should be "min(5% of Heap (in MB), 100MB)
-                keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
-                                    ? Math.min(Math.max(1, (int) 
(Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
-                                    : conf.key_cache_size_in_mb;
+        try
+        {
+            // if key_cache_size_in_mb option was set to "auto" then size of 
the cache should be "min(5% of Heap (in MB), 100MB)"
+            keyCacheSizeInMB = (conf.key_cache_size_in_mb == null)
+                ? Math.min(Math.max(1, (int) 
(Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100)
+                : conf.key_cache_size_in_mb;
 
-                if (keyCacheSizeInMB < 0)
-                    throw new NumberFormatException(); // to escape 
duplicating error message
-            }
-            catch (NumberFormatException e)
-            {
-                throw new ConfigurationException("key_cache_size_in_mb option 
was set incorrectly to '"
-                                                 + conf.key_cache_size_in_mb + 
"', supported values are <integer> >= 0.");
-            }
+            if (keyCacheSizeInMB < 0)
+                throw new NumberFormatException(); // to escape duplicating 
error message
+        }
+        catch (NumberFormatException e)
+        {
+            throw new ConfigurationException("key_cache_size_in_mb option was 
set incorrectly to '"
+                    + conf.key_cache_size_in_mb + "', supported values are 
<integer> >= 0.");
+        }
 
-            rowCacheProvider = 
FBUtilities.newCacheProvider(conf.row_cache_provider);
-            memoryAllocator = 
FBUtilities.newOffHeapAllocator(conf.memory_allocator);
+        rowCacheProvider = 
FBUtilities.newCacheProvider(conf.row_cache_provider);
+        memoryAllocator = 
FBUtilities.newOffHeapAllocator(conf.memory_allocator);
 
-            if(conf.encryption_options != null)
-            {
-                logger.warn("Please rename encryption_options as 
server_encryption_options in the yaml");
-                //operate under the assumption that server_encryption_options 
is not set in yaml rather than both
-                conf.server_encryption_options = conf.encryption_options;
-            }
+        if(conf.encryption_options != null)
+        {
+            logger.warn("Please rename encryption_options as 
server_encryption_options in the yaml");
+            //operate under the assumption that server_encryption_options is 
not set in yaml rather than both
+            conf.server_encryption_options = conf.encryption_options;
+        }
 
-            // Hardcoded system tables
-            List<KSMetaData> systemKeyspaces = 
Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
-            assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
-            for (KSMetaData ksmd : systemKeyspaces)
-            {
-                // install the definition
-                for (CFMetaData cfm : ksmd.cfMetaData().values())
-                    Schema.instance.load(cfm);
-                Schema.instance.setTableDefinition(ksmd);
-            }
+        // Hardcoded system tables
+        List<KSMetaData> systemKeyspaces = 
Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace());
+        assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size();
+        for (KSMetaData ksmd : systemKeyspaces)
+        {
+            // install the definition
+            for (CFMetaData cfm : ksmd.cfMetaData().values())
+                Schema.instance.load(cfm);
+            Schema.instance.setTableDefinition(ksmd);
+        }
 
-            /* Load the seeds for node contact points */
-            if (conf.seed_provider == null)
-            {
-                throw new ConfigurationException("seeds configuration is 
missing; a minimum of one seed is required.");
-            }
-            try
-            {
-                Class<?> seedProviderClass = 
Class.forName(conf.seed_provider.class_name);
-                seedProvider = 
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
-            }
-            // there are about 5 checked exceptions that could be thrown here.
-            catch (Exception e)
-            {
-                logger.error("Fatal configuration error", e);
-                System.err.println(e.getMessage() + "\nFatal configuration 
error; unable to start server.  See log for stacktrace.");
-                System.exit(1);
-            }
-            if (seedProvider.getSeeds().size() == 0)
-                throw new ConfigurationException("The seed provider lists no 
seeds.");
+        /* Load the seeds for node contact points */
+        if (conf.seed_provider == null)
+        {
+            throw new ConfigurationException("seeds configuration is missing; 
a minimum of one seed is required.");
         }
-        catch (ConfigurationException e)
+        try
         {
-            logger.error("Fatal configuration error", e);
-            System.err.println(e.getMessage() + "\nFatal configuration error; 
unable to start server.  See log for stacktrace.");
-            System.exit(1);
+            Class<?> seedProviderClass = 
Class.forName(conf.seed_provider.class_name);
+            seedProvider = 
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
         }
-        catch (YAMLException e)
+        // there are about 5 checked exceptions that could be thrown here.
+        catch (Exception e)
         {
-            logger.error("Fatal configuration error error", e);
-            System.err.println(e.getMessage() + "\nInvalid yaml; unable to 
start server.  See log for stacktrace.");
+            logger.error("Fatal configuration error", e);
+            System.err.println(e.getMessage() + "\nFatal configuration error; 
unable to start server.  See log for stacktrace.");
             System.exit(1);
         }
+        if (seedProvider.getSeeds().size() == 0)
+            throw new ConfigurationException("The seed provider lists no 
seeds.");
     }
 
     private static IEndpointSnitch createEndpointSnitch(String 
snitchClassName) throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java 
b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
new file mode 100644
index 0000000..a46cd65
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.config;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import org.yaml.snakeyaml.Loader;
+import org.yaml.snakeyaml.TypeDescription;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.error.YAMLException;
+
+public class YamlConfigurationLoader implements ConfigurationLoader
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(YamlConfigurationLoader.class);
+
+    private final static String DEFAULT_CONFIGURATION = "cassandra.yaml";
+
+    /**
+     * Inspect the classpath to find storage configuration file
+     */
+    private URL getStorageConfigURL() throws ConfigurationException
+    {
+        String configUrl = System.getProperty("cassandra.config");
+        if (configUrl == null)
+            configUrl = DEFAULT_CONFIGURATION;
+
+        URL url;
+        try
+        {
+            url = new URL(configUrl);
+            url.openStream().close(); // catches well-formed but bogus URLs
+        }
+        catch (Exception e)
+        {
+            ClassLoader loader = DatabaseDescriptor.class.getClassLoader();
+            url = loader.getResource(configUrl);
+            if (url == null)
+                throw new ConfigurationException("Cannot locate " + configUrl);
+        }
+
+        return url;
+    }
+
+    public Config loadConfig() throws ConfigurationException
+    {
+        try
+        {
+            URL url = getStorageConfigURL();
+            logger.info("Loading settings from " + url);
+            InputStream input;
+            try
+            {
+                input = url.openStream();
+            }
+            catch (IOException e)
+            {
+                // getStorageConfigURL should have ruled this out
+                throw new AssertionError(e);
+            }
+            org.yaml.snakeyaml.constructor.Constructor constructor = new 
org.yaml.snakeyaml.constructor.Constructor(Config.class);
+            TypeDescription seedDesc = new 
TypeDescription(SeedProviderDef.class);
+            seedDesc.putMapPropertyType("parameters", String.class, 
String.class);
+            constructor.addTypeDescription(seedDesc);
+            Yaml yaml = new Yaml(new Loader(constructor));
+            return (Config)yaml.load(input);
+        }
+        catch (YAMLException e)
+        {
+            throw new ConfigurationException("Invalid yaml", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java 
b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 3f70ca5..e0bd09b 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -94,7 +94,7 @@ implements 
org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     BulkRecordWriter(Configuration conf) throws IOException
     {
-        Config.setLoadYaml(false);
+        Config.setClientMode(true);
         Config.setOutboundBindAny(true);
         this.conf = conf;
         
DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS,
 "0")));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 32dbbc3..263f39c 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -49,7 +49,7 @@ public class SSTableLoader
 
     static
     {
-        Config.setLoadYaml(false);
+        Config.setClientMode(true);
     }
 
     public SSTableLoader(File directory, Client client, OutputHandler 
outputHandler)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/utils/FBUtilities.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 15f7fb0..e6e2e50 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -453,7 +453,7 @@ public class FBUtilities
     }
 
     /**
-     * Constructs an instance of the given class, which must have a no-arg 
constructor.
+     * Constructs an instance of the given class, which must have a no-arg or 
default constructor.
      * @param classname Fully qualified classname.
      * @param readable Descriptive noun for the role the class plays.
      * @throws ConfigurationException If the class cannot be found.
@@ -463,11 +463,7 @@ public class FBUtilities
         Class<T> cls = FBUtilities.classForName(classname, readable);
         try
         {
-            return cls.getConstructor().newInstance();
-        }
-        catch (NoSuchMethodException e)
-        {
-            throw new ConfigurationException(String.format("No default 
constructor for %s class '%s'.", readable, classname));
+            return cls.newInstance();
         }
         catch (IllegalAccessException e)
         {
@@ -477,8 +473,9 @@ public class FBUtilities
         {
             throw new ConfigurationException(String.format("Cannot use 
abstract class '%s' as %s.", classname, readable));
         }
-        catch (InvocationTargetException e)
+        catch (Exception e)
         {
+            // Catch-all because Class.newInstance() "propagates any exception 
thrown by the nullary constructor, including a checked exception".
             if (e.getCause() instanceof ConfigurationException)
                 throw (ConfigurationException)e.getCause();
             throw new ConfigurationException(String.format("Error 
instantiating %s class '%s'.", readable, classname), e);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index a357ed6..9cca997 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -26,6 +26,8 @@ import org.apache.cassandra.locator.SimpleStrategy;
 import org.apache.cassandra.service.MigrationManager;
 
 import org.junit.Test;
+import static org.junit.Assert.*;
+
 
 import java.io.IOException;
 
@@ -94,4 +96,29 @@ public class DatabaseDescriptorTest
             Gossiper.instance.stop();
         }
     }
+
+    @Test
+    public void testConfigurationLoader() throws Exception
+    {
+        // By default, we should load from the yaml
+        Config config = DatabaseDescriptor.loadConfig();
+        assertEquals("Test Cluster", config.cluster_name);
+
+        // Now try custom loader
+        ConfigurationLoader testLoader = new TestLoader();
+        System.setProperty("cassandra.config.loader", 
testLoader.getClass().getName());
+
+        config = DatabaseDescriptor.loadConfig();
+        assertEquals("ConfigurationLoader Test", config.cluster_name);
+    }
+
+    public static class TestLoader implements ConfigurationLoader
+    {
+        public Config loadConfig() throws ConfigurationException
+        {
+            Config testConfig = new Config();
+            testConfig.cluster_name = "ConfigurationLoader Test";
+            return testConfig;
+        }
+    }
 }

Reply via email to