This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8a04204c10eafc2c1051a336d82211899889e276
Merge: 149caf0 f5fe483
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Mon Jul 15 15:13:53 2019 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 doc/native_protocol_v4.spec                        |   4 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  30 +++
 .../apache/cassandra/metrics/ClientMetrics.java    |  14 ++
 .../org/apache/cassandra/net/ResourceLimits.java   |  70 ++++--
 .../cassandra/service/NativeTransportService.java  |  17 +-
 .../org/apache/cassandra/transport/Connection.java |  11 +
 src/java/org/apache/cassandra/transport/Frame.java |   8 +-
 .../org/apache/cassandra/transport/Message.java    | 146 +++++++++++-
 .../transport/RequestThreadPoolExecutor.java       |  96 --------
 .../org/apache/cassandra/transport/Server.java     |  64 +++--
 .../apache/cassandra/transport/SimpleClient.java   |   8 +
 .../transport/messages/StartupMessage.java         |   3 +
 test/unit/org/apache/cassandra/cql3/CQLTester.java |   9 +-
 .../service/NativeTransportServiceTest.java        |   3 +-
 .../InflightRequestPayloadTrackerTest.java         | 258 +++++++++++++++++++++
 17 files changed, 592 insertions(+), 152 deletions(-)

diff --cc src/java/org/apache/cassandra/config/Config.java
index 6b487fe,1d79e2a..34a5ce8
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -182,11 -160,13 +182,13 @@@ public class Confi
      public int native_transport_max_frame_size_in_mb = 256;
      public volatile long native_transport_max_concurrent_connections = -1L;
      public volatile long native_transport_max_concurrent_connections_per_ip = -1L;
 -    public boolean native_transport_flush_in_batches_legacy = true;
 +    public boolean native_transport_flush_in_batches_legacy = false;
 +    public volatile boolean native_transport_allow_older_protocols = true;
 +    public int native_transport_frame_block_size_in_kb = 32;
+     public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
+     public volatile long native_transport_max_concurrent_requests_in_bytes = -1L;
  
  
 -    @Deprecated
 -    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index bb92716,75296b6..0166c5f
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -2010,31 -1849,36 +2020,51 @@@ public class DatabaseDescripto
          return conf.native_transport_flush_in_batches_legacy;
      }
  
 -    public static double getCommitLogSyncBatchWindow()
 +    public static boolean getNativeTransportAllowOlderProtocols()
      {
 -        return conf.commitlog_sync_batch_window_in_ms;
 +        return conf.native_transport_allow_older_protocols;
      }
  
 -    public static void setCommitLogSyncBatchWindow(double windowMillis)
 +    public static void setNativeTransportAllowOlderProtocols(boolean isEnabled)
      {
 -        conf.commitlog_sync_batch_window_in_ms = windowMillis;
 +        conf.native_transport_allow_older_protocols = isEnabled;
 +    }
 +
 +    public static int getNativeTransportFrameBlockSize()
 +    {
 +        return conf.native_transport_frame_block_size_in_kb * 1024;
 +    }
 +
 +    public static double getCommitLogSyncGroupWindow()
 +    {
 +        return conf.commitlog_sync_group_window_in_ms;
 +    }
 +
 +    public static void setCommitLogSyncGroupWindow(double windowMillis)
 +    {
 +        conf.commitlog_sync_group_window_in_ms = windowMillis;
      }
  
