This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 304096566868d0f51d461be7c14c8bcc1ebfb277 Merge: a63df4d 0388d89 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Tue Oct 8 18:38:42 2019 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 3 +- NEWS.txt | 5 + bin/cqlsh.py | 2 +- src/java/org/apache/cassandra/config/Config.java | 2 +- .../cassandra/config/DatabaseDescriptor.java | 22 +++ .../org/apache/cassandra/db/SystemKeyspace.java | 43 ++++++ .../apache/cassandra/service/CassandraDaemon.java | 10 ++ .../cassandra/service/NativeTransportService.java | 19 +++ .../apache/cassandra/service/StorageService.java | 21 ++- .../cassandra/service/StorageServiceMBean.java | 3 + .../org/apache/cassandra/transport/Client.java | 2 +- .../cassandra/transport/ConfiguredLimit.java | 112 ++++++++++++++ src/java/org/apache/cassandra/transport/Frame.java | 6 +- .../org/apache/cassandra/transport/Message.java | 10 +- .../cassandra/transport/ProtocolVersion.java | 6 +- .../cassandra/transport/ProtocolVersionLimit.java | 27 ++++ .../org/apache/cassandra/transport/Server.java | 22 ++- .../apache/cassandra/transport/SimpleClient.java | 4 +- test/unit/org/apache/cassandra/cql3/CQLTester.java | 52 ++++++- .../cassandra/transport/DynamicLimitTest.java | 111 ++++++++++++++ .../cassandra/transport/ProtocolErrorTest.java | 8 +- .../transport/ProtocolNegotiationTest.java | 166 +++++++++++++++++++++ .../cassandra/transport/ProtocolTestHelper.java | 95 ++++++++++++ .../cassandra/transport/ProtocolVersionTest.java | 6 +- 24 files changed, 730 insertions(+), 27 deletions(-) diff --cc CHANGES.txt index 3544594,925a90a..b4b1245 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,12 -1,8 +1,13 @@@ -3.0.19 +3.11.5 + * Make sure user defined compaction transactions are always closed (CASSANDRA-15123) + * Fix cassandra-env.sh to use $CASSANDRA_CONF to find cassandra-jaas.config (CASSANDRA-14305) + * Fixed nodetool cfstats printing index name twice (CASSANDRA-14903) + * Add flag to disable SASI indexes, and warnings on creation (CASSANDRA-14866) - Merged from 3.0: ++Merged from 3.0:a + * Add ability to cap max negotiable protocol version (CASSANDRA-15193) * Gossip tokens on startup if available (CASSANDRA-15335) * Fix resource leak in CompressedSequentialWriter (CASSANDRA-15340) - * Fix merge which reverted CASSANDRA-14993 (CASSANDRA-15289) + * Fix bad merge that reverted CASSANDRA-14993 (CASSANDRA-15289) * Fix LegacyLayout RangeTombstoneList IndexOutOfBoundsException when upgrading and RangeTombstone bounds are asymmetric (CASSANDRA-15172) * Fix NPE when using allocate_tokens_for_keyspace on new DC/rack (CASSANDRA-14952) * Filter sstables earlier when running cleanup (CASSANDRA-15100) diff --cc NEWS.txt index 2feac81,c03284b..2aa6deb --- a/NEWS.txt +++ b/NEWS.txt @@@ -49,16 -49,13 +49,21 @@@ Upgradin --------- - repair_session_max_tree_depth setting has been added to cassandra.yaml to allow operators to reduce merkle tree size if repair is creating too much heap pressure. See CASSANDRA-14096 for details. + - native_transport_max_negotiable_protocol_version has been added to cassandra.yaml to allow operators to - enforce an upper limit on the version of the native protocol that servers will negotiate with clients. ++ enforce an upper limit on the version of the native protocol that servers will negotiate with clients. + This can be used during upgrades from 2.1 to 3.0 to prevent errors due to incompatible paging state formats + between the two versions. See CASSANDRA-15193 for details. + -3.0.18 +Experimental features +--------------------- + - An 'enable_sasi_indexes' flag, true by default, has been added to cassandra.yaml to allow operators to prevent + the creation of new SASI indexes, which are considered experimental and are not recommended for production use. + (See https://www.mail-archive.com/dev@cassandra.apache.org/msg13582.html) + - The flags 'enable_sasi_indexes' and 'enable_materialized_views' have been grouped under an experimental features + section in cassandra.yaml. + +3.11.4 ====== Upgrading diff --cc src/java/org/apache/cassandra/config/Config.java index 1d79e2a,bc3e3bf..7f28546 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@@ -163,10 -156,10 +163,10 @@@ public class Confi public boolean native_transport_flush_in_batches_legacy = true; public volatile long native_transport_max_concurrent_requests_in_bytes_per_ip = -1L; public volatile long native_transport_max_concurrent_requests_in_bytes = -1L; - + public Integer native_transport_max_negotiable_protocol_version = Integer.MIN_VALUE; @Deprecated - public Integer thrift_max_message_length_in_mb = 16; + public int thrift_max_message_length_in_mb = 16; /** * Max size of values in SSTables, in MegaBytes. * Default is the same as the native protocol frame limit: 256Mb. diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 75296b6,a161a2a..1e2e1a1 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -46,26 -40,24 +46,28 @@@ import org.apache.cassandra.config.Conf import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.util.DiskOptimizationStrategy; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.locator.*; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.io.util.SpinningDiskOptimizationStrategy; +import org.apache.cassandra.io.util.SsdDiskOptimizationStrategy; +import org.apache.cassandra.locator.DynamicEndpointSnitch; +import org.apache.cassandra.locator.EndpointSnitchInfo; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.SeedProvider; +import org.apache.cassandra.net.BackPressureStrategy; +import org.apache.cassandra.net.RateBasedBackPressure; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.scheduler.NoScheduler; -import org.apache.cassandra.service.CacheService; -import org.apache.cassandra.thrift.ThriftServer; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.service.CacheService.CacheType; +import org.apache.cassandra.thrift.ThriftServer.ThriftServerType; ++import org.apache.cassandra.transport.ProtocolVersion; ++import org.apache.cassandra.transport.ProtocolVersionLimit; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.memory.*; + +import org.apache.commons.lang3.StringUtils; import static org.apache.cassandra.io.util.FileUtils.ONE_GB; -import static org.apache.cassandra.io.util.FileUtils.ONE_MB; public class DatabaseDescriptor { @@@ -700,212 -719,6 +702,227 @@@ conf.server_encryption_options = conf.encryption_options; } + if (conf.user_defined_function_fail_timeout < 0) + throw new ConfigurationException("user_defined_function_fail_timeout must not be negative", false); + if (conf.user_defined_function_warn_timeout < 0) + throw new ConfigurationException("user_defined_function_warn_timeout must not be negative", false); + + if (conf.user_defined_function_fail_timeout < conf.user_defined_function_warn_timeout) + throw new ConfigurationException("user_defined_function_warn_timeout must less than user_defined_function_fail_timeout", false); + + if (conf.commitlog_segment_size_in_mb <= 0) + throw new ConfigurationException("commitlog_segment_size_in_mb must be positive, but was " + + conf.commitlog_segment_size_in_mb, false); + else if (conf.commitlog_segment_size_in_mb >= 2048) + throw new ConfigurationException("commitlog_segment_size_in_mb must be smaller than 2048, but was " + + conf.commitlog_segment_size_in_mb, false); + + if (conf.max_mutation_size_in_kb == null) + conf.max_mutation_size_in_kb = conf.commitlog_segment_size_in_mb * 1024 / 2; + else if (conf.commitlog_segment_size_in_mb * 1024 < 2 * conf.max_mutation_size_in_kb) + throw new ConfigurationException("commitlog_segment_size_in_mb must be at least twice the size of max_mutation_size_in_kb / 1024", false); + + // native transport encryption options + if (conf.native_transport_port_ssl != null + && conf.native_transport_port_ssl != conf.native_transport_port + && !conf.client_encryption_options.enabled) + { + throw new ConfigurationException("Encryption must be enabled in client_encryption_options for native_transport_port_ssl", false); + } + ++ // If max protocol version has been set, just validate it's within an acceptable range ++ if (conf.native_transport_max_negotiable_protocol_version != Integer.MIN_VALUE) ++ { ++ try ++ { ++ ProtocolVersion.decode(conf.native_transport_max_negotiable_protocol_version, ProtocolVersionLimit.SERVER_DEFAULT); ++ logger.info("Native transport max negotiable version statically limited to {}", conf.native_transport_max_negotiable_protocol_version); ++ } ++ catch (Exception e) ++ { ++ throw new ConfigurationException("Invalid setting for native_transport_max_negotiable_protocol_version; " + ++ ProtocolVersion.invalidVersionMessage(conf.native_transport_max_negotiable_protocol_version)); ++ } ++ } ++ + if (conf.max_value_size_in_mb <= 0) + throw new ConfigurationException("max_value_size_in_mb must be positive", false); + else if (conf.max_value_size_in_mb >= 2048) + throw new ConfigurationException("max_value_size_in_mb must be smaller than 2048, but was " + + conf.max_value_size_in_mb, false); + + switch (conf.disk_optimization_strategy) + { + case ssd: + diskOptimizationStrategy = new SsdDiskOptimizationStrategy(conf.disk_optimization_page_cross_chance); + break; + case spinning: + diskOptimizationStrategy = new SpinningDiskOptimizationStrategy(); + break; + } + + try + { + ParameterizedClass strategy = conf.back_pressure_strategy != null ? conf.back_pressure_strategy : RateBasedBackPressure.withDefaultParams(); + Class<?> clazz = Class.forName(strategy.class_name); + if (!BackPressureStrategy.class.isAssignableFrom(clazz)) + throw new ConfigurationException(strategy + " is not an instance of " + BackPressureStrategy.class.getCanonicalName(), false); + + Constructor<?> ctor = clazz.getConstructor(Map.class); + BackPressureStrategy instance = (BackPressureStrategy) ctor.newInstance(strategy.parameters); + logger.info("Back-pressure is {} with strategy {}.", backPressureEnabled() ? "enabled" : "disabled", conf.back_pressure_strategy); + backPressureStrategy = instance; + } + catch (ConfigurationException ex) + { + throw ex; + } + catch (Exception ex) + { + throw new ConfigurationException("Error configuring back-pressure strategy: " + conf.back_pressure_strategy, ex); + } + + if (conf.otc_coalescing_enough_coalesced_messages > 128) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be smaller than 128", false); + + if (conf.otc_coalescing_enough_coalesced_messages <= 0) + throw new ConfigurationException("otc_coalescing_enough_coalesced_messages must be positive", false); + } + + private static String storagedirFor(String type) + { + return storagedir(type + "_directory") + File.separator + type; + } + + private static String storagedir(String errMsgType) + { + String storagedir = System.getProperty(Config.PROPERTY_PREFIX + "storagedir", null); + if (storagedir == null) + throw new ConfigurationException(errMsgType + " is missing and -Dcassandra.storagedir is not set", false); + return storagedir; + } + + public static void applyAddressConfig() throws ConfigurationException + { + applyAddressConfig(conf); + } + + public static void applyAddressConfig(Config config) throws ConfigurationException + { + listenAddress = null; + rpcAddress = null; + broadcastAddress = null; + broadcastRpcAddress = null; + + /* Local IP, hostname or interface to bind services to */ + if (config.listen_address != null && config.listen_interface != null) + { + throw new ConfigurationException("Set listen_address OR listen_interface, not both", false); + } + else if (config.listen_address != null) + { + try + { + listenAddress = InetAddress.getByName(config.listen_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown listen_address '" + config.listen_address + "'", false); + } + + if (listenAddress.isAnyLocalAddress()) + throw new ConfigurationException("listen_address cannot be a wildcard address (" + config.listen_address + ")!", false); + } + else if (config.listen_interface != null) + { + listenAddress = getNetworkInterfaceAddress(config.listen_interface, "listen_interface", config.listen_interface_prefer_ipv6); + } + + /* Gossip Address to broadcast */ + if (config.broadcast_address != null) + { + try + { + broadcastAddress = InetAddress.getByName(config.broadcast_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_address '" + config.broadcast_address + "'", false); + } + + if (broadcastAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_address cannot be a wildcard address (" + config.broadcast_address + ")!", false); + } + + /* Local IP, hostname or interface to bind RPC server to */ + if (config.rpc_address != null && config.rpc_interface != null) + { + throw new ConfigurationException("Set rpc_address OR rpc_interface, not both", false); + } + else if (config.rpc_address != null) + { + try + { + rpcAddress = InetAddress.getByName(config.rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown host in rpc_address " + config.rpc_address, false); + } + } + else if (config.rpc_interface != null) + { + rpcAddress = getNetworkInterfaceAddress(config.rpc_interface, "rpc_interface", config.rpc_interface_prefer_ipv6); + } + else + { + rpcAddress = FBUtilities.getLocalAddress(); + } + + /* RPC address to broadcast */ + if (config.broadcast_rpc_address != null) + { + try + { + broadcastRpcAddress = InetAddress.getByName(config.broadcast_rpc_address); + } + catch (UnknownHostException e) + { + throw new ConfigurationException("Unknown broadcast_rpc_address '" + config.broadcast_rpc_address + "'", false); + } + + if (broadcastRpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("broadcast_rpc_address cannot be a wildcard address (" + config.broadcast_rpc_address + ")!", false); + } + else + { + if (rpcAddress.isAnyLocalAddress()) + throw new ConfigurationException("If rpc_address is set to a wildcard address (" + config.rpc_address + "), then " + + "you must set broadcast_rpc_address to a value other than " + config.rpc_address, false); + } + } + + public static void applyThriftHSHA() + { + // fail early instead of OOMing (see CASSANDRA-8116) + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads == Integer.MAX_VALUE) + throw new ConfigurationException("The hsha rpc_server_type is not compatible with an rpc_max_threads " + + "setting of 'unlimited'. Please see the comments in cassandra.yaml " + + "for rpc_server_type and rpc_max_threads.", + false); + if (ThriftServerType.HSHA.equals(conf.rpc_server_type) && conf.rpc_max_threads > (FBUtilities.getAvailableProcessors() * 2 + 1024)) + logger.warn("rpc_max_threads setting of {} may be too high for the hsha server and cause unnecessary thread contention, reducing performance", conf.rpc_max_threads); + } + + public static void applyEncryptionContext() + { + // always attempt to load the cipher factory, as we could be in the situation where the user has disabled encryption, + // but has existing commitlogs and sstables on disk that are still encrypted (and still need to be read) + encryptionContext = new EncryptionContext(conf.transparent_data_encryption_options); + } + + public static void applySeedProvider() + { // load the seeds for node contact points if (conf.seed_provider == null) { diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index 812659c,7c222dd..96e0416 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -757,6 -716,18 +757,18 @@@ public final class SystemKeyspac return executorService.submit((Runnable) () -> executeInternal(String.format(req, PEERS, columnName), ep, value)); } + public static void updatePeerReleaseVersion(final InetAddress ep, final Object value, Runnable postUpdateTask, ExecutorService executorService) + { + if (ep.equals(FBUtilities.getBroadcastAddress())) + return; + - String req = "INSERT INTO system.%s (peer, %s) VALUES (?, ?)"; ++ String req = "INSERT INTO system.%s (peer, release_version) VALUES (?, ?)"; + executorService.execute(() -> { - executeInternal(String.format(req, PEERS, "release_version"), ep, value); ++ executeInternal(String.format(req, PEERS), ep, value); + postUpdateTask.run(); + }); + } + public static synchronized void updateHintsDropped(InetAddress ep, UUID timePeriod, int value) { // with 30 day TTL diff --cc src/java/org/apache/cassandra/service/NativeTransportService.java index 6343f0d,587f781..c58bb5e --- a/src/java/org/apache/cassandra/service/NativeTransportService.java +++ b/src/java/org/apache/cassandra/service/NativeTransportService.java @@@ -31,10 -31,9 +31,11 @@@ import io.netty.channel.EventLoopGroup import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.EventExecutor; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.metrics.AuthMetrics; import org.apache.cassandra.metrics.ClientMetrics; + import org.apache.cassandra.transport.ConfiguredLimit; import org.apache.cassandra.transport.Message; import org.apache.cassandra.transport.Server; @@@ -141,6 -142,20 +146,20 @@@ public class NativeTransportServic Message.Dispatcher.shutdown(); } + public int getMaxProtocolVersion() + { - return protocolVersionLimit.getMaxVersion(); ++ return protocolVersionLimit.getMaxVersion().asInt(); + } + + public void refreshMaxNegotiableProtocolVersion() + { + // lowering the max negotiable protocol version is only safe if we haven't already + // allowed clients to connect with a higher version. This still allows the max + // version to be raised, as that is safe. + if (initialized) + protocolVersionLimit.updateMaxSupportedVersion(); + } + /** * @return intend to use epoll bassed event looping */ diff --cc src/java/org/apache/cassandra/service/StorageService.java index 346f9a4,8c29601..309be3d --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -441,9 -442,26 +441,26 @@@ public class StorageService extends Not return daemon.isNativeTransportRunning(); } + public int getMaxNativeProtocolVersion() + { + if (daemon == null) + { + throw new IllegalStateException("No configured daemon"); + } + return daemon.getMaxNativeProtocolVersion(); + } + + private void refreshMaxNativeProtocolVersion() + { + if (daemon != null) + { + daemon.refreshMaxNativeProtocolVersion(); + } + } + public void stopTransports() { - if (isInitialized()) + if (isGossipActive()) { logger.error("Stopping gossiper"); stopGossiping(); diff --cc src/java/org/apache/cassandra/transport/Client.java index e428b06,92466d2..368b1d7 --- a/src/java/org/apache/cassandra/transport/Client.java +++ b/src/java/org/apache/cassandra/transport/Client.java @@@ -251,7 -250,7 +251,7 @@@ public class Client extends SimpleClien // Parse options. String host = args[0]; int port = Integer.parseInt(args[1]); - ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2])) : ProtocolVersion.CURRENT; - int version = args.length == 3 ? Integer.parseInt(args[2]) : Server.CURRENT_VERSION; ++ ProtocolVersion version = args.length == 3 ? ProtocolVersion.decode(Integer.parseInt(args[2]), ProtocolVersionLimit.SERVER_DEFAULT) : ProtocolVersion.CURRENT; ClientEncryptionOptions encryptionOptions = new ClientEncryptionOptions(); System.out.println("CQL binary protocol console " + host + "@" + port + " using native protocol version " + version); diff --cc src/java/org/apache/cassandra/transport/ConfiguredLimit.java index 0000000,98518b8..16d3867 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/transport/ConfiguredLimit.java +++ b/src/java/org/apache/cassandra/transport/ConfiguredLimit.java @@@ -1,0 -1,117 +1,112 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.transport; + + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.SystemKeyspace; + import org.apache.cassandra.utils.CassandraVersion; + + public abstract class ConfiguredLimit implements ProtocolVersionLimit + { + private static final Logger logger = LoggerFactory.getLogger(ConfiguredLimit.class); + static final String DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE = "cassandra.disable_max_protocol_auto_override"; + static final CassandraVersion MIN_VERSION_FOR_V4 = new CassandraVersion("3.0.0"); + - public abstract int getMaxVersion(); ++ public abstract ProtocolVersion getMaxVersion(); + public abstract void updateMaxSupportedVersion(); + + public static ConfiguredLimit newLimit() + { + if (Boolean.getBoolean(DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE)) - return new StaticLimit(Server.CURRENT_VERSION); ++ return new StaticLimit(ProtocolVersion.MAX_SUPPORTED_VERSION); + + int fromConfig = DatabaseDescriptor.getNativeProtocolMaxVersionOverride(); + return fromConfig != Integer.MIN_VALUE - ? new StaticLimit(fromConfig) - : new DynamicLimit(Server.CURRENT_VERSION); ++ ? new StaticLimit(ProtocolVersion.decode(fromConfig, ProtocolVersionLimit.SERVER_DEFAULT)) ++ : new DynamicLimit(ProtocolVersion.MAX_SUPPORTED_VERSION); + } + + private static class StaticLimit extends ConfiguredLimit + { - private final int maxVersion; - private StaticLimit(int maxVersion) ++ private final ProtocolVersion maxVersion; ++ private StaticLimit(ProtocolVersion maxVersion) + { - if (maxVersion < Server.MIN_SUPPORTED_VERSION || maxVersion > Server.CURRENT_VERSION) - throw new IllegalArgumentException(String.format("Invalid max protocol version supplied (%s); " + - "Values between %s and %s are supported", - maxVersion, - Server.MIN_SUPPORTED_VERSION, - Server.CURRENT_VERSION)); + this.maxVersion = maxVersion; + logger.info("Native transport max negotiable version statically limited to {}", maxVersion); + } + - public int getMaxVersion() ++ public ProtocolVersion getMaxVersion() + { + return maxVersion; + } + + public void updateMaxSupportedVersion() + { + // statically configured, so this is a no-op + } + } + + private static class DynamicLimit extends ConfiguredLimit + { - private volatile int maxVersion; - private DynamicLimit(int initialLimit) ++ private volatile ProtocolVersion maxVersion; ++ private DynamicLimit(ProtocolVersion initialLimit) + { + maxVersion = initialLimit; + maybeUpdateVersion(true); + } + - public int getMaxVersion() ++ public ProtocolVersion getMaxVersion() + { + return maxVersion; + } + + public void updateMaxSupportedVersion() + { + maybeUpdateVersion(false); + } + + private void maybeUpdateVersion(boolean allowLowering) + { + boolean enforceV3Cap = SystemKeyspace.loadPeerVersions() + .values() + .stream() + .anyMatch(v -> v.compareTo(MIN_VERSION_FOR_V4) < 0); + + if (!enforceV3Cap) + { - maxVersion = Server.CURRENT_VERSION; ++ maxVersion = ProtocolVersion.MAX_SUPPORTED_VERSION; + return; + } + - if (maxVersion > Server.VERSION_3 && !allowLowering) ++ if (ProtocolVersion.V3.isSmallerThan(maxVersion) && !allowLowering) + { + logger.info("Detected peers which do not fully support protocol V4, but V4 was previously negotiable. " + + "Not enforcing cap as this can cause issues for older client versions. After the next " + + "restart the server will apply the cap"); + return; + } ++ + logger.info("Detected peers which do not fully support protocol V4. Capping max negotiable version to V3"); - maxVersion = Server.VERSION_3; ++ maxVersion = ProtocolVersion.V3; + } + } + } diff --cc src/java/org/apache/cassandra/transport/Frame.java index 388cbc2,a07551f..e603e6c --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@@ -171,8 -176,11 +173,8 @@@ public class Fram // 1 and 2 use a shorter header, so we may never have a complete header's worth of bytes. int firstByte = buffer.getByte(idx++); Message.Direction direction = Message.Direction.extractFromVersion(firstByte); - int version = firstByte & PROTOCOL_VERSION_MASK; - if (version < Server.MIN_SUPPORTED_VERSION || version > versionCap.getMaxVersion()) - throw new ProtocolException(String.format("Invalid or unsupported protocol version (%d); the lowest supported version is %d and the greatest is %d", - version, Server.MIN_SUPPORTED_VERSION, versionCap.getMaxVersion()), - version < Server.MIN_SUPPORTED_VERSION ? version : null); + int versionNum = firstByte & PROTOCOL_VERSION_MASK; - ProtocolVersion version = ProtocolVersion.decode(versionNum); ++ ProtocolVersion version = ProtocolVersion.decode(versionNum, versionCap); // Wait until we have the complete header if (readableBytes < Header.LENGTH) diff --cc src/java/org/apache/cassandra/transport/Message.java index 271b690,5202578..f899022 --- a/src/java/org/apache/cassandra/transport/Message.java +++ b/src/java/org/apache/cassandra/transport/Message.java @@@ -326,7 -333,8 +333,8 @@@ public abstract class Messag { Connection connection = ctx.channel().attr(Connection.attributeKey).get(); // The only case the connection can be null is when we send the initial STARTUP message (client side thus) - ProtocolVersion version = connection == null ? ProtocolVersion.CURRENT : connection.getVersion(); - int version = connection == null ? versionCap.getMaxVersion() : connection.getVersion(); ++ ProtocolVersion version = connection == null ? versionCap.getMaxVersion() : connection.getVersion(); + EnumSet<Frame.Header.Flag> flags = EnumSet.noneOf(Frame.Header.Flag.class); Codec<Message> codec = (Codec<Message>)message.type.codec; diff --cc src/java/org/apache/cassandra/transport/ProtocolVersion.java index cd73c86,0000000..05fafc2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@@ -1,153 -1,0 +1,153 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.ArrayUtils; + +/** + * The native (CQL binary) protocol version. + * + * Some versions may be in beta, which means that the client must + * specify the beta flag in the frame for the version to be considered valid. + * Beta versions must have the word "beta" in their description, this is mandated + * by the specs. + * + */ +public enum ProtocolVersion implements Comparable<ProtocolVersion> +{ + // The order is important as it defines the chronological history of versions, which is used + // to determine if a feature is supported or some serdes formats + V1(1, "v1", false), // no longer supported + V2(2, "v2", false), // no longer supported + V3(3, "v3", false), + V4(4, "v4", false), + V5(5, "v5-beta", true); + + /** The version number */ + private final int num; + + /** A description of the version, beta versions should have the word "-beta" */ + private final String descr; + + /** Set this to true for beta versions */ + private final boolean beta; + + ProtocolVersion(int num, String descr, boolean beta) + { + this.num = num; + this.descr = descr; + this.beta = beta; + } + + /** The supported versions stored as an array, these should be private and are required for fast decoding*/ + private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5 }; + final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0]; + final static ProtocolVersion MAX_SUPPORTED_VERSION = SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1]; + + /** All supported versions, published as an enumset */ + public final static EnumSet<ProtocolVersion> SUPPORTED = EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) ArrayUtils.addAll(SUPPORTED_VERSIONS))); + + /** Old unsupported versions, this is OK as long as we never add newer unsupported versions */ + public final static EnumSet<ProtocolVersion> UNSUPPORTED = EnumSet.complementOf(SUPPORTED); + + /** The preferred versions */ + public final static ProtocolVersion CURRENT = V4; + public final static Optional<ProtocolVersion> BETA = Optional.of(V5); + + public static List<String> supportedVersions() + { + List<String> ret = new ArrayList<>(SUPPORTED.size()); + for (ProtocolVersion version : SUPPORTED) + ret.add(version.toString()); + return ret; + } + - public static ProtocolVersion decode(int versionNum) ++ public static ProtocolVersion decode(int versionNum, ProtocolVersionLimit ceiling) + { - ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= MAX_SUPPORTED_VERSION.num ++ ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= ceiling.getMaxVersion().num + ? SUPPORTED_VERSIONS[versionNum - MIN_SUPPORTED_VERSION.num] + : null; + + if (ret == null) + { + // if this is not a supported version check the old versions + for (ProtocolVersion version : UNSUPPORTED) + { + // if it is an old version that is no longer supported this ensures that we reply + // with that same version + if (version.num == versionNum) + throw new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version); + } + + // If the version is invalid reply with the highest version that we support - throw new ProtocolException(invalidVersionMessage(versionNum), MAX_SUPPORTED_VERSION); ++ throw new ProtocolException(invalidVersionMessage(versionNum), ceiling.getMaxVersion()); + } + + return ret; + } + + public boolean isBeta() + { + return beta; + } + + public static String invalidVersionMessage(int version) + { + return String.format("Invalid or unsupported protocol version (%d); supported versions are (%s)", + version, String.join(", ", ProtocolVersion.supportedVersions())); + } + + public int asInt() + { + return num; + } + + @Override + public String toString() + { + // This format is mandated by the protocl specs for the SUPPORTED message, see OptionsMessage execute(). + return String.format("%d/%s", num, descr); + } + + public final boolean isGreaterThan(ProtocolVersion other) + { + return num > other.num; + } + + public final boolean isGreaterOrEqualTo(ProtocolVersion other) + { + return num >= other.num; + } + + public final boolean isSmallerThan(ProtocolVersion other) + { + return num < other.num; + } + + public final boolean isSmallerOrEqualTo(ProtocolVersion other) + { + return num <= other.num; + } +} diff --cc src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java index 0000000,c476efb..9738a19 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersionLimit.java @@@ -1,0 -1,27 +1,27 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.transport; + + @FunctionalInterface + public interface ProtocolVersionLimit + { - public int getMaxVersion(); ++ public ProtocolVersion getMaxVersion(); + - public static final ProtocolVersionLimit SERVER_DEFAULT = () -> Server.CURRENT_VERSION; ++ public static final ProtocolVersionLimit SERVER_DEFAULT = () -> ProtocolVersion.MAX_SUPPORTED_VERSION; + } diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index d438630,95366c2..e545e9f --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -62,8 -62,9 +62,10 @@@ import org.apache.cassandra.serializers import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageService; - import org.apache.cassandra.transport.*; +import org.apache.cassandra.transport.ProtocolVersion; + import org.apache.cassandra.transport.ConfiguredLimit; + import org.apache.cassandra.transport.Event; + import org.apache.cassandra.transport.Server; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; @@@ -89,8 -89,9 +91,9 @@@ public abstract class CQLTeste private static org.apache.cassandra.transport.Server server; protected static final int nativePort; protected static final InetAddress nativeAddr; + private static final Map<ProtocolVersion, Cluster> clusters = new HashMap<>(); + private static final Map<ProtocolVersion, Session> sessions = new HashMap<>(); + protected static ConfiguredLimit protocolVersionLimit; - private static final Map<Integer, Cluster> clusters = new HashMap<>(); - private static final Map<Integer, Session> sessions = new HashMap<>(); private static boolean isServerPrepared = false; @@@ -366,11 -377,14 +401,14 @@@ if (clusters.containsKey(version)) continue; - if (version > protocolVersionLimit.getMaxVersion()) ++ if (version.isGreaterThan(protocolVersionLimit.getMaxVersion())) + continue; + Cluster cluster = Cluster.builder() .addContactPoints(nativeAddr) - .withClusterName("Test Cluster") - .withClusterName("Test Cluster-v" + version) ++ .withClusterName("Test Cluster-" + version.name()) .withPort(nativePort) - .withProtocolVersion(ProtocolVersion.fromInt(version)) + .withProtocolVersion(com.datastax.driver.core.ProtocolVersion.fromInt(version.asInt())) .build(); clusters.put(version, cluster); sessions.put(version, cluster.connect()); diff --cc test/unit/org/apache/cassandra/transport/DynamicLimitTest.java index 0000000,83a0dd9..df06fba mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java +++ b/test/unit/org/apache/cassandra/transport/DynamicLimitTest.java @@@ -1,0 -1,111 +1,111 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.transport; + + import java.net.InetAddress; + + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.cql3.CQLTester; + + import static org.apache.cassandra.transport.ProtocolTestHelper.cleanupPeers; + import static org.apache.cassandra.transport.ProtocolTestHelper.setStaticLimitInConfig; + import static org.apache.cassandra.transport.ProtocolTestHelper.setupPeer; + import static org.apache.cassandra.transport.ProtocolTestHelper.updatePeerInfo; + import static org.junit.Assert.assertEquals; + + public class DynamicLimitTest + { + @BeforeClass + public static void setup() + { + CQLTester.prepareServer(); + } + + @Test + public void disableDynamicLimitWithSystemProperty() throws Throwable + { + // Dynamic limiting of the max negotiable protocol version can be + // disabled with a system property + + // ensure that no static limit is configured + setStaticLimitInConfig(null); + + // set the property which disables dynamic limiting + System.setProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE, "true"); + // insert a legacy peer into system.peers and also + InetAddress peer = null; + try + { + peer = setupPeer("127.1.0.1", "2.2.0"); + ConfiguredLimit limit = ConfiguredLimit.newLimit(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + // clearing the property after the limit has been returned has no effect + System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE); + limit.updateMaxSupportedVersion(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + // a new limit should now be dynamic + limit = ConfiguredLimit.newLimit(); - assertEquals(Server.VERSION_3, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.V3, limit.getMaxVersion()); + } + finally + { + System.clearProperty(ConfiguredLimit.DISABLE_MAX_PROTOCOL_AUTO_OVERRIDE); + cleanupPeers(peer); + } + } + + @Test + public void disallowLoweringMaxVersion() throws Throwable + { + // Lowering the max version once connections have been established is a problem + // for some clients. So for a dynamic limit, if notifications of peer versions + // trigger a change to the max version, it's only allowed to increase the max + // negotiable version + + InetAddress peer = null; + try + { + // ensure that no static limit is configured + setStaticLimitInConfig(null); + ConfiguredLimit limit = ConfiguredLimit.newLimit(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + peer = setupPeer("127.1.0.1", "3.0.0"); + limit.updateMaxSupportedVersion(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + // learn that peer doesn't actually fully support V4, behaviour should remain the same + updatePeerInfo(peer, "2.2.0"); + limit.updateMaxSupportedVersion(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + // finally learn that peer2 has been upgraded, just for completeness + updatePeerInfo(peer, "3.3.0"); + limit.updateMaxSupportedVersion(); - assertEquals(Server.CURRENT_VERSION, limit.getMaxVersion()); ++ assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, limit.getMaxVersion()); + + } finally { + cleanupPeers(peer); + } + } + } diff --cc test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java index e287c08,0000000..7b56a49 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java @@@ -1,97 -1,0 +1,97 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import org.junit.Assert; +import org.junit.Test; + +public class ProtocolVersionTest +{ + @Test + public void testDecode() + { + for (ProtocolVersion version : ProtocolVersion.SUPPORTED) - Assert.assertEquals(version, ProtocolVersion.decode(version.asInt())); ++ Assert.assertEquals(version, ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT)); + + for (ProtocolVersion version : ProtocolVersion.UNSUPPORTED) + { // unsupported old versions + try + { - Assert.assertEquals(version, ProtocolVersion.decode(version.asInt())); ++ Assert.assertEquals(version, ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT)); + Assert.fail("Expected invalid protocol exception"); + } + catch (ProtocolException ex) + { + Assert.assertNotNull(ex.getForcedProtocolVersion()); + Assert.assertEquals(version, ex.getForcedProtocolVersion()); + } + } + + try + { // unsupported newer version - Assert.assertEquals(null, ProtocolVersion.decode(63)); ++ Assert.assertEquals(null, ProtocolVersion.decode(63, ProtocolVersionLimit.SERVER_DEFAULT)); + Assert.fail("Expected invalid protocol exception"); + } + catch (ProtocolException ex) + { + Assert.assertNotNull(ex.getForcedProtocolVersion()); + Assert.assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, ex.getForcedProtocolVersion()); + } + } + + @Test + public void testSupportedVersions() + { + Assert.assertTrue(ProtocolVersion.supportedVersions().size() >= 2); // at least one OS and one DSE + Assert.assertNotNull(ProtocolVersion.CURRENT); + + Assert.assertFalse(ProtocolVersion.V4.isBeta()); + Assert.assertTrue(ProtocolVersion.V5.isBeta()); + } + + @Test + public void testComparisons() + { + Assert.assertTrue(ProtocolVersion.V1.isSmallerOrEqualTo(ProtocolVersion.V1)); + Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1)); + Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4)); + + Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2)); + Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3)); + Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1)); + + Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3)); + Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2)); + Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org