This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 9c50b1f  Failure to execute queries should emit a KPI other than read 
timeout/unavailable so it can be alerted/tracked
9c50b1f is described below

commit 9c50b1f9a12a95b55851cc52d4b66440f9fafaea
Author: David Capwell <[email protected]>
AuthorDate: Tue Jun 1 15:06:48 2021 -0700

    Failure to execute queries should emit a KPI other than read 
timeout/unavailable so it can be alerted/tracked
    
    patch by David Capwell; reviewed by Sam Tunnicliffe for CASSANDRA-16581
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/metrics/ClientMetrics.java    | 16 ++++
 src/java/org/apache/cassandra/transport/Frame.java | 19 ++++-
 .../org/apache/cassandra/transport/Message.java    | 31 +++++++-
 .../cassandra/transport/ProtocolException.java     | 24 ++++++
 .../apache/cassandra/transport/SimpleClient.java   | 15 +++-
 .../org/apache/cassandra/utils/Throwables.java     | 11 +++
 .../test/UnableToParseClientMessageTest.java       | 88 ++++++++++++++++++++++
 .../cassandra/transport/MessagePayloadTest.java    | 11 +++
 .../cassandra/transport/WrappedSimpleClient.java   | 67 ++++++++++++++++
 10 files changed, 275 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 99e53e9..923337e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.25:
+ * Failure to execute queries should emit a KPI other than read 
timeout/unavailable so it can be alerted/tracked (CASSANDRA-16581)
  * Don't wait on schema versions from replacement target when replacing 
(CASSANDRA-16692)
  * StandaloneVerifier does not fail when unable to verify SSTables, it only 
fails if Corruption is thrown (CASSANDRA-16683)
  * Fix bloom filter false ratio calculation by including true negatives 
(CASSANDRA-15834)
diff --git a/src/java/org/apache/cassandra/metrics/ClientMetrics.java 
b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
index 08f0531..e034be8 100644
--- a/src/java/org/apache/cassandra/metrics/ClientMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientMetrics.java
@@ -43,6 +43,9 @@ public class ClientMetrics
     private Gauge<Integer> pausedConnectionsGauge;
     private Meter requestDiscarded;
 
+    private Meter protocolException;
+    private Meter unknownException;
+
     private ClientMetrics()
     {
     }
@@ -51,6 +54,16 @@ public class ClientMetrics
     public void unpauseConnection() { pausedConnections.decrementAndGet(); }
     public void markRequestDiscarded() { requestDiscarded.mark(); }
 
+    public void markProtocolException()
+    {
+        protocolException.mark();
+    }
+
+    public void markUnknownException()
+    {
+        unknownException.mark();
+    }
+
     public synchronized void init(Collection<Server> servers)
     {
         if (initialized)
@@ -64,6 +77,9 @@ public class ClientMetrics
         pausedConnectionsGauge = registerGauge("PausedConnections", 
pausedConnections::get);
         requestDiscarded = registerMeter("RequestDiscarded");
 
+        protocolException = registerMeter("ProtocolException");
+        unknownException = registerMeter("UnknownException");
+
         initialized = true;
     }
 