+     public static long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
+     {
+         return conf.native_transport_max_concurrent_requests_in_bytes_per_ip;
+     }
+ 
+     public static void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long maxConcurrentRequestsInBytes)
+     {
+         conf.native_transport_max_concurrent_requests_in_bytes_per_ip = maxConcurrentRequestsInBytes;
+     }
+ 
+     public static long getNativeTransportMaxConcurrentRequestsInBytes()
+     {
+         return conf.native_transport_max_concurrent_requests_in_bytes;
+     }
+ 
+     public static void setNativeTransportMaxConcurrentRequestsInBytes(long maxConcurrentRequestsInBytes)
+     {
+         conf.native_transport_max_concurrent_requests_in_bytes = maxConcurrentRequestsInBytes;
+     }
+ 
      public static int getCommitLogSyncPeriod()
      {
          return conf.commitlog_sync_period_in_ms;
diff --cc src/java/org/apache/cassandra/metrics/ClientMetrics.java
index a80033a,67aa05b..7599096
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@@ -18,7 -18,10 +18,8 @@@
   */
  package org.apache.cassandra.metrics;
  
 -import java.util.Collection;
 -import java.util.Collections;
 -import java.util.concurrent.Callable;
 +import java.util.*;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import com.codahale.metrics.Gauge;
  import com.codahale.metrics.Meter;
@@@ -28,42 -29,28 +29,51 @@@ import org.apache.cassandra.transport.S
  
  import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
  
 -
 -public class ClientMetrics
 +public final class ClientMetrics
  {
 -    private static final MetricNameFactory factory = new DefaultNameFactory("Client");
      public static final ClientMetrics instance = new ClientMetrics();
  
 -    private volatile boolean initialized = false;
 +    private static final MetricNameFactory factory = new DefaultNameFactory("Client");
  
 +    private volatile boolean initialized = false;
      private Collection<Server> servers = Collections.emptyList();
  
 +    private Meter authSuccess;
 +    private Meter authFailure;
 +
+     private AtomicInteger pausedConnections;
+     private Gauge<Integer> pausedConnectionsGauge;
+     private Meter requestDiscarded;
+ 
      private ClientMetrics()
      {
      }
  
 +    public void markAuthSuccess()
 +    {
 +        authSuccess.mark();
 +    }
 +
 +    public void markAuthFailure()
 +    {
 +        authFailure.mark();
 +    }
 +
+     public void pauseConnection() { pausedConnections.incrementAndGet(); }
+     public void unpauseConnection() { pausedConnections.decrementAndGet(); }
++
+     public void markRequestDiscarded() { requestDiscarded.mark(); }
+ 
 +    public List<ConnectedClient> allConnectedClients()
 +    {
 +        List<ConnectedClient> clients = new ArrayList<>();
 +
 +        for (Server server : servers)
 +            clients.addAll(server.getConnectedClients());
 +
 +        return clients;
 +    }
 +
      public synchronized void init(Collection<Server> servers)
      {
          if (initialized)
@@@ -71,14 -58,12 +81,18 @@@
  
          this.servers = servers;
  
 -        registerGauge("connectedNativeClients", this::countConnectedClients);
 +        registerGauge("connectedNativeClients",       this::countConnectedClients);
 +        registerGauge("connectedNativeClientsByUser", this::countConnectedClientsByUser);
 +        registerGauge("connections",                  this::connectedClients);
 +        registerGauge("clientsByProtocolVersion",     this::recentClientStats);
 +
 +        authSuccess = registerMeter("AuthSuccess");
 +        authFailure = registerMeter("AuthFailure");
  
+         pausedConnections = new AtomicInteger();
+         pausedConnectionsGauge = registerGauge("PausedConnections", pausedConnections::get);
+         requestDiscarded = registerMeter("RequestDiscarded");
+ 
          initialized = true;
      }
  
diff --cc src/java/org/apache/cassandra/service/NativeTransportService.java
index 79caafc,6343f0d..66b5000
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@@ -32,12 -31,12 +32,11 @@@ import io.netty.channel.EventLoopGroup
  import io.netty.channel.epoll.Epoll;
  import io.netty.channel.epoll.EpollEventLoopGroup;
  import io.netty.channel.nio.NioEventLoopGroup;
--import io.netty.util.concurrent.EventExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.metrics.AuthMetrics;
  import org.apache.cassandra.metrics.ClientMetrics;
- import org.apache.cassandra.transport.RequestThreadPoolExecutor;
+ import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.utils.NativeLibrary;
  
  /**
   * Handles native transport server lifecycle and associated resources. Lazily initialized.
diff --cc src/java/org/apache/cassandra/transport/Connection.java
index 908e7e9,7e17f46..b7f5b17
--- a/src/java/org/apache/cassandra/transport/Connection.java
+++ b/src/java/org/apache/cassandra/transport/Connection.java
@@@ -29,7 -28,8 +29,8 @@@ public class Connectio
      private final ProtocolVersion version;
      private final Tracker tracker;
  
 -    private volatile FrameCompressor frameCompressor;
 +    private volatile FrameBodyTransformer transformer;
+     private boolean throwOnOverload;
  
      public Connection(Channel channel, ProtocolVersion version, Tracker tracker)
      {
@@@ -40,16 -40,26 +41,26 @@@
          tracker.addConnection(channel, this);
      }
  
 -    public void setCompressor(FrameCompressor compressor)
 +    public void setTransformer(FrameBodyTransformer transformer)
      {
 -        this.frameCompressor = compressor;
 +        this.transformer = transformer;
      }
  
 -    public FrameCompressor getCompressor()
 +    public FrameBodyTransformer getTransformer()
      {
 -        return frameCompressor;
 +        return transformer;
      }
  
+     public void setThrowOnOverload(boolean throwOnOverload)
+     {
+         this.throwOnOverload = throwOnOverload;
+     }
+ 
+     public boolean isThrowOnOverload()
+     {
+         return throwOnOverload;
+     }
+ 
      public Tracker getTracker()
      {
          return tracker;
diff --cc src/java/org/apache/cassandra/transport/Frame.java
index d3c810b,388cbc2..8163d7a
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@@ -227,16 -225,6 +229,16 @@@ public class Fram
              idx += bodyLength;
              buffer.readerIndex(idx);
  
-             return new Frame(new Header(version, decodedFlags, streamId, type), body);
++            return new Frame(new Header(version, decodedFlags, streamId, type, bodyLength), body);
 +        }
 +
 +        @Override
 +        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results)
 +        throws Exception
 +        {
 +            Frame frame = decodeFrame(buffer);
 +            if (frame == null) return;
 +
              Attribute<Connection> attrConn = ctx.channel().attr(Connection.attributeKey);
              Connection connection = attrConn.get();
              if (connection == null)
diff --cc src/java/org/apache/cassandra/transport/Message.java
index 0571478,271b690..99c0127
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@@ -42,14 -42,18 +42,21 @@@ import com.google.common.collect.Immuta
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
+ import org.apache.cassandra.concurrent.LocalAwareExecutorService;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.metrics.ClientMetrics;
+ import org.apache.cassandra.net.ResourceLimits;
  import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.*;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.utils.JVMStabilityInspector;
 +import org.apache.cassandra.utils.UUIDGen;
  
+ import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
+ 
  /**
   * A message from the CQL binary protocol.
   */
diff --cc src/java/org/apache/cassandra/transport/Server.java
index f16aa88,b96c517..c4690f1
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@@ -23,8 -23,11 +23,10 @@@ import java.net.InetSocketAddress
  import java.net.UnknownHostException;
  import java.util.*;
  import java.util.concurrent.ConcurrentHashMap;
 -import java.util.concurrent.atomic.AtomicBoolean;
 -import javax.net.ssl.SSLContext;
 -import javax.net.ssl.SSLEngine;
 +import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -54,9 -51,7 +56,10 @@@ import org.apache.cassandra.auth.Authen
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.config.EncryptionOptions;
  import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.ResourceLimits;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaChangeListener;
  import org.apache.cassandra.security.SSLFactory;
  import org.apache.cassandra.service.*;
  import org.apache.cassandra.transport.messages.EventMessage;
@@@ -104,11 -98,9 +106,9 @@@ public class Server implements Cassandr
              else
                  workerGroup = new NioEventLoopGroup();
          }
-         if (builder.eventExecutorGroup != null)
-             eventExecutorGroup = builder.eventExecutorGroup;
          EventNotifier notifier = new EventNotifier(this);
          StorageService.instance.register(notifier);
 -        MigrationManager.instance.register(notifier);
 +        Schema.instance.registerListener(notifier);
      }
  
      public void stop()
