This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 11cb8104facc4d3455efb73d2701fd5ba510bef1 Merge: c1f60ae 0a1e900 Author: Ekaterina Dimitrova <[email protected]> AuthorDate: Tue Jan 19 17:52:16 2021 -0500 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../cassandra/concurrent/InfiniteLoopExecutor.java | 8 + .../cassandra/config/DatabaseDescriptor.java | 36 +-- .../org/apache/cassandra/db/ColumnFamilyStore.java | 337 +++++++++++---------- src/java/org/apache/cassandra/db/Memtable.java | 13 +- src/java/org/apache/cassandra/db/ReadCommand.java | 2 +- .../apache/cassandra/utils/memory/HeapPool.java | 2 +- .../cassandra/utils/memory/MemtableAllocator.java | 121 ++++++-- .../cassandra/utils/memory/MemtableCleaner.java | 40 +++ .../utils/memory/MemtableCleanerThread.java | 67 +++- .../cassandra/utils/memory/MemtablePool.java | 35 ++- .../apache/cassandra/utils/memory/NativePool.java | 2 +- .../apache/cassandra/utils/memory/SlabPool.java | 2 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 14 +- .../org/apache/cassandra/db/NativeCellTest.java | 16 +- .../utils/memory/MemtableCleanerThreadTest.java | 187 ++++++++++++ .../utils/memory/NativeAllocatorTest.java | 204 +++++++------ 17 files changed, 758 insertions(+), 329 deletions(-) diff --cc CHANGES.txt index c732513,4f3ab1f..29723d4 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,8 -1,5 +1,9 @@@ -3.0.24: - * Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261) +3.11.10 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) + * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161) + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) +Merged from 3.0: ++ * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372) * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226) diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index c88a0e7,52f01b6..aa9a4cc --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -269,10 -148,10 +269,10 @@@ public class DatabaseDescripto if (Config.getOverrideLoadConfig() != null) return Config.getOverrideLoadConfig().get(); - String loaderClass = System.getProperty("cassandra.config.loader"); + String loaderClass = System.getProperty(Config.PROPERTY_PREFIX + "config.loader"); ConfigurationLoader loader = loaderClass == null - ? new YamlConfigurationLoader() - : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading"); + ? 
new YamlConfigurationLoader() - : FBUtilities.construct(loaderClass, "configuration loading"); ++ : FBUtilities.construct(loaderClass, "configuration loading"); Config config = loader.loadConfig(); if (!hasLoggedConfig) @@@ -478,46 -516,83 +478,46 @@@ conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40; } - snitch = createEndpointSnitch(conf.endpoint_snitch); - EndpointSnitchInfo.create(); - - localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = (endpoint1, endpoint2) -> { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - }; + if (conf.cdc_raw_directory == null) + { + conf.cdc_raw_directory = storagedirFor("cdc_raw"); + } - /* Request Scheduler setup */ - requestSchedulerOptions = conf.request_scheduler_options; - if (conf.request_scheduler != null) + if (conf.commitlog_total_space_in_mb == null) { + int preferredSize = 8192; + int minSize = 0; try { - if (requestSchedulerOptions == null) - { - requestSchedulerOptions = new RequestSchedulerOptions(); - } - Class<?> cls = Class.forName(conf.request_scheduler); - requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); - } - catch (ClassNotFoundException e) + // use 1/4 of available space. See discussion on #10013 and #10199 + minSize = Ints.saturatedCast((guessFileStore(conf.commitlog_directory).getTotalSpace() / 1048576) / 4); + } + catch (IOException e) { - throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false); + logger.debug("Error checking disk space", e); + throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions", + conf.commitlog_directory), e); } - catch (Exception e) + if (minSize < preferredSize) { - throw new ConfigurationException("Unable to instantiate request scheduler", e); + logger.warn("Small commitlog volume detected at {}; setting commitlog_total_space_in_mb to {}. You can override this in cassandra.yaml", + conf.commitlog_directory, minSize); + conf.commitlog_total_space_in_mb = minSize; + } + else + { + conf.commitlog_total_space_in_mb = preferredSize; } - } - else - { - requestScheduler = new NoScheduler(); - } - - if (conf.request_scheduler_id == RequestSchedulerId.keyspace) - { - requestSchedulerId = conf.request_scheduler_id; - } - else - { - // Default to Keyspace - requestSchedulerId = RequestSchedulerId.keyspace; - } - - // if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that. 
Otherwise, - // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/ - if (conf.commitlog_directory == null) - { - conf.commitlog_directory = System.getProperty("cassandra.storagedir", null); - if (conf.commitlog_directory == null) - throw new ConfigurationException("commitlog_directory is missing and -Dcassandra.storagedir is not set", false); - conf.commitlog_directory += File.separator + "commitlog"; - } - - if (conf.hints_directory == null) - { - conf.hints_directory = System.getProperty("cassandra.storagedir", null); - if (conf.hints_directory == null) - throw new ConfigurationException("hints_directory is missing and -Dcassandra.storagedir is not set", false); - conf.hints_directory += File.separator + "hints"; } - if (conf.commitlog_total_space_in_mb == null) + if (conf.cdc_total_space_in_mb == 0) { - int preferredSize = 8192; + int preferredSize = 4096; - int minSize = 0; + int minSize; try { - // use 1/4 of available space. See discussion on #10013 and #10199 - minSize = Ints.saturatedCast((guessFileStore(conf.commitlog_directory).getTotalSpace() / 1048576) / 4); + // use 1/8th of available space. See discussion on #10013 and #10199 on the CL, taking half that for CDC + minSize = Ints.saturatedCast((guessFileStore(conf.cdc_raw_directory).getTotalSpace() / 1048576) / 8); } catch (IOException e) { @@@ -702,356 -735,10 +702,352 @@@ conf.server_encryption_options = conf.encryption_options; } - // load the seeds for node contact points - if (conf.seed_provider == null) + if (conf.user_defined_function_fail_timeout < 0) + throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); + if (conf.user_defined_function_warn_timeout < 0) + throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false); + + if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout) + throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false); + + if (conf.commitlog_segment_size_in_mb <= 0) + throw new ConfigurationException("commitlog_segment_size_in_mb must be positive, but was " + + conf.commitlog_segment_size_in_mb, false); + else if (conf.commitlog_segment_size_in_mb >= 2048) + throw new ConfigurationException("commitlog_segment_size_in_mb must be smaller than 2048, but was " + + conf.commitlog_segment_size_in_mb, false); + + if (conf.max_mutation_size_in_kb == null) + conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; + else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) + throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + + // native transport encryption options + if (conf.native_transport_port_ssl != null + && conf.native_transport_port_ssl != conf.native_transport_port + && !conf.client_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + } + + // If max protocol version has been set, just validate it's within an acceptable range + if (conf.native_transport_max_negotiable_protocol_version != Integer.MIN_VALUE) + { + try + { + ProtocolVersion.decode(conf.native_transport_max_negotiable_protocol_version, ProtocolVersionLimit.SERVER_DEFAULT); + logger.info("Native transport max negotiable version statically limited to 
{}", conf.native_transport_max_negotiable_protocol_version); + } + catch (Exception e) + { + throw new ConfigurationException("Invalid setting for native_transport_max_negotiable_protocol_version; " + + ProtocolVersion.invalidVersionMessage(conf.native_transport_max_negotiable_protocol_version)); + } + } + + if (conf.max_value_size_in_mb <= 0) + throw new ConfigurationException("max_value_size_in_mb must be positive", false); + else if (conf.max_value_size_in_mb >= 2048) + throw new ConfigurationException("max_value_size_in_mb must be smaller than 2048, but was " + + conf.max_value_size_in_mb, false); + + switch (conf.disk_optimization_strategy) + { + case ssd: + diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance); + break; + case spinning: + diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); + break; + } + + try + { + ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams(); + Class<?> clazz = Class.forName(strategy.class_name); + if (!BackPressureStrategy.class.isAssignableFrom(clazz)) + throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false); + + Constructor<?> ctor = clazz.getConstructor(Map.class); + BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters); + logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy); + backPressureStrategy = instance; + } + catch (ConfigurationException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex); + } + + if (conf.otc_coalescing_enough_coalesced_messages > 128) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false); + + if (conf.otc_coalescing_enough_coalesced_messages <= 0) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false); + } + + private static String storagedirFor(String type) + { + return storagedir(type + "_directory") + File.separator + type; + } + + private static String storagedir(String errMsgType) + { + String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null); + if (storagedir == null) + throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false); + return storagedir; + } + + public static void applyAddressConfig() throws ConfigurationException + { + applyAddressConfig(conf); + } + + public static void applyAddressConfig(Config config) throws ConfigurationException + { + listenAddress = null; + rpcAddress = null; + broadcastAddress = null; + broadcastRpcAddress = null; + + /* Local IP, hostname or interface to bind services to */ + if (config.listen_address != null && config.listen_interface != null) + { + throw new ConfigurationException("Set listen_address OR listen_interface, not both", false); + } + else if (config.listen_address != null) + { + try + { + listenAddress = InetAddress.getByName(config.listen_address); + } + catch (UnknownHostException e) + { - throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); ++ throw new ConfigurationException("Unknown listen_address '" + config.listen_address + '\'', false); + } + + if (listenAddress.isAnyLocalAddress()) + throw new 
ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false); + } + else if (config.listen_interface != null) + { + listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6); + } + + /* Gossip Address to broadcast */ + if (config.broadcast_address != null) + { + try + { + broadcastAddress = InetAddress.getByName(config.broadcast_address); + } + catch (UnknownHostException e) + { - throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); ++ throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + '\'', false); + } + + if (broadcastAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false); + } + + /* Local IP, hostname or interface to bind RPC server to */ + if (config.rpc_address != null && config.rpc_interface != null) + { + throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false); + } + else if (config.rpc_address != null) + { + try + { + rpcAddress = InetAddress.getByName(config.rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false); + } + } + else if (config.rpc_interface != null) + { + rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6); + } + else + { + rpcAddress = FBUtilities.getLocalAddress(); + } + + /* RPC address to broadcast */ + if (config.broadcast_rpc_address != null) + { + try + { + broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address); + } + catch (UnknownHostException e) + { - throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); ++ throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false); + } + + if (broadcastRpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false); + } + else + { + if (rpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " + + "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false); + } + } + + public static void applyThriftHSHA() + { + // fail early instead of OOMing (see CASSANDRA-8116) + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE) + throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " + + "setting of 'unlimited'. 
Please see the comments in cassandra.yaml " + + "for rpc_server_type and rpc_max_threads.", + false); + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024)) + logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads); + } + + public static void applyEncryptionContext() + { + // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption, + // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read) + encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options); + } + + public static void applySeedProvider() + { + // load the seeds for node contact points + if (conf.seed_provider == null) + { + throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false); + } + try + { + Class<?> seedProviderClass = Class.forName(conf.seed_provider.class_name); + seedProvider = (SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters); + } + // there are about 5 checked exceptions that could be thrown here. + catch (Exception e) + { + throw new ConfigurationException(e.getMessage() + "\nFatal configuration error; unable to start server. See log for stacktrace.", true); + } + if (seedProvider.getSeeds().size() == 0) + throw new ConfigurationException("The seed provider lists no seeds.", false); + } + + public static void applyTokensConfig() + { + applyTokensConfig(conf); + } + + static void applyTokensConfig(Config conf) + { + if (conf.initial_token != null) + { + Collection<String> tokens = tokensFromString(conf.initial_token); + if (conf.num_tokens == null) + { + if (tokens.size() == 1) + conf.num_tokens = 1; + else + throw new ConfigurationException("initial_token was set but num_tokens is not!", false); + } + + if (tokens.size() != conf.num_tokens) + { + throw new ConfigurationException(String.format("The number of initial tokens (by initial_token) specified (%s) is different from num_tokens value (%s)", + tokens.size(), + conf.num_tokens), + false); + } + + for (String token : tokens) + partitioner.getTokenFactory().validate(token); + } + else if (conf.num_tokens == null) + { + conf.num_tokens = 1; + } + } + + // Maybe safe for clients + tools + public static void applyRequestScheduler() + { + /* Request Scheduler setup */ + requestSchedulerOptions = conf.request_scheduler_options; + if (conf.request_scheduler != null) + { + try + { + if (requestSchedulerOptions == null) + { + requestSchedulerOptions = new RequestSchedulerOptions(); + } + Class<?> cls = Class.forName(conf.request_scheduler); + requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions); + } + catch (ClassNotFoundException e) + { + throw new ConfigurationException("Invalid Request Scheduler class " + conf.request_scheduler, false); + } + catch (Exception e) + { + throw new ConfigurationException("Unable to instantiate request scheduler", e); + } + } + else + { + requestScheduler = new NoScheduler(); + } + + if (conf.request_scheduler_id == RequestSchedulerId.keyspace) + { + requestSchedulerId = conf.request_scheduler_id; + } + else + { + // Default to Keyspace + requestSchedulerId = RequestSchedulerId.keyspace; + } + } + + // definitely not safe for tools + clients - implicitly 
instantiates StorageService + public static void applySnitch() + { + /* end point snitch */ + if (conf.endpoint_snitch == null) + { + throw new ConfigurationException("Missing endpoint_snitch directive", false); + } + snitch = createEndpointSnitch(conf.dynamic_snitch, conf.endpoint_snitch); + EndpointSnitchInfo.create(); + + localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress()); - localComparator = new Comparator<InetAddress>() - { - public int compare(InetAddress endpoint1, InetAddress endpoint2) - { - boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); - boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); - if (local1 && !local2) - return -1; - if (local2 && !local1) - return 1; - return 0; - } ++ localComparator = (endpoint1, endpoint2) -> { ++ boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1)); ++ boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2)); ++ if (local1 && !local2) ++ return -1; ++ if (local2 && !local1) ++ return 1; ++ return 0; + }; + } + + // definitely not safe for tools + clients - implicitly instantiates schema + public static void applyPartitioner() + { + /* Hashing strategy */ + if (conf.partitioner == null) { - throw new ConfigurationException("seeds configuration is missing; a minimum of one seed is required.", false); + throw new ConfigurationException("Missing directive: partitioner", false); } try { @@@ -1395,10 -1057,10 +1391,10 @@@ public static Collection<String> tokensFromString(String tokenString) { - List<String> tokens = new ArrayList<String>(); + List<String> tokens = new ArrayList<>(); if (tokenString != null) - for (String token : tokenString.split(",")) - tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", "")); + for (String token : StringUtils.split(tokenString, ',')) + tokens.add(token.trim()); return tokens; } @@@ -2073,10 -1740,10 +2069,10 @@@ return new File(conf.hints_directory); } - public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension) + public static File getSerializedCachePath(CacheType cacheType, String version, String extension) { String name = cacheType.toString() - + (version == null ? "" : "-" + version + "." + extension); + + (version == null ? "" : '-' + version + '.' + extension); return new File(conf.saved_caches_directory, name); } diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index abdc3f8,61d60b1..83241e5 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -128,49 -127,27 +129,49 @@@ public class ColumnFamilyStore implemen private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); + /* + We keep a pool of threads for each data directory, size of each pool is memtable_flush_writers. + When flushing we start a Flush runnable in the flushExecutor. Flush calculates how to split the + memtable ranges over the existing data directories and creates a FlushRunnable for each of the directories. + The FlushRunnables are executed in the perDiskflushExecutors and the Flush will block until all FlushRunnables + are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can + have that many flushes going at the same time. 
+ */ - private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtableFlushWriter"), - "internal"); + private static final ThreadPoolExecutor flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableFlushWriter"), + "internal"); + private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length]; + static + { + for (int i = 0; i < DatabaseDescriptor.getAllDataFileLocations().length; i++) + { + perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i), + "internal"); + } + } + // post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed - private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtablePostFlush"), - "internal"); - - private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, - StageManager.KEEPALIVE, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("MemtableReclaimMemory"), - "internal"); + private static final ThreadPoolExecutor postFlushExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtablePostFlush"), + "internal"); + + private static final ThreadPoolExecutor reclaimExecutor = new JMXEnabledThreadPoolExecutor(1, + StageManager.KEEPALIVE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory("MemtableReclaimMemory"), + "internal"); private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"}; private static final String[] COUNTER_DESCS = new String[] @@@ -397,18 -355,28 +394,18 @@@ } } - private ColumnFamilyStore(Keyspace keyspace, - String columnFamilyName, - int generation, - CFMetaData metadata, - Directories directories, - boolean loadSSTables) - { - this(keyspace, columnFamilyName, generation, metadata, directories, loadSSTables, true); - } - - @VisibleForTesting public ColumnFamilyStore(Keyspace keyspace, - String columnFamilyName, - int generation, - CFMetaData metadata, - Directories directories, - boolean loadSSTables, - boolean registerBookeeping, - boolean offline) + String columnFamilyName, + int generation, + CFMetaData metadata, + Directories directories, + boolean loadSSTables, - boolean registerBookkeeping) ++ boolean registerBookeeping, ++ boolean offline) { assert directories != null; - assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; + assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName; this.keyspace = keyspace; this.metadata = metadata; @@@ -612,22 -564,10 +605,22 @@@ CFMetaData metadata, boolean loadSSTables) { - // get the max generation number, to prevent generation conflicts Directories directories = new Directories(metadata, initialDirectories); + return 
createColumnFamilyStore(keyspace, columnFamily, metadata, directories, loadSSTables, true, false); + } + + /** This is only directly used by offline tools */ + public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, + String columnFamily, + CFMetaData metadata, + Directories directories, + boolean loadSSTables, + boolean registerBookkeeping, + boolean offline) + { + // get the max generation number, to prevent generation conflicts Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true); - List<Integer> generations = new ArrayList<Integer>(); + List<Integer> generations = new ArrayList<>(); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) { Descriptor desc = entry.getKey(); @@@ -1078,10 -1011,10 +1074,10 @@@ * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so * that we know the CL position we are dirty to, which can be marked clean when we complete. */ - writeBarrier = keyspace.writeOrder.newBarrier(); + writeBarrier = Keyspace.writeOrder.newBarrier(); // submit flushes for the memtable for any indexed sub-cfses, and our own - AtomicReference<ReplayPosition> commitLogUpperBound = new AtomicReference<>(); + AtomicReference<CommitLogPosition> commitLogUpperBound = new AtomicReference<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { // switch all memtables, regardless of their dirty status, setting the barrier @@@ -1120,125 -1063,42 +1124,135 @@@ try { - boolean flushNonCf2i = true; - for (Memtable memtable : memtables) + // Flush "data" memtable with non-cf 2i first; + flushMemtable(memtables.get(0), true); + for (int i = 1; i < memtables.size(); i++) + flushMemtable(memtables.get(i), false); + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + postFlush.flushFailure = t; + } ++ ++ if (logger.isTraceEnabled()) ++ logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name); ++ + // signal the post-flush we've done our work + postFlush.latch.countDown(); ++ ++ if (logger.isTraceEnabled()) ++ logger.trace("Flush task task {}@{} finished", hashCode(), name); + } + + public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i) + { ++ if (logger.isTraceEnabled()) ++ logger.trace("Flush task task {}@{} flushing memtable {}", hashCode(), name, memtable); ++ + if (memtable.isClean() || truncate) + { + memtable.cfs.replaceFlushed(memtable, Collections.emptyList()); + reclaim(memtable); + return Collections.emptyList(); + } + + List<Future<SSTableMultiWriter>> futures = new ArrayList<>(); + long totalBytesOnDisk = 0; + long maxBytesOnDisk = 0; + long minBytesOnDisk = Long.MAX_VALUE; + List<SSTableReader> sstables = new ArrayList<>(); + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH)) + { + List<Memtable.FlushRunnable> flushRunnables = null; + List<SSTableMultiWriter> flushResults = null; + + try { - Collection<SSTableReader> readers = Collections.emptyList(); - if (!memtable.isClean() && !truncate) + // flush the memtable + flushRunnables = memtable.flushRunnables(txn); + + for (int i = 0; i < flushRunnables.size(); i++) + futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i))); + + /** + * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the + * flushed memtables and CL position, which is as good as we can guarantee. 
+ * TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly + * with CL as we do with memtables/CFS-backed SecondaryIndexes. + */ + if (flushNonCf2i) + indexManager.flushAllNonCFSBackedIndexesBlocking(); + + flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures)); + } + catch (Throwable t) + { + t = memtable.abortRunnables(flushRunnables, t); + t = txn.abort(t); + throw Throwables.propagate(t); + } + + try + { + Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator(); + while (writerIterator.hasNext()) { - // TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly - // with CL as we do with memtables/CFS-backed SecondaryIndexes. - if (flushNonCf2i) + @SuppressWarnings("resource") + SSTableMultiWriter writer = writerIterator.next(); + if (writer.getFilePointer() > 0) { - indexManager.flushAllNonCFSBackedIndexesBlocking(); - flushNonCf2i = false; + writer.setOpenResult(true).prepareToCommit(); + } + else + { + maybeFail(writer.abort(null)); + writerIterator.remove(); } - readers = memtable.flush(); } - memtable.cfs.replaceFlushed(memtable, readers); - reclaim(memtable); } - } - catch (Throwable e) - { - JVMStabilityInspector.inspectThrowable(e); - // If we weren't killed, try to continue work but do not allow CommitLog to be discarded. - postFlush.flushFailure = e; - } - finally - { - if (logger.isTraceEnabled()) - logger.trace("Flush task {}@{} signaling post flush task", hashCode(), name); + catch (Throwable t) + { + for (SSTableMultiWriter writer : flushResults) + t = writer.abort(t); + t = txn.abort(t); + Throwables.propagate(t); + } + + txn.prepareToCommit(); - // signal the post-flush we've done our work - postFlush.latch.countDown(); + Throwable accumulate = null; + for (SSTableMultiWriter writer : flushResults) + accumulate = writer.commit(accumulate); - if (logger.isTraceEnabled()) - logger.trace("Flush task task {}@{} finished", hashCode(), name); + maybeFail(txn.commit(accumulate)); + + for (SSTableMultiWriter writer : flushResults) + { + Collection<SSTableReader> flushedSSTables = writer.finished(); + for (SSTableReader sstable : flushedSSTables) + { + if (sstable != null) + { + sstables.add(sstable); + long size = sstable.bytesOnDisk(); + totalBytesOnDisk += size; + maxBytesOnDisk = Math.max(maxBytesOnDisk, size); + minBytesOnDisk = Math.min(minBytesOnDisk, size); + } + } + } } + memtable.cfs.replaceFlushed(memtable, sstables); + reclaim(memtable); + memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables); + logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}", + sstables, + sstables.size(), + FBUtilities.prettyPrintMemory(totalBytesOnDisk), + FBUtilities.prettyPrintMemory(maxBytesOnDisk), + FBUtilities.prettyPrintMemory(minBytesOnDisk)); + return sstables; } private void reclaim(final Memtable memtable) @@@ -1278,58 -1138,77 +1292,77 @@@ * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately * queues it for flushing. If the memtable selected is flushed before this completes, no work is done. 
*/ - public static class FlushLargestColumnFamily implements Runnable + public static CompletableFuture<Boolean> flushLargestMemtable() { - public void run() + float largestRatio = 0f; + Memtable largest = null; + float liveOnHeap = 0, liveOffHeap = 0; + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { - float largestRatio = 0f; - Memtable largest = null; - float liveOnHeap = 0, liveOffHeap = 0; - for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) - { - // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios - // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only - // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them - Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - - // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, - // both on- and off-heap, and select the largest of the two ratios to weight this CF - float onHeap = 0f, offHeap = 0f; - onHeap += current.getAllocator().onHeap().ownershipRatio(); - offHeap += current.getAllocator().offHeap().ownershipRatio(); - - for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) - { - MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); - onHeap += allocator.onHeap().ownershipRatio(); - offHeap += allocator.offHeap().ownershipRatio(); - } + // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios + // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only + // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them + Memtable current = cfs.getTracker().getView().getCurrentMemtable(); - float ratio = Math.max(onHeap, offHeap); - if (ratio > largestRatio) - { - largest = current; - largestRatio = ratio; - } + // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, + // both on- and off-heap, and select the largest of the two ratios to weight this CF + float onHeap = 0f, offHeap = 0f; + onHeap += current.getAllocator().onHeap().ownershipRatio(); + offHeap += current.getAllocator().offHeap().ownershipRatio(); - liveOnHeap += onHeap; - liveOffHeap += offHeap; + for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) + { + MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); + onHeap += allocator.onHeap().ownershipRatio(); + offHeap += allocator.offHeap().ownershipRatio(); } - if (largest != null) + float ratio = Math.max(onHeap, offHeap); + if (ratio > largestRatio) { - float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); - float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); - float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); - float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); - float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); - float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); - logger.debug("Flushing largest {} to free up room. 
Used total: {}, live: {}, flushing: {}, this: {}", - largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), - ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); - largest.cfs.switchMemtableIfCurrent(largest); + largest = current; + largestRatio = ratio; } + + liveOnHeap += onHeap; + liveOffHeap += offHeap; + } + + CompletableFuture<Boolean> returnFuture = new CompletableFuture<>(); + + if (largest != null) + { + float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); + float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); + float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); + float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); + float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); + float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); + logger.debug("Flushing largest {} to free up room. Used total: {}, live: {}, flushing: {}, this: {}", + largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), + ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); + - ListenableFuture<ReplayPosition> flushFuture = largest.cfs.switchMemtableIfCurrent(largest); ++ ListenableFuture<CommitLogPosition> flushFuture = largest.cfs.switchMemtableIfCurrent(largest); + flushFuture.addListener(() -> { + try + { + flushFuture.get(); + returnFuture.complete(true); + } + catch (Throwable t) + { + returnFuture.completeExceptionally(t); + } + }, MoreExecutors.directExecutor()); + } + else + { + logger.debug("Flushing of largest memtable, not done, no memtable found"); + + returnFuture.complete(false); } + + return returnFuture; } private static String ratio(float onHeap, float offHeap) @@@ -1737,17 -1609,15 +1770,17 @@@ TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); for (Counter<ByteBuffer> counter : samplerResults.topK) { - byte[] key = counter.getItem().array(); + //Not duplicating the buffer for safety because AbstractSerializer and ByteBufferUtil.bytesToHex + //don't modify position or limit + ByteBuffer key = counter.getItem(); result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] { - Hex.bytesToHex(key), // raw - counter.getCount(), // count - counter.getError(), // error - metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); // string + ByteBufferUtil.bytesToHex(key), // raw + counter.getCount(), // count + counter.getError(), // error + metadata.getKeyValidator().getString(key) })); // string } return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{ - samplerResults.cardinality, result}); + samplerResults.cardinality, result}); } public boolean isCompactionDiskSpaceCheckEnabled() @@@ -2106,19 -1961,15 +2139,19 @@@ forceMajorCompaction(false); } - public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException - { - + public void forceMajorCompaction(boolean splitOutput) - { ++ { CompactionManager.instance.performMaximal(this, splitOutput); } + public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException + { + CompactionManager.instance.forceCompactionForTokenRange(this, tokenRanges); + } + public static Iterable<ColumnFamilyStore> all() { - List<Iterable<ColumnFamilyStore>> stores = new ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size()); + List<Iterable<ColumnFamilyStore>> stores = new 
ArrayList<>(Schema.instance.getKeyspaces().size()); for (Keyspace keyspace : Keyspace.all()) { stores.add(keyspace.getColumnFamilyStores()); @@@ -2161,13 -2012,9 +2194,9 @@@ { for (final ColumnFamilyStore cfs : concatWithIndexes()) { - cfs.runWithCompactionsDisabled(new Callable<Void>() - { - public Void call() - { - cfs.data.reset(new Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs)); - return null; - } + cfs.runWithCompactionsDisabled((Callable<Void>) () -> { - cfs.data.reset(new Memtable(new AtomicReference<>(ReplayPosition.NONE), cfs)); ++ cfs.data.reset(new Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs)); + return null; }, true, false); } } @@@ -2220,25 -2067,21 +2249,21 @@@ now = Math.max(now, sstable.maxDataAge); truncatedAt = now; - Runnable truncateRunnable = new Runnable() - { - public void run() - { - logger.debug("Discarding sstable data for truncated CF + indexes"); - data.notifyTruncated(truncatedAt); + Runnable truncateRunnable = () -> { + logger.debug("Discarding sstable data for truncated CF + indexes"); + data.notifyTruncated(truncatedAt); - if (DatabaseDescriptor.isAutoSnapshot()) - snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); + if (DatabaseDescriptor.isAutoSnapshot()) - snapshot(Keyspace.getTimestampedSnapshotName(name)); ++ snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); - discardSSTables(truncatedAt); + discardSSTables(truncatedAt); - indexManager.truncateAllIndexesBlocking(truncatedAt); - viewManager.truncateBlocking(replayAfter, truncatedAt); + indexManager.truncateAllIndexesBlocking(truncatedAt); + viewManager.truncateBlocking(replayAfter, truncatedAt); - SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); - logger.trace("cleaning out row cache"); - invalidateCaches(); - } + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); + logger.trace("cleaning out row cache"); + invalidateCaches(); }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); @@@ -2699,16 -2532,6 +2720,16 @@@ public static TableMetrics metricsFor(UUID tableId) { - return getIfExists(tableId).metric; + return Objects.requireNonNull(getIfExists(tableId)).metric; } + + public DiskBoundaries getDiskBoundaries() + { + return diskBoundaryManager.getDiskBoundaries(this); + } + + public void invalidateDiskBoundaries() + { + diskBoundaryManager.invalidate(); + } - } + } diff --cc src/java/org/apache/cassandra/db/Memtable.java index 5af789e,139663e..ae8b8d3 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@@ -54,41 -54,15 +54,44 @@@ import org.apache.cassandra.utils.ByteB import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapPool; import org.apache.cassandra.utils.memory.MemtableAllocator; ++import org.apache.cassandra.utils.memory.MemtableCleaner; import org.apache.cassandra.utils.memory.MemtablePool; +import org.apache.cassandra.utils.memory.NativePool; +import org.apache.cassandra.utils.memory.SlabPool; public class Memtable implements Comparable<Memtable> { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - @VisibleForTesting - public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool(); + public static final MemtablePool MEMORY_POOL = 
createMemtableAllocatorPool(); + + private static MemtablePool createMemtableAllocatorPool() + { + long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20; + long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20; ++ final float cleaningThreshold = DatabaseDescriptor.getMemtableCleanupThreshold(); ++ final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable; + switch (DatabaseDescriptor.getMemtableAllocationType()) + { + case unslabbed_heap_buffers: - return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); ++ return new HeapPool(heapLimit, cleaningThreshold, cleaner); + case heap_buffers: - return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); ++ return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner); + case offheap_buffers: + if (!FileUtils.isCleanerAvailable) + { + throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start."); + } - return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); ++ return new SlabPool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); + case offheap_objects: - return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily()); ++ return new NativePool(heapLimit, offHeapLimit, cleaningThreshold, cleaner); + default: + throw new AssertionError(); + } + } + private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000"))); private final MemtableAllocator allocator; @@@ -395,66 -339,33 +398,66 @@@ @VisibleForTesting public void makeUnflushable() { - liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024); + liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024); } - private long estimatedSize() + class FlushRunnable implements Callable<SSTableMultiWriter> { - long keySize = 0; - for (PartitionPosition key : partitions.keySet()) + private final long estimatedSize; + private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush; + + private final boolean isBatchLogTable; + private final SSTableMultiWriter writer; + + // keeping these to be able to log what we are actually flushing + private final PartitionPosition from; + private final PartitionPosition to; + + FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn) { - // make sure we don't write non-sensical keys - assert key instanceof DecoratedKey; - keySize += ((DecoratedKey)key).getKey().remaining(); + this(partitions.subMap(from, to), flushLocation, from, to, txn); } - return (long) ((keySize // index entries - + keySize // keys in data file - + liveDataSize.get()) // data - * 1.2); // bloom filter and row index overhead - } - private Collection<SSTableReader> writeSortedContents(File sstableDirectory) - { - boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME); + FlushRunnable(LifecycleTransaction txn) + { + this(partitions, null, null, null, txn); + } + + 
FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn) + { + this.toFlush = toFlush; + this.from = from; + this.to = to; + long keySize = 0; + for (PartitionPosition key : toFlush.keySet()) + { + // make sure we don't write non-sensical keys + assert key instanceof DecoratedKey; + keySize += ((DecoratedKey) key).getKey().remaining(); + } + estimatedSize = (long) ((keySize // index entries + + keySize // keys in data file + + liveDataSize.get()) // data + * 1.2); // bloom filter and row index overhead + + this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME); + + if (flushLocation == null) + writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get()); + else + writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get()); - logger.debug("Writing {}", Memtable.this.toString()); + } - Collection<SSTableReader> ssTables; - try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get())) + protected Directories getDirectories() { + return cfs.getDirectories(); + } + + private void writeSortedContents() + { + logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to); + boolean trackContention = logger.isTraceEnabled(); int heavilyContendedRowCount = 0; // (we can't clear out the map as-we-go to free up memory, diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index efe0820,925708e..0543173 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -576,11 -543,9 +576,11 @@@ public abstract class ReadCommand exten logger.warn(msg); } - Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : "")); + Tracing.trace("Read {} live rows and {} tombstone cells{}", + liveRows, tombstones, + (warnTombstones ? " (see tombstone_warn_threshold)" : "")); } - }; + } return Transformation.apply(iter, new MetricRecording()); } diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java index bd17f78,cd434c5..89d5e37 --- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java +++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java @@@ -74,9 -82,18 +80,19 @@@ public abstract class MemtablePoo ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner); } + public abstract MemtableAllocator newAllocator(); + public boolean needsCleaning() + { + return onHeap.needsCleaning() || offHeap.needsCleaning(); + } + + public Long getNumPendingtasks() + { + return numPendingTasks.getValue(); + } + /** * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners, * and acquire() makes shared resources unavailable but still recorded. 
An Owner must always acquire resources, diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index 4e320ef,ddeb9da..e09c4df --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -475,18 -444,35 +474,11 @@@ public abstract class CQLTeste store.disableAutoCompaction(); } - public void flush(boolean forceFlush) - { - if (forceFlush) - flush(); - } - - @FunctionalInterface - public interface CheckedFunction { - void apply() throws Throwable; - } - - /** - * Runs the given function before and after a flush of sstables. This is useful for checking that behavior is - * the same whether data is in memtables or sstables. - * @param runnable - * @throws Throwable - */ - public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable - { - runnable.apply(); - flush(); - runnable.apply(); - } - public void compact() { - try - { - ColumnFamilyStore store = getCurrentColumnFamilyStore(); - if (store != null) - store.forceMajorCompaction(); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } + ColumnFamilyStore store = getCurrentColumnFamilyStore(); + if (store != null) + store.forceMajorCompaction(); } public void disableCompaction() diff --cc test/unit/org/apache/cassandra/db/NativeCellTest.java index 69e615b,0000000..9b18f65 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/NativeCellTest.java +++ b/test/unit/org/apache/cassandra/db/NativeCellTest.java @@@ -1,171 -1,0 +1,175 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db; + - import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; +import java.util.UUID; ++import java.util.concurrent.CompletableFuture; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapAllocator; +import org.apache.cassandra.utils.memory.NativeAllocator; +import org.apache.cassandra.utils.memory.NativePool; + +public class NativeCellTest +{ + + private static final Logger logger = LoggerFactory.getLogger(NativeCellTest.class); - private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, Integer.MAX_VALUE, 1f, null).newAllocator(); ++ private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, ++ Integer.MAX_VALUE, ++ 1f, ++ () -> CompletableFuture.completedFuture(true)).newAllocator(); ++ @SuppressWarnings("resource") + private static final OpOrder.Group group = new OpOrder().start(); + private static Random rand; + + @BeforeClass + public static void setUp() + { + long seed = System.currentTimeMillis(); + logger.info("Seed : {}", seed); + rand = new Random(seed); + } + + @Test - public void testCells() throws IOException ++ public void testCells() + { + for (int run = 0 ; run < 1000 ; run++) + { + Row.Builder builder = BTreeRow.unsortedBuilder(1); + builder.newRow(rndclustering()); + int count = 1 + rand.nextInt(10); + for (int i = 0 ; i < count ; i++) + rndcd(builder); + test(builder.build()); + } + } + + private static Clustering rndclustering() + { + int count = 1 + rand.nextInt(100); + ByteBuffer[] values = new ByteBuffer[count]; + int size = rand.nextInt(65535); + for (int i = 0 ; i < count ; i++) + { + int twiceShare = 1 + (2 * size) / (count - i); + int nextSize = Math.min(size, rand.nextInt(twiceShare)); + if (nextSize < 10 && rand.nextBoolean()) + continue; + + byte[] bytes = new byte[nextSize]; + rand.nextBytes(bytes); + values[i] = ByteBuffer.wrap(bytes); + size -= nextSize; + } + return Clustering.make(values); + } + + private static void rndcd(Row.Builder builder) + { + ColumnDefinition col = rndcol(); + if (!col.isComplex()) + { + builder.addCell(rndcell(col)); + } + else + { + int count = 1 + rand.nextInt(100); + for (int i = 0 ; i < count ; i++) + builder.addCell(rndcell(col)); + } + } + + private static ColumnDefinition rndcol() + { + UUID uuid = new UUID(rand.nextLong(), rand.nextLong()); + boolean isComplex = rand.nextBoolean(); + return new ColumnDefinition("", + "", + ColumnIdentifier.getInterned(uuid.toString(), false), + isComplex ? 
new SetType<>(BytesType.instance, true) : BytesType.instance, + -1, + ColumnDefinition.Kind.REGULAR); + } + + private static Cell rndcell(ColumnDefinition col) + { + long timestamp = rand.nextLong(); + int ttl = rand.nextInt(); + int localDeletionTime = rand.nextInt(); + byte[] value = new byte[rand.nextInt(sanesize(expdecay()))]; + rand.nextBytes(value); + CellPath path = null; + if (col.isComplex()) + { + byte[] pathbytes = new byte[rand.nextInt(sanesize(expdecay()))]; + rand.nextBytes(value); + path = CellPath.create(ByteBuffer.wrap(pathbytes)); + } + + return new BufferCell(col, timestamp, ttl, localDeletionTime, ByteBuffer.wrap(value), path); + } + + private static int expdecay() + { + return 1 << Integer.numberOfTrailingZeros(Integer.lowestOneBit(rand.nextInt())); + } + + private static int sanesize(int randomsize) + { + return Math.min(Math.max(1, randomsize), 1 << 26); + } + + private static void test(Row row) + { + Row nrow = clone(row, nativeAllocator.rowBuilder(group)); + Row brow = clone(row, HeapAllocator.instance.cloningBTreeRowBuilder()); + Assert.assertEquals(row, nrow); + Assert.assertEquals(row, brow); + Assert.assertEquals(nrow, brow); + + Assert.assertEquals(row.clustering(), nrow.clustering()); + Assert.assertEquals(row.clustering(), brow.clustering()); + Assert.assertEquals(nrow.clustering(), brow.clustering()); + + ClusteringComparator comparator = new ClusteringComparator(UTF8Type.instance); - Assert.assertTrue(comparator.compare(row.clustering(), nrow.clustering()) == 0); - Assert.assertTrue(comparator.compare(row.clustering(), brow.clustering()) == 0); - Assert.assertTrue(comparator.compare(nrow.clustering(), brow.clustering()) == 0); ++ Assert.assertEquals(0, comparator.compare(row.clustering(), nrow.clustering())); ++ Assert.assertEquals(0, comparator.compare(row.clustering(), brow.clustering())); ++ Assert.assertEquals(0, comparator.compare(nrow.clustering(), brow.clustering())); + } + + private static Row clone(Row row, Row.Builder builder) + { + return Rows.copy(row, builder).build(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

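The heart of this merge (CASSANDRA-16261) is the switch from the fire-and-forget ColumnFamilyStore.FlushLargestColumnFamily runnable to a MemtableCleaner whose result is a CompletableFuture<Boolean> (see ColumnFamilyStore.flushLargestMemtable and the pool construction in Memtable.createMemtableAllocatorPool above), which lets MemtableCleanerThread observe completion and keep the number of pending flush tasks bounded. The bodies of the new MemtableCleaner.java and MemtableCleanerThread.java files are not shown in this --cc merge diff, so the sketch below is only a minimal, self-contained model of that pattern under assumed names (Cleaner.clean(), MAX_PENDING, maybeClean()); it is not the actual Cassandra implementation.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    // Minimal model of the CASSANDRA-16261 pattern: the cleaner reports completion
    // through a CompletableFuture, so the caller can avoid queueing an unbounded
    // number of flush tasks. Names below are illustrative, not Cassandra identifiers.
    public class BoundedCleanerDemo
    {
        // Analogous to o.a.c.utils.memory.MemtableCleaner: completes with true if a memtable was flushed.
        @FunctionalInterface
        interface Cleaner
        {
            CompletableFuture<Boolean> clean();
        }

        private static final int MAX_PENDING = 1;       // bound on in-flight cleanings (assumed value)
        private final AtomicInteger pending = new AtomicInteger();
        private final Cleaner cleaner;

        BoundedCleanerDemo(Cleaner cleaner)
        {
            this.cleaner = cleaner;
        }

        // Called when memory pressure is detected; skips scheduling if a cleaning is already pending.
        void maybeClean()
        {
            if (pending.incrementAndGet() > MAX_PENDING)
            {
                pending.decrementAndGet();               // previous cleaning still running; don't pile up
                return;
            }
            cleaner.clean().whenComplete((flushed, err) -> pending.decrementAndGet());
        }

        public static void main(String[] args)
        {
            // In Cassandra the cleaner is ColumnFamilyStore::flushLargestMemtable;
            // here it completes immediately, as the stubs in the merged unit tests do.
            BoundedCleanerDemo demo = new BoundedCleanerDemo(() -> CompletableFuture.completedFuture(true));
            for (int i = 0; i < 10; i++)
                demo.maybeClean();                       // at most MAX_PENDING cleanings are ever in flight
        }
    }

The same stubbing style appears in the merged tests: NativeCellTest now builds its NativePool with a cleaner of () -> CompletableFuture.completedFuture(true) instead of a null Runnable.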