diff --git a/src/java/org/apache/cassandra/transport/Frame.java 
b/src/java/org/apache/cassandra/transport/Frame.java
index a07551f..ed9e3dc 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -35,6 +35,9 @@ public class Frame
 {
     public static final byte PROTOCOL_VERSION_MASK = 0x7f;
 
+    /** These versions are sent by some clients, but are not valid Apache 
Cassandra versions (66 and 65 are DSE versions) */
+    private static int[] KNOWN_INVALID_VERSIONS = { 66, 65};
+
     public final Header header;
     public final ByteBuf body;
 
@@ -177,9 +180,15 @@ public class Frame
             int firstByte = buffer.getByte(idx++);
             Message.Direction direction = 
Message.Direction.extractFromVersion(firstByte);
             int version = firstByte & PROTOCOL_VERSION_MASK;
+            for (int dseVersion : KNOWN_INVALID_VERSIONS)
+            {
+                if (dseVersion == version)
+                    throw ProtocolException.toSilentException(new 
ProtocolException(invalidVersionMessage(version)));
+            }
             if (version < Server.MIN_SUPPORTED_VERSION || version > 
versionCap.getMaxVersion())
-                throw new ProtocolException(String.format("Invalid or 
unsupported protocol version (%d); the lowest supported version is %d and the 
greatest is %d",
-                                                          version, 
Server.MIN_SUPPORTED_VERSION, versionCap.getMaxVersion()),
+                throw new ProtocolException(invalidVersionMessage(version),
+                                            // only override the version IFF 
the version is less than the min supported, as this is relatively safe since 
older versions were the same up to v3.
+                                            // in the case where version is 
greater than, it isn't known if the protocol has changed, so reply back normally
                                             version < 
Server.MIN_SUPPORTED_VERSION ? version : null);
 
             // Wait until we have the complete header
@@ -247,6 +256,12 @@ public class Frame
             results.add(new Frame(new Header(version, flags, streamId, type, 
bodyLength), body));
         }
 
+        private String invalidVersionMessage(int version)
+        {
+            return String.format("Invalid or unsupported protocol version 
(%d); the lowest supported version is %d and the greatest is %d",
+                                 version, Server.MIN_SUPPORTED_VERSION, 
versionCap.getMaxVersion());
+        }
+
         private void fail()
         {
             // Reset to the initial state and throw the exception
diff --git a/src/java/org/apache/cassandra/transport/Message.java 
b/src/java/org/apache/cassandra/transport/Message.java
index a88540b..3c2f38c 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -51,6 +51,8 @@ import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.transport.messages.*;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.Throwables;
 
 import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
 
@@ -743,14 +745,14 @@ public abstract class Message
         @Override
         public void exceptionCaught(final ChannelHandlerContext ctx, Throwable 
cause)
         {
-            // Provide error message to client in case channel is still open
-            UnexpectedChannelExceptionHandler handler = new 
UnexpectedChannelExceptionHandler(ctx.channel(), false);
-            ErrorMessage errorMessage = ErrorMessage.fromException(cause, 
handler);
             if (ctx.channel().isOpen())
             {
+                // Provide error message to client in case channel is still 
open
+                UnexpectedChannelExceptionHandler handler = new 
UnexpectedChannelExceptionHandler(ctx.channel(), false);
+                ErrorMessage errorMessage = ErrorMessage.fromException(cause, 
handler);
                 ChannelFuture future = ctx.writeAndFlush(errorMessage);
                 // On protocol exception, close the channel as soon as the 
message have been sent
-                if (cause instanceof ProtocolException)
+                if (isFatal(cause))
                 {
                     future.addListener(new ChannelFutureListener()
                     {
@@ -761,6 +763,27 @@ public abstract class Message
                     });
                 }
             }
+            if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException))
+            {
+                // if any ProtocolException is not silent, then handle it
+                if (Throwables.anyCauseMatches(cause, t -> t instanceof 
ProtocolException && !((ProtocolException) t).isSilent()))
+                {
+                    ClientMetrics.instance.markProtocolException();
+                    // since protocol exceptions are expected to be client 
issues, not logging stack trace
+                    // to avoid spamming the logs once a bad client shows up
+                    NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, 
TimeUnit.MINUTES, "Protocol exception with client networking: " + 
cause.getMessage());
+                }
+            }
+            else
+            {
+                ClientMetrics.instance.markUnknownException();
+                logger.warn("Unknown exception in client networking", cause);
+            }
+        }
+
+        private static boolean isFatal(Throwable cause)
+        {
+            return cause instanceof ProtocolException;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/transport/ProtocolException.java 
b/src/java/org/apache/cassandra/transport/ProtocolException.java
index a589e9b..0b307a9 100644
--- a/src/java/org/apache/cassandra/transport/ProtocolException.java
+++ b/src/java/org/apache/cassandra/transport/ProtocolException.java
@@ -52,4 +52,28 @@ public class ProtocolException extends RuntimeException 
implements TransportExce
     {
         return attemptedLowProtocolVersion;
     }
+
+    public boolean isSilent()
+    {
+        return false;
+    }
+
+    public static ProtocolException toSilentException(ProtocolException e)
+    {
+        return new Silent(e);
+    }
+
+    private static class Silent extends ProtocolException
+    {
+        public Silent(ProtocolException cause)
+        {
+            super(cause.getMessage(), cause.attemptedLowProtocolVersion);
+        }
+
+        @Override
+        public boolean isSilent()
+        {
+            return true;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java 
b/src/java/org/apache/cassandra/transport/SimpleClient.java
index 7d34d98..40423c3 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -35,11 +35,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.cql3.QueryOptions;
@@ -309,7 +314,7 @@ public class SimpleClient implements Closeable
     }
 
     @ChannelHandler.Sharable
-    private static class ResponseHandler extends 
SimpleChannelInboundHandler<Message.Response>
+    static class ResponseHandler extends 
SimpleChannelInboundHandler<Message.Response>
     {
         public final BlockingQueue<Message.Response> responses = new 
SynchronousQueue<>(true);
         public EventHandler eventHandler;
@@ -333,11 +338,17 @@ public class SimpleClient implements Closeable
             }
         }
 
+        @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception
         {
             if (this == ctx.pipeline().last())
+            {
                 logger.error("Exception in response", cause);
-            ctx.fireExceptionCaught(cause);
+            }
+            else
+            {
+                ctx.fireExceptionCaught(cause);
+            }
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java 
b/src/java/org/apache/cassandra/utils/Throwables.java
index 5ad9686..1b71098 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import org.apache.cassandra.io.FSReadError;
@@ -37,6 +38,16 @@ public final class Throwables
         void perform() throws E;
     }
 
+    public static boolean anyCauseMatches(Throwable t, Predicate<Throwable> 
cause)
+    {
+        do
+        {
+            if (cause.test(t))
+                return true;
+        } while ((t = t.getCause()) != null);
+        return false;
+    }
+
     public static <T extends Throwable> T merge(T existingFail, T newFail)
     {
         if (existingFail == null)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
new file mode 100644
index 0000000..75cc905
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/UnableToParseClientMessageTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import io.netty.buffer.Unpooled;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.metrics.CassandraMetricsRegistry;
+import org.apache.cassandra.transport.WrappedSimpleClient;
+
+/**
+ * If a client sends a message that can not be parsed by the server then we 
need to detect this and update metrics
+ * for monitoring.
+ *
+ * An issue was found between 2.1 to 3.0 upgrades with regards to paging 
serialization. Since
+ * this is a serialization issue we hit similar paths by sending bad bytes to 
the server, so can simulate the mixed-mode
+ * paging issue without needing to send proper messages.
+ */
+public class UnableToParseClientMessageTest extends TestBaseImpl
+{
+    @Test
+    public void badMessageCausesProtocolException() throws IOException, 
InterruptedException
+    {
+        try (Cluster cluster = init(Cluster.build(1).withConfig(c -> 
c.with(Feature.values())).start()))
+        {
+            // write gibberish to the native protocol
+            IInvokableInstance node = cluster.get(1);
+            // make sure everything is fine at the start
+            node.runOnInstance(() -> {
+                Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                       
.get("org.apache.cassandra.metrics.Client.ProtocolException")
+                                                                       
.getCount());
+                Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                       
.get("org.apache.cassandra.metrics.Client.UnknownException")
+                                                                       
.getCount());
+            });
+
+            try (WrappedSimpleClient client = new 
WrappedSimpleClient("127.0.0.1", 9042))
+            {
+                client.connect(false, true);
+
+                // this should return a failed response
+                String response = client.write(Unpooled.wrappedBuffer("This is 
just a test".getBytes(StandardCharsets.UTF_8)), false).toString();
+                Assert.assertTrue("Resposne '" + response + "' expected to 
contain 'Invalid or unsupported protocol version (84); the lowest supported 
version is 3 and the greatest is 4'",
+                                  response.contains("Invalid or unsupported 
protocol version (84); the lowest supported version is 3 and the greatest is 
4"));
+
+                node.runOnInstance(() -> {
+                    Util.spinAssertEquals(1L,
+                                          () -> 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                               
 .get("org.apache.cassandra.metrics.Client.ProtocolException")
+                                                                               
 .getCount(),
+                                          10);
+                    Assert.assertEquals(0, 
CassandraMetricsRegistry.Metrics.getMeters()
+                                                                           
.get("org.apache.cassandra.metrics.Client.UnknownException")
+                                                                           
.getCount());
+                });
+                List<String> results = node.logs().grep("Protocol exception 
with client networking").getResult();
+                results.forEach(s -> Assert.assertTrue("Expected logs '" + s + 
"' to contain: Invalid or unsupported protocol version (84)",
+                                                       s.contains("Invalid or 
unsupported protocol version (84)")));
+                Assert.assertEquals(1, results.size()); // this logs less 
often than metrics as the log has a nospamlogger wrapper
+            }
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java 
b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 865a173..e88126f 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -218,6 +218,10 @@ public class MessagePayloadTest extends CQLTester
                 {
                     Assert.assertTrue(e.getCause() instanceof 
ProtocolException);
                 }
+                // when a protocol exception is thrown by the server the 
connection is closed, so need to re-connect
+                client.close();
+                client.connect(false);
+
                 queryMessage.setCustomPayload(null);
                 client.execute(queryMessage);
 
@@ -233,6 +237,10 @@ public class MessagePayloadTest extends CQLTester
                 {
                     Assert.assertTrue(e.getCause() instanceof 
ProtocolException);
                 }
+                // when a protocol exception is thrown by the server the 
connection is closed, so need to re-connect
+                client.close();
+                client.connect(false);
+
                 prepareMessage.setCustomPayload(null);
                 ResultMessage.Prepared prepareResponse = 
(ResultMessage.Prepared) client.execute(prepareMessage);
 
@@ -249,6 +257,9 @@ public class MessagePayloadTest extends CQLTester
                 {
                     Assert.assertTrue(e.getCause() instanceof 
ProtocolException);
                 }
+                // when a protocol exception is thrown by the server the 
connection is closed, so need to re-connect
+                client.close();
+                client.connect(false);
 
                 BatchMessage batchMessage = new 
BatchMessage(BatchStatement.Type.UNLOGGED,
                                                              
Collections.<Object>singletonList("INSERT INTO " + KEYSPACE + ".atable (pk,v) 
VALUES (1, 'foo')"),
diff --git a/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java 
b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
new file mode 100644
index 0000000..af7c1d7
--- /dev/null
+++ b/test/unit/org/apache/cassandra/transport/WrappedSimpleClient.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.transport;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.transport.messages.ErrorMessage;
+
+/**
+ * Enhances {@link SimpleClient} to add custom logic to send to the server.
+ */
+public class WrappedSimpleClient extends SimpleClient
+{
+    public WrappedSimpleClient(String host, int port, int version, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+    {
+        super(host, port, version, encryptionOptions);
+    }
+
+    public WrappedSimpleClient(String host, int port, 
EncryptionOptions.ClientEncryptionOptions encryptionOptions)
+    {
+        super(host, port, encryptionOptions);
+    }
+
+    public WrappedSimpleClient(String host, int port, int version)
+    {
+        super(host, port, version);
+    }
+
+    public WrappedSimpleClient(String host, int port)
+    {
+        super(host, port);
+    }
+
+    public Message.Response write(ByteBuf buffer) throws InterruptedException
+    {
+        return write(buffer, true);
+    }
+
+    public Message.Response write(ByteBuf buffer, boolean 
awaitCloseOnProtocolError) throws InterruptedException
+    {
+        lastWriteFuture = channel.writeAndFlush(buffer);
+        Message.Response response = responseHandler.responses.take();
+        if (awaitCloseOnProtocolError
+            && response instanceof ErrorMessage && ((ErrorMessage) 
response).error.code() == ExceptionCode.PROTOCOL_ERROR)
+        {
+            // protocol errors shutdown the connection, wait for it to close
+            connection.channel().closeFuture().awaitUninterruptibly();
+        }
+        return response;
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to