Repository: cassandra
Updated Branches:
  refs/heads/trunk c1f623902 -> 33ab4902a


Remove obsoleted CredentialsMessage

patch by Stefan Podkowinski; reviewed by Jeremiah Jordan for CASSANDRA-13662


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/33ab4902
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/33ab4902
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/33ab4902

Branch: refs/heads/trunk
Commit: 33ab4902a1bef5aa662b80b89b5dd9a318f67db5
Parents: c1f6239
Author: Stefan Podkowinski <stefan.podkowin...@1und1.de>
Authored: Tue Jul 4 10:05:51 2017 +0200
Committer: Stefan Podkowinski <stefan.podkowin...@1und1.de>
Committed: Tue Sep 5 16:11:42 2017 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/transport/Client.java  |  7 --
 .../org/apache/cassandra/transport/Frame.java   | 37 +++++---
 .../org/apache/cassandra/transport/Message.java |  2 +-
 .../cassandra/transport/SimpleClient.java       |  8 --
 .../transport/messages/CredentialsMessage.java  | 97 --------------------
 .../messages/UnsupportedMessageCodec.java       | 56 +++++++++++
 .../cassandra/transport/ProtocolErrorTest.java  | 21 +++++
 7 files changed, 102 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Client.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Client.java b/src/java/org/apache/cassandra/transport/Client.java
index 9a76e03..7fec473 100644
--- a/src/java/org/apache/cassandra/transport/Client.java
+++ b/src/java/org/apache/cassandra/transport/Client.java
@@ -175,13 +175,6 @@ public class Client extends SimpleClient
         {
             return new OptionsMessage();
         }
-        else if (msgType.equals("CREDENTIALS"))
-        {
-            System.err.println("[WARN] CREDENTIALS command is deprecated, use AUTHENTICATE instead");
-            CredentialsMessage msg = new CredentialsMessage();
-            msg.credentials.putAll(readCredentials(iter));
-            return msg;
-        }
         else if (msgType.equals("AUTHENTICATE"))
         {
             Map<String, String> credentials = readCredentials(iter);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Frame.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java
index 6cd8b1e..41e64f9 100644
--- a/src/java/org/apache/cassandra/transport/Frame.java
+++ b/src/java/org/apache/cassandra/transport/Frame.java
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.util.EnumSet;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.*;
 import io.netty.handler.codec.ByteToMessageDecoder;
@@ -146,8 +148,8 @@ public class Frame
             this.factory = factory;
         }
 
-        @Override
-        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results)
+        @VisibleForTesting
+        Frame decodeFrame(ByteBuf buffer)
         throws Exception
         {
             if (discardingTooLongFrame)
@@ -156,12 +158,12 @@ public class Frame
                 // If we have discarded everything, throw the exception
                 if (bytesToDiscard <= 0)
                     fail();
-                return;
+                return null;
             }
 
             int readableBytes = buffer.readableBytes();
             if (readableBytes == 0)
-                return;
+                return null;
 
             int idx = buffer.readerIndex();
 
@@ -174,7 +176,7 @@ public class Frame
 
             // Wait until we have the complete header
             if (readableBytes < Header.LENGTH)
-                return;
+                return null;
 
             int flags = buffer.getByte(idx++);
             EnumSet<Header.Flag> decodedFlags = Header.Flag.deserialize(flags);
@@ -210,11 +212,11 @@ public class Frame
                 bytesToDiscard = discard(buffer, frameLength);
                 if (bytesToDiscard <= 0)
                     fail();
-                return;
+                return null;
             }
 
             if (buffer.readableBytes() < frameLength)
-                return;
+                return null;
 
             // extract body
             ByteBuf body = buffer.slice(idx, (int) bodyLength);
@@ -223,24 +225,33 @@ public class Frame
             idx += bodyLength;
             buffer.readerIndex(idx);
 
