This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 304096566868d0f51d461be7c14c8bcc1ebfb277
Merge: a63df4d 0388d89
Author: Sam Tunnicliffe <s...@beobal.com>
AuthorDate: Tue Oct 8 18:38:42 2019 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   3 +-
 NEWS.txt                                           |   5 +
 bin/cqlsh.py                                       |   2 +-
 src/java/org/apache/cassandra/config/Config.java   |   2 +-
 .../cassandra/config/DatabaseDescriptor.java       |  22 +++
 .../org/apache/cassandra/db/SystemKeyspace.java    |  43 ++++++
 .../apache/cassandra/service/CassandraDaemon.java  |  10 ++
 .../cassandra/service/NativeTransportService.java  |  19 +++
 .../apache/cassandra/service/StorageService.java   |  21 ++-
 .../cassandra/service/StorageServiceMBean.java     |   3 +
 .../org/apache/cassandra/transport/Client.java     |   2 +-
 .../cassandra/transport/ConfiguredLimit.java       | 112 ++++++++++++++
 src/java/org/apache/cassandra/transport/Frame.java |   6 +-
 .../org/apache/cassandra/transport/Message.java    |  10 +-
 .../cassandra/transport/ProtocolVersion.java       |   6 +-
 .../cassandra/transport/ProtocolVersionLimit.java  |  27 ++++
 .../org/apache/cassandra/transport/Server.java     |  22 ++-
 .../apache/cassandra/transport/SimpleClient.java   |   4 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java |  52 ++++++-
 .../cassandra/transport/DynamicLimitTest.java      | 111 ++++++++++++++
 .../cassandra/transport/ProtocolErrorTest.java     |   8 +-
 .../transport/ProtocolNegotiationTest.java         | 166 +++++++++++++++++++++
 .../cassandra/transport/ProtocolTestHelper.java    |  95 ++++++++++++
 .../cassandra/transport/ProtocolVersionTest.java   |   6 +-
 24 files changed, 730 insertions(+), 27 deletions(-)

diff --cc CHANGES.txt
index 3544594,925a90a..b4b1245
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,12 -1,8 +1,13 @@@
 -3.0.19
 +3.11.5
 + * Make sure user defined compaction transactions are always closed 
(CASSANDRA-15123)
 + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config 
(CASSANDRA-14305)
 + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903)
 + * Add flag to disable SASI indexes, and warnings on creation 
(CASSANDRA-14866)
- Merged from 3.0:
++Merged from 3.0:
+  * Add ability to cap max negotiable protocol version (CASSANDRA-15193)
   * Gossip tokens on startup if available (CASSANDRA-15335)
   * Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340)
 - * Fix merge which reverted CASSANDRA-14993 (CASSANDRA-15289)
 + * Fix bad merge that reverted CASSANDRA-14993 (CASSANDRA-15289)
   * Fix LegacyLayout RangeTombstoneList IndexOutOfBoundsException when 
upgrading and RangeTombstone bounds are asymmetric (CASSANDRA-15172)
   * Fix NPE when using allocate_tokens_for_keyspace on new DC/rack 
(CASSANDRA-14952)
   * Filter sstables earlier when running cleanup (CASSANDRA-15100)
diff --cc NEWS.txt
index 2feac81,c03284b..2aa6deb
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -49,16 -49,13 +49,21 @@@ Upgradin
  ---------
        - repair_session_max_tree_depth setting has been added to 
cassandra.yaml to allow operators to reduce
          merkle tree size if repair is creating too much heap pressure. See 
CASSANDRA-14096 for details.
+     - native_transport_max_negotiable_protocol_version has been added to 
cassandra.yaml to allow operators to
 -      enforce an upper limit on the version of the native protocol that 
servers will negotiate with clients.
++      enforce an upper limit on the version of the native protocol that 
servers will negotiate with clients. 
+       This can be used during upgrades from 2.1 to 3.0 to prevent errors due 
to incompatible paging state formats
+       between the two versions. See CASSANDRA-15193 for details.
+ 
  
 -3.0.18
 +Experimental features
 +---------------------
 +    - An 'enable_sasi_indexes' flag, true by default, has been added to 
cassandra.yaml to allow operators to prevent
 +      the creation of new SASI indexes, which are considered experimental and 