@@@ -321,36 -274,60 +315,78 @@@
              */
              return allChannels.size() != 0 ? allChannels.size() - 1 : 0;
          }
 +
 +        Map<String, Integer> countConnectedClientsByUser()
 +        {
 +            Map<String, Integer> result = new HashMap<>();
 +            for (Channel c : allChannels)
 +            {
 +                Connection connection = c.attr(Connection.attributeKey).get();
 +                if (connection instanceof ServerConnection)
 +                {
 +                    ServerConnection conn = (ServerConnection) connection;
 +                    AuthenticatedUser user = conn.getClientState().getUser();
 +                    String name = (null != user) ? user.getName() : null;
 +                    result.put(name, result.getOrDefault(name, 0) + 1);
 +                }
 +            }
 +            return result;
 +        }
 +
      }
  
+     // global inflight payload across all channels across all endpoints
+     private static final ResourceLimits.Concurrent globalRequestPayloadInFlight = new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytes());
+ 
+     public static class EndpointPayloadTracker
+     {
+         // inflight payload per endpoint across corresponding channels
+         private static final ConcurrentMap<InetAddress, EndpointPayloadTracker> requestPayloadInFlightPerEndpoint = new ConcurrentHashMap<>();
+ 
+         private final AtomicInteger refCount = new AtomicInteger(0);
+         private final InetAddress endpoint;
+ 
+         final ResourceLimits.EndpointAndGlobal endpointAndGlobalPayloadsInFlight = new ResourceLimits.EndpointAndGlobal(new ResourceLimits.Concurrent(DatabaseDescriptor.getNativeTransportMaxConcurrentRequestsInBytesPerIp()),
+                                                                                                                         globalRequestPayloadInFlight);
+ 
+         private EndpointPayloadTracker(InetAddress endpoint)
+         {
+             this.endpoint = endpoint;
+         }
+ 
+         public static EndpointPayloadTracker get(InetAddress endpoint)
+         {
+             while (true)
+             {
+                 EndpointPayloadTracker result = requestPayloadInFlightPerEndpoint.computeIfAbsent(endpoint, EndpointPayloadTracker::new);
+                 if (result.acquire())
+                     return result;
+ 
+                 requestPayloadInFlightPerEndpoint.remove(endpoint, result);
+             }
+         }
+ 
+         private boolean acquire()
+         {
+             return 0 < refCount.updateAndGet(i -> i < 0 ? i : i + 1);
+         }
+ 
+         public void release()
+         {
+             if (-1 == refCount.updateAndGet(i -> i == 1 ? -1 : i - 1))
+                 requestPayloadInFlightPerEndpoint.remove(endpoint, this);
+         }
+     }
+ 
      private static class Initializer extends ChannelInitializer<Channel>
      {
          // Stateless handlers
          private static final Message.ProtocolDecoder messageDecoder = new Message.ProtocolDecoder();
          private static final Message.ProtocolEncoder messageEncoder = new Message.ProtocolEncoder();
 -        private static final Frame.Decompressor frameDecompressor = new Frame.Decompressor();
 -        private static final Frame.Compressor frameCompressor = new Frame.Compressor();
 +        private static final Frame.InboundBodyTransformer inboundFrameTransformer = new Frame.InboundBodyTransformer();
 +        private static final Frame.OutboundBodyTransformer outboundFrameTransformer = new Frame.OutboundBodyTransformer();
          private static final Frame.Encoder frameEncoder = new Frame.Encoder();
          private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
-         private static final Message.Dispatcher dispatcher = new Message.Dispatcher(DatabaseDescriptor.useNativeTransportLegacyFlusher());
          private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
  
          private final Server server;
diff --cc src/java/org/apache/cassandra/transport/SimpleClient.java
index deba207,c03becd..6340b69
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@@ -119,31 -118,31 +119,39 @@@ public class SimpleClient implements Cl
  
      public SimpleClient(String host, int port)
      {
 -        this(host, port, new ClientEncryptionOptions());
 +        this(host, port, new EncryptionOptions());
      }
  
 -    public void connect(boolean useCompression) throws IOException
 +    public SimpleClient connect(boolean useCompression, boolean useChecksums) throws IOException
      {
 -        connect(useCompression, false);
++        return connect(useCompression, useChecksums, false);
+     }
+ 
 -    public void connect(boolean useCompression, boolean throwOnOverload) throws IOException
++    public SimpleClient connect(boolean useCompression, boolean useChecksums, boolean throwOnOverload) throws IOException
+     {
          establishConnection();
  
          Map<String, String> options = new HashMap<>();
          options.put(StartupMessage.CQL_VERSION, "3.0.0");
 -
+         if (throwOnOverload)
+             options.put(StartupMessage.THROW_ON_OVERLOAD, "1");
+         connection.setThrowOnOverload(throwOnOverload);
  
 -        if (useCompression)
 +        if (useChecksums)
 +        {
 +            Compressor compressor = useCompression ? LZ4Compressor.INSTANCE : null;
 +            connection.setTransformer(ChecksummingTransformer.getTransformer(ChecksumType.CRC32, compressor));
 +            options.put(StartupMessage.CHECKSUM, "crc32");
 +            options.put(StartupMessage.COMPRESSION, "lz4");
 +        }
 +        else if (useCompression)
          {
 -            options.put(StartupMessage.COMPRESSION, "snappy");
 -            connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
 +            connection.setTransformer(CompressingTransformer.getTransformer(LZ4Compressor.INSTANCE));
 +            options.put(StartupMessage.COMPRESSION, "lz4");
          }
 +
          execute(new StartupMessage(options));
 +        return this;
      }
  
      public void setEventHandler(EventHandler eventHandler)
diff --cc src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index ef846c1,8b4b0a4..ee2b34e
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@@ -43,9 -36,8 +43,10 @@@ public class StartupMessage extends Mes
      public static final String CQL_VERSION = "CQL_VERSION";
      public static final String COMPRESSION = "COMPRESSION";
      public static final String PROTOCOL_VERSIONS = "PROTOCOL_VERSIONS";
 -    public static final String NO_COMPACT = "NO_COMPACT";
 +    public static final String DRIVER_NAME = "DRIVER_NAME";
 +    public static final String DRIVER_VERSION = "DRIVER_VERSION";
 +    public static final String CHECKSUM = "CONTENT_CHECKSUM";
+     public static final String THROW_ON_OVERLOAD = "THROW_ON_OVERLOAD";
  
      public static final Message.Codec<StartupMessage> codec = new Message.Codec<StartupMessage>()
      {
@@@ -90,28 -81,30 +91,30 @@@
              throw new ProtocolException(e.getMessage());
          }
  
 -        if (options.containsKey(COMPRESSION))
 +        ChecksumType checksumType = getChecksumType();
 +        Compressor compressor = getCompressor();
 +
 +        if (null != checksumType)
          {
 -            String compression = options.get(COMPRESSION).toLowerCase();
 -            if (compression.equals("snappy"))
 -            {
 -                if (FrameCompressor.SnappyCompressor.instance == null)
 -                    throw new ProtocolException("This instance does not support Snappy compression");
 -                connection.setCompressor(FrameCompressor.SnappyCompressor.instance);
 -            }
 -            else if (compression.equals("lz4"))
 -            {
 -                connection.setCompressor(FrameCompressor.LZ4Compressor.instance);
 -            }
 -            else
 -            {
 -                throw new ProtocolException(String.format("Unknown compression algorithm: %s", compression));
 -            }
 +            if (!connection.getVersion().supportsChecksums())
 +                throw new ProtocolException(String.format("Invalid message flag. Protocol version %s does not support frame body checksums", connection.getVersion().toString()));
 +            connection.setTransformer(ChecksummingTransformer.getTransformer(checksumType, compressor));
 +        }
 +        else if (null != compressor)
 +        {
 +            connection.setTransformer(CompressingTransformer.getTransformer(compressor));
          }
 -
 -        if (options.containsKey(NO_COMPACT) && Boolean.parseBoolean(options.get(NO_COMPACT)))
 -            state.getClientState().setNoCompactMode();
  
+         connection.setThrowOnOverload("1".equals(options.get(THROW_ON_OVERLOAD)));
+ 
 +        ClientState clientState = state.getClientState();
 +        String driverName = options.get(DRIVER_NAME);
 +        if (null != driverName)
 +        {
 +            clientState.setDriverName(driverName);
 +            clientState.setDriverVersion(options.get(DRIVER_VERSION));
 +        }
 +
          if (DatabaseDescriptor.getAuthenticator().requireAuthentication())
              return new AuthenticateMessage(DatabaseDescriptor.getAuthenticator().getClass().getName());
          else
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index 1eb56bc,d438630..9c4f22e
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -47,14 -43,13 +47,15 @@@ import com.datastax.driver.core.ResultS
  
  import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.concurrent.ScheduledExecutors;
 -import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.index.SecondaryIndexManager;
 +import org.apache.cassandra.config.EncryptionOptions;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.TokenMetadata;
+ import org.apache.cassandra.metrics.ClientMetrics;
 +import org.apache.cassandra.schema.*;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.Schema;
 -import org.apache.cassandra.config.SchemaConstants;
  import org.apache.cassandra.cql3.functions.FunctionName;
 -import org.apache.cassandra.cql3.statements.ParsedStatement;
  import org.apache.cassandra.db.*;
  import org.apache.cassandra.db.commitlog.CommitLog;
  import org.apache.cassandra.db.marshal.*;
@@@ -891,11 -761,6 +893,16 @@@ public abstract class CQLTeste
          return sessions.get(protocolVersion);
      }
  