+            return new Frame(new Header(version, decodedFlags, streamId, type), body);
+        }
+
+        @Override
+        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> results)
+        throws Exception
+        {
+            Frame frame = decodeFrame(buffer);
+            if (frame == null) return;
+
             Attribute<Connection> attrConn = ctx.channel().attr(Connection.attributeKey);
             Connection connection = attrConn.get();
             if (connection == null)
             {
                 // First message seen on this channel, attach the connection object
-                connection = factory.newConnection(ctx.channel(), version);
+                connection = factory.newConnection(ctx.channel(), frame.header.version);
                 attrConn.set(connection);
             }
-            else if (connection.getVersion() != version)
+            else if (connection.getVersion() != frame.header.version)
             {
                 throw ErrorMessage.wrap(
                         new ProtocolException(String.format(
                                 "Invalid message version. Got %s but previous messages on this connection had version %s",
-                                version, connection.getVersion())),
-                        streamId);
+                                frame.header.version, connection.getVersion())),
+                        frame.header.streamId);
             }
-
-            results.add(new Frame(new Header(version, decodedFlags, streamId, type), body));
+            results.add(frame);
         }
 
         private void fail()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 1afe910..2da2ca7 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -88,7 +88,7 @@ public abstract class Message
         STARTUP        (1,  Direction.REQUEST,  StartupMessage.codec),
         READY          (2,  Direction.RESPONSE, ReadyMessage.codec),
         AUTHENTICATE   (3,  Direction.RESPONSE, AuthenticateMessage.codec),
-        CREDENTIALS    (4,  Direction.REQUEST,  CredentialsMessage.codec),
+        CREDENTIALS    (4,  Direction.REQUEST,  UnsupportedMessageCodec.instance),
         OPTIONS        (5,  Direction.REQUEST,  OptionsMessage.codec),
         SUPPORTED      (6,  Direction.RESPONSE, SupportedMessage.codec),
         QUERY          (7,  Direction.REQUEST,  QueryMessage.codec),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/SimpleClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/SimpleClient.java b/src/java/org/apache/cassandra/transport/SimpleClient.java
index c72d6e9..d5148ab 100644
--- a/src/java/org/apache/cassandra/transport/SimpleClient.java
+++ b/src/java/org/apache/cassandra/transport/SimpleClient.java
@@ -44,7 +44,6 @@ import io.netty.util.internal.logging.Slf4JLoggerFactory;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.security.SSLFactory;
-import org.apache.cassandra.transport.messages.CredentialsMessage;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.transport.messages.EventMessage;
 import org.apache.cassandra.transport.messages.ExecuteMessage;
@@ -166,13 +165,6 @@ public class SimpleClient implements Closeable
         }
     }
 
