This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f5fe483d0d8f967f3c52a60178df3ea7d43443be
Merge: ecdbef3 5a03898
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Mon Jul 15 14:41:22 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 doc/native_protocol_v4.spec                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../org/apache/cassandra/metrics/AuthMetrics.java  |   4 +-
 .../apache/cassandra/metrics/ClientMetrics.java    |  52 ++++-
 .../org/apache/cassandra/net/ResourceLimits.java   | 245 ++++++++++++++++++++
 .../cassandra/service/NativeTransportService.java  |  24 +-
 .../org/apache/cassandra/transport/Connection.java |  11 +
 src/java/org/apache/cassandra/transport/Frame.java |   8 +-
 .../org/apache/cassandra/transport/Message.java    | 146 +++++++++++-
 .../transport/RequestThreadPoolExecutor.java       |  96 --------
 .../org/apache/cassandra/transport/Server.java     |  64 ++++--
 .../apache/cassandra/transport/SimpleClient.java   |  10 +
 .../transport/messages/StartupMessage.java         |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   2 +
 .../service/NativeTransportServiceTest.java        |   3 +-
 .../InflightRequestPayloadTrackerTest.java         | 248 +++++++++++++++++++++
 18 files changed, 805 insertions(+), 149 deletions(-)

diff --cc CHANGES.txt
index 406246e,68d309c..7f5d52a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,8 -1,5 +1,9 @@@
 -3.0.19
 +3.11.5
 + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305)
 + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
 + * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866)
 +Merged from 3.0:
+  * Prevent client requests from blocking on executor task queue (CASSANDRA-15013)
   * Toughen up column drop/recreate type validations (CASSANDRA-15204)
   * LegacyLayout should handle paging states that cross a collection column (CASSANDRA-15201)
   * Prevent RuntimeException when username or password is empty/null (CASSANDRA-15198)
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1976b95,830d3e1..1d79e2a
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -137,33 -130,36 +137,36 @@@ public class Confi
  
      /* intentionally left set to true, despite being set to false in stock 2.2 cassandra.yaml
         we don't want to surprise Thrift users who have the setting blank in the yaml during 2.1->2.2 upgrade */
 -    public Boolean start_rpc = true;
 +    public boolean start_rpc = true;
      public String rpc_address;
      public String rpc_interface;
 -    public Boolean rpc_interface_prefer_ipv6 = false;
 +    public boolean rpc_interface_prefer_ipv6 = false;
      public String broadcast_rpc_address;
 -    public Integer rpc_port = 9160;
 -    public Integer rpc_listen_backlog = 50;
 +    public int rpc_port = 9160;
 +    public int rpc_listen_backlog = 50;
      public String rpc_server_type = "sync";
 -    public Boolean rpc_keepalive = true;
 -    public Integer rpc_min_threads = 16;
 -    public Integer rpc_max_threads = Integer.MAX_VALUE;
 +    public boolean rpc_keepalive = true;
 +    public int rpc_min_threads = 16;
 +    public int rpc_max_threads = Integer.MAX_VALUE;
      public Integer rpc_send_buff_size_in_bytes;
      public Integer rpc_recv_buff_size_in_bytes;
 -    public Integer internode_send_buff_size_in_bytes;
 -    public Integer internode_recv_buff_size_in_bytes;
 +    public int internode_send_buff_size_in_bytes = 0;
 +    public int internode_recv_buff_size_in_bytes = 0;
  
 -    public Boolean start_native_transport = false;
 -    public Integer native_transport_port = 9042;
 +    public boolean start_native_transport = false;
 +    public int native_transport_port = 9042;
      public Integer native_transport_port_ssl = null;
 -    public Integer native_transport_max_threads = 128;
 -    public Integer native_transport_max_frame_size_in_mb = 256;
 -    public volatile Long native_transport_max_concurrent_connections = -1L;
 -    public volatile Long native_transport_max_concurrent_connections_per_ip = 
-1L;
 +    public int native_transport_max_threads = 128;
 +    public int native_transport_max_frame_size_in_mb = 256;
 +    public volatile long native_transport_max_concurrent_connections = -1L;
 +    public volatile long native_transport_max_concurrent_connections_per_ip = 
-1L;
      public boolean native_transport_flush_in_batches_legacy = true;