are not recommended for production use.
 +      (See 
https://www.mail-archive.com/dev@cassandra.apache.org/msg13582.html)
 +    - The flags 'enable_sasi_indexes' and 'enable_materialized_views' have 
been grouped under an experimental features
 +      section in cassandra.yaml.
 +
 +3.11.4
  ======
  
  Upgrading
diff --cc src/java/org/apache/cassandra/config/Config.java
index 1d79e2a,bc3e3bf..7f28546
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -163,10 -156,10 +163,10 @@@ public class Confi
      public boolean native_transport_flush_in_batches_legacy = true;
      public volatile long 
native_transport_max_concurrent_requests_in_bytes_per_ip = -1L;
      public volatile long native_transport_max_concurrent_requests_in_bytes = 
-1L;
- 
+     public Integer native_transport_max_negotiable_protocol_version = 
Integer.MIN_VALUE;
  
      @Deprecated
 -    public Integer thrift_max_message_length_in_mb = 16;
 +    public int thrift_max_message_length_in_mb = 16;
      /**
       * Max size of values in SSTables, in MegaBytes.
       * Default is the same as the native protocol frame limit: 256Mb.
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 75296b6,a161a2a..1e2e1a1
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -46,26 -40,24 +46,28 @@@ import org.apache.cassandra.config.Conf
  import org.apache.cassandra.dht.IPartitioner;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.io.FSWriteError;
 -import org.apache.cassandra.io.sstable.format.SSTableFormat;
 +import org.apache.cassandra.io.util.DiskOptimizationStrategy;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.locator.*;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy;
 +import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy;
 +import org.apache.cassandra.locator.DynamicEndpointSnitch;
 +import org.apache.cassandra.locator.EndpointSnitchInfo;
 +import org.apache.cassandra.locator.IEndpointSnitch;
 +import org.apache.cassandra.locator.SeedProvider;
 +import org.apache.cassandra.net.BackPressureStrategy;
 +import org.apache.cassandra.net.RateBasedBackPressure;
  import org.apache.cassandra.scheduler.IRequestScheduler;
  import org.apache.cassandra.scheduler.NoScheduler;
 -import org.apache.cassandra.service.CacheService;
 -import org.apache.cassandra.thrift.ThriftServer;
 -import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.security.EncryptionContext;
 +import org.apache.cassandra.service.CacheService.CacheType;
 +import org.apache.cassandra.thrift.ThriftServer.ThriftServerType;
++import org.apache.cassandra.transport.ProtocolVersion;
++import org.apache.cassandra.transport.ProtocolVersionLimit;
  import org.apache.cassandra.utils.FBUtilities;
 -import org.apache.cassandra.utils.memory.*;
 +
 +import org.apache.commons.lang3.StringUtils;
  
  import static org.apache.cassandra.io.util.FileUtils.ONE_GB;
 -import static org.apache.cassandra.io.util.FileUtils.ONE_MB;
  
  public class DatabaseDescriptor
  {
@@@ -700,212 -719,6 +702,227 @@@
              conf.server_encryption_options = conf.encryption_options;
          }
  
 +        if (conf.user_defined_function_fail_timeout < 0)
 +            throw new 
ConfigurationException("user_defined_function_fail_timeout must not be 
negative", false);
 +        if (conf.user_defined_function_warn_timeout < 0)
 +            throw new 
ConfigurationException("user_defined_function_warn_timeout must not be 
negative", false);
 +
 +        if (conf.user_defined_function_fail_timeout < 
conf.user_defined_function_warn_timeout)
 +            throw new 
ConfigurationException("user_defined_function_warn_timeout must less than 
user_defined_function_fail_timeout", false);
 +
 +        if (conf.commitlog_segment_size_in_mb <= 0)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb 
must be positive, but was "
 +                    + conf.commitlog_segment_size_in_mb, false);
 +        else if (conf.commitlog_segment_size_in_mb >= 2048)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb 
must be smaller than 2048, but was "
 +                    + conf.commitlog_segment_size_in_mb, false);
 +
 +        if (conf.max_mutation_size_in_kb == null)
 +            conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb 
* 1024 / 2;
 +        else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * 
conf.max_mutation_size_in_kb)
 +            throw new ConfigurationException("commitlog_segment_size_in_mb 
must be at least twice the size of max_mutation_size_in_kb / 1024", false);
 +
 +        // native transport encryption options
 +        if (conf.native_transport_port_ssl != null
 +            && conf.native_transport_port_ssl != conf.native_transport_port
 +            && !conf.client_encryption_options.enabled)
 +        {
 +            throw new ConfigurationException("Encryption must be enabled in 
client_encryption_options for native_transport_port_ssl", false);
 +        }
 +
++        // If max protocol version has been set, just validate it's within an 
acceptable range
++        if (conf.native_transport_max_negotiable_protocol_version != 
Integer.MIN_VALUE)
++        {
++            try
++            {
++                
ProtocolVersion.decode(conf.native_transport_max_negotiable_protocol_version, 
ProtocolVersionLimit.SERVER_DEFAULT);
++                logger.info("Native transport max negotiable version 
statically limited to {}", 
conf.native_transport_max_negotiable_protocol_version);
++            }
++            catch (Exception e)
++            {
++                throw new ConfigurationException("Invalid setting for 
native_transport_max_negotiable_protocol_version; " +
++                                                 
ProtocolVersion.invalidVersionMessage(conf.native_transport_max_negotiable_protocol_version));
++            }
++        }
++
 +        if (conf.max_value_size_in_mb <= 0)
 +            throw new ConfigurationException("max_value_size_in_mb must be 
positive", false);
 +        else if (conf.max_value_size_in_mb >= 2048)
 +            throw new ConfigurationException("max_value_size_in_mb must be 
smaller than 2048, but was "
 +                    + conf.max_value_size_in_mb, false);
 +
 +        switch (conf.disk_optimization_strategy)
 +        {
 +            case ssd:
 +                diskOptimizationStrategy = new 
SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance);
 +                break;
 +            case spinning:
 +                diskOptimizationStrategy = new 
SpinningDiskOptimizationStrategy();
 +                break;
 +        }
 +
 +        try
 +        {
 +            ParameterizedClass strategy = conf.back_pressure_strategy != null 
? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams();
 +            Class<?> clazz = Class.forName(strategy.class_name);
 +            if (!BackPressureStrategy.class.isAssignableFrom(clazz))
 +                throw new ConfigurationException(strategy + " is not an 
instance of " + BackPressureStrategy.class.getCanonicalName(), false);
 +
 +            Constructor<?> ctor = clazz.getConstructor(Map.class);
 +            BackPressureStrategy instance = (BackPressureStrategy) 
ctor.newInstance(strategy.parameters);
 +            logger.info("Back-pressure is {} with strategy {}.", 
backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy);
 +            backPressureStrategy = instance;
 +        }
 +        catch (ConfigurationException ex)
 +        {
 +            throw ex;
 +        }
 +        catch (Exception ex)
 +        {
 +            throw new ConfigurationException("Error configuring back-pressure 
strategy: " + conf.back_pressure_strategy, ex);
 +        }
 +
 +        if (conf.otc_coalescing_enough_coalesced_messages > 128)
 +            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
smaller than 128", false);
 +
 +        if (conf.otc_coalescing_enough_coalesced_messages <= 0)
 +            throw new 
ConfigurationException("otc_coalescing_enough_coalesced_messages must be 
positive", false);
 +    }
 +
 +    private static String storagedirFor(String type)
 +    {
 +        return storagedir(type + "_directory") + File.separator + type;
 +    }
 +
 +    private static String storagedir(String errMsgType)
 +    {
 +        String storagedir = System.getProperty(Config.PROPERTY_PREFIX + 
"storagedir", null);
 +        if (storagedir == null)
 +            throw new ConfigurationException(errMsgType + " is missing and 
-Dcassandra.storagedir is not set", false);
 +        return storagedir;
 +    }
 +
 +    public static void applyAddressConfig() throws ConfigurationException
 +    {
 +        applyAddressConfig(conf);
 +    }
 +
 +    public static void applyAddressConfig(Config config) throws 
ConfigurationException
 +    {
 +        listenAddress = null;
 +        rpcAddress = null;
 +        broadcastAddress = null;
 +        broadcastRpcAddress = null;
 +
 +        /* Local IP, hostname or interface to bind services to */
 +        if (config.listen_address != null && config.listen_interface != null)
 +        {
 +            throw new ConfigurationException("Set listen_address OR 
listen_interface, not both", false);
 +        }
 +        else if (config.listen_address != null)
 +        {
 +            try
 +            {
 +                listenAddress = InetAddress.getByName(config.listen_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown listen_address '" + 
config.listen_address + "'", false);
 +            }
 +
 +            if (listenAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("listen_address cannot be a 
wildcard address (" + config.listen_address + ")!", false);
 +        }
 +        else if (config.listen_interface != null)
 +        {
 +            listenAddress = 
getNetworkInterfaceAddress(config.listen_interface, "listen_interface", 
config.listen_interface_prefer_ipv6);
 +        }
 +
 +        /* Gossip Address to broadcast */
 +        if (config.broadcast_address != null)
 +        {
 +            try
 +            {
 +                broadcastAddress = 
InetAddress.getByName(config.broadcast_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown broadcast_address 
'" + config.broadcast_address + "'", false);
 +            }
 +
 +            if (broadcastAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_address cannot be 
a wildcard address (" + config.broadcast_address + ")!", false);
 +        }
 +
 +        /* Local IP, hostname or interface to bind RPC server to */
 +        if (config.rpc_address != null && config.rpc_interface != null)
 +        {
 +            throw new ConfigurationException("Set rpc_address OR 
rpc_interface, not both", false);
 +        }
 +        else if (config.rpc_address != null)
 +        {
 +            try
 +            {
 +                rpcAddress = InetAddress.getByName(config.rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown host in rpc_address 
" + config.rpc_address, false);
 +            }
 +        }
 +        else if (config.rpc_interface != null)
 +        {
 +            rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, 
"rpc_interface", config.rpc_interface_prefer_ipv6);
 +        }
 +        else
 +        {
 +            rpcAddress = FBUtilities.getLocalAddress();
 +        }
 +
 +        /* RPC address to broadcast */
 +        if (config.broadcast_rpc_address != null)
 +        {
 +            try
 +            {
 +                broadcastRpcAddress = 
InetAddress.getByName(config.broadcast_rpc_address);
 +            }
 +            catch (UnknownHostException e)
 +            {
 +                throw new ConfigurationException("Unknown 
broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false);
 +            }
 +
 +            if (broadcastRpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("broadcast_rpc_address 
cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false);
 +        }
 +        else
 +        {
 +            if (rpcAddress.isAnyLocalAddress())
 +                throw new ConfigurationException("If rpc_address is set to a 
wildcard address (" + config.rpc_address + "), then " +
 +                                                 "you must set 
broadcast_rpc_address to a value other than " + config.rpc_address, false);
 +        }
 +    }
 +
 +    public static void applyThriftHSHA()
 +    {
 +        // fail early instead of OOMing (see CASSANDRA-8116)
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads == Integer.MAX_VALUE)
 +            throw new ConfigurationException("The hsha rpc_server_type is not 
compatible with an rpc_max_threads " +
 +                                             "setting of 'unlimited'.  Please 
see the comments in cassandra.yaml " +
 +                                             "for rpc_server_type and 
rpc_max_threads.",
 +                                             false);
 +        if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && 
conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024))
 +            logger.warn("rpc_max_threads setting of {} may be too high for 
the hsha server and cause unnecessary thread contention, reducing performance", 
conf.rpc_max_threads);
 +    }
 +
 +    public static void applyEncryptionContext()
 +    {
 +        // always attempt to load the cipher factory, as we could be in the 
situation where the user has disabled encryption,
 +        // but has existing commitlogs and sstables on disk that are still 
encrypted (and still need to be read)
 +        encryptionContext = new 
EncryptionContext(conf.transparent_data_encryption_options);
 +    }
 +
 +    public static void applySeedProvider()
 +    {
          // load the seeds for node contact points
          if (conf.seed_provider == null)
          {
diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java
index 812659c,7c222dd..96e0416
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@@ -757,6 -716,18 +757,18 @@@ public final class SystemKeyspac
          return executorService.submit((Runnable) () -> 
executeInternal(String.format(req, PEERS, columnName), ep, value));
      }
  
+     public static void updatePeerReleaseVersion(final InetAddress ep, final 
Object value, Runnable postUpdateTask, ExecutorService executorService)
+     {
+         if (ep.equals(FBUtilities.getBroadcastAddress()))
+             return;
+ 
 -        String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)";
++        String req = "INSERT INTO system.%s (peer, release_version) VALUES 
(?, ?)";
+         executorService.execute(() -> {
 -            executeInternal(String.format(req, PEERS, "release_version"), ep, 
value);
++            executeInternal(String.format(req, PEERS), ep, value);
+             postUpdateTask.run();
+         });
+     }
+ 
      public static synchronized void updateHintsDropped(InetAddress ep, UUID 
timePeriod, int value)
      {
          // with 30 day TTL
diff --cc src/java/org/apache/cassandra/service/NativeTransportService.java
index 6343f0d,587f781..c58bb5e
--- a/src/java/org/apache/cassandra/service/NativeTransportService.java
+++ b/src/java/org/apache/cassandra/service/NativeTransportService.java
@@@ -31,10 -31,9 +31,11 @@@ import io.netty.channel.EventLoopGroup
  import io.netty.channel.epoll.Epoll;
  import io.netty.channel.epoll.EpollEventLoopGroup;
  import io.netty.channel.nio.NioEventLoopGroup;
 +import io.netty.util.concurrent.EventExecutor;
  import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.metrics.AuthMetrics;
  import org.apache.cassandra.metrics.ClientMetrics;
+ import org.apache.cassandra.transport.ConfiguredLimit;
  import org.apache.cassandra.transport.Message;
  import org.apache.cassandra.transport.Server;
  
@@@ -141,6 -142,20 +146,20 @@@ public class NativeTransportServic
          Message.Dispatcher.shutdown();
      }
  
+     public int getMaxProtocolVersion()
+     {
 -        return protocolVersionLimit.getMaxVersion();
++        return protocolVersionLimit.getMaxVersion().asInt();
+     }
+ 
+     public void refreshMaxNegotiableProtocolVersion()
+     {
+         // lowering the max negotiable protocol version is only safe if we 
haven't already
+         // allowed clients to connect with a higher version. This still 
allows the max
+         // version to be raised, as that is safe.
+         if (initialized)
+             protocolVersionLimit.updateMaxSupportedVersion();
+     }
+ 
      /**
       * @return intend to use epoll based event looping
       */
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 346f9a4,8c29601..309be3d
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -441,9 -442,26 +441,26 @@@ public class StorageService extends Not
          return daemon.isNativeTransportRunning();
      }
  
+     public int getMaxNativeProtocolVersion()
+     {
+         if (daemon == null)
+         {
+             throw new IllegalStateException("No configured daemon");
+         }
+         return daemon.getMaxNativeProtocolVersion();
+     }
+ 
+     private void refreshMaxNativeProtocolVersion()
+     {
+         if (daemon != null)
+         {
+             daemon.refreshMaxNativeProtocolVersion();
+         }
+     }
+ 
      public void stopTransports()
      {
 -        if (isInitialized())
 +        if (isGossipActive())
          {
              logger.error("Stopping gossiper");
              stopGossiping();
diff --cc src/java/org/apache/cassandra/transport/Client.java
index e428b06,92466d2..368b1d7
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@@ -251,7 -250,7 +251,7 @@@ public class Client extends SimpleClien
          // Parse options.
          String host = args[0];
          int port = Integer.parseInt(args[1]);
-         ProtocolVersion version = args.length == 3 ? 
ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT;
 -        int version = args.length == 3 ? Integer.parseInt(args[2]) : 
Server.CURRENT_VERSION;
++        ProtocolVersion version = args.length == 3 ? 
ProtocolVersion.decode(Integer.parseInt(args[2]), 
ProtocolVersionLimit.SERVER_DEFAULT) : ProtocolVersion.CURRENT;
  
          ClientEncryptionOptions encryptionOptions = new 
ClientEncryptionOptions();
          System.out.println("CQL binary protocol console " + host + "@" + port 
+ " using native protocol version " + version);
diff --cc src/java/org/apache/cassandra/transport/ConfiguredLimit.java
index 0000000,98518b8..16d3867
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/transport/ConfiguredLimit.java
+++ b/src/java/org/apache/cassandra/transport/ConfiguredLimit.java
@@@ -1,0 -1,117 +1,112 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.SystemKeyspace;
+ import org.apache.cassandra.utils.CassandraVersion;
+ 
+ public abstract class ConfiguredLimit implements ProtocolVersionLimit
+ {
+     private static final Logger logger = 
LoggerFactory.getLogger(ConfiguredLimit.class);
+     static final String DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE = 
"cassandra.disable_max_protocol_auto_override";
+     static final CassandraVersion MIN_VERSION_FOR_V4 = new 
CassandraVersion("3.0.0");
+ 
 -    public abstract int getMaxVersion();
++    public abstract ProtocolVersion getMaxVersion();
+     public abstract void updateMaxSupportedVersion();
+ 
+     public static ConfiguredLimit newLimit()
+     {
+         if (Boolean.getBoolean(DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE))
 -            return new StaticLimit(Server.CURRENT_VERSION);
++            return new StaticLimit(ProtocolVersion.MAX_SUPPORTED_VERSION);
+ 
+         int fromConfig = 
DatabaseDescriptor.getNativeProtocolMaxVersionOverride();
+         return fromConfig != Integer.MIN_VALUE
 -               ? new StaticLimit(fromConfig)
 -               : new DynamicLimit(Server.CURRENT_VERSION);
++               ? new StaticLimit(ProtocolVersion.decode(fromConfig, 
ProtocolVersionLimit.SERVER_DEFAULT))
++               : new DynamicLimit(ProtocolVersion.MAX_SUPPORTED_VERSION);
+     }
+ 
+     private static class StaticLimit extends ConfiguredLimit
+     {
 -        private final int maxVersion;
 -        private StaticLimit(int maxVersion)
++        private final ProtocolVersion maxVersion;
++        private StaticLimit(ProtocolVersion maxVersion)
+         {
 -            if (maxVersion < Server.MIN_SUPPORTED_VERSION || maxVersion > 
Server.CURRENT_VERSION)
 -                throw new IllegalArgumentException(String.format("Invalid max 
protocol version supplied (%s); " +
 -                                                                 "Values 
between %s and %s are supported",
 -                                                                 maxVersion,
 -                                                                 
Server.MIN_SUPPORTED_VERSION,
 -                                                                 
Server.CURRENT_VERSION));
+             this.maxVersion = maxVersion;
+             logger.info("Native transport max negotiable version statically 
limited to {}", maxVersion);
+         }
+ 
 -        public int getMaxVersion()
++        public ProtocolVersion getMaxVersion()
+         {
+             return maxVersion;
+         }
+ 
+         public void updateMaxSupportedVersion()
+         {
+             // statically configured, so this is a no-op
+         }
+     }
+ 
+     private static class DynamicLimit extends ConfiguredLimit
+     {
 -        private volatile int maxVersion;
 -        private DynamicLimit(int initialLimit)
++        private volatile ProtocolVersion maxVersion;
++        private DynamicLimit(ProtocolVersion initialLimit)
+         {
+             maxVersion = initialLimit;
+             maybeUpdateVersion(true);
+         }
+ 
 -        public int getMaxVersion()
++        public ProtocolVersion getMaxVersion()
+         {
+             return maxVersion;
+         }
+ 
+         public void updateMaxSupportedVersion()
+         {
+             maybeUpdateVersion(false);
+         }
+ 
+         private void maybeUpdateVersion(boolean allowLowering)
+         {
+             boolean enforceV3Cap = SystemKeyspace.loadPeerVersions()
+                                                  .values()
+                                                  .stream()
+                                                  .anyMatch(v -> 
v.compareTo(MIN_VERSION_FOR_V4) < 0);
+ 
+             if (!enforceV3Cap)
+             {
 -                maxVersion = Server.CURRENT_VERSION;
++                maxVersion = ProtocolVersion.MAX_SUPPORTED_VERSION;
+                 return;
+             }
+ 
 -            if (maxVersion > Server.VERSION_3 && !allowLowering)
++            if (ProtocolVersion.V3.isSmallerThan(maxVersion) && 
!allowLowering)
+             {
+                 logger.info("Detected peers which do not fully support 
protocol V4, but V4 was previously negotiable. " +
+                             "Not enforcing cap as this can cause issues for 
older client versions. After the next " +
+                             "restart the server will apply the cap");
+                 return;
+             }
++
+             logger.info("Detected peers which do not fully support protocol 
V4. Capping max negotiable version to V3");
 -            maxVersion = Server.VERSION_3;
++            maxVersion = ProtocolVersion.V3;
+         }
+     }
+ }
diff --cc src/java/org/apache/cassandra/transport/Frame.java
index 388cbc2,a07551f..e603e6c
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@@ -171,8 -176,11 +173,8 @@@ public class Fram
              // 1 and 2 use a shorter header, so we may never have a complete 
header's worth of bytes.
              int firstByte = buffer.getByte(idx++);
              Message.Direction direction = 
Message.Direction.extractFromVersion(firstByte);
 -            int version = firstByte & PROTOCOL_VERSION_MASK;
 -            if (version < Server.MIN_SUPPORTED_VERSION || version > 
versionCap.getMaxVersion())
 -                throw new ProtocolException(String.format("Invalid or 
unsupported protocol version (%d); the lowest supported version is %d and the 
greatest is %d",
 -                                                          version, 
Server.MIN_SUPPORTED_VERSION, versionCap.getMaxVersion()),
 -                                            version < 
Server.MIN_SUPPORTED_VERSION ? version : null);
 +            int versionNum = firstByte & PROTOCOL_VERSION_MASK;
-             ProtocolVersion version = ProtocolVersion.decode(versionNum);
++            ProtocolVersion version = ProtocolVersion.decode(versionNum, 
versionCap);
  
              // Wait until we have the complete header
              if (readableBytes < Header.LENGTH)
diff --cc src/java/org/apache/cassandra/transport/Message.java
index 271b690,5202578..f899022
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@@ -326,7 -333,8 +333,8 @@@ public abstract class Messag
          {
              Connection connection = 
ctx.channel().attr(Connection.attributeKey).get();
              // The only case the connection can be null is when we send the 
initial STARTUP message (client side thus)
-             ProtocolVersion version = connection == null ? 
ProtocolVersion.CURRENT : connection.getVersion();
 -            int version = connection == null ? versionCap.getMaxVersion() : 
connection.getVersion();
++            ProtocolVersion version = connection == null ? 
versionCap.getMaxVersion() : connection.getVersion();
+ 
              EnumSet<Frame.Header.Flag> flags = 
EnumSet.noneOf(Frame.Header.Flag.class);
  
              Codec<Message> codec = (Codec<Message>)message.type.codec;
diff --cc src/java/org/apache/cassandra/transport/ProtocolVersion.java
index cd73c86,0000000..05fafc2
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java
@@@ -1,153 -1,0 +1,153 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.transport;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumSet;
 +import java.util.List;
 +import java.util.Optional;
 +
 +import org.apache.commons.lang3.ArrayUtils;
 +
 +/**
 + * The native (CQL binary) protocol version.
 + *
 + * Some versions may be in beta, which means that the client must
 + * specify the beta flag in the frame for the version to be considered valid.
 + * Beta versions must have the word "beta" in their description, this is 
mandated
 + * by the specs.
 + *
 + */
 +public enum ProtocolVersion implements Comparable<ProtocolVersion>
 +{
 +    // The order is important as it defines the chronological history of 
versions, which is used
 +    // to determine if a feature is supported or some serdes formats
 +    V1(1, "v1", false), // no longer supported
 +    V2(2, "v2", false), // no longer supported
 +    V3(3, "v3", false),
 +    V4(4, "v4", false),
 +    V5(5, "v5-beta", true);
 +
 +    /** The version number */
 +    private final int num;
 +
 +    /** A description of the version, beta versions should have the word 
"-beta" */
 +    private final String descr;
 +
 +    /** Set this to true for beta versions */
 +    private final boolean beta;
 +
 +    ProtocolVersion(int num, String descr, boolean beta)
 +    {
 +        this.num = num;
 +        this.descr = descr;
 +        this.beta = beta;
 +    }
 +
 +    /** The supported versions stored as an array, these should be private 
and are required for fast decoding*/
 +    private final static ProtocolVersion[] SUPPORTED_VERSIONS = new 
ProtocolVersion[] { V3, V4, V5 };
 +    final static ProtocolVersion MIN_SUPPORTED_VERSION = 
SUPPORTED_VERSIONS[0];
 +    final static ProtocolVersion MAX_SUPPORTED_VERSION = 
SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1];
 +
 +    /** All supported versions, published as an enumset */
 +    public final static EnumSet<ProtocolVersion> SUPPORTED = 
EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) 
ArrayUtils.addAll(SUPPORTED_VERSIONS)));
 +
 +    /** Old unsupported versions, this is OK as long as we never add newer 
unsupported versions */
 +    public final static EnumSet<ProtocolVersion> UNSUPPORTED = 
EnumSet.complementOf(SUPPORTED);
 +
 +    /** The preferred versions */
 +    public final static ProtocolVersion CURRENT = V4;
 +    public final static Optional<ProtocolVersion> BETA = Optional.of(V5);
 +
 +    public static List<String> supportedVersions()
 +    {
 +        List<String> ret = new ArrayList<>(SUPPORTED.size());
 +        for (ProtocolVersion version : SUPPORTED)
 +            ret.add(version.toString());
 +        return ret;
 +    }
 +
-     public static ProtocolVersion decode(int versionNum)
++    public static ProtocolVersion decode(int versionNum, ProtocolVersionLimit 
ceiling)
 +    {
-         ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && 
versionNum <= MAX_SUPPORTED_VERSION.num
++        ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && 
versionNum <= ceiling.getMaxVersion().num
 +                              ? SUPPORTED_VERSIONS[versionNum - 
MIN_SUPPORTED_VERSION.num]
 +                              : null;
 +
 +        if (ret == null)
 +        {
 +            // if this is not a supported version check the old versions
 +            for (ProtocolVersion version : UNSUPPORTED)
 +            {
 +                // if it is an old version that is no longer supported this 
ensures that we reply
 +                // with that same version
 +                if (version.num == versionNum)
 +                    throw new 
ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version);
 +            }
 +
 +            // If the version is invalid reply with the highest version that 
we support
-             throw new ProtocolException(invalidVersionMessage(versionNum), 
MAX_SUPPORTED_VERSION);
++            throw new ProtocolException(invalidVersionMessage(versionNum), 
ceiling.getMaxVersion());
 +        }
 +
 +        return ret;
 +    }
 +
 +    public boolean isBeta()
 +    {
 +        return beta;
 +    }
 +
 +    public static String invalidVersionMessage(int version)
 +    {
 +        return String.format("Invalid or unsupported protocol version (%d); 
supported versions are (%s)",
 +                             version, String.join(", ", 
ProtocolVersion.supportedVersions()));
 +    }
 +
 +    public int asInt()
 +    {
 +        return num;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        // This format is mandated by the protocol specs for the SUPPORTED 
message, see OptionsMessage execute().
 +        return String.format("%d/%s", num, descr);
 +    }
 +
 +    public final boolean isGreaterThan(ProtocolVersion other)
 +    {
 +        return num > other.num;
 +    }
 +
 +    public final boolean isGreaterOrEqualTo(ProtocolVersion other)
 +    {
 +        return num >= other.num;
 +    }
 +
 +    public final boolean isSmallerThan(ProtocolVersion other)
 +    {
 +        return num < other.num;
 +    }
 +
 +    public final boolean isSmallerOrEqualTo(ProtocolVersion other)
 +    {
 +        return num <= other.num;
 +    }
 +}
diff --cc src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java
index 0000000,c476efb..9738a19
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java
@@@ -1,0 -1,27 +1,27 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ @FunctionalInterface
+ public interface ProtocolVersionLimit
+ {
 -    public int getMaxVersion();
++    public ProtocolVersion getMaxVersion();
+ 
 -    public static final ProtocolVersionLimit SERVER_DEFAULT = () -> 
Server.CURRENT_VERSION;
++    public static final ProtocolVersionLimit SERVER_DEFAULT = () -> 
ProtocolVersion.MAX_SUPPORTED_VERSION;
+ }
diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java
index d438630,95366c2..e545e9f
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@@ -62,8 -62,9 +62,10 @@@ import org.apache.cassandra.serializers
  import org.apache.cassandra.service.ClientState;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.StorageService;
- import org.apache.cassandra.transport.*;
 +import org.apache.cassandra.transport.ProtocolVersion;
+ import org.apache.cassandra.transport.ConfiguredLimit;
+ import org.apache.cassandra.transport.Event;
+ import org.apache.cassandra.transport.Server;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
@@@ -89,8 -89,9 +91,9 @@@ public abstract class CQLTeste
      private static org.apache.cassandra.transport.Server server;
      protected static final int nativePort;
      protected static final InetAddress nativeAddr;
 +    private static final Map<ProtocolVersion, Cluster> clusters = new 
HashMap<>();
 +    private static final Map<ProtocolVersion, Session> sessions = new 
HashMap<>();
+     protected static ConfiguredLimit protocolVersionLimit;
 -    private static final Map<Integer, Cluster> clusters = new HashMap<>();
 -    private static final Map<Integer, Session> sessions = new HashMap<>();
  
      private static boolean isServerPrepared = false;
  
@@@ -366,11 -377,14 +401,14 @@@
              if (clusters.containsKey(version))
                  continue;
  
 -            if (version > protocolVersionLimit.getMaxVersion())
++            if (version.isGreaterThan(protocolVersionLimit.getMaxVersion()))
+                 continue;
+ 
              Cluster cluster = Cluster.builder()
                                       .addContactPoints(nativeAddr)
-                                      .withClusterName("Test Cluster")
 -                                     .withClusterName("Test Cluster-v" + 
version)
++                                     .withClusterName("Test Cluster-" + 
version.name())
                                       .withPort(nativePort)
 -                                     
.withProtocolVersion(ProtocolVersion.fromInt(version))
 +                                     
.withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt()))
                                       .build();
              clusters.put(version, cluster);
              sessions.put(version, cluster.connect());
diff --cc test/unit/org/apache/cassandra/transport/DynamicLimitTest.java
index 0000000,83a0dd9..df06fba
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java
+++ b/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java
@@@ -1,0 -1,111 +1,111 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.transport;
+ 
+ import java.net.InetAddress;
+ 
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.cql3.CQLTester;
+ 
+ import static org.apache.cassandra.transport.ProtocolTestHelper.cleanupPeers;
+ import static 
org.apache.cassandra.transport.ProtocolTestHelper.setStaticLimitInConfig;
+ import static org.apache.cassandra.transport.ProtocolTestHelper.setupPeer;
+ import static 
org.apache.cassandra.transport.ProtocolTestHelper.updatePeerInfo;
+ import static org.junit.Assert.assertEquals;
+ 
+ public class DynamicLimitTest
+ {
+     @BeforeClass
+     public static void setup()
+     {
+         CQLTester.prepareServer();
+     }
+ 
+     @Test
+     public void disableDynamicLimitWithSystemProperty() throws Throwable
+     {
+         // Dynamic limiting of the max negotiable protocol version can be
+         // disabled with a system property
+ 
+         // ensure that no static limit is configured
+         setStaticLimitInConfig(null);
+ 
+         // set the property which disables dynamic limiting
+         
System.setProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE, "true");
+         // insert a legacy peer into system.peers and then create a new limit
+         InetAddress peer = null;
+         try
+         {
+             peer = setupPeer("127.1.0.1", "2.2.0");
+             ConfiguredLimit limit = ConfiguredLimit.newLimit();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+             // clearing the property after the limit has been returned has no 
effect
+             
System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE);
+             limit.updateMaxSupportedVersion();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+             // a new limit should now be dynamic
+             limit = ConfiguredLimit.newLimit();
 -            assertEquals(Server.VERSION_3, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.V3, limit.getMaxVersion());
+         }
+         finally
+         {
+             
System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE);
+             cleanupPeers(peer);
+         }
+     }
+ 
+     @Test
+     public void disallowLoweringMaxVersion() throws Throwable
+     {
+         // Lowering the max version once connections have been established is 
a problem
+         // for some clients. So for a dynamic limit, if notifications of peer 
versions
+         // trigger a change to the max version, it's only allowed to increase 
the max
+         // negotiable version
+ 
+         InetAddress peer = null;
+         try
+         {
+             // ensure that no static limit is configured
+             setStaticLimitInConfig(null);
+             ConfiguredLimit limit = ConfiguredLimit.newLimit();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+             peer = setupPeer("127.1.0.1", "3.0.0");
+             limit.updateMaxSupportedVersion();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+             // learn that peer doesn't actually fully support V4, behaviour 
should remain the same
+             updatePeerInfo(peer, "2.2.0");
+             limit.updateMaxSupportedVersion();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+             // finally learn that the peer has been upgraded, just for 
completeness
+             updatePeerInfo(peer, "3.3.0");
+             limit.updateMaxSupportedVersion();
 -            assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion());
++            assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
limit.getMaxVersion());
+ 
+         } finally {
+             cleanupPeers(peer);
+         }
+     }
+ }
diff --cc test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
index e287c08,0000000..7b56a49
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java
@@@ -1,97 -1,0 +1,97 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.transport;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class ProtocolVersionTest
 +{
 +    @Test
 +    public void testDecode()
 +    {
 +        for (ProtocolVersion version : ProtocolVersion.SUPPORTED)
-             Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt()));
++            Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT));
 +
 +        for (ProtocolVersion version : ProtocolVersion.UNSUPPORTED)
 +        { // unsupported old versions
 +            try
 +            {
-                 Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt()));
++                Assert.assertEquals(version, 
ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT));
 +                Assert.fail("Expected invalid protocol exception");
 +            }
 +            catch (ProtocolException ex)
 +            {
 +                Assert.assertNotNull(ex.getForcedProtocolVersion());
 +                Assert.assertEquals(version, ex.getForcedProtocolVersion());
 +            }
 +        }
 +
 +        try
 +        { // unsupported newer version
-             Assert.assertEquals(null, ProtocolVersion.decode(63));
++            Assert.assertEquals(null, ProtocolVersion.decode(63, 
ProtocolVersionLimit.SERVER_DEFAULT));
 +            Assert.fail("Expected invalid protocol exception");
 +        }
 +        catch (ProtocolException ex)
 +        {
 +            Assert.assertNotNull(ex.getForcedProtocolVersion());
 +            Assert.assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, 
ex.getForcedProtocolVersion());
 +        }
 +    }
 +
 +    @Test
 +    public void testSupportedVersions()
 +    {
 +        Assert.assertTrue(ProtocolVersion.supportedVersions().size() >= 2); 
// at least one OS and one DSE
 +        Assert.assertNotNull(ProtocolVersion.CURRENT);
 +
 +        Assert.assertFalse(ProtocolVersion.V4.isBeta());
 +        Assert.assertTrue(ProtocolVersion.V5.isBeta());
 +    }
 +
 +    @Test
 +    public void testComparisons()
 +    {
 +        
Assert.assertTrue(ProtocolVersion.V1.isSmallerOrEqualTo(ProtocolVersion.V1));
 +        
Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1));
 +        
Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4));
 +
 +        
Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2));
 +        
Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3));
 +        
Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4));
 +
 +        
Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3));
 +        
Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2));
 +        
Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1));
 +
 +        
Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3));
 +        
Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2));
 +        
Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1));
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to