This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 56b979bec6c2d7602982baad622f52860baa0cb9 Merge: a209139 9c50b1f Author: David Capwell <[email protected]> AuthorDate: Tue Jun 1 20:48:34 2021 -0700 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../apache/cassandra/metrics/ClientMetrics.java | 16 ++++ .../org/apache/cassandra/transport/Message.java | 31 ++++++- .../cassandra/transport/ProtocolException.java | 24 ++++++ .../cassandra/transport/ProtocolVersion.java | 17 ++-- .../apache/cassandra/transport/SimpleClient.java | 15 +++- .../org/apache/cassandra/utils/Throwables.java | 11 +++ .../test/UnableToParseClientMessageTest.java | 97 ++++++++++++++++++++++ .../cassandra/transport/MessagePayloadTest.java | 11 +++ .../cassandra/transport/ProtocolVersionTest.java | 3 +- .../cassandra/transport/WrappedSimpleClient.java | 72 ++++++++++++++++ 11 files changed, 285 insertions(+), 13 deletions(-) diff --cc CHANGES.txt index 474025d,923337e..2faa6d6 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,14 -1,6 +1,15 @@@ -3.0.25: +3.11.11 + * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669) + * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199) + * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634) + * Ignore stale acks received in the shadow round (CASSANDRA-16588) + * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350) + * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447) + * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552) + * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462) +Merged from 3.0: + * Failure to execute queries should emit a KPI other than read timeout/unavailable so it can be alerted/tracked (CASSANDRA-16581) - * Don't wait on schema versions from replacement target when replacing (CASSANDRA-16692) + * Don't wait on schema versions from 
replacement target when replacing a node (CASSANDRA-16692) * StandaloneVerifier does not fail when unable to verify SSTables, it only fails if Corruption is thrown (CASSANDRA-16683) * Fix bloom filter false ratio calculation by including true negatives (CASSANDRA-15834) * Prevent loss of commit log data when moving sstables between nodes (CASSANDRA-16619) diff --cc src/java/org/apache/cassandra/transport/ProtocolException.java index 6ef17ac,0b307a9..bdfdb6f --- a/src/java/org/apache/cassandra/transport/ProtocolException.java +++ b/src/java/org/apache/cassandra/transport/ProtocolException.java @@@ -43,8 -43,37 +43,32 @@@ public class ProtocolException extends return ExceptionCode.PROTOCOL_ERROR; } - /** - * If the ProtocolException is due to a connection being made with a protocol version that is lower - * than Server.MIN_SUPPORTED_VERSION, this will return that unsupported protocol version. Otherwise, - * null is returned. - */ - public Integer getAttemptedLowProtocolVersion() + public ProtocolVersion getForcedProtocolVersion() { - return attemptedLowProtocolVersion; + return forcedProtocolVersion; } + + public boolean isSilent() + { + return false; + } + + public static ProtocolException toSilentException(ProtocolException e) + { + return new Silent(e); + } + + private static class Silent extends ProtocolException + { + public Silent(ProtocolException cause) + { - super(cause.getMessage(), cause.attemptedLowProtocolVersion); ++ super(cause.getMessage(), cause.forcedProtocolVersion); + } + + @Override + public boolean isSilent() + { + return true; + } + } } diff --cc src/java/org/apache/cassandra/transport/ProtocolVersion.java index 05fafc2,0000000..4037817 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/transport/ProtocolVersion.java +++ b/src/java/org/apache/cassandra/transport/ProtocolVersion.java @@@ -1,153 -1,0 +1,160 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.transport; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; + +import org.apache.commons.lang3.ArrayUtils; + +/** + * The native (CQL binary) protocol version. + * + * Some versions may be in beta, which means that the client must + * specify the beta flag in the frame for the version to be considered valid. + * Beta versions must have the word "beta" in their description, this is mandated + * by the specs. 
+ * + */ +public enum ProtocolVersion implements Comparable<ProtocolVersion> +{ + // The order is important as it defines the chronological history of versions, which is used + // to determine if a feature is supported or some serdes formats + V1(1, "v1", false), // no longer supported + V2(2, "v2", false), // no longer supported + V3(3, "v3", false), + V4(4, "v4", false), + V5(5, "v5-beta", true); + + /** The version number */ + private final int num; + + /** A description of the version, beta versions should have the word "-beta" */ + private final String descr; + + /** Set this to true for beta versions */ + private final boolean beta; + + ProtocolVersion(int num, String descr, boolean beta) + { + this.num = num; + this.descr = descr; + this.beta = beta; + } + + /** The supported versions stored as an array, these should be private and are required for fast decoding*/ + private final static ProtocolVersion[] SUPPORTED_VERSIONS = new ProtocolVersion[] { V3, V4, V5 }; + final static ProtocolVersion MIN_SUPPORTED_VERSION = SUPPORTED_VERSIONS[0]; + final static ProtocolVersion MAX_SUPPORTED_VERSION = SUPPORTED_VERSIONS[SUPPORTED_VERSIONS.length - 1]; ++ /** These versions are sent by some clients, but are not valid Apache Cassandra versions (66, and 65 are DSE versions) */ ++ private static int[] KNOWN_INVALID_VERSIONS = { 66, 65}; + + /** All supported versions, published as an enumset */ + public final static EnumSet<ProtocolVersion> SUPPORTED = EnumSet.copyOf(Arrays.asList((ProtocolVersion[]) ArrayUtils.addAll(SUPPORTED_VERSIONS))); + + /** Old unsupported versions, this is OK as long as we never add newer unsupported versions */ + public final static EnumSet<ProtocolVersion> UNSUPPORTED = EnumSet.complementOf(SUPPORTED); + + /** The preferred versions */ + public final static ProtocolVersion CURRENT = V4; + public final static Optional<ProtocolVersion> BETA = Optional.of(V5); + + public static List<String> supportedVersions() + { + List<String> ret = new 
ArrayList<>(SUPPORTED.size()); + for (ProtocolVersion version : SUPPORTED) + ret.add(version.toString()); + return ret; + } + + public static ProtocolVersion decode(int versionNum, ProtocolVersionLimit ceiling) + { + ProtocolVersion ret = versionNum >= MIN_SUPPORTED_VERSION.num && versionNum <= ceiling.getMaxVersion().num + ? SUPPORTED_VERSIONS[versionNum - MIN_SUPPORTED_VERSION.num] + : null; + + if (ret == null) + { + // if this is not a supported version check the old versions - for (ProtocolVersion version : UNSUPPORTED) ++ for (ProtocolVersion dseVersion : UNSUPPORTED) + { + // if it is an old version that is no longer supported this ensures that we reply + // with that same version - if (version.num == versionNum) - throw new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), version); ++ if (dseVersion.num == versionNum) ++ throw new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum), dseVersion); ++ } ++ for (int version : KNOWN_INVALID_VERSIONS) ++ { ++ if (versionNum == version) ++ throw ProtocolException.toSilentException(new ProtocolException(ProtocolVersion.invalidVersionMessage(versionNum))); + } + - // If the version is invalid reply with the highest version that we support - throw new ProtocolException(invalidVersionMessage(versionNum), ceiling.getMaxVersion()); ++ // If the version is invalid reply with the channel's version ++ throw new ProtocolException(invalidVersionMessage(versionNum)); + } + + return ret; + } + + public boolean isBeta() + { + return beta; + } + + public static String invalidVersionMessage(int version) + { + return String.format("Invalid or unsupported protocol version (%d); supported versions are (%s)", + version, String.join(", ", ProtocolVersion.supportedVersions())); + } + + public int asInt() + { + return num; + } + + @Override + public String toString() + { + // This format is mandated by the protocol specs for the SUPPORTED message, see OptionsMessage execute(). 
+ return String.format("%d/%s", num, descr); + } + + public final boolean isGreaterThan(ProtocolVersion other) + { + return num > other.num; + } + + public final boolean isGreaterOrEqualTo(ProtocolVersion other) + { + return num >= other.num; + } + + public final boolean isSmallerThan(ProtocolVersion other) + { + return num < other.num; + } + + public final boolean isSmallerOrEqualTo(ProtocolVersion other) + { + return num <= other.num; + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java index 0000000,75cc905..7bfe5a0 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java @@@ -1,0 -1,88 +1,97 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package org.apache.cassandra.distributed.test; + + import java.io.IOException; + import java.nio.charset.StandardCharsets; + import java.util.List; + + import org.junit.Assert; ++import org.junit.BeforeClass; + import org.junit.Test; + + import io.netty.buffer.Unpooled; + import org.apache.cassandra.Util; ++import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.Feature; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.metrics.CassandraMetricsRegistry; + import org.apache.cassandra.transport.WrappedSimpleClient; + + /** + * If a client sends a message that can not be parsed by the server then we need to detect this and update metrics + * for monitoring. + * + * An issue was found between 2.1 to 3.0 upgrades with regards to paging serialization. Since + * this is a serialization issue we hit similar paths by sending bad bytes to the server, so can simulate the mixed-mode + * paging issue without needing to send proper messages. 
+ */ + public class UnableToParseClientMessageTest extends TestBaseImpl + { ++ @BeforeClass ++ public static void setup() ++ { ++ DatabaseDescriptor.daemonInitialization(); ++ } ++ + @Test + public void badMessageCausesProtocolException() throws IOException, InterruptedException + { + try (Cluster cluster = init(Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())) + { + // write gibberish to the native protocol + IInvokableInstance node = cluster.get(1); + // make sure everything is fine at the start + node.runOnInstance(() -> { + Assert.assertEquals(0, CassandraMetricsRegistry.Metrics.getMeters() + .get("org.apache.cassandra.metrics.Client.ProtocolException") + .getCount()); + Assert.assertEquals(0, CassandraMetricsRegistry.Metrics.getMeters() + .get("org.apache.cassandra.metrics.Client.UnknownException") + .getCount()); + }); + + try (WrappedSimpleClient client = new WrappedSimpleClient("127.0.0.1", 9042)) + { + client.connect(false, true); + + // this should return a failed response + String response = client.write(Unpooled.wrappedBuffer("This is just a test".getBytes(StandardCharsets.UTF_8)), false).toString(); + Assert.assertTrue("Resposne '" + response + "' expected to contain 'Invalid or unsupported protocol version (84); the lowest supported version is 3 and the greatest is 4'", - response.contains("Invalid or unsupported protocol version (84); the lowest supported version is 3 and the greatest is 4")); ++ response.contains("Invalid or unsupported protocol version (84)")); + + node.runOnInstance(() -> { + Util.spinAssertEquals(1L, + () -> CassandraMetricsRegistry.Metrics.getMeters() + .get("org.apache.cassandra.metrics.Client.ProtocolException") + .getCount(), + 10); ++ + Assert.assertEquals(0, CassandraMetricsRegistry.Metrics.getMeters() + .get("org.apache.cassandra.metrics.Client.UnknownException") + .getCount()); + }); + List<String> results = node.logs().grep("Protocol exception with client networking").getResult(); + results.forEach(s 
-> Assert.assertTrue("Expected logs '" + s + "' to contain: Invalid or unsupported protocol version (84)", + s.contains("Invalid or unsupported protocol version (84)"))); + Assert.assertEquals(1, results.size()); // this logs less often than metrics as the log has a nospamlogger wrapper + } + } + } + } diff --cc test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java index 7b56a49,0000000..234435e mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java +++ b/test/unit/org/apache/cassandra/transport/ProtocolVersionTest.java @@@ -1,97 -1,0 +1,96 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.transport; + +import org.junit.Assert; +import org.junit.Test; + +public class ProtocolVersionTest +{ + @Test + public void testDecode() + { + for (ProtocolVersion version : ProtocolVersion.SUPPORTED) + Assert.assertEquals(version, ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT)); + + for (ProtocolVersion version : ProtocolVersion.UNSUPPORTED) + { // unsupported old versions + try + { + Assert.assertEquals(version, ProtocolVersion.decode(version.asInt(), ProtocolVersionLimit.SERVER_DEFAULT)); + Assert.fail("Expected invalid protocol exception"); + } + catch (ProtocolException ex) + { + Assert.assertNotNull(ex.getForcedProtocolVersion()); + Assert.assertEquals(version, ex.getForcedProtocolVersion()); + } + } + + try + { // unsupported newer version + Assert.assertEquals(null, ProtocolVersion.decode(63, ProtocolVersionLimit.SERVER_DEFAULT)); + Assert.fail("Expected invalid protocol exception"); + } + catch (ProtocolException ex) + { - Assert.assertNotNull(ex.getForcedProtocolVersion()); - Assert.assertEquals(ProtocolVersion.MAX_SUPPORTED_VERSION, ex.getForcedProtocolVersion()); ++ Assert.assertNull(ex.getForcedProtocolVersion()); + } + } + + @Test + public void testSupportedVersions() + { + Assert.assertTrue(ProtocolVersion.supportedVersions().size() >= 2); // at least one OS and one DSE + Assert.assertNotNull(ProtocolVersion.CURRENT); + + Assert.assertFalse(ProtocolVersion.V4.isBeta()); + Assert.assertTrue(ProtocolVersion.V5.isBeta()); + } + + @Test + public void testComparisons() + { + Assert.assertTrue(ProtocolVersion.V1.isSmallerOrEqualTo(ProtocolVersion.V1)); + Assert.assertTrue(ProtocolVersion.V2.isSmallerOrEqualTo(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V3.isSmallerOrEqualTo(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V4.isSmallerOrEqualTo(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V1.isGreaterOrEqualTo(ProtocolVersion.V1)); + 
Assert.assertTrue(ProtocolVersion.V2.isGreaterOrEqualTo(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V3.isGreaterOrEqualTo(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V4.isGreaterOrEqualTo(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V1.isSmallerThan(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V4)); + + Assert.assertFalse(ProtocolVersion.V1.isGreaterThan(ProtocolVersion.V2)); + Assert.assertFalse(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V3)); + Assert.assertFalse(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V4)); + + Assert.assertTrue(ProtocolVersion.V4.isGreaterThan(ProtocolVersion.V3)); + Assert.assertTrue(ProtocolVersion.V3.isGreaterThan(ProtocolVersion.V2)); + Assert.assertTrue(ProtocolVersion.V2.isGreaterThan(ProtocolVersion.V1)); + + Assert.assertFalse(ProtocolVersion.V4.isSmallerThan(ProtocolVersion.V3)); + Assert.assertFalse(ProtocolVersion.V3.isSmallerThan(ProtocolVersion.V2)); + Assert.assertFalse(ProtocolVersion.V2.isSmallerThan(ProtocolVersion.V1)); + } +} diff --cc test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java index 0000000,af7c1d7..151d284 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java +++ b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java @@@ -1,0 -1,67 +1,72 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.transport; + + import io.netty.buffer.ByteBuf; + import org.apache.cassandra.config.EncryptionOptions; + import org.apache.cassandra.exceptions.ExceptionCode; + import org.apache.cassandra.transport.messages.ErrorMessage; + + /** + * Enhances {@link SimpleClient} to add custom logic to send to the server. + */ + public class WrappedSimpleClient extends SimpleClient + { - public WrappedSimpleClient(String host, int port, int version, EncryptionOptions.ClientEncryptionOptions encryptionOptions) ++ public WrappedSimpleClient(String host, int port, ProtocolVersion version, EncryptionOptions.ClientEncryptionOptions encryptionOptions) + { + super(host, port, version, encryptionOptions); + } + + public WrappedSimpleClient(String host, int port, EncryptionOptions.ClientEncryptionOptions encryptionOptions) + { + super(host, port, encryptionOptions); + } + - public WrappedSimpleClient(String host, int port, int version) ++ public WrappedSimpleClient(String host, int port, ProtocolVersion version) + { + super(host, port, version); + } + ++ public WrappedSimpleClient(String host, int port, ProtocolVersion version, boolean useBeta, EncryptionOptions.ClientEncryptionOptions encryptionOptions) ++ { ++ super(host, port, version, useBeta, encryptionOptions); ++ } ++ + public WrappedSimpleClient(String host, int port) + { + super(host, port); + } + + public Message.Response write(ByteBuf buffer) throws InterruptedException + { + return write(buffer, true); + } + + public Message.Response write(ByteBuf buffer, boolean 
awaitCloseOnProtocolError) throws InterruptedException + { + lastWriteFuture = channel.writeAndFlush(buffer); + Message.Response response = responseHandler.responses.take(); + if (awaitCloseOnProtocolError + && response instanceof ErrorMessage && ((ErrorMessage) response).error.code() == ExceptionCode.PROTOCOL_ERROR) + { + // protocol errors shutdown the connection, wait for it to close + connection.channel().closeFuture().awaitUninterruptibly(); + } + return response; + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