+     public volatile long 
native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
+     public volatile long native_transport_max_concurrent_requests_in_bytes = 
-1L;
+ 
  
      @Deprecated
 -    public Integer thrift_max_message_length_in_mb = 16;
 +    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index e452830,8417c39..75296b6
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -454,53 -456,107 +454,63 @@@ public class DatabaseDescripto
              throw new 
ConfigurationException("native_transport_max_frame_size_in_mb must be smaller 
than 2048, but was "
                      + conf.native_transport_max_frame_size_in_mb, false);
  
 -        // fail early instead of OOMing (see CASSANDRA-8116)
 -        if (ThriftServer.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads == Integer.MAX_VALUE)
 -            throw new ConfigurationException("The hsha rpc_server_type is not 
compatible with an rpc_max_threads " +
 -                                             "setting of 'unlimited'.  Please 
see the comments in cassandra.yaml " +
 -                                             "for rpc_server_type and 
rpc_max_threads.",
 -                                             false);
 -        if (ThriftServer.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
 -            logger.warn("rpc_max_threads setting of {} may be too high for 
the hsha server and cause unnecessary thread contention, reducing performance", 
conf.rpc_max_threads);
 +        // if data dirs, commitlog dir, or saved caches dir are set in cassandra.yaml, use that.  Otherwise,
 +        // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent dir for data/, commitlog/, and saved_caches/
 +        if (conf.commitlog_directory == null)
 +        {
 +            conf.commitlog_directory = storagedirFor("commitlog");
 +        }
  
 -        /* end point snitch */
 -        if (conf.endpoint_snitch == null)
 +        if (conf.hints_directory == null)
          {
 -            throw new ConfigurationException("Missing endpoint_snitch 
directive", false);
 +            conf.hints_directory = storagedirFor("hints");
          }
  
+         if (conf.native_transport_max_concurrent_requests_in_bytes <= 0)
+         {
+             conf.native_transport_max_concurrent_requests_in_bytes = 
Runtime.getRuntime().maxMemory() / 10;
+         }
+ 
+         if (conf.native_transport_max_concurrent_requests_in_bytes_per_ip <= 
0)
+         {
+             conf.native_transport_max_concurrent_requests_in_bytes_per_ip = 
Runtime.getRuntime().maxMemory() / 40;
+         }
+ 
 -        snitch = createEndpointSnitch(conf.endpoint_snitch);
 -        EndpointSnitchInfo.create();
 -
 -        localDC = snitch.getDatacenter(FBUtilities.getBroadcastAddress());
 -        localComparator = new Comparator<InetAddress>()
 +        if (conf.cdc_raw_directory == null)
          {
 -            public int compare(InetAddress endpoint1, InetAddress endpoint2)
 -            {
 -                boolean local1 = 
localDC.equals(snitch.getDatacenter(endpoint1));
 -                boolean local2 = 
localDC.equals(snitch.getDatacenter(endpoint2));
 -                if (local1 && !local2)
 -                    return -1;
 -                if (local2 && !local1)
 -                    return 1;
 -                return 0;
 -            }
 -        };
 +            conf.cdc_raw_directory = storagedirFor("cdc_raw");
 +        }
  
 -        /* Request Scheduler setup */
 -        requestSchedulerOptions = conf.request_scheduler_options;
 -        if (conf.request_scheduler != null)
 +        if (conf.commitlog_total_space_in_mb == null)
          {
 +            int preferredSize = 8192;
 +            int minSize = 0;
              try
              {
 -                if (requestSchedulerOptions == null)
 -                {
 -                    requestSchedulerOptions = new RequestSchedulerOptions();
 -                }
 -                Class<?> cls = Class.forName(conf.request_scheduler);
 -                requestScheduler = (IRequestScheduler) 
cls.getConstructor(RequestSchedulerOptions.class).newInstance(requestSchedulerOptions);
 +                // use 1/4 of available space.  See discussion on #10013 and 
#10199
 +                minSize = 
Ints.saturatedCast((guessFileStore(conf.commitlog_directory).getTotalSpace() / 
1048576) / 4);
              }
 -            catch (ClassNotFoundException e)
 +            catch (IOException e)
              {
 -                throw new ConfigurationException("Invalid Request Scheduler 
class " + conf.request_scheduler, false);
 +                logger.debug("Error checking disk space", e);
 +                throw new ConfigurationException(String.format("Unable to 
check disk space available to %s. Perhaps the Cassandra user does not have the 
necessary permissions",
 +                                                               
conf.commitlog_directory), e);
              }
 -            catch (Exception e)
 +            if (minSize < preferredSize)
              {
 -                throw new ConfigurationException("Unable to instantiate 
request scheduler", e);
 +                logger.warn("Small commitlog volume detected at {}; setting 
commitlog_total_space_in_mb to {}.  You can override this in cassandra.yaml",
 +                            conf.commitlog_directory, minSize);
 +                conf.commitlog_total_space_in_mb = minSize;
 +            }
 +            else
 +            {
 +                conf.commitlog_total_space_in_mb = preferredSize;
              }
          }
 -        else
 -        {
 -            requestScheduler = new NoScheduler();
 -        }
 -
 -        if (conf.request_scheduler_id == RequestSchedulerId.keyspace)
 -        {
 -            requestSchedulerId = conf.request_scheduler_id;
 -        }
 -        else
 -        {
 -            // Default to Keyspace
 -            requestSchedulerId = RequestSchedulerId.keyspace;
 -        }
 -
 -        // if data dirs, commitlog dir, or saved caches dir are set in 
cassandra.yaml, use that.  Otherwise,
 -        // use -Dcassandra.storagedir (set in cassandra-env.sh) as the parent 
dir for data/, commitlog/, and saved_caches/
 -        if (conf.commitlog_directory == null)
 -        {
 -            conf.commitlog_directory = 
System.getProperty("cassandra.storagedir", null);
 -            if (conf.commitlog_directory == null)
 -                throw new ConfigurationException("commitlog_directory is 
missing and -Dcassandra.storagedir is not set", false);
 -            conf.commitlog_directory += File.separator + "commitlog";
 -        }
 -
 -        if (conf.hints_directory == null)
 -        {
 -            conf.hints_directory = System.getProperty("cassandra.storagedir", 
null);
 -            if (conf.hints_directory == null)
 -                throw new ConfigurationException("hints_directory is missing 
and -Dcassandra.storagedir is not set", false);
 -            conf.hints_directory += File.separator + "hints";
 -        }
  
 -        if (conf.commitlog_total_space_in_mb == null)
 +        if (conf.cdc_total_space_in_mb == 0)
          {
 -            int preferredSize = 8192;
 +            int preferredSize = 4096;
              int minSize = 0;
              try
              {
diff --cc src/java/org/apache/cassandra/metrics/AuthMetrics.java
index 126738c,0000000..57d08ef
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/metrics/AuthMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AuthMetrics.java
@@@ -1,40 -1,0 +1,40 @@@
 +package org.apache.cassandra.metrics;
 +
 +import com.codahale.metrics.Meter;
 +
 +/**
 + * Metrics about authentication
 + */
 +public class AuthMetrics
 +{
 +
 +    public static final AuthMetrics instance = new AuthMetrics();
 +
 +    public static void init()
 +    {
 +        // no-op, just used to force instance creation
 +    }
 +
 +    /** Number and rate of successful logins */
 +    protected final Meter success;
 +
 +    /** Number and rate of login failures */
 +    protected final Meter failure;
 +
 +    private AuthMetrics()
 +    {
 +
-         success = ClientMetrics.instance.addMeter("AuthSuccess");
-         failure = ClientMetrics.instance.addMeter("AuthFailure");
++        success = ClientMetrics.instance.registerMeter("AuthSuccess");
++        failure = ClientMetrics.instance.registerMeter("AuthFailure");
 +    }
 +
 +    public void markSuccess()
 +    {
 +        success.mark();
 +    }
 +
 +    public void markFailure()
 +    {
 +        failure.mark();
 +    }
 +}
diff --cc src/java/org/apache/cassandra/metrics/ClientMetrics.java
index db6422c,08f0531..67aa05b
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@@ -36,20 -47,59 +47,55 @@@ public class ClientMetric
      {
      }
  
+     public void pauseConnection() { pausedConnections.incrementAndGet(); }
+     public void unpauseConnection() { pausedConnections.decrementAndGet(); }
+     public void markRequestDiscarded() { requestDiscarded.mark(); }
+ 
+     public synchronized void init(Collection<Server> servers)
+     {
+         if (initialized)
+             return;
+ 
+         this.servers = servers;
+ 
+         registerGauge("connectedNativeClients", this::countConnectedClients);
+ 
+         pausedConnections = new AtomicInteger();
+         pausedConnectionsGauge = registerGauge("PausedConnections", 
pausedConnections::get);
+         requestDiscarded = registerMeter("RequestDiscarded");
+ 
+         initialized = true;
+     }
+ 
      public void addCounter(String name, final Callable<Integer> provider)
      {
 -        Metrics.register(factory.createMetricName(name), new Gauge<Integer>()
 -        {
 -            public Integer getValue()
 +        Metrics.register(factory.createMetricName(name), (Gauge<Integer>) () 
-> {
 +            try
              {
 -                try
 -                {
 -                    return provider.call();
 -                } catch (Exception e)
 -                {
 -                    throw new RuntimeException(e);
 -                }
 +                return provider.call();
 +            } catch (Exception e)
 +            {
 +                throw new RuntimeException(e);
              }
          });
      }
  
-     public Meter addMeter(String name)
+     private int countConnectedClients()
+     {
+         int count = 0;
+ 
+         for (Server server : servers)
+             count += server.getConnectedClients();
+ 
+         return count;
+     }
+ 
+     private <T> Gauge<T> registerGauge(String name, Gauge<T> gauge)
+     {
+         return Metrics.register(factory.createMetricName(name), gauge);
+     }
+ 
 -    private Meter registerMeter(String name)
++    public Meter registerMeter(String name)
      {
          return Metrics.meter(factory.createMetricName(name));
      }
diff --cc src/java/org/apache/cassandra/service/NativeTransportService.java
index 28e7dac,2280818..6343f0d
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@@ -31,11 -31,9 +31,11 @@@ import io.netty.channel.EventLoopGroup
  import io.netty.channel.epoll.Epoll;
  import io.netty.channel.epoll.EpollEventLoopGroup;
  import io.netty.channel.nio.NioEventLoopGroup;
 +import io.netty.util.concurrent.EventExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.AuthMetrics;
  import org.apache.cassandra.metrics.ClientMetrics;
- import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+ import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
  
  /**
@@@ -108,16 -101,8 +103,10 @@@ public class NativeTransportServic
          }
  
          // register metrics
-         ClientMetrics.instance.addCounter("connectedNativeClients", () ->
-         {
-             int ret = 0;
-             for (Server server : servers)
-                 ret += server.getConnectedClients();
-             return ret;
-         });
+         ClientMetrics.instance.init(servers);
  
 +        AuthMetrics.init();
 +
          initialized = true;
      }
  
diff --cc src/java/org/apache/cassandra/transport/Connection.java
index a04a055,2966d9b..7e17f46
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@@ -29,8 -29,9 +29,9 @@@ public class Connectio
      private final Tracker tracker;
  
      private volatile FrameCompressor frameCompressor;
+     private boolean throwOnOverload;
  
 -    public Connection(Channel channel, int version, Tracker tracker)
 +    public Connection(Channel channel, ProtocolVersion version, Tracker 
tracker)
      {
          this.channel = channel;
          this.version = version;
diff --cc src/java/org/apache/cassandra/transport/Frame.java
index 6cd8b1e,c28be9f..388cbc2
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@@ -67,9 -66,9 +67,9 @@@ public class Fram
          return body.release();
      }
  
 -    public static Frame create(Message.Type type, int streamId, int version, 
EnumSet<Header.Flag> flags, ByteBuf body)
 +    public static Frame create(Message.Type type, int streamId, 
ProtocolVersion version, EnumSet<Header.Flag> flags, ByteBuf body)
      {
-         Header header = new Header(version, flags, streamId, type);
+         Header header = new Header(version, flags, streamId, type, 
body.readableBytes());
          return new Frame(header, body);
      }
  
@@@ -84,16 -83,23 +84,18 @@@
          public final EnumSet<Flag> flags;
          public final int streamId;
          public final Message.Type type;
+         public final long bodySizeInBytes;
  
-         private Header(ProtocolVersion version, EnumSet<Flag> flags, int 
streamId, Message.Type type)
 -        private Header(int version, int flags, int streamId, Message.Type 
type, long bodySizeInBytes)
 -        {
 -            this(version, Flag.deserialize(flags), streamId, type, 
bodySizeInBytes);
 -        }
 -
 -        private Header(int version, EnumSet<Flag> flags, int streamId, 
Message.Type type, long bodySizeInBytes)
++        private Header(ProtocolVersion version, EnumSet<Flag> flags, int 
streamId, Message.Type type, long bodySizeInBytes)
          {
              this.version = version;
              this.flags = flags;
              this.streamId = streamId;
              this.type = type;
+             this.bodySizeInBytes = bodySizeInBytes;
          }
  
 -        public static enum Flag
 +        public enum Flag
          {
              // The order of that enum matters!!
              COMPRESSED,
@@@ -240,7 -242,7 +242,7 @@@
                          streamId);
              }
  
-             results.add(new Frame(new Header(version, decodedFlags, streamId, 
type), body));
 -            results.add(new Frame(new Header(version, flags, streamId, type, 
bodyLength), body));
++            results.add(new Frame(new Header(version, decodedFlags, streamId, 
type, bodyLength), body));
          }
  
          private void fail()
diff --cc src/java/org/apache/cassandra/transport/Message.java
index de048a6,08a8600..271b690
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@@ -548,10 -576,90 +579,91 @@@ public abstract class Messag
          @Override
          public void channelRead0(ChannelHandlerContext ctx, Request request)
          {
+             // if we decide to handle this message, process it outside of the netty event loop
+             if (shouldHandleRequest(ctx, request))
+                 requestExecutor.submit(() -> processRequest(ctx, request));
+         }
  
+         /** This check for inflight payload to potentially discard the request should have been ideally in one of the
+          * first handlers in the pipeline (Frame::decode()). However, in case of any exception thrown between that
+          * handler (where inflight payload is incremented) and this handler (Dispatcher::channelRead0) (where inflight
+          * payload is decremented), inflight payload becomes erroneous. ExceptionHandler is not sufficient for this
+          * purpose since it does not have the frame associated with the exception.
+          *
+          * Note: this method should execute on the netty event loop.
+          */
+         private boolean shouldHandleRequest(ChannelHandlerContext ctx, 
Request request)
+         {
+             long frameSize = request.getSourceFrame().header.bodySizeInBytes;
+ 
+             ResourceLimits.EndpointAndGlobal 
endpointAndGlobalPayloadsInFlight = 
endpointPayloadTracker.endpointAndGlobalPayloadsInFlight;
+ 
+             // check for overloaded state by trying to allocate framesize to 
inflight payload trackers
+             if (endpointAndGlobalPayloadsInFlight.tryAllocate(frameSize) != 
ResourceLimits.Outcome.SUCCESS)
+             {
+                 if (request.connection.isThrowOnOverload())
+                 {
+                     // discard the request and throw an exception
+                     ClientMetrics.instance.markRequestDiscarded();
+                     logger.trace("Discarded request of size: {}. 
InflightChannelRequestPayload: {}, InflightEndpointRequestPayload: {}, 
InflightOverallRequestPayload: {}, Request: {}",
+                                  frameSize,
+                                  channelPayloadBytesInFlight,
+                                  
endpointAndGlobalPayloadsInFlight.endpoint().using(),
+                                  
endpointAndGlobalPayloadsInFlight.global().using(),
+                                  request);
+                     throw ErrorMessage.wrap(new OverloadedException("Server 
is in overloaded state. Cannot accept more requests at this point"),
+                                             
request.getSourceFrame().header.streamId);
+                 }
+                 else
+                 {
+                     // set backpressure on the channel, and handle the request
+                     endpointAndGlobalPayloadsInFlight.allocate(frameSize);
+                     ctx.channel().config().setAutoRead(false);
+                     ClientMetrics.instance.pauseConnection();
+                     paused = true;
+                 }
+             }
+ 
+             channelPayloadBytesInFlight += frameSize;
+             return true;
+         }
+ 
+         /**
+          * Note: this method will be used in the {@link Flusher#run()}, which executes on the netty event loop
+          * ({@link Dispatcher#flusherLookup}). Thus, we assume the semantics and visibility of variables
+          * of being on the event loop.
+          */
+         private void releaseItem(FlushItem item)
+         {
+             long itemSize = item.sourceFrame.header.bodySizeInBytes;
+             item.sourceFrame.release();
+ 
+             // since the request has been processed, decrement inflight 
payload at channel, endpoint and global levels
+             channelPayloadBytesInFlight -= itemSize;
+             ResourceLimits.Outcome endpointGlobalReleaseOutcome = 
endpointPayloadTracker.endpointAndGlobalPayloadsInFlight.release(itemSize);
+ 
+             // now check to see if we need to reenable the channel's autoRead.
+             // If the current payload size is zero, we must reenable autoread as
+             // 1) we allow no other thread/channel to do it, and
+             // 2) there are no other events following this one (because we're at zero bytes in flight),
+             // so no successive event will trigger the other clause in this if-block
+             ChannelConfig config = item.ctx.channel().config();
+             if (paused && (channelPayloadBytesInFlight == 0 || 
endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
+             {
+                 paused = false;
+                 ClientMetrics.instance.unpauseConnection();
+                 config.setAutoRead(true);
+             }
+         }
+ 
+         /**
+          * Note: this method is not expected to execute on the netty event loop.
+          */
+         void processRequest(ChannelHandlerContext ctx, Request request)
+         {
              final Response response;
              final ServerConnection connection;
 +            long queryStartNanoTime = System.nanoTime();
  
              try
              {
diff --cc src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index eb82292,92278fa..8b4b0a4
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@@ -35,8 -35,8 +35,9 @@@ public class StartupMessage extends Mes
  {
      public static final String CQL_VERSION = "CQL_VERSION";
      public static final String COMPRESSION = "COMPRESSION";
 +    public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS";
      public static final String NO_COMPACT = "NO_COMPACT";
+     public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD";
  
      public static final Message.Codec<StartupMessage> codec = new 
Message.Codec<StartupMessage>()
      {
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 192cbbc,999404e..d438630
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -44,10 -44,11 +44,11 @@@ import com.datastax.driver.core.ResultS
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
  import org.apache.cassandra.config.CFMetaData;
+ import org.apache.cassandra.metrics.ClientMetrics;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.Schema;
 +import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.cql3.functions.FunctionName;
 -import org.apache.cassandra.cql3.functions.ThreadAwareSecurityManager;
  import org.apache.cassandra.cql3.statements.ParsedStatement;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLog;
@@@ -357,9 -335,10 +358,10 @@@ public abstract class CQLTeste
          SchemaLoader.startGossiper();
  
          server = new 
Server.Builder().withHost(nativeAddr).withPort(nativePort).build();
+         ClientMetrics.instance.init(Collections.singleton(server));
          server.start();
  
 -        for (int version : PROTOCOL_VERSIONS)
 +        for (ProtocolVersion version : PROTOCOL_VERSIONS)
          {
              if (clusters.containsKey(version))
                  continue;
diff --cc 
test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index 0000000,e4d335b..21dfed8
mode 000000,100644..100644
--- 
a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ 
b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@@ -1,0 -1,248 +1,248 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.OrderedJUnit4ClassRunner;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.EncryptionOptions;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.transport.messages.QueryMessage;
+ 
+ @RunWith(OrderedJUnit4ClassRunner.class)
+ public class InflightRequestPayloadTrackerTest extends CQLTester
+ {
+     @BeforeClass
+     public static void setUp()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600);
+         requireNetwork();
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+     }
+ 
+     @After
+     public void dropCreatedTable()
+     {
+         try
+         {
+             QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + ".atable");
+         }
+         catch (Throwable t)
+         {
+             // ignore
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               Server.CURRENT_VERSION,
++                                               ProtocolVersion.V4,
+                                                new EncryptionOptions.ClientEncryptionOptions());
+ 
+         try
+         {
+             client.connect(false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            Server.CURRENT_VERSION);
++            ProtocolVersion.V4);
+ 
+             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE),
+                                                          queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               Server.CURRENT_VERSION,
++                                               ProtocolVersion.V4,
+                                                new EncryptionOptions.ClientEncryptionOptions());
+ 
+         try
+         {
+             client.connect(false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            Server.CURRENT_VERSION);
++            ProtocolVersion.V4);
+ 
+             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                          queryOptions);
+             client.execute(queryMessage);
+             queryMessage = new QueryMessage(String.format("SELECT * FROM %s.atable", KEYSPACE),
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               Server.CURRENT_VERSION,
++                                               ProtocolVersion.V4,
+                                                new EncryptionOptions.ClientEncryptionOptions());
+ 
+         try
+         {
+             client.connect(false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            Server.CURRENT_VERSION);
++            ProtocolVersion.V4);
+ 
+             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
+             queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               Server.CURRENT_VERSION,
++                                               ProtocolVersion.V4,
+                                                new EncryptionOptions.ClientEncryptionOptions());
+ 
+         try
+         {
+             client.connect(false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            Server.CURRENT_VERSION);
++            ProtocolVersion.V4);
+ 
+             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
+             queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               Server.CURRENT_VERSION,
++                                               ProtocolVersion.V4,
+                                                new EncryptionOptions.ClientEncryptionOptions());
+ 
+         try
+         {
+             client.connect(false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            Server.CURRENT_VERSION);
++            ProtocolVersion.V4);
+ 
+             QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
+             queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to