-    public void login(Map<String, String> credentials)
-    {
-        CredentialsMessage msg = new CredentialsMessage();
-        msg.credentials.putAll(credentials);
-        execute(msg);
-    }
-
     public ResultMessage execute(String query, ConsistencyLevel consistency)
     {
         return execute(query, Collections.<ByteBuffer>emptyList(), consistency);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
deleted file mode 100644
index 764d992..0000000
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.transport.messages;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.cassandra.auth.AuthenticatedUser;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.exceptions.AuthenticationException;
-import org.apache.cassandra.metrics.AuthMetrics;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.transport.CBUtil;
-import org.apache.cassandra.transport.Message;
-import org.apache.cassandra.transport.ProtocolException;
-import org.apache.cassandra.transport.ProtocolVersion;
-
-/**
- * Message to indicate that the server is ready to receive requests.
- */
-public class CredentialsMessage extends Message.Request
-{
-    public static final Message.Codec<CredentialsMessage> codec = new Message.Codec<CredentialsMessage>()
-    {
-        public CredentialsMessage decode(ByteBuf body, ProtocolVersion version)
-        {
-            if (version.isGreaterThan(ProtocolVersion.V1))
-                throw new ProtocolException("Legacy credentials authentication is not supported in " +
-                        "protocol versions > 1. Please use SASL authentication via a SaslResponse message");
-
-            Map<String, String> credentials = CBUtil.readStringMap(body);
-            return new CredentialsMessage(credentials);
-        }
-
-        public void encode(CredentialsMessage msg, ByteBuf dest, ProtocolVersion version)
-        {
-            CBUtil.writeStringMap(msg.credentials, dest);
-        }
-
-        public int encodedSize(CredentialsMessage msg, ProtocolVersion version)
-        {
-            return CBUtil.sizeOfStringMap(msg.credentials);
-        }
-    };
-
-    public final Map<String, String> credentials;
-
-    public CredentialsMessage()
-    {
-        this(new HashMap<String, String>());
-    }
-
-    private CredentialsMessage(Map<String, String> credentials)
-    {
-        super(Message.Type.CREDENTIALS);
-        this.credentials = credentials;
-    }
-
-    public Message.Response execute(QueryState state, long queryStartNanoTime)
-    {
-        try
-        {
-            AuthenticatedUser user = DatabaseDescriptor.getAuthenticator().legacyAuthenticate(credentials);
-            state.getClientState().login(user);
-            AuthMetrics.instance.markSuccess();
-        }
-        catch (AuthenticationException e)
-        {
-            AuthMetrics.instance.markFailure();
-            return ErrorMessage.fromException(e);
-        }
-
-        return new ReadyMessage();
-    }
-
-    @Override
-    public String toString()
-    {
-        return "CREDENTIALS";
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java b/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java
new file mode 100644
index 0000000..563e5d6
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/messages/UnsupportedMessageCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport.messages;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.cassandra.transport.Message;
+import org.apache.cassandra.transport.ProtocolException;
+import org.apache.cassandra.transport.ProtocolVersion;
+
+/**
+ * Catch-all codec for any unsupported legacy messages.
+ */
+public class UnsupportedMessageCodec <T extends Message> implements Message.Codec<T>
+{
+    public final static UnsupportedMessageCodec instance = new UnsupportedMessageCodec();
+
+    private static final Logger logger = LoggerFactory.getLogger(UnsupportedMessageCodec.class);
+
+    public T decode(ByteBuf body, ProtocolVersion version)
+    {
+        if (ProtocolVersion.SUPPORTED.contains(version))
+        {
+            logger.error("Received invalid message for supported protocol version {}", version);
+        }
+        throw new ProtocolException("Unsupported message");
+    }
+
+    public void encode(T t, ByteBuf dest, ProtocolVersion version)
+    {
+        throw new ProtocolException("Unsupported message");
+    }
+
+    public int encodedSize(T t, ProtocolVersion version)
+    {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/33ab4902/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
index 5e9731a..26b3d96 100644
--- a/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
+++ b/test/unit/org/apache/cassandra/transport/ProtocolErrorTest.java
@@ -172,4 +172,25 @@ public class ProtocolErrorTest {
 
         Assert.assertEquals(expected, buf);
     }
+
+    @Test
+    public void testUnsupportedMessage() throws Exception
+    {
+        byte[] incomingFrame = new byte[] {
+        (byte) REQUEST.addToVersion(ProtocolVersion.CURRENT.asInt()),  // direction & version
+        0x00,  // flags
+        0x00, 0x01,  // stream ID
+        0x04,  // opcode for obsoleted CREDENTIALS message
+        0x00, (byte) 0x00, (byte) 0x00, (byte) 0x10,  // body length
+        };
+        byte[] body = new byte[0x10];
+        ByteBuf buf = Unpooled.wrappedBuffer(incomingFrame, body);
+        Frame decodedFrame = new Frame.Decoder(null).decodeFrame(buf);
+        try {
+            decodedFrame.header.type.codec.decode(decodedFrame.body, decodedFrame.header.version);
+            Assert.fail("Expected protocol error");
+        } catch (ProtocolException e) {
+            Assert.assertTrue(e.getMessage().contains("Unsupported message"));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to