Updated Branches: refs/heads/trunk 3d93a3c90 -> 2575275a0
Allow custom configuration loader patch by slebresne; reviewed by jasobrown for CASSANDRA-5045 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2575275a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2575275a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2575275a Branch: refs/heads/trunk Commit: 2575275a04938e361a874ba685e50ac0e88df246 Parents: 3d93a3c Author: Sylvain Lebresne <[email protected]> Authored: Fri Jan 4 13:21:11 2013 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Mar 6 08:56:42 2013 +0100 ---------------------------------------------------------------------- CHANGES.txt | 3 +- src/java/org/apache/cassandra/config/Config.java | 10 +- .../cassandra/config/ConfigurationLoader.java | 31 + .../cassandra/config/DatabaseDescriptor.java | 656 +++++++-------- .../cassandra/config/YamlConfigurationLoader.java | 94 ++ .../apache/cassandra/hadoop/BulkRecordWriter.java | 2 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 2 +- .../org/apache/cassandra/utils/FBUtilities.java | 11 +- .../cassandra/config/DatabaseDescriptorTest.java | 27 + 9 files changed, 477 insertions(+), 359 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6da8885..0c1c1ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,4 @@ -1.3 +2.0 * Move sstable level information into the Stats component, removing the need for a separate Manifest file (CASSANDRA-4872) * avoid serializing to byte[] on commitlog append (CASSANDRA-5199) @@ -11,6 +11,7 @@ * more robust solution to incomplete compactions + counters (CASSANDRA-5151) * Change order of directory searching for c*.in.sh (CASSANDRA-3983) * Add tool to reset SSTable level (CASSANDRA-5271) + * Allow custom configuration loader (CASSANDRA-5045) 1.2.3 http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index be7e74c..2bc1b0e 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -169,7 +169,7 @@ public class Config public boolean inter_dc_tcp_nodelay = false; - private static boolean loadYaml = true; + private static boolean isClientMode = false; private static boolean outboundBindAny = false; public static boolean getOutboundBindAny() @@ -182,14 +182,14 @@ public class Config outboundBindAny = value; } - public static boolean getLoadYaml() + public static boolean isClientMode() { - return loadYaml; + return isClientMode; } - public static void setLoadYaml(boolean value) + public static void setClientMode(boolean clientMode) { - loadYaml = value; + isClientMode = clientMode; } public static enum CommitLogSync http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/ConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/ConfigurationLoader.java b/src/java/org/apache/cassandra/config/ConfigurationLoader.java new file mode 100644 index 0000000..bb4b7af --- /dev/null +++ b/src/java/org/apache/cassandra/config/ConfigurationLoader.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import org.apache.cassandra.exceptions.ConfigurationException; + +public interface ConfigurationLoader +{ + /** + * Loads a {@link Config} object to use to configure a node. + * + * @return the {@link Config} to use. + * @throws ConfigurationException if the configuration cannot be properly loaded. + */ + Config loadConfig() throws ConfigurationException; +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 2a91237..682e45f 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -20,12 +20,11 @@ package org.apache.cassandra.config; import java.io.File; import java.io.FileFilter; import java.io.IOException; -import java.io.InputStream; import java.net.InetAddress; -import java.net.URL; import java.net.UnknownHostException; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +52,6 @@ import org.apache.cassandra.scheduler.NoScheduler; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.FBUtilities; -import org.yaml.snakeyaml.Loader; -import org.yaml.snakeyaml.TypeDescription; -import org.yaml.snakeyaml.Yaml; -import org.yaml.snakeyaml.error.YAMLException; public class DatabaseDescriptor { @@ -79,8 +74,6 @@ public class DatabaseDescriptor private static IAuthenticator authenticator = new AllowAllAuthenticator(); private static IAuthorizer authorizer = new AllowAllAuthorizer(); - private final static String DEFAULT_CONFIGURATION = "cassandra.yaml"; - private static IRequestScheduler requestScheduler; private static RequestSchedulerId requestSchedulerId; private static RequestSchedulerOptions requestSchedulerOptions; @@ -92,429 +85,404 @@ public class DatabaseDescriptor private static String localDC; private static Comparator<InetAddress> localComparator; - /** - * Inspect the classpath to find storage configuration file - */ - static URL getStorageConfigURL() throws ConfigurationException + static { - String configUrl = System.getProperty("cassandra.config"); - if (configUrl == null) - configUrl = DEFAULT_CONFIGURATION; - - URL url; - try - { - url = new URL(configUrl); - url.openStream().close(); // catches well-formed but bogus URLs - } - catch (Exception e) + // In client mode, we use a default configuration. Note that the fields of this class will be + // left unconfigured however (the partitioner or localDC will be null for instance) so this + // should be used with care. + if (Config.isClientMode()) { - ClassLoader loader = DatabaseDescriptor.class.getClassLoader(); - url = loader.getResource(configUrl); - if (url == null) - throw new ConfigurationException("Cannot locate " + configUrl); + conf = new Config(); } - - return url; - } - - static - { - if (Config.getLoadYaml()) - loadYaml(); else - conf = new Config(); - } - static void loadYaml() - { - try { - URL url = getStorageConfigURL(); - logger.info("Loading settings from " + url); - InputStream input; try { - input = url.openStream(); + applyConfig(loadConfig()); } - catch (IOException e) + catch (ConfigurationException e) { - // getStorageConfigURL should have ruled this out - throw new AssertionError(e); + logger.error("Fatal configuration error", e); + System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace."); + System.exit(1); } - org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); - TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); - seedDesc.putMapPropertyType("parameters", String.class, String.class); - constructor.addTypeDescription(seedDesc); - Yaml yaml = new Yaml(new Loader(constructor)); - conf = (Config)yaml.load(input); - - if (!System.getProperty("os.arch").contains("64")) - logger.info("32bit JVM detected. It is recommended to run Cassandra on a 64bit JVM for better performance."); - - if (conf.commitlog_sync == null) + catch (Exception e) { - throw new ConfigurationException("Missing required directive CommitLogSync"); + logger.error("Fatal error during configuration loading", e); + System.err.println(e.getMessage() + "\nFatal error during configuration loading; unable to start server. See log for stacktrace."); + System.exit(1); } + } + } - if (conf.commitlog_sync == Config.CommitLogSync.batch) - { - if (conf.commitlog_sync_batch_window_in_ms == null) - { - throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected."); - } - else if (conf.commitlog_sync_period_in_ms != null) - { - throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync"); - } - logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms); - } - else - { - if (conf.commitlog_sync_period_in_ms == null) - { - throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected"); - } - else if (conf.commitlog_sync_batch_window_in_ms != null) - { - throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found. Only specify commitlog_sync_period_in_ms when using periodic sync."); - } - logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms); - } + @VisibleForTesting + static Config loadConfig() throws ConfigurationException + { + String loaderClass = System.getProperty("cassandra.config.loader"); + ConfigurationLoader loader = loaderClass == null + ? new YamlConfigurationLoader() + : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading"); + return loader.loadConfig(); + } - if (conf.commitlog_total_space_in_mb == null) - conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32; + private static void applyConfig(Config config) throws ConfigurationException + { + conf = config; + + if (!System.getProperty("os.arch").contains("64")) + logger.info("32bit JVM detected. It is recommended to run Cassandra on a 64bit JVM for better performance."); - /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */ - if (conf.disk_access_mode == Config.DiskAccessMode.auto) + if (conf.commitlog_sync == null) + { + throw new ConfigurationException("Missing required directive CommitLogSync"); + } + + if (conf.commitlog_sync == Config.CommitLogSync.batch) + { + if (conf.commitlog_sync_batch_window_in_ms == null) { - conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard; - indexAccessMode = conf.disk_access_mode; - logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + throw new ConfigurationException("Missing value for commitlog_sync_batch_window_in_ms: Double expected."); } - else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only) + else if (conf.commitlog_sync_period_in_ms != null) { - conf.disk_access_mode = Config.DiskAccessMode.standard; - indexAccessMode = Config.DiskAccessMode.mmap; - logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + throw new ConfigurationException("Batch sync specified, but commitlog_sync_period_in_ms found. Only specify commitlog_sync_batch_window_in_ms when using batch sync"); } - else + logger.debug("Syncing log with a batch window of " + conf.commitlog_sync_batch_window_in_ms); + } + else + { + if (conf.commitlog_sync_period_in_ms == null) { - indexAccessMode = conf.disk_access_mode; - logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + throw new ConfigurationException("Missing value for commitlog_sync_period_in_ms: Integer expected"); } + else if (conf.commitlog_sync_batch_window_in_ms != null) + { + throw new ConfigurationException("commitlog_sync_period_in_ms specified, but commitlog_sync_batch_window_in_ms found. Only specify commitlog_sync_period_in_ms when using periodic sync."); + } + logger.debug("Syncing log with a period of " + conf.commitlog_sync_period_in_ms); + } + + if (conf.commitlog_total_space_in_mb == null) + conf.commitlog_total_space_in_mb = System.getProperty("os.arch").contains("64") ? 1024 : 32; - logger.info("disk_failure_policy is " + conf.disk_failure_policy); + /* evaluate the DiskAccessMode Config directive, which also affects indexAccessMode selection */ + if (conf.disk_access_mode == Config.DiskAccessMode.auto) + { + conf.disk_access_mode = System.getProperty("os.arch").contains("64") ? Config.DiskAccessMode.mmap : Config.DiskAccessMode.standard; + indexAccessMode = conf.disk_access_mode; + logger.info("DiskAccessMode 'auto' determined to be " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + } + else if (conf.disk_access_mode == Config.DiskAccessMode.mmap_index_only) + { + conf.disk_access_mode = Config.DiskAccessMode.standard; + indexAccessMode = Config.DiskAccessMode.mmap; + logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + } + else + { + indexAccessMode = conf.disk_access_mode; + logger.info("DiskAccessMode is " + conf.disk_access_mode + ", indexAccessMode is " + indexAccessMode ); + } - /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */ - if (conf.authenticator != null) - authenticator = FBUtilities.construct(conf.authenticator, "authenticator"); + logger.info("disk_failure_policy is " + conf.disk_failure_policy); - if (conf.authority != null) - { - logger.warn("Please rename 'authority' to 'authorizer' in cassandra.yaml"); - if (!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority")) - throw new ConfigurationException("IAuthority interface has been deprecated," - + " please implement IAuthorizer instead."); - } + /* Authentication and authorization backend, implementing IAuthenticator and IAuthorizer */ + if (conf.authenticator != null) + authenticator = FBUtilities.construct(conf.authenticator, "authenticator"); - if (conf.authorizer != null) - authorizer = FBUtilities.construct(conf.authorizer, "authorizer"); + if (conf.authority != null) + { + logger.warn("Please rename 'authority' to 'authorizer' in cassandra.yaml"); + if (!conf.authority.equals("org.apache.cassandra.auth.AllowAllAuthority")) + throw new ConfigurationException("IAuthority interface has been deprecated," + + " please implement IAuthorizer instead."); + } - authenticator.validateConfiguration(); - authorizer.validateConfiguration(); + if (conf.authorizer != null) + authorizer = FBUtilities.construct(conf.authorizer, "authorizer"); - /* Hashing strategy */ - if (conf.partitioner == null) - { - throw new ConfigurationException("Missing directive: partitioner"); - } - try - { - partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner)); - } - catch (Exception e) - { - throw new ConfigurationException("Invalid partitioner class " + conf.partitioner); - } - paritionerName = partitioner.getClass().getCanonicalName(); + authenticator.validateConfiguration(); + authorizer.validateConfiguration(); - /* phi convict threshold for FailureDetector */ - if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16) - { - throw new ConfigurationException("phi_convict_threshold must be between 5 and 16"); - } + /* Hashing strategy */ + if (conf.partitioner == null) + { + throw new ConfigurationException("Missing directive: partitioner"); + } + try + { + partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner)); + } + catch (Exception e) + { + throw new ConfigurationException("Invalid partitioner class " + conf.partitioner); + } + paritionerName = partitioner.getClass().getCanonicalName(); - /* Thread per pool */ - if (conf.concurrent_reads != null && conf.concurrent_reads < 2) - { - throw new ConfigurationException("concurrent_reads must be at least 2"); - } + /* phi convict threshold for FailureDetector */ + if (conf.phi_convict_threshold < 5 || conf.phi_convict_threshold > 16) + { + throw new ConfigurationException("phi_convict_threshold must be between 5 and 16"); + } - if (conf.concurrent_writes != null && conf.concurrent_writes < 2) - { - throw new ConfigurationException("concurrent_writes must be at least 2"); - } + /* Thread per pool */ + if (conf.concurrent_reads != null && conf.concurrent_reads < 2) + { + throw new ConfigurationException("concurrent_reads must be at least 2"); + } - if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2) - { - throw new ConfigurationException("concurrent_replicates must be at least 2"); - } + if (conf.concurrent_writes != null && conf.concurrent_writes < 2) + { + throw new ConfigurationException("concurrent_writes must be at least 2"); + } + + if (conf.concurrent_replicates != null && conf.concurrent_replicates < 2) + { + throw new ConfigurationException("concurrent_replicates must be at least 2"); + } - if (conf.memtable_total_space_in_mb == null) - conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576)); - if (conf.memtable_total_space_in_mb <= 0) - throw new ConfigurationException("memtable_total_space_in_mb must be positive"); - logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb); + if (conf.memtable_total_space_in_mb == null) + conf.memtable_total_space_in_mb = (int) (Runtime.getRuntime().maxMemory() / (3 * 1048576)); + if (conf.memtable_total_space_in_mb <= 0) + throw new ConfigurationException("memtable_total_space_in_mb must be positive"); + logger.info("Global memtable threshold is enabled at {}MB", conf.memtable_total_space_in_mb); + + /* Memtable flush writer threads */ + if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1) + { + throw new ConfigurationException("memtable_flush_writers must be at least 1"); + } + else if (conf.memtable_flush_writers == null) + { + conf.memtable_flush_writers = conf.data_file_directories.length; + } - /* Memtable flush writer threads */ - if (conf.memtable_flush_writers != null && conf.memtable_flush_writers < 1) + /* Local IP or hostname to bind services to */ + if (conf.listen_address != null) + { + try { - throw new ConfigurationException("memtable_flush_writers must be at least 1"); + listenAddress = InetAddress.getByName(conf.listen_address); } - else if (conf.memtable_flush_writers == null) + catch (UnknownHostException e) { - conf.memtable_flush_writers = conf.data_file_directories.length; + throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'"); } + } - /* Local IP or hostname to bind services to */ - if (conf.listen_address != null) + /* Gossip Address to broadcast */ + if (conf.broadcast_address != null) + { + if (conf.broadcast_address.equals("0.0.0.0")) { - try - { - listenAddress = InetAddress.getByName(conf.listen_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown listen_address '" + conf.listen_address + "'"); - } + throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!"); } - /* Gossip Address to broadcast */ - if (conf.broadcast_address != null) + try { - if (conf.broadcast_address.equals("0.0.0.0")) - { - throw new ConfigurationException("broadcast_address cannot be 0.0.0.0!"); - } - - try - { - broadcastAddress = InetAddress.getByName(conf.broadcast_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'"); - } + broadcastAddress = InetAddress.getByName(conf.broadcast_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_address '" + conf.broadcast_address + "'"); } + } - /* Local IP or hostname to bind RPC server to */ - if (conf.rpc_address != null) + /* Local IP or hostname to bind RPC server to */ + if (conf.rpc_address != null) + { + try { - try - { - rpcAddress = InetAddress.getByName(conf.rpc_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address); - } + rpcAddress = InetAddress.getByName(conf.rpc_address); } - else + catch (UnknownHostException e) { - rpcAddress = FBUtilities.getLocalAddress(); + throw new ConfigurationException("Unknown host in rpc_address " + conf.rpc_address); } + } + else + { + rpcAddress = FBUtilities.getLocalAddress(); + } - if (conf.thrift_framed_transport_size_in_mb <= 0) - throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive"); + if (conf.thrift_framed_transport_size_in_mb <= 0) + throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive"); - if (conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb) - throw new ConfigurationException("thrift_max_message_length_in_mb must be greater than thrift_framed_transport_size_in_mb"); + if (conf.thrift_max_message_length_in_mb < conf.thrift_framed_transport_size_in_mb) + throw new ConfigurationException("thrift_max_message_length_in_mb must be greater than thrift_framed_transport_size_in_mb"); - /* end point snitch */ - if (conf.endpoint_snitch == null) - { - throw new ConfigurationException("Missing endpoint_snitch directive"); - } - snitch = createEndpointSnitch(conf.endpoint_snitch); - EndpointSnitchInfo.create(); + /* end point snitch */ + if (conf.endpoint_snitch == null) + { + throw new ConfigurationException("Missing endpoint_snitch directive"); + } + snitch = createEndpointSnitch(conf.endpoint_snitch); + EndpointSnitchInfo.create(); - localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator<InetAddress>() + localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); + localComparator = new Comparator<InetAddress>() + { + public int compare(InetAddress endpoint1, InetAddress endpoint2) { - public int compare(InetAddress endpoint1, InetAddress endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } - }; + boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; + } + }; - /* Request Scheduler setup */ - requestSchedulerOptions = conf.request_scheduler_options; - if (conf.request_scheduler != null) + /* Request Scheduler setup */ + requestSchedulerOptions = conf.request_scheduler_options; + if (conf.request_scheduler != null) + { + try { - try - { - if (requestSchedulerOptions == null) - { - requestSchedulerOptions = new RequestSchedulerOptions(); - } - Class<?> cls = Class.forName(conf.request_scheduler); - requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); - } - catch (ClassNotFoundException e) - { - throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler); - } - catch (Exception e) + if (requestSchedulerOptions == null) { - throw new ConfigurationException("Unable to instantiate request scheduler", e); + requestSchedulerOptions = new RequestSchedulerOptions(); } + Class<?> cls = Class.forName(conf.request_scheduler); + requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); } - else - { - requestScheduler = new NoScheduler(); - } - - if (conf.request_scheduler_id == RequestSchedulerId.keyspace) + catch (ClassNotFoundException e) { - requestSchedulerId = conf.request_scheduler_id; + throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler); } - else + catch (Exception e) { - // Default to Keyspace - requestSchedulerId = RequestSchedulerId.keyspace; + throw new ConfigurationException("Unable to instantiate request scheduler", e); } + } + else + { + requestScheduler = new NoScheduler(); + } - if (logger.isDebugEnabled() && conf.auto_bootstrap != null) - { - logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap); - } + if (conf.request_scheduler_id == RequestSchedulerId.keyspace) + { + requestSchedulerId = conf.request_scheduler_id; + } + else + { + // Default to Keyspace + requestSchedulerId = RequestSchedulerId.keyspace; + } - if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0) - { - throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer"); - } + if (logger.isDebugEnabled() && conf.auto_bootstrap != null) + { + logger.debug("setting auto_bootstrap to " + conf.auto_bootstrap); + } - if (conf.concurrent_compactors == null) - conf.concurrent_compactors = FBUtilities.getAvailableProcessors(); + if (conf.in_memory_compaction_limit_in_mb != null && conf.in_memory_compaction_limit_in_mb <= 0) + { + throw new ConfigurationException("in_memory_compaction_limit_in_mb must be a positive integer"); + } - if (conf.concurrent_compactors <= 0) - throw new ConfigurationException("concurrent_compactors should be strictly greater than 0"); + if (conf.concurrent_compactors == null) + conf.concurrent_compactors = FBUtilities.getAvailableProcessors(); - if (conf.compaction_throughput_mb_per_sec == null) - conf.compaction_throughput_mb_per_sec = 16; + if (conf.concurrent_compactors <= 0) + throw new ConfigurationException("concurrent_compactors should be strictly greater than 0"); - if (conf.stream_throughput_outbound_megabits_per_sec == null) - conf.stream_throughput_outbound_megabits_per_sec = 400; + if (conf.compaction_throughput_mb_per_sec == null) + conf.compaction_throughput_mb_per_sec = 16; - if (conf.rpc_min_threads == null) - conf.rpc_min_threads = 16; + if (conf.stream_throughput_outbound_megabits_per_sec == null) + conf.stream_throughput_outbound_megabits_per_sec = 400; - if (conf.rpc_max_threads == null) - conf.rpc_max_threads = Integer.MAX_VALUE; + if (conf.rpc_min_threads == null) + conf.rpc_min_threads = 16; - /* data file and commit log directories. they get created later, when they're needed. */ - if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null) - { - for (String datadir : conf.data_file_directories) - { - if (datadir.equals(conf.commitlog_directory)) - throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories"); - if (datadir.equals(conf.saved_caches_directory)) - throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories"); - } + if (conf.rpc_max_threads == null) + conf.rpc_max_threads = Integer.MAX_VALUE; - if (conf.commitlog_directory.equals(conf.saved_caches_directory)) - throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory"); - } - else + /* data file and commit log directories. they get created later, when they're needed. */ + if (conf.commitlog_directory != null && conf.data_file_directories != null && conf.saved_caches_directory != null) + { + for (String datadir : conf.data_file_directories) { - if (conf.commitlog_directory == null) - throw new ConfigurationException("commitlog_directory missing"); - if (conf.data_file_directories == null) - throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified"); - if (conf.saved_caches_directory == null) - throw new ConfigurationException("saved_caches_directory missing"); + if (datadir.equals(conf.commitlog_directory)) + throw new ConfigurationException("commitlog_directory must not be the same as any data_file_directories"); + if (datadir.equals(conf.saved_caches_directory)) + throw new ConfigurationException("saved_caches_directory must not be the same as any data_file_directories"); } - if (conf.initial_token != null) - for (String token : tokensFromString(conf.initial_token)) - partitioner.getTokenFactory().validate(token); + if (conf.commitlog_directory.equals(conf.saved_caches_directory)) + throw new ConfigurationException("saved_caches_directory must not be the same as the commitlog_directory"); + } + else + { + if (conf.commitlog_directory == null) + throw new ConfigurationException("commitlog_directory missing"); + if (conf.data_file_directories == null) + throw new ConfigurationException("data_file_directories missing; at least one data directory must be specified"); + if (conf.saved_caches_directory == null) + throw new ConfigurationException("saved_caches_directory missing"); + } + + if (conf.initial_token != null) + for (String token : tokensFromString(conf.initial_token)) + partitioner.getTokenFactory().validate(token); - try - { - // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB) - keyCacheSizeInMB = (conf.key_cache_size_in_mb == null) - ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100) - : conf.key_cache_size_in_mb; + try + { + // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB) + keyCacheSizeInMB = (conf.key_cache_size_in_mb == null) + ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100) + : conf.key_cache_size_in_mb; - if (keyCacheSizeInMB < 0) - throw new NumberFormatException(); // to escape duplicating error message - } - catch (NumberFormatException e) - { - throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '" - + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0."); - } + if (keyCacheSizeInMB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '" + + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0."); + } - rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider); - memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator); + rowCacheProvider = FBUtilities.newCacheProvider(conf.row_cache_provider); + memoryAllocator = FBUtilities.newOffHeapAllocator(conf.memory_allocator); - if(conf.encryption_options != null) - { - logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); - //operate under the assumption that server_encryption_options is not set in yaml rather than both - conf.server_encryption_options = conf.encryption_options; - } + if(conf.encryption_options != null) + { + logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); + //operate under the assumption that server_encryption_options is not set in yaml rather than both + conf.server_encryption_options = conf.encryption_options; + } - // Hardcoded system tables - List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace()); - assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size(); - for (KSMetaData ksmd : systemKeyspaces) - { - // install the definition - for (CFMetaData cfm : ksmd.cfMetaData().values()) - Schema.instance.load(cfm); - Schema.instance.setTableDefinition(ksmd); - } + // Hardcoded system tables + List<KSMetaData> systemKeyspaces = Arrays.asList(KSMetaData.systemKeyspace(), KSMetaData.traceKeyspace()); + assert systemKeyspaces.size() == Schema.systemKeyspaceNames.size(); + for (KSMetaData ksmd : systemKeyspaces) + { + // install the definition + for (CFMetaData cfm : ksmd.cfMetaData().values()) + Schema.instance.load(cfm); + Schema.instance.setTableDefinition(ksmd); + } - /* Load the seeds for node contact points */ - if (conf.seed_provider == null) - { - throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required."); - } - try - { - Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name); - seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters); - } - // there are about 5 checked exceptions that could be thrown here. - catch (Exception e) - { - logger.error("Fatal configuration error", e); - System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace."); - System.exit(1); - } - if (seedProvider.getSeeds().size() == 0) - throw new ConfigurationException("The seed provider lists no seeds."); + /* Load the seeds for node contact points */ + if (conf.seed_provider == null) + { + throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required."); } - catch (ConfigurationException e) + try { - logger.error("Fatal configuration error", e); - System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace."); - System.exit(1); + Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name); + seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters); } - catch (YAMLException e) + // there are about 5 checked exceptions that could be thrown here. + catch (Exception e) { - logger.error("Fatal configuration error error", e); - System.err.println(e.getMessage() + "\nInvalid yaml; unable to start server. See log for stacktrace."); + logger.error("Fatal configuration error", e); + System.err.println(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace."); System.exit(1); } + if (seedProvider.getSeeds().size() == 0) + throw new ConfigurationException("The seed provider lists no seeds."); } private static IEndpointSnitch createEndpointSnitch(String snitchClassName) throws ConfigurationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java new file mode 100644 index 0000000..a46cd65 --- /dev/null +++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.config; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.exceptions.ConfigurationException; + +import org.yaml.snakeyaml.Loader; +import org.yaml.snakeyaml.TypeDescription; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.error.YAMLException; + +public class YamlConfigurationLoader implements ConfigurationLoader +{ + private static final Logger logger = LoggerFactory.getLogger(YamlConfigurationLoader.class); + + private final static String DEFAULT_CONFIGURATION = "cassandra.yaml"; + + /** + * Inspect the classpath to find storage configuration file + */ + private URL getStorageConfigURL() throws ConfigurationException + { + String configUrl = System.getProperty("cassandra.config"); + if (configUrl == null) + configUrl = DEFAULT_CONFIGURATION; + + URL url; + try + { + url = new URL(configUrl); + url.openStream().close(); // catches well-formed but bogus URLs + } + catch (Exception e) + { + ClassLoader loader = DatabaseDescriptor.class.getClassLoader(); + url = loader.getResource(configUrl); + if (url == null) + throw new ConfigurationException("Cannot locate " + configUrl); + } + + return url; + } + + public Config loadConfig() throws ConfigurationException + { + try + { + URL url = getStorageConfigURL(); + logger.info("Loading settings from " + url); + InputStream input; + try + { + input = url.openStream(); + } + catch (IOException e) + { + // getStorageConfigURL should have ruled this out + throw new AssertionError(e); + } + org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class); + TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class); + seedDesc.putMapPropertyType("parameters", String.class, String.class); + constructor.addTypeDescription(seedDesc); + Yaml yaml = new Yaml(new Loader(constructor)); + return (Config)yaml.load(input); + } + catch (YAMLException e) + { + throw new ConfigurationException("Invalid yaml", e); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java index 3f70ca5..e0bd09b 100644 --- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java +++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java @@ -94,7 +94,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>> BulkRecordWriter(Configuration conf) throws IOException { - Config.setLoadYaml(false); + Config.setClientMode(true); Config.setOutboundBindAny(true); this.conf = conf; DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0"))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 32dbbc3..263f39c 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -49,7 +49,7 @@ public class SSTableLoader static { - Config.setLoadYaml(false); + Config.setClientMode(true); } public SSTableLoader(File directory, Client client, OutputHandler outputHandler) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 15f7fb0..e6e2e50 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -453,7 +453,7 @@ public class FBUtilities } /** - * Constructs an instance of the given class, which must have a no-arg constructor. + * Constructs an instance of the given class, which must have a no-arg or default constructor. * @param classname Fully qualified classname. * @param readable Descriptive noun for the role the class plays. * @throws ConfigurationException If the class cannot be found. @@ -463,11 +463,7 @@ public class FBUtilities Class<T> cls = FBUtilities.classForName(classname, readable); try { - return cls.getConstructor().newInstance(); - } - catch (NoSuchMethodException e) - { - throw new ConfigurationException(String.format("No default constructor for %s class '%s'.", readable, classname)); + return cls.newInstance(); } catch (IllegalAccessException e) { @@ -477,8 +473,9 @@ public class FBUtilities { throw new ConfigurationException(String.format("Cannot use abstract class '%s' as %s.", classname, readable)); } - catch (InvocationTargetException e) + catch (Exception e) { + // Catch-all because Class.newInstance() "propagates any exception thrown by the nullary constructor, including a checked exception". if (e.getCause() instanceof ConfigurationException) throw (ConfigurationException)e.getCause(); throw new ConfigurationException(String.format("Error instantiating %s class '%s'.", readable, classname), e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/2575275a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java index a357ed6..9cca997 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java @@ -26,6 +26,8 @@ import org.apache.cassandra.locator.SimpleStrategy; import org.apache.cassandra.service.MigrationManager; import org.junit.Test; +import static org.junit.Assert.*; + import java.io.IOException; @@ -94,4 +96,29 @@ public class DatabaseDescriptorTest Gossiper.instance.stop(); } } + + @Test + public void testConfigurationLoader() throws Exception + { + // By default, we should load from the yaml + Config config = DatabaseDescriptor.loadConfig(); + assertEquals("Test Cluster", config.cluster_name); + + // Now try custom loader + ConfigurationLoader testLoader = new TestLoader(); + System.setProperty("cassandra.config.loader", testLoader.getClass().getName()); + + config = DatabaseDescriptor.loadConfig(); + assertEquals("ConfigurationLoader Test", config.cluster_name); + } + + public static class TestLoader implements ConfigurationLoader + { + public Config loadConfig() throws ConfigurationException + { + Config testConfig = new Config(); + testConfig.cluster_name = "ConfigurationLoader Test";; + return testConfig; + } + } }