++    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums, boolean isOverloadedException) throws IOException
++    {
++        return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums, isOverloadedException);
++    }
++
 +    protected SimpleClient newSimpleClient(ProtocolVersion version, boolean compression, boolean checksums) throws IOException
 +    {
-         return new SimpleClient(nativeAddr.getHostAddress(), nativePort, version, version.isBeta(), new EncryptionOptions()).connect(compression, checksums);
++        return newSimpleClient(version, compression, checksums, false);
 +    }
 +
      protected String formatQuery(String query)
      {
          return formatQuery(KEYSPACE, query);
diff --cc test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
index 80b3596,25fac21..86b73ab
--- a/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/NativeTransportServiceTest.java
@@@ -84,12 -84,11 +84,11 @@@ public class NativeTransportServiceTes
      public void testDestroy()
      {
          withService((NativeTransportService service) -> {
 -            Supplier<Boolean> allTerminated = () ->
 -                                              service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated();
 -            assertFalse(allTerminated.get());
 +            BooleanSupplier allTerminated = () ->
-                                               service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated() &&
-                                               service.getEventExecutor().isShutdown() && service.getEventExecutor().isTerminated();
++                                            service.getWorkerGroup().isShutdown() && service.getWorkerGroup().isTerminated();
 +            assertFalse(allTerminated.getAsBoolean());
              service.destroy();
 -            assertTrue(allTerminated.get());
 +            assertTrue(allTerminated.getAsBoolean());
          });
      }
  
diff --cc test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
index 0000000,21dfed8..c9a9a02
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
+++ b/test/unit/org/apache/cassandra/transport/InflightRequestPayloadTrackerTest.java
@@@ -1,0 -1,248 +1,258 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ import org.junit.After;
+ import org.junit.AfterClass;
+ import org.junit.Assert;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ import org.junit.runner.RunWith;
+ 
+ import org.apache.cassandra.OrderedJUnit4ClassRunner;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.config.EncryptionOptions;
+ import org.apache.cassandra.cql3.CQLTester;
+ import org.apache.cassandra.cql3.QueryOptions;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.transport.messages.QueryMessage;
+ 
+ @RunWith(OrderedJUnit4ClassRunner.class)
+ public class InflightRequestPayloadTrackerTest extends CQLTester
+ {
+     @BeforeClass
+     public static void setUp()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(600);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(600);
+         requireNetwork();
+     }
+ 
+     @AfterClass
+     public static void tearDown()
+     {
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytesPerIp(3000000000L);
+         DatabaseDescriptor.setNativeTransportMaxConcurrentRequestsInBytes(5000000000L);
+     }
+ 
+     @After
+     public void dropCreatedTable()
+     {
+         try
+         {
+             QueryProcessor.executeOnceInternal("DROP TABLE " + KEYSPACE + ".atable");
+         }
+         catch (Throwable t)
+         {
+             // ignore
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk1 int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk1 int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverload() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, false);
++            client.connect(false, false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
 -            queryMessage = new QueryMessage(String.format("SELECT * FROM %s.atable", KEYSPACE),
++            queryMessage = new QueryMessage("SELECT * FROM atable",
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testQueryExecutionWithoutThrowOnOverloadAndInflightLimitedExceeded() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, false);
++            client.connect(false, false, false);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) 
VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             client.execute(queryMessage);
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForEndpointInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) 
VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ 
+     @Test
+     public void testOverloadedExceptionForOverallInflightLimit() throws Throwable
+     {
+         SimpleClient client = new SimpleClient(nativeAddr.getHostAddress(),
+                                                nativePort,
 -                                               ProtocolVersion.V4,
 -                                               new EncryptionOptions.ClientEncryptionOptions());
++                                               ProtocolVersion.V5,
++                                               true,
++                                               new EncryptionOptions());
+ 
+         try
+         {
 -            client.connect(false, true);
++            client.connect(false, false, true);
+             QueryOptions queryOptions = QueryOptions.create(
+             QueryOptions.DEFAULT.getConsistency(),
+             QueryOptions.DEFAULT.getValues(),
+             QueryOptions.DEFAULT.skipMetadata(),
+             QueryOptions.DEFAULT.getPageSize(),
+             QueryOptions.DEFAULT.getPagingState(),
+             QueryOptions.DEFAULT.getSerialConsistency(),
 -            ProtocolVersion.V4);
++            ProtocolVersion.V5,
++            KEYSPACE);
+ 
 -            QueryMessage queryMessage = new QueryMessage(String.format("CREATE TABLE %s.atable (pk int PRIMARY KEY, v text)", KEYSPACE),
++            QueryMessage queryMessage = new QueryMessage("CREATE TABLE atable (pk int PRIMARY KEY, v text)",
+                                                          queryOptions);
+             client.execute(queryMessage);
+ 
 -            queryMessage = new QueryMessage(String.format("INSERT INTO 
%s.atable (pk, v) VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
++            queryMessage = new QueryMessage("INSERT INTO atable (pk, v) 
VALUES (1, 
'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
 [...]
+                                             queryOptions);
+             try
+             {
+                 client.execute(queryMessage);
+                 Assert.fail();
+             }
+             catch (RuntimeException e)
+             {
+                 Assert.assertTrue(e.getCause() instanceof OverloadedException);
+             }
+         }
+         finally
+         {
+             client.close();
+         }
+     }
+ }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to