This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 9c50b1f Failure to execute queries should emit a KPI other than read
timeout/unavailable so it can be alerted/tracked
9c50b1f is described below
commit 9c50b1f9a12a95b55851cc52d4b66440f9fafaea
Author: David Capwell <[email protected]>
AuthorDate: Tue Jun 1 15:06:48 2021 -0700
Failure to execute queries should emit a KPI other than read
timeout/unavailable so it can be alerted/tracked
patch by David Capwell; reviewed by Sam Tunnicliffe for CASSANDRA-16581
---
CHANGES.txt | 1 +
.../apache/cassandra/metrics/ClientMetrics.java | 16 ++++
src/java/org/apache/cassandra/transport/Frame.java | 19 ++++-
.../org/apache/cassandra/transport/Message.java | 31 +++++++-
.../cassandra/transport/ProtocolException.java | 24 ++++++
.../apache/cassandra/transport/SimpleClient.java | 15 +++-
.../org/apache/cassandra/utils/Throwables.java | 11 +++
.../test/UnableToParseClientMessageTest.java | 88 ++++++++++++++++++++++
.../cassandra/transport/MessagePayloadTest.java | 11 +++
.../cassandra/transport/WrappedSimpleClient.java | 67 ++++++++++++++++
10 files changed, 275 insertions(+), 8 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 99e53e9..923337e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.25:
+ * Failure to execute queries should emit a KPI other than read
timeout/unavailable so it can be alerted/tracked (CASSANDRA-16581)
* Don't wait on schema versions from replacement target when replacing
(CASSANDRA-16692)
* StandaloneVerifier does not fail when unable to verify SSTables, it only
fails if Corruption is thrown (CASSANDRA-16683)
* Fix bloom filter false ratio calculation by including true negatives
(CASSANDRA-15834)
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 08f0531..e034be8 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -43,6 +43,9 @@ public class ClientMetrics
private Gauge<Integer> pausedConnectionsGauge;
private Meter requestDiscarded;
+ private Meter protocolException;
+ private Meter unknownException;
+
private ClientMetrics()
{
}
@@ -51,6 +54,16 @@ public class ClientMetrics
public void unpauseConnection() { pausedConnections.decrementAndGet(); }
public void markRequestDiscarded() { requestDiscarded.mark(); }
+ public void markProtocolException()
+ {
+ protocolException.mark();
+ }
+
+ public void markUnknownException()
+ {
+ unknownException.mark();
+ }
+
public synchronized void init(Collection<Server> servers)
{
if (initialized)
@@ -64,6 +77,9 @@ public class ClientMetrics
pausedConnectionsGauge = registerGauge("PausedConnections",
pausedConnections::get);
requestDiscarded = registerMeter("RequestDiscarded");
+ protocolException = registerMeter("ProtocolException");
+ unknownException = registerMeter("UnknownException");
+
initialized = true;
}
diff --git a/src/java/org/apache/cassandra/transport/Frame.java
b/src/java/org/apache/cassandra/transport/Frame.java
index a07551f..ed9e3dc 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -35,6 +35,9 @@ public class Frame
{
public static final byte PROTOCOL_VERSION_MASK = 0x7f;
+ /** These versions are sent by some clients, but are not valid Apache
Cassandra versions (66, and 65 are DSE versions) */
+ private static int[] KNOWN_INVALID_VERSIONS = { 66, 65};
+
public final Header header;
public final ByteBuf body;
@@ -177,9 +180,15 @@ public class Frame
int firstByte = buffer.getByte(idx++);
Message.Direction direction =
Message.Direction.extractFromVersion(firstByte);
int version = firstByte & PROTOCOL_VERSION_MASK;
+ for (int dseVersion : KNOWN_INVALID_VERSIONS)
+ {
+ if (dseVersion == version)
+ throw ProtocolException.toSilentException(new
ProtocolException(invalidVersionMessage(version)));
+ }
if (version < Server.MIN_SUPPORTED_VERSION || version >
versionCap.getMaxVersion())
- throw new ProtocolException(String.format("Invalid or
unsupported protocol version (%d); the lowest supported version is %d and the
greatest is %d",
- version,
Server.MIN_SUPPORTED_VERSION, versionCap.getMaxVersion()),
+ throw new ProtocolException(invalidVersionMessage(version),
+ // only override the version IFF
the version is less than the min supported, as this is relatively safe since
older versions were the same up to v3.
+ // in the case where version is
greater than, it isn't known if the protocol has changed, so reply back normally
version <
Server.MIN_SUPPORTED_VERSION ? version : null);
// Wait until we have the complete header
@@ -247,6 +256,12 @@ public class Frame
results.add(new Frame(new Header(version, flags, streamId, type,
bodyLength), body));
}
+ private String invalidVersionMessage(int version)
+ {
+ return String.format("Invalid or unsupported protocol version
(%d); the lowest supported version is %d and the greatest is %d",
+ version, Server.MIN_SUPPORTED_VERSION,
versionCap.getMaxVersion());
+ }
+
private void fail()
{
// Reset to the initial state and throw the exception
diff --git a/src/java/org/apache/cassandra/transport/Message.java
b/src/java/org/apache/cassandra/transport/Message.java
index a88540b..3c2f38c 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -51,6 +51,8 @@ import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.transport.messages.*;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
@@ -743,14 +745,14 @@ public abstract class Message
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, Throwable
cause)
{
- // Provide error message to client in case channel is still open
- UnexpectedChannelExceptionHandler handler = new
UnexpectedChannelExceptionHandler(ctx.channel(), false);
- ErrorMessage errorMessage = ErrorMessage.fromException(cause,
handler);
if (ctx.channel().isOpen())
{
+ // Provide error message to client in case channel is still
open
+ UnexpectedChannelExceptionHandler handler = new
UnexpectedChannelExceptionHandler(ctx.channel(), false);
+ ErrorMessage errorMessage = ErrorMessage.fromException(cause,
handler);
ChannelFuture future = ctx.writeAndFlush(errorMessage);
// On protocol exception, close the channel as soon as the
message have been sent
- if (cause instanceof ProtocolException)
+ if (isFatal(cause))
{
future.addListener(new ChannelFutureListener()
{
@@ -761,6 +763,27 @@ public abstract class Message
});
}
}
+ if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException))
+ {
+ // if any ProtocolException in the cause chain is not silent, then handle it
+ if (Throwables.anyCauseMatches(cause, t -> t instanceof
ProtocolException && !((ProtocolException) t).isSilent()))
+ {
+ ClientMetrics.instance.markProtocolException();
+ // since protocol exceptions are expected to be client
issues, not logging stack trace
+ // to avoid spamming the logs once a bad client shows up
+ NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1,
TimeUnit.MINUTES, "Protocol exception with client networking: " +
cause.getMessage());
+ }
+ }
+ else
+ {
+ ClientMetrics.instance.markUnknownException();
+ logger.warn("Unknown exception in client networking", cause);
+ }
+ }
+
+ private static boolean isFatal(Throwable cause)
+ {
+ return cause instanceof ProtocolException;
}
}
diff --git a/src/java/org/apache/cassandra/transport/ProtocolException.java
b/src/java/org/apache/cassandra/transport/ProtocolException.java
index a589e9b..0b307a9 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolException.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolException.java
@@ -52,4 +52,28 @@ public class ProtocolException extends RuntimeException
implements TransportExce
{
return attemptedLowProtocolVersion;
}
+
+ public boolean isSilent()
+ {
+ return false;
+ }
+
+ public static ProtocolException toSilentException(ProtocolException e)
+ {
+ return new Silent(e);
+ }
+
+ private static class Silent extends ProtocolException
+ {
+ public Silent(ProtocolException cause)
+ {
+ super(cause.getMessage(), cause.attemptedLowProtocolVersion);
+ }
+
+ @Override
+ public boolean isSilent()
+ {
+ return true;
+ }
+ }
}
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 7d34d98..40423c3 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -35,11 +35,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.logging.InternalLoggerFactory;
import io.netty.util.internal.logging.Slf4JLoggerFactory;
import org.apache.cassandra.cql3.QueryOptions;
@@ -309,7 +314,7 @@ public class SimpleClient implements Closeable
}
@ChannelHandler.Sharable
- private static class ResponseHandler extends
SimpleChannelInboundHandler<Message.Response>
+ static class ResponseHandler extends
SimpleChannelInboundHandler<Message.Response>
{
public final BlockingQueue<Message.Response> responses = new
SynchronousQueue<>(true);
public EventHandler eventHandler;
@@ -333,11 +338,17 @@ public class SimpleClient implements Closeable
}
}
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception
{
if (this == ctx.pipeline().last())
+ {
logger.error("Exception in response", cause);
- ctx.fireExceptionCaught(cause);
+ }
+ else
+ {
+ ctx.fireExceptionCaught(cause);
+ }
}
}
}
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java
b/src/java/org/apache/cassandra/utils/Throwables.java
index 5ad9686..1b71098 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
+import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.cassandra.io.FSReadError;
@@ -37,6 +38,16 @@ public final class Throwables
void perform() throws E;
}
+ public static boolean anyCauseMatches(Throwable t, Predicate<Throwable>
cause)
+ {
+ do
+ {
+ if (cause.test(t))
+ return true;
+ } while ((t = t.getCause()) != null);
+ return false;
+ }
+
public static <T extends Throwable> T merge(T existingFail, T newFail)
{
if (existingFail == null)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
new file mode 100644
index 0000000..75cc905
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+import org.apache.cassandra.transport.WrappedSimpleClient;
+
+/**
+ * If a client sends a message that can not be parsed by the server then we
need to detect this and update metrics
+ * for monitoring.
+ *
+ * An issue was found between 2.1 to 3.0 upgrades with regards to paging
serialization. Since
+ * this is a serialization issue we hit similar paths by sending bad bytes to
the server, so can simulate the mixed-mode
+ * paging issue without needing to send proper messages.
+ */
+public class UnableToParseClientMessageTest extends TestBaseImpl
+{
+ @Test
+ public void badMessageCausesProtocolException() throws IOException,
InterruptedException
+ {
+ try (Cluster cluster = init(Cluster.build(1).withConfig(c ->
c.with(Feature.values())).start()))
+ {
+ // write gibberish to the native protocol
+ IInvokableInstance node = cluster.get(1);
+ // make sure everything is fine at the start
+ node.runOnInstance(() -> {
+ Assert.assertEquals(0,
CassandraMetricsRegistry.Metrics.getMeters()
+
.get("org.apache.cassandra.metrics.Client.ProtocolException")
+
.getCount());
+ Assert.assertEquals(0,
CassandraMetricsRegistry.Metrics.getMeters()
+
.get("org.apache.cassandra.metrics.Client.UnknownException")
+
.getCount());
+ });
+
+ try (WrappedSimpleClient client = new
WrappedSimpleClient("127.0.0.1", 9042))
+ {
+ client.connect(false, true);
+
+ // this should return a failed response
+ String response = client.write(Unpooled.wrappedBuffer("This is
just a test".getBytes(StandardCharsets.UTF_8)), false).toString();
+ Assert.assertTrue("Resposne '" + response + "' expected to
contain 'Invalid or unsupported protocol version (84); the lowest supported
version is 3 and the greatest is 4'",
+ response.contains("Invalid or unsupported
protocol version (84); the lowest supported version is 3 and the greatest is
4"));
+
+ node.runOnInstance(() -> {
+ Util.spinAssertEquals(1L,
+ () ->
CassandraMetricsRegistry.Metrics.getMeters()
+
.get("org.apache.cassandra.metrics.Client.ProtocolException")
+
.getCount(),
+ 10);
+ Assert.assertEquals(0,
CassandraMetricsRegistry.Metrics.getMeters()
+
.get("org.apache.cassandra.metrics.Client.UnknownException")
+
.getCount());
+ });
+ List<String> results = node.logs().grep("Protocol exception
with client networking").getResult();
+ results.forEach(s -> Assert.assertTrue("Expected logs '" + s +
"' to contain: Invalid or unsupported protocol version (84)",
+ s.contains("Invalid or
unsupported protocol version (84)")));
+ Assert.assertEquals(1, results.size()); // this logs less
offtan than metrics as the log has a nospamlogger wrapper
+ }
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 865a173..e88126f 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -218,6 +218,10 @@ public class MessagePayloadTest extends CQLTester
{
Assert.assertTrue(e.getCause() instanceof
ProtocolException);
}
+ // when a protocol exception is thrown by the server the
connection is closed, so need to re-connect
+ client.close();
+ client.connect(false);
+
queryMessage.setCustomPayload(null);
client.execute(queryMessage);
@@ -233,6 +237,10 @@ public class MessagePayloadTest extends CQLTester
{
Assert.assertTrue(e.getCause() instanceof
ProtocolException);
}
+ // when a protocol exception is thrown by the server the
connection is closed, so need to re-connect
+ client.close();
+ client.connect(false);
+
prepareMessage.setCustomPayload(null);
ResultMessage.Prepared prepareResponse =
(ResultMessage.Prepared) client.execute(prepareMessage);
@@ -249,6 +257,9 @@ public class MessagePayloadTest extends CQLTester
{
Assert.assertTrue(e.getCause() instanceof
ProtocolException);
}
+ // when a protocol exception is thrown by the server the
connection is closed, so need to re-connect
+ client.close();
+ client.connect(false);
BatchMessage batchMessage = new
BatchMessage(BatchStatement.Type.UNLOGGED,
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v)
VALUES (1, 'foo')"),
diff --git a/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
new file mode 100644
index 0000000..af7c1d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+
+/**
+ * Enhances {@link SimpleClient} to add custom logic to send to the server.
+ */
+public class WrappedSimpleClient extends SimpleClient
+{
+ public WrappedSimpleClient(String host, int port, int version,
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+ {
+ super(host, port, version, encryptionOptions);
+ }
+
+ public WrappedSimpleClient(String host, int port,
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+ {
+ super(host, port, encryptionOptions);
+ }
+
+ public WrappedSimpleClient(String host, int port, int version)
+ {
+ super(host, port, version);
+ }
+
+ public WrappedSimpleClient(String host, int port)
+ {
+ super(host, port);
+ }
+
+ public Message.Response write(ByteBuf buffer) throws InterruptedException
+ {
+ return write(buffer, true);
+ }
+
+ public Message.Response write(ByteBuf buffer, boolean
awaitCloseOnProtocolError) throws InterruptedException
+ {
+ lastWriteFuture = channel.writeAndFlush(buffer);
+ Message.Response response = responseHandler.responses.take();
+ if (awaitCloseOnProtocolError
+ && response instanceof ErrorMessage && ((ErrorMessage)
response).error.code() == ExceptionCode.PROTOCOL_ERROR)
+ {
+ // protocol errors shutdown the connection, wait for it to close
+ connection.channel().closeFuture().awaitUninterruptibly();
+ }
+ return response;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]