http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 6f71817..f9d0498 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -35,30 +35,31 @@ import com.google.common.primitives.Longs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.*; +import org.apache.cassandra.auth.AuthConfig; +import org.apache.cassandra.auth.IAuthenticator; +import org.apache.cassandra.auth.IAuthorizer; +import org.apache.cassandra.auth.IInternodeAuthenticator; +import org.apache.cassandra.auth.IRoleManager; import org.apache.cassandra.config.Config.CommitLogSync; import org.apache.cassandra.config.Config.RequestSchedulerId; -import org.apache.cassandra.config.EncryptionOptions.ClientEncryptionOptions; -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.sstable.format.SSTableFormat; import org.apache.cassandra.io.util.DiskOptimizationStrategy; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy; import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.EndpointSnitchInfo; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.SeedProvider; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; import org.apache.cassandra.security.EncryptionContext; -import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.thrift.ThriftServer; +import org.apache.cassandra.service.CacheService.CacheType; +import org.apache.cassandra.thrift.ThriftServer.ThriftServerType; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.memory.*; + import org.apache.commons.lang3.StringUtils; public class DatabaseDescriptor @@ -71,6 +72,8 @@ public class DatabaseDescriptor */ private static final int MAX_NUM_TOKENS = 1536; + private static Config conf; + private static IEndpointSnitch snitch; private static InetAddress listenAddress; // leave null so we can fall through to getLocalHost private static InetAddress broadcastAddress; @@ -85,12 +88,8 @@ public class DatabaseDescriptor private static Config.DiskAccessMode indexAccessMode; - private static Config conf; - - private static SSTableFormat.Type sstable_format = SSTableFormat.Type.BIG; - - private static IAuthenticator authenticator = new AllowAllAuthenticator(); - private static IAuthorizer authorizer = new AllowAllAuthorizer(); + private static IAuthenticator authenticator; + private static IAuthorizer authorizer; // Don't initialize the role manager until applying config. The options supported by CassandraRoleManager // depend on the configured IAuthenticator, so defer creating it until that's been set. private static IRoleManager roleManager; @@ -113,36 +112,63 @@ public class DatabaseDescriptor private static DiskOptimizationStrategy diskOptimizationStrategy; - public static void forceStaticInitialization() {} - static + private static boolean clientInitialized; + private static boolean toolInitialized; + private static boolean daemonInitialized; + + public static void daemonInitialization() throws ConfigurationException { - // In client mode, we use a default configuration. Note that the fields of this class will be - // left unconfigured however (the partitioner or localDC will be null for instance) so this - // should be used with care. - try - { - if (Config.isClientMode()) - { - conf = new Config(); - } - else - { - applyConfig(loadConfig()); - } - switch (conf.disk_optimization_strategy) - { - case ssd: - diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance); - break; - case spinning: - diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); - break; - } - } - catch (Exception e) - { - throw new ExceptionInInitializerError(e); - } + assert !toolInitialized; + assert !clientInitialized; + + // Some unit tests require this :( + if (daemonInitialized) + return; + daemonInitialized = true; + + setConfig(loadConfig()); + applyAll(); + AuthConfig.applyAuthz(); + } + + public static void toolInitialization() + { + assert !daemonInitialized; + assert !clientInitialized; + + if (toolInitialized) + return; + toolInitialized = true; + + Config.setToolsMode(true); + + setConfig(loadConfig()); + + applySimpleConfig(); + + applyPartitioner(); + + applySnitch(); + + applyEncryptionContext(); + } + + public static void clientInitialization() + { + assert !daemonInitialized; + assert !toolInitialized; + + if (clientInitialized) + return; + clientInitialized = true; + + Config.setClientMode(true); + conf = new Config(); + } + + public static Config getRawConfig() + { + return conf; } @VisibleForTesting @@ -193,105 +219,34 @@ public class DatabaseDescriptor } } - @VisibleForTesting - public static void applyAddressConfig(Config config) throws ConfigurationException + private static void setConfig(Config config) { - listenAddress = null; - rpcAddress = null; - broadcastAddress = null; - broadcastRpcAddress = null; + conf = config; + } - /* Local IP, hostname or interface to bind services to */ - if (config.listen_address != null && config.listen_interface != null) - { - throw new ConfigurationException("Set listen_address OR listen_interface, not both", false); - } - else if (config.listen_address != null) - { - try - { - listenAddress = InetAddress.getByName(config.listen_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); - } + private static void applyAll() throws ConfigurationException + { + applySimpleConfig(); - if (listenAddress.isAnyLocalAddress()) - throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false); - } - else if (config.listen_interface != null) - { - listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6); - } + applyPartitioner(); - /* Gossip Address to broadcast */ - if (config.broadcast_address != null) - { - try - { - broadcastAddress = InetAddress.getByName(config.broadcast_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); - } + applyAddressConfig(); - if (broadcastAddress.isAnyLocalAddress()) - throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false); - } + applyThriftHSHA(); - /* Local IP, hostname or interface to bind RPC server to */ - if (config.rpc_address != null && config.rpc_interface != null) - { - throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false); - } - else if (config.rpc_address != null) - { - try - { - rpcAddress = InetAddress.getByName(config.rpc_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false); - } - } - else if (config.rpc_interface != null) - { - rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6); - } - else - { - rpcAddress = FBUtilities.getLocalAddress(); - } + applySnitch(); - /* RPC address to broadcast */ - if (config.broadcast_rpc_address != null) - { - try - { - broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address); - } - catch (UnknownHostException e) - { - throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); - } + applyRequestScheduler(); - if (broadcastRpcAddress.isAnyLocalAddress()) - throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false); - } - else - { - if (rpcAddress.isAnyLocalAddress()) - throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " + - "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false); - } + applyInitialTokens(); + + applySeedProvider(); + + applyEncryptionContext(); } - public static void applyConfig(Config config) throws ConfigurationException + private static void applySimpleConfig() { - conf = config; if (conf.commitlog_sync == null) { @@ -342,67 +297,6 @@ public class DatabaseDescriptor logger.info("DiskAccessMode is {}, indexAccessMode is {}", conf.disk_access_mode, indexAccessMode); } - /* Authentication, authorization and role management backend, implementing IAuthenticator, IAuthorizer & IRoleMapper*/ - if (conf.authenticator != null) - authenticator = FBUtilities.newAuthenticator(conf.authenticator); - - // the configuration options regarding credentials caching are only guaranteed to - // work with PasswordAuthenticator, so log a message if some other authenticator - // is in use and non-default values are detected - if (!(authenticator instanceof PasswordAuthenticator) - && (conf.credentials_update_interval_in_ms != -1 - || conf.credentials_validity_in_ms != 2000 - || conf.credentials_cache_max_entries != 1000)) - { - logger.info("Configuration options credentials_update_interval_in_ms, credentials_validity_in_ms and " + - "credentials_cache_max_entries may not be applicable for the configured authenticator ({})", - authenticator.getClass().getName()); - } - - if (conf.authorizer != null) - authorizer = FBUtilities.newAuthorizer(conf.authorizer); - - if (!authenticator.requireAuthentication() && authorizer.requireAuthorization()) - throw new ConfigurationException(conf.authenticator + " can't be used with " + conf.authorizer, false); - - if (conf.role_manager != null) - roleManager = FBUtilities.newRoleManager(conf.role_manager); - else - roleManager = new CassandraRoleManager(); - - if (authenticator instanceof PasswordAuthenticator && !(roleManager instanceof CassandraRoleManager)) - throw new ConfigurationException("CassandraRoleManager must be used with PasswordAuthenticator", false); - - if (conf.internode_authenticator != null) - internodeAuthenticator = FBUtilities.construct(conf.internode_authenticator, "internode_authenticator"); - else - internodeAuthenticator = new AllowAllInternodeAuthenticator(); - - authenticator.validateConfiguration(); - authorizer.validateConfiguration(); - roleManager.validateConfiguration(); - internodeAuthenticator.validateConfiguration(); - - /* Hashing strategy */ - if (conf.partitioner == null) - { - throw new ConfigurationException("Missing directive: partitioner", false); - } - try - { - partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner)); - } - catch (Exception e) - { - throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false); - } - paritionerName = partitioner.getClass().getCanonicalName(); - - if (config.gc_log_threshold_in_ms < 0) - { - throw new ConfigurationException("gc_log_threshold_in_ms must be a positive integer"); - } - if (conf.gc_warn_threshold_in_ms < 0) { throw new ConfigurationException("gc_warn_threshold_in_ms must be a positive integer"); @@ -454,83 +348,12 @@ public class DatabaseDescriptor else logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb); - applyAddressConfig(config); - if (conf.thrift_framed_transport_size_in_mb <= 0) throw new ConfigurationException("thrift_framed_transport_size_in_mb must be positive, but was " + conf.thrift_framed_transport_size_in_mb, false); if (conf.native_transport_max_frame_size_in_mb <= 0) throw new ConfigurationException("native_transport_max_frame_size_in_mb must be positive, but was " + conf.native_transport_max_frame_size_in_mb, false); - // fail early instead of OOMing (see CASSANDRA-8116) - if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE) - throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " + - "setting of 'unlimited'. Please see the comments in cassandra.yaml " + - "for rpc_server_type and rpc_max_threads.", - false); - if (ThriftServer.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024)) - logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads); - - /* end point snitch */ - if (conf.endpoint_snitch == null) - { - throw new ConfigurationException("Missing endpoint_snitch directive", false); - } - snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); - EndpointSnitchInfo.create(); - - localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator<InetAddress>() - { - public int compare(InetAddress endpoint1, InetAddress endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } - }; - - /* Request Scheduler setup */ - requestSchedulerOptions = conf.request_scheduler_options; - if (conf.request_scheduler != null) - { - try - { - if (requestSchedulerOptions == null) - { - requestSchedulerOptions = new RequestSchedulerOptions(); - } - Class<?> cls = Class.forName(conf.request_scheduler); - requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); - } - catch (ClassNotFoundException e) - { - throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false); - } - catch (Exception e) - { - throw new ConfigurationException("Unable to instantiate request scheduler", e); - } - } - else - { - requestScheduler = new NoScheduler(); - } - - if (conf.request_scheduler_id == RequestSchedulerId.keyspace) - { - requestSchedulerId = conf.request_scheduler_id; - } - else - { - // Default to Keyspace - requestSchedulerId = RequestSchedulerId.keyspace; - } - // if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that. Otherwise, // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/ if (conf.commitlog_directory == null) @@ -700,97 +523,242 @@ public class DatabaseDescriptor else if (conf.num_tokens > MAX_NUM_TOKENS) throw new ConfigurationException(String.format("A maximum number of %d tokens per node is supported", MAX_NUM_TOKENS), false); - if (conf.initial_token != null) + try { - Collection<String> tokens = tokensFromString(conf.initial_token); - if (tokens.size() != conf.num_tokens) - throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false); + // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)" + preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null) + ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256)) + : conf.prepared_statements_cache_size_mb; - for (String token : tokens) - partitioner.getTokenFactory().validate(token); + if (preparedStatementsCacheSizeInMB <= 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '" + + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false); + } + + try + { + // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)" + thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null) + ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256)) + : conf.thrift_prepared_statements_cache_size_mb; + + if (thriftPreparedStatementsCacheSizeInMB <= 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '" + + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false); } + try + { + // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB) + keyCacheSizeInMB = (conf.key_cache_size_in_mb == null) + ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100) + : conf.key_cache_size_in_mb; + + if (keyCacheSizeInMB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '" + + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + } try { - // if prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)" - preparedStatementsCacheSizeInMB = (conf.prepared_statements_cache_size_mb == null) - ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256)) - : conf.prepared_statements_cache_size_mb; + // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB) + counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null) + ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50) + : conf.counter_cache_size_in_mb; + + if (counterCacheSizeInMB < 0) + throw new NumberFormatException(); // to escape duplicating error message + } + catch (NumberFormatException e) + { + throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '" + + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + } + + // if set to empty/"auto" then use 5% of Heap size + indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null) + ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)) + : conf.index_summary_capacity_in_mb; + + if (indexSummaryCapacityInMB < 0) + throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '" + + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false); + + if(conf.encryption_options != null) + { + logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); + //operate under the assumption that server_encryption_options is not set in yaml rather than both + conf.server_encryption_options = conf.encryption_options; + } + + if (conf.user_defined_function_fail_timeout < 0) + throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); + if (conf.user_defined_function_warn_timeout < 0) + throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false); + + if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout) + throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false); + + if (conf.max_mutation_size_in_kb == null) + conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; + else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) + throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + + // native transport encryption options + if (conf.native_transport_port_ssl != null + && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue() + && !conf.client_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + } + + if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0) + throw new ConfigurationException("max_value_size_in_mb must be positive", false); + + switch (conf.disk_optimization_strategy) + { + case ssd: + diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance); + break; + case spinning: + diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); + break; + } + } + + public static void applyAddressConfig() throws ConfigurationException + { + applyAddressConfig(conf); + } + + public static void applyAddressConfig(Config config) throws ConfigurationException + { + listenAddress = null; + rpcAddress = null; + broadcastAddress = null; + broadcastRpcAddress = null; + + /* Local IP, hostname or interface to bind services to */ + if (config.listen_address != null && config.listen_interface != null) + { + throw new ConfigurationException("Set listen_address OR listen_interface, not both", false); + } + else if (config.listen_address != null) + { + try + { + listenAddress = InetAddress.getByName(config.listen_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); + } - if (preparedStatementsCacheSizeInMB <= 0) - throw new NumberFormatException(); // to escape duplicating error message + if (listenAddress.isAnyLocalAddress()) + throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false); } - catch (NumberFormatException e) + else if (config.listen_interface != null) { - throw new ConfigurationException("prepared_statements_cache_size_mb option was set incorrectly to '" - + conf.prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false); + listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6); } - try + /* Gossip Address to broadcast */ + if (config.broadcast_address != null) { - // if thrift_prepared_statements_cache_size_mb option was set to "auto" then size of the cache should be "max(1/256 of Heap (in MB), 10MB)" - thriftPreparedStatementsCacheSizeInMB = (conf.thrift_prepared_statements_cache_size_mb == null) - ? Math.max(10, (int) (Runtime.getRuntime().maxMemory() / 1024 / 1024 / 256)) - : conf.thrift_prepared_statements_cache_size_mb; + try + { + broadcastAddress = InetAddress.getByName(config.broadcast_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); + } - if (thriftPreparedStatementsCacheSizeInMB <= 0) - throw new NumberFormatException(); // to escape duplicating error message + if (broadcastAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false); } - catch (NumberFormatException e) + + /* Local IP, hostname or interface to bind RPC server to */ + if (config.rpc_address != null && config.rpc_interface != null) { - throw new ConfigurationException("thrift_prepared_statements_cache_size_mb option was set incorrectly to '" - + conf.thrift_prepared_statements_cache_size_mb + "', supported values are <integer> >= 0.", false); + throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false); } - - try + else if (config.rpc_address != null) { - // if key_cache_size_in_mb option was set to "auto" then size of the cache should be "min(5% of Heap (in MB), 100MB) - keyCacheSizeInMB = (conf.key_cache_size_in_mb == null) - ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)), 100) - : conf.key_cache_size_in_mb; - - if (keyCacheSizeInMB < 0) - throw new NumberFormatException(); // to escape duplicating error message + try + { + rpcAddress = InetAddress.getByName(config.rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false); + } } - catch (NumberFormatException e) + else if (config.rpc_interface != null) { - throw new ConfigurationException("key_cache_size_in_mb option was set incorrectly to '" - + conf.key_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6); + } + else + { + rpcAddress = FBUtilities.getLocalAddress(); } - try + /* RPC address to broadcast */ + if (config.broadcast_rpc_address != null) { - // if counter_cache_size_in_mb option was set to "auto" then size of the cache should be "min(2.5% of Heap (in MB), 50MB) - counterCacheSizeInMB = (conf.counter_cache_size_in_mb == null) - ? Math.min(Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.025 / 1024 / 1024)), 50) - : conf.counter_cache_size_in_mb; + try + { + broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); + } - if (counterCacheSizeInMB < 0) - throw new NumberFormatException(); // to escape duplicating error message + if (broadcastRpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false); } - catch (NumberFormatException e) + else { - throw new ConfigurationException("counter_cache_size_in_mb option was set incorrectly to '" - + conf.counter_cache_size_in_mb + "', supported values are <integer> >= 0.", false); + if (rpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " + + "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false); } + } - // if set to empty/"auto" then use 5% of Heap size - indexSummaryCapacityInMB = (conf.index_summary_capacity_in_mb == null) - ? Math.max(1, (int) (Runtime.getRuntime().totalMemory() * 0.05 / 1024 / 1024)) - : conf.index_summary_capacity_in_mb; - - if (indexSummaryCapacityInMB < 0) - throw new ConfigurationException("index_summary_capacity_in_mb option was set incorrectly to '" - + conf.index_summary_capacity_in_mb + "', it should be a non-negative integer.", false); + public static void applyThriftHSHA() + { + // fail early instead of OOMing (see CASSANDRA-8116) + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE) + throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " + + "setting of 'unlimited'. Please see the comments in cassandra.yaml " + + "for rpc_server_type and rpc_max_threads.", + false); + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024)) + logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads); + } - if(conf.encryption_options != null) - { - logger.warn("Please rename encryption_options as server_encryption_options in the yaml"); - //operate under the assumption that server_encryption_options is not set in yaml rather than both - conf.server_encryption_options = conf.encryption_options; - } + public static void applyEncryptionContext() + { + // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption, + // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read) + encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options); + } + public static void applySeedProvider() + { // load the seeds for node contact points if (conf.seed_provider == null) { @@ -808,34 +776,107 @@ public class DatabaseDescriptor } if (seedProvider.getSeeds().size() == 0) throw new ConfigurationException("The seed provider lists no seeds.", false); + } - if (conf.user_defined_function_fail_timeout < 0) - throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); - if (conf.user_defined_function_warn_timeout < 0) - throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false); + public static void applyInitialTokens() + { + if (conf.initial_token != null) + { + Collection<String> tokens = tokensFromString(conf.initial_token); + if (tokens.size() != conf.num_tokens) + throw new ConfigurationException("The number of initial tokens (by initial_token) specified is different from num_tokens value", false); - if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout) - throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false); + for (String token : tokens) + partitioner.getTokenFactory().validate(token); + } + } - // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption, - // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read) - encryptionContext = new EncryptionContext(config.transparent_data_encryption_options); + // Maybe safe for clients + tools + public static void applyRequestScheduler() + { + /* Request Scheduler setup */ + requestSchedulerOptions = conf.request_scheduler_options; + if (conf.request_scheduler != null) + { + try + { + if (requestSchedulerOptions == null) + { + requestSchedulerOptions = new RequestSchedulerOptions(); + } + Class<?> cls = Class.forName(conf.request_scheduler); + requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); + } + catch (ClassNotFoundException e) + { + throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false); + } + catch (Exception e) + { + throw new ConfigurationException("Unable to instantiate request scheduler", e); + } + } + else + { + requestScheduler = new NoScheduler(); + } - if (conf.max_mutation_size_in_kb == null) - conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; - else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) - throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + if (conf.request_scheduler_id == RequestSchedulerId.keyspace) + { + requestSchedulerId = conf.request_scheduler_id; + } + else + { + // Default to Keyspace + requestSchedulerId = RequestSchedulerId.keyspace; + } + } - // native transport encryption options - if (conf.native_transport_port_ssl != null - && conf.native_transport_port_ssl.intValue() != conf.native_transport_port.intValue() - && !conf.client_encryption_options.enabled) + // definitely not safe for tools + clients - implicitly instantiates StorageService + public static void applySnitch() + { + /* end point snitch */ + if (conf.endpoint_snitch == null) { - throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + throw new ConfigurationException("Missing endpoint_snitch directive", false); } + snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); + EndpointSnitchInfo.create(); - if (conf.max_value_size_in_mb == null || conf.max_value_size_in_mb <= 0) - throw new ConfigurationException("max_value_size_in_mb must be positive", false); + localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); + localComparator = new Comparator<InetAddress>() + { + public int compare(InetAddress endpoint1, InetAddress endpoint2) + { + boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); + boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); + if (local1 && !local2) + return -1; + if (local2 && !local1) + return 1; + return 0; + } + }; + } + + // definitely not safe for tools + clients - implicitly instantiates schema + public static void applyPartitioner() + { + /* Hashing strategy */ + if (conf.partitioner == null) + { + throw new ConfigurationException("Missing directive: partitioner", false); + } + try + { + partitioner = FBUtilities.newPartitioner(System.getProperty("cassandra.partitioner", conf.partitioner)); + } + catch (Exception e) + { + throw new ConfigurationException("Invalid partitioner class " + conf.partitioner, false); + } + + paritionerName = partitioner.getClass().getCanonicalName(); } private static FileStore guessFileStore(String dir) throws IOException @@ -870,16 +911,31 @@ public class DatabaseDescriptor return authenticator; } + public static void setAuthenticator(IAuthenticator authenticator) + { + DatabaseDescriptor.authenticator = authenticator; + } + public static IAuthorizer getAuthorizer() { return authorizer; } + public static void setAuthorizer(IAuthorizer authorizer) + { + DatabaseDescriptor.authorizer = authorizer; + } + public static IRoleManager getRoleManager() { return roleManager; } + public static void setRoleManager(IRoleManager roleManager) + { + DatabaseDescriptor.roleManager = roleManager; + } + public static int getPermissionsValidity() { return conf.permissions_validity_in_ms; @@ -1183,16 +1239,6 @@ public class DatabaseDescriptor } } - public static boolean isReplacing() - { - if (System.getProperty("cassandra.replace_address_first_boot", null) != null && SystemKeyspace.bootstrapComplete()) - { - logger.info("Replace address on first boot requested; this node is already bootstrapped"); - return false; - } - return getReplaceAddress() != null; - } - public static String getClusterName() { return conf.cluster_name; @@ -1293,34 +1339,6 @@ public class DatabaseDescriptor return conf.cross_node_timeout; } - // not part of the Verb enum so we can change timeouts easily via JMX - public static long getTimeout(MessagingService.Verb verb) - { - switch (verb) - { - case READ: - return getReadRpcTimeout(); - case RANGE_SLICE: - case PAGED_RANGE: - return getRangeRpcTimeout(); - case TRUNCATE: - return getTruncateRpcTimeout(); - case READ_REPAIR: - case MUTATION: - case PAXOS_COMMIT: - case PAXOS_PREPARE: - case PAXOS_PROPOSE: - case HINT: - case BATCH_STORE: - case BATCH_REMOVE: - return getWriteRpcTimeout(); - case COUNTER_MUTATION: - return getCounterWriteRpcTimeout(); - default: - return getRpcTimeout(); - } - } - public static long getSlowQueryTimeout() { return conf.slow_query_log_timeout_in_ms; @@ -1530,6 +1548,11 @@ public class DatabaseDescriptor return internodeAuthenticator; } + public static void setInternodeAuthenticator(IInternodeAuthenticator internodeAuthenticator) + { + DatabaseDescriptor.internodeAuthenticator = internodeAuthenticator; + } + public static void setBroadcastAddress(InetAddress broadcastAdd) { broadcastAddress = broadcastAdd; @@ -1793,7 +1816,7 @@ public class DatabaseDescriptor return new File(conf.hints_directory); } - public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension) + public static File getSerializedCachePath(CacheType cacheType, String version, String extension) { String name = cacheType.toString() + (version == null ? "" : "-" + version + "." + extension); @@ -1828,12 +1851,12 @@ public class DatabaseDescriptor conf.dynamic_snitch_badness_threshold = dynamicBadnessThreshold; } - public static ServerEncryptionOptions getServerEncryptionOptions() + public static EncryptionOptions.ServerEncryptionOptions getServerEncryptionOptions() { return conf.server_encryption_options; } - public static ClientEncryptionOptions getClientEncryptionOptions() + public static EncryptionOptions.ClientEncryptionOptions getClientEncryptionOptions() { return conf.client_encryption_options; } @@ -2060,33 +2083,24 @@ public class DatabaseDescriptor return conf.inter_dc_tcp_nodelay; } + public static long getMemtableHeapSpaceInMb() + { + return conf.memtable_heap_space_in_mb; + } + + public static long getMemtableOffheapSpaceInMb() + { + return conf.memtable_offheap_space_in_mb; + } - public static SSTableFormat.Type getSSTableFormat() + public static Config.MemtableAllocationType getMemtableAllocationType() { - return sstable_format; + return conf.memtable_allocation_type; } - public static MemtablePool getMemtableAllocatorPool() + public static Float getMemtableCleanupThreshold() { - long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20; - long offHeapLimit = ((long) conf.memtable_offheap_space_in_mb) << 20; - switch (conf.memtable_allocation_type) - { - case unslabbed_heap_buffers: - return new HeapPool(heapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); - case heap_buffers: - return new SlabPool(heapLimit, 0, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); - case offheap_buffers: - if (!FileUtils.isCleanerAvailable) - { - throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start."); - } - return new SlabPool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); - case offheap_objects: - return new NativePool(heapLimit, offHeapLimit, conf.memtable_cleanup_threshold, new ColumnFamilyStore.FlushLargestColumnFamily()); - default: - throw new AssertionError(); - } + return conf.memtable_cleanup_threshold; } public static int getIndexSummaryResizeIntervalInMinutes()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Schema.java b/src/java/org/apache/cassandra/config/Schema.java index dd42779..eed316b 100644 --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@ -17,18 +17,14 @@ */ package org.apache.cassandra.config; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.stream.Collectors; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.auth.AuthKeyspace; import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; @@ -40,10 +36,8 @@ import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.locator.LocalStrategy; -import org.apache.cassandra.repair.SystemDistributedKeyspace; import org.apache.cassandra.schema.*; import org.apache.cassandra.service.MigrationManager; -import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.Pair; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -54,22 +48,6 @@ public class Schema public static final Schema instance = new Schema(); - /* system keyspace names (the ones with LocalStrategy replication strategy) */ - public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SystemKeyspace.NAME, SchemaKeyspace.NAME); - - /* replicate system keyspace names (the ones with a "true" replication strategy) */ - public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TraceKeyspace.NAME, - AuthKeyspace.NAME, - SystemDistributedKeyspace.NAME); - - /** - * longest permissible KS or CF name. Our main concern is that filename not be more than 255 characters; - * the filename will contain both the KS and CF names. Since non-schema-name components only take up - * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than - * 255 characters, so a lower limit here helps avoid problems. See CASSANDRA-4110. - */ - public static final int NAME_LENGTH = 48; - /* metadata map for faster keyspace lookup */ private final Map<String, KeyspaceMetadata> keyspaces = new NonBlockingHashMap<>(); @@ -81,22 +59,6 @@ public class Schema private volatile UUID version; - // 59adb24e-f3cd-3e02-97f0-5b395827453f - public static final UUID emptyVersion; - - - static - { - try - { - emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest()); - } - catch (NoSuchAlgorithmException e) - { - throw new AssertionError(); - } - } - /** * Initialize empty schema object and load the hardcoded system tables */ @@ -110,14 +72,6 @@ public class Schema } /** - * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded) - */ - public static boolean isSystemKeyspace(String keyspaceName) - { - return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase()); - } - - /** * load keyspace (keyspace) definitions, but do not initialize the keyspace instances. * Schema version may be updated as the result. */ @@ -341,7 +295,7 @@ public class Schema private Set<String> getNonSystemKeyspacesSet() { - return Sets.difference(keyspaces.keySet(), SYSTEM_KEYSPACE_NAMES); + return Sets.difference(keyspaces.keySet(), SchemaConstants.SYSTEM_KEYSPACE_NAMES); } /** @@ -370,7 +324,7 @@ public class Schema */ public List<String> getUserKeyspaces() { - return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), REPLICATED_SYSTEM_KEYSPACE_NAMES)); + return ImmutableList.copyOf(Sets.difference(getNonSystemKeyspacesSet(), SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/SchemaConstants.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/SchemaConstants.java b/src/java/org/apache/cassandra/config/SchemaConstants.java new file mode 100644 index 0000000..2416d6b --- /dev/null +++ b/src/java/org/apache/cassandra/config/SchemaConstants.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.config; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Set; +import java.util.UUID; + +import com.google.common.collect.ImmutableSet; + +public final class SchemaConstants +{ + public static final String SYSTEM_KEYSPACE_NAME = "system"; + public static final String SCHEMA_KEYSPACE_NAME = "system_schema"; + + public static final String TRACE_KEYSPACE_NAME = "system_traces"; + public static final String AUTH_KEYSPACE_NAME = "system_auth"; + public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed"; + + /* system keyspace names (the ones with LocalStrategy replication strategy) */ + public static final Set<String> SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(SYSTEM_KEYSPACE_NAME, SCHEMA_KEYSPACE_NAME); + + /* replicate system keyspace names (the ones with a "true" replication strategy) */ + public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = ImmutableSet.of(TRACE_KEYSPACE_NAME, + AUTH_KEYSPACE_NAME, + DISTRIBUTED_KEYSPACE_NAME); + /** + * longest permissible KS or CF name. Our main concern is that filename not be more than 255 characters; + * the filename will contain both the KS and CF names. Since non-schema-name components only take up + * ~64 characters, we could allow longer names than this, but on Windows, the entire path should be not greater than + * 255 characters, so a lower limit here helps avoid problems. See CASSANDRA-4110. + */ + public static final int NAME_LENGTH = 48; + + // 59adb24e-f3cd-3e02-97f0-5b395827453f + public static final UUID emptyVersion; + + static + { + try + { + emptyVersion = UUID.nameUUIDFromBytes(MessageDigest.getInstance("MD5").digest()); + } + catch (NoSuchAlgorithmException e) + { + throw new AssertionError(); + } + } + + /** + * @return whether or not the keyspace is a really system one (w/ LocalStrategy, unmodifiable, hardcoded) + */ + public static boolean isSystemKeyspace(String keyspaceName) + { + return SYSTEM_KEYSPACE_NAMES.contains(keyspaceName.toLowerCase()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java index bd5638a..ca5e41a 100644 --- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java +++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java @@ -91,11 +91,13 @@ public class YamlConfigurationLoader implements ConfigurationLoader return url; } - private static final URL storageConfigURL = getStorageConfigURL(); + private static URL storageConfigURL; @Override public Config loadConfig() throws ConfigurationException { + if (storageConfigURL == null) + storageConfigURL = getStorageConfigURL(); return loadConfig(storageConfigURL); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 47462e4..4baa38c 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -35,8 +35,10 @@ import org.slf4j.LoggerFactory; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import org.antlr.runtime.*; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.cql3.functions.FunctionName; import org.apache.cassandra.cql3.statements.*; @@ -134,7 +136,7 @@ public class QueryProcessor implements QueryHandler InternalStateInstance() { ClientState state = ClientState.forInternalCalls(); - state.setKeyspace(SystemKeyspace.NAME); + state.setKeyspace(SchemaConstants.SYSTEM_KEYSPACE_NAME); this.queryState = new QueryState(state); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/functions/FunctionName.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java index d732efa..aa980e9 100644 --- a/src/java/org/apache/cassandra/cql3/functions/FunctionName.java +++ b/src/java/org/apache/cassandra/cql3/functions/FunctionName.java @@ -19,7 +19,7 @@ package org.apache.cassandra.cql3.functions; import com.google.common.base.Objects; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.config.SchemaConstants; public final class FunctionName { @@ -28,7 +28,7 @@ public final class FunctionName public static FunctionName nativeFunction(String name) { - return new FunctionName(SystemKeyspace.NAME, name); + return new FunctionName(SchemaConstants.SYSTEM_KEYSPACE_NAME, name); } public FunctionName(String keyspace, String name) @@ -67,8 +67,8 @@ public final class FunctionName public final boolean equalsNativeFunction(FunctionName nativeFunction) { - assert nativeFunction.keyspace.equals(SystemKeyspace.NAME); - if (this.hasKeyspace() && !this.keyspace.equals(SystemKeyspace.NAME)) + assert nativeFunction.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME); + if (this.hasKeyspace() && !this.keyspace.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME)) return false; return Objects.equal(this.name, nativeFunction.name); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java index 5642b0d..591e54a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -55,7 +56,7 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement KeyspaceMetadata ksm = Schema.instance.getKSMetaData(name); if (ksm == null) throw new InvalidRequestException("Unknown keyspace " + name); - if (Schema.isSystemKeyspace(ksm.name)) + if (SchemaConstants.isSystemKeyspace(ksm.name)) throw new InvalidRequestException("Cannot alter system keyspace"); attrs.validate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java index f88c04f..33d2ce4 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateKeyspaceStatement.java @@ -21,7 +21,7 @@ import java.util.regex.Pattern; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.config.Schema; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -79,8 +79,8 @@ public class CreateKeyspaceStatement extends SchemaAlteringStatement // keyspace name if (!PATTERN_WORD_CHARS.matcher(name).matches()) throw new InvalidRequestException(String.format("\"%s\" is not a valid keyspace name", name)); - if (name.length() > Schema.NAME_LENGTH) - throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, name)); + if (name.length() > SchemaConstants.NAME_LENGTH) + throw new InvalidRequestException(String.format("Keyspace names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, name)); attrs.validate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java index 08c3a4c..90f0cdb 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java @@ -206,8 +206,8 @@ public class CreateTableStatement extends SchemaAlteringStatement // Column family name if (!PATTERN_WORD_CHARS.matcher(columnFamily()).matches()) throw new InvalidRequestException(String.format("\"%s\" is not a valid table name (must be alphanumeric character or underscore only: [a-zA-Z_0-9]+)", columnFamily())); - if (columnFamily().length() > Schema.NAME_LENGTH) - throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", Schema.NAME_LENGTH, columnFamily())); + if (columnFamily().length() > SchemaConstants.NAME_LENGTH) + throw new InvalidRequestException(String.format("Table names shouldn't be more than %s characters long (got \"%s\")", SchemaConstants.NAME_LENGTH, columnFamily())); for (Multiset.Entry<ColumnIdentifier> entry : definedNames.entrySet()) if (entry.getCount() > 1) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java index 58f8e9c..b8f2f92 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListPermissionsStatement.java @@ -21,6 +21,7 @@ import java.util.*; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -31,7 +32,7 @@ import org.apache.cassandra.transport.messages.ResultMessage; public class ListPermissionsStatement extends AuthorizationStatement { - private static final String KS = AuthKeyspace.NAME; + private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME; private static final String CF = "permissions"; // virtual cf to use for now. private static final List<ColumnSpecification> metadata; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java index 477aedc..3fee57a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListRolesStatement.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.*; import org.apache.cassandra.db.marshal.BooleanType; import org.apache.cassandra.db.marshal.MapType; @@ -37,7 +38,7 @@ import org.apache.cassandra.transport.messages.ResultMessage; public class ListRolesStatement extends AuthorizationStatement { // pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl - private static final String KS = AuthKeyspace.NAME; + private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME; private static final String CF = AuthKeyspace.ROLES; private static final MapType optionsType = MapType.getInstance(UTF8Type.instance, UTF8Type.instance, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java index 7251980..0101363 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ListUsersStatement.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.ColumnSpecification; import org.apache.cassandra.cql3.ResultSet; @@ -33,7 +34,7 @@ import org.apache.cassandra.transport.messages.ResultMessage; public class ListUsersStatement extends ListRolesStatement { // pseudo-virtual cf as the actual datasource is dependent on the IRoleManager impl - private static final String KS = AuthKeyspace.NAME; + private static final String KS = SchemaConstants.AUTH_KEYSPACE_NAME; private static final String CF = "users"; private static final List<ColumnSpecification> metadata = http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java index b22e400..3ae6bd8 100644 --- a/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/PermissionsManagementStatement.java @@ -21,8 +21,8 @@ import java.util.Set; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.cql3.RoleName; -import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; import org.apache.cassandra.exceptions.UnauthorizedException; @@ -54,7 +54,7 @@ public abstract class PermissionsManagementStatement extends AuthorizationStatem // altering permissions on builtin functions is not supported if (resource instanceof FunctionResource - && SystemKeyspace.NAME.equals(((FunctionResource)resource).getKeyspace())) + && SchemaConstants.SYSTEM_KEYSPACE_NAME.equals(((FunctionResource)resource).getKeyspace())) { throw new InvalidRequestException("Altering permissions on builtin functions is not supported"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 06dff0f..14d6440 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -817,7 +817,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public String getSSTablePath(File directory) { - return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat()); + return getSSTablePath(directory, SSTableFormat.Type.current().info.getLatestVersion(), SSTableFormat.Type.current()); } public String getSSTablePath(File directory, SSTableFormat.Type format) @@ -1775,7 +1775,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean } writeSnapshotManifest(filesJSONArr, snapshotName); - if (!Schema.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !Schema.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName)) + if (!SchemaConstants.SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName) && !SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(metadata.ksName)) writeSnapshotSchema(snapshotName); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 0d78245..741058f 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -100,7 +100,7 @@ public class Keyspace public static Keyspace open(String keyspaceName) { - assert initialized || Schema.isSystemKeyspace(keyspaceName); + assert initialized || SchemaConstants.isSystemKeyspace(keyspaceName); return open(keyspaceName, Schema.instance, true); } @@ -685,7 +685,7 @@ public class Keyspace public static Iterable<Keyspace> system() { - return Iterables.transform(Schema.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer); + return Iterables.transform(SchemaConstants.SYSTEM_KEYSPACE_NAMES, keyspaceTransformer); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/LegacyLayout.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LegacyLayout.java b/src/java/org/apache/cassandra/db/LegacyLayout.java index 9e7e9b6..4fdf28c 100644 --- a/src/java/org/apache/cassandra/db/LegacyLayout.java +++ b/src/java/org/apache/cassandra/db/LegacyLayout.java @@ -24,6 +24,7 @@ import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.*; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.utils.AbstractIterator; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -1025,7 +1026,7 @@ public abstract class LegacyLayout // then simply ignore the cell is fine. But also not that we ignore if it's the // system keyspace because for those table we actually remove columns without registering // them in the dropped columns - assert metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage(); + assert metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null : e.getMessage(); } } } @@ -1105,7 +1106,7 @@ public abstract class LegacyLayout // then simply ignore the cell is fine. But also not that we ignore if it's the // system keyspace because for those table we actually remove columns without registering // them in the dropped columns - if (metadata.ksName.equals(SystemKeyspace.NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null) + if (metadata.ksName.equals(SchemaConstants.SYSTEM_KEYSPACE_NAME) || metadata.getDroppedColumnDefinition(e.columnName) != null) return computeNext(); else throw new IOError(e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index e9cca4a..f8b258c 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.SchemaConstants; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.commitlog.IntervalSet; @@ -48,20 +49,48 @@ import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.SSTableMultiWriter; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; +import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapPool; import org.apache.cassandra.utils.memory.MemtableAllocator; import org.apache.cassandra.utils.memory.MemtablePool; +import org.apache.cassandra.utils.memory.NativePool; +import org.apache.cassandra.utils.memory.SlabPool; public class Memtable implements Comparable<Memtable> { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool(); + public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool(); + + private static MemtablePool createMemtableAllocatorPool() + { + long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20; + long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20; + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: + return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + case heap_buffers: + return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + case offheap_buffers: + if (!FileUtils.isCleanerAvailable) + { + throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start."); + } + return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + case offheap_objects: + return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); + default: + throw new AssertionError(); + } + } + private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000"))); private final MemtableAllocator allocator; @@ -416,7 +445,7 @@ public class Memtable implements Comparable<Memtable> + liveDataSize.get()) // data * 1.2); // bloom filter and row index overhead - this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); + this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME); if (flushLocation == null) writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9797511c/src/java/org/apache/cassandra/db/ReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 5f01733..63ea89d 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -452,7 +452,7 @@ public abstract class ReadCommand extends MonitorableImpl implements ReadQuery private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold(); private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold(); - private final boolean respectTombstoneThresholds = !Schema.isSystemKeyspace(ReadCommand.this.metadata().ksName); + private final boolean respectTombstoneThresholds = !SchemaConstants.isSystemKeyspace(ReadCommand.this.metadata().ksName); private int liveRows = 0; private int tombstones = 0;
