This is an automated email from the ASF dual-hosted git repository.

edimitrova pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 11cb8104facc4d3455efb73d2701fd5ba510bef1
Merge: c1f60ae 0a1e900
Author: Ekaterina Dimitrova <[email protected]>
AuthorDate: Tue Jan 19 17:52:16 2021 -0500

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../cassandra/concurrent/InfiniteLoopExecutor.java |   8 +
 .../cassandra/config/DatabaseDescriptor.java       |  36 +--
 .../org/apache/cassandra/db/ColumnFamilyStore.java | 337 +++++++++++----------
 src/java/org/apache/cassandra/db/Memtable.java     |  13 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |   2 +-
 .../apache/cassandra/utils/memory/HeapPool.java    |   2 +-
 .../cassandra/utils/memory/MemtableAllocator.java  | 121 ++++++--
 .../cassandra/utils/memory/MemtableCleaner.java    |  40 +++
 .../utils/memory/MemtableCleanerThread.java        |  67 +++-
 .../cassandra/utils/memory/MemtablePool.java       |  35 ++-
 .../apache/cassandra/utils/memory/NativePool.java  |   2 +-
 .../apache/cassandra/utils/memory/SlabPool.java    |   2 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  14 +-
 .../org/apache/cassandra/db/NativeCellTest.java    |  16 +-
 .../utils/memory/MemtableCleanerThreadTest.java    | 187 ++++++++++++
 .../utils/memory/NativeAllocatorTest.java          | 204 +++++++------
 17 files changed, 758 insertions(+), 329 deletions(-)
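
The centerpiece of the merged change (CASSANDRA-16261) is the new MemtableCleaner abstraction wired into the memtable pools. Below is a minimal sketch of that contract, inferred from how it is used later in this diff (the cleaner is bound as ColumnFamilyStore::flushLargestMemtable, which returns a CompletableFuture<Boolean>); the method name and comments here are illustrative, not copied from the new MemtableCleaner.java.

import java.util.concurrent.CompletableFuture;

// Sketch only, inferred from the usage shown in this diff rather than from MemtableCleaner.java itself.
// A cleaner is the asynchronous callback a MemtablePool invokes when it needs memory reclaimed.
@FunctionalInterface
interface MemtableCleaner
{
    // Completes with true if a flush was triggered and finished, false if no memtable was found to flush;
    // completes exceptionally if the flush failed.
    CompletableFuture<Boolean> clean();
}

In Memtable.createMemtableAllocatorPool() further down, this is exactly the shape bound via ColumnFamilyStore::flushLargestMemtable.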

diff --cc CHANGES.txt
index c732513,4f3ab1f..29723d4
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 -3.0.24:
 - * Prevent unbounded number of pending flushing tasks; Add PendingFlushTasks metric (CASSANDRA-16261)
 +3.11.10
 + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925)
 + * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161)
 + * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071)
 +Merged from 3.0:
++ * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261)
   * Improve empty hint file handling during startup (CASSANDRA-16162)
   * Allow empty string in collections with COPY FROM in cqlsh (CASSANDRA-16372)
   * Fix skipping on pre-3.0 created compact storage sstables due to missing primary key liveness (CASSANDRA-16226)
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index c88a0e7,52f01b6..aa9a4cc
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -269,10 -148,10 +269,10 @@@ public class DatabaseDescripto
          if (Config.getOverrideLoadConfig() != null)
              return Config.getOverrideLoadConfig().get();
  
 -        String loaderClass = System.getProperty("cassandra.config.loader");
 +        String loaderClass = System.getProperty(Config.PROPERTY_PREFIX + "config.loader");
          ConfigurationLoader loader = loaderClass == null
-                                    ? new YamlConfigurationLoader()
-                                    : FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
+                                      ? new YamlConfigurationLoader()
 -                                   : FBUtilities.construct(loaderClass, "configuration loading");
++                                     : FBUtilities.construct(loaderClass, "configuration loading");
          Config config = loader.loadConfig();
  
          if (!hasLoggedConfig)
@@@ -478,46 -516,83 +478,46 @@@
             conf.native_transport_max_concurrent_requests_in_bytes_per_ip = Runtime.getRuntime().maxMemory() / 40;
          }
  
 -        snitch = createEndpointSnitch(conf.endpoint_snitch);
 -        EndpointSnitchInfo.create();
 -
 -        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 -        localComparator = (endpoint1, endpoint2) -> {
 -            boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
 -            boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
 -            if (local1 && !local2)
 -                return -1;
 -            if (local2 && !local1)
 -                return 1;
 -            return 0;
 -        };
 +        if (conf.cdc_raw_directory == null)
 +        {
 +            conf.cdc_raw_directory = storagedirFor("cdc_raw");
 +        }
  
 -        /* Request Scheduler setup */
 -        requestSchedulerOptions = conf.request_scheduler_options;
 -        if (conf.request_scheduler != null)
 +        if (conf.commitlog_total_space_in_mb == null)
          {
 +            int preferredSize = 8192;
 +            int minSize = 0;
              try
              {
 -                if (requestSchedulerOptions == null)
 -                {
 -                    requestSchedulerOptions = new RequestSchedulerOptions();
 -                }
 -                Class<?> cls = Class.forName(conf.request_scheduler);
 -                requestScheduler = (IRequestScheduler) cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 -            }
 -            catch (ClassNotFoundException e)
 +                // use 1/4 of available space.  See discussion on #10013 and #10199
 +                minSize = Ints.saturatedCast((guessFileStore(conf.commitlog_directory).getTotalSpace() / 1048576) / 4);
 +            }
 +            catch (IOException e)
              {
 -                throw new ConfigurationException("Invalid Request Scheduler 
class " + conf.request_scheduler, false);
 +                logger.debug("Error checking disk space", e);
 +                throw new ConfigurationException(String.format("Unable to check disk space available to %s. Perhaps the Cassandra user does not have the necessary permissions",
 +                                                               conf.commitlog_directory), e);
              }
 -            catch (Exception e)
 +            if (minSize < preferredSize)
              {
 -                throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
 +                logger.warn("Small commitlog volume detected at {}; setting 
commitlog_total_space_in_mb to {}.  You can override this in cassandra.yaml",
 +                            conf.commitlog_directory, minSize);
 +                conf.commitlog_total_space_in_mb = minSize;
 +            }
 +            else
 +            {
 +                conf.commitlog_total_space_in_mb = preferredSize;
              }
 -        }
 -        else
 -        {
 -            requestScheduler = new NoScheduler();
 -        }
 -
 -        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 -        {
 -            requestSchedulerId = conf.request_scheduler_id;
 -        }
 -        else
 -        {
 -            // Default to Keyspace
 -            requestSchedulerId = RequestSchedulerId.keyspace;
 -        }
 -
 -        // if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that.  Otherwise,
 -        // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/
 -        if (conf.commitlog_directory == null)
 -        {
 -            conf.commitlog_directory = System.getProperty("cassandra.storagedir", null);
 -            if (conf.commitlog_directory == null)
 -                throw new ConfigurationException("commitlog_directory is 
missing and -Dcassandra.storagedir is not set", false);
 -            conf.commitlog_directory += File.separator + "commitlog";
 -        }
 -
 -        if (conf.hints_directory == null)
 -        {
 -            conf.hints_directory = System.getProperty("cassandra.storagedir", null);
 -            if (conf.hints_directory == null)
 -                throw new ConfigurationException("hints_directory is missing 
and -Dcassandra.storagedir is not set", false);
 -            conf.hints_directory += File.separator + "hints";
          }
  
 -        if (conf.commitlog_total_space_in_mb == null)
 +        if (conf.cdc_total_space_in_mb == 0)
          {
 -            int preferredSize = 8192;
 +            int preferredSize = 4096;
-             int minSize = 0;
+             int minSize;
              try
              {
 -                // use 1/4 of available space.  See discussion on #10013 and #10199
 -                minSize = Ints.saturatedCast((guessFileStore(conf.commitlog_directory).getTotalSpace() / 1048576) / 4);
 +                // use 1/8th of available space.  See discussion on #10013 and #10199 on the CL, taking half that for CDC
 +                minSize = Ints.saturatedCast((guessFileStore(conf.cdc_raw_directory).getTotalSpace() / 1048576) / 8);
              }
              catch (IOException e)
              {
@@@ -702,356 -735,10 +702,352 @@@
              conf.server_encryption_options = conf.encryption_options;
          }
  
 -        // load the seeds for node contact points
 -        if (conf.seed_provider == null)
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout)
 +            throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false);
 +
 +        if (conf.commitlog_segment_size_in_mb <= 0)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be positive, but was "
 +                    + conf.commitlog_segment_size_in_mb, false);
 +        else if (conf.commitlog_segment_size_in_mb >= 2048)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be smaller than 2048, but was "
 +                    + conf.commitlog_segment_size_in_mb, false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl != conf.native_transport_port
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in 
client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
 +        // If max protocol version has been set, just validate it's within an 
acceptable range
 +        if (conf.native_transport_max_negotiable_protocol_version != 
Integer.MIN_VALUE)
 +        {
 +            try
 +            {
 +                
ProtocolVersion.decode(conf.native_transport_max_negotiable_protocol_version, 
ProtocolVersionLimit.SERVER_DEFAULT);
 +                logger.info("Native transport max negotiable version 
statically limited to {}", 
conf.native_transport_max_negotiable_protocol_version);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new ConfigurationException("Invalid setting for 
native_transport_max_negotiable_protocol_version; " +
 +                                                 
ProtocolVersion.invalidVersionMessage(conf.native_transport_max_negotiable_protocol_version));
 +            }
 +        }
 +
 +        if (conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be 
positive", false);
 +        else if (conf.max_value_size_in_mb >= 2048)
 +            throw new ConfigurationException("max_value_size_in_mb must be 
smaller than 2048, but was "
 +                    + conf.max_value_size_in_mb, false);
 +
 +        switch (conf.disk_optimization_strategy)
 +        {
 +            case ssd:
 +                diskOptimizationStrategy = new 
SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
 +                break;
 +            case spinning:
 +                diskOptimizationStrategy = new 
SpinningDiskOptimizationStrategy();
 +                break;
 +        }
 +
 +        try
 +        {
 +            ParameterizedClass strategy = conf.back_pressure_strategy != null 
? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
 +            Class<?> clazz = Class.forName(strategy.class_name);
 +            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
 +                throw new ConfigurationException(strategy + " is not an 
instance of " + BackPressureStrategy.class.getCanonicalName(), false);
 +
 +            Constructor<?> ctor = clazz.getConstructor(Map.class);
 +            BackPressureStrategy instance = (BackPressureStrategy) 
ctor.newInstance(strategy.parameters);
 +            logger.info("Back-pressure is {} with strategy {}.", 
backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
 +            backPressureStrategy = instance;
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            throw ex;
 +        }
 +        catch (Exception ex)
 +        {
 +            throw new ConfigurationException("Error configuring back-pressure 
strategy: " + conf.back_pressure_strategy, ex);
 +        }
 +
 +        if (conf.otc_coalescing_enough_coalesced_messages > 128)
 +            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
smaller than 128", false);
 +
 +        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
 +            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
positive", false);
 +    }
 +
 +    private static String storagedirFor(String type)
 +    {
 +        return storagedir(type + "_directory") + File.separator + type;
 +    }
 +
 +    private static String storagedir(String errMsgType)
 +    {
 +        String storagedir = System.getProperty(Config.PROPERTY_PREFIX + 
"storagedir", null);
 +        if (storagedir == null)
 +            throw new ConfigurationException(errMsgType + " is missing and 
-Dcassandra.storagedir is not set", false);
 +        return storagedir;
 +    }
 +
 +    public static void applyAddressConfig() throws ConfigurationException
 +    {
 +        applyAddressConfig(conf);
 +    }
 +
 +    public static void applyAddressConfig(Config config) throws 
ConfigurationException
 +    {
 +        listenAddress = null;
 +        rpcAddress = null;
 +        broadcastAddress = null;
 +        broadcastRpcAddress = null;
 +
 +        /* Local IP, hostname or interface to bind services to */
 +        if (config.listen_address != null && config.listen_interface != null)
 +        {
 +            throw new ConfigurationException("Set listen_address OR 
listen_interface, not both", false);
 +        }
 +        else if (config.listen_address != null)
 +        {
 +            try
 +            {
 +                listenAddress = InetAddress.getByName(config.listen_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
-                 throw new ConfigurationException("Unknown listen_address '" + 
config.listen_address + "'", false);
++                throw new ConfigurationException("Unknown listen_address '" + 
config.listen_address + '\'', false);
 +            }
 +
 +            if (listenAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("listen_address cannot be a 
wildcard address (" + config.listen_address + ")!", false);
 +        }
 +        else if (config.listen_interface != null)
 +        {
 +            listenAddress = 
getNetworkInterfaceAddress(config.listen_interface, "listen_interface", 
config.listen_interface_prefer_ipv6);
 +        }
 +
 +        /* Gossip Address to broadcast */
 +        if (config.broadcast_address != null)
 +        {
 +            try
 +            {
 +                broadcastAddress = 
InetAddress.getByName(config.broadcast_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
-                 throw new ConfigurationException("Unknown broadcast_address 
'" + config.broadcast_address + "'", false);
++                throw new ConfigurationException("Unknown broadcast_address 
'" + config.broadcast_address + '\'', false);
 +            }
 +
 +            if (broadcastAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_address cannot be 
a wildcard address (" + config.broadcast_address + ")!", false);
 +        }
 +
 +        /* Local IP, hostname or interface to bind RPC server to */
 +        if (config.rpc_address != null && config.rpc_interface != null)
 +        {
 +            throw new ConfigurationException("Set rpc_address OR 
rpc_interface, not both", false);
 +        }
 +        else if (config.rpc_address != null)
 +        {
 +            try
 +            {
 +                rpcAddress = InetAddress.getByName(config.rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown host in rpc_address 
" + config.rpc_address, false);
 +            }
 +        }
 +        else if (config.rpc_interface != null)
 +        {
 +            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, 
"rpc_interface", config.rpc_interface_prefer_ipv6);
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
 +
 +        /* RPC address to broadcast */
 +        if (config.broadcast_rpc_address != null)
 +        {
 +            try
 +            {
 +                broadcastRpcAddress = 
InetAddress.getByName(config.broadcast_rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
-                 throw new ConfigurationException("Unknown 
broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
++                throw new ConfigurationException("Unknown 
broadcast_rpc_address '" + config.broadcast_rpc_address + '\'', false);
 +            }
 +
 +            if (broadcastRpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_rpc_address 
cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
 +        }
 +        else
 +        {
 +            if (rpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("If rpc_address is set to a 
wildcard address (" + config.rpc_address + "), then " +
 +                                                 "you must set 
broadcast_rpc_address to a value other than " + config.rpc_address, false);
 +        }
 +    }
 +
 +    public static void applyThriftHSHA()
 +    {
 +        // fail early instead of OOMing (see CASSANDRA-8116)
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads == Integer.MAX_VALUE)
 +            throw new ConfigurationException("The hsha rpc_server_type is not 
compatible with an rpc_max_threads " +
 +                                             "setting of 'unlimited'.  Please 
see the comments in cassandra.yaml " +
 +                                             "for rpc_server_type and 
rpc_max_threads.",
 +                                             false);
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
 +            logger.warn("rpc_max_threads setting of {} may be too high for 
the hsha server and cause unnecessary thread contention, reducing performance", 
conf.rpc_max_threads);
 +    }
 +
 +    public static void applyEncryptionContext()
 +    {
 +        // always attempt to load the cipher factory, as we could be in the 
situation where the user has disabled encryption,
 +        // but has existing commitlogs and sstables on disk that are still 
encrypted (and still need to be read)
 +        encryptionContext = new 
EncryptionContext(conf.transparent_data_encryption_options);
 +    }
 +
 +    public static void applySeedProvider()
 +    {
 +        // load the seeds for node contact points
 +        if (conf.seed_provider == null)
 +        {
 +            throw new ConfigurationException("seeds configuration is missing; 
a minimum of one seed is required.", false);
 +        }
 +        try
 +        {
 +            Class<?> seedProviderClass = 
Class.forName(conf.seed_provider.class_name);
 +            seedProvider = 
(SeedProvider)seedProviderClass.getConstructor(Map.class).newInstance(conf.seed_provider.parameters);
 +        }
 +        // there are about 5 checked exceptions that could be thrown here.
 +        catch (Exception e)
 +        {
 +            throw new ConfigurationException(e.getMessage() + "\nFatal 
configuration error; unable to start server.  See log for stacktrace.", true);
 +        }
 +        if (seedProvider.getSeeds().size() == 0)
 +            throw new ConfigurationException("The seed provider lists no 
seeds.", false);
 +    }
 +
 +    public static void applyTokensConfig()
 +    {
 +        applyTokensConfig(conf);
 +    }
 +
 +    static void applyTokensConfig(Config conf)
 +    {
 +        if (conf.initial_token != null)
 +        {
 +            Collection<String> tokens = tokensFromString(conf.initial_token);
 +            if (conf.num_tokens == null)
 +            {
 +                if (tokens.size() == 1)
 +                    conf.num_tokens = 1;
 +                else
 +                    throw new ConfigurationException("initial_token was set 
but num_tokens is not!", false);
 +            }
 +
 +            if (tokens.size() != conf.num_tokens)
 +            {
 +                throw new ConfigurationException(String.format("The number of 
initial tokens (by initial_token) specified (%s) is different from num_tokens 
value (%s)",
 +                                                               tokens.size(),
 +                                                               
conf.num_tokens),
 +                                                 false);
 +            }
 +
 +            for (String token : tokens)
 +                partitioner.getTokenFactory().validate(token);
 +        }
 +        else if (conf.num_tokens == null)
 +        {
 +            conf.num_tokens = 1;
 +        }
 +    }
 +
 +    // Maybe safe for clients + tools
 +    public static void applyRequestScheduler()
 +    {
 +        /* Request Scheduler setup */
 +        requestSchedulerOptions = conf.request_scheduler_options;
 +        if (conf.request_scheduler != null)
 +        {
 +            try
 +            {
 +                if (requestSchedulerOptions == null)
 +                {
 +                    requestSchedulerOptions = new RequestSchedulerOptions();
 +                }
 +                Class<?> cls = Class.forName(conf.request_scheduler);
 +                requestScheduler = (IRequestScheduler) 
cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +            }
 +            catch (ClassNotFoundException e)
 +            {
 +                throw new ConfigurationException("Invalid Request Scheduler 
class " + conf.request_scheduler, false);
 +            }
 +            catch (Exception e)
 +            {
 +                throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
 +            }
 +        }
 +        else
 +        {
 +            requestScheduler = new NoScheduler();
 +        }
 +
 +        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 +        {
 +            requestSchedulerId = conf.request_scheduler_id;
 +        }
 +        else
 +        {
 +            // Default to Keyspace
 +            requestSchedulerId = RequestSchedulerId.keyspace;
 +        }
 +    }
 +
 +    // definitely not safe for tools + clients - implicitly instantiates 
StorageService
 +    public static void applySnitch()
 +    {
 +        /* end point snitch */
 +        if (conf.endpoint_snitch == null)
 +        {
 +            throw new ConfigurationException("Missing endpoint_snitch 
directive", false);
 +        }
 +        snitch = createEndpointSnitch(conf.dynamic_snitch, 
conf.endpoint_snitch);
 +        EndpointSnitchInfo.create();
 +
 +        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
-         localComparator = new Comparator<InetAddress>()
-         {
-             public int compare(InetAddress endpoint1, InetAddress endpoint2)
-             {
-                 boolean local1 = 
localDC.equals(snitch.getDatacenter(endpoint1));
-                 boolean local2 = 
localDC.equals(snitch.getDatacenter(endpoint2));
-                 if (local1 && !local2)
-                     return -1;
-                 if (local2 && !local1)
-                     return 1;
-                 return 0;
-             }
++        localComparator = (endpoint1, endpoint2) -> {
++            boolean local1 = localDC.equals(snitch.getDatacenter(endpoint1));
++            boolean local2 = localDC.equals(snitch.getDatacenter(endpoint2));
++            if (local1 && !local2)
++                return -1;
++            if (local2 && !local1)
++                return 1;
++            return 0;
 +        };
 +    }
 +
 +    // definitely not safe for tools + clients - implicitly instantiates 
schema
 +    public static void applyPartitioner()
 +    {
 +        /* Hashing strategy */
 +        if (conf.partitioner == null)
          {
 -            throw new ConfigurationException("seeds configuration is missing; 
a minimum of one seed is required.", false);
 +            throw new ConfigurationException("Missing directive: 
partitioner", false);
          }
          try
          {
@@@ -1395,10 -1057,10 +1391,10 @@@
  
      public static Collection<String> tokensFromString(String tokenString)
      {
-         List<String> tokens = new ArrayList<String>();
+         List<String> tokens = new ArrayList<>();
          if (tokenString != null)
 -            for (String token : tokenString.split(","))
 -                tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", 
""));
 +            for (String token : StringUtils.split(tokenString, ','))
 +                tokens.add(token.trim());
          return tokens;
      }
  
@@@ -2073,10 -1740,10 +2069,10 @@@
          return new File(conf.hints_directory);
      }
  
 -    public static File getSerializedCachePath(CacheService.CacheType 
cacheType, String version, String extension)
 +    public static File getSerializedCachePath(CacheType cacheType, String 
version, String extension)
      {
          String name = cacheType.toString()
-                 + (version == null ? "" : "-" + version + "." + extension);
+                 + (version == null ? "" : '-' + version + '.' + extension);
          return new File(conf.saved_caches_directory, name);
      }
  
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index abdc3f8,61d60b1..83241e5
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -128,49 -127,27 +129,49 @@@ public class ColumnFamilyStore implemen
  
      private static final Logger logger = 
LoggerFactory.getLogger(ColumnFamilyStore.class);
  
 +    /*
 +    We keep a pool of threads for each data directory, size of each pool is memtable_flush_writers.
 +    When flushing we start a Flush runnable in the flushExecutor. Flush calculates how to split the
 +    memtable ranges over the existing data directories and creates a FlushRunnable for each of the directories.
 +    The FlushRunnables are executed in the perDiskflushExecutors and the Flush will block until all FlushRunnables
 +    are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can
 +    have that many flushes going at the same time.
 +    */
-     private static final ExecutorService flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
-                                                                               
            StageManager.KEEPALIVE,
-                                                                               
            TimeUnit.SECONDS,
-                                                                               
            new LinkedBlockingQueue<Runnable>(),
-                                                                               
            new NamedThreadFactory("MemtableFlushWriter"),
-                                                                               
            "internal");
+     private static final ThreadPoolExecutor flushExecutor = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
+                                                                               
               StageManager.KEEPALIVE,
+                                                                               
               TimeUnit.SECONDS,
+                                                                               
               new LinkedBlockingQueue<>(),
+                                                                               
               new NamedThreadFactory("MemtableFlushWriter"),
+                                                                               
               "internal");
  
 +    private static final ExecutorService [] perDiskflushExecutors = new 
ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
 +    static
 +    {
 +        for (int i = 0; i < 
DatabaseDescriptor.getAllDataFileLocations().length; i++)
 +        {
 +            perDiskflushExecutors[i] = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
 +                                                                        
StageManager.KEEPALIVE,
 +                                                                        
TimeUnit.SECONDS,
 +                                                                        new 
LinkedBlockingQueue<Runnable>(),
 +                                                                        new 
NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
 +                                                                        
"internal");
 +        }
 +    }
 +
      // post-flush executor is single threaded to provide guarantee that any 
flush Future on a CF will never return until prior flushes have completed
-     private static final ExecutorService postFlushExecutor = new 
JMXEnabledThreadPoolExecutor(1,
-                                                                               
                StageManager.KEEPALIVE,
-                                                                               
                TimeUnit.SECONDS,
-                                                                               
                new LinkedBlockingQueue<Runnable>(),
-                                                                               
                new NamedThreadFactory("MemtablePostFlush"),
-                                                                               
                "internal");
- 
-     private static final ExecutorService reclaimExecutor = new 
JMXEnabledThreadPoolExecutor(1,
-                                                                               
              StageManager.KEEPALIVE,
-                                                                               
              TimeUnit.SECONDS,
-                                                                               
              new LinkedBlockingQueue<Runnable>(),
-                                                                               
              new NamedThreadFactory("MemtableReclaimMemory"),
-                                                                               
              "internal");
+     private static final ThreadPoolExecutor postFlushExecutor = new 
JMXEnabledThreadPoolExecutor(1,
+                                                                               
                   StageManager.KEEPALIVE,
+                                                                               
                   TimeUnit.SECONDS,
+                                                                               
                   new LinkedBlockingQueue<>(),
+                                                                               
                   new NamedThreadFactory("MemtablePostFlush"),
+                                                                               
                   "internal");
+ 
+     private static final ThreadPoolExecutor reclaimExecutor = new 
JMXEnabledThreadPoolExecutor(1,
+                                                                               
                 StageManager.KEEPALIVE,
+                                                                               
                 TimeUnit.SECONDS,
+                                                                               
                 new LinkedBlockingQueue<>(),
+                                                                               
                 new NamedThreadFactory("MemtableReclaimMemory"),
+                                                                               
                 "internal");
  
      private static final String[] COUNTER_NAMES = new String[]{"raw", 
"count", "error", "string"};
      private static final String[] COUNTER_DESCS = new String[]
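
The comment at the top of this hunk describes the flush fan-out: a single Flush task splits the memtable across the data directories, submits one runnable per directory to that directory's dedicated executor, and blocks until all of them complete. A stripped-down sketch of that pattern follows; the types and names are illustrative, not the actual FlushRunnable/Memtable API.

// Sketch of the fan-out described in the comment above; not the real ColumnFamilyStore code.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

final class PerDiskFlushSketch
{
    static void flush(List<Callable<Void>> perDiskRunnables, ExecutorService[] perDiskExecutors)
            throws InterruptedException, ExecutionException
    {
        List<Future<Void>> futures = new ArrayList<>();
        // one runnable per data directory, each submitted to that directory's own pool
        for (int i = 0; i < perDiskRunnables.size(); i++)
            futures.add(perDiskExecutors[i].submit(perDiskRunnables.get(i)));
        // the Flush task blocks here until every per-disk writer has finished
        for (Future<Void> f : futures)
            f.get();
    }
}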
@@@ -397,18 -355,28 +394,18 @@@
          }
      }
  
 -    private ColumnFamilyStore(Keyspace keyspace,
 -                              String columnFamilyName,
 -                              int generation,
 -                              CFMetaData metadata,
 -                              Directories directories,
 -                              boolean loadSSTables)
 -    {
 -        this(keyspace, columnFamilyName, generation, metadata, directories, 
loadSSTables, true);
 -    }
 -
 -
      @VisibleForTesting
      public ColumnFamilyStore(Keyspace keyspace,
-                               String columnFamilyName,
-                               int generation,
-                               CFMetaData metadata,
-                               Directories directories,
-                               boolean loadSSTables,
-                               boolean registerBookeeping,
-                               boolean offline)
+                              String columnFamilyName,
+                              int generation,
+                              CFMetaData metadata,
+                              Directories directories,
+                              boolean loadSSTables,
 -                             boolean registerBookkeeping)
++                             boolean registerBookeeping,
++                             boolean offline)
      {
          assert directories != null;
-         assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName;
+         assert metadata != null : "null metadata for " + keyspace + ':' + columnFamilyName;
  
          this.keyspace = keyspace;
          this.metadata = metadata;
@@@ -612,22 -564,10 +605,22 @@@
                                                                           
CFMetaData metadata,
                                                                           
boolean loadSSTables)
      {
 -        // get the max generation number, to prevent generation conflicts
          Directories directories = new Directories(metadata, 
initialDirectories);
 +        return createColumnFamilyStore(keyspace, columnFamily, metadata, 
directories, loadSSTables, true, false);
 +    }
 +
 +    /** This is only directly used by offline tools */
 +    public static synchronized ColumnFamilyStore 
createColumnFamilyStore(Keyspace keyspace,
 +                                                                         
String columnFamily,
 +                                                                         
CFMetaData metadata,
 +                                                                         
Directories directories,
 +                                                                         
boolean loadSSTables,
 +                                                                         
boolean registerBookkeeping,
 +                                                                         
boolean offline)
 +    {
 +        // get the max generation number, to prevent generation conflicts
          Directories.SSTableLister lister = 
directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true);
-         List<Integer> generations = new ArrayList<Integer>();
+         List<Integer> generations = new ArrayList<>();
          for (Map.Entry<Descriptor, Set<Component>> entry : 
lister.list().entrySet())
          {
              Descriptor desc = entry.getKey();
@@@ -1078,10 -1011,10 +1074,10 @@@
               * In doing so it also tells the write operations to update the 
commitLogUpperBound of the memtable, so
               * that we know the CL position we are dirty to, which can be 
marked clean when we complete.
               */
-             writeBarrier = keyspace.writeOrder.newBarrier();
+             writeBarrier = Keyspace.writeOrder.newBarrier();
  
              // submit flushes for the memtable for any indexed sub-cfses, and 
our own
 -            AtomicReference<ReplayPosition> commitLogUpperBound = new 
AtomicReference<>();
 +            AtomicReference<CommitLogPosition> commitLogUpperBound = new 
AtomicReference<>();
              for (ColumnFamilyStore cfs : concatWithIndexes())
              {
                  // switch all memtables, regardless of their dirty status, 
setting the barrier
@@@ -1120,125 -1063,42 +1124,135 @@@
  
              try
              {
 -                boolean flushNonCf2i = true;
 -                for (Memtable memtable : memtables)
 +                // Flush "data" memtable with non-cf 2i first;
 +                flushMemtable(memtables.get(0), true);
 +                for (int i = 1; i < memtables.size(); i++)
 +                    flushMemtable(memtables.get(i), false);
 +            }
 +            catch (Throwable t)
 +            {
 +                JVMStabilityInspector.inspectThrowable(t);
 +                postFlush.flushFailure = t;
 +            }
++
++            if (logger.isTraceEnabled())
++                logger.trace("Flush task {}@{} signaling post flush task", 
hashCode(), name);
++
 +            // signal the post-flush we've done our work
 +            postFlush.latch.countDown();
++
++            if (logger.isTraceEnabled())
++                logger.trace("Flush task task {}@{} finished", hashCode(), 
name);
 +        }
 +
 +        public Collection<SSTableReader> flushMemtable(Memtable memtable, 
boolean flushNonCf2i)
 +        {
++            if (logger.isTraceEnabled())
++                logger.trace("Flush task task {}@{} flushing memtable {}", 
hashCode(), name, memtable);
++
 +            if (memtable.isClean() || truncate)
 +            {
 +                memtable.cfs.replaceFlushed(memtable, 
Collections.emptyList());
 +                reclaim(memtable);
 +                return Collections.emptyList();
 +            }
 +
 +            List<Future<SSTableMultiWriter>> futures = new ArrayList<>();
 +            long totalBytesOnDisk = 0;
 +            long maxBytesOnDisk = 0;
 +            long minBytesOnDisk = Long.MAX_VALUE;
 +            List<SSTableReader> sstables = new ArrayList<>();
 +            try (LifecycleTransaction txn = 
LifecycleTransaction.offline(OperationType.FLUSH))
 +            {
 +                List<Memtable.FlushRunnable> flushRunnables = null;
 +                List<SSTableMultiWriter> flushResults = null;
 +
 +                try
                  {
 -                    Collection<SSTableReader> readers = 
Collections.emptyList();
 -                    if (!memtable.isClean() && !truncate)
 +                    // flush the memtable
 +                    flushRunnables = memtable.flushRunnables(txn);
 +
 +                    for (int i = 0; i < flushRunnables.size(); i++)
 +                        
futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i)));
 +
 +                    /**
 +                     * we can flush 2is as soon as the barrier completes, as 
they will be consistent with (or ahead of) the
 +                     * flushed memtables and CL position, which is as good as 
we can guarantee.
 +                     * TODO: SecondaryIndex should support setBarrier(), so 
custom implementations can co-ordinate exactly
 +                     * with CL as we do with memtables/CFS-backed 
SecondaryIndexes.
 +                     */
 +                    if (flushNonCf2i)
 +                        indexManager.flushAllNonCFSBackedIndexesBlocking();
 +
 +                    flushResults = 
Lists.newArrayList(FBUtilities.waitOnFutures(futures));
 +                }
 +                catch (Throwable t)
 +                {
 +                    t = memtable.abortRunnables(flushRunnables, t);
 +                    t = txn.abort(t);
 +                    throw Throwables.propagate(t);
 +                }
 +
 +                try
 +                {
 +                    Iterator<SSTableMultiWriter> writerIterator = 
flushResults.iterator();
 +                    while (writerIterator.hasNext())
                      {
 -                        // TODO: SecondaryIndex should support setBarrier(), 
so custom implementations can co-ordinate exactly
 -                        // with CL as we do with memtables/CFS-backed 
SecondaryIndexes.
 -                        if (flushNonCf2i)
 +                        @SuppressWarnings("resource")
 +                        SSTableMultiWriter writer = writerIterator.next();
 +                        if (writer.getFilePointer() > 0)
                          {
 -                            
indexManager.flushAllNonCFSBackedIndexesBlocking();
 -                            flushNonCf2i = false;
 +                            writer.setOpenResult(true).prepareToCommit();
 +                        }
 +                        else
 +                        {
 +                            maybeFail(writer.abort(null));
 +                            writerIterator.remove();
                          }
 -                        readers = memtable.flush();
                      }
 -                    memtable.cfs.replaceFlushed(memtable, readers);
 -                    reclaim(memtable);
                  }
 -            }
 -            catch (Throwable e)
 -            {
 -                JVMStabilityInspector.inspectThrowable(e);
 -                // If we weren't killed, try to continue work but do not 
allow CommitLog to be discarded.
 -                postFlush.flushFailure = e;
 -            }
 -            finally
 -            {
 -                if (logger.isTraceEnabled())
 -                    logger.trace("Flush task {}@{} signaling post flush 
task", hashCode(), name);
 +                catch (Throwable t)
 +                {
 +                    for (SSTableMultiWriter writer : flushResults)
 +                        t = writer.abort(t);
 +                    t = txn.abort(t);
 +                    Throwables.propagate(t);
 +                }
 +
 +                txn.prepareToCommit();
  
 -                // signal the post-flush we've done our work
 -                postFlush.latch.countDown();
 +                Throwable accumulate = null;
 +                for (SSTableMultiWriter writer : flushResults)
 +                    accumulate = writer.commit(accumulate);
  
 -                if (logger.isTraceEnabled())
 -                    logger.trace("Flush task task {}@{} finished", 
hashCode(), name);
 +                maybeFail(txn.commit(accumulate));
 +
 +                for (SSTableMultiWriter writer : flushResults)
 +                {
 +                    Collection<SSTableReader> flushedSSTables = 
writer.finished();
 +                    for (SSTableReader sstable : flushedSSTables)
 +                    {
 +                        if (sstable != null)
 +                        {
 +                            sstables.add(sstable);
 +                            long size = sstable.bytesOnDisk();
 +                            totalBytesOnDisk += size;
 +                            maxBytesOnDisk = Math.max(maxBytesOnDisk, size);
 +                            minBytesOnDisk = Math.min(minBytesOnDisk, size);
 +                        }
 +                    }
 +                }
              }
 +            memtable.cfs.replaceFlushed(memtable, sstables);
 +            reclaim(memtable);
 +            memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables);
 +            logger.debug("Flushed to {} ({} sstables, {}), biggest {}, 
smallest {}",
 +                         sstables,
 +                         sstables.size(),
 +                         FBUtilities.prettyPrintMemory(totalBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(maxBytesOnDisk),
 +                         FBUtilities.prettyPrintMemory(minBytesOnDisk));
 +            return sstables;
          }
  
          private void reclaim(final Memtable memtable)
@@@ -1278,58 -1138,77 +1292,77 @@@
       * Finds the largest memtable, as a percentage of *either* on- or 
off-heap memory limits, and immediately
       * queues it for flushing. If the memtable selected is flushed before 
this completes, no work is done.
       */
-     public static class FlushLargestColumnFamily implements Runnable
+     public static CompletableFuture<Boolean> flushLargestMemtable()
      {
-         public void run()
+         float largestRatio = 0f;
+         Memtable largest = null;
+         float liveOnHeap = 0, liveOffHeap = 0;
+         for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
          {
-             float largestRatio = 0f;
-             Memtable largest = null;
-             float liveOnHeap = 0, liveOffHeap = 0;
-             for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
-             {
-                 // we take a reference to the current main memtable for the 
CF prior to snapping its ownership ratios
-                 // to ensure we have some ordering guarantee for performing 
the switchMemtableIf(), i.e. we will only
-                 // swap if the memtables we are measuring here haven't 
already been swapped by the time we try to swap them
-                 Memtable current = 
cfs.getTracker().getView().getCurrentMemtable();
- 
-                 // find the total ownership ratio for the memtable and all 
SecondaryIndexes owned by this CF,
-                 // both on- and off-heap, and select the largest of the two 
ratios to weight this CF
-                 float onHeap = 0f, offHeap = 0f;
-                 onHeap += current.getAllocator().onHeap().ownershipRatio();
-                 offHeap += current.getAllocator().offHeap().ownershipRatio();
- 
-                 for (ColumnFamilyStore indexCfs : 
cfs.indexManager.getAllIndexColumnFamilyStores())
-                 {
-                     MemtableAllocator allocator = 
indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
-                     onHeap += allocator.onHeap().ownershipRatio();
-                     offHeap += allocator.offHeap().ownershipRatio();
-                 }
+             // we take a reference to the current main memtable for the CF 
prior to snapping its ownership ratios
+             // to ensure we have some ordering guarantee for performing the 
switchMemtableIf(), i.e. we will only
+             // swap if the memtables we are measuring here haven't already 
been swapped by the time we try to swap them
+             Memtable current = 
cfs.getTracker().getView().getCurrentMemtable();
  
-                 float ratio = Math.max(onHeap, offHeap);
-                 if (ratio > largestRatio)
-                 {
-                     largest = current;
-                     largestRatio = ratio;
-                 }
+             // find the total ownership ratio for the memtable and all 
SecondaryIndexes owned by this CF,
+             // both on- and off-heap, and select the largest of the two 
ratios to weight this CF
+             float onHeap = 0f, offHeap = 0f;
+             onHeap += current.getAllocator().onHeap().ownershipRatio();
+             offHeap += current.getAllocator().offHeap().ownershipRatio();
  
-                 liveOnHeap += onHeap;
-                 liveOffHeap += offHeap;
+             for (ColumnFamilyStore indexCfs : 
cfs.indexManager.getAllIndexColumnFamilyStores())
+             {
+                 MemtableAllocator allocator = 
indexCfs.getTracker().getView().getCurrentMemtable().getAllocator();
+                 onHeap += allocator.onHeap().ownershipRatio();
+                 offHeap += allocator.offHeap().ownershipRatio();
              }
  
-             if (largest != null)
+             float ratio = Math.max(onHeap, offHeap);
+             if (ratio > largestRatio)
              {
-                 float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
-                 float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
-                 float flushingOnHeap = 
Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
-                 float flushingOffHeap = 
Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
-                 float thisOnHeap = 
largest.getAllocator().onHeap().ownershipRatio();
-                 float thisOffHeap = 
largest.getAllocator().offHeap().ownershipRatio();
-                 logger.debug("Flushing largest {} to free up room. Used 
total: {}, live: {}, flushing: {}, this: {}",
-                             largest.cfs, ratio(usedOnHeap, usedOffHeap), 
ratio(liveOnHeap, liveOffHeap),
-                             ratio(flushingOnHeap, flushingOffHeap), 
ratio(thisOnHeap, thisOffHeap));
-                 largest.cfs.switchMemtableIfCurrent(largest);
+                 largest = current;
+                 largestRatio = ratio;
              }
+ 
+             liveOnHeap += onHeap;
+             liveOffHeap += offHeap;
+         }
+ 
+         CompletableFuture<Boolean> returnFuture = new CompletableFuture<>();
+ 
+         if (largest != null)
+         {
+             float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio();
+             float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio();
+             float flushingOnHeap = 
Memtable.MEMORY_POOL.onHeap.reclaimingRatio();
+             float flushingOffHeap = 
Memtable.MEMORY_POOL.offHeap.reclaimingRatio();
+             float thisOnHeap = 
largest.getAllocator().onHeap().ownershipRatio();
+             float thisOffHeap = 
largest.getAllocator().offHeap().ownershipRatio();
+             logger.debug("Flushing largest {} to free up room. Used total: 
{}, live: {}, flushing: {}, this: {}",
+                          largest.cfs, ratio(usedOnHeap, usedOffHeap), 
ratio(liveOnHeap, liveOffHeap),
+                          ratio(flushingOnHeap, flushingOffHeap), 
ratio(thisOnHeap, thisOffHeap));
+ 
 -            ListenableFuture<ReplayPosition> flushFuture = 
largest.cfs.switchMemtableIfCurrent(largest);
++            ListenableFuture<CommitLogPosition> flushFuture = 
largest.cfs.switchMemtableIfCurrent(largest);
+             flushFuture.addListener(() -> {
+                 try
+                 {
+                     flushFuture.get();
+                     returnFuture.complete(true);
+                 }
+                 catch (Throwable t)
+                 {
+                     returnFuture.completeExceptionally(t);
+                 }
+             }, MoreExecutors.directExecutor());
+         }
+         else
+         {
+             logger.debug("Flushing of largest memtable, not done, no memtable 
found");
+ 
+             returnFuture.complete(false);
          }
+ 
+         return returnFuture;
      }
  
      private static String ratio(float onHeap, float offHeap)
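
With this change the cleaner is future-based: flushLargestMemtable() returns a CompletableFuture<Boolean> that completes with true when a flush of the largest memtable was triggered and finished, with false when no memtable was found, and exceptionally when the flush failed. A hedged sketch of a caller follows; in the real code the consumer is the pool's cleaner thread (MemtableCleanerThread), which is not shown in this excerpt.

// Illustrative caller only; in the real code the consumer is the memtable pool's cleaner thread.
import java.util.concurrent.CompletableFuture;

final class CleanerCallerSketch
{
    static void requestCleanup(CompletableFuture<Boolean> cleanupFuture)
    {
        cleanupFuture.whenComplete((flushed, error) -> {
            if (error != null)
                System.err.println("Memtable flush failed: " + error);  // failure propagated via the future
            else if (flushed)
                System.out.println("Largest memtable flushed");         // a flush was triggered and completed
            else
                System.out.println("No memtable found to flush");       // the pool may retry or keep waiting
        });
    }
}

In the diff above, ColumnFamilyStore.flushLargestMemtable() supplies exactly such a future.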
@@@ -1737,17 -1609,15 +1770,17 @@@
          TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE);
          for (Counter<ByteBuffer> counter : samplerResults.topK)
          {
 -            byte[] key = counter.getItem().array();
 +            //Not duplicating the buffer for safety because 
AbstractSerializer and ByteBufferUtil.bytesToHex
 +            //don't modify position or limit
 +            ByteBuffer key = counter.getItem();
              result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, 
COUNTER_NAMES, new Object[] {
 -            Hex.bytesToHex(key), // raw
 -            counter.getCount(),  // count
 -            counter.getError(),  // error
 -            metadata.getKeyValidator().getString(ByteBuffer.wrap(key)) })); 
// string
 +                    ByteBufferUtil.bytesToHex(key), // raw
 +                    counter.getCount(),  // count
 +                    counter.getError(),  // error
 +                    metadata.getKeyValidator().getString(key) })); // string
          }
          return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new 
Object[]{
-                 samplerResults.cardinality, result});
+         samplerResults.cardinality, result});
      }
  
      public boolean isCompactionDiskSpaceCheckEnabled()
@@@ -2106,19 -1961,15 +2139,19 @@@
          forceMajorCompaction(false);
      }
  
-     public void forceMajorCompaction(boolean splitOutput) throws 
InterruptedException, ExecutionException
-     {
 -
+     public void forceMajorCompaction(boolean splitOutput)
 -    {
++   {
          CompactionManager.instance.performMaximal(this, splitOutput);
      }
  
 +    public void forceCompactionForTokenRange(Collection<Range<Token>> 
tokenRanges) throws ExecutionException, InterruptedException
 +    {
 +        CompactionManager.instance.forceCompactionForTokenRange(this, 
tokenRanges);
 +    }
 +
      public static Iterable<ColumnFamilyStore> all()
      {
-         List<Iterable<ColumnFamilyStore>> stores = new 
ArrayList<Iterable<ColumnFamilyStore>>(Schema.instance.getKeyspaces().size());
+         List<Iterable<ColumnFamilyStore>> stores = new 
ArrayList<>(Schema.instance.getKeyspaces().size());
          for (Keyspace keyspace : Keyspace.all())
          {
              stores.add(keyspace.getColumnFamilyStores());
@@@ -2161,13 -2012,9 +2194,9 @@@
      {
          for (final ColumnFamilyStore cfs : concatWithIndexes())
          {
-             cfs.runWithCompactionsDisabled(new Callable<Void>()
-             {
-                 public Void call()
-                 {
-                     cfs.data.reset(new Memtable(new 
AtomicReference<>(CommitLogPosition.NONE), cfs));
-                     return null;
-                 }
+             cfs.runWithCompactionsDisabled((Callable<Void>) () -> {
 -                cfs.data.reset(new Memtable(new 
AtomicReference<>(ReplayPosition.NONE), cfs));
++                cfs.data.reset(new Memtable(new 
AtomicReference<>(CommitLogPosition.NONE), cfs));
+                 return null;
              }, true, false);
          }
      }
@@@ -2220,25 -2067,21 +2249,21 @@@
                  now = Math.max(now, sstable.maxDataAge);
          truncatedAt = now;
  
-         Runnable truncateRunnable = new Runnable()
-         {
-             public void run()
-             {
-                 logger.debug("Discarding sstable data for truncated CF + 
indexes");
-                 data.notifyTruncated(truncatedAt);
+         Runnable truncateRunnable = () -> {
+             logger.debug("Discarding sstable data for truncated CF + 
indexes");
+             data.notifyTruncated(truncatedAt);
  
-                 if (DatabaseDescriptor.isAutoSnapshot())
-                     
snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, 
SNAPSHOT_TRUNCATE_PREFIX));
+             if (DatabaseDescriptor.isAutoSnapshot())
 -                snapshot(Keyspace.getTimestampedSnapshotName(name));
++                snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, 
SNAPSHOT_TRUNCATE_PREFIX));
  
-                 discardSSTables(truncatedAt);
+             discardSSTables(truncatedAt);
  
-                 indexManager.truncateAllIndexesBlocking(truncatedAt);
-                 viewManager.truncateBlocking(replayAfter, truncatedAt);
+             indexManager.truncateAllIndexesBlocking(truncatedAt);
+             viewManager.truncateBlocking(replayAfter, truncatedAt);
  
-                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
-                 logger.trace("cleaning out row cache");
-                 invalidateCaches();
-             }
+             SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter);
+             logger.trace("cleaning out row cache");
+             invalidateCaches();
          };
  
          runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true);
@@@ -2699,16 -2532,6 +2720,16 @@@
  
      public static TableMetrics metricsFor(UUID tableId)
      {
-         return getIfExists(tableId).metric;
+         return Objects.requireNonNull(getIfExists(tableId)).metric;
      }
 +
 +    public DiskBoundaries getDiskBoundaries()
 +    {
 +        return diskBoundaryManager.getDiskBoundaries(this);
 +    }
 +
 +    public void invalidateDiskBoundaries()
 +    {
 +        diskBoundaryManager.invalidate();
 +    }
- }
+ }
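
A note on the new forceCompactionForTokenRange(Collection<Range<Token>>) entry point above: it delegates to CompactionManager, and the declared ExecutionException/InterruptedException indicate it waits for the submitted compactions. A hypothetical caller might look roughly like the sketch below (illustrative only; the Range/LongToken construction is an assumption and not part of this patch):

    // Sketch: compact only the sstable data owned by a single token range.
    // The token values are made up for illustration.
    Collection<Range<Token>> ranges = Collections.singleton(
        new Range<>(new Murmur3Partitioner.LongToken(0L),
                    new Murmur3Partitioner.LongToken(1000L)));
    cfs.forceCompactionForTokenRange(ranges);
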
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index 5af789e,139663e..ae8b8d3
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -54,41 -54,15 +54,44 @@@ import org.apache.cassandra.utils.ByteB
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.ObjectSizes;
  import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapPool;
  import org.apache.cassandra.utils.memory.MemtableAllocator;
++import org.apache.cassandra.utils.memory.MemtableCleaner;
  import org.apache.cassandra.utils.memory.MemtablePool;
 +import org.apache.cassandra.utils.memory.NativePool;
 +import org.apache.cassandra.utils.memory.SlabPool;
  
  public class Memtable implements Comparable<Memtable>
  {
      private static final Logger logger = LoggerFactory.getLogger(Memtable.class);
  
 -    @VisibleForTesting
 -    public static final MemtablePool MEMORY_POOL = DatabaseDescriptor.getMemtableAllocatorPool();
 +    public static final MemtablePool MEMORY_POOL = createMemtableAllocatorPool();
 +
 +    private static MemtablePool createMemtableAllocatorPool()
 +    {
 +        long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMb() << 20;
 +        long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMb() << 20;
++        final float cleaningThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
++        final MemtableCleaner cleaner = ColumnFamilyStore::flushLargestMemtable;
 +        switch (DatabaseDescriptor.getMemtableAllocationType())
 +        {
 +            case unslabbed_heap_buffers:
-                 return new HeapPool(heapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
++                return new HeapPool(heapLimit, cleaningThreshold, cleaner);
 +            case heap_buffers:
-                 return new SlabPool(heapLimit, 0, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
++                return new SlabPool(heapLimit, 0, cleaningThreshold, cleaner);
 +            case offheap_buffers:
 +                if (!FileUtils.isCleanerAvailable)
 +                {
 +                    throw new IllegalStateException("Could not free direct byte buffer: offheap_buffers is not a safe memtable_allocation_type without this ability, please adjust your config. This feature is only guaranteed to work on an Oracle JVM. Refusing to start.");
 +                }
-                 return new SlabPool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
++                return new SlabPool(heapLimit, offHeapLimit, cleaningThreshold, cleaner);
 +            case offheap_objects:
-                 return new NativePool(heapLimit, offHeapLimit, DatabaseDescriptor.getMemtableCleanupThreshold(), new ColumnFamilyStore.FlushLargestColumnFamily());
++                return new NativePool(heapLimit, offHeapLimit, cleaningThreshold, cleaner);
 +            default:
 +                throw new AssertionError();
 +        }
 +    }
 +
      private static final int ROW_OVERHEAD_HEAP_SIZE = estimateRowOverhead(Integer.parseInt(System.getProperty("cassandra.memtable_row_overhead_computation_step", "100000")));
  
      private final MemtableAllocator allocator;
@@@ -395,66 -339,33 +398,66 @@@
      @VisibleForTesting
      public void makeUnflushable()
      {
-         liveDataSize.addAndGet(1L * 1024 * 1024 * 1024 * 1024 * 1024);
+         liveDataSize.addAndGet((long) 1024 * 1024 * 1024 * 1024 * 1024);
      }
  
 -    private long estimatedSize()
 +    class FlushRunnable implements Callable<SSTableMultiWriter>
      {
 -        long keySize = 0;
 -        for (PartitionPosition key : partitions.keySet())
 +        private final long estimatedSize;
 +        private final ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush;
 +
 +        private final boolean isBatchLogTable;
 +        private final SSTableMultiWriter writer;
 +
 +        // keeping these to be able to log what we are actually flushing
 +        private final PartitionPosition from;
 +        private final PartitionPosition to;
 +
 +        FlushRunnable(PartitionPosition from, PartitionPosition to, Directories.DataDirectory flushLocation, LifecycleTransaction txn)
          {
 -            //  make sure we don't write non-sensical keys
 -            assert key instanceof DecoratedKey;
 -            keySize += ((DecoratedKey)key).getKey().remaining();
 +            this(partitions.subMap(from, to), flushLocation, from, to, txn);
          }
 -        return (long) ((keySize // index entries
 -                        + keySize // keys in data file
 -                        + liveDataSize.get()) // data
 -                       * 1.2); // bloom filter and row index overhead
 -    }
  
 -    private Collection<SSTableReader> writeSortedContents(File sstableDirectory)
 -    {
 -        boolean isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SystemKeyspace.NAME);
 +        FlushRunnable(LifecycleTransaction txn)
 +        {
 +            this(partitions, null, null, null, txn);
 +        }
 +
 +        FlushRunnable(ConcurrentNavigableMap<PartitionPosition, AtomicBTreePartition> toFlush, Directories.DataDirectory flushLocation, PartitionPosition from, PartitionPosition to, LifecycleTransaction txn)
 +        {
 +            this.toFlush = toFlush;
 +            this.from = from;
 +            this.to = to;
 +            long keySize = 0;
 +            for (PartitionPosition key : toFlush.keySet())
 +            {
 +                //  make sure we don't write non-sensical keys
 +                assert key instanceof DecoratedKey;
 +                keySize += ((DecoratedKey) key).getKey().remaining();
 +            }
 +            estimatedSize = (long) ((keySize // index entries
 +                                    + keySize // keys in data file
 +                                    + liveDataSize.get()) // data
 +                                    * 1.2); // bloom filter and row index overhead
 +
 +            this.isBatchLogTable = cfs.name.equals(SystemKeyspace.BATCHES) && cfs.keyspace.getName().equals(SchemaConstants.SYSTEM_KEYSPACE_NAME);
 +
 +            if (flushLocation == null)
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getWriteableLocationAsFile(estimatedSize)), columnsCollector.get(), statsCollector.get());
 +            else
 +                writer = createFlushWriter(txn, cfs.getSSTablePath(getDirectories().getLocationForDisk(flushLocation)), columnsCollector.get(), statsCollector.get());
  
 -        logger.debug("Writing {}", Memtable.this.toString());
 +        }
  
 -        Collection<SSTableReader> ssTables;
 -        try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
 +        protected Directories getDirectories()
          {
 +            return cfs.getDirectories();
 +        }
 +
 +        private void writeSortedContents()
 +        {
 +            logger.debug("Writing {}, flushed range = ({}, {}]", Memtable.this.toString(), from, to);
 +
              boolean trackContention = logger.isTraceEnabled();
              int heavilyContendedRowCount = 0;
              // (we can't clear out the map as-we-go to free up memory,
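
On the pool construction in createMemtableAllocatorPool() above: the old FlushLargestColumnFamily runnable is replaced by a MemtableCleaner, wired in as ColumnFamilyStore::flushLargestMemtable. Judging by the test changes further down, which supply a cleaner as () -> CompletableFuture.completedFuture(true), the cleaner is a no-arg callback returning a CompletableFuture<Boolean>. A minimal sketch of supplying a custom cleaner (the variable names and the 0.2 threshold are assumptions, not part of this patch):

    // Sketch: a cleaner that reports that nothing was flushed. A real cleaner
    // would schedule a flush of the largest memtable and complete the future
    // with true once a flush was actually triggered.
    MemtableCleaner noOpCleaner = () -> CompletableFuture.completedFuture(false);
    MemtablePool pool = new SlabPool(1L << 30, 0, 0.2f, noOpCleaner); // 1 GiB on-heap slab pool
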
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index efe0820,925708e..0543173
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -576,11 -543,9 +576,11 @@@ public abstract class ReadCommand exten
                      logger.warn(msg);
                  }
  
 -                Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
 +                Tracing.trace("Read {} live rows and {} tombstone cells{}",
 +                        liveRows, tombstones,
 +                        (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
              }
-         };
+         }
  
          return Transformation.apply(iter, new MetricRecording());
      }
diff --cc src/java/org/apache/cassandra/utils/memory/MemtablePool.java
index bd17f78,cd434c5..89d5e37
--- a/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
+++ b/src/java/org/apache/cassandra/utils/memory/MemtablePool.java
@@@ -74,9 -82,18 +80,19 @@@ public abstract class MemtablePoo
          ExecutorUtils.shutdownNowAndWait(timeout, unit, cleaner);
      }
  
 +
      public abstract MemtableAllocator newAllocator();
  
+     public boolean needsCleaning()
+     {
+         return onHeap.needsCleaning() || offHeap.needsCleaning();
+     }
+ 
+     public Long getNumPendingtasks()
+     {
+         return numPendingTasks.getValue();
+     }
+ 
      /**
       * Note the difference between acquire() and allocate(); allocate() makes more resources available to all owners,
       * and acquire() makes shared resources unavailable but still recorded. An Owner must always acquire resources,
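
The two accessors added here, needsCleaning() and getNumPendingtasks(), expose whether either sub-pool is over its cleanup threshold and how many cleaning tasks are currently outstanding (read from the numPendingTasks gauge). A rough sketch of how a cleaning loop could use them (illustrative only; the clean() method name on MemtableCleaner is an assumption, and this is not the actual MemtableCleanerThread logic):

    // Sketch: keep invoking the cleaner only while cleaning is still needed,
    // so the number of pending flush tasks stays bounded.
    while (pool.needsCleaning())
    {
        CompletableFuture<Boolean> flushed = cleaner.clean();
        if (!flushed.join())
            break; // nothing left worth flushing
    }
    logger.info("Pending memtable cleaning tasks: {}", pool.getNumPendingtasks());
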
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 4e320ef,ddeb9da..e09c4df
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -475,18 -444,35 +474,11 @@@ public abstract class CQLTeste
              store.disableAutoCompaction();
      }
  
 -    public void flush(boolean forceFlush)
 -    {
 -        if (forceFlush)
 -            flush();
 -    }
 -
 -    @FunctionalInterface
 -    public interface CheckedFunction {
 -        void apply() throws Throwable;
 -    }
 -
 -    /**
 -     * Runs the given function before and after a flush of sstables.  This is useful for checking that behavior is
 -     * the same whether data is in memtables or sstables.
 -     * @param runnable
 -     * @throws Throwable
 -     */
 -    public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable
 -    {
 -        runnable.apply();
 -        flush();
 -        runnable.apply();
 -    }
 -
      public void compact()
      {
-         try
-         {
-             ColumnFamilyStore store = getCurrentColumnFamilyStore();
-             if (store != null)
-                 store.forceMajorCompaction();
-         }
-         catch (InterruptedException | ExecutionException e)
-         {
-             throw new RuntimeException(e);
-         }
+          ColumnFamilyStore store = getCurrentColumnFamilyStore();
+          if (store != null)
+              store.forceMajorCompaction();
      }
  
      public void disableCompaction()
diff --cc test/unit/org/apache/cassandra/db/NativeCellTest.java
index 69e615b,0000000..9b18f65
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/NativeCellTest.java
+++ b/test/unit/org/apache/cassandra/db/NativeCellTest.java
@@@ -1,171 -1,0 +1,175 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.db;
 +
- import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Random;
 +import java.util.UUID;
++import java.util.concurrent.CompletableFuture;
 +
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.ColumnDefinition;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.marshal.BytesType;
 +import org.apache.cassandra.db.marshal.SetType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.rows.*;
 +import org.apache.cassandra.utils.concurrent.OpOrder;
 +import org.apache.cassandra.utils.memory.HeapAllocator;
 +import org.apache.cassandra.utils.memory.NativeAllocator;
 +import org.apache.cassandra.utils.memory.NativePool;
 +
 +public class NativeCellTest
 +{
 +
 +    private static final Logger logger = LoggerFactory.getLogger(NativeCellTest.class);
-     private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE, Integer.MAX_VALUE, 1f, null).newAllocator();
++    private static final NativeAllocator nativeAllocator = new NativePool(Integer.MAX_VALUE,
++                                                                          Integer.MAX_VALUE,
++                                                                          1f,
++                                                                          () -> CompletableFuture.completedFuture(true)).newAllocator();
++    @SuppressWarnings("resource")
 +    private static final OpOrder.Group group = new OpOrder().start();
 +    private static Random rand;
 +
 +    @BeforeClass
 +    public static void setUp()
 +    {
 +        long seed = System.currentTimeMillis();
 +        logger.info("Seed : {}", seed);
 +        rand = new Random(seed);
 +    }
 +
 +    @Test
-     public void testCells() throws IOException
++    public void testCells()
 +    {
 +        for (int run = 0 ; run < 1000 ; run++)
 +        {
 +            Row.Builder builder = BTreeRow.unsortedBuilder(1);
 +            builder.newRow(rndclustering());
 +            int count = 1 + rand.nextInt(10);
 +            for (int i = 0 ; i < count ; i++)
 +                rndcd(builder);
 +            test(builder.build());
 +        }
 +    }
 +
 +    private static Clustering rndclustering()
 +    {
 +        int count = 1 + rand.nextInt(100);
 +        ByteBuffer[] values = new ByteBuffer[count];
 +        int size = rand.nextInt(65535);
 +        for (int i = 0 ; i < count ; i++)
 +        {
 +            int twiceShare = 1 + (2 * size) / (count - i);
 +            int nextSize = Math.min(size, rand.nextInt(twiceShare));
 +            if (nextSize < 10 && rand.nextBoolean())
 +                continue;
 +
 +            byte[] bytes = new byte[nextSize];
 +            rand.nextBytes(bytes);
 +            values[i] = ByteBuffer.wrap(bytes);
 +            size -= nextSize;
 +        }
 +        return Clustering.make(values);
 +    }
 +
 +    private static void rndcd(Row.Builder builder)
 +    {
 +        ColumnDefinition col = rndcol();
 +        if (!col.isComplex())
 +        {
 +            builder.addCell(rndcell(col));
 +        }
 +        else
 +        {
 +            int count = 1 + rand.nextInt(100);
 +            for (int i = 0 ; i < count ; i++)
 +                builder.addCell(rndcell(col));
 +        }
 +    }
 +
 +    private static ColumnDefinition rndcol()
 +    {
 +        UUID uuid = new UUID(rand.nextLong(), rand.nextLong());
 +        boolean isComplex = rand.nextBoolean();
 +        return new ColumnDefinition("",
 +                                    "",
 +                                    ColumnIdentifier.getInterned(uuid.toString(), false),
 +                                    isComplex ? new SetType<>(BytesType.instance, true) : BytesType.instance,
 +                                    -1,
 +                                    ColumnDefinition.Kind.REGULAR);
 +    }
 +
 +    private static Cell rndcell(ColumnDefinition col)
 +    {
 +        long timestamp = rand.nextLong();
 +        int ttl = rand.nextInt();
 +        int localDeletionTime = rand.nextInt();
 +        byte[] value = new byte[rand.nextInt(sanesize(expdecay()))];
 +        rand.nextBytes(value);
 +        CellPath path = null;
 +        if (col.isComplex())
 +        {
 +            byte[] pathbytes = new byte[rand.nextInt(sanesize(expdecay()))];
 +            rand.nextBytes(value);
 +            path = CellPath.create(ByteBuffer.wrap(pathbytes));
 +        }
 +
 +        return new BufferCell(col, timestamp, ttl, localDeletionTime, ByteBuffer.wrap(value), path);
 +    }
 +
 +    private static int expdecay()
 +    {
 +        return 1 << Integer.numberOfTrailingZeros(Integer.lowestOneBit(rand.nextInt()));
 +    }
 +
 +    private static int sanesize(int randomsize)
 +    {
 +        return Math.min(Math.max(1, randomsize), 1 << 26);
 +    }
 +
 +    private static void test(Row row)
 +    {
 +        Row nrow = clone(row, nativeAllocator.rowBuilder(group));
 +        Row brow = clone(row, HeapAllocator.instance.cloningBTreeRowBuilder());
 +        Assert.assertEquals(row, nrow);
 +        Assert.assertEquals(row, brow);
 +        Assert.assertEquals(nrow, brow);
 +
 +        Assert.assertEquals(row.clustering(), nrow.clustering());
 +        Assert.assertEquals(row.clustering(), brow.clustering());
 +        Assert.assertEquals(nrow.clustering(), brow.clustering());
 +
 +        ClusteringComparator comparator = new ClusteringComparator(UTF8Type.instance);
-         Assert.assertTrue(comparator.compare(row.clustering(), nrow.clustering()) == 0);
-         Assert.assertTrue(comparator.compare(row.clustering(), brow.clustering()) == 0);
-         Assert.assertTrue(comparator.compare(nrow.clustering(), brow.clustering()) == 0);
++        Assert.assertEquals(0, comparator.compare(row.clustering(), nrow.clustering()));
++        Assert.assertEquals(0, comparator.compare(row.clustering(), brow.clustering()));
++        Assert.assertEquals(0, comparator.compare(nrow.clustering(), brow.clustering()));
 +    }
 +
 +    private static Row clone(Row row, Row.Builder builder)
 +    {
 +        return Rows.copy(row, builder).build();
 +    }
 +
